
FlatMap from column values to multiple rows: missing schema pattern

I am trying to transform the dataframe below into multiple rows based on a column value. Each Set value holds comma-separated ranges such as AA001-AA003, where the first two characters are the letter and the remaining digits are the number. I believe the Row is missing its schema (line 128) and an exception is thrown.

Original dataframe

+---+------------------------+
|Id |Set                     |
+---+------------------------+
|1  |AA001-AA003,BB002-BB003 |
|2  |AA045-AA046,CC099-CC100 |
+---+------------------------+

Intermediate dataframe step, shown for clarification

+---+-----------+
| Id|        Set|
+---+-----------+
|  1|AA001-AA003|
|  1|BB002-BB003|
|  2|AA045-AA046|
|  2|CC099-CC100|
+---+-----------+

Final dataframe

+---+-------+------+------+
| Id|Combine|Letter|Number|
+---+-------+------+------+
|  1|  AA001|    AA|     1|
|  1|  AA002|    AA|     2|
|  1|  AA003|    AA|     3|
|  1|  BB002|    BB|     2|
|  1|  BB003|    BB|     3|
|  2|  AA045|    AA|    45|
|  2|  AA046|    AA|    46|
|  2|  CC099|    CC|    99|
|  2|  CC100|    CC|   100|
+---+-------+------+------+

This is where I get the exception:

[screenshot of the exception stack trace]

Sample application

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SampleApp implements Serializable {
    private static final long serialVersionUID = -1L;
    
    private static String ID = "Id";
    private static String SET = "Set";  
    private static String COMBINE = "Combine";
    private static String LETTER = "Letter";
    private static String NUMBER = "Number";

    public static void main(String[] args) {
        SampleApp app = new SampleApp();
        app.start();
    }

    private void start() {

        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkSession spark = SparkSession
                .builder()
                .appName("Spark App")
                .master("local[*]")
                .getOrCreate();

        StructType commaStructType = new StructType();
        commaStructType = commaStructType.add(ID, DataTypes.IntegerType, false);
        commaStructType = commaStructType.add(SET, DataTypes.StringType, true);

        // The result schema must declare a DataType for every field in the rows
        // produced by the flatMap below
        StructType resultStructType = new StructType();
        resultStructType = resultStructType.add(ID, DataTypes.IntegerType, false);
        resultStructType = resultStructType.add(COMBINE, DataTypes.StringType, false);
        resultStructType = resultStructType.add(LETTER, DataTypes.StringType, false);
        resultStructType = resultStructType.add(NUMBER, DataTypes.IntegerType, false);
        
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(1,"AA001-AA003,BB002-BB003"));
        list.add(RowFactory.create(2,"AA045-AA046,CC099-CC100"));
        
                
        Dataset<Row> df = spark.createDataFrame(list,commaStructType);
        df.show(10,false);
        df.printSchema();
        
        Dataset<Row> commaSeparatedDf = df.flatMap(new SeparateByCommaFlatMap(), RowEncoder.apply(commaStructType));
        commaSeparatedDf.show(10,true);
        commaSeparatedDf.printSchema();
        
        Dataset<Row> resultDf = commaSeparatedDf.flatMap(new SeparateByDashFlatMap(), RowEncoder.apply(resultStructType));
        resultDf.show(10,true);
        resultDf.printSchema();
        
        /* This manually created DataFrame for the final step works */
        /*List<Row> list2 = new ArrayList<Row>();
        list2.add(RowFactory.create(1,"AA001-AA003"));
        list2.add(RowFactory.create(1,"BB002-BB003"));
        list2.add(RowFactory.create(2,"AA045-AA046"));
        list2.add(RowFactory.create(2,"CC099-CC100"));
        
        Dataset<Row> df2 = spark.createDataFrame(list2,commaStructType);
        df2.show(10,true);
        df2.printSchema();
        
        Dataset<Row> resultDf2 = df2.flatMap(new SeparateByDashFlatMap(), RowEncoder.apply(resultStructType));
        resultDf2.show(10,true);
        resultDf2.printSchema();*/
    }
    
    /*
     * Split "AA001-AA003,BB002-BB003" into individual rows:
     *   AA001-AA003
     *   BB002-BB003
     */
    private final class SeparateByCommaFlatMap implements FlatMapFunction<Row, Row> {
        private static final long serialVersionUID = 63784L;

        @Override
        public Iterator<Row> call(Row r) throws Exception {
            int id = Integer.parseInt(r.getAs(ID).toString());
            String[] s = r.getAs(SET).toString().split(",");

            List<Row> list = new ArrayList<Row>();
            for (int i = 0; i < s.length; i++) {
                List<Object> data = new ArrayList<>();
                data.add(id);
                data.add(s[i]);
                list.add(RowFactory.create(data.toArray()));
            }

            return list.iterator();
        }
    }

    /*
     * Split "AA001-AA003" into individual rows:
     *   AA001 | AA | 1
     *   AA002 | AA | 2
     *   AA003 | AA | 3
     */
    private final class SeparateByDashFlatMap implements FlatMapFunction<Row, Row> {
        private static final long serialVersionUID = 63784L;

        @Override
        public Iterator<Row> call(Row r) throws Exception {
            int id = r.getAs(ID);
            String[] s = r.getAs(SET).toString().split("-");
            String letter = s[0].substring(0, 2);

            int start = Integer.parseInt(s[0].substring(2));
            int end = Integer.parseInt(s[1].substring(2));

            List<Row> list = new ArrayList<Row>();
            for (int i = start; i <= end; i++) {
                List<Object> data = new ArrayList<>();
                data.add(id);
                data.add(String.format("%s%03d", letter, i));
                data.add(letter);
                data.add(i);
                list.add(RowFactory.create(data.toArray()));
            }

            return list.iterator();
        }
    }
}

Solution

Here is a solution based on the dataframe API:

import static org.apache.spark.sql.functions.*;

Dataset<Row> result = df.withColumn(
    "Set", explode(split(col("Set"), ","))   // split by comma and explode into rows
).withColumn(
    "Letter", substring(col("Set"), 1, 2)    // get letter from first two chars
).withColumn(
    "Number",                                 // generate and explode a list of numbers using the Spark SQL sequence function
    expr("explode(sequence("
        + "int(substring(split(Set,'-')[0],3)),"
        + "int(substring(split(Set,'-')[1],3))))")
).withColumn(
    "Combine",                                // format the Combine column from Letter and Number
    format_string("%s%03d", col("Letter"), col("Number"))
).select(
    "Id", "Combine", "Letter", "Number"
);

result.show();
+---+-------+------+------+
| Id|Combine|Letter|Number|
+---+-------+------+------+
|  1|  AA001|    AA|     1|
|  1|  AA002|    AA|     2|
|  1|  AA003|    AA|     3|
|  1|  BB002|    BB|     2|
|  1|  BB003|    BB|     3|
|  2|  AA045|    AA|    45|
|  2|  AA046|    AA|    46|
|  2|  CC099|    CC|    99|
|  2|  CC100|    CC|   100|
+---+-------+------+------+
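
The Spark SQL sequence function used above (built in since Spark 2.4) generates an array of integers between two bounds, and exploding that array is what turns each range into one row per number. Below is a minimal sketch of the idea, assuming the spark session from the examples above:

import static org.apache.spark.sql.functions.expr;

// sequence(start, stop) builds the array [start, start+1, ..., stop];
// explode() then emits one row per element.
spark.range(1)
     .select(expr("explode(sequence(45, 46)) AS Number"))
     .show();
// +------+
// |Number|
// +------+
// |    45|
// |    46|
// +------+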

I have also added my own answer in case anyone wants the Java code. Special thanks to mck!

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;
import static org.apache.spark.sql.functions.substring;
import static org.apache.spark.sql.functions.sequence;
import static org.apache.spark.sql.functions.format_string;
import static org.apache.spark.sql.functions.col;

public class ExplodeApp implements Serializable {
    private static final long serialVersionUID = -1L;
    
    private static String ID = "Id";
    private static String SET = "Set";  
    private static String COMBINE = "Combine";
    private static String LETTER = "Letter";
    private static String NUMBER = "Number";
    private static String RANGE = "Range";

    public static void main(String[] args) {
        ExplodeApp app = new ExplodeApp();
        app.start();
    }

    private void start() {

        Logger.getLogger("org.apache").setLevel(Level.WARN);

        SparkSession spark = SparkSession
                .builder()
                .appName("Spark App")
                .master("local[*]")
                .getOrCreate();

        StructType commaStructType = new StructType();
        commaStructType = commaStructType.add(ID,DataTypes.IntegerType,false);
        commaStructType = commaStructType.add(SET,DataTypes.StringType,true);
        
        List<Row> list = new ArrayList<Row>();
        list.add(RowFactory.create(1,"AA001-AA003,BB002-BB003"));
        list.add(RowFactory.create(2,"AA045-AA046,CC099-CC100"));     
                
        Dataset<Row> df = spark.createDataFrame(list,commaStructType);
        df.show(10,false);     

        Column[] columnNames =  new Column[] { col(ID),col(COMBINE),col(LETTER),col(NUMBER) };
        
        Dataset<Row> resultDf = df
                .withColumn(RANGE, explode(split(df.col(SET), ",")))
                .withColumn(LETTER, substring(col(RANGE), 1, 2))
                .withColumn(NUMBER, explode(sequence(
                        substring(split(col(RANGE), "-").getItem(0), 3, 3).cast(DataTypes.IntegerType),
                        substring(split(col(RANGE), "-").getItem(1), 3, 3).cast(DataTypes.IntegerType))))
                .withColumn(COMBINE, format_string("%s%03d", col(LETTER), col(NUMBER)))
                .select(columnNames);
        
        resultDf.show(10,false);       
    }   
}
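
As for the exception in the original flatMap approach: as the question suspects, a Row built with RowFactory.create carries no schema of its own, so the RowEncoder passed to flatMap has to declare a DataType for every field, and those types must match the Java values placed in each Row. Here is a minimal sketch of the pattern, reusing the imports and the SeparateByDashFlatMap class from the question above:

// Every field needs an explicit DataType matching the value put into the Row:
// Integer for Id/Number, String for Combine/Letter.
StructType resultStructType = new StructType()
        .add("Id", DataTypes.IntegerType, false)
        .add("Combine", DataTypes.StringType, false)
        .add("Letter", DataTypes.StringType, false)
        .add("Number", DataTypes.IntegerType, false);

// RowFactory.create() produces schema-less rows; the encoder supplies the schema.
Dataset<Row> resultDf = commaSeparatedDf.flatMap(
        new SeparateByDashFlatMap(),
        RowEncoder.apply(resultStructType));

If a declared type does not line up with the actual value (for example, Number declared as StringType while the function adds an int), the encoder fails at runtime, which is a common cause of the kind of exception described in the question.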
