java 版本
package source;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.configuration.Configuration;
import java.util.ArrayList;
public class JavaDataSetSourceApp {
public static void main(String[] args) throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
//fromCollection(env);
//readTextFile(env); Recursive
recursive(env);
}
/***
* 读取 递归文件下的文件
* @param env
*/
private static void recursive(ExecutionEnvironment env) throws Exception {
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
DataSet<String> logs = env.readTextFile("F:\\data\\wordCount1\\Recursive").withParameters(parameters);
logs.print();
}
public static void readCSvFile(ExecutionEnvironment env) throws Exception{
// env.readCsvFile()
}
private static void readTextFile(ExecutionEnvironment env) throws Exception {
DataSource<String> fileSource = env.readTextFile("F:\\data\\wordCount1", "UTF-8");
fileSource.print();
}
/*从集合中创建*/
private static void fromCollection(ExecutionEnvironment env) throws Exception {
ArrayList<Integer> list = new ArrayList<>();
for (int i = 1; i <=10 ; i++) {
list.add(i);
}
DataSource<Integer> dataSource = env.fromCollection(list);
dataSource.print();
}
}
scala 版本
package source
import org.apache.flink.api.scala._
object DataSetDataSourceApp {
//定义case class 读取
case class StudentClass( name:String, age:Int,job:String)
def main(args: Array[String]): Unit = {
val env: ExecutionEnvironment = ExecutionEnvironment
.getExecutionEnvironment
println(env.getParallelism)//8
// fromCollection(env)
// textFile(env)
// readCsv(env)
}
/**
* 读取一个文件 或者一个文件夹
* @param env
*/
def textFile(env: ExecutionEnvironment): Unit = {
val fileSource: DataSet[String] = env.readTextFile("F:\\data\\wordCount1\\", "UTF-8")
fileSource.print()
}
def readCsv(env:ExecutionEnvironment):Unit={
/**
* 数据:
* name,age,job
* tom,15,Developer
* Jim,16,Developer
*/
val filePateh = "F:\\data\\wordCount1\\student.txt";
//ignoreFirstLine = true忽略标表头 同时也可以读取其他文件 指定参数分隔符即可
// 必须指定每个字段的数据类型 可以使用元祖指定 也可以使用POJO指定
val csvSource1 = env.readCsvFile[(String,Integer,String)](filePateh,ignoreFirstLine = true)
//指定字段 读取
// 注意 env.readCsvFile[(String,Integer)] 的类型要和读取的Array(0,1)) 的字段类型匹配 否则报错
env.readCsvFile[(String,Integer)](filePateh,ignoreFirstLine = true,includedFields = Array(0,1)).print()
//指定读取3 个字段 默认读取所有的字段
val csvSource = env.readCsvFile[(String,Integer,String)](filePateh,ignoreFirstLine = true,includedFields = Array(0,1,2)).print()
//样例类 不能定义在方法中
env.readCsvFile[StudentClass](filePateh,ignoreFirstLine = true,includedFields = Array(0,1,2)).print()
//StudentClass(Jim,16,Developer)
//StudentClass(tom,15,Developer)
}
/**
* 通过集合创建source 并行度为1
*
* @param env
*/
private def fromCollection(env: ExecutionEnvironment): Unit = {
val data = 1 to 10;
val value: DataSet[Int] = env.fromCollection(data)
println(value.getParallelism)//1
value.print()
}
}
总结: 有用的部分就是 readCsvFile 和 readTextFile(可以读取压缩文件)
IT_BULL 发布了13 篇原创文章 · 获赞 7 · 访问量 309 私信 关注版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。