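Apache Flume - Sending failed messages from an HTTP sink to a different Kafka topic
I am using Apache Flume to move data from an Apache Kafka channel to an HTTP sink, and I am trying to customize the HTTP sink's behavior for failed HTTP calls. When a call to the HTTP endpoint fails with a 4XX response code, I need to send those events to a different Kafka topic (say, a bad-events Kafka topic). Events from this bad-events topic will not be processed any further by the Flume pipeline (a different application will handle them).
Questions:
- Should this bad-events Kafka topic be declared in my flume.conf file as a Flume source, a Flume channel, or a Flume sink?
- In the process() method of my custom HTTP sink, how do I get a reference to this bad-events Kafka topic?
Here is the process() method of the HTTP sink: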
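import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class CustomHttpSink extends AbstractSink implements Configurable {

    // LOG, sinkCounter, connectionBuilder and configure() are omitted from this excerpt.

    @Override
    public final Status process() throws EventDeliveryException {
        // Start from READY so that a successful pass never returns null.
        Status status = Status.READY;
        OutputStream outputStream = null;
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            byte[] eventBody = null;
            if (event != null) {
                eventBody = event.getBody();
            } else {
                // Nothing in the channel: ask the sink runner to back off.
                status = Status.BACKOFF;
            }
            if (eventBody != null && eventBody.length > 0) {
                sinkCounter.incrementEventDrainAttemptCount();
                LOG.debug("Sending request : " + new String(eventBody, StandardCharsets.UTF_8));
                HttpURLConnection connection = connectionBuilder.getConnection();
                outputStream = connection.getOutputStream();
                outputStream.write(eventBody);
                outputStream.flush();
                outputStream.close();
                int httpStatusCode = connection.getResponseCode();
                LOG.debug("Got status code : " + httpStatusCode);
                // Only 400 is checked here; the goal is to cover every 4XX code.
                if (httpStatusCode == HttpURLConnection.HTTP_BAD_REQUEST) {
                    // -------> WRITE THIS MESSAGE TO THE BAD-EVENTS KAFKA TOPIC
                }
            }
            // A transaction must be committed or rolled back before close().
            // Retry handling for other non-2XX codes is not shown in this excerpt.
            txn.commit();
        } catch (Throwable t) {
            txn.rollback();
            status = Status.BACKOFF;
            LOG.error("Error sending HTTP request, retrying", t);
            sinkCounter.incrementEventWriteOrChannelFail(t);
            // Re-throw all Errors.
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            txn.close();
            if (outputStream != null) {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    // Ignore errors on close.
                }
            }
        }
        return status;
    }
}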
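One way to think about the 4XX branch in process() above is as a small routing decision that does not depend on Flume or Kafka. The following is a minimal sketch under stated assumptions, not the behavior of any stock Flume sink: `BadEventRoutingDemo`, `Outcome`, and `BadEventPublisher` are hypothetical names, and treating every code outside 2XX and 4XX as retryable is an assumption. In a real sink, the publisher would presumably wrap a Kafka producer that the sink itself creates in start() and closes in stop(); under that approach the bad-events topic would not have to be declared as a separate source, channel, or sink.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: decides what to do with an event based on the HTTP status code.
// No Flume or Kafka classes are used, so this runs with the JDK alone.
class BadEventRoutingDemo {

    /** What the sink should do with the current channel transaction. */
    enum Outcome { COMMIT, DEAD_LETTER_THEN_COMMIT, ROLLBACK_AND_BACKOFF }

    /** Hypothetical stand-in for "send this body to the bad-events Kafka topic". */
    interface BadEventPublisher {
        void publish(byte[] body) throws Exception;
    }

    /** Maps an HTTP status code to an outcome (the ranges are an assumption). */
    static Outcome classify(int httpStatusCode) {
        if (httpStatusCode >= 200 && httpStatusCode < 300) {
            return Outcome.COMMIT;
        }
        if (httpStatusCode >= 400 && httpStatusCode < 500) {
            return Outcome.DEAD_LETTER_THEN_COMMIT;
        }
        return Outcome.ROLLBACK_AND_BACKOFF; // for example 5XX: retry later
    }

    /**
     * Returns true if the caller should commit the transaction,
     * false if it should roll back and back off.
     */
    static boolean handle(int httpStatusCode, byte[] body, BadEventPublisher publisher) {
        switch (classify(httpStatusCode)) {
            case COMMIT:
                return true;
            case DEAD_LETTER_THEN_COMMIT:
                try {
                    publisher.publish(body);
                    return true;
                } catch (Exception e) {
                    // Publishing failed: keep the event in the channel.
                    return false;
                }
            default:
                return false;
        }
    }

    public static void main(String[] args) {
        List<byte[]> badEvents = new ArrayList<>();
        BadEventPublisher inMemory = badEvents::add; // in-memory stand-in for Kafka
        byte[] body = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);

        System.out.println(handle(200, body, inMemory)); // true
        System.out.println(handle(404, body, inMemory)); // true
        System.out.println(handle(503, body, inMemory)); // false
        System.out.println(badEvents.size());            // 1
    }
}
```

In this sketch the event is committed out of the channel only after the publish succeeds, so an outage of the bad-events destination leaves the event in the channel for a retry instead of losing it.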
# The agent below defines Kafka as the channel and HTTP as the sink.
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source. Adding netcat as sample source for testing.
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = com.flume.poc.sink.CustomHttpSink
a1.sinks.k1.endpoint = http://localhost:8080/process
a1.sinks.k1.contentTypeHeader = application/json
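# Hypothetical extra properties (an assumption, not stock Flume settings). Flume passes
# every a1.sinks.k1.* key to the sink's configure(Context), so the custom sink could read
# these and create its own Kafka producer for events rejected with a 4XX response.
a1.sinks.k1.badEvents.topic = bad-events
a1.sinks.k1.badEvents.kafka.bootstrap.servers = localhost:9092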
# Use a channel which buffers events in kafka
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = localhost:9092
a1.channels.c1.kafka.topic = flume-channel-topic
a1.channels.c1.kafka.consumer.group.id = flume-consumer1
a1.channels.c1.parseAsFlumeEvent=false
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
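As for getting a reference to the bad-events topic from inside the sink, one possible approach is to let the sink build its own Kafka producer from its own properties. The sketch below only shows the property handling and uses nothing beyond the JDK. It rests on assumptions: `BadEventProducerConfigDemo` and the `badEvents.kafka.` prefix are hypothetical, and the plain `Map` stands in for the Flume `Context` that a sink receives in configure() (for example via `Context#getSubProperties`). The keys `bootstrap.servers`, `key.serializer`, and `value.serializer` are standard Kafka producer settings.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical demo: turns sink-level properties into Kafka producer properties.
class BadEventProducerConfigDemo {

    // Hypothetical prefix under which the sink keeps its producer settings.
    static final String PREFIX = "badEvents.kafka.";

    // Real Kafka serializer class name, used here only as a string.
    static final String BYTES_SERIALIZER =
            "org.apache.kafka.common.serialization.ByteArraySerializer";

    static Properties producerProperties(Map<String, String> sinkProperties) {
        Properties props = new Properties();
        // Flume event bodies are byte arrays, so default to byte[] serializers.
        props.setProperty("key.serializer", BYTES_SERIALIZER);
        props.setProperty("value.serializer", BYTES_SERIALIZER);
        // Copy every prefixed key, with the prefix stripped, into the producer config.
        for (Map.Entry<String, String> entry : sinkProperties.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                props.setProperty(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        if (props.getProperty("bootstrap.servers") == null) {
            throw new IllegalArgumentException(PREFIX + "bootstrap.servers is required");
        }
        return props;
    }

    public static void main(String[] args) {
        // Stand-in for the keys a sink would see for a1.sinks.k1.* in flume.conf.
        Map<String, String> sinkProperties = new HashMap<>();
        sinkProperties.put("endpoint", "http://localhost:8080/process");
        sinkProperties.put("badEvents.kafka.bootstrap.servers", "localhost:9092");
        sinkProperties.put("badEvents.kafka.acks", "all");

        Properties props = producerProperties(sinkProperties);
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
        System.out.println(props.getProperty("acks"));              // all
    }
}
```

The resulting `Properties` could then be passed to a `KafkaProducer<byte[], byte[]>` that lives for as long as the sink does, and the 4XX branch of process() would send the event body to the configured topic through that producer.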