Now only a sleep function is provided
Usage:
function test2_threadpool() local tp = Dll.MyTHdPool() local n =1 local function f() n = n+1 print(‘f ‘ .. n) if(n==50) then return end tp:sleep1(0,f) end f() tp:join() end
C codes:
#include "stdafx.h" #include <luabind.hpp> #include <vector> #include <queue> #include <boost/thread.hpp> using namespace luabind; #include "stdafx.h" #include <boost/asio.hpp> #include <boost/bind.hpp> #include <boost/thread/thread.hpp> #include <deque> class ThreadPool { boost::asio::io_service ioService; boost::thread_group threadpool; boost::asio::io_service::work work; public: ThreadPool() :work(ioService) { /* * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. */ //boost::asio::io_service::work work(ioService); /* * This will add 2 threads to the thread pool. (You Could just put it in a for loop) */ threadpool.create_thread( boost::bind(&boost::asio::io_service::run,&ioService) ); threadpool.create_thread( boost::bind(&boost::asio::io_service::run,&ioService) ); } ~ThreadPool() { } void post(boost::function<void()> f) { ioService.post(f); } void join() { threadpool.join_all(); } private: }; namespace bamthread { typedef std::unique_ptr<boost::asio::io_service::work> asio_worker; struct ThreadPool { ThreadPool(size_t threads) :service(),working(new asio_worker::element_type(service)) { while (threads--) { auto worker = boost::bind(&boost::asio::io_service::run,&(this->service)); g.add_thread(new boost::thread(worker)); } } template<class F> void post(F f){ service.post(f); } ~ThreadPool() { working.reset(); //allow run() to exit g.join_all(); service.stop(); } private: boost::asio::io_service service; //< the io_service we are wrapping asio_worker working; boost::thread_group g; //< need to keep track of threads so we can join them }; } void my_task() { Sleep(1000); printf("mytask"); } void test1() { bamthread::ThreadPool tp(3); tp.post(boost::bind(my_task)); //tp.join(); } void test() { /* * Create an asio::io_service and a thread_group (through pool in essence) */ boost::asio::io_service ioService; boost::thread_group threadpool; /* * This will start the ioService processing loop. All tasks * assigned with ioService.post() will start executing. */ boost::asio::io_service::work work(ioService); /* * This will add 2 threads to the thread pool. (You Could just put it in a for loop) */ threadpool.create_thread( boost::bind(&boost::asio::io_service::run,&ioService) ); threadpool.create_thread( boost::bind(&boost::asio::io_service::run,&ioService) ); /* * This will assign tasks to the thread pool. * More about boost::bind: "http://www.boost.org/doc/libs/1_54_0/libs/bind/bind.html#with_functions" */ ioService.post(boost::bind(my_task)); /* * This will stop the ioService processing loop. Any tasks * you add behind this point will not execute. */ ioService.stop(); /* * Will wait till all the threads in the thread pool are finished with * their assigned tasks and ‘join‘ them. Just assume the threads inside * the threadpool will be destroyed by this method. */ threadpool.join_all(); } template <typename T> class queue { private: boost::mutex d_mutex; boost::condition_variable d_condition; std::deque<T> d_queue; public: void push(T const& value) { { boost::unique_lock<boost::mutex> lock(this->d_mutex); d_queue.push_front(value); } this->d_condition.notify_one(); } T pop() { boost::unique_lock<boost::mutex> lock(this->d_mutex); this->d_condition.wait(lock,[=]{ return !this->d_queue.empty(); }); T rc(std::move(this->d_queue.back())); this->d_queue.pop_back(); return rc; } }; class MyTHdPool { bamthread::ThreadPool tp; boost::mutex m; std::map<int,boost::function<void()> > f2s; // key: taskid,value: post processing //boost::thread t_; queue<int> q_; int taskid_; public: MyTHdPool() :tp(3),taskid_(0){} ~MyTHdPool(){ join(); } void Call(boost::function<void()> f1,boost::function<void()> f2) { int taskid = taskid_++; printf("begin call task %d\n",taskid); boost::function<void()> f = [=]() mutable { f1(); q_.push(taskid); printf("done task %d\n",taskid); }; { boost::lock_guard<boost::mutex> lock(m); f2s[taskid] = (f2); } tp.post(f); printf("end post task %d\n",taskid); } void join() { while (true) { boost::function<void()> f2; int taskid = 0; { { boost::lock_guard<boost::mutex> lock(m); if (f2s.empty()) return; } printf("start pop a task from queue\n"); int taskid = q_.pop(); printf("got a task %d from queue\n",taskid); { boost::lock_guard<boost::mutex> lock(m); auto it = f2s.find(taskid); assert(it != f2s.end()); f2 = it->second; f2s.erase(it); } } printf("exec task post ftn %d\n",taskid); f2(); } } void sleep1(double n,object f2) { Call([n](){Sleep(n * 1000); },[f2,this]() mutable { f2(); }); } void sleep2(double n) { Call([n](){Sleep(n * 1000); },[](){}); } private: }; void callback(object o) { printf("before callback\n"); o(); printf("after callback\n"); } int luaopen_Dll(lua_State* L) { luaL_openlibs(L); open(L); // define a module in _G["t"] module(L,"Dll")[ class_<MyTHdPool>("MyTHdPool") .def(constructor<>()) .def("sleep1",&MyTHdPool::sleep1) .def("sleep2",&MyTHdPool::sleep2) .def("join",&MyTHdPool::join),def("test1",&test1),def("callback",&callback) ]; // push _G["t"] to stack lua_getglobal(L,"Dll"); // set _G["t"]=nil lua_pushnil(L); lua_setglobal(L,"Dll"); return 1; }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。