微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink流处理-Task之TripDriveTask

Task之TripDriveTask

package pers.aishuang.flink.streaming.task;

import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Typeinformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.OutputTag;
import pers.aishuang.flink.streaming.entity.ItcastDataObj;
import pers.aishuang.flink.streaming.entity.TripModel;
import pers.aishuang.flink.streaming.function.window.DriveSampleWindowFunction;
import pers.aishuang.flink.streaming.function.window.DriveTripwindowFunction;
import pers.aishuang.flink.streaming.sink.hbase.TripDrivetoHBaseSink;
import pers.aishuang.flink.streaming.sink.hbase.TripSampletoHBaseSink;
import pers.aishuang.flink.streaming.utils.JsonParseUtil;

/**

  • 驾驶行程采样分析 驾驶行程分析

  • 开发步骤:

  • 1、创建流执行环境

  • 2、获取Kafka中的数据

  • 3、将json字符串解析成车辆数据对象

  • 4、过滤出正确的数据并且是行程数据 chargeStatus=2 或者 chargeStatus=3

  • 0x01:停车充电 0x02:行驶充电 0x03:未充电状态 0x04:充电完成 0xFE:异常 0xFF:无效

  • 5、分配水印机制,设置最大延迟时间 30s

  • 6、超出3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟

  • 7、对车辆数据进行分组,创建会话窗口

  • 8、数据的采样分析

  • -- 应用窗口,数据的采样分析

  • -- 将分析的采样数据封装成数组,并将其保存到HBase中

  • 9、数据的行程分析

  • -- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象

  • -- 将这个对象保存到HBase中

  • 10、执行流环境任务
    */
    public class TripDriveTask extends BaseTask{
    public static void main(String[] args) {
    //1、创建流执行环境(已设置好checkpoint、重启策略)
    StreamExecutionEnvironment env = getEnv(TripDriveTask.class.getSimpleName());
    //2、获取Kafka中的数据
    DataStreamSource kafkaStream = getKafkaStream(env, "_tripDrive_consumer", SimpleStringSchema.class);
    //3、将json字符串解析成车辆数据对象
    DataStream tripDriveStream = kafkaStream.map(JsonParseUtil::parseJsonToObject)
    //4、 过滤出正确的数据并且是行程数据 chargeStatus=2或者chargeStatus=3
    .filter(obj -> StringUtils.isEmpty(obj.getErrorData()))
    .filter(obj -> (obj.getChargeStatus()2) || (obj.getChargeStatus()3));
    //5、分配水印机制,设置最大延迟时间30s
    SingleOutputStreamOperator itcastDataObjWatermark = tripDriveStream
    //分配水印机制,并指定事件时间字段
    .assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor(Time.seconds(30)) {
    @Override
    public long extractTimestamp(ItcastDataObj element) {
    return element.getTerminalTimeStamp();
    }
    }
    );
    //6、超过3分钟的数据,保存到侧输出流,分析一下数据为什么会延迟
    OutputTag maxLatestData = new OutputTag<>("maxLatestData", Typeinformation.of(ItcastDataObj.class));
    //7、对车辆数据进行分组,创建会话窗口。
    WindowedStream<ItcastDataObj, String, TimeWindow> itcastDataObjWindowStream = itcastDataObjWatermark
    //指定分组字段
    .keyBy(obj -> obj.getVin())
    //指定窗口类型为会话窗口,时间间隔是15min
    .window(EventTimeSessionWindows.withGap(Time.minutes(15L)))
    //允许延迟时间
    .allowedLateness(Time.minutes(3L))
    //侧边流输出延迟数据
    .sideOutputLateData(maxLatestData);
    //8、数据的采样分析
    //-- 应用窗口,数据的采样分析
    SingleOutputStreamOperator<String[]> sampleTripDriveStream = itcastDataObjWindowStream
    .apply(new DriveSampleWindowFunction());
    //-- 将分析的采样数据封装成数组,并将其保存到HBase中
    sampleTripDriveStream.addSink(new TripSampletoHBaseSink("TRipdb:trip_sample"));
    //9、数据的行程分析
    //-- 应用窗口数据,分析低速、中速、高速车辆的soc、行驶里程、油耗、速度、速度切换的次数等数据封装成对象
    SingleOutputStreamOperator tripModelStream = itcastDataObjWindowStream
    .apply(new DriveTripwindowFunction());
    //-- 将这个对象保存到Hbase中
    tripModelStream.addSink(new TripDrivetoHBaseSink("TRipdb:trip_division"));
    //10、执行流环境任务
    try {
    env.execute();
    } catch (Exception e) {
    e.printstacktrace();
    }

    }
    }

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐