微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink流处理-Task之KafkaSourceDataTask

KafkaSourceDataTask

package pers.aishuang.flink.streaming.task;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 主要完成从Kafka集群读取车辆的json数据并将其转换成ItcastDataObj,并将其

  • 通过errorData过滤出来正常的数据或者错误的数据,将正确的数据保存到HDFS上

  • 和HBase上,将错误的数据保存到HDFS上
    */
    public class KafkaSourceDataTask extends BaseTask {
    public static void main(String[] args) throws Exception{
    //1. 获取当前流执行环境-env
    StreamExecutionEnvironment env = getEnv(KafkaSourceDataTask.class.getSimpleName());

     //2. 获取Kafka中的车辆数据json字符串
     DataStreamSource<String> source = getKafkaStream(
             env,
             "__vehicle_consumer_",
             SimpleStringSchema.class
     );
     //-- 打印输出
     source.printToErr();
    
     //3. 将读取出来的json字符串转换为ItcastDataObj
     SingleOutputStreamOperator<ItcastDataObj> vehicleDataStream = source.map(
             new MapFunction<String, ItcastDataObj>() {
                 @Override
                 public ItcastDataObj map(String line) throws Exception {
                     return JsonParseUtil.parseJsonToObject(line);
                 }
             }
     );
     //-- 另种写法
     DataStream<ItcastDataObj> vehicleDataStream02 = source.map(JsonParseUtil::parseJsonToObject);
     //vehicleDataStream.printToErr();
     vehicleDataStream02.printToErr();
    
     //触发执行
     env.execute();
    

    }
    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐