Spark-NLP 函数在使用 map

如何解决Spark-NLP 函数在使用 map

我有一个如下结构的 RDD:

my_rdd = [Row(text='Hello World. This is bad.'),Row(text='This is good.'),...]

我可以使用 python 函数执行并行处理:

rdd2=my_rdd.map(lambda f: f.text.split()) 
for x in rdd2.collect():
  print(x)

它给了我预期的输出

但是,当我尝试使用 spark-NLP 断句或情感分析器时,出现错误PicklingError:无法序列化对象:TypeError:无法pickle _thread.RLock 对象

在这一行中:for x in rdd2.collect():

代码如下:

documenter = DocumentAssembler()\
    .setInputCol("text")\
    .setoutputCol("document")
    
sentencerDL = SentenceDetectorDLModel\
  .pretrained("sentence_detector_dl","en") \
  .setInputCols(["document"]) \
  .setoutputCol("sentences")

sd_pipeline = PipelineModel(stages=[documenter,sentencerDL]) 
sd_model = LightPipeline(sd_pipeline)
pipeline = PretrainedPipeline('analyze_sentiment','en')

如果我尝试:

rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))                    

rdd2=my_rdd.map(lambda f: sd_model.fullAnnotate(f.text)[0]["sentences"].split()[0])

发生错误。当我在没有“映射”的情况下运行它们时,它们会按预期运行。

有谁知道如何并行执行 spark-NLP 断句或情感分析器?我做错了什么?

谢谢大家!

解决方法

当您在数据分布在不同分区的数据帧上应用 Spark-ML 管道时,默认情况下您将获得并行执行。 spark-NLP 管道(它也是 Spark-ML 管道)也是如此。 所以你可以这样做,

pipeline.transform(dataframe)

并以数据分布在不同节点上的方式创建“数据帧”。 一个很好的教程在这里,

https://sparkbyexamples.com/pyspark/pyspark-create-dataframe-from-list/

此外,为了在使用 Spark-NLP 转换后映射数据帧的内容,您可以使用 sparknlp.functions 下的函数,例如 map_annotations_col,它可以让您映射包含 Spark-NLP 的数据帧中特定列的内容注释。 顺便说一句,这个,

rdd2=my_rdd.map(lambda f: pipeline.annotate(f.text))

是您不应该做的事情,您会收到该异常,因为 Spark 正在尝试序列化您的整个管道并将其发送到集群节点。 这不是它应该工作的方式,您将数据传递到管道,让管道选择将什么分发到集群。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?