微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

如何在 PySpark 中对嵌套的 for 循环使用列表理解

如何解决如何在 PySpark 中对嵌套的 for 循环使用列表理解

我打算在下面的 PySpark 数据帧上使用 difflib.SequenceMatcher()。

tech.show()

+-----------------------------+----------------------+
|        concat_tech          |vendor_product        |
+-----------------------------+----------------------+
|AWS Cloud Administration     |AWS Cloud Map         |
|Grad Portal                  |CA Identity Portal    |
|Html/PHP                     |HTML                  |
|UX Xpect                     |HP-UX                 |
|Debian-based                 |Debian                |
|Microsoft Office excel       |Microsoft Office      |
|Oracle EBusiness Suite 12.2.4|Oracle Primavera Suite|
|Solaris 10.XX                |Solaris               |
|CA7 Job Scheduler            |CA Scheduler          |
|Windows NT/XP/Vista          |Windows XP            |
+-----------------------------+----------------------+

techno.show()
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|vendor                        |product                        |category                        |sub_category                                       |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
|Notion Labs,Inc.             |Notion                         |Project Management              |Project Management                                 |
|Apptricity Corporation        |Apptricity                     |Enterprise Applications         |Enterprise Resource Planning (ERP)                 |
|Resolution Software,Ltd.     |Xcase                          |IT Governance                   |Application Development & Management               |
|The Apache Software Foundation|Apache Mynewt                  |IT Governance                   |Application Development & Management               |
|NetApp,Inc.                  |NetApp iSCSI SAN Storage System|Data Center Solutions           |Data Management & Storage (Hardware)               |
|HP,Inc.                      |HP Z820                        |Hardware (Basic)                |Consumer Electronics,Personal Computers & Software|
|Dell Technologies,Inc.       |Dell EMC FormWare              |Customer Relationship Management|Help Desk Management                               |
|ServiceMax,Inc.              |ServiceMax                     |Customer Relationship Management|Service & Field Support Management                 |
|maxmind,Inc.                 |maxmind GeoIP                  |Software (Basic)                |Server Technologies (Software)                     |
|Campus Management Corporation |Campus Management              |Vertical Markets                |Academic & Education Management Software           |
+------------------------------+-------------------------------+--------------------------------+---------------------------------------------------+
import pandas as pd
from difflib import SequenceMatcher
def similar(a,b):
    if pd.isnull(a) or pd.isnull(b):
        return 0
    else:
        return SequenceMatcher(None,a,b).ratio()

函数 SequenceMatcher(a,b) 将数据帧 tech 中的 concat_tech 作为 'a' 和数据帧 中的 product techno 为 'b' 并返回 a 和 b 之间的比率。目标是在所有 product 中找到 concat_tech 的最佳匹配并返回具有最佳匹配的数据框,即一个 product 将从使用 SequenceMatcher() 为 concat_tech 的值生成最佳(最大)比率的 product 列返回。

它应该是一个一对多的操作,可以在 Pandas 中使用列表理解来完成,但如何在 PySpark 中实现相同的操作?我在两个数据框中都有数百万行,这里我给出了 10 个样本。

解决方法

您正在尝试将数据框 tech 中的每个元素与数据框 techno 中的每个元素进行比较。这种操作的结果是 crossJoin。除非此连接的任一侧相当小,或者有一种方法可以减少可能的组合数量(从而避免交叉连接),否则这将是一项成本非常高的操作。

实际代码很简单:进行连接,在 udf 的帮助下计算每对的比率,然后从 tech 中找到每个元素的最大值:

import pandas as pd
from difflib import SequenceMatcher
from pyspark.sql import functions as F

@F.udf("double")
def similar(a,b):
    if pd.isnull(a) or pd.isnull(b):
        return 0
    else:
        return SequenceMatcher(None,a,b).ratio()

df = tech.select("concat_tech").crossJoin(techno.select("product")) \
    .withColumn("ratio",similar("concat_tech","product")) \
    .groupBy("concat_tech").agg(F.expr("max_by(product,ratio)"),F.max("ratio"))
df.show(truncate=False)

示例数据的输出:

+--------------------------+----------------------+-------------------+         
|concat_tech               |max_by(product,ratio)|max(ratio)         |
+--------------------------+----------------------+-------------------+
|UXXpect                   |Apptricity            |0.35294117647058826|
|GradPortal                |Notion                |0.25               |
|OracleEBusinessSuite12.2.4|ApacheMynewt          |0.3157894736842105 |
|MicrosoftOfficeexcel      |ServiceMax            |0.3333333333333333 |
|AWSCloudAdministration    |Notion                |0.35714285714285715|
|CA7JobScheduler           |ApacheMynewt          |0.37037037037037035|
|Html/php                  |HPZ820                |0.14285714285714285|
|WindowsNT/XP/Vista        |MaxMindGeoIP          |0.3333333333333333 |
|Debian-based              |Xcase                 |0.35294117647058826|
|Solaris10.XX              |Xcase                 |0.23529411764705882|
+--------------------------+----------------------+-------------------+

使用 Vectorized UDF 可能会稍微提高性能,但对于大型数据帧,交叉连接仍然是个问题。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?