微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

重新启动服务器后,在node-rdkafka

如何解决重新启动服务器后,在node-rdkafka

我已使用node-rdkafka进行消息处理,所有工作均按预期进行。每当我重新启动服务器时,我面临的1个问题就是队列中始终有1条消息可用,即使上次成功提交也是如此。

consumer.ts

import * as Kafka from 'node-rdkafka'
import * as async from 'async'
import { CommitManager } from './commitManager'
import { ConsumerConfiguration } from './interface'

export class KafkaConsumer {
    commitManager: CommitManager
    consumer: Kafka.KafkaConsumer
    paused: boolean
    config: ConsumerConfiguration
    groupId: string
    retryBackoffTime: number
    rdKafkaMaxRetry: number
    maxRetry: number
    exponentialFactor: number
    parallelHandles: number
    maxQueueSize: number
    processMsg: Function

constructor(processMsg: Function,config: ConsumerConfiguration) {
    this.config = config
    this.groupId = 'kafka-test'
    this.retryBackoffTime = 500
    this.rdKafkaMaxRetry = 3
    this.maxRetry = 3
    this.exponentialFactor = 2
    this.parallelHandles = 3
    this.maxQueueSize = 500
    this.paused = false
    this.processMsg = processMsg
    this.commitManager = new CommitManager()
    this.msgQueue.drain(() => {
        if (this.paused) {
            this.consumer.resume(this.consumer.assignments())
            this.paused = false
        }
    })
    this.config['rebalance_cb'] = this.onRebalance.bind(this.consumer)
    this.consumer = new Kafka.KafkaConsumer(this.config,{})
}
/* istanbul ignore next */
consumerConnect = (topicName: string): void => {
    this.consumer.connect()
    this.consumer
        .on('ready',() => {
            this.consumer.subscribe([topicName])
            this.consumer.consume()
            this.commitManager.start(this.consumer)
        })
        .on('data',async (data) => {
            this.msgQueue.push(data)
            console.log('this.msgQueue.length',this.msgQueue.length())
            if (this.msgQueue.length() > this.maxQueueSize) {
                console.log('Max queue length reached',this.msgQueue.length())
                this.consumer.pause(this.consumer.assignments())
                this.paused = true

            }
        })
        .on('event.error',(err) => {
            return {
                message: 'Something wrong occurred!',error: err
            }
        })
        .on('disconnected',() => {
            console.log('consumer disconnected')
            process.exit(1)
        })
}

handleCB = async (data: any,handler: any): Promise<void> => {
    const startNotificationData = await this.commitManager.notifyStartProcessing(data)
    if (startNotificationData && startNotificationData.length > 0) {
        await handler(data)
        this.commitManager.notifyFinishedProcessing(data)
    }
}

msgQueue = async.queue(async (data,done): Promise<void> => {
    await this.handleCB(data,this.processMsg)
    done()
},this.parallelHandles)

onRebalance = async (err: { code: number; },assignments: Kafka.Assignment[]): Promise<void> => {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
        this.consumer.assign(assignments)
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
        if (this.paused) {
            this.consumer.resume(assignments)
            this.paused = false
        }
        this.msgQueue.remove(() => { return true })
        this.consumer.unassign()
        this.commitManager.onRebalance()
    } else {
        console.error(`Rebalace error : ${err}`)
    }
}
}

commitManager.ts

import { KafkaConsumer } from "node-rdkafka"

export class CommitManager {

    partitionsData: any
    lastCommited: any
    consumer: KafkaConsumer

    constructor() {
        this.partitionsData = {}
         this.lastCommited = []
    }

    start(consumer) {
        this.consumer = consumer
    }

    async notifyStartProcessing(data) {
        const partition = data.partition
        const offset = data.offset
        const topic = data.topic
        this.partitionsData[partition] = this.partitionsData[partition] || []
        this.partitionsData[partition].push({
        offset: offset,topic: topic,done: false
    })
    return this.partitionsData[partition]
}

async notifyFinishedProcessing(data) {
    const partition = data.partition
    const offset = data.offset
    this.partitionsData[partition] = this.partitionsData[partition] || []
    let record = this.partitionsData[partition].filter(
        (record) => { return record.offset === offset }
    )[0]
    if (record) {
        record.done = true
        this.commitProcessedOffsets()
    }
}
/* istanbul ignore next */
async commitProcessedOffsets() {
    try {
        const offsetsToCommit = []
        for (let key in this.partitionsData) {
            const lastProcessedindex = this.partitionsData[key]
                .findindex((record) => { return record.done })
            const firstProcessedindex = this.partitionsData[key]
                .findindex((record) => { return !record.done })
            const lastProcessedRecord = firstProcessedindex > 0 ?
                this.partitionsData[key][firstProcessedindex - 1] :
                (lastProcessedindex > -1 ?
                    this.partitionsData[key][this.partitionsData[key].length - 1] :
                    null
                )
            if (lastProcessedRecord) {
                offsetsToCommit.push({
                    partition: +key - 0,offset: lastProcessedRecord.offset,topic: lastProcessedRecord.topic
                })
                // remove commited records from array
                this.partitionsData[key]
                    .splice(0,this.partitionsData[key].indexOf(lastProcessedRecord) + 1)
            }
        }
        console.log('Offset to commit',offsetsToCommit)
        if (offsetsToCommit.length > 0) {
            this.consumer.commit(offsetsToCommit)
        }
        this.lastCommited = offsetsToCommit.length > 0 ? offsetsToCommit : this.lastCommited
        Promise.resolve()
    }
    catch (e) {
        Promise.reject(e)
    }
}

onRebalance() {
    this.partitionsData = {}
}

getLastCommited() {
    return this.lastCommited
}
}

请在出现错误或此处遗漏的任何地方帮助我。任何帮助将不胜感激。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。