微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

Flink之Source创建的几种方式

java 版本

package source;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;

import java.util.ArrayList;

public class JavaDataSetSourceApp {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //fromCollection(env);

        //readTextFile(env); Recursive


        recursive(env);

    }

    /***
     * 读取 递归文件下的文件
     * @param env
     */
    private static void recursive(ExecutionEnvironment env) throws Exception {
        Configuration parameters = new Configuration();
        parameters.setBoolean("recursive.file.enumeration", true);
        DataSet<String> logs = env.readTextFile("F:\\data\\wordCount1\\Recursive").withParameters(parameters);
        logs.print();
    }

    public static void readCSvFile(ExecutionEnvironment env) throws Exception{
//        env.readCsvFile()

    }

    private static void readTextFile(ExecutionEnvironment env) throws Exception {
        DataSource<String> fileSource = env.readTextFile("F:\\data\\wordCount1", "UTF-8");
        fileSource.print();
    }

    /*从集合中创建*/
    private static void fromCollection(ExecutionEnvironment env) throws Exception {
        ArrayList<Integer> list = new ArrayList<>();
        for (int i = 1; i <=10 ; i++) {
            list.add(i);
        }
        DataSource<Integer> dataSource = env.fromCollection(list);
        dataSource.print();

    }

}

scala 版本

package source

import org.apache.flink.api.scala._

object DataSetDataSourceApp {

  //定义case class 读取
  case class StudentClass( name:String, age:Int,job:String)

  def main(args: Array[String]): Unit = {
    val env: ExecutionEnvironment = ExecutionEnvironment
      .getExecutionEnvironment
    println(env.getParallelism)//8

//    fromCollection(env)

//    textFile(env)
//    readCsv(env)


  }


  /**
    * 读取一个文件 或者一个文件夹
    * @param env
    */
   def textFile(env: ExecutionEnvironment): Unit = {
    val fileSource: DataSet[String] = env.readTextFile("F:\\data\\wordCount1\\", "UTF-8")

    fileSource.print()

  }

  def readCsv(env:ExecutionEnvironment):Unit={
    /**
      * 数据:
      * name,age,job
      * tom,15,Developer
      * Jim,16,Developer
      */

    val filePateh = "F:\\data\\wordCount1\\student.txt";

    //ignoreFirstLine = true忽略标表头 同时也可以读取其他文件 指定参数分隔符即可
    // 必须指定每个字段的数据类型 可以使用元祖指定 也可以使用POJO指定
    val csvSource1 = env.readCsvFile[(String,Integer,String)](filePateh,ignoreFirstLine = true)



    //指定字段 读取
    // 注意  env.readCsvFile[(String,Integer)] 的类型要和读取的Array(0,1)) 的字段类型匹配 否则报错
    env.readCsvFile[(String,Integer)](filePateh,ignoreFirstLine = true,includedFields = Array(0,1)).print()

    //指定读取3 个字段 认读取所有的字段
    val csvSource = env.readCsvFile[(String,Integer,String)](filePateh,ignoreFirstLine = true,includedFields = Array(0,1,2)).print()

    //样例类 不能定义在方法中
    env.readCsvFile[StudentClass](filePateh,ignoreFirstLine = true,includedFields = Array(0,1,2)).print()
    //StudentClass(Jim,16,Developer)
    //StudentClass(tom,15,Developer)
  }

  /**
    * 通过集合创建source 并行度为1
 *
    * @param env
    */
  private def fromCollection(env: ExecutionEnvironment): Unit = {
    val data = 1 to 10;
    val value: DataSet[Int] = env.fromCollection(data)

    println(value.getParallelism)//1
    value.print()
  }
}

总结: 有用的部分就是 readCsvFile 和 readTextFile(可以读取压缩文件

IT_BULL 发布了13 篇原创文章 · 获赞 7 · 访问量 309 私信 关注

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐