如何解决重新启动服务器后,在node-rdkafka
我已使用node-rdkafka进行消息处理,所有工作均按预期进行。每当我重新启动服务器时,我面临的1个问题就是队列中始终有1条消息可用,即使上次成功提交也是如此。
consumer.ts
import * as Kafka from 'node-rdkafka'
import * as async from 'async'
import { CommitManager } from './commitManager'
import { ConsumerConfiguration } from './interface'
export class KafkaConsumer {
commitManager: CommitManager
consumer: Kafka.KafkaConsumer
paused: boolean
config: ConsumerConfiguration
groupId: string
retryBackoffTime: number
rdKafkaMaxRetry: number
maxRetry: number
exponentialFactor: number
parallelHandles: number
maxQueueSize: number
processMsg: Function
constructor(processMsg: Function,config: ConsumerConfiguration) {
this.config = config
this.groupId = 'kafka-test'
this.retryBackoffTime = 500
this.rdKafkaMaxRetry = 3
this.maxRetry = 3
this.exponentialFactor = 2
this.parallelHandles = 3
this.maxQueueSize = 500
this.paused = false
this.processMsg = processMsg
this.commitManager = new CommitManager()
this.msgQueue.drain(() => {
if (this.paused) {
this.consumer.resume(this.consumer.assignments())
this.paused = false
}
})
this.config['rebalance_cb'] = this.onRebalance.bind(this.consumer)
this.consumer = new Kafka.KafkaConsumer(this.config,{})
}
/* istanbul ignore next */
consumerConnect = (topicName: string): void => {
this.consumer.connect()
this.consumer
.on('ready',() => {
this.consumer.subscribe([topicName])
this.consumer.consume()
this.commitManager.start(this.consumer)
})
.on('data',async (data) => {
this.msgQueue.push(data)
console.log('this.msgQueue.length',this.msgQueue.length())
if (this.msgQueue.length() > this.maxQueueSize) {
console.log('Max queue length reached',this.msgQueue.length())
this.consumer.pause(this.consumer.assignments())
this.paused = true
}
})
.on('event.error',(err) => {
return {
message: 'Something wrong occurred!',error: err
}
})
.on('disconnected',() => {
console.log('consumer disconnected')
process.exit(1)
})
}
handleCB = async (data: any,handler: any): Promise<void> => {
const startNotificationData = await this.commitManager.notifyStartProcessing(data)
if (startNotificationData && startNotificationData.length > 0) {
await handler(data)
this.commitManager.notifyFinishedProcessing(data)
}
}
msgQueue = async.queue(async (data,done): Promise<void> => {
await this.handleCB(data,this.processMsg)
done()
},this.parallelHandles)
onRebalance = async (err: { code: number; },assignments: Kafka.Assignment[]): Promise<void> => {
if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
this.consumer.assign(assignments)
} else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
if (this.paused) {
this.consumer.resume(assignments)
this.paused = false
}
this.msgQueue.remove(() => { return true })
this.consumer.unassign()
this.commitManager.onRebalance()
} else {
console.error(`Rebalace error : ${err}`)
}
}
}
commitManager.ts
import { KafkaConsumer } from "node-rdkafka"
export class CommitManager {
partitionsData: any
lastCommited: any
consumer: KafkaConsumer
constructor() {
this.partitionsData = {}
this.lastCommited = []
}
start(consumer) {
this.consumer = consumer
}
async notifyStartProcessing(data) {
const partition = data.partition
const offset = data.offset
const topic = data.topic
this.partitionsData[partition] = this.partitionsData[partition] || []
this.partitionsData[partition].push({
offset: offset,topic: topic,done: false
})
return this.partitionsData[partition]
}
async notifyFinishedProcessing(data) {
const partition = data.partition
const offset = data.offset
this.partitionsData[partition] = this.partitionsData[partition] || []
let record = this.partitionsData[partition].filter(
(record) => { return record.offset === offset }
)[0]
if (record) {
record.done = true
this.commitProcessedOffsets()
}
}
/* istanbul ignore next */
async commitProcessedOffsets() {
try {
const offsetsToCommit = []
for (let key in this.partitionsData) {
const lastProcessedindex = this.partitionsData[key]
.findindex((record) => { return record.done })
const firstProcessedindex = this.partitionsData[key]
.findindex((record) => { return !record.done })
const lastProcessedRecord = firstProcessedindex > 0 ?
this.partitionsData[key][firstProcessedindex - 1] :
(lastProcessedindex > -1 ?
this.partitionsData[key][this.partitionsData[key].length - 1] :
null
)
if (lastProcessedRecord) {
offsetsToCommit.push({
partition: +key - 0,offset: lastProcessedRecord.offset,topic: lastProcessedRecord.topic
})
// remove commited records from array
this.partitionsData[key]
.splice(0,this.partitionsData[key].indexOf(lastProcessedRecord) + 1)
}
}
console.log('Offset to commit',offsetsToCommit)
if (offsetsToCommit.length > 0) {
this.consumer.commit(offsetsToCommit)
}
this.lastCommited = offsetsToCommit.length > 0 ? offsetsToCommit : this.lastCommited
Promise.resolve()
}
catch (e) {
Promise.reject(e)
}
}
onRebalance() {
this.partitionsData = {}
}
getLastCommited() {
return this.lastCommited
}
}
请在出现错误或此处遗漏的任何地方帮助我。任何帮助将不胜感激。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。