zlib 압축을 사용하는 소켓 iostreams 에코 서버는 연결이 닫힐 때까지 절전 모드로 전환됩니다.

Jan 18 2021

나는 다음과 zlib 압축 간단한 에코 서버를 만들려고 이 와 이 예.

내 생각은 std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))전송 레이어가 작동하는지 확인하기 전에 POD 유형을 문자열 ( ) 로 변환 할 수 있기 때문에 지금 문자열 을 보내는 것입니다.

그리고 여기에 문제가 있습니다. 클라이언트는 데이터를 압축하고 전송하며 데이터가 전송되었다고 말하지만 서버는 데이터 읽기를 차단합니다. 왜 그런지 이해할 수 없습니다.

operator<<와 함께 사용해 보았습니다 out.flush(), 또한 사용해 보았습니다 boost::iostreams::copy(). 결과는 동일합니다. 코드 예제는 다음과 같습니다 (args에 따라 서버와 클라이언트에 동일한 소스 파일을 사용합니다).

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

void receive()
{
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::stringstream buffer;

    std::cout << "start session" << std::endl;

    try
    {
        for (;;)
        {
            {
                boost::iostreams::filtering_istream in;
                in.push(boost::iostreams::zlib_decompressor());
                in.push(stream);

                std::cout << "start reading" << std::endl;

                // looks like server is blocked here
                boost::iostreams::copy(in, buffer);
            }

            std::cout << "data: " << buffer.str() << std::endl;

            {
                boost::iostreams::filtering_ostream out;
                out.push(boost::iostreams::zlib_compressor());
                out.push(stream);

                boost::iostreams::copy(buffer, out);
            }

            std::cout << "Reply is sended" << std::endl;
        }
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << e.error() << '\n';
        stream.close();
    }
}

void send(const std::string &data)
{
    tcp::endpoint ep(ip::address::from_string(host), port);
    
    tcp::iostream stream;
    stream.connect(ep);

    std::stringstream buffer;
    buffer << data;

    if (!stream)
    {
        std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
        return;
    }

    try
    {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(stream);

            out << buffer.str();
            out.flush();
        }

        std::cout << "sended: " << data << std::endl;
        buffer.str("");

        {
            boost::iostreams::filtering_istream in;
            in.push(boost::iostreams::zlib_decompressor());
            in.push(stream);

            // looks like client is blocked here
            boost::iostreams::copy(in, buffer);
        }

        std::cout << "result: " << buffer.str() << std::endl;
    }
    catch(const boost::iostreams::zlib_error &e)
    {
        std::cerr << e.what() << '\n';
    }
}

int main(int argc, const char *argv[])
{
    if (argc > 1 && argv[1] ==  std::string("sender"))
        send("hello world");
    else
        receive();

    return 0;
}

먼저 서버를 시작한 다음 클라이언트를 시작합니다. 다음 출력이 생성됩니다.

섬기는 사람

$ ./example
# now it waits while client will be accepted
start session
start reading

고객

$ ./example sender
sended: hello world

위의 출력으로 프로그램이 차단됩니다. 나는 서버가 여전히 클라이언트의 데이터를 기다리고 있다고 생각하며 클라이언트가 가진 모든 것을 보낸 것을 알지 못합니다.

클라이언트를 닫으면 Ctrl + C출력은 다음과 같습니다.

$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5

$ ./example sender
sended: hello world
^C

나는 추측 zlib error-5서버가 아카이브가 불완전하다고 생각하기 때문이다.

예상되는 동작 은 차단되지 않습니다. 메시지는 클라이언트가 시작될 때 서버 프로그램 출력에 나타나야합니다.

프로그램이 읽기에서 차단되는 이유는 무엇입니까? 어떻게 고칠 수 있습니까?

답변

1 sehe Jan 18 2021 at 21:56

iostreams::copy 그것은 단지 그것을합니다 : 그것은 스트림을 복사합니다.

코드에 대한 칭찬. 그것은 매우 읽기 쉽습니다 :) 부스트 iostream socket으로 파일 읽기 및 쓰기이 답변을 상기시킵니다 . 가장 큰 차이점은 응답이 압축 된 단일 blob을 보내고 닫는다는 것입니다.

압축 해제 기가 하나의 압축 된 블록이 완료 될 때를 알고 있다는 것은 "맞습니다".하지만 다른 블록이 따라 오지 않을 것이라고 결정하지는 않습니다.

따라서 프레임을 추가해야합니다. 전통적인 방법은 대역 외 길이를 전달하는 것입니다. IO 조작기를 사용하여 코드 중복을 줄이면서 변경 사항을 구현했습니다.

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
    friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};

ZLIB 속이는 사람

이것은 주로 발신자 / 수신자간에 복제 한 코드를 캡슐화합니다.

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

나는 T그것을 저장 std::string&하거나 std::string const&필요에 따라 템플릿을 만들었습니다 .

LengthPrefixed 속이는 사람

이 조작자는 직렬화되는 항목을 신경 쓰지 않지만 단순히 유효 길이의 접두사를 연결합니다.

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

우리는 미묘함을 추가합니다. 우리가 구성한 것에 따라 참조 또는 값을 저장함으로써 임시 (ZLIB 조작자와 같은)를 수용 할 수도 있습니다.

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

나는 ZLIB매니퓰레이터를 똑같이 일반화 하려고 생각하지 않았습니다. 그래서 저는 독자를위한 엑소시즘으로 남겨 둡니다.

데모 프로그램

이 두 가지를 결합하여 발신자 / 수신자를 다음과 같이 간단하게 작성할 수 있습니다.

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

실제로 다음과 같이 인쇄됩니다.

reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"

전체 목록

#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>

#include <iostream>
#include <iomanip>
#include <sstream>

namespace ip = boost::asio::ip;
using ip::tcp;

const unsigned short port = 9999;
const char host[] = "127.0.0.1";

#ifdef DEBUG
    std::ostream debug(std::cerr.rdbuf());
#else
    std::ostream debug(nullptr);
#endif

template <typename T> struct LengthPrefixed {
    T _wrapped;

    friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
        std::ostringstream oss;
        oss << lp._wrapped;
        auto on_the_wire = std::move(oss).str();

        debug << "Writing length " << on_the_wire.length() << std::endl;
        return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
    }

    friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
        size_t len;
        if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
            debug << "Reading length " << len << std::endl;

            std::string on_the_wire(len, '\0');
            if (is.read(on_the_wire.data(), on_the_wire.size())) {
                std::istringstream iss(on_the_wire);
                iss >> lp._wrapped;
            }
        }
        return is;
    }
};

template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;

template <typename T> struct ZLIB {
    T& data;
    ZLIB(T& ref) : data(ref){}

    friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
        {
            boost::iostreams::filtering_ostream out;
            out.push(boost::iostreams::zlib_compressor());
            out.push(os);
            out << z.data << std::flush;
        }
        return os.flush();
    }

    friend std::istream& operator>>(std::istream& is, ZLIB z) {
        boost::iostreams::filtering_istream in;
        in.push(boost::iostreams::zlib_decompressor());
        in.push(is);

        std::ostringstream oss;
        copy(in, oss);
        z.data = oss.str();

        return is;
    }
};

void server() {
    boost::asio::io_context ctx;
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::acceptor a(ctx, ep);

    tcp::iostream stream;
    a.accept(stream.socket());

    std::cout << "start session" << std::endl;

    for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
        std::cout << "data: " << std::quoted(data) << std::endl;
        stream << LengthPrefixed{ZLIB{data}} << std::flush;
    }
}

void client(std::string data) {
    tcp::endpoint ep(ip::address::from_string(host), port);
    tcp::iostream stream(ep);

    stream << LengthPrefixed{ZLIB{data}} << std::flush;
    std::cout << "sent: " << std::quoted(data) << std::endl;

    stream >> LengthPrefixed{ZLIB{data}};
    std::cout << "result: " << std::quoted(data) << std::endl;
}

int main(int argc, const char**) {
    try {
        if (argc > 1)
            client("hello world");
        else
            server();
    } catch (const std::exception& e) {
        std::cerr << e.what() << '\n';
    }
}
Bogdan Jan 18 2021 at 21:50

boost::serialization다음 단계를 사용하여 문제를 해결 합니다.

  1. 우선 다음과 같은 기능으로 압축을 옮겼습니다.
namespace io = boost::iostreams;

namespace my {
std::string compress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_ostream io_out;
    io_out.push(io::zlib_compressor());
    io_out.push(output);

    io::copy(input, io_out);

    return output.str();
}

std::string decompress(const std::string &data)
{
    std::stringstream input, output;

    input << data;

    io::filtering_istream io_in;
    io_in.push(io::zlib_decompressor());
    io_in.push(input);

    io::copy(io_in, output);

    return output.str();
}
} // namespace my
  1. 그런 다음 다음과 같은 문자열 버퍼에 대한 래퍼를 만들었습니다 ( documentation 의 자습서에 따라 ).
class Package
{
public:
    Package(const std::string &buffer) : buffer(buffer) {}

private:
    std::string buffer;

    friend class boost::serialization::access;

    template<class Archive>
    void serialize(Archive &ar, const unsigned int)
    {
        ar & buffer;
    }

};
  1. 그리고 마침내 읽은 후 전송하기 전에 직렬화를 추가했습니다.
/**
 * receiver
 */
Package request;

{
    boost::archive::text_iarchive ia(*stream);
    ia >> request;
}

std::string data = my::decompress(request.buffer);

// do something with data

Package response(my::compress(data));

{
    boost::archive::text_oarchive oa(*stream);
    oa << response;
}

/**
 * sender
 */
std::string data = "hello world";
Package package(my::compress(data));

// send request
{
    boost::archive::text_oarchive oa(*m_stream);
    oa << package;
}

// waiting for a response
{
    boost::archive::text_iarchive ia(*m_stream);
    ia >> package;
}

// decompress response buffer
result = my::decompress(package.get_buffer());