如何在多线程环境中使用 node-addon-api 保存回调以备后用
我尝试在多线程模式下调用回调,但应用程序崩溃了。我想知道如何在多线程环境中正确调用回调。其中node_event_deliver接口是在另一个线程中调用的。
我的系统是 macOS,发现在 macOS 下不能使用 ThreadSafeFunction,该怎么办?
系统完整性保护:已启用
Crashed Thread: 0 CrbrowserMain dispatch queue: com.apple.main-thread
Exception Type: EXC_BAD_ACCESS (SIGSEGV)
Exception Codes: KERN_INVALID_ADDRESS at 0x0000000008257065
Exception Note: EXC_CORPSE_NOTIFY
Termination Signal: Segmentation fault: 11
Termination Reason: Namespace SIGNAL,Code 0xb
Terminating Process: exc handler [98763]
VM Regions Near 0x8257065:
-->
__TEXT 102f57000-102f7f000 [ 160K] r-x/r-x SM=COW /Users/*/移动办公.app/Contents/MacOS/移动办公
Thread 0 Crashed:: CrbrowserMain dispatch queue: com.apple.main-thread
0 com.github.Electron.framework 0x0000000109bbecb6 napi_is_detached_arraybuffer + 502
1 com.github.Electron.framework 0x0000000109bb6deb napi_create_function + 1035
2 com.github.Electron.framework 0x00000001078863ad v8::internal::ClassScope::ResolvePrivateNamesPartially() + 14509
3 com.github.Electron.framework 0x00000001078860a2 v8::internal::ClassScope::ResolvePrivateNamesPartially() + 13730
4 com.github.Electron.framework 0x00000001078856a1 v8::internal::ClassScope::ResolvePrivateNamesPartially() + 11169
5 com.github.Electron.framework 0x0000000107e65f78 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 484488
6 com.github.Electron.framework 0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 34056
7 com.github.Electron.framework 0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 34056
8 com.github.Electron.framework 0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 2408
9 com.github.Electron.framework 0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 34056
10 com.github.Electron.framework 0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 2408
11 com.github.Electron.framework 0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 34056
12 com.github.Electron.framework 0x0000000107df7ff8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 34056
13 com.github.Electron.framework 0x0000000107df0458 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 2408
14 com.github.Electron.framework 0x0000000107df5b1b v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 24619
15 com.github.Electron.framework 0x0000000107df58f8 v8::internal::SetupIsolateDelegate::SetupHeap(v8::internal::Heap*) + 24072
16 com.github.Electron.framework 0x00000001078d74a1 v8::internal::Execution::Call(v8::internal::Isolate*,v8::internal::Handle<v8::internal::Object>,int,v8::internal::Handle<v8::internal::Object>*) + 897
17 com.github.Electron.framework 0x00000001078d71ba v8::internal::Execution::Call(v8::internal::Isolate*,v8::internal::Handle<v8::internal::Object>*) + 154
18 com.github.Electron.framework 0x000000010786338c v8::Function::Call(v8::Local<v8::Context>,v8::Local<v8::Value>,v8::Local<v8::Value>*) + 332
19 com.github.Electron.framework 0x0000000109b6c1f6 node::InternalCallbackScope::Close() + 1734
20 com.github.Electron.framework 0x0000000109b6c465 node::MakeCallback(v8::Isolate*,v8::Local<v8::Object>,v8::Local<v8::Function>,v8::Local<v8::Value>*,node::async_context) + 181
21 com.github.Electron.framework 0x00000001073b908c ElectronInitializeICUandStartNode + 1290636
//
// node_v6_api.h
// rcsnodejs
//
// Created by liuchao on 16/11/14.
// copyright © 2016年 rcsnodejs. All rights reserved.
//
#ifndef node_v6_api_h
#define node_v6_api_h
#include "v6_api.h"
#include <map>
#include "uv.h"
#include <queue>
#include <mutex>
#include "ClipboardHelper.h"
#include "triesearch_cpp.hpp"
#include "utils/md5/md5.h"
#include <napi.h>
using namespace trie;
// CALL_NODE_USE_ASYNC_QUEUE 用于定义SDK线程和NodeJS线程交互的两种工作模式开关
// 1. 异步:使用Queue执行
// 2. 同步:使用condition_variable通知执行 (关闭宏定义,默认)
//
// 前提: NodeJS 的V8线程使用libuv的main_loop执行,也只能由main_loop执行,不允许其他线程访问V8实例
// 同时,libuv 唯一线程安全的API为uv_async_send,是一个类似于我们lua stack interrupter的唤醒机制
//
// 那么线程交互自然也就有两种: 同步和异步两种
//
// 问题: LuaSDK 部分业务如消息的listener callback 后会更新本地的sync_version,使用异步可能会造成消息还在队列
// LuaSDK 更新sync_version后进程退出,造成客户端丢失消息
//
// 所以: `默认`必须采用同步模式
//
// 同步模式优势:
// - Mac下测试:消息发送->回调->消息发送->回调 cpu 102% 速度:1.7W/s; 性能够用
// - 回调可以返回Javascript的返回值
// - 同步执行,保证强时序
//
// 异步模式优势:
// - SDK执行不受主线程执行的干扰
// - 更强的吞吐量
#define CALL_NODE_USE_ASYNC_QUEUE
typedef uint64_t (*p_v6_getTickCount)();
typedef int(*pInitSdk)(int a,v6_logger_func b,v6_node_callback c,const char* d,const char* e,const char* f,const char* g,int h,const char* i);
typedef int(*p_v6_sdk_exec)(int cid,char * bizMethod,char *jsonParamStr,void* response_callback);
typedef char* (*p_v6_queryRegPath)(const char* szPath,const char* szKey,bool bA2u);
typedef void (*p_createUuid2)(uuidString buf);
// One event travelling from the SDK thread to the Node.js main thread.
// In the async-queue mode instances are created with malloc() in
// node_event_deliver and released with free() in node_event_process, so this
// must remain a plain aggregate without constructors or destructors.
struct EventItem {
const char *event; // event name; the caller's pointer is stored, the text is not copied
char *data; // JSON payload; a malloc'ed copy in the async-queue mode
int cid; // call id used to look up the per-call callback
};
// Event sink passed to v6_init_sdk2 by the RcsSdk constructor. Receives an
// event name, a JSON payload and the call id the event belongs to.
void node_event_deliver(const char* event,const char* json,int cid);
// N-API object wrapper that exposes the V6 SDK to JavaScript as the "RcsSdk" class.
// It stores the JS callbacks (one listener plus a map of per-call callbacks) and
// owns the uv_async_t handle used to wake the Node.js loop when the SDK thread
// has delivered an event.
class RcsSdk : public Napi::ObjectWrap<RcsSdk> {
public:
// Defines the JS class and attaches it to `exports`.
static Napi::Object Init(Napi::Env env,Napi::Object exports);
// Reads the init arguments from `info`, starts the SDK and sets up the async handle.
RcsSdk(const Napi::CallbackInfo& info);
// Stores a persistent reference to `callback` under `cid`.
void addCallback(int cid,Napi::Function callback);
// Returns the current call id and post-increments the counter.
// NOTE(review): not synchronised — presumably only called on the JS thread; confirm.
int getCid()
{
return cid++;
}
// Returns the result code stored from v6_init_sdk2.
int getinitV6sdkRst()
{
return m_initV6sdkRst;
}
// Wakes the event loop via uv_async_send.
void notifyMainLoop();
// Clears the global instance pointer.
void destoryObject();
#ifdef CALL_NODE_USE_ASYNC_QUEUE
// Async mode: events queued by the SDK thread, drained on the loop thread.
// Accessed under eventSyncMutex.
std::queue<EventItem*> asyncEventQueue;
#else
// Sync mode: the SDK thread publishes one item and blocks until it is processed.
void waitForEventProcess(EventItem* item);
void notifyEventComplete();
EventItem* syncEventItem;
char* syncEventResult;
#endif
// Listener registered through bind(); receives every event that is not "callback".
Napi::FunctionReference listener_callback;
// Guards the event queue / sync item shared between the SDK thread and the loop thread.
std::mutex eventSyncMutex;
// SDK entry points assigned in the constructor.
p_v6_getTickCount v6_getTickCount;
p_v6_sdk_exec v6_exec;
// NOTE(review): not assigned anywhere in the visible code — confirm it is set before use.
p_v6_queryRegPath v6_queryRegKey;
p_createUuid2 v6_createUuid2;
std::string m_proxyInfo;
// Per-call callbacks keyed by call id.
std::map<int,Napi::FunctionReference> callbacks;
private:
// Shared implementation behind the JS-visible SDK methods below.
void sdk_call_method(const Napi::CallbackInfo& info,const char* bizMethod);
Napi::Value bind(const Napi::CallbackInfo& info);
Napi::Value isValid(const Napi::CallbackInfo& info);
Napi::Value getsmscode(const Napi::CallbackInfo& info);
Napi::Value doLogin(const Napi::CallbackInfo& info);
Napi::Value setProxyInfo(const Napi::CallbackInfo& info);
Napi::Value createQrCode(const Napi::CallbackInfo& info);
Napi::Value checkExclusive(const Napi::CallbackInfo& info);
// NOTE(review): <condition_variable> is not included directly by this header.
std::condition_variable eventCondition;
// Handle passed to uv_async_send to wake the loop thread.
uv_async_t uv_async;
// Next call id handed out by getCid().
int cid;
int m_initV6sdkRst;
std::string m_cachePath;
// searchtrees_manager m_treeMgr;
// cclipboardHelper m_clipBoardHelper;
#ifndef _WIN32
// imageHelper_mac m_imageHelper_mac;
#endif
// CipcstubMgr m_ipcstubMgr;
};
#endif /* node_v6_api_h */
//
// node_v6_api.c
// rcsnodejs
//
// Created by liuchao on 16/11/14.
// copyright © 2016年 rcsnodejs. All rights reserved.
//
#include "node_v6_api.h"
#include <stdio.h>
#include "uv.h"
#include <mutex>
// 所有的 Callbacks & Listeners,先放到RcsSDK实例字段上等唤醒的主线程来获取执行
// 线程安全:sdkMap、asyncEventQueue/syncEventItem 字段会被SDK线程和主线程同时访问,使用mutex锁
//typedef void(*v6_logger_func)(int level,const char *msg);
//typedef void(*v6_node_callback)(const char *action,const char *jsonStr);//call back to NodeJS proxy.
// Instance that receives SDK events; set by the RcsSdk constructor and read by
// node_event_deliver.
RcsSdk* g_rcsSdkV6 = nullptr;
// Held by the RcsSdk constructor while it publishes g_rcsSdkV6.
static std::mutex globalMutex;
int cid11 = 2;
void node_event_deliver(const char* event,int cid){
//LuaSdk 后台线程
LOGDEBUG("[<<V8 ENGINE>>]:node_event_deliver begin,event:%s json:%s,cid:%d\n",event,json,cid);
if(NULL == g_rcsSdkV6)
{
LOGDEBUG("[<<V8 ENGINE>>]:node_event_deliver sdk NULL\n");
}
#ifdef CALL_NODE_USE_ASYNC_QUEUE
// 异步执行
EventItem *item = (EventItem*)malloc(sizeof(EventItem));
assert(item);
item->data = (char*)malloc(strlen(json)+1);
assert(item->data);
item->event = event;
item->cid = cid;
strcpy(item->data,json);
g_rcsSdkV6->eventSyncMutex.lock();
g_rcsSdkV6->asyncEventQueue.push(item);
g_rcsSdkV6->eventSyncMutex.unlock();
g_rcsSdkV6->notifyMainLoop();
#else
// 同步执行
char* result = NULL;
EventItem item;
item.event = event;
item.data = (char *)json;
g_rcsSdkV6->waitForEventProcess(&item);
g_rcsSdkV6->eventSyncMutex.lock();
result = g_rcsSdkV6->syncEventResult;
g_rcsSdkV6->eventSyncMutex.unlock();
#endif
//LOGDEBUG("node_event_deliver end,event:%s json:%s\n",json);
}
// Dispatches one event to JavaScript. Must run on the thread that owns the
// N-API environment (it is called from the uv_async callback).
//
// sdk  - instance whose callbacks are invoked.
// item - event to dispatch; "callback" events go to the per-call callback
//        registered under item->cid, every other event goes to the listener.
static void node_do_event_process(RcsSdk* sdk,EventItem *item){
    LOGDEBUG("enter here.");
    if(NULL == sdk || NULL == item || NULL == item->event)
    {
        return;
    }
    // Napi::String::New must not be given a NULL pointer.
    const char* payload = (item->data != NULL) ? item->data : "";

    if(!std::strcmp(item->event,"callback"))
    {
        const int cid = item->cid;
        // find() instead of operator[] so that unknown ids do not insert empty entries.
        std::map<int,Napi::FunctionReference>::iterator found = sdk->callbacks.find(cid);
        if(found == sdk->callbacks.end() || found->second.IsEmpty())
        {
            LOGDEBUG("[<<V8 ENGINE>>]:callback is null,cid is %d",cid);
            return;
        }
        LOGDEBUG("[<<V8 ENGINE>>]:callback is not null,cid is %d",cid);
        Napi::Env env = found->second.Env();
        // We are outside any JS-initiated native call here, so a handle scope
        // has to be opened before JS values are created.
        Napi::HandleScope scope(env);
        found->second.Call({ Napi::String::New(env,payload) });
        return;
    }

    if(sdk->listener_callback.IsEmpty())
    {
        LOGDEBUG("[<<V8 ENGINE>>]:listener_callback is null");
        return;
    }
    Napi::Env env = sdk->listener_callback.Env();
    Napi::HandleScope scope(env);
    sdk->listener_callback.Call({
        Napi::String::New(env,item->event),
        Napi::String::New(env,payload)});
}
// SDK线程调用 uv_async_send 后,主线程被唤醒后执行
// uv_async_send 是libuv唯一线程安全的API,但不保证每次调用都通知到,只是能确保至少一次的唤醒主线程
// 参见: http://docs.libuv.org/en/v1.x/async.html
// int uv_async_send(uv_async_t* async)
// Wakeup the event loop and call the async handle’s callback.
//
// Note It’s safe to call this function from any thread. The callback will be
// called on the loop thread.
// Warning libuv will coalesce calls to uv_async_send(),that is,not every call to
// it will yield an execution of the callback. For example: if uv_async_send() is
// called 5 times in a row before the callback is called,the callback will only be
// called once. If uv_async_send() is called again after the callback was called,it
// will be called again.
// uv_async callback: runs after uv_async_send() has woken the loop and hands
// the pending SDK event(s) to node_do_event_process.
// handle->data is the RcsSdk instance that owns the handle.
static void node_event_process(uv_async_t *handle){
// Runs on the Node.js main thread
LOGDEBUG("[<<V8 ENGINE>>]:node_event_process begin");
// Nan::HandleScope scope;
RcsSdk *sdk = (RcsSdk*)handle->data;
#ifdef CALL_NODE_USE_ASYNC_QUEUE
// Async mode: drain the whole queue, because several uv_async_send() calls
// may have been coalesced into this single wake-up.
while(true){
EventItem *item = NULL;
// Hold the mutex only while popping, not while the item is being processed.
sdk->eventSyncMutex.lock();
if(!sdk->asyncEventQueue.empty()){
item = sdk->asyncEventQueue.front();
sdk->asyncEventQueue.pop();
}
sdk->eventSyncMutex.unlock();
if(NULL == item){
break;
}
node_do_event_process(sdk,item);
// The item and its payload copy were malloc'ed by node_event_deliver.
free(item->data);
free(item);
}
#else
// Sync mode: take the single published item, process it, store the result
// and wake the waiting SDK thread.
// NOTE(review): this branch uses v8/Nan types and expects a return value from
// node_do_event_process, which is declared void in this file — it looks stale.
sdk->eventSyncMutex.lock();
EventItem *item = sdk->syncEventItem;
sdk->syncEventItem = NULL;
sdk->eventSyncMutex.unlock();
if(NULL == item)
{
LOGERROR("[<<V8 ENGINE>>]:node_event_process sync syncEventItem=NULL");
}
else
{
v8::Local<v8::Value> ret = node_do_event_process(sdk,item);
char *result = NULL;
// Only String return values are supported for now (v8 objects must be decoded on the main thread)
// NOTE(review): `Isstring` looks like a typo for `IsString`, and `result` appears
// to point into a temporary Nan::Utf8String — confirm before enabling this branch.
if(!ret.IsEmpty() && ret->Isstring()){
result = *Nan::Utf8String(v8::Local<v8::String>::Cast(ret));
}
// The copy allocated here is released before the next call, so at most the previous result stays allocated
if(result != NULL){
char* cpy = (char*)malloc(strlen(result)+1);
assert(cpy);
strcpy(cpy,result);
result = cpy;
}
sdk->eventSyncMutex.lock();
sdk->syncEventResult = result;
sdk->eventSyncMutex.unlock();
}
sdk->notifyEventComplete();
#endif
// LOGDEBUG("[<<V8 ENGINE>>]:node_event_process end");
}
// Defines the JavaScript class "RcsSdk" with its instance methods and
// publishes it on `exports` under the same name.
Napi::Object RcsSdk::Init(Napi::Env env,Napi::Object exports) {
    Napi::Function ctor = DefineClass(env,"RcsSdk",{
        InstanceMethod("bind",&RcsSdk::bind),
        InstanceMethod("isValid",&RcsSdk::isValid),
        InstanceMethod("getsmscode",&RcsSdk::getsmscode),
        InstanceMethod("doLogin",&RcsSdk::doLogin),
        InstanceMethod("createQrCode",&RcsSdk::createQrCode),
        InstanceMethod("checkExclusive",&RcsSdk::checkExclusive),
        InstanceMethod("setProxyInfo",&RcsSdk::setProxyInfo)
    });
    exports.Set(Napi::String::New(env,"RcsSdk"),ctor);
    return exports;
}
// Constructs the wrapper from JS: new RcsSdk(level, path, productVersion, mode,
// sdkLogPath, storagePath, proxyInfo).
//
// Throws a JS TypeError when the arguments do not have that shape; in that case
// the SDK is not started and getinitV6sdkRst() keeps returning -1.
//
// The async handle is initialised and the instance published BEFORE the SDK is
// started, because v6_init_sdk2 receives node_event_deliver and may call it from
// its own thread as soon as it is running.
RcsSdk::RcsSdk(const Napi::CallbackInfo& info)
: Napi::ObjectWrap<RcsSdk>(info)
#ifndef CALL_NODE_USE_ASYNC_QUEUE
,syncEventItem(NULL),syncEventResult(NULL)
#endif
,v6_getTickCount(v6_getTickCount2)
,v6_exec(v6_execute_from_Node)
,v6_queryRegKey(NULL)
,v6_createUuid2(v6_createUuid)
,cid(1)
,m_initV6sdkRst(-1)
{
    Napi::Env env = info.Env();

    const bool argsOk = info.Length() >= 7
        && info[0].IsNumber() && info[1].IsString() && info[2].IsString()
        && info[3].IsNumber() && info[4].IsString() && info[5].IsString()
        && info[6].IsString();
    if(!argsOk)
    {
        Napi::TypeError::New(env,"Wrong arguments").ThrowAsJavaScriptException();
        return;
    }

    const int level = info[0].As<Napi::Number>().Int32Value();
    const std::string path = info[1].As<Napi::String>().Utf8Value();
    const std::string productversion = info[2].As<Napi::String>().Utf8Value();
    const int mode = info[3].As<Napi::Number>().Int32Value();
    const std::string sdkLogPath = info[4].As<Napi::String>().Utf8Value();
    const std::string storagePath = info[5].As<Napi::String>().Utf8Value();
    const std::string proxyInfo = info[6].As<Napi::String>().Utf8Value();
    // NOTE(review): `mode` is parsed but not forwarded to v6_init_sdk2 below, while
    // the pInitSdk typedef has an extra int parameter — confirm the intended call.
    (void)mode;

    // Exactly one uv_async_t per instance is used to wake the loop thread.
    // Use the loop that belongs to this N-API environment; fall back to the
    // default loop only if the environment cannot provide one.
    uv_loop_t* loop = NULL;
    if(napi_get_uv_event_loop(env,&loop) != napi_ok || NULL == loop)
    {
        loop = uv_default_loop();
    }
    if(uv_async_init(loop,&uv_async,node_event_process) != 0)
    {
        LOGERROR("[<<V8 ENGINE>>]:uv_async_init failed");
        return;
    }
    uv_async.data = this;

    globalMutex.lock();
    g_rcsSdkV6 = this;
    globalMutex.unlock();

    m_initV6sdkRst = v6_init_sdk2(level,NULL,node_event_deliver,path.c_str(),storagePath.c_str(),sdkLogPath.c_str(),productversion.c_str(),proxyInfo.c_str());
}
void RcsSdk::addCallback(int cid,Napi::Function callback){
callbacks[cid] = Napi::Reference<Napi::Function>::New(callback,1);;
callbacks[cid].SuppressDestruct();
}
void RcsSdk::notifyMainLoop(){
uv_async_send(&uv_async);
}
// Clears the global instance pointer.
void RcsSdk::destoryObject()
{
    g_rcsSdkV6 = nullptr;
}
#ifndef CALL_NODE_USE_ASYNC_QUEUE
// Sync mode: publishes `item`, wakes the loop thread and blocks the calling
// thread on eventCondition.
void RcsSdk::waitForEventProcess(EventItem* item){
    std::unique_lock<std::mutex> guard(eventSyncMutex);
    // Drop the result kept from the previous call; free(NULL) is a no-op.
    free(syncEventResult);
    syncEventResult = NULL;
    syncEventItem = item;
    notifyMainLoop();
    eventCondition.wait(guard);
}
// Sync mode: wakes one thread waiting on eventCondition, while holding eventSyncMutex.
void RcsSdk::notifyEventComplete(){
    std::lock_guard<std::mutex> guard(eventSyncMutex);
    eventCondition.notify_one();
}
#endif
// JS: sdk.bind(listener). Registers the function that receives every SDK event
// other than "callback".
// Throws a JS TypeError when the first argument is not a function; returns 0
// on success.
Napi::Value RcsSdk::bind(const Napi::CallbackInfo& info) {
    Napi::Env env = info.Env();
    if (info.Length() < 1 || !info[0].IsFunction()) {
        Napi::TypeError::New(env,"Wrong arguments").ThrowAsJavaScriptException();
        return env.Undefined();
    }
    // Events are dispatched through the global instance, so the listener is
    // stored there; fall back to this object if the global has been cleared.
    RcsSdk* target = (g_rcsSdkV6 != NULL) ? g_rcsSdkV6 : this;
    target->listener_callback = Napi::Reference<Napi::Function>::New(info[0].As<Napi::Function>(),1);
    target->listener_callback.SuppressDestruct();
    return Napi::Number::New(env,0);
}
// JS: sdk.isValid(). Always returns the number 0.
Napi::Value RcsSdk::isValid(const Napi::CallbackInfo& info) {
    Napi::Env env = info.Env();
    return Napi::Number::New(env,0);
}
// Registers info[__index] as the callback for the call id held in the local
// variable `cid`, provided that argument is a function and `cid` is positive.
// Expands inside RcsSdk member functions that have `info` and `cid` in scope.
#define ADD_CALLBACK(__index) \
if(info[__index].IsFunction() && cid > 0){ \
Napi::Function callback = info[__index].As<Napi::Function>();\
addCallback(cid,callback); \
}
// Shared implementation of the JS-visible SDK calls.
//
// Accepted JS argument shapes:
//   (callback)             - call `bizMethod` without parameters
//   (jsonString, callback) - call `bizMethod` with a JSON parameter string
// Any other shape throws a JS TypeError.
//
// A fresh call id is passed to v6_exec; when the id it returns is positive the
// callback is stored under that id so the matching "callback" event can be
// routed back to it.
void RcsSdk::sdk_call_method(const Napi::CallbackInfo& info,const char* bizMethod) {
    const size_t argc = info.Length();
    const bool onlyCallback = (argc == 1) && info[0].IsFunction();
    const bool jsonAndCallback = (argc == 2) && info[0].IsString() && info[1].IsFunction();
    if (!onlyCallback && !jsonAndCallback) {
        LOGERROR("sdk_call_method: %s Wrong arguments",bizMethod);
        Napi::TypeError::New(info.Env(),"Wrong arguments").ThrowAsJavaScriptException();
        return;
    }

    // Owns the UTF-8 payload for the duration of the v6_exec call.
    std::string json;
    if (jsonAndCallback) {
        json = info[0].As<Napi::String>().Utf8Value();
    }
    char* jsonParam = jsonAndCallback ? const_cast<char*>(json.c_str()) : NULL;

    // v6_exec takes (cid, bizMethod, jsonParamStr, response_callback).
    int cid = getCid();
    cid = v6_exec(cid,const_cast<char*>(bizMethod),jsonParam,NULL);
    if (cid > 0) {
        addCallback(cid,info[argc - 1].As<Napi::Function>());
    }
}
// JS: sdk.getsmscode([json,] callback). Runs the SDK method "getSmsCode".
// Returns undefined; the result arrives through the callback.
Napi::Value RcsSdk::getsmscode(const Napi::CallbackInfo& info){
    sdk_call_method(info,"getSmsCode");
    // Falling off the end of a non-void function is undefined behaviour.
    return info.Env().Undefined();
}
// JS: sdk.doLogin([json,] callback). Runs the SDK method "doLogin".
// Returns undefined; the result arrives through the callback.
Napi::Value RcsSdk::doLogin(const Napi::CallbackInfo& info){
    sdk_call_method(info,"doLogin");
    // Falling off the end of a non-void function is undefined behaviour.
    return info.Env().Undefined();
}
// JS: sdk.setProxyInfo([json,] callback). Runs the SDK method "setProxyInfo".
// Returns undefined; the result arrives through the callback.
Napi::Value RcsSdk::setProxyInfo(const Napi::CallbackInfo& info){
    sdk_call_method(info,"setProxyInfo");
    // Falling off the end of a non-void function is undefined behaviour.
    return info.Env().Undefined();
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。