如何解决无法使用 nifi-flink 连接器将 apache flink 连接到 NIFI 源
我想使用 nifi 将文件从本地系统传输到 flink。我已经在 nifi 中配置了带有 GetFile 处理器的管道和名称为“Data For Flink”的输出端口。在 Flink 端,我使用带有 flink 的 flink-connector-nifi_2.11。下面是我在 Flink 中的 nifi 客户端代码。
SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder()
// .url("https://localhost:9443/nifi")
.url("http://localhost:8080/nifi")
.portName("Data For Flink")
.transportProtocol(SiteToSiteTransportProtocol.HTTP)
.requestBatchCount(5)
.buildConfig();
SourceFunction<NiFiDataPacket> nifiSource = new NiFiSource(clientConfig);
DataStream<NiFiDataPacket> streamSource = streamExecEnv.addSource(nifiSource);
DataStream<String> data = streamSource.map(new MapFunction<NiFiDataPacket,String>() {
@Override
public String map(NiFiDataPacket line) throws Exception {
String row = new String(line.getContent(),Charset.defaultCharset());
return row;
}
});
data.print();
streamExecEnv.execute();
当我运行上面的代码时,我没有得到任何输出以及任何错误。我的 Nifi 管道是否正确?我在这里遗漏了什么吗?
我还需要配置安全的 nifi 实例或任何站点到站点的属性来将数据从 Nifi 传输到 Flink 吗?
我也试过了,但是我遇到了 ca 证书的问题。我已经使用 tls-toolkit 生成了 PKCS12 证书。但是当我尝试将其上传到 java truststore 时,我收到错误消息 - 需要 x509 证书。
编辑 1:我在 IntelliJ IDEA 上运行上述代码,并在类路径上添加了 Flink 依赖项。
我是 NiFi 和 Flink 的新手。感谢您的帮助!
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。