如何解决我的 Pyflink 设置有什么问题,Python UDF 会抛出 py4j 异常?
我正在使用文档中的 flink python 数据流教程:https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/python/datastream_tutorial/
环境
我的环境是 Windows 10。java -version
给出:
openjdk version "11.0.2" 2019-01-15
OpenJDK Runtime Environment 18.9 (build 11.0.2+9)
OpenJDK 64-Bit Server VM 18.9 (build 11.0.2+9,mixed mode)
我尝试了几种方法来搭建flink的python环境。结果是一样的。
选项 1(使用 conda):
conda create -n streaming-experiments python=3.8.10
conda activate streaming-experiments
pip install apache-flink==1.13.1
选项 2(使用系统 python + 诗歌):
- 在系统级别安装了 python 3.8.10
- pyproject.toml 依赖项:
pandas = ">=1.0"
httpx = "*"
pyarrow = "*"
apache-flink = "^1.13.1"
- 安装依赖项:
"C:\Users\myuser\AppData\Local\Programs\Python\python38\python.exe" -m pip install poetry
poetry install
问题
最小示例:
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
def tutorial():
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
ds = env.from_collection(
collection=[(1,'aaa'),(2,'bbb')],type_info=Types.ROW([Types.INT(),Types.STRING()])
).map(lambda l: l,output_type=Types.ROW([Types.INT(),Types.STRING()]))
ds.print()
ds.add_sink(StreamingFileSink
.for_row_format('./output',Encoder.simple_string_encoder())
.build())
env.execute("tutorial_job")
if __name__ == '__main__':
tutorial()
如果我运行文档中的示例,它就可以工作。一旦我使用 python udf 添加“地图”操作,我就会收到 py4j 错误。这就是我得到的:
C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\python.exe C:/Users/myuser/PycharmProjects/streaming-experiments/example.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/C:/Users/myuser/AppData/Local/Continuum/miniconda3/envs/streaming-experiments/Lib/site-packages/pyflink/lib/flink-dist_2.11-1.13.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Traceback (most recent call last):
File "C:/Users/myuser/PycharmProjects/streaming-experiments/example.py",line 22,in <module>
tutorial()
File "C:/Users/myuser/PycharmProjects/streaming-experiments/example.py",line 18,in tutorial
env.execute("tutorial_job")
File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\datastream\stream_execution_environment.py",line 645,in execute
return JobExecutionResult(self._j_stream_execution_environment.execute(j_stream_graph))
File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\py4j\java_gateway.py",line 1285,in __call__
return_value = get_return_value(
File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\util\exceptions.py",line 146,in deco
return f(*a,**kw)
File "C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\py4j\protocol.py",line 326,in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.execute.
: org.apache.flink.runtime.client.JobExecutionException: Job execution Failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
at java.base/java.util.concurrent.CompletableFuture$uniapply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipetoSupport$PipeableFuture$$anonfun$pipeto$1.applyOrElse(PipetoSupport.scala:22)
at akka.pattern.PipetoSupport$PipeableFuture$$anonfun$pipeto$1.applyOrElse(PipetoSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(Abstractdispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by norestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcmessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcmessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.MailBox.processMailBox(MailBox.scala:258)
at akka.dispatch.MailBox.run(MailBox.scala:225)
at akka.dispatch.MailBox.exec(MailBox.scala:235)
... 4 more
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: C:\Users\myuser\AppData\Local\Continuum\miniconda3\envs\streaming-experiments\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=1-1 --provision_endpoint=localhost:56474
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:429)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.open(BeamPythonFunctionRunner.java:273)
at org.apache.flink.streaming.api.operators.python.AbstractPythonFunctionoperator.open(AbstractPythonFunctionoperator.java:121)
at org.apache.flink.streaming.api.operators.python.OneInputPythonFunctionoperator.open(OneInputPythonFunctionoperator.java:108)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenoperators(OperatorChain.java:437)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:582)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:537)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2050)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.get(LocalCache.java:3952)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache.getorLoad(LocalCache.java:3974)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4958)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4964)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:451)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:436)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:303)
at org.apache.flink.streaming.api.runners.python.beam.BeamPythonFunctionRunner.createStageBundleFactory(BeamPythonFunctionRunner.java:427)
... 12 more
Caused by: java.lang.IllegalStateException: Process died with exit code 0
at org.apache.beam.runners.fnexecution.environment.ProcessManager$RunningProcess.isAliveOrThrow(ProcessManager.java:75)
at org.apache.beam.runners.fnexecution.environment.ProcessEnvironmentFactory.createEnvironment(ProcessEnvironmentFactory.java:112)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:252)
at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:231)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetorLoad(LocalCache.java:2154)
at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
... 20 more
Process finished with exit code 1
我试图找出 py4j 本身是否有效。但确实如此。这给我打印了两个数字,所以它似乎有效:
from py4j.java_gateway import JavaGateway,launch_gateway
launch_gateway(25333,die_on_exit=True)
gateway = JavaGateway()
random = gateway.jvm.java.util.Random()
number1 = random.nextInt(10)
number2 = random.nextInt(10)
print(number1,number2)
非常感谢任何提示检查或测试什么。
解决方法
好的,经过数小时的故障排除后,我发现问题不在于我的 python 或 java 设置或 pyflink。
问题是我的公司代理。我没有想到网络,但 py4j 在幕后需要网络。应该多注意堆栈跟踪中的这一行:
Caused by: java.lang.RuntimeException: Failed to create stage bundle factory! INFO:root:Initializing python harness: C:\Users\myuser\AppData\Local\pypoetry\Cache\virtualenvs\streamexp-yYteoysd-py3.8\lib\site-packages\pyflink\fn_execution\beam\beam_boot.py --id=1-1 --provision_endpoint=localhost:53816
解决办法就是设置一个环境变量:
set no_proxy=localhost
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。