如何解决Apache Kafka Spout SSL
我们正在创建一个带有 kafka spout 的 STORM 拓扑。 Kafka 上的数据使用生产者的 SSL 加密。我们使用直接的 Storm Bolt 集成来访问来自 Kafka spout 的数据,如下所示
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.Properties;
import org.apache.log4j.Logger;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.brokerHosts;
import org.apache.storm.kafka.Kafkaspout;
import org.apache.storm.kafka.spoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.utils.Utils;
public class Topology {
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();
// getting properties from topology.properties
Properties prop = new Properties();
InputStream input = null;
input = new FileInputStream(args[0]);
prop.load(input);
spoutId = prop.getProperty("spoutId");
topologyName = prop.getProperty("topologyName");
zkhost = prop.getProperty("zkhost");
// setting workers for each bolt from properties
numspout = Integer.parseInt(prop.getProperty("numspout"));
numConverter = Integer.parseInt(prop.getProperty("numConverter"));
numTaskspout = Integer.parseInt(prop.getProperty("numTaskspout"));
numTaskConverter = Integer.parseInt(prop.getProperty("numTaskConverter"));
spoutConfig spoutConfig = new spoutConfig(hosts,inputTopic,"/" + Kafkabroker,consumerGroup);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
spoutConfig.maxOffsetBehind = Long.MAX_VALUE;
spoutConfig.useStartOffsetTimeIfOffsetoutOfRange = true;
spoutConfig.ignoreZkOffsets = true;
spoutConfig.zkPort = 2181;
spoutConfig.socketTimeoutMs = socketTimeoutMs;
spoutConfig.fetchSizeBytes = fetchSizeBytes;
Kafkaspout kafkaspout = new Kafkaspout(spoutConfig);
builder.setspout(spoutId,kafkaspout,numspout).setNumTasks(numTaskspout)
.addConfiguration("offset.commit.period.ms",offsetCommitPeriodMs)
.addConfiguration("max.uncommitted.offsets",maxUncommittedOffsets)
.addConfiguration("poll.timeout.ms",pollTimeoutMs);
builder.setBolt("Converter",new Converter(),numConverter).shuffleGrouping(spoutId)
.setNumTasks(numTaskConverter);
我们如何配置Kafka spout来访问加密数据??
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。