package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

object Demo1ListSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * Source built from a local collection ---> bounded stream
      */
    val ds: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9))
    // ds.print()

    /**
      * Source built from a file ---> bounded stream
      */
    val lineDS: DataStream[String] = env.readTextFile("data/words.txt")

    val countDS: DataStream[(String, Int)] = lineDS
      .flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)
    // countDS.print()

    /**
      * Source built from a socket ---> unbounded stream
      * (can be tested by running `nc -lk 8888` on the "master" host)
      */
    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)
    socketDS.print()

    env.execute()
  }
}
package com.shujia.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo2SourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * Custom source:
      * implement the SourceFunction interface and its run method,
      * and emit data to downstream operators inside run.
      */
    val myDS: DataStream[Int] = env.addSource(new MySourceFunction)

    myDS.print()

    env.execute()
  }
}

/**
  * SourceFunction             : non-parallel source
  * RichSourceFunction         : non-parallel source, adds open and close methods
  * ParallelSourceFunction     : parallel source, several subtasks read data at the same time,
  *                              so duplicate data has to be handled
  * RichParallelSourceFunction : parallel source, adds open and close methods
  */
class MySourceFunction extends SourceFunction[Int] {

  // flag flipped by cancel() so the emit loop can stop cleanly
  @volatile private var isRunning = true

  /**
    * Fetch data in the run method and send it downstream.
    * An external data source can be connected here.
    *
    * @param ctx the Flink source context, used to emit data downstream
    */
  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    var i = 0
    while (isRunning) {
      // emit the value downstream
      ctx.collect(i)
      i += 1
      Thread.sleep(1000) // wait one second before emitting the next value
    }
  }

  /**
    * Called when the job is cancelled; used to release resources (e.g. close a JDBC connection)
    * and to stop the emit loop above.
    */
  override def cancel(): Unit = {
    isRunning = false
  }
}
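The hierarchy comment above notes that a parallel source has to deal with duplicate data, because every subtask runs the same run method. A minimal sketch of one common way to handle this, assuming RichParallelSourceFunction so the subtask index is available; MyParallelSourceFunction and the modulo partitioning scheme are illustrative, not part of the original demos:

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// Hypothetical example: each parallel subtask emits only the values assigned to it,
// so no value is produced twice across subtasks.
class MyParallelSourceFunction extends RichParallelSourceFunction[Int] {
  @volatile private var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    val subtask = getRuntimeContext.getIndexOfThisSubtask          // 0 .. parallelism-1
    val parallelism = getRuntimeContext.getNumberOfParallelSubtasks
    var i = 0
    while (isRunning) {
      if (i % parallelism == subtask) {
        ctx.collect(i) // this subtask "owns" value i, the others skip it
      }
      i += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}

It would be used the same way as the non-parallel source, e.g. env.addSource(new MyParallelSourceFunction).setParallelism(2).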
package com.shujia.flink.source

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object Demo3MysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // use the custom source
    val mysqlDS: DataStream[AdminCode] = env.addSource(new MysqlSource)

    val countDS: DataStream[(String, Int)] = mysqlDS
      .map(ac => (ac.cityName, 1))
      .keyBy(_._1)
      .sum(1)

    countDS.print()

    env.execute()
  }
}

case class AdminCode(provName: String, cityName: String, countyName: String)

class MysqlSource extends RichSourceFunction[AdminCode] {
  var con: Connection = _

  /**
    * open: runs once, before run, used to set up resources
    */
  override def open(parameters: Configuration): Unit = {
    // 1. load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. create the connection
    con = DriverManager.getConnection("jdbc:mysql://master:3306/tour", "root", "123456")
  }

  /**
    * close: runs once, after run, used to release resources
    */
  override def close(): Unit = {
    // close the connection
    con.close()
  }

  /**
    * run is called only once; when it returns the source finishes,
    * so this behaves like a bounded stream
    */
  override def run(ctx: SourceFunction.SourceContext[AdminCode]): Unit = {
    // prepare the query
    val stat: PreparedStatement = con.prepareStatement(
      "select prov_name,city_name,county_name from admin_code")

    // execute the query
    val resultSet: ResultSet = stat.executeQuery()

    // parse the result set
    while (resultSet.next()) {
      val prov_name: String = resultSet.getString("prov_name")
      val city_name: String = resultSet.getString("city_name")
      val county_name: String = resultSet.getString("county_name")
      // emit downstream
      ctx.collect(AdminCode(prov_name, city_name, county_name))
    }
  }

  override def cancel(): Unit = {
  }
}
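Because run returns after a single query, the job above stops once the table has been read. If the source should keep watching the table, one possible variation is to wrap the query in a loop guarded by a cancel flag. This is only a sketch: PollingMysqlSource is a hypothetical class reusing the same connection settings and AdminCode case class as above, and the 10-second poll interval is an assumption.

class PollingMysqlSource extends RichSourceFunction[AdminCode] {
  @volatile private var isRunning = true
  var con: Connection = _

  override def open(parameters: Configuration): Unit = {
    Class.forName("com.mysql.jdbc.Driver")
    con = DriverManager.getConnection("jdbc:mysql://master:3306/tour", "root", "123456")
  }

  override def run(ctx: SourceFunction.SourceContext[AdminCode]): Unit = {
    val stat: PreparedStatement = con.prepareStatement(
      "select prov_name,city_name,county_name from admin_code")
    while (isRunning) {
      // re-run the query on every iteration and emit all rows
      val rs: ResultSet = stat.executeQuery()
      while (rs.next()) {
        ctx.collect(AdminCode(
          rs.getString("prov_name"),
          rs.getString("city_name"),
          rs.getString("county_name")))
      }
      rs.close()
      Thread.sleep(10000) // poll interval: an assumed 10 seconds
    }
  }

  override def close(): Unit = con.close()

  override def cancel(): Unit = {
    isRunning = false
  }
}

Note that re-emitting the whole table on every poll duplicates rows downstream; deduplication or an incremental query (e.g. by primary key or update time) would be needed in practice.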
package com.shujia.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo4KafkaSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test")

    // create the Kafka consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](
      "test_topic",
      new SimpleStringSchema(),
      properties)

    flinkKafkaConsumer.setStartFromEarliest()          // start from the earliest record available
    // flinkKafkaConsumer.setStartFromLatest()         // start from the latest record
    // flinkKafkaConsumer.setStartFromTimestamp(...)   // start from a given timestamp (milliseconds)
    // flinkKafkaConsumer.setStartFromGroupOffsets()   // the default: start from committed group offsets

    // use the Kafka source
    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    kafkaDS.print()

    env.execute()
  }
}
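FlinkKafkaConsumer comes from the separate Kafka connector artifact, not from the core streaming dependency. A hedged sbt line for an sbt-based build; the version number is an assumption and should match the Flink version on your cluster:

// build.sbt (the version is an assumption; use the one matching your Flink installation)
libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.11.2"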