微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink实现单词计数并写入MySQL

Flink实现单词计数并写入MySQL

依赖:

    <!-- Maven dependencies for the Flink DataSet word-count job that writes its result to MySQL. -->
    <dependencies>
        <!-- Flink DataSet API (ExecutionEnvironment, operators, tuples). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!-- NOTE(review): 'provided' scope means this jar is expected on the cluster classpath;
             some IDE run configurations do not include provided-scope jars - confirm when running locally. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.14.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- Needed to run the job from a local client / IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.14.0</version>
        </dependency>
        <!-- Provides org.apache.flink.api.java.io.jdbc.JDBCOutputFormat used by the job.
             NOTE(review): version 1.10.0 does not match the 1.14.0 Flink artifacts above; newer Flink
             releases appear to ship JDBC support under a different artifact/package - verify
             compatibility before upgrading.
             NOTE(review): with 'provided' scope this jar is not packaged into the job jar.
             NOTE(review): no MySQL JDBC driver artifact is declared in this list, yet the code loads
             com.mysql.cj.jdbc.Driver - it must be added here or otherwise be on the runtime classpath. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.12</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

导包:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FlatMapIterator;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.types.Row;
import java.util.Arrays;
import java.util.Iterator;

Java 代码:

/**
 * Batch (DataSet API) word-count job.
 *
 * <p>Reads a text file, removes the characters {@code ?}, {@code .} and {@code ,}, splits each
 * line on whitespace, counts every word (case-sensitively), prints the counts and writes them to
 * the MySQL table {@code words(word, count)} through {@link JDBCOutputFormat}.
 */
public class toMysqL {

    /** Input file used when no path is passed as {@code args[0]}. */
    private static final String DEFAULT_INPUT_PATH = "datas/1.txt";

    /** JDBC URL used when none is passed as {@code args[1]} (local demo credentials). */
    private static final String DEFAULT_JDBC_URL =
            "jdbc:mysql://localhost:3306/test?user=root&password=123456";

    /**
     * Builds and runs the job.
     *
     * @param args optional: {@code args[0]} is the input file path, {@code args[1]} the JDBC URL;
     *     missing arguments fall back to {@link #DEFAULT_INPUT_PATH} and {@link #DEFAULT_JDBC_URL}
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        String jdbcUrl = args.length > 1 ? args[1] : DEFAULT_JDBC_URL;

        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                // Driver class names are case-sensitive for Class.forName.
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl(jdbcUrl)
                .setQuery("insert into  words (word,count) values (?,?) ")
                // Flush a JDBC batch after every 2 records.
                .setBatchInterval(2)
                .finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<String> lines = env.readTextFile(inputPath);

        // One record per word; blank lines and repeated whitespace yield no records.
        FlatMapOperator<String, String> words = lines.flatMap(new FlatMapIterator<String, String>() {
            @Override
            public Iterator<String> flatMap(String line) {
                return tokenize(line);
            }
        });

        MapOperator<String, Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) {
                return new Tuple2<>(word, 1);
            }
        });

        // Group by the word (field 0) and sum the per-word ones (field 1).
        AggregateOperator<Tuple2<String, Integer>> counts = ones.groupBy(0).sum(1);

        // JDBCOutputFormat consumes Row records whose fields match the query's '?' placeholders.
        MapOperator<Tuple2<String, Integer>, Row> rows = counts.map(new MapFunction<Tuple2<String, Integer>, Row>() {
            @Override
            public Row map(Tuple2<String, Integer> wordCount) {
                Row row = new Row(2);
                row.setField(0, wordCount.f0);
                row.setField(1, wordCount.f1);
                return row;
            }
        });

        // Register the JDBC sink BEFORE print(): in the DataSet API print() eagerly executes the
        // plan with every sink defined so far, so both sinks share one run and the input is read
        // only once. A trailing env.execute() is therefore unnecessary (it would fail because no
        // new sinks were defined after print()'s execution).
        rows.output(jdbcOutput);
        rows.print();
    }

    /**
     * Removes {@code ?}, {@code .} and {@code ,} from a line and splits it on whitespace.
     *
     * @param line one input line (may be empty)
     * @return iterator over the non-empty words of the line
     */
    private static Iterator<String> tokenize(String line) {
        String cleaned = line.replace("?", "").replace(".", "").replace(",", "");
        return Arrays.stream(cleaned.trim().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .iterator();
    }
}

结果:

在这里插入图片描述

在这里插入图片描述

ps注意:
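由于 MySQL 默认的排序规则(collation,如 utf8mb4_general_ci 等以 _ci 结尾的规则)在比较字符串值时不区分大小写,而 Flink 的计数是区分大小写的(例如 "Hello" 和 "hello" 会被统计为两条记录),因此不要把 word 设置为主键,否则大小写不同的单词会发生主键冲突。如确需主键,可将该列改为区分大小写的排序规则(如 utf8mb4_bin)。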

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐