
Apache Flume - Sending failed messages from an HTTP sink to a different Kafka topic

I am using Apache Flume to move data from an Apache Kafka channel to an HTTP sink. I am trying to customize the HTTP sink's behavior for failed HTTP calls: when a call to the HTTP endpoint fails with a 4XX response code, I need to send those events to a different Kafka topic (say, a bad-events Kafka topic). Events from this bad-events topic will not be processed further by the Flume pipeline (a different application will handle them).

Questions:

  1. Should this bad-events Kafka topic be declared in my flume.conf file as a Flume source, a Flume channel, or a Flume sink?
  2. In my custom HTTP sink's process() method, how do I get a reference to this bad-events Kafka topic?

Here is the HTTP sink's process() method:

public class CustomHttpSink extends AbstractSink implements Configurable {

    @Override
    public final Status process() throws EventDeliveryException {
        Status status = null;
        OutputStream outputStream = null;

        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();

        try {
            Event event = ch.take();

            byte[] eventBody = null;
            if (event != null) {
                eventBody = event.getBody();
            }

            if (eventBody != null && eventBody.length > 0) {
                sinkCounter.incrementEventDrainAttemptCount();
                LOG.debug("Sending request : " + new String(eventBody));

                HttpURLConnection connection = connectionBuilder.getConnection();

                outputStream = connection.getOutputStream();
                outputStream.write(eventBody);
                outputStream.flush();
                outputStream.close();

                int httpStatusCode = connection.getResponseCode();
                LOG.debug("Got status code : " + httpStatusCode);

                if (httpStatusCode == HttpURLConnection.HTTP_BAD_REQUEST) {
                    // -------> WRITE this MESSAGE TO BAD events KAFKA TOPIC
                }
            }

            // Commit the transaction once the event has been handled
            txn.commit();
            status = Status.READY;
            sinkCounter.incrementEventDrainSuccessCount();

        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;

            LOG.error("Error sending HTTP request, retrying", t);
            sinkCounter.incrementEventWriteOrChannelFail(t);

            // re-throw all Errors
            if (t instanceof Error) {
                throw (Error) t;
            }

        } finally {
            txn.close();

            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    // ignore errors
                }
            }
        }

        return status;
    }
}
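One possible shape for question 2 (a sketch, not part of the original post): give the sink its own Kafka producer, created in configure() from sink properties, and call it from process() whenever a 4XX response comes back. The BadEventPublisher interface, the badEventsTopic property name, and the isClientError() helper below are hypothetical names introduced for illustration; in a real sink the publisher would wrap an org.apache.kafka.clients.producer.KafkaProducer and call producer.send(new ProducerRecord<>(topic, eventBody)).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical seam between the sink and Kafka: a real implementation
// would wrap a KafkaProducer created in the sink's configure() method.
interface BadEventPublisher {
    void publish(String topic, byte[] eventBody);
}

public class BadEventRouter {
    private final BadEventPublisher publisher;
    // In a real sink this would come from configure(), e.g.
    // context.getString("badEventsTopic") - a hypothetical property name.
    private final String badEventsTopic;

    public BadEventRouter(BadEventPublisher publisher, String badEventsTopic) {
        this.publisher = publisher;
        this.badEventsTopic = badEventsTopic;
    }

    // 4XX responses are client errors: the event itself is bad, so
    // retrying it against the HTTP endpoint will never succeed.
    static boolean isClientError(int httpStatusCode) {
        return httpStatusCode >= 400 && httpStatusCode < 500;
    }

    // Returns true when the event was diverted to the bad-events topic.
    public boolean route(int httpStatusCode, byte[] eventBody) {
        if (isClientError(httpStatusCode)) {
            publisher.publish(badEventsTopic, eventBody);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // In-memory stub standing in for a KafkaProducer-backed publisher.
        Map<String, Integer> published = new HashMap<>();
        BadEventRouter router = new BadEventRouter(
                (topic, body) -> published.merge(topic, 1, Integer::sum),
                "bad-events-topic");

        System.out.println(router.route(400, "oops".getBytes())); // diverted
        System.out.println(router.route(200, "ok".getBytes()));   // delivered normally
        System.out.println(published.getOrDefault("bad-events-topic", 0));
    }
}
```

With this split, process() only decides where the event goes; the producer lifecycle (create in configure()/start(), close in stop()) stays in one place, and the routing rule is testable without a running Kafka broker.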

Contents of my flume.conf file:

# Below agent defines kafka as a channel and http as a sink.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source. Adding netcat as sample source for testing.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = com.flume.poc.sink.CustomHttpSink
a1.sinks.k1.endpoint = http://localhost:8080/process
a1.sinks.k1.contentTypeHeader = application/json


# Use a channel which buffers events in kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = flume-channel-topic
a1.channels.c1.kafka.consumer.group.id = flume-consumer1
a1.channels.c1.parseAsFlumeEvent=false


# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
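Regarding question 1: since Flume never reads from the bad-events topic, it does not need to be declared as a source, channel, or sink at all; the custom sink can publish to it directly with its own Kafka producer. A sketch of what the extra sink properties might look like (the badEventsTopic and badEvents.bootstrap.servers property names are hypothetical, read by the sink's own configure() method rather than recognized by Flume itself):

```
# Hypothetical extra properties, consumed by CustomHttpSink.configure()
a1.sinks.k1.badEventsTopic = bad-events-topic
a1.sinks.k1.badEvents.bootstrap.servers = localhost:9092
```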
