微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink的使用方法-FasterXML / jackson-dataformats-text-将CSV转换为POJO

如何解决Flink的使用方法-FasterXML / jackson-dataformats-text-将CSV转换为POJO

我在课堂上收到CSV,我需要获取值才能创建POJO。我不必在目录中打开“ file.csv”,逗号分隔的元素通过Flink传递到EventDeserializationSchema,而该元素用于“事件类”以处理每个事件。

这里有个例子:

IN:“'Adam','Smith',66,....'12:01:00.000'”-> OUT:pojo

为此,我正在使用: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv

这是我的事件类,应该去做把戏,实际上此刻什么也没做。

import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;

public class Event implements Serializable {

    CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age",CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();
    
    CsvSchema schema = CsvSchema.emptySchema().withHeader();

    CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
    ObjectMapper mapper = new CsvMapper();
    mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
    
    return Pojo
}

这是我的Pojo课程:

public class Pojo {
    
        public String firstName;
        public String lastName;
        private int age;
        public String time;

        public Pojo(String firstName,String lastName,int age,String time) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.age = age;
            this.time =time;
            
        }

}

任何帮助让该班级返回Pojo的人都将不胜感激。

这是JSON的示例: https://github.com/apache/flink-playgrounds/blob/master/docker/ops-playground-image/java/flink-playground-clickcountjob/src/main/java/org/apache/flink/playgrounds/ops/clickcount/records/ClickEventDeserializationSchema.java

ClickEvenClass https://github.com/apache/flink/blob/9dd04a25bd300a725486ff08560920f548f3b1d9/flink-end-to-end-tests/flink-streaming-kafka-test-base/src/main/java/org/apache/flink/streaming/kafka/test/base/KafkaEvent.java#L27

解决方法

要使其工作,您需要具有默认的构造函数和字段的getter / setter。我不知道您在Event中将要做什么以及为什么还有一个Pojo,但是假设您想将输入的字符串反序列化为Event,则应该这样工作:

  1. Event Pojo课:
public class Event implements Serializable {
    public String firstName;
    public String lastName;
    private int age;
    public String time;

    public Event() {
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getTime() {
        return time;
    }

    public void setTime(String time) {
        this.time = time;
    }
}
  1. EventDeserializationSchema from this question已实现deserialize()
public class EventDeserializationSchema implements DeserializationSchema<Event> {

    private static final long serialVersionUID = 1L;

    private static final CsvSchema schema = CsvSchema.builder()
            .addColumn("firstName")
            .addColumn("lastName")
            .addColumn("age",CsvSchema.ColumnType.NUMBER)
            .addColumn("time")
            .build();

    private static final ObjectMapper mapper = new CsvMapper();

    @Override
    public Event deserialize(byte[] message) throws IOException {
        return mapper.readerFor(Event.class).with(schema).readValue(message);
    }

    @Override
    public boolean isEndOfStream(Event nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Event> getProducedType() {
        return TypeInformation.of(Event.class);
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。