boost socket iostreams echo server con compressione zlib rimane inattivo fino alla chiusura della connessione

Jan 18 2021

Provo a creare un semplice server echo con compressione zlib seguendo questo e questo esempi.

La mia idea è di inviare una stringa ora perché posso convertire i tipi POD in string ( std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))) prima di inviarli quando sarò sicuro che il livello di trasporto funzioni.

E qui c'è un problema. Il client comprime i dati, li invia e dice che i dati sono stati inviati ma il server è bloccato durante la lettura dei dati. Non riesco a capire perché accada.

Ho provato a usare operator<<con out.flush(), inoltre ho provato a usare boost::iostreams::copy(). Il risultato è lo stesso. L'esempio di codice è (utilizzo lo stesso file sorgente per server e client a seconda degli argomenti):

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "";

void receive()
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

        for (;;)
                boost::iostreams::filtering_istream in;

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);

            std::cout << "data: " << buffer.str() << std::endl;

                boost::iostreams::filtering_ostream out;

                boost::iostreams::copy(buffer, out);

            std::cout << "Reply is sended" << std::endl;
    catch(const boost::iostreams::zlib_error &e)
        std::cerr << e.what() << e.error() << '\n';

void send(const std::string &data)
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream;

    std::stringstream buffer;
    buffer << data;

    if (!stream)
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;

            boost::iostreams::filtering_ostream out;

            out << buffer.str();

        std::cout << "sended: " << data << std::endl;

            boost::iostreams::filtering_istream in;

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);

        std::cout << "result: " << buffer.str() << std::endl;
    catch(const boost::iostreams::zlib_error &e)
        std::cerr << e.what() << '\n';

int main(int argc, const char *argv[])
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");

    return 0;

Per prima cosa avvio il server e poi il client. Viene prodotto il seguente output:


$ ./example
# now it waits while client will be accepted
start session
start reading


$ ./example sender
sended: hello world

I programmi sono bloccati con l'uscita sopra. Immagino che il server stia ancora aspettando i dati dal client e non sa che il client ha inviato tutto ciò che aveva.

Se chiudo il client con, Ctrl + Cl'output è il seguente:

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5


$ ./example sender
sended: hello world

Immagino zlib error-5sia perché il server pensa che l'archivio sia incompleto.

Il comportamento previsto non è un blocco. Il messaggio deve apparire nell'output del programma del server all'avvio del client.

Perché il programma è bloccato in lettura? Come posso risolverlo?


1 sehe Jan 18 2021 at 21:56

iostreams::copy fa proprio questo: copia stream.

Complimenti al tuo codice. È molto leggibile :) Mi ricorda questa risposta Lettura e scrittura di file con socket iostream boost . La differenza principale è che quella risposta invia un singolo BLOB compresso e si chiude.

Hai "ragione" che il decompressore sa quando un blocco compresso è completo, ma non decide che un altro non lo seguirà.

Quindi è necessario aggiungere l'inquadratura. Il modo tradizionale è passare una lunghezza fuori banda. Ho implementato le modifiche riducendo anche la duplicazione del codice utilizzando i manipolatori IO.

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;


template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;

ZLIB manipolatore

Questo incapsula principalmente il codice che hai duplicato tra il mittente / destinatario:

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
            boost::iostreams::filtering_ostream out;
            out << << std::flush;
        return os.flush();

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;

        std::ostringstream oss;
        copy(in, oss); = oss.str();

        return is;

Ho realizzato un Tmodello in modo che possa archiviare std::string&o in std::string const&base alle necessità.

LengthPrefixed manipolatore

Questo manipolatore non si preoccupa di ciò che viene serializzato, ma lo anteporrà semplicemente alla lunghezza effettiva sul filo:

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (, on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
        return is;

Aggiungiamo una sottigliezza: memorizzando un riferimento o un valore a seconda di ciò con cui siamo costruiti possiamo anche accettare provvisori (come il manipolatore ZLIB):

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

Non pensavo di rendere il ZLIBmanipolatore altrettanto generico. Quindi lo lascio come un esorcismo per il lettore


Combinando questi due, puoi scrivere il mittente / destinatario semplicemente come:

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;

Infatti stampa:

reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

Elenco completo

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
    std::ostream debug(nullptr);

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (, on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
        return is;

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
            boost::iostreams::filtering_ostream out;
            out << << std::flush;
        return os.flush();

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;

        std::ostringstream oss;
        copy(in, oss); = oss.str();

        return is;

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
Bogdan Jan 18 2021 at 21:50

Il problema viene risolto utilizzando di boost::serializationcon i seguenti passaggi:

  1. Prima di tutto ho spostato lo zip in funzioni come questa:
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;

    io::copy(input, io_out);

    return output.str();

std::string decompress(const std::string &data)
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;

    io::copy(io_in, output);

    return output.str();
} // namespace my
  1. Quindi ho creato un wrapper per il buffer di stringa come questo (seguendo il tutorial dalla documentazione ):
class Package
    Package(const std::string &buffer) : buffer(buffer) {}

    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
        ar & buffer;

  1. E alla fine ho aggiunto la serializzazione dopo la lettura e prima dell'invio.
 * receiver
Package request;

    boost::archive::text_iarchive ia(*stream);
    ia >> request;

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

    boost::archive::text_oarchive oa(*stream);
    oa << response;

 * sender
std::string data = "hello world";
Package package(my::compress(data));

// send request
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;

// waiting for a response
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;

// decompress response buffer
result = my::decompress(package.get_buffer());