कनेक्शन बंद होने तक zlib संपीड़न सो के साथ सॉकेट iostreams इको सर्वर को बढ़ावा दें

Jan 18 2021

मैं इस और इस उदाहरण के बाद zlib संपीड़न के साथ सरल इको सर्वर बनाने की कोशिश करता हूं ।

मेरा विचार अब कुछ स्ट्रिंग भेजने का है क्योंकि मैं std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))भेजने से पहले POD प्रकारों को स्ट्रिंग में परिवर्तित कर सकता हूं ( जब मैं सुनिश्चित करूंगा कि ट्रांसपोर्ट लेयर काम करती है।

और यहाँ एक समस्या है। क्लाइंट डेटा को संपीड़ित करता है, भेजता है और कहता है कि डेटा भेजा गया था लेकिन सर्वर डेटा रीडिंग पर अवरुद्ध है। मैं समझ नहीं पा रहा हूं कि ऐसा क्यों होता है।

मैं का उपयोग करने की कोशिश की operator<<के साथ out.flush(), यह भी मैं का उपयोग करने की कोशिश की boost::iostreams::copy()। नतीजा वही है। कोड उदाहरण है (मैं सर्वर और क्लाइंट के लिए उसी स्रोत फ़ाइल का उपयोग करता हूं जो आर्ग पर निर्भर करता है):

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

पहले मैं सर्वर शुरू करता हूं और फिर मैं क्लाइंट शुरू करता हूं। निम्नलिखित आउटपुट का उत्पादन किया जाता है:

सर्वर

$ ./example
# now it waits while client will be accepted
start session
start reading

ग्राहक

$ ./example sender
sended: hello world

कार्यक्रम उपरोक्त आउटपुट के साथ अवरुद्ध हैं। मुझे लगता है कि सर्वर अभी भी क्लाइंट से डेटा का इंतजार कर रहा है और यह नहीं जानता कि क्लाइंट ने जो कुछ भी भेजा था।

यदि मैं क्लाइंट को बंद करता हूं Ctrl + Cतो आउटपुट निम्न है:

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

तथा

$ ./example sender
sended: hello world
^C

मुझे लगता है zlib error-5कि क्योंकि सर्वर को लगता है कि संग्रह अधूरा है।

अपेक्षित व्यवहार अवरुद्ध नहीं है। क्लाइंट शुरू होने पर सर्वर प्रोग्राम आउटपुट में संदेश दिखाई देना चाहिए।

पढ़ने पर कार्यक्रम क्यों अवरुद्ध है? मेरे द्वारा यह कैसे किया जा सकता है?

जवाब

1 sehe Jan 18 2021 at 21:56

iostreams::copy बस यही करता है: यह स्ट्रीम को कॉपी करता है।

अपने कोड के लिए बधाई। यह बहुत पठनीय है :) यह मुझे इस उत्तर की याद दिलाता है कि बूस्ट आईस्ट्रीम सॉकेट के साथ फाइल पढ़ना और लिखना । मुख्य अंतर यह है कि उत्तर एकल संपीडित बूँद भेजता है और बंद हो जाता है।

आप "सही" हैं कि डिकम्प्रेसर को पता है कि एक संपीड़ित ब्लॉक कब पूरा हो गया है, लेकिन यह तय नहीं करता है कि कोई अन्य अनुसरण नहीं करेगा।

इसलिए आपको फ्रेमिंग जोड़ने की जरूरत है। पारंपरिक तरीका यह है कि एक लम्बे आउट-ऑफ-बैंड को पास किया जाए। मैंने IO मैनिपुलेटर्स का उपयोग करके कोड दोहराव को कम करते हुए परिवर्तनों को लागू किया है।

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};

तथा

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};

ZLIB आपरेटर

यह मुख्य रूप से उस कोड को एन्क्रिप्ट करता है जिसे आपने प्रेषक / रिसीवर के बीच दोहराया था:

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

मैंने Tटेम्प्लेट किया ताकि यह स्टोर हो सके std::string&या std::string const&जरूरत के आधार पर।

LengthPrefixed आपरेटर

इस मैनिप्युलेटर को परवाह नहीं है कि क्या क्रमबद्ध किया जा रहा है, लेकिन बस इसे तार पर प्रभावी लंबाई के साथ उपसर्ग करेगा:

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

हम एक सूक्ष्मता जोड़ते हैं: एक संदर्भ या मूल्य को संग्रहीत करके जो हम अपने साथ निर्मित किए जाते हैं उसके आधार पर भी अस्थायी (ZLIB जोड़तोड़ की तरह) स्वीकार कर सकते हैं:

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

मैंने ' ZLIBमैनिपुलेटर को समान रूप से सामान्य बनाने के बारे में सोचा । इसलिए मैं पाठक के लिए एक अतिवाद के रूप में छोड़ता हूं

डेमो कार्यक्रम

इन दोनों को मिलाकर, आप प्रेषक / रिसीवर को इस प्रकार लिख सकते हैं:

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

वास्तव में, यह प्रिंट करता है:

reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

पूरी लिस्टिंग

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
#else
    std::ostream debug(nullptr);
#endif

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}
Bogdan Jan 18 2021 at 21:50

boost::serializationनिम्न चरणों का उपयोग करके समस्या हल की गई है :

  1. सबसे पहले मैं इस तरह के कार्यों के लिए ज़िपिंग ले गया:
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;
    io_out.push(io::zlib_compressor());
    io_out.push(output);

    io::copy(input, io_out);

    return output.str();
}

std::string decompress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;
    io_in.push(io::zlib_decompressor());
    io_in.push(input);

    io::copy(io_in, output);

    return output.str();
}
} // namespace my
  1. फिर मैंने इस तरह स्ट्रिंग बफर के लिए एक आवरण बनाया ( प्रलेखन से ट्यूटोरियल का अनुसरण करते हुए ):
class Package
{
public:
    Package(const std::string &buffer) : buffer(buffer) {}

private:
    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
    {
        ar & buffer;
    }

};
  1. और अंत में मैंने पढ़ने के बाद और भेजने से पहले क्रमबद्धता को जोड़ा।
/**
 * receiver
 */
Package request;

{
    boost::archive::text_iarchive ia(*stream);
    ia >> request;
}

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

{
    boost::archive::text_oarchive oa(*stream);
    oa << response;
}

/**
 * sender
 */
std::string data = "hello world";
Package package(my::compress(data));

// send request
{
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;
}

// waiting for a response
{
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;
}

// decompress response buffer
result = my::decompress(package.get_buffer());