如何解决 puppeteer-cluster 在 Node.js 多 worker(cluster)模式下无法正常工作的问题
我在 Node.js 的多工作进程(cluster)模式下使用 puppeteer-cluster,但由于某种原因,只有一个工作进程打开了我所配置的并发浏览器数量,其余工作进程都被忽略了。我哪里做错了?
基本上,我为每个工作程序启动一个具有2个浏览器并发性的集群,因此我希望它打开工作程序数(如我定义的cpus数)*每个工作程序2个浏览器,但实际上仅打开了两个浏览器。
例如:假设我有 8 个内核,那么我会启动 8 个工作进程,每个工作进程启动两个 puppeteer 浏览器,总共 16 个。然而在调试模式下(此时 headless 为 false,浏览器窗口可见),我只看到打开了两个浏览器。
依赖版本:"puppeteer": "^5.2.1","puppeteer-cluster": "^0.22.0"
import {Cluster} from 'puppeteer-cluster';
import {ReportTimeouts} from "../../config/reports.consts";
import {isDebug} from "../../utils/env.utils";
import {IPuppeteerClusterService} from "../../interfaces/services.interfaces";
/**
 * Owns one puppeteer-cluster instance for the current Node.js process.
 *
 * `init()` must be awaited once before `getCluster()` is used; until then
 * `getCluster()` throws.
 */
export default class PuppeteerClusterService implements IPuppeteerClusterService {
  private cluster?: Cluster;

  /**
   * Returns the cluster created by `init()`.
   *
   * @throws Error when `init()` has not completed yet.
   */
  public getCluster(): Cluster {
    if (!this.cluster) {
      throw new Error(`PuppeteerClusterService.getCluster: init didn't run`);
    }
    return this.cluster;
  }

  /**
   * Launches the puppeteer cluster and stores it on this instance.
   *
   * The browser runs headless unless `isDebug` is truthy; in debug mode the
   * browser's stdio is also piped to this process (`dumpio`).
   */
  public async init(): Promise<void> {
    // Chromium switch names are lower-case; the sandbox flags previously had a
    // stray capital letter ("sandBox").
    // NOTE(review): the inner single quotes in --proxy-server are passed to the
    // browser literally (no shell strips them) — confirm that is intended.
    const args = [
      '--no-first-run',
      '--no-zygote',
      '--no-sandbox',
      '--disable-extensions',
      '--disable-setuid-sandbox',
      '--disable-dev-shm-usage',
      '--ignore-certificate-errors',
      "--proxy-server='direct://'",
      '--proxy-bypass-list=*',
      '--lang=en-US,en',
    ];
    const debugMode = isDebug;
    const headless = !debugMode;
    // One browser per concurrent job. The constant is CONCURRENCY_BROWSER; the
    // previous mixed-case spelling is not a member of Cluster.
    const concurrency = Cluster.CONCURRENCY_BROWSER;
    const maxConcurrency = 2;
    const cluster = await Cluster.launch({
      concurrency,
      maxConcurrency,
      puppeteerOptions: {
        headless,
        dumpio: debugMode,
        handleSIGTERM: true,
        handleSIGINT: true,
        args,
      },
      monitor: false, // turn this on to get cpu / memory usages
      timeout: ReportTimeouts.PuppeteerClusterTimeout,
    });
    console.log(`PuppeteerClusterService.init: initialized puppeteer cluster with concurrency type ${concurrency} and max concurrency of ${maxConcurrency}`);
    console.log('PuppeteerClusterService.init: running headless?: ',headless);
    this.cluster = cluster;
  }
}
import './src/services/monitoring/tracer';
import {config} from 'dotenv';
import * as process from 'process';
import * as http from 'http';
import * as cluster from 'cluster';
import * as os from 'os';
import App from './app';
import initORM from './src/config/sequelize_config.handler';
import routes from './src/routes';
import {DEFAULT_PORTS,REQUEST_TIMEOUT_MINUTES} from './src/config/networking.consts';
import {IServices} from "./src/interfaces/services.interfaces";
import SystemSetting from "./src/models/system_setting.model";
import Services from "./src/services";
import {ISystemSetting} from "./src/interfaces/models/system_setting.interface";
import ActiveReportSendingScheduler from "./src/logic/scheduled_tasks/active_report_sending.scheduler";
import appConfig from './src/config';
import LoggerService from './src/services/logger.service';
import DayTaggingReportSendingScheduler from './src/logic/scheduled_tasks/day_tagging_report_sending.scheduler';
import { MessageConsumingManager } from './src/logic/messaging/message_consuming.manager';
import { isDebug } from './src/utils/env.utils';
// Load the .env file (dotenv) before any environment variable is read.
config();

// Lower-cased NODE_ENV; an unset or empty value falls back to 'dev'.
const env = (process.env.NODE_ENV ?? '').toLocaleLowerCase() || 'dev';

monitorServer(env, process);

// HTTP_PORT, when set to a non-empty value, overrides the default HTTP port.
const httpPortOverride = process.env.HTTP_PORT;
const ports = {
  http: httpPortOverride ? Number(httpPortOverride) : DEFAULT_PORTS.http,
};

// Forked worker references; filled in by setupWorkerProcesses.
const workers = [];
/**
 * Initializes the ORM with the shared logger and writes progress messages to
 * the console.
 */
function setupOrm() {
  const logger = LoggerService;
  console.info(`setupOrm: app initiating on env ${env}`);
  initORM({logger});
  console.info(`setupOrm: sequelize ORM initiated`);
  console.info('setupOrm: loading static tables into memory');
  // Log only the variable NAMES: dumping process.env verbatim would write every
  // secret held in the environment (API keys, credentials) into the logs.
  console.info(`setupOrm: env param names are ${JSON.stringify(Object.keys(process.env))}`);
}
/**
 * Forks one worker process per CPU core and keeps that pool alive.
 *
 * Every forked worker is stored in the module-level `workers` array and has
 * its messages forwarded to the logger. When a worker exits, its reference is
 * dropped and exactly one replacement is forked.
 *
 * @param services Service container; only `services.logger` is used here.
 */
const setupWorkerProcesses = (services:IServices) => {
  const numCores = os.cpus().length;
  services.logger.info('setupWorkerProcesses: master cluster setting up ' + numCores + ' workers');

  // Forks a single worker, tracks it and forwards its messages to the logger.
  const forkWorker = () => {
    const worker = cluster.fork();
    workers.push(worker);
    worker.on('message',function(message) {
      services.logger.info(message);
    });
    return worker;
  };

  // utilize every core: one worker each
  for (let i = 0; i < numCores; i++) {
    forkWorker();
  }

  // process is clustered on a core and process id is assigned
  cluster.on('online',function(worker) {
    services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' is listening');
  });

  // if any of the worker processes dies, replace it with ONE new worker
  cluster.on('exit',function(worker,code,signal) {
    services.logger.info('setupWorkerProcesses: worker ' + worker.process.pid + ' died with code: ' + code + ',and signal: ' + signal);
    services.logger.info('setupWorkerProcesses: starting a new worker');

    // Forget the dead worker so the array does not accumulate stale references.
    const deadIndex = workers.indexOf(worker);
    if (deadIndex !== -1) {
      workers.splice(deadIndex, 1);
    }

    // Previously fork() was called twice here, so every death added two
    // workers, one of them untracked and without a message handler.
    forkWorker();
  });
};
/**
 * Builds the application via `App.init` and serves it over HTTP on
 * `ports.http`, logging the effective configuration on the way.
 *
 * @param services Service container handed to the app; its logger is used for output.
 * @param systemSettings System settings handed to the app.
 */
async function setApp(services:IServices,systemSettings: ISystemSetting[]) {
  const app = await App.init({routes,services,systemSettings,env});

  const log = services.logger;
  log.info(`setApp: app initiated on env ${env}`);
  log.info(`setApp: app initiated with config ${JSON.stringify(appConfig)}`);
  log.info(`setApp: routes: ${Object.keys(routes).join(' | ')}`);
  log.info(`setApp: ports: ${JSON.stringify(ports)}`);
  log.info(`setApp: database connection gained`);

  const onListening = async () => {
    log.info(`setApp: HTTP Server successfully started at port ${ports.http}`);
  };
  const server = http.createServer(app);
  server.listen(ports.http, onListening);

  const requestTimeoutMs = REQUEST_TIMEOUT_MINUTES * 60 * 1000;
  // Time (in ms) server will wait and keep the connection open after last response.
  server.keepAliveTimeout = requestTimeoutMs;
  // Kept one second above keepAliveTimeout, see
  // https://github.com/nodejs/node/issues/27363#issuecomment-603489130
  server.headersTimeout = requestTimeoutMs + 1000;
}
/**
 * Creates the scheduled-task objects and calls `init()` on each of them.
 *
 * @param services Service container passed to every scheduler.
 * @param systemSettings System settings passed to every scheduler.
 */
function setSchedulers(services:IServices,systemSettings: ISystemSetting[]) {
  const schedulers = [
    new ActiveReportSendingScheduler(services,systemSettings),
    // NOTE(review): the argument list here was truncated (`(services,]`) and did
    // not parse; `systemSettings` is assumed to mirror the scheduler above —
    // confirm against the DayTaggingReportSendingScheduler constructor.
    new DayTaggingReportSendingScheduler(services,systemSettings),
  ];
  schedulers.forEach(s=>s.init());
  services.logger.info(`setSchedulers: schedulers initiated on env ${env}`);
}
/**
 * Creates a MessageConsumingManager, initializes it, and logs that message
 * consumers are up.
 *
 * @param services Service container passed to the manager; its logger is used for output.
 * @param systemSettings System settings passed to the manager.
 */
function setMessageConsumers(services:IServices,systemSettings: ISystemSetting[]){
  const consumingManager = new MessageConsumingManager(services,systemSettings);
  consumingManager.initialize();

  const { logger } = services;
  logger.info(`setMessageConsumers: message consumers initiated on env ${env}`);
}
/**
 * Bootstraps the current process.
 *
 * Sets up the ORM, loads the system settings that apply to `env`, and
 * initializes the services. Then, depending on the process role:
 * - master: forks the worker processes and starts the schedulers and message
 *   consumers;
 * - worker: starts the HTTP application.
 *
 * The ORM and service initialization run before the role check, so they run
 * in the master as well as in every worker.
 */
const setupServer = async (): Promise<void> => {
  console.info(`setupServer: initating app in multiprocess mode`);
  setupOrm();

  // Keep settings that have no env restriction, or whose env matches the
  // current one in either direction of substring containment.
  const allSettings = await SystemSetting.findAll();
  const systemSettings = allSettings.filter((s:SystemSetting) => !s.env || s.env.includes(env) || env.includes(s.env));

  const services: IServices = new Services(systemSettings,process.env);
  await services.init();
  services.logger.info(`setupServer: initiated app in multi process mode`);

  if (cluster.isMaster) {
    setupWorkerProcesses(services);
    setSchedulers(services,systemSettings);
    setMessageConsumers(services,systemSettings);
  } else {
    await setApp(services,systemSettings);
  }
};
/**
 * Loads the New Relic agent when running in production with the required
 * configuration present; otherwise logs a warning and does nothing.
 *
 * @param env Environment name; monitoring is only enabled for 'production'.
 * @param proc Process whose `env` is checked for NEW_RELIC_KEY and APP_NAME.
 */
function monitorServer(env:string,proc: NodeJS.Process){
  if (env !== 'production'){
    console.warn(`monitorServer - not production so no monitoring.`);
    return;
  }
  // Read from the process that was passed in rather than the global one, so
  // the `proc` parameter is actually honoured.
  if (!proc.env.NEW_RELIC_KEY){
    console.warn(`monitorServer - NEW_RELIC_KEY not provided,not loading.`);
    return;
  }
  if (!proc.env.APP_NAME){
    console.warn(`monitorServer - APP_NAME not provided,not loading.`);
    return;
  }
  // Required lazily so the agent is only loaded once all checks have passed.
  const newRelic = require('newrelic');
  console.info(`monitorServer - newrelic loaded: ${typeof newRelic === 'object'}`);
}
// Entry point. A failed startup is reported and ends the process with a
// non-zero exit code instead of surfacing as an unhandled promise rejection.
setupServer().catch((err: unknown) => {
  console.error('setupServer: failed to start', err);
  process.exit(1);
});
import {
IAnalyticsService,ICacheService,ICycleTaggingService,IEmailSendingService,IFileUploader,ILogger,IMonitoringService,IPowerBIAuthService,IPowerBIService,IPuppeteerClusterService,IReportMonitoringServiceFactory,IServices,ISiteService,IUserService
} from "../interfaces/services.interfaces";
import {ISystemSetting} from "../interfaces/models/system_setting.interface";
import CacheService from "./cache.service";
import LoggerService from './logger.service';
import {S3FileUploader} from "./persistence/s3_file.uploader";
import {EmailSendingService} from "./sendouts/email_sending.service";
import {AdminUserService} from "./external_models/user.service";
import {ModerationService} from "./sendouts/moderation.service";
import {PowerBIAuthService} from "./power_bi/power_bi_auth.service";
import {PowerBIService} from "./power_bi/power_bi.service";
import {AdminSiteService} from "./external_models/site.service";
import {CycleTaggingService} from "./cycle_tagging.service";
import MonitoringService from "./monitoring/monitoring.service";
import ReportMonitoringServiceFactory from "./monitoring/report_monitoring.service.factory";
import {AutoSendoutCalculatorFactory} from "../logic/sendout/auto_sendout_calculator.factory";
import PuppeteerClusterService from "./screenshots/puppeteer_cluster.service";
import {RedisFactory} from "./redis.factory";
import {MutexFactory} from "./mutex.factory";
import {IMutexFactory} from "../interfaces/general.interfaces";
import {AnalyticsService} from "./analytics/analytics.service";
import {DummyAnalyticsService} from "./analytics/dummy.service";
import {IAutoSendoutCalculatorFactory} from "../interfaces/sendouts.interface";
/**
 * Service container. In the portion visible here it holds only the Puppeteer
 * cluster service, created at construction time and started by `init()`.
 */
export default class Services implements IServices {
  puppeteerClusterService: IPuppeteerClusterService = new PuppeteerClusterService();

  /**
   * @param systemSettings Not used by the code visible here.
   * @param processEnv Not used by the code visible here.
   */
  constructor(systemSettings: ISystemSetting[],processEnv: Record<string,string | undefined>){}

  /** Awaits initialization of the Puppeteer cluster service. */
  public async init(): Promise<void> {
    const { puppeteerClusterService } = this;
    await puppeteerClusterService.init();
  }
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。