如何使用 Boost.Beast / Boost.Asio 让所有 HTTP 客户端线程共用单个会话,并执行尽可能多的写入和读取操作?
使用 Boost.Asio 的 HTTP 客户端并发推送数据
我想以尽可能快的速度连接 REST API 服务器并发送数据。我发现 Boost.Asio 提供了这种功能。我基于示例代码搭建了一个客户端,但当请求数超过 1000 个时,它开始出现故障:多个读取操作失败。
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/thread/thread.hpp>
#include <chrono>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <utility>
namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp>
namespace net = boost::asio; // from <boost/asio.hpp>
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
// Reports a failed operation on stderr in the form "<what>: <error message>".
//   ec   - error code whose message() text is printed
//   what - short label naming the operation that failed
void fail(beast::error_code ec, char const* what)
{
    const auto description = ec.message();
    std::cerr << what << ": " << description << "\n";
}
// Performs an HTTP GET and prints the response
class session : public std::enable_shared_from_this<session>
{
tcp::resolver resolver_;
beast::tcp_stream stream_;
beast::flat_buffer buffer_; // (Must persist between reads)
http::request<http::empty_body> req_;
http::response<http::string_body> res_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
public:
// Objects are constructed with a strand to
// ensure that handlers do not execute concurrently.
explicit
session(std::shared_ptr< boost::asio::io_context >ioc)
: resolver_(net::make_strand (*ioc)),stream_(net::make_strand(*ioc)),strand_(ioc->get_executor())
{
}
// Start the asynchronous operation
void run( char const* host,char const* port,char const* target,int version)
{
// Set up an HTTP GET request message
req_.version(version);
req_.method(http::verb::get);
req_.target(target);
req_.set(http::field::host,host);
req_.set(http::field::user_agent,BOOST_BEAST_VERSION_STRING);
// Look up the domain name
resolver_.async_resolve(
host,port,beast::bind_front_handler(
&session::on_resolve,shared_from_this()));
}
void on_resolve(beast::error_code ec,tcp::resolver::results_type results)
{
if (ec)
return fail(ec,"resolve");
// Set a timeout on the operation
stream_.expires_after(std::chrono::seconds(30));
// Make the connection on the IP address we get from a lookup
stream_.async_connect(
results,beast::bind_front_handler(
&session::on_connect,shared_from_this()));
}
void on_connect(beast::error_code ec,tcp::resolver::results_type::endpoint_type)
{
if (ec)
return fail(ec,"connect");
// Set a timeout on the operation
stream_.expires_after(std::chrono::seconds(60));
// Send the HTTP request to the remote host
http::async_write(stream_,req_,boost::asio::bind_executor(
strand_,beast::bind_front_handler(
&session::on_write,shared_from_this())));
}
void on_write(beast::error_code ec,std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec,"write");
// Receive the HTTP response
http::async_read(stream_,buffer_,res_,beast::bind_front_handler(
&session::on_read,shared_from_this())));
}
void on_read(beast::error_code ec,"read");
// Write the message to standard out
//std::cout << res_ << std::endl;
// Gracefully close the socket
stream_.socket().shutdown(tcp::socket::shutdown_both,ec);
// not_connected happens sometimes so don't bother reporting it.
if (ec && ec != beast::errc::not_connected)
return fail(ec,"shutdown");
// If we get here then the connection is closed gracefully
}
};
// Small thread-pooled HTTP client.
//
// Owns an io_context kept alive by a work object, plus a group of worker
// threads that call io_context::run(). Each PostRequest() queues a task that
// creates a fresh `session` (one connection per request) and starts it.
//
// Typical use: construct, Initialize(n), PostRequest(...) any number of
// times, then Stop() to wait for all queued requests to finish.
class CHttpClient
{
private:
    boost::thread_group worker_threads;

    // Creates a one-shot session for `sTarget` and starts it. Invoked from a
    // task queued on the io_context by PostRequest().
    void Post(const std::string& sTarget)
    {
        auto session_ptr = std::make_shared<session>(ioc);
        session_ptr->run(m_sHost.c_str(), m_sPort.c_str(), sTarget.c_str(), VERSION);
    }

protected:
    std::string m_sHost;
    std::string m_sPort;
    std::string m_sTarget;
    const int VERSION = 11; // HTTP version value handed to session::run
    std::shared_ptr< boost::asio::io_context > ioc;
    std::shared_ptr< boost::asio::io_context::work > work;
    std::shared_ptr<session> client; // never assigned in this class

public:
    // sHost / sPort identify the server; sTarget is the default request
    // target used when PostRequest() is given an empty target.
    CHttpClient(std::string sHost, std::string sPort, std::string sTarget)
        : m_sHost(std::move(sHost))
        , m_sPort(std::move(sPort))
        , m_sTarget(std::move(sTarget))
        , ioc(std::make_shared<boost::asio::io_context>())
        , work(std::make_shared<boost::asio::io_context::work>(*ioc))
    {
    }

    // The worker threads capture `this`, so they must be finished before the
    // members are destroyed. Stop() is safe to call more than once.
    ~CHttpClient()
    {
        try
        {
            Stop();
        }
        catch (...)
        {
            // A destructor must not let an exception escape.
        }
    }

    // Starts nThreadCount worker threads, each running the io_context.
    void Initialize(const int nThreadCount = 1)
    {
        for (int x = 0; x < nThreadCount; ++x)
        {
            worker_threads.create_thread([this]() { ioc->run(); });
        }
        // add the main thread into the pool as well
        //this->ioc->run();
    }

    // Queues one request for execution on a worker thread.
    //   sTarget - request target; when empty, the target given to the
    //             constructor is used instead
    //   sVerb   - currently NOT honored: session::run always issues a GET
    void PostRequest(const std::string& sTarget, const std::string& sVerb)
    {
        (void)sVerb;
        // Copy the target into the task: the caller's string may be gone by
        // the time a worker thread runs it.
        const std::string target = sTarget.empty() ? m_sTarget : sTarget;
        ioc->post([this, target]() { Post(target); });
    }

    // Releases the work object so run() returns once all queued requests
    // have completed, waits for the worker threads, then stops the context.
    void Stop()
    {
        work.reset();
        worker_threads.join_all();
        ioc->stop();
    }
};
// Benchmark driver: queues 3000 requests on an 8-thread client, waits for
// all of them to finish, and prints the elapsed wall-clock time.
int main()
{
    auto const host = "localhost";
    auto const port = "32773";
    auto const target = "/api/DeviceItems";

    // The constructor takes (host, port, target).
    CHttpClient objHttpClient(host, port, target);
    objHttpClient.Initialize(8);

    auto t1 = std::chrono::high_resolution_clock::now();
    for (int ii = 0; ii < 3000; ++ii)
        objHttpClient.PostRequest(target, "get");

    // Stop() returns only after the worker threads have been joined, so the
    // measured interval covers the queued requests.
    objHttpClient.Stop();
    auto t2 = std::chrono::high_resolution_clock::now();

    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count();
    std::cout << duration << " milliseconds";

    // NOTE(review): "PAUSE" is a Windows shell command; on other platforms
    // this call just fails.
    system("PAUSE");
    return EXIT_SUCCESS;
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。