微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

java spark list 转为 RDD 转为 dataset 写入表中

package com.example.demo;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sqlContext;
import org.apache.spark.sql.SparkSession;

public class DemoApplication {

	public static void main(String[] args) {
		
		
//		/*-----------------------线上调用方式--------------------------*/
		// 读入店铺id数据
		SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getorCreate();
		Dataset<Row> vender_set = spark.sql("select pop_vender_id from app.app_sjzt_payout_apply_with_order where dt = '2019-08-05' and pop_vender_id is not null");
		System.out.println( "数据读取 OK" );
		
		
		JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
//		JavaSparkContext sc = new JavaSparkContext();
		sqlContext sqlContext = new sqlContext(sc);

		// 将数据去重,转换成 List<Row> 格式
		vender_set =  vender_set.distinct();
		vender_set = vender_set.na().fill(0L);
		JavaRDD<Row> vender= vender_set.toJavaRDD();
		List<Row> vender_list = vender.collect();
		

		// 遍历商家id,调用jsf接口,创建list 保存返回数据
		List<String> list_temp = new ArrayList<String>();
		for(Row row:vender_list) {
			String id = row.getString(0);
			String result = service.venderDownAmountList(id);
			
			System.out.println( "接口调用返回值 OK" );
			
			// 解析json串 ,按照JSONObject 和 JSONArray 一层一层解析 并过返回滤数据
			JSONObject jsonOBJ = JSON.parSEObject(result);
			JSONArray data = jsonOBJ.getJSONArray("data");
			if (data != null) {
				JSONObject data_all = data.getJSONObject(0);
				double amount = data_all.getDouble("jfDownAmount");
				// 将商家id 和 倒挂金额存下来
				list_temp.add("{\"vender_id\":"+id+",\"amount\":"+amount+"}");
			}
			else {
				continue;
			}
			
			System.out.println( "解析 OK" );
			
		}
		// list 转为 RDD 
		JavaRDD<String> venderRDD = sc.parallelize(list_temp);
		
		// 注册成表
		Dataset<Row> vender_table = sqlContext.read().json(venderRDD);
		vender_table.registerTempTable("vender");
		System.out.println( "注册表 OK" );
		
		// 写入数据库
		spark.sql("insert overwrite table dev.dev_jypt_vender_dropaway_amount select vender.vender_id,vender.amount from vender");
		System.out.println( "写入数据表 OK" );

		sc.stop();		
		System.out.println( "Hello World!" );
		
	}
}

  

 

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐