微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

我的 Pyflink 设置有什么问题,Python UDF 会抛出 py4j 异常?

如何解决我的 Pyflink 设置有什么问题,Python UDF 会抛出 py4j 异常?

我正在使用文档中的 flink python 数据流教程:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/datastream_tutorial/

环境

我的环境是 Windows 10。java -version 给出:

openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9,mixed mode)

我尝试了几种方法来搭建flink的python环境。结果是一样的。

选项 1(使用 conda)

conda create -n streaming-experiments python=3.8.10
conda activate streaming-experiments
pip install apache-flink==1.13.1

选项 2(使用系统 python + 诗歌)

  • 在系统级别安装了 python 3.8.10
  • pyproject.toml 依赖项:
pandas = ">=1.0"
httpx = "*"
pyarrow = "*"
apache-flink = "^1.13.1"
  • 安装依赖项:
"C:\Users\myuser\AppData\Local\Programs\Python\python38\python.exe" -m pip install poetry
poetry install

问题

最小示例:

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink


def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1,'aaa'),(2,'bbb')],type_info=Types.ROW([Types.INT(),Types.STRING()])
    ).map(lambda l: l,output_type=Types.ROW([Types.INT(),Types.STRING()]))
    ds.print()
    ds.add_sink(StreamingFileSink
                .for_row_format('./output',Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")


if __name__ == '__main__':
    tutorial()

如果我运行文档中的示例,它就可以工作。一旦我使用 python udf 添加“地图”操作,我就会收到 py4j 错误。这就是我得到的:

C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\python.exe C:/Users/myuser/PycharmProjects/streaming-experiments/example.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/myuser/AppData/Local/Continuum/miniconda3/envs/streaming-experiments/Lib/site-packages/pyflink/lib/flink-dist_2.11-1.13.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
  File "C:/Users/myuser/PycharmProjects/streaming-experiments/example.py",line 22,in <module>
    tutorial()
  File "C:/Users/myuser/PycharmProjects/streaming-experiments/example.py",line 18,in tutorial
    env.execute("tutorial_job")
  File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\datastream\stream_execution_environment.py",line 645,in execute
    return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
  File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\py4j\java_gateway.py",line 1285,in __call__
    return_value = get_return_value(
  File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\util\exceptions.py",line 146,in deco
    return f(*a,**kw)
  File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\py4j\protocol.py",line 326,in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution Failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
    at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
    at java.base/java.util.concurrent.CompletableFuture$uniapply.tryFire(CompletableFuture.java:642)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
    at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
    at akka.dispatch.OnComplete.internal(Future.scala:264)
    at akka.dispatch.OnComplete.internal(Future.scala:261)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
    at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
    at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
    at akka.pattern.PipetoSupport$PipeableFuture$$anonfun$pipeto$1.applyOrElse(PipetoSupport.scala:22)
    at akka.pattern.PipetoSupport$PipeableFuture$$anonfun$pipeto$1.applyOrElse(PipetoSupport.scala:21)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
    at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
    at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
    at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
    at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
    at akka.dispatch.TaskInvocation.run(Abstractdispatcher.scala:40)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by norestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcmessage(AkkaRpcActor.java:212)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcmessage(FencedAkkaRpcActor.java:77)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.MailBox.processMailBox(MailBox.scala:258)
    at akka.dispatch.MailBox.run(MailBox.scala:225)
    at akka.dispatch.MailBox.exec(MailBox.scala:235)
    ... 4 more
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=1-1 --provision_endpoint=localhost:56474

    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:429)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:273)
    at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionoperator.open(AbstractPythonFunctionoperator.java:121)
    at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionoperator.open(OneInputPythonFunctionoperator.java:108)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenoperators(OperatorChain.java:437)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getorLoad(LocalCache.java:3974)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
    at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:427)
    ... 12 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
    at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
    at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:112)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetorLoad(LocalCache.java:2154)
    at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
    ... 20 more


Process finished with exit code 1

我试图找出 py4j 本身是否有效。但确实如此。这给我打印了两个数字,所以它似乎有效:

from py4j.java_gateway import JavaGateway,launch_gateway
launch_gateway(25333,die_on_exit=True)
gateway = JavaGateway()
random = gateway.jvm.java.util.Random()
number1 = random.nextInt(10)
number2 = random.nextInt(10)
print(number1,number2)

非常感谢任何提示检查或测试什么。

解决方法

好的,经过数小时的故障排除后,我发现问题不在于我的 python 或 java 设置或 pyflink。

问题是我的公司代理。我没有想到网络,但 py4j 在幕后需要网络。应该多注意堆栈跟踪中的这一行:

Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: C:\Users\myuser\AppData\Local\pypoetry\Cache\virtualenvs\streamexp-yYteoysd-py3.8\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=1-1 --provision_endpoint=localhost:53816

解决办法就是设置一个环境变量:

set no_proxy=localhost

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?