微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

第 15 节 DataStream之source(scala语言)

上篇:第 14 节 DataStream之sink(java)


1、自定义sink

  1. 实现自定义的sink
    实现SinkFunction接口
    或者继承RichSinkFunction
  2. 参考org.apache.flink.streaming.connectors.redis.RedisSink

2、简单scala入门测试:

把集合中定义的每个数都加1后输出

具体代码实现:

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object StreamingFromCollectionScala {

  /**
   * Builds a small streaming job from an in-memory list: every element is
   * incremented by 1 and the results are printed.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Execution environment for this job.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Bring the Scala API implicits (TypeInformation derivation) into scope.
    import org.apache.flink.api.scala._

    // Source: a stream built from a fixed list of numbers.
    val numbers = env.fromCollection(List(10, 15, 20))

    // Add 1 to every element received by map.
    val incremented = numbers.map(n => n + 1)

    // Print sink, run with parallelism 1.
    incremented.print().setParallelism(1)

    env.execute("StreamingFromCollectionScala")
  }
}

控制台打印信息:

(此处原为控制台输出截图,图片缺失)


3、创建自定义并行度为1的source

实现从1开始产生递增数字

有3种方式:

方式一:
继承SourceFunction,泛型为Long类型

具体代码实现:

MynoparallelSource.scala

package xuwei.streaming

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom non-parallel source (always runs with parallelism 1).
 *
 * Emits increasing numbers starting at 1, one roughly every second, until
 * the job cancels it.
 */
class MynoparallelSource extends SourceFunction[Long] {
  // Next value to emit.
  var count = 1L

  // Flink's SourceFunction contract calls cancel() from a different thread
  // than run(); @volatile makes the stop request visible to the emit loop.
  @volatile var isRunning = true

  /** Emits `count`, then increments it, about once per second while running. */
  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Asks the loop in run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

StreamingDemoWithMynoparallelSource.scala

package xuwei.streaming
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMynoparallelSource {

  /**
   * Reads the numbers produced by [[MynoparallelSource]], logs each one, and
   * prints the sum of every 2-second processing-time window.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit TypeInformation for the Scala DataStream API.
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

    val text = env.addSource(new MynoparallelSource)

    // Pass every element through unchanged, logging it on the way.
    val mapData = text.map { line =>
      println(s"接收到的数据:$line")
      line
    }

    // Tumbling 2-second window over the whole (non-keyed) stream. The
    // processing-time assigner is named explicitly. timeWindowAll picks event
    // time whenever that is the environment's time characteristic (the
    // default since Flink 1.12). This job assigns no watermarks, so such
    // windows would never fire.
    val sum = mapData
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .sum(0)

    sum.print().setParallelism(1)

    // Job name now matches this demo instead of the copy-pasted
    // "StreamingFromCollectionScala".
    env.execute("StreamingDemoWithMynoparallelSource")
  }
}

控制台打印信息(不断打印下去):

(此处原为控制台输出截图,图片缺失)

方式二:
继承ParallelSourceFunction,泛型为Long类型
(每个并行实例都有自己的计数器,各自从1开始递增,所以同一个数字会重复出现)

具体代码实现:

MyParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom parallel source.
 *
 * Emits increasing numbers starting at 1, one roughly every second, until
 * the job cancels it. Every parallel instance keeps its own `count`. With
 * parallelism N, each number is therefore emitted N times (once per instance).
 */
class MyParallelSourceScala extends ParallelSourceFunction[Long] {
  // Next value to emit (per parallel instance).
  var count = 1L

  // Flink's SourceFunction contract calls cancel() from a different thread
  // than run(); @volatile makes the stop request visible to the emit loop.
  @volatile var isRunning = true

  /** Emits `count`, then increments it, about once per second while running. */
  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Asks the loop in run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

StreamingDemoWithMyParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMyParallelSourceScala {

  /**
   * Runs [[MyParallelSourceScala]] with parallelism 2, logs each element, and
   * prints the sum of every 2-second processing-time window.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit TypeInformation for the Scala DataStream API.
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

    // Two source instances, each with its own counter.
    val text = env.addSource(new MyParallelSourceScala).setParallelism(2)

    // Pass every element through unchanged, logging it on the way.
    val mapData = text.map { line =>
      println(s"接收到的数据:$line")
      line
    }

    // Tumbling 2-second window over the whole (non-keyed) stream. The
    // processing-time assigner is named explicitly. timeWindowAll picks event
    // time whenever that is the environment's time characteristic (the
    // default since Flink 1.12). This job assigns no watermarks, so such
    // windows would never fire.
    val sum = mapData
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .sum(0)

    sum.print().setParallelism(1)

    // Job name now matches this demo instead of the copy-pasted
    // "StreamingFromCollectionScala".
    env.execute("StreamingDemoWithMyParallelSourceScala")
  }
}

控制台打印信息(不断打印下去):
可以看到2个并行实例产生的数据被组合在一起(同一个数字会出现两次)

(此处原为控制台输出截图,图片缺失)

方式三:

  1. 继承RichParallelSourceFunction,泛型为Long类型
  2. 重写open方法(示例中同时重写了close方法)

具体代码实现:

MyRichParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext

/**
 * A custom rich parallel source.
 *
 * Emits increasing numbers starting at 1, one roughly every second, until
 * the job cancels it. Every parallel instance keeps its own `count`. With
 * parallelism N, each number is therefore emitted N times. As a rich
 * function it also has open()/close() lifecycle hooks.
 */
class MyRichParallelSourceScala extends RichParallelSourceFunction[Long] {
  // Next value to emit (per parallel instance).
  var count = 1L

  // Flink's SourceFunction contract calls cancel() from a different thread
  // than run(); @volatile makes the stop request visible to the emit loop.
  @volatile var isRunning = true

  /** Emits `count`, then increments it, about once per second while running. */
  override def run(ctx: SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Asks the loop in run() to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /**
   * Lifecycle hook Flink calls before run(), e.g. to acquire resources such
   * as connections. It currently only delegates to the parent implementation.
   */
  override def open(parameters: Configuration): Unit = super.open(parameters)

  /**
   * Lifecycle hook for releasing whatever open() acquired. It currently only
   * delegates to the parent implementation.
   */
  override def close(): Unit = super.close()
}

StreamingDemoWithMyRichParallelSourceScala.scala

package xuwei.streaming

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time

object StreamingDemoWithMyRichParallelSourceScala {

  /**
   * Runs [[MyRichParallelSourceScala]] with parallelism 2, logs each element,
   * and prints the sum of every 2-second processing-time window.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the Flink execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Implicit TypeInformation for the Scala DataStream API.
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows

    // Two source instances, each with its own counter.
    val text = env.addSource(new MyRichParallelSourceScala).setParallelism(2)

    // Pass every element through unchanged, logging it on the way.
    val mapData = text.map { line =>
      println(s"接收到的数据:$line")
      line
    }

    // Tumbling 2-second window over the whole (non-keyed) stream. The
    // processing-time assigner is named explicitly. timeWindowAll picks event
    // time whenever that is the environment's time characteristic (the
    // default since Flink 1.12). This job assigns no watermarks, so such
    // windows would never fire.
    val sum = mapData
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(2)))
      .sum(0)

    sum.print().setParallelism(1)

    // Job name now matches this demo instead of the copy-pasted
    // "StreamingFromCollectionScala".
    env.execute("StreamingDemoWithMyRichParallelSourceScala")
  }
}

控制台打印信息(不断打印下去):
可以看到2个并行实例产生的数据同样被组合在一起(同一个数字会出现两次)

(此处原为控制台输出截图,图片缺失)

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 [email protected] 举报,一经查实,本站将立刻删除。

相关推荐