微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

将实时产生的数据流式写入到hdfs中

Flink将实时产生的数据流式写入到hdfs中
import cn.itcast.day03.source.custom.MyNoParallelSource;
import cn.itcast.day03.source.custom.Order;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.sql.Time;
import java.util.concurrent.TimeUnit;
/**
 * 将实时产生的数据流式写入到hdfs中
 */
public class StreamHDFSDemo {
    public static void main(String[] args) throws Exception {
        /**
         * 实现步骤:
         * 1)创建flink流处理的运行环境
         * 2)构建数据源(自定义数据源)
         * 3)开启checkpoint
         * 4)设置一个并行度写入数据
         * 5)数据实时写入到hdfs(指定分桶策略和滚动策略)
         * 6)递交作业执行
         */
        //TODO 1)创建flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //TODO 2)构建数据源(自定义数据源)
        DataStreamSource<Order> orderDataStreamSource = env.addSource(new MyNoParallelSource());

        //TODO 3)开启checkpoint
        env.enableCheckpointing(5000);//每隔五秒钟进行一次checkpoint(向所有的算子发送栅栏barrier,先想source算子发。。。),周期性执行

        //TODO 4)设置一个并行度写入数据
        env.setParallelism(1);

        //TODO 5)数据实时写入到hdfs(指定分桶策略和滚动策略)
        //指定写入数据的路径
        String outputPath = "hdfs://node1:8020/test/output/streamingfile";
        //定义写入文件的名称和格式
        OutputFileConfig config = OutputFileConfig.builder()
                .withPartPrefix("order")
                .withPartSuffix(".txt")
                .build();

        //指定行编码,一次写入一行数据
        StreamingFileSink streamingFileSink = StreamingFileSink.forRowFormat(new Path(outputPath), new SimpleStringEncoder<>("utf-8"))
                /**
                 * 指定分桶策略:
                 * DateTimeBucketAssigner:默认的分桶策略,默认基于时间的分配器,每小时产生一个桶,指定时间格式:yyyy-MM-dd-HH
                 * BasePathBucketAssigner:把所有的文件放到一个基本路径下的分配器(全局桶)
                 */
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                /**
                 * 指定滚动策略:
                 * DefaultRollingPolicy
                 * CheckpointRollingPolicy
                 * OnCheckpointRollingPolicy
                 */
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.SECONDS.toMillis(5)) //设置滚动时间间隔,每5秒钟产生一个文件
                                .withInactivityInterval(TimeUnit.SECONDS.toMillis(2)) //设置不活动的时间间隔,未写入数据处于不活动状态时滚动文件
                                .withMaxPartSize(1024*1024*1024)//设置文件大小,达到一个1G大小时滚动文件
                                .build()
                ).withOutputFileConfig(config).build();

        orderDataStreamSource.addSink(streamingFileSink);

        //TODO 6)递交作业执行
        env.execute();
    }
}

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐