เพิ่มเซิร์ฟเวอร์ echo ซ็อกเก็ต iostreams ด้วยการบีบอัด zlib จะเข้าสู่โหมดสลีปจนกว่าการเชื่อมต่อจะถูกปิด

Jan 18 2021

ฉันพยายามที่จะสร้างเซิร์ฟเวอร์เสียงสะท้อนที่เรียบง่ายด้วยการบีบอัด zlib ต่อไปนี้และนี้ตัวอย่าง

ความคิดของฉันคือการส่งสตริงตอนนี้เพราะฉันสามารถแปลงประเภท POD เป็นสตริง ( std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))) ก่อนที่จะส่งเมื่อฉันจะแน่ใจว่าเลเยอร์การขนส่งทำงานได้

และมีปัญหาที่นี่ ลูกค้าบีบอัดข้อมูลส่งและแจ้งว่ามีการส่งข้อมูล แต่เซิร์ฟเวอร์ถูกบล็อกในการอ่านข้อมูล ฉันไม่เข้าใจว่าทำไมมันถึงเกิดขึ้น

ผมพยายามที่จะใช้operator<<กับยังฉันพยายามที่จะใช้out.flush() boost::iostreams::copy()ผลลัพธ์ก็เหมือนกัน ตัวอย่างโค้ดคือ (ฉันใช้ซอร์สไฟล์เดียวกันสำหรับเซิร์ฟเวอร์และไคลเอนต์ขึ้นอยู่กับ args):

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

ก่อนอื่นฉันเริ่มเซิร์ฟเวอร์จากนั้นฉันเริ่มไคลเอนต์ มีการสร้างเอาต์พุตต่อไปนี้:

เซิร์ฟเวอร์

$ ./example
# now it waits while client will be accepted
start session
start reading

ลูกค้า

$ ./example sender
sended: hello world

โปรแกรมถูกบล็อกด้วยเอาต์พุตด้านบน ฉันเดาว่าเซิร์ฟเวอร์ยังคงรอข้อมูลจากไคลเอนต์และไม่ทราบว่าไคลเอนต์ส่งข้อมูลทั้งหมดที่มี

หากฉันปิดไคลเอนต์ด้วยCtrl + Cผลลัพธ์จะเป็นดังนี้:

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

และ

$ ./example sender
sended: hello world
^C

ฉันเดาว่าzlib error-5เป็นเพราะเซิร์ฟเวอร์คิดว่าไฟล์เก็บถาวรไม่สมบูรณ์

พฤติกรรมที่คาดหวังคือไม่มีการปิดกั้น ข้อความจะต้องปรากฏในเอาต์พุตโปรแกรมเซิร์ฟเวอร์เมื่อไคลเอ็นต์เริ่มทำงาน

เหตุใดโปรแกรมจึงถูกบล็อกในการอ่าน จะแก้ไขได้อย่างไร?

คำตอบ

1 sehe Jan 18 2021 at 21:56

iostreams::copy ทำเพียงแค่นั้น: มันคัดลอกสตรีม

ชมเชยรหัสของคุณ มันอ่านได้มาก :) มันทำให้ผมนึกถึงคำตอบนี้อ่านและการเขียนไฟล์ที่มีซ็อกเก็ตเพิ่ม iostream ข้อแตกต่างที่สำคัญคือคำตอบนั้นจะส่งหยดบีบอัดเดียวและปิดลง

คุณ "ถูกต้อง" ที่ตัวคลายการบีบอัดรู้เมื่อบล็อกที่บีบอัดเสร็จสมบูรณ์ แต่ไม่ได้ตัดสินว่าอีกบล็อกหนึ่งจะไม่ทำตาม

ดังนั้นคุณต้องเพิ่มกรอบ วิธีดั้งเดิมคือการส่งความยาวออกจากวง ฉันได้ดำเนินการเปลี่ยนแปลงในขณะที่ยังลดการทำซ้ำโค้ดโดยใช้ตัวปรับแต่ง IO

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};

และ

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};

ZLIB ผู้ปลุกปั่น

อันนี้ส่วนใหญ่ห่อหุ้มรหัสที่คุณทำซ้ำระหว่างผู้ส่ง / ผู้รับ:

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

ฉันทำTเทมเพลตเพื่อให้สามารถจัดเก็บstd::string&หรือstd::string const&ขึ้นอยู่กับความต้องการ

LengthPrefixed ผู้ปลุกปั่น

หุ่นยนต์นี้ไม่สนใจว่าอะไรจะถูกทำให้เป็นอนุกรม แต่จะนำหน้าด้วยความยาวที่มีประสิทธิภาพบนสาย:

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

เราเพิ่มความละเอียดอ่อน: โดยการจัดเก็บข้อมูลอ้างอิงหรือค่าขึ้นอยู่กับสิ่งที่เราสร้างขึ้นด้วยเรายังสามารถรับจังหวะ (เช่นตัวควบคุม ZLIB)

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

ฉันไม่ได้คิดที่จะทำให้ZLIBหุ่นยนต์มีขนาดเท่า ๆ กัน ก็เลยฝากไว้ว่าเป็นการสะเดาะเคราะห์ให้กับผู้อ่าน

โปรแกรมสาธิต

เมื่อรวมสองสิ่งนี้เข้าด้วยกันคุณสามารถเขียนผู้ส่ง / ผู้รับได้ง่ายๆดังนี้:

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

อันที่จริงมันพิมพ์:

reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

รายชื่อที่สมบูรณ์

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
#else
    std::ostream debug(nullptr);
#endif

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}
Bogdan Jan 18 2021 at 21:50

ปัญหาได้รับการแก้ไขโดยใช้boost::serializationขั้นตอนต่อไปนี้:

  1. ก่อนอื่นฉันย้ายการบีบอัดไปยังฟังก์ชั่นเช่นนี้:
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;
    io_out.push(io::zlib_compressor());
    io_out.push(output);

    io::copy(input, io_out);

    return output.str();
}

std::string decompress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;
    io_in.push(io::zlib_decompressor());
    io_in.push(input);

    io::copy(io_in, output);

    return output.str();
}
} // namespace my
  1. จากนั้นฉันสร้าง wrapper สำหรับบัฟเฟอร์สตริงเช่นนี้ (ทำตามบทช่วยสอนจากเอกสารประกอบ ):
class Package
{
public:
    Package(const std::string &buffer) : buffer(buffer) {}

private:
    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
    {
        ar & buffer;
    }

};
  1. และในที่สุดฉันก็ต่อท้ายอนุกรมหลังจากอ่านและก่อนส่ง
/**
 * receiver
 */
Package request;

{
    boost::archive::text_iarchive ia(*stream);
    ia >> request;
}

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

{
    boost::archive::text_oarchive oa(*stream);
    oa << response;
}

/**
 * sender
 */
std::string data = "hello world";
Package package(my::compress(data));

// send request
{
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;
}

// waiting for a response
{
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;
}

// decompress response buffer
result = my::decompress(package.get_buffer());