C++ - Inconsistent behavior when inflating a Boost streambuf
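Problem description
I'm trying to establish a websocket connection to the OKEX exchange using their websocket API. I'm using Boost::Beast websockets.
The problem is that OKEX's server doesn't follow the proper permessage_deflate compression protocol, and the messages it sends are incorrectly deflated. So I'm trying to inflate these messages myself. The problem is that it doesn't work... and what drives me crazy is that the behavior I get is somewhat inconsistent.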
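Actual code
My code is mostly copy-pasted from the previously linked thread. For simplicity, I removed all preprocessor macros and hard-coded the socket values.
The inflate code is taken from Raj Advani's answer here.
This is the main.cpp file: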
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

int inflate(const void *src, int srcLen, void *dst, int dstLen) {
    z_stream strm = {0};
    strm.total_in = strm.avail_in = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in = (Bytef *) src;
    strm.next_out = (Bytef *) dst;

    strm.zalloc = Z_NULL;
    strm.zfree = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    err = inflateInit2(&strm, (15 + 32)); // 15 window bits, and the +32 tells zlib to detect if using gzip or zlib
    if (err == Z_OK) {
        err = inflate(&strm, Z_FINISH);
        if (err == Z_STREAM_END) {
            ret = strm.total_out;
        }
        else {
            inflateEnd(&strm);
            return err;
        }
    }
    else {
        inflateEnd(&strm);
        return err;
    }

    inflateEnd(&strm);
    return ret;
}

int main(int argc, char** argv) {
    std::string host = "real.okex.com";
    auto const port = "8443";
    auto const path = "/ws/v3";

    net::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23 };
    tcp::resolver resolver{ ioc };
    stream_t s{ ioc, ctx };

    ctx.set_verify_mode(ssl::verify_none);

    tcp::resolver::results_type results = resolver.resolve(host, port);

    net::connect(
        beast::get_lowest_layer(s), // s.next_layer().next_layer(),
        results.begin(), results.end());

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    s.handshake(host + ":" + port, path);
    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}"));

    {
        net::streambuf buffer;
        s.read(buffer);

        // auto data_it = buffer.data().begin();
        // std::cout << "Iterating over data of size:" << buffer.data().size() << std::endl; // LINE 85
        // int i = 0;
        // while (data_it != buffer.data().end()) {
        //     std::cout << "buffer data[" << i++ << "] size:" << (data_it->size()) << std::endl;
        //     data_it++;
        // }

        net::streambuf out_buffer;
        const int error_code_out = inflate(&buffer, buffer.size(), &out_buffer, 10000000);

        std::cout << "received. size:" << buffer.size() << " data: " << &buffer << std::endl;
        std::cout << "deflated. error?" << error_code_out << " data: " << &out_buffer << std::endl;
    }
}
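Code output + problem
The output says that buffer has size 117. That seems reasonable to me, but for some reason I get Z_DATA_ERROR when decompressing, which makes me believe there is more data to parse...
So I looked at the documentation for net::streambuf and found that there can apparently be multiple buffers to read from, so maybe I'm only using one of them? I ran the commented-out code (without the LINE 85 line in the middle) and the loop body never ran... which I found strange. Then I put that line back in, and suddenly I have hundreds of buffers? The (truncated) output looks something like this: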
connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)
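As you can see, it crashes. I have no idea what's going on. I no longer know how to decode a streambuf... and the documentation seems to assume background knowledge that I don't have. I tried using buffer.data() and converting buffer to a char* array; all of it led to exactly the same behavior...
I don't know what to do. Any help is welcome.
Reference: Python implementation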
import websockets
import asyncio
import zlib


def inflate(data):
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated


async def main():
    client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
    await client.send("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}")
    r = await client.recv()
    print(len(r), r)
    print(inflate(r))


if __name__ == '__main__':
    asyncio.run(main())
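Solution
Side note:
In the end I remembered having seen this particular kind of garbage returned by OKEX before; indeed, I had looked at this server before.
This issue is conceptually a duplicate of Boost inflate algorithm decompress, but your specific code has (much) bigger problems that are worth analyzing:
I'm confused. How does this even compile?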
const int error_code_out = inflate(&buffer,buffer.size(),&out_buffer,10000000);
buffer is not a character array or anything like one. It isn't even a POD type. It's a streambuf. Use it as such.
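You complain about "somewhat inconsistent" behavior? That makes perfect sense, because that's what you asked for: invoking Undefined Behaviour is a great way to get "inconsistent" results. Or nasal demons.
Next question: why doesn't the compiler warn?
The culprit is that your ::inflate overloads the ZLIB function of the same name. Let's put ours in a namespace to disentangle the two.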
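Next, it takes void* arguments. Huh. And those are then cast to Bytef* with what amounts to a reinterpret_cast<>. There is just so much to hate about this code: it recklessly uses C-style casts, casts away const, and completely ignores that the arguments aren't even PODs.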
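The difference a typed signature makes can be checked at compile time. This sketch uses std::string as a stand-in for any non-byte class type such as net::streambuf (an illustrative assumption); byte_count is a hypothetical helper, not part of either answer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <type_traits>
#include <vector>

// Stand-in for any non-byte class type such as net::streambuf (illustrative assumption).
using NotBytes = std::string;

// The original parameter type: void* accepts a pointer to ANY object,
// so inflate(&buffer, ...) compiled without a diagnostic.
static_assert(std::is_convertible<NotBytes*, void const*>::value,
              "a void* parameter swallows any object pointer");

// A typed byte pointer rejects the same argument at compile time.
static_assert(!std::is_convertible<NotBytes*, std::uint8_t const*>::value,
              "a typed byte pointer does not");

// Hypothetical helper with a typed signature: the natural way to call it
// is with real byte storage and that storage's real size.
std::size_t byte_count(std::uint8_t const* src, std::size_t srcLen) {
    return src == nullptr ? 0 : srcLen;
}
```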
Let's be a little safer:
namespace mylib {
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) {
        z_stream strm {};
        strm.total_in = strm.avail_in = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);
        // ... (rest of the function as before)
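Let's also not gloss over the fact that you explicitly, deliberately promised the function an output buffer of exactly 10000000 bytes. You should ask yourself what you were thinking when you wrote that.
Now that the code expresses its intent, we can expect the compiler to diagnose our mistakes. And of course it does, because that's what C++ compilers do.
Fixing the call
Let's sidestep the confusion around net::streambuf. You can use a string or a vector as a dynamic buffer just as well. It may not always be as efficient, but let's focus on understandable code: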
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
    in.data(), in.size(), out.data(), out.size());
See, now you know exactly what you're passing. And that part will be fine.
Let's avoid printing the input data (it's binary garbage...).
std::cout << "received. size:" << in_buffer.size() << std::endl;
std::cout << "deflated. error?" << err << " data: "
          << std::string(out.begin(), out.end())
          << std::endl;
At this point inflating still fails, but that's a problem with the server; see Boost inflate algorithm decompress, which also shows some alternative ways to handle the buffers.
Fixed code
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <cstdint>
#include <istream>
#include <iostream>
#include <string>
#include <vector>
#include "zlib.h"

namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib {
    int inflate(uint8_t const *src, int srcLen, uint8_t *dst, int dstLen) {
        z_stream strm {};
        strm.total_in = strm.avail_in = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

        strm.zalloc = Z_NULL;
        strm.zfree = Z_NULL;
        strm.opaque = Z_NULL;

        int err = -1;
        int ret = -1;

        err = inflateInit2(&strm, (15 + 32)); // 15 window bits, and the +32 tells zlib to detect if using gzip or zlib
        if (err == Z_OK) {
            err = ::inflate(&strm, Z_FINISH); // qualified: the ZLIB function, not mylib::inflate
            if (err == Z_STREAM_END) {
                ret = strm.total_out;
            }
            else {
                inflateEnd(&strm);
                return err;
            }
        }
        else {
            inflateEnd(&strm);
            return err;
        }

        inflateEnd(&strm);
        return ret;
    }
}

int main() {
    std::string host = "real.okex.com";
    auto const port = "8443";
    auto const path = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };
    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);
    stream_t s{ ioc, ctx };
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    {
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    }

    s.handshake(host + ":" + port, path);
    std::cerr << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}"));

    {
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);
        // std::cout.write(reinterpret_cast<char const*>(in.data()), in.size());

        out.resize(1024); // make sure it's enough
        const int err = mylib::inflate(
            in.data(), in.size(), out.data(), out.size());

        std::cerr << "received. size:" << in_buffer.size() << std::endl;
        //std::cerr << "received. data:" << std::string(in.begin(), in.end()) << std::endl;
        std::cerr << "deflated. error?" << err << " data: "
                  << std::string(out.begin(), out.end())
                  << std::endl;
    }
}
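I have an older answer that addresses the problems with the code. Since then, I've found a way to solve the remaining problem: the server's response.
Solution
Not only does OKEX fail to follow the WS standard for enabling per-message deflate, it also ends the data abruptly. However, it turns out that the partially inflated result is actually fine if you keep it.
The way I did it was to use beast::zlib::inflate_stream instead of ZLIB directly. It has a more flexible interface that lets us get the result we need: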
namespace mylib {
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) {
        boost::system::error_code ec;
        beast::zlib::z_params zp{};
        zp.next_in = in.data();        // z_params takes void const*, no cast needed
        zp.avail_in = in.size();
        zp.next_out = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);
        out.resize(out.size() - zp.avail_out); // keep only the bytes actually produced
        return ec;
    }
}
Now we use it like this:
std::vector<uint8_t> in, out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in, out);
std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(), out.end()) << std::endl;
Which then prints:
connected.
deflated. unexpected end of deflate stream
{"event":"error","message":"Unrecognized request: {'op':'subscribe','args':['spot/ticker:ETH-USDT']}\u0000","errorCode":30039}
So even though the deflate stream ends unexpectedly, the data is valid, complete JSON.
PS.2: See the newer answer, which managed to overcome the last hurdle.
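The echoed request in the printed error also ends with \u0000. net::buffer() applied to a char array covers the whole array, and a string literal's array includes its terminating NUL, so that byte is sent as well. The sketch below shows the size difference at compile time and one way to avoid it; kLiteral and kRequest are hypothetical names, and it is an assumption (not confirmed here) that the NUL or the single-quoted pseudo-JSON is what triggers error 30039. The newer answer's actual fix is not shown in this section.

```cpp
#include <cassert>
#include <string_view>

// The question's payload as a char array: sizeof counts the terminating '\0',
// and net::buffer(array) uses the full array size, so that NUL is sent too.
constexpr char kLiteral[] = "{'op':'subscribe','args':['spot/ticker:ETH-USDT']}";
static_assert(sizeof(kLiteral) == std::string_view(kLiteral).size() + 1,
              "the array carries one extra NUL byte");

// Alternative (assumption): a string_view of double-quoted JSON, sent with
// s.write(net::buffer(kRequest.data(), kRequest.size())) so no NUL is included.
constexpr std::string_view kRequest =
    R"({"op":"subscribe","args":["spot/ticker:ETH-USDT"]})";
```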
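Full listing
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace http = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib {
    // Inflate with Beast's zlib port; `out` must be pre-sized by the caller.
    auto inflate(std::vector<uint8_t> const& in, std::vector<uint8_t>& out) {
        boost::system::error_code ec;
        beast::zlib::z_params zp{};
        zp.next_in = in.data();
        zp.avail_in = in.size();
        zp.next_out = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp, beast::zlib::Flush::full, ec);
        out.resize(out.size() - zp.avail_out); // keep only the bytes actually produced
        return ec;
    }
}

int main() {
    std::string host = "real.okex.com";
    auto const port = "8443";
    auto const path = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };
    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);
    stream_t s{ ioc, ctx };
    net::connect(beast::get_lowest_layer(s), resolver.resolve(host, port));

    // SSL handshake, then the WebSocket handshake
    s.next_layer().handshake(ssl::stream_base::client);
    s.handshake(host + ":" + port, path);
    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}"));

    {
        std::vector<uint8_t> in, out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        out.resize(1024); // make sure it's enough
        auto ec = mylib::inflate(in, out);
        std::cout << "deflated. " << ec.message() << std::endl;
        std::cout << std::string(out.begin(), out.end()) << std::endl;
    }
}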