如何解决如何使用ExecuteScript和Python从nifi中的一个传入流文件创建多个流文件
在本地运行,这完全符合我的要求(在位置7-10有一个传入流文件,其中有许多不同的代码,每个唯一代码输出1个文件)例如,如果记录1-5在位置7-具有1234 10,记录6在7-10的位置具有2345,记录7在7-10的位置具有1234,那么将有一个名为1234_file.txt的文件具有1-5和7行,而第二个文件2345_file.txt则具有输入文件的第6行:
f = open("test_comp.txt","r")
for x in f:
comp = x[6:10]
print(comp)
n = open(comp+"_file.txt","a")
n.write(x)
n.close()
f.close()
在nifi中,我正在尝试:
from org.apache.commons.io import IoUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import StreamCallback
class PyStreamCallback(StreamCallback):
def __init__(self):
pass
def process(self,inputStream,outputStream):
f = open(inputStream,'r')
for x in f:
comp = x[6:10]
print("comp: ",comp)
newFile = open(comp+"_file.txt","a")
newFile.write(x)
flowFile = session.get()
if (flowFile != None):
flowFile = session.write(flowFile,PyStreamCallback())
session.transfer(flowFile,REL_SUCCESS)
session.commit()
似乎正在获得输入并按预期将comp正确地存储在位置7-10处,但是我没有得到多个流文件(对于x [6:10]中的每个唯一字符串。 out是1个零字节文件。
对我想念的东西有什么想法吗?
解决方法
您直接在文件系统中写入文件,而不是在NiFi生态系统中的对象 flowfiles 中写入。我建议阅读Apache NiFi Developer's Guide中有关这些模式的上下文,并查看一些Python ExecuteScript examples以查看相关的Python代码。
您需要创建多个flowfile对象,将数据映射到它们,然后将它们的全部转移到各自的关系中,而不是写出单个流文件。
是否有必要使用自定义Python代码而不是SplitRecord
和/或PartitionRecord
处理器来执行此操作?我认为PartitionRecord
可以很轻松地解决您所描述的问题。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。