如何解决Flink的使用方法-FasterXML / jackson-dataformats-text-将CSV转换为POJO
我在课堂上收到CSV,我需要获取值才能创建POJO。我不必在目录中打开“ file.csv”,逗号分隔的元素通过Flink传递到EventDeserializationSchema,而该元素用于“事件类”以处理每个事件。
这里有个例子:
IN:“'Adam','Smith',66,....'12:01:00.000'”-> OUT:pojo
为此,我正在使用: https://github.com/FasterXML/jackson-dataformats-text/tree/master/csv
这是我的事件类,应该去做把戏,实际上此刻什么也没做。
import java.io.Serializable;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectReader;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.dataformat.csv.CsvSchema;
public class Event implements Serializable {
CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age",CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
CsvSchema schema = CsvSchema.emptySchema().withHeader();
CsvSchema bootstrapSchema = CsvSchema.emptySchema().withHeader();
ObjectMapper mapper = new CsvMapper();
mapper.readerFor(Pojo.class).with(bootstrapSchema).readValue(??);
return Pojo
}
这是我的Pojo课程:
public class Pojo {
public String firstName;
public String lastName;
private int age;
public String time;
public Pojo(String firstName,String lastName,int age,String time) {
this.firstName = firstName;
this.lastName = lastName;
this.age = age;
this.time =time;
}
}
任何帮助让该班级返回Pojo的人都将不胜感激。
解决方法
要使其工作,您需要具有默认的构造函数和字段的getter / setter。我不知道您在Event
中将要做什么以及为什么还有一个Pojo
,但是假设您想将输入的字符串反序列化为Event
,则应该这样工作:
-
Event
Pojo课:
public class Event implements Serializable {
public String firstName;
public String lastName;
private int age;
public String time;
public Event() {
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getTime() {
return time;
}
public void setTime(String time) {
this.time = time;
}
}
- EventDeserializationSchema from this question已实现
deserialize()
public class EventDeserializationSchema implements DeserializationSchema<Event> {
private static final long serialVersionUID = 1L;
private static final CsvSchema schema = CsvSchema.builder()
.addColumn("firstName")
.addColumn("lastName")
.addColumn("age",CsvSchema.ColumnType.NUMBER)
.addColumn("time")
.build();
private static final ObjectMapper mapper = new CsvMapper();
@Override
public Event deserialize(byte[] message) throws IOException {
return mapper.readerFor(Event.class).with(schema).readValue(message);
}
@Override
public boolean isEndOfStream(Event nextElement) {
return false;
}
@Override
public TypeInformation<Event> getProducedType() {
return TypeInformation.of(Event.class);
}
}
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。