微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Nifi ExecuteScript /尾部文件

如何解决Nifi ExecuteScript /尾部文件

我想拖尾日志文件。 (不幸的是,在Nifi中,处理器TailFile并没有执行我想要的操作,因为如果删除了尾部文件,状态似乎会丢失)。

因此,我想使用ExecuteScript命令(Jython / python)使用类似的处理器。 我的主要问题是要在Nifi Jython软件包中的Python中有效地尾巴文件

> from org.apache.nifi.components.state import Scope from
> org.apache.commons.io import IoUtils from java.nio.charset import
> StandardCharsets from java.io import BufferedReader,InputStreamReader
> from org.apache.nifi.processor.io import StreamCallback
> 
> # Define a subclass of StreamCallback for use in session.write() class PyStreamCallback(StreamCallback):   def __init__(self,old_position,> line_count):
>         self.__line_count = int(line_count)
>         self.__old_position = int(old_position)   def process(self,inputStream,outputStream):
>     reader = BufferedReader(InputStreamReader(inputStream))
>     try:
>         # Loop indefinitely
>         i = 0
>         tailled =[]
>         while True:
>             # Get the next line
>             line = reader.readLine()
>             # If there is no more content,break out of the loop
>             if line is None:
>                 break
>             else:
>                 i += 1
>             if i > self.__old_position:
>                 tailled.append(line)
>         outputs = '\n'.join(tailled)
>         outputStream.write(bytearray(outputs.encode('utf-8')))
> 
>     finally:
>         if reader is not None:
>             reader.close()
> 
> 
> # end class
> 
> 
> 
> stateManager = context.stateManager
> 
> stateMap = stateManager.getState(Scope.LOCAL) oldMap 
> stateManager.getState(Scope.LOCAL).toMap() flowFile = session.get()
> 
> if flowFile != None:
> 
>     unique_name = flowFile.getAttribute("absolute.path") + flowFile.getAttribute("filename")
>     line_count = flowFile.getAttribute("text.line.count")
>     new_map = {}
>     new_map.update(oldMap)
>     new_map.update({unique_name: line_count})
>     if stateMap.version == -1:
>         stateManager.setState(new_map,Scope.LOCAL)
>         old_position = stateMap.get(unique_name)
>         flowFile = session.putAttribute(flowFile,"old_position",old_position)
>         session.transfer(flowFile,REL_SUCCESS)
>     else:
>         if stateMap.get(unique_name):
>             old_position = stateMap.get(unique_name)
>             flowFile = session.putAttribute(flowFile,old_position)
>             flowFile = session.write(flowFile,PyStreamCallback(old_position,line_count))
>             session.transfer(flowFile,REL_SUCCESS)
>             stateManager.replace(stateMap,new_map,Scope.LOCAL)

它在注册状态(根据路径名:因此是上次读取的old_position)并在BufferedReader中读取FlowFiles(流)的行时起作用。我的FlowFiles可能非常庞大且(XXX MB),因此此方法效率很低。 “输出”变量迅速变大,我的Java Heap(1.5GB)耗尽了一个文件(180MB)

    # Loop indefinitely
    i = 0
    tailled =[]
    while True:
        # Get the next line
        line = reader.readLine()
        # If there is no more content,break out of the loop
        if line is None:
            break
        else:
            i += 1
        if i > self.__old_position:
            tailled.append(line)
    outputs = '\n'.join(tailled)

我知道行数以及在文件中需要从哪里开始(在我的情况下是Stream),您是否有一个最好的方式来获取最后几行? 我愿意从python切换。

我当时打算添加一个处理器来执行读取(ExecuteStreamCommand)。

谢谢您的帮助。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。