微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

C++ - Boost streambuf 放气的行为不一致? 代码输出+问题解决方案

如何解决C++ - Boost streambuf 放气的行为不一致? 代码输出+问题解决方案

问题说明

我正在尝试使用他们的 websocket API 建立到 OKEX 交易所的 websocket 连接。我正在使用 Boost::Beast 网络套接字。

问题是OKEX的服务器没有遵循正确的permessage_deflate压缩协议,发送的消息是incorrectly deflated。所以我试图自己夸大这些消息。问题是它不起作用......让我发疯的是我得到的行为有点不一致。

实际代码

我的代码主要是从 previously linked to thread 复制粘贴的。为简单起见,我删除了所有预处理器宏,并对套接字值进行了硬编码。

inflate 代码取自 Raj Advani's answer here

这是 main.cpp 文件

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

int inflate(const void *src,int srcLen,void *dst,int dstLen) {
    z_stream strm  = {0};
    strm.total_in  = strm.avail_in  = srcLen;
    strm.total_out = strm.avail_out = dstLen;
    strm.next_in   = (Bytef *) src;
    strm.next_out  = (Bytef *) dst;

    strm.zalloc = Z_NULL;
    strm.zfree  = Z_NULL;
    strm.opaque = Z_NULL;

    int err = -1;
    int ret = -1;

    err = inflateInit2(&strm,(15 + 32)); //15 window bits,and the +32 tells zlib to to detect if using gzip or zlib
    if (err == Z_OK) {
        err = inflate(&strm,Z_FINISH);
        if (err == Z_STREAM_END) {
            ret = strm.total_out;
        }
        else {
            inflateEnd(&strm);
            return err;
        }
    }
    else {
        inflateEnd(&strm);
        return err;
    }

    inflateEnd(&strm);
    return ret;
}


int main(int argc,char** argv) {

    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23 };
    tcp::resolver resolver{ ioc };
    stream_t s{ ioc,ctx };
    ctx.set_verify_mode(ssl::verify_none);
    tcp::resolver::results_type results = resolver.resolve(host,port);
    net::connect(
            beast::get_lowest_layer(s),//s.next_layer().next_layer(),results.begin());

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);
    s.handshake(host + ":" + port,path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}"));

    {
        net::streambuf buffer;
        s.read(buffer);

//        auto data_it = buffer.data().begin();
//        std::cout<<"Iterating over data of size:" << buffer.data().size()<<endl; // LINE 85
//        int i = 0;
//        while (data_it != buffer.data().end()) {
//            std::cout << "buffer data["<<i++<<"] size:" << (data_it->size())<<endl;
//            data_it++;
//        }

        net::streambuf out_buffer;
        const int error_code_out = inflate(&buffer,buffer.size(),&out_buffer,10000000);

        std::cout << "received. size:"<<buffer.size()<<" data: "<< &buffer << std::endl;
        std::cout << "deflated. error?"<< error_code_out << " data: " << &out_buffer << std::endl;
    }
}

代码输出+问题

膨胀说 buffer 的大小是 117。我认为这是合理的,但由于某种原因,我在解压时得到 Z_DATA_ERROR,这让我相信还有更多的数据解析....

所以我查看了 net::streambuf 的文档,我发现显然有多个缓冲区可以读取,所以也许我只使用了一个缓冲区?我运行了注释掉的代码(不包括间的 LINE 85 行)并且它从未通过循环......我认为这很奇怪。我把那条线放进去,然后突然间我有几百个缓冲区? (截断的)输出类似于:

connected.
Iterating over data of size:117
buffer data[0] size:117
buffer data[1] size:72198326954657960
buffer data[2] size:140735485986592
buffer data[3] size:140618848326656
buffer data[4] size:140618848326656
.. many more lines of this...
buffer data[121] size:7089075335985461349
buffer data[122] size:3472329396561475632
buffer data[123] size:8747116609081390898
buffer data[124] size:3472329396561475632
buffer data[125] size:3472387902693336678
buffer data[126] size:
Process finished with exit code 139 (interrupted by signal 11: SIGSEGV)

如您所见,它崩溃了。我不知道发生了什么。我现在不知道如何解码 streambuf ......而 the documentation 似乎假设我没有很多背景知识。我尝试使用 buffer.data(),将 buffer 转换为 char* 数组,所有这些都导致我完全相同的行为......

不知道该怎么办。欢迎任何帮助


参考:Python 实现

import websockets
import asyncio
import zlib

def inflate(data):
    decompress = zlib.decompressobj(-zlib.MAX_WBITS)
    inflated = decompress.decompress(data)
    inflated += decompress.flush()
    return inflated

async def main():
    client = await websockets.connect("wss://real.okex.com:8443/ws/v3")
    await client.send("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}")
    r = await client.recv()
    print(len(r),r)
    print(inflate(r))


if __name__ == '__main__':
    asyncio.run(main())

解决方法

附注:

最后我想起之前看到OKEX返回的特定类型的乱码,确实我之前看过这个服务器。

该问题在概念上与 Boost inflate algorithm decompress 重复,但您的特定代码有(大得多)更大的问题,值得分析:

我很困惑。这是如何编译的?

const int error_code_out = inflate(&buffer,buffer.size(),&out_buffer,10000000);

Buffer 不是字符数组或类似数组。它甚至不是 POD 类型。这是一个 streambuf。就这样使用它。

你抱怨“有点不一致”?这完全有道理,因为这就是您所要求的:调用 Undefined Behaviour 是获得“不一致”的好方法。或鼻守护进程。

现在,接下来是:为什么编译器不警告?

罪魁祸首是 ::inflate 重载了同名的 ZLIB 函数。让我们把我们的放在一个命名空间中来解开。

接下来,它需要 void* 参数。呵呵。并且他们reinterpet_cast<>-ed进入Byte*。只是这么多讨厌这段代码。它鲁莽地使用 C 风格的强制转换,抛弃 const 并且完全无视参数甚至不是 POD。

让我们稍微安全一点:

namespace mylib {
    int inflate(uint8_t const *src,int srcLen,uint8_t *dst,int dstLen) {
        z_stream strm  {};
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

我们不要掩饰你甚至明确地、故意地 承诺输出缓冲区大小准确的功能 10000000。你应该问问自己,当你 写了。

现在代码表达了意图,我们可以期待编译器诊断我们的错误。它当然会这样做,因为这就是 C++ 编译器所做的。

修复调用

让我们避开 net::streambuf 的混淆。您可以将字符串或向量用作动态缓冲区,同样如此。这可能并不总是那么有效,但让我们专注于可理解的代码:

std::vector<uint8_t> in,out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
const int err = mylib::inflate(
        in.data(),in.size(),out.data(),out.size());

看,现在你知道你正在传递什么。而且会没事的。

让我们避免打印输入数据(这是二进制乱码......)。

std::cout << "received. size:" << in_buffer.size() << std::endl;
std::cout << "deflated. error?" << err << " data: "
          << std::string(out.begin(),out.end())
          << std::endl;

此时,通胀仍然失败,但这是服务器的问题,请参阅 Boost inflate algorithm decompress - 其中还显示了缓冲区处理的一些替代方案。

固定代码

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib {
    int inflate(uint8_t const *src,int dstLen) {
        z_stream strm  {};
        strm.total_in  = strm.avail_in  = srcLen;
        strm.total_out = strm.avail_out = dstLen;
        strm.next_in = const_cast<Bytef*>(static_cast<Bytef const*>(src));
        strm.next_out = static_cast<Bytef*>(dst);

        strm.zalloc = Z_NULL;
        strm.zfree  = Z_NULL;
        strm.opaque = Z_NULL;

        int err = -1;
        int ret = -1;

        err = inflateInit2(&strm,(15 + 32)); //15 window bits,and the +32 tells zlib to to detect if using gzip or zlib
        if (err == Z_OK) {
            err = inflate(&strm,Z_FINISH);
            if (err == Z_STREAM_END) {
                ret = strm.total_out;
            }
            else {
                inflateEnd(&strm);
                return err;
            }
        }
        else {
            inflateEnd(&strm);
            return err;
        }

        inflateEnd(&strm);
        return ret;
    }
}

int main() {
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };

    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s{ ioc,ctx };
    net::connect(beast::get_lowest_layer(s),resolver.resolve(host,port));

    // SSL handshake
    s.next_layer().handshake(ssl::stream_base::client);

    {
        websocket::permessage_deflate opt;
        opt.client_enable = true; // for clients
        opt.server_enable = true; // for servers
        s.set_option(opt);
    }

    s.handshake(host + ":" + port,path);

    std::cerr << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe','args':['spot/ticker:ETH-USDT']}"));

    {
        std::vector<uint8_t> in,out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        // std::cout.write(reinterpret_cast<char const*>(in.data()),in.size());

        out.resize(1024); // make sure it's enough
        const int err = mylib::inflate(
                in.data(),out.size());

        std::cerr << "received. size:" << in_buffer.size() << std::endl;
        //std::cerr << "received. data:" << std::string(in.begin(),in.end()) << std::endl;
        std::cerr << "deflated. error?" << err << " data: "
                  << std::string(out.begin(),out.end())
                  << std::endl;
    }
}
,

我有一个 older answer 来解决代码的问题。从那以后,我找到了解决剩余问题的方法:服务器响应。

解决方案

OKEX 不仅没有遵守 WS 标准来启用按消息通缩,而且还突然结束了数据。然而,事实证明,如果您保留部分膨胀的结果实际上是没问题的。

我使用它的方式是直接使用 ZLIB,而是使用 beast::zlib::inflate_stream。这有一个更灵活的接口,可以让我们得到我们需要的结果:

namespace mylib {
    auto inflate(std::vector<uint8_t> const& in,std::vector<uint8_t>& out) {
        boost::system::error_code ec;
        beast::zlib::z_params zp{};
        zp.next_in   = (Bytef*)in.data();
        zp.avail_in  = in.size();
        zp.next_out  = out.data();
        zp.avail_out = out.size();

        beast::zlib::inflate_stream zs;
        zs.write(zp,beast::zlib::Flush::full,ec);

        return ec;
    }
}

现在我们像这样使用它:

std::vector<uint8_t> in,out;
auto in_buffer = net::dynamic_buffer(in);
s.read(in_buffer);

out.resize(1024); // make sure it's enough
auto ec = mylib::inflate(in,out);

std::cout << "deflated. " << ec.message() << std::endl;
std::cout << std::string(out.begin(),out.end()) << std::endl;

然后打印

connected.
deflated. unexpected end of deflate stream
{"event":"error","message":"Unrecognized request: {'op':'subscribe','args':['spot/ticker:ETH-USDT']}\u0000","errorCode":30039}

因此,尽管 deflate 流意外结束,但数据是有效且完整的 JSON。

PS.2:参见newer answer,设法克服了最后的障碍

完整列表

Live On Compiler Exporer

#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <istream>
#include "zlib.h"
#include <iostream>

namespace net       = boost::asio;
namespace ssl       = net::ssl;
namespace beast     = boost::beast;
namespace http      = beast::http;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
using stream_t = websocket::stream<ssl::stream<tcp::socket>>;

namespace mylib {
    auto inflate(std::vector<uint8_t> const& in,ec);

        return ec;
    }
}

int main() {
    std::string host = "real.okex.com";
    auto const port  = "8443";
    auto const path  = "/ws/v3";

    net::io_context ioc;
    tcp::resolver resolver{ ioc };

    ssl::context ctx { ssl::context::sslv23 };
    ctx.set_verify_mode(ssl::verify_none);

    stream_t s{ ioc,path);

    std::cout << "connected." << std::endl;

    // send request to the websocket
    s.write(net::buffer("{'op':'subscribe',out;
        auto in_buffer = net::dynamic_buffer(in);
        s.read(in_buffer);

        out.resize(1024); // make sure it's enough
        auto ec = mylib::inflate(in,out);

        std::cout << "deflated. " << ec.message() << std::endl;
        std::cout << std::string(out.begin(),out.end()) << std::endl;
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?