如何解决Nifi ExecuteScript /尾部文件
我想拖尾日志文件。 (不幸的是,在Nifi中,处理器TailFile并没有执行我想要的操作,因为如果删除了尾部文件,状态似乎会丢失)。
因此,我想使用ExecuteScript命令(Jython / python)使用类似的处理器。 我的主要问题是要在Nifi Jython软件包中的Python中有效地尾巴文件。
> from org.apache.nifi.components.state import Scope from
> org.apache.commons.io import IoUtils from java.nio.charset import
> StandardCharsets from java.io import BufferedReader,InputStreamReader
> from org.apache.nifi.processor.io import StreamCallback
>
> # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback): def __init__(self,old_position,> line_count):
> self.__line_count = int(line_count)
> self.__old_position = int(old_position) def process(self,inputStream,outputStream):
> reader = BufferedReader(InputStreamReader(inputStream))
> try:
> # Loop indefinitely
> i = 0
> tailled =[]
> while True:
> # Get the next line
> line = reader.readLine()
> # If there is no more content,break out of the loop
> if line is None:
> break
> else:
> i += 1
> if i > self.__old_position:
> tailled.append(line)
> outputs = '\n'.join(tailled)
> outputStream.write(bytearray(outputs.encode('utf-8')))
>
> finally:
> if reader is not None:
> reader.close()
>
>
> # end class
>
>
>
> stateManager = context.stateManager
>
> stateMap = stateManager.getState(Scope.LOCAL) oldMap
> stateManager.getState(Scope.LOCAL).toMap() flowFile = session.get()
>
> if flowFile != None:
>
> unique_name = flowFile.getAttribute("absolute.path") + flowFile.getAttribute("filename")
> line_count = flowFile.getAttribute("text.line.count")
> new_map = {}
> new_map.update(oldMap)
> new_map.update({unique_name: line_count})
> if stateMap.version == -1:
> stateManager.setState(new_map,Scope.LOCAL)
> old_position = stateMap.get(unique_name)
> flowFile = session.putAttribute(flowFile,"old_position",old_position)
> session.transfer(flowFile,REL_SUCCESS)
> else:
> if stateMap.get(unique_name):
> old_position = stateMap.get(unique_name)
> flowFile = session.putAttribute(flowFile,old_position)
> flowFile = session.write(flowFile,PyStreamCallback(old_position,line_count))
> session.transfer(flowFile,REL_SUCCESS)
> stateManager.replace(stateMap,new_map,Scope.LOCAL)
它在注册状态(根据路径名:因此是上次读取的old_position)并在BufferedReader中读取FlowFiles(流)的行时起作用。我的FlowFiles可能非常庞大且(XXX MB),因此此方法效率很低。 “输出”变量迅速变大,我的Java Heap(1.5GB)耗尽了一个小文件(180MB)
# Loop indefinitely i = 0 tailled =[] while True: # Get the next line line = reader.readLine() # If there is no more content,break out of the loop if line is None: break else: i += 1 if i > self.__old_position: tailled.append(line) outputs = '\n'.join(tailled)
我知道行数以及在文件中需要从哪里开始(在我的情况下是Stream),您是否有一个最好的方式来获取最后几行? 我愿意从python切换。
我当时打算添加另一个处理器来执行读取(ExecuteStreamCommand)。
谢谢您的帮助。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。