boost socket iostreams echo server với nén zlib ở chế độ ngủ cho đến khi kết nối sẽ bị đóng

Jan 18 2021

Tôi cố gắng tạo máy chủ echo đơn giản với nén zlib sau đây và ví dụ này .

Ý tưởng của tôi là gửi một số chuỗi ngay bây giờ vì tôi có thể chuyển đổi các loại POD thành chuỗi ( std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))) trước khi gửi khi tôi chắc chắn rằng lớp truyền tải hoạt động.

Và có một vấn đề ở đây. Máy khách nén dữ liệu, gửi dữ liệu và nói rằng dữ liệu đã được gửi đi nhưng máy chủ bị chặn khi đọc dữ liệu. Tôi không thể hiểu tại sao nó xảy ra.

Tôi đã cố gắng sử dụng operator<<với out.flush(), tôi cũng đã cố gắng sử dụng boost::iostreams::copy(). Kết quả là như nhau. Ví dụ về mã là (Tôi sử dụng cùng một tệp nguồn cho máy chủ và máy khách tùy thuộc vào args):

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

Đầu tiên tôi khởi động máy chủ và sau đó tôi khởi động máy khách. Đầu ra sau đây được tạo ra:

Người phục vụ

$ ./example
# now it waits while client will be accepted
start session
start reading

Khách hàng

$ ./example sender
sended: hello world

Các chương trình bị chặn với đầu ra ở trên. Tôi đoán máy chủ vẫn đợi dữ liệu từ máy khách và nó không biết máy khách đã gửi tất cả những gì nó có.

Nếu tôi đóng khách hàng với Ctrl + Cthì kết quả đầu ra như sau:

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

$ ./example sender
sended: hello world
^C

Tôi đoán zlib error-5là do máy chủ cho rằng kho lưu trữ chưa hoàn chỉnh.

Hành vi được mong đợi là không chặn. Thông báo phải xuất hiện trong đầu ra chương trình máy chủ khi máy khách được khởi động.

Tại sao chương trình bị chặn khi đọc? Làm thế nào tôi có thể sửa chữa nó?

Trả lời

1 sehe Jan 18 2021 at 21:56

iostreams::copy thực hiện điều đó: nó sao chép luồng.

Tuân thủ mã của bạn. Nó rất dễ đọc :) Nó nhắc tôi về câu trả lời này Đọc và ghi tệp với ổ cắm boost iostream . Sự khác biệt chính là câu trả lời đó gửi một đốm màu nén duy nhất và đóng lại.

Bạn "đúng" rằng trình giải nén biết khi nào một khối nén hoàn tất, nhưng nó không quyết định rằng một khối khác sẽ không tuân theo.

Vì vậy, bạn cần phải thêm khung. Cách truyền thống là vượt qua độ dài ngoài dải. Tôi đã thực hiện các thay đổi đồng thời giảm sự trùng lặp mã bằng cách sử dụng trình điều khiển IO.

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};

ZLIB người thao túng

Cái này chủ yếu đóng gói mã mà bạn đã sao chép giữa người gửi / người nhận:

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

Mình làm Ttemplated nên có thể lưu trữ std::string&hoặc std::string const&tùy theo nhu cầu.

LengthPrefixed người thao túng

Người thao tác này không quan tâm đến những gì đang được tuần tự hóa, nhưng sẽ chỉ đơn giản là tiền tố nó bằng độ dài hiệu quả trên dây:

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

Chúng tôi thêm một sự tinh tế: bằng cách lưu trữ một tham chiếu hoặc giá trị tùy thuộc vào những gì chúng tôi được xây dựng, chúng tôi cũng có thể chấp nhận các khoảng thời gian tạm thời (như trình thao tác ZLIB):

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

Tôi không nghĩ rằng làm cho kẻ ZLIBthao túng trở nên chung chung như nhau. Vì vậy, tôi để đó như một thứ trừ tà cho người đọc

CHƯƠNG TRÌNH DEMO

Kết hợp hai điều này, bạn có thể viết người gửi / người nhận đơn giản là:

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

Thật vậy, nó in:

reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

Hoàn thành danh sách

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
#else
    std::ostream debug(nullptr);
#endif

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}
Bogdan Jan 18 2021 at 21:50

Sự cố được giải quyết bằng cách sử dụng boost::serializationcác bước sau:

  1. Trước hết, tôi đã chuyển tính năng nén sang các chức năng như thế này:
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;
    io_out.push(io::zlib_compressor());
    io_out.push(output);

    io::copy(input, io_out);

    return output.str();
}

std::string decompress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;
    io_in.push(io::zlib_decompressor());
    io_in.push(input);

    io::copy(io_in, output);

    return output.str();
}
} // namespace my
  1. Sau đó, tôi đã tạo một trình bao bọc cho bộ đệm chuỗi như thế này (theo hướng dẫn từ tài liệu ):
class Package
{
public:
    Package(const std::string &buffer) : buffer(buffer) {}

private:
    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
    {
        ar & buffer;
    }

};
  1. Và cuối cùng, tôi đã thêm tuần tự sau khi đọc và trước khi gửi.
/**
 * receiver
 */
Package request;

{
    boost::archive::text_iarchive ia(*stream);
    ia >> request;
}

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

{
    boost::archive::text_oarchive oa(*stream);
    oa << response;
}

/**
 * sender
 */
std::string data = "hello world";
Package package(my::compress(data));

// send request
{
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;
}

// waiting for a response
{
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;
}

// decompress response buffer
result = my::decompress(package.get_buffer());