package com.example.demo; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.sqlContext; import org.apache.spark.sql.SparkSession; public class DemoApplication { public static void main(String[] args) { // /*-----------------------线上调用方式--------------------------*/ // 读入店铺id数据 SparkSession spark = SparkSession.builder().appName("demo_spark").enableHiveSupport().getorCreate(); Dataset<Row> vender_set = spark.sql("select pop_vender_id from app.app_sjzt_payout_apply_with_order where dt = '2019-08-05' and pop_vender_id is not null"); System.out.println( "数据读取 OK" ); JavaSparkContext sc = new JavaSparkContext(spark.sparkContext()); // JavaSparkContext sc = new JavaSparkContext(); sqlContext sqlContext = new sqlContext(sc); // 将数据去重,转换成 List<Row> 格式 vender_set = vender_set.distinct(); vender_set = vender_set.na().fill(0L); JavaRDD<Row> vender= vender_set.toJavaRDD(); List<Row> vender_list = vender.collect(); // 遍历商家id,调用jsf接口,创建list 保存返回数据 List<String> list_temp = new ArrayList<String>(); for(Row row:vender_list) { String id = row.getString(0); String result = service.venderDownAmountList(id); System.out.println( "接口调用返回值 OK" ); // 解析json串 ,按照JSONObject 和 JSONArray 一层一层解析 并过返回滤数据 JSONObject jsonOBJ = JSON.parSEObject(result); JSONArray data = jsonOBJ.getJSONArray("data"); if (data != null) { JSONObject data_all = data.getJSONObject(0); double amount = data_all.getDouble("jfDownAmount"); // 将商家id 和 倒挂金额存下来 list_temp.add("{\"vender_id\":"+id+",\"amount\":"+amount+"}"); } else { continue; } System.out.println( "解析 OK" ); } // list 转为 RDD JavaRDD<String> venderRDD = sc.parallelize(list_temp); // 注册成表 Dataset<Row> vender_table = sqlContext.read().json(venderRDD); vender_table.registerTempTable("vender"); System.out.println( "注册表 OK" ); // 写入数据库 spark.sql("insert overwrite table dev.dev_jypt_vender_dropaway_amount select vender.vender_id,vender.amount from vender"); System.out.println( "写入数据表 OK" ); sc.stop(); System.out.println( "Hello World!" ); } }
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。