zlib 압축을 사용하는 소켓 iostreams 에코 서버는 연결이 닫힐 때까지 절전 모드로 전환됩니다.
나는 다음과 zlib 압축 간단한 에코 서버를 만들려고 이 와 이 예.
내 생각은 std::string(reinterpret_cast<const char *>(&pod), sizeof(pod))
전송 레이어가 작동하는지 확인하기 전에 POD 유형을 문자열 ( ) 로 변환 할 수 있기 때문에 지금 문자열 을 보내는 것입니다.
그리고 여기에 문제가 있습니다. 클라이언트는 데이터를 압축하고 전송하며 데이터가 전송되었다고 말하지만 서버는 데이터 읽기를 차단합니다. 왜 그런지 이해할 수 없습니다.
operator<<
와 함께 사용해 보았습니다 out.flush()
, 또한 사용해 보았습니다 boost::iostreams::copy()
. 결과는 동일합니다. 코드 예제는 다음과 같습니다 (args에 따라 서버와 클라이언트에 동일한 소스 파일을 사용합니다).
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <sstream>
namespace ip = boost::asio::ip;
using ip::tcp;
const unsigned short port = 9999;
const char host[] = "127.0.0.1";
void receive()
{
boost::asio::io_context ctx;
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::acceptor a(ctx, ep);
tcp::iostream stream;
a.accept(stream.socket());
std::stringstream buffer;
std::cout << "start session" << std::endl;
try
{
for (;;)
{
{
boost::iostreams::filtering_istream in;
in.push(boost::iostreams::zlib_decompressor());
in.push(stream);
std::cout << "start reading" << std::endl;
// looks like server is blocked here
boost::iostreams::copy(in, buffer);
}
std::cout << "data: " << buffer.str() << std::endl;
{
boost::iostreams::filtering_ostream out;
out.push(boost::iostreams::zlib_compressor());
out.push(stream);
boost::iostreams::copy(buffer, out);
}
std::cout << "Reply is sended" << std::endl;
}
}
catch(const boost::iostreams::zlib_error &e)
{
std::cerr << e.what() << e.error() << '\n';
stream.close();
}
}
void send(const std::string &data)
{
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::iostream stream;
stream.connect(ep);
std::stringstream buffer;
buffer << data;
if (!stream)
{
std::cerr << "Cannot connect to " << host << ":" << port << std::endl;
return;
}
try
{
{
boost::iostreams::filtering_ostream out;
out.push(boost::iostreams::zlib_compressor());
out.push(stream);
out << buffer.str();
out.flush();
}
std::cout << "sended: " << data << std::endl;
buffer.str("");
{
boost::iostreams::filtering_istream in;
in.push(boost::iostreams::zlib_decompressor());
in.push(stream);
// looks like client is blocked here
boost::iostreams::copy(in, buffer);
}
std::cout << "result: " << buffer.str() << std::endl;
}
catch(const boost::iostreams::zlib_error &e)
{
std::cerr << e.what() << '\n';
}
}
int main(int argc, const char *argv[])
{
if (argc > 1 && argv[1] == std::string("sender"))
send("hello world");
else
receive();
return 0;
}
먼저 서버를 시작한 다음 클라이언트를 시작합니다. 다음 출력이 생성됩니다.
섬기는 사람
$ ./example
# now it waits while client will be accepted
start session
start reading
고객
$ ./example sender
sended: hello world
위의 출력으로 프로그램이 차단됩니다. 나는 서버가 여전히 클라이언트의 데이터를 기다리고 있다고 생각하며 클라이언트가 가진 모든 것을 보낸 것을 알지 못합니다.
클라이언트를 닫으면 Ctrl + C
출력은 다음과 같습니다.
$ ./example
# now it waits while client will be accepted
start session
start reading
# now it is blocked until I press Ctrl + C
data: hello world
Reply is sended
start reading
zlib error-5
과
$ ./example sender
sended: hello world
^C
나는 추측 zlib error-5
서버가 아카이브가 불완전하다고 생각하기 때문이다.
예상되는 동작 은 차단되지 않습니다. 메시지는 클라이언트가 시작될 때 서버 프로그램 출력에 나타나야합니다.
프로그램이 읽기에서 차단되는 이유는 무엇입니까? 어떻게 고칠 수 있습니까?
답변
iostreams::copy
그것은 단지 그것을합니다 : 그것은 스트림을 복사합니다.
코드에 대한 칭찬. 그것은 매우 읽기 쉽습니다 :) 부스트 iostream socket으로 파일 읽기 및 쓰기이 답변을 상기시킵니다 . 가장 큰 차이점은 응답이 압축 된 단일 blob을 보내고 닫는다는 것입니다.
압축 해제 기가 하나의 압축 된 블록이 완료 될 때를 알고 있다는 것은 "맞습니다".하지만 다른 블록이 따라 오지 않을 것이라고 결정하지는 않습니다.
따라서 프레임을 추가해야합니다. 전통적인 방법은 대역 외 길이를 전달하는 것입니다. IO 조작기를 사용하여 코드 중복을 줄이면서 변경 사항을 구현했습니다.
template <typename T> struct LengthPrefixed {
T _wrapped;
friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) ;
friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) ;
};
과
template <typename T> struct ZLIB {
T& data;
ZLIB(T& ref) : data(ref){}
friend std::ostream& operator<<(std::ostream& os, ZLIB z) ;
friend std::istream& operator>>(std::istream& is, ZLIB z) ;
};
ZLIB
속이는 사람
이것은 주로 발신자 / 수신자간에 복제 한 코드를 캡슐화합니다.
template <typename T> struct ZLIB {
T& data;
ZLIB(T& ref) : data(ref){}
friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
{
boost::iostreams::filtering_ostream out;
out.push(boost::iostreams::zlib_compressor());
out.push(os);
out << z.data << std::flush;
}
return os.flush();
}
friend std::istream& operator>>(std::istream& is, ZLIB z) {
boost::iostreams::filtering_istream in;
in.push(boost::iostreams::zlib_decompressor());
in.push(is);
std::ostringstream oss;
copy(in, oss);
z.data = oss.str();
return is;
}
};
나는
T
그것을 저장std::string&
하거나std::string const&
필요에 따라 템플릿을 만들었습니다 .
LengthPrefixed
속이는 사람
이 조작자는 직렬화되는 항목을 신경 쓰지 않지만 단순히 유효 길이의 접두사를 연결합니다.
template <typename T> struct LengthPrefixed {
T _wrapped;
friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
std::ostringstream oss;
oss << lp._wrapped;
auto on_the_wire = std::move(oss).str();
debug << "Writing length " << on_the_wire.length() << std::endl;
return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
}
friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
size_t len;
if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
debug << "Reading length " << len << std::endl;
std::string on_the_wire(len, '\0');
if (is.read(on_the_wire.data(), on_the_wire.size())) {
std::istringstream iss(on_the_wire);
iss >> lp._wrapped;
}
}
return is;
}
};
우리는 미묘함을 추가합니다. 우리가 구성한 것에 따라 참조 또는 값을 저장함으로써 임시 (ZLIB 조작자와 같은)를 수용 할 수도 있습니다.
template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
나는
ZLIB
매니퓰레이터를 똑같이 일반화 하려고 생각하지 않았습니다. 그래서 저는 독자를위한 엑소시즘으로 남겨 둡니다.
데모 프로그램
이 두 가지를 결합하여 발신자 / 수신자를 다음과 같이 간단하게 작성할 수 있습니다.
void server() {
boost::asio::io_context ctx;
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::acceptor a(ctx, ep);
tcp::iostream stream;
a.accept(stream.socket());
std::cout << "start session" << std::endl;
for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
std::cout << "data: " << std::quoted(data) << std::endl;
stream << LengthPrefixed{ZLIB{data}} << std::flush;
}
}
void client(std::string data) {
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::iostream stream(ep);
stream << LengthPrefixed{ZLIB{data}} << std::flush;
std::cout << "sent: " << std::quoted(data) << std::endl;
stream >> LengthPrefixed{ZLIB{data}};
std::cout << "result: " << std::quoted(data) << std::endl;
}
실제로 다음과 같이 인쇄됩니다.
reader: start session
sender: Writing length 19
reader: Reading length 19
sender: sent: "hello world"
reader: data: "hello world"
reader: Writing length 19
sender: Reading length 19
sender: result: "hello world"
전체 목록
#include <boost/iostreams/filtering_stream.hpp>
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/copy.hpp>
#include <boost/asio.hpp>
#include <iostream>
#include <iomanip>
#include <sstream>
namespace ip = boost::asio::ip;
using ip::tcp;
const unsigned short port = 9999;
const char host[] = "127.0.0.1";
#ifdef DEBUG
std::ostream debug(std::cerr.rdbuf());
#else
std::ostream debug(nullptr);
#endif
template <typename T> struct LengthPrefixed {
T _wrapped;
friend std::ostream& operator<<(std::ostream& os, LengthPrefixed lp) {
std::ostringstream oss;
oss << lp._wrapped;
auto on_the_wire = std::move(oss).str();
debug << "Writing length " << on_the_wire.length() << std::endl;
return os << on_the_wire.length() << "\n" << on_the_wire << std::flush;
}
friend std::istream& operator>>(std::istream& is, LengthPrefixed lp) {
size_t len;
if (is >> std::noskipws >> len && is.ignore(1, '\n')) {
debug << "Reading length " << len << std::endl;
std::string on_the_wire(len, '\0');
if (is.read(on_the_wire.data(), on_the_wire.size())) {
std::istringstream iss(on_the_wire);
iss >> lp._wrapped;
}
}
return is;
}
};
template <typename T> LengthPrefixed(T&&) -> LengthPrefixed<T>;
template <typename T> LengthPrefixed(T&) -> LengthPrefixed<T&>;
template <typename T> struct ZLIB {
T& data;
ZLIB(T& ref) : data(ref){}
friend std::ostream& operator<<(std::ostream& os, ZLIB z) {
{
boost::iostreams::filtering_ostream out;
out.push(boost::iostreams::zlib_compressor());
out.push(os);
out << z.data << std::flush;
}
return os.flush();
}
friend std::istream& operator>>(std::istream& is, ZLIB z) {
boost::iostreams::filtering_istream in;
in.push(boost::iostreams::zlib_decompressor());
in.push(is);
std::ostringstream oss;
copy(in, oss);
z.data = oss.str();
return is;
}
};
void server() {
boost::asio::io_context ctx;
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::acceptor a(ctx, ep);
tcp::iostream stream;
a.accept(stream.socket());
std::cout << "start session" << std::endl;
for (std::string data; stream >> LengthPrefixed{ZLIB{data}};) {
std::cout << "data: " << std::quoted(data) << std::endl;
stream << LengthPrefixed{ZLIB{data}} << std::flush;
}
}
void client(std::string data) {
tcp::endpoint ep(ip::address::from_string(host), port);
tcp::iostream stream(ep);
stream << LengthPrefixed{ZLIB{data}} << std::flush;
std::cout << "sent: " << std::quoted(data) << std::endl;
stream >> LengthPrefixed{ZLIB{data}};
std::cout << "result: " << std::quoted(data) << std::endl;
}
int main(int argc, const char**) {
try {
if (argc > 1)
client("hello world");
else
server();
} catch (const std::exception& e) {
std::cerr << e.what() << '\n';
}
}
boost::serialization
다음 단계를 사용하여 문제를 해결 합니다.
- 우선 다음과 같은 기능으로 압축을 옮겼습니다.
namespace io = boost::iostreams;
namespace my {
std::string compress(const std::string &data)
{
std::stringstream input, output;
input << data;
io::filtering_ostream io_out;
io_out.push(io::zlib_compressor());
io_out.push(output);
io::copy(input, io_out);
return output.str();
}
std::string decompress(const std::string &data)
{
std::stringstream input, output;
input << data;
io::filtering_istream io_in;
io_in.push(io::zlib_decompressor());
io_in.push(input);
io::copy(io_in, output);
return output.str();
}
} // namespace my
- 그런 다음 다음과 같은 문자열 버퍼에 대한 래퍼를 만들었습니다 ( documentation 의 자습서에 따라 ).
class Package
{
public:
Package(const std::string &buffer) : buffer(buffer) {}
private:
std::string buffer;
friend class boost::serialization::access;
template<class Archive>
void serialize(Archive &ar, const unsigned int)
{
ar & buffer;
}
};
- 그리고 마침내 읽은 후 전송하기 전에 직렬화를 추가했습니다.
/**
* receiver
*/
Package request;
{
boost::archive::text_iarchive ia(*stream);
ia >> request;
}
std::string data = my::decompress(request.buffer);
// do something with data
Package response(my::compress(data));
{
boost::archive::text_oarchive oa(*stream);
oa << response;
}
/**
* sender
*/
std::string data = "hello world";
Package package(my::compress(data));
// send request
{
boost::archive::text_oarchive oa(*m_stream);
oa << package;
}
// waiting for a response
{
boost::archive::text_iarchive ia(*m_stream);
ia >> package;
}
// decompress response buffer
result = my::decompress(package.get_buffer());