微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使用 boost-asio

如何解决使用 boost-asio

我有一个由 boost.asio 编写的服务器。该服务器从客户端获取文件并将其写入磁盘。我只是有一个问题。当服务器获取文件时,它在完全接收文件时将其写入磁盘。我希望服务器以实时方式将缓冲区写入磁盘。例如,服务器每 100kb 大小的从客户端获取文件就会写入磁盘。我已经编写了以下代码,但我不知道如何编辑才能达到这个目标。

void Session::DoReadFileContent(size_t arg_bytes_transferred)
{
    if (arg_bytes_transferred > 0)
    {
        m_outputFile.write(m_buffer.data(),static_cast<std::streamsize>(arg_bytes_transferred));

        if (m_outputFile.tellp() >= static_cast<std::streamsize>(m_fileSize))
        {
            std::cout << "Received file: " << m_fileName << std::endl;
            return;
        }
    }

    auto self = shared_from_this();

    m_socket.async_read_some(boost::asio::buffer(m_buffer.data(),m_buffer.size()),[this,self](boost::system::error_code arg_error_code,size_t arg_bytes)
        {
            DoReadFileContent(arg_bytes);
        });
}

解决方法

首先,在这种情况下,最好读取明确的数据大小,而不是读取任何可用数据的 read_some

在这种模式下,跟踪“剩余可接收字节数”比m_fileSize更容易。

这里有一些小的改组,使您的代码成为一个独立的示例。它期望服务器发送一行文本,给出有效负载大小和输出文件名,然后是该文件的内容。可以使用 netcat 运行示例服务器,例如:

(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6969

Live On Coliru

#include <boost/asio.hpp>
#include <fstream>
#include <iostream>

using boost::system::error_code;
using boost::asio::ip::tcp;

struct Session : std::enable_shared_from_this<Session> {

    Session(boost::asio::io_context& io,uint16_t port)
     : m_socket(io) 
    {
        m_socket.connect({{},port});
    }

    void Start();
    void DoReadFileContent(size_t transferred = 0);

  private:
    std::array<char,1024> m_buffer;
    std::streamsize m_remainingSize = 0;
    std::string     m_fileName      = "noname.dat";
    std::ofstream   m_outputFile;

    tcp::socket m_socket;
};

void Session::Start() {
    // Reading a size (in text for simplicity) and subsequently receive as many bytes
    //
    // I'm keeping this sync for simplicity,because you probably already have
    // this coded somehwere
    boost::asio::streambuf buf;
    error_code ec;
    auto n = read_until(m_socket,buf,"\n",ec);

    std::istream is(&buf);
    if (is >> m_remainingSize && getline(is,m_fileName)) {
        std::cerr << "Protocol trace: n:" << n << ",fileName:" << m_fileName << " payload_size:" << m_remainingSize << "\n";

        m_outputFile.exceptions(std::ios::failbit | std::ios::badbit);
        m_outputFile.open(m_fileName,std::ios::binary);

        // write excess buffer contents as part of payload
        if (buf.size()) {
            std::cerr << "Writing " << buf.size() << " bytes\n";
            m_remainingSize -= buf.size();
            m_outputFile << &buf;
        }

        DoReadFileContent();
    } else {
        std::cerr << "Protocol error,payload_size expected\n";
    }
}
void Session::DoReadFileContent(size_t transferred) {
    if (transferred > 0) {
        std::cerr << "Writing " << transferred << " bytes\n";
        m_remainingSize -= transferred;
        m_outputFile.write(m_buffer.data(),transferred);
    }
    if (m_remainingSize <= 0) {
        std::cout << "Completed file: " << m_fileName << std::endl;
        return;
    }

    auto self = shared_from_this();
    auto expect = std::min(size_t(m_remainingSize),m_buffer.size());
    std::cout << "Trying to receive next " << expect << " bytes" << std::endl;
    async_read(m_socket,boost::asio::buffer(m_buffer.data(),expect),[this,self](error_code ec,size_t arg_bytes) {
            std::cerr << "async_read: " << ec.message() << " - " << arg_bytes << " bytes\n";
            if (!ec) {
                DoReadFileContent(arg_bytes);
            }
        });
}

int main() {
    boost::asio::io_context io;

    std::make_shared<Session>(io,6868) // download from port 6868
        ->Start();

    io.run(); // complete
}

测试

(stat -c '%soutput.dat' main.cpp; cat main.cpp) | netcat -l -p 6868&
./a.out
md5sum main.cpp output.dat

打印,例如:

Protocol trace: n:15,fileName:output.dat payload_size:2654
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 1024 bytes
async_read: Success - 1024 bytes
Writing 1024 bytes
Trying to receive next 606 bytes
async_read: Success - 606 bytes
Writing 606 bytes
Completed file: output.dat

最后两行

b4eec7203f6a1dcbfbf3d298c7ec0832  main.cpp
b4eec7203f6a1dcbfbf3d298c7ec0832  output.dat

表示接收到的文件与原始文件相同。

注意事项:

  • 数据包在我的系统上以未指定的大小传送,例如接收到的文件相同:

     Protocol trace: n:15,fileName:output.dat payload_size:2654
     Writing 497 bytes
     Trying to receive next 1024 bytes
     async_read: Success - 1024 bytes
     Writing 1024 bytes
     Trying to receive next 1024 bytes
     async_read: Success - 1024 bytes
     Writing 1024 bytes
     Trying to receive next 109 bytes
     async_read: Success - 109 bytes
     Writing 109 bytes
     Completed file: output.dat
     b4eec7203f6a1dcbfbf3d298c7ec0832  main.cpp
     b4eec7203f6a1dcbfbf3d298c7ec0832  output.dat
    

    请注意,它以来自 read_until 的输入缓冲区中已经的 497 个字节开始。

  • 协议不安全:
    • 应该验证文件名。想象一下,如果文件是“/home/sehe/myimportant_file.txt”或更糟,比如说 /dev/sde1 并且我们有权限进行原始块设备访问会发生什么......
    • 您可能希望为 streambuf 指定一个最大大小,这样如果您得到一个永远不会发送 '\n' 的模糊器,您就不会只是吞噬所有 RAM
  • 文件 IO 的错误处理非常粗糙。我使用了 io 异常,但您可能想在不同的地方检查 m_outputFile.good()

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?