微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Nifi 自定义处理器如何在内容或属性中写入结果

如何解决Nifi 自定义处理器如何在内容或属性中写入结果

我编写了一个简单的自定义处理器,将两个数字相加然后显示结果。 但我不知道如何在 Flowfile 内容属性显示结果。 在输入值 1 和输入值 2 属性添加值后,我运行处理器,Flowfile 为空。

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="",description="")})
@WritesAttributes({@WritesAttribute(attribute="",description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor NUMBER_1 = new PropertyDescriptor
            .Builder().name("Input Value 1")
            .displayName("Input Value 1")
            .description("Enter the input value 1 to perform addition operation")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();
    public static final PropertyDescriptor NUMBER_2 = new PropertyDescriptor
            .Builder().name("Input Value 2")
            .displayName("Input Value 2")
            .description("Enter the input value 2 to perform addition operation")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("Success")
            .description("All created FlowFiles are routed to this relationship")
            .build();


    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(NUMBER_1);
        descriptors.add(NUMBER_2);
        this.descriptors = Collections.unmodifiableList(descriptors);

        final Set<Relationship> relationships = new HashSet<Relationship>();
        relationships.add(REL_SUCCESS);
        this.relationships = Collections.unmodifiableSet(relationships);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context,final ProcessSession session) throws ProcessException {
        FlowFile flowFile = session.get();
        if ( flowFile == null ) {
            return;
        }
        int num1 = Integer.parseInt(context.getProperty(NUMBER_1).getValue());
        int num2 = Integer.parseInt(context.getProperty(NUMBER_2).getValue());

        final String output = String.valueOf(num1 + num2);

        flowFile =session.write(flowFile,new StreamCallback() {
            @Override
            public void process(InputStream in,OutputStream out) throws IOException {
                out.write(Integer.parseInt(output));
//                IoUtils.write(output,out); // writes the result to the flowfile.
            }
        });

        session.transfer(flowFile,REL_SUCCESS);
        // Transfer the output flowfile to success relationship.
    }

}

如何在流文件内容或流文件属性显示结果?

解决方法

要创建属性,请尝试这样的操作,

        final AtomicInteger line_count_from_file = new AtomicInteger(0);
        final AtomicInteger field_count_in_header = new AtomicInteger(0);
        final AtomicInteger valid_line_count = new AtomicInteger(0);
        final AtomicInteger lineNo = new AtomicInteger(0);            
        final AtomicReference<String> corrupt_line_nos = new AtomicReference<String>(initialReference);

final AtomicReference<String> flowfileContent= new AtomicReference<>();

        corrupt_line_nos.set("Attribute Value");
       String result = "Sample flowfile content"
                flowfileContent.set(result);


       Map<String,String> metricAttributes = new HashMap<>();
    
                metricAttributes.put("line.count.from.file",String.valueOf(line_count_from_file.get()));
                metricAttributes.put("field.count.in.header",String.valueOf(field_count_in_header.get()));
                metricAttributes.put("valid.line.count",String.valueOf(valid_line_count.get()));
                metricAttributes.put("corrupt.line.nos",String.valueOf(corrupt_line_nos.get()));
    
    flowfile= session.putAllAttributes(flowFile,metricAttributes);


    flowfile= session.write(flowfile,new OutputStreamCallback() {

        @Override
        public void process(OutputStream out) throws IOException {
            out.write(flowfileContent.get().getBytes());
        }
    });
                
                session.transfer(flowfile,REL_SUCCESS);

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。