一、DataStream Wordcount
基于scala实现
maven依赖如下(其中的 ${flink.version} 需要先在 pom.xml 的 <properties> 中定义为所使用的 Flink 版本):
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- flink的hadoop兼容 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop2</artifactId>
<version>1.7.2</version>
</dependency>
<!-- flink的hadoop兼容 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-compatibility_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink的scala的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink streaming的scala的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink的java的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink streaming的java的api -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink 的kafkaconnector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- 使用rocksdb保存flink的state -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink操作hbase -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hbase_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink运行时的webUI -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- flink table -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- MySQL连接驱动 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.35</version>
</dependency>
</dependencies>
具体代码如下:
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/**
 * Streaming word count over text lines read from a socket.
 *
 * Creates a local Flink environment with the web UI enabled, reads lines from
 * `localhost:6666`, splits them on single spaces and keeps a per-word sum.
 * The same pipeline is built twice: once with lambdas and once with an
 * explicit [[FlatMapFunction]]; only the lambda version is printed.
 */
object SocketWordCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"

    // Build the configuration object.
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI.
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI; otherwise logs are printed to the console.
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs are printed to the console.
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)

    // Obtain the local execution environment.
    val env: StreamExecutionEnvironment =
      StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)

    // Define the socket source.
    val socket: DataStream[String] = env.socketTextStream("localhost", 6666)

    // The Scala API needs this implicit import; otherwise calling operators reports an error.
    import org.apache.flink.api.scala._

    // Operators that parse the input and compute the word count.
    // Empty tokens (blank lines, consecutive spaces) are dropped so that ""
    // is never counted as a word.
    val wordCount: DataStream[(String, Int)] = socket
      .flatMap(_.split(" ").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Same result using a custom FlatMapFunction that combines flatMap and map.
    // This stream has no sink attached below, so its records are not printed.
    val wordCount2: DataStream[(String, Int)] = socket
      .flatMap(new FlatMapFunction[String, (String, Int)] {
        override def flatMap(line: String, out: Collector[(String, Int)]): Unit = {
          for (word <- line.split(" ") if word.nonEmpty) {
            out.collect((word, 1))
          }
        }
      })
      .setParallelism(2)
      .keyBy(_._1)
      .sum(1)
      .setParallelism(2)

    // Print the result.
    wordCount.print()

    // Name the job and run it; operators are lazy and only run once execute is called.
    env.execute("SocketWordCount")
  }
}
二、flink table & sql Wordcount
import org.apache.flink.api.scala.{ExecutionEnvironment, _}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.table.api.Table
import org.apache.flink.table.api.scala.BatchTableEnvironment
import scala.collection.mutable.ArrayBuffer

/**
 * Batch word count expressed with the Flink Table & SQL API.
 *
 * Splits a fixed sentence into words, turns them into a DataSet of
 * [[WordCount]] records, registers it as the view `WordCount`, aggregates the
 * frequencies per word with a SQL query and prints the result.
 *
 * An explicit `main` is used instead of the `App` trait so that every value is
 * an ordinary local initialised in program order.
 *
 * @author xiandongxie
 */
object WordCountsql {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val logPath: String = "/tmp/logs/flink_log"

    // Build the configuration object.
    val conf: Configuration = new Configuration()
    // Enable the Flink web UI.
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
    // Log file for the web UI; otherwise logs are printed to the console.
    conf.setString("web.log.path", logPath)
    // Log file for the TaskManager; otherwise logs are printed to the console.
    conf.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logPath)

    // Obtain the local execution environment.
    val env: ExecutionEnvironment = ExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    // Create a table environment on top of it.
    val tableEnv: BatchTableEnvironment = BatchTableEnvironment.create(env)

    val words: String = "hello flink hello xxd"
    // One record per word, each with an initial frequency of 1.
    val records: List[WordCount] = words.split("\\W+").map(WordCount(_, 1L)).toList

    val dataSet: DataSet[WordCount] = env.fromCollection(records)

    // Convert the DataSet into a Table.
    val table: Table = tableEnv.fromDataSet(dataSet)
    table.printSchema()

    // Register it as a view so that SQL can refer to it by name.
    tableEnv.createTemporaryView("WordCount", table)

    // Query: total frequency per word.
    val selectTable: Table = tableEnv.sqlQuery(
      "select word as word, sum(frequency) as frequency from WordCount GROUP BY word")

    // Convert the query result back into a DataSet and print it.
    val value: DataSet[WordCount] = tableEnv.toDataSet[WordCount](selectTable)
    value.print()
  }
}

/**
 * A word together with its frequency.
 *
 * @param word      the word
 * @param frequency how many times the word occurred
 */
case class WordCount(word: String, frequency: Long) {

  /** Renders the record as the word and its frequency separated by a tab. */
  override def toString: String = s"$word\t$frequency"
}
结果(查询结果的输出,每行为"单词<Tab>次数",行的顺序不固定):
flink	1
hello	2
xxd	1
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。