
flink source

package com.shujia.flink.source

import org.apache.flink.streaming.api.scala._

object Demo1ListSource {
  def main(args: Array[String]): Unit = {

    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    /**
      * Collection-based source ---> bounded stream
      */

    val ds: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 9))

//    ds.print()

    /**
      * File-based source ---> bounded stream
      */


    val lineDS: DataStream[String] = env.readTextFile("data/words.txt")

    val countDS: DataStream[(String, Int)] = lineDS.flatMap(_.split(","))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

//    countDS.print()

    /**
      * Socket-based source ---> unbounded stream
      */

    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)

    socketDS.print()


    env.execute()

  }
}
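
The socket source connects to master:8888, so something must already be listening on that port when the job starts. A quick way to test (an assumption; any TCP server works) is netcat on the master host: nc -lk 8888. Lines typed into that session then show up in the printed stream.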
package com.shujia.flink.source

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

object Demo2SourceFunction {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    /**
      * Custom source:
      * implement the SourceFunction interface
      * and its run method ----> fetch data inside run and emit it downstream
      */
    val myDS: DataStream[Int] = env.addSource(new MySourceFunction)

    myDS.print()

    env.execute()

  }
}
/**
  * ParallelSourceFunction     : parallel source; multiple instances read data at the same time, so duplicate data must be handled
  * RichParallelSourceFunction : parallel source, adds open and close methods
  * RichSourceFunction         : non-parallel source, adds open and close methods
  * SourceFunction             : non-parallel source
  */

class MySourceFunction extends SourceFunction[Int] {

  // flag checked by the emit loop so that cancel() can stop the source cleanly
  @volatile private var running = true
  /**
    * Fetch data in the run method and emit it downstream.
    *
    * run can connect to an external data source to read data.
    *
    * @param ctx the Flink source context, used to emit records downstream
    */
  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {

    var i = 0
    while (running) {
      // emit the record downstream
      ctx.collect(i)

      i += 1
      Thread.sleep(1000) // pause one second before the next record
    }
  }

  /**
    * Called when the job is cancelled; used to release resources
    * (e.g., closing a JDBC connection) and to stop the run loop.
    */
  override def cancel(): Unit = {
    running = false
  }
}
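
The class comment above notes that a parallel source must handle duplicate data. A minimal sketch of one way to do that, assuming a RichParallelSourceFunction whose instances split the number range by subtask index (this class is illustrative, not part of the original code):

package com.shujia.flink.source

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class MyParallelSourceFunction extends RichParallelSourceFunction[Int] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Int]): Unit = {
    // each parallel instance starts at its own subtask index and steps by the
    // total parallelism, so no two instances ever emit the same number
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    val step = getRuntimeContext.getNumberOfParallelSubtasks
    var i = subtask
    while (running) {
      ctx.collect(i)
      i += step
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}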
package com.shujia.flink.source

import java.sql.{Connection, DriverManager, PreparedStatement, ResultSet}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._

object Demo3MysqlSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment


    // use the custom source
    val mysqlDS: DataStream[AdminCode] = env.addSource(new MysqlSource)

    val countDS: DataStream[(String, Int)] = mysqlDS
      .map(ac => (ac.cityName, 1))
      .keyBy(_._1)
      .sum(1)

    countDS.print()

    env.execute()
  }

}

case class AdminCode(provName: String, cityName: String, countyName: String)


class MysqlSource extends RichSourceFunction[AdminCode] {


  var con: Connection = _

  /**
    * open runs once, before run.
    */
  override def open(parameters: Configuration): Unit = {
    // 1. load the JDBC driver
    Class.forName("com.mysql.jdbc.Driver")
    // 2. open the connection
    con = DriverManager.getConnection("jdbc:mysql://master:3306/tour", "root", "123456")

  }

  /**
    * close runs once, after run.
    */
  override def close(): Unit = {

    // close the connection
    con.close()
  }

  /**
    * run is invoked only once; when it returns, this bounded source is done.
    */
  override def run(ctx: SourceFunction.SourceContext[AdminCode]): Unit = {

    // prepare the query
    val stat: PreparedStatement = con.prepareStatement("select prov_name,city_name,county_name from admin_code")

    // execute the query
    val resultSet: ResultSet = stat.executeQuery()

    // iterate over the result set
    while (resultSet.next()) {
      val prov_name: String = resultSet.getString("prov_name")
      val city_name: String = resultSet.getString("city_name")
      val county_name: String = resultSet.getString("county_name")

      // emit downstream
      ctx.collect(AdminCode(prov_name, city_name, county_name))
    }

  }

  override def cancel(): Unit = {

  }
}
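
The example above assumes the MySQL JDBC driver is on the classpath. With sbt that could look like the following (artifact coordinates and version are an assumption, chosen to match the legacy com.mysql.jdbc.Driver class used above):

libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.49"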
package com.shujia.flink.source

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object Demo4KafkaSource {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "master:9092")
    properties.setProperty("group.id", "test")

    // create the Kafka consumer
    val flinkKafkaConsumer = new FlinkKafkaConsumer[String](
      "test_topic",
      new SimpleStringSchema(),
      properties)

    flinkKafkaConsumer.setStartFromEarliest()       // start from the earliest record possible
    //flinkKafkaConsumer.setStartFromLatest()       // start from the latest record
    //flinkKafkaConsumer.setStartFromTimestamp(...) // start from a given timestamp (milliseconds)
    //flinkKafkaConsumer.setStartFromGroupOffsets() // the default: start from the group's committed offsets

    // use the Kafka source
    val kafkaDS: DataStream[String] = env.addSource(flinkKafkaConsumer)

    kafkaDS.print()

    env.execute()


  }

}
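
The Kafka example needs the Flink Kafka connector on the classpath. A possible sbt dependency (version is an assumption; FlinkKafkaConsumer is the older consumer API, before it was replaced by KafkaSource in later Flink releases):

libraryDependencies += "org.apache.flink" %% "flink-connector-kafka" % "1.11.2"

To feed the consumer while the job runs, any producer writing to test_topic on master:9092 works, for example the console producer shipped with Kafka.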

 
