微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Node.Js中实现端口重用原理详解

本文介绍了Node.Js中实现端口重用原理详解,分享给大家,具体如下:

起源,从官方实例中看多进程共用端口

rush:js;"> const cluster = require('cluster'); const http = require('http'); const numcpus = require('os').cpus().length;

if (cluster.isMaster) {
console.log(Master ${process.pid} is running);

for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}

cluster.on('exit',(worker,code,signal) => {
console.log(worker ${worker.process.pid} died);
});
} else {
http.createServer((req,res) => {
res.writeHead(200);
res.end('hello world\n');
}).listen(8000);

console.log(Worker ${process.pid} started);
}

执行结果:

$ node server.js Master 3596 is running Worker 4324 started Worker 4520 started Worker 6056 started Worker 5644 started

了解http.js模块:

我们都只有要创建一个http服务,必须引用http模块,http模块最终会调用net.js实现网络服务

rush:js;"> // lib/net.js 'use strict';

...
Server.prototype.listen = function(...args) {
...
if (options instanceof TCP) {
this._handle = options;
this[async_id_symbol] = this._handle.getAsyncId();
listenInCluster(this,null,-1,backlogFromArgs); // 注意这个方法调用了cluster模式下的处理办法
return this;
}
...
};

function listenInCluster(server,address,port,addresstype,backlog,fd,exclusive) {
// 如果是master 进程或者没有开启cluster模式直接启动listen
if (cluster.isMaster || exclusive) {
//_listen2,细心的人一定会发现为什么是listen2而不直接使用listen
// _listen2 包裹了listen方法,如果是Worker进程,会调用被hack后的listen方法,从而避免出错端口被占用的错误
server._listen2(address,fd);
return;
}
const serverQuery = {
address: address,port: port,addresstype: addresstype,fd: fd,flags: 0
};

// 是fork 出来的进程,获取master上的handel,并且监听,
// 现在是不是很好奇_getServer方法做了什么
cluster._getServer(server,serverQuery,listenOnMasterHandle);
}
...

答案很快就可以通过cluster._getServer 这个函数找到

  1. 代理了server._listen2 这个方法在work进程的执行操作
  2. 向master发送queryServer消息,向master注册一个内部TCP服务器
message.address = address;

// 发送queryServer消息给master进程,master 在收到这个消息后,会创建一个开始一个server,并且listen
send(message,(reply,handle) => {
rr(reply,indexesKey,cb); // Round-robin.
});

obj.once('listening',() => {
cluster.worker.state = 'listening';
const address = obj.address();
message.act = 'listening';
message.port = address && address.port || options.port;
send(message);
});
};
//...
// Round-robin. Master distributes handles across workers.
function rr(message,cb) {
if (message.errno) return cb(message.errno,null);
var key = message.key;
// 这里hack 了listen方法
// 子进程调用的listen方法,就是这个,直接返回0,所以不会报端口被占用的错误
function listen(backlog) {
return 0;
}
// ...
const handle = { close,listen,ref: noop,unref: noop };
handles[key] = handle;
// 这个cb 函数是net.js 中的listenOnMasterHandle 方法
cb(0,handle);
}
// lib/net.js
/
function listenOnMasterHandle(err,handle) {
err = checkBindError(err,handle);
server._handle = handle;
// _listen2 函数中,调用的handle.listen方法,也就是上面被hack的listen
server._listen2(address,fd);
}
/

master进程收到queryServer消息后进行启动服务

  1. 如果地址没被监听过,通过RoundRobinHandle监听开启服务
  2. 如果地址已经被监听,直接绑定handel到已经监听到服务上,去消费请求
const args = [
message.address,message.port,message.addressType,message.fd,message.index
];

const key = args.join(':');
var handle = handles[key];

// 如果地址没被监听过,通过RoundRobinHandle监听开启服务
if (handle === undefined) {
var constructor = RoundRobinHandle;
if (schedulingPolicy !== SCHED_RR ||
message.addressType === 'udp4' ||
message.addressType === 'udp6') {
constructor = SharedHandle;
}

handles[key] = handle = new constructor(key,message.flags);

}

// 如果地址已经被监听,直接绑定handel到已经监听到服务上,去消费请求
// Set custom server data
handle.add(worker,(errno,reply,handle) => {
reply = util._extend({
errno: errno,key: key,ack: message.seq,data: handles[key].data
},reply);

if (errno)
  delete handles[key]; // Gives other workers a chance to retry.

send(worker,handle);

});
}

看到这一步,已经很明显,我们知道了多进行端口共享的实现原理

  1. 其实端口仅由master进程中的内部TCP服务器监听了一次
  2. 因为net.js 模块中会判断当前的进程是master还是Worker进程
  3. 如果是Worker进程调用cluster._getServer 去hack原生的listen 方法
  4. 所以在child调用的listen方法,是一个return 0 的空方法,所以不会报端口占用错误

那现在问题来了,既然Worker进程是如何获取到master进程监听服务接收到的connect呢?

  1. 监听master进程启动的TCP服务器的connection事件
  2. 通过轮询挑选出一个worker
  3. 向其发送newconn内部消息,消息体中包含了客户端句柄
  4. 有了句柄,谁都知道要怎么处理了哈哈
function RoundRobinHandle(key,fd) {

this.server = net.createServer(assert.fail);

if (fd >= 0)
this.server.listen({ fd });
else if (port >= 0)
this.server.listen(port,address);
else
this.server.listen(address); // UNIX socket path.

this.server.once('listening',() => {
this.handle = this.server._handle;
// 监听onconnection方法
this.handle.onconnection = (err,handle) => this.distribute(err,handle);
this.server._handle = null;
this.server = null;
});
}

RoundRobinHandle.prototype.add = function (worker,send) {
// ...
};

RoundRobinHandle.prototype.remove = function (worker) {
// ...
};

RoundRobinHandle.prototype.distribute = function (err,handle) {
// 负载均衡地挑选出一个worker
this.handles.push(handle);
const worker = this.free.shift();
if (worker) this.handoff(worker);
};

RoundRobinHandle.prototype.handoff = function (worker) {
const handle = this.handles.shift();
const message = { act: 'newconn',key: this.key };
// 向work进程其发送newconn内部消息和客户端的句柄handle
sendHelper(worker.process,message,handle,(reply) => {
// ...
this.handoff(worker);
});
};

下面让我们看看Worker进程接收到newconn消息后进行了哪些操作

rush:js;"> // lib/child.js function onmessage(message,handle) { if (message.act === 'newconn') onconnection(message,handle); else if (message.act === 'disconnect') _disconnect.call(worker,true); }

// Round-robin connection.
// 接收连接,并且处理
function onconnection(message,handle) {
const key = message.key;
const server = handles[key];
const accepted = server !== undefined;

send({ ack: message.seq,accepted });

if (accepted) server.onconnection(0,handle);
}

总结

  1. net模块会对进程进行判断,是worker 还是master,是worker的话进行hack net.Server实例的listen方法
  2. worker 调用的listen 方法是hack掉的,直接return 0,不过会向master注册一个connection接手的事件
  3. master 收到客户端connection事件后,会轮询向worker发送connection上来的客户端句柄
  4. worker收到master发送过来客户端的句柄,这时候就可以处理客户端请求了

分享出于共享学习的目的,如有错误,欢迎大家留言指导,不喜勿喷。也希望大家多多支持编程之家。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐