微信公众号搜"智元新知"关注
微信扫一扫可直接关注哦!

使Flink工作流与自定义源并行化

如何解决使Flink工作流与自定义源并行化

我有一个用Flink构建的工作流,它包含一个自定义源,一系列地图/平面图和一个接收器。

我的自定义源的run()方法遍历存储在文件夹中的文件,并通过上下文的collect()方法收集每个文件名称内容我有一个自定义对象来存储该文件两个字段中的信息。

然后,我有一系列的地图/平面图转换这些对象,然后使用自定义接收器将它们打印到文件中。在Flink的Web UI中生成的执行图如下:

Flink execution graph

我有一个集群或2个worker设置,每个都有6个插槽(它们也都有6个核心)。我将并行度设置为12。从执行图中可以看到源的并行度为1,而工作流的其余部分具有并行度12。

运行工作流程时(专用文件夹中有大约15,000个文件),我使用htop监视工作人员的资源。在大多数情况下,所有内核都达到100%的利用率,但是大约每30分钟左右,就有8-10个内核闲置大约2-3分钟。

我的问题如下:

  1. 我了解该源运行的并行度为1,我认为这是从本地存储读取时的正常现象(我的文件位于每个工作进程的同一目录中,因为我不知道将选择哪个工作进程执行源代码)。确实正常吗?你能解释为什么会这样吗?

  2. 我的其余工作流程在并行性12下执行,这看起来是正确的,因为通过检查任务管理器的日志,我可以从所有插槽(例如.... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ........ [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO ....等))。我不明白的是,如果一个插槽正在执行源角色,并且集群中有12个插槽,剩下的12个插槽如何执行工作流程的其余部分?一个插槽既可以充当源,也可以充当其余工作流程的一个实例吗?如果是,该特定插槽的资源如何分配?有人可以解释此工作流程中正在进行的步骤吗?例如(这可能是错误的):

  • 插槽1读取文件并将其转发到可用插槽(2到12)
  • 插槽1将一个文件转发给自己,并停止读取,直到完成工作为止
  • 完成后,插槽1读取更多文件并将其转发到可用的插槽中

我相信我上面描述的是错误的,但是我举一个例子来更好地解释我的问题

  1. 为什么我每30分钟(或多或少)每隔30分钟就会有大部分内核处于这种空闲状态?

解决方法

要回答有关并行阅读的特定问题,我将执行以下操作...

  1. 通过扩展RichSourceFunction来实现自定义源。
  2. 在您的open()方法中,调用getRuntimeContext().getNumberOfParallelSubtasks()以获取总体并行性,并调用getRuntimeContext().getIndexOfThisSubtask()以获取正在初始化的子任务的索引。
  3. run()方法中,当您遍历文件时,获得每个文件名的hashCode(),以总并行度为模。如果该值等于子任务的索引,则进行处理。

通过这种方式,您可以将工作分散到12个子任务中,而无需让子任务尝试处理同一文件。

,
  1. 单个使用者设置将您管道的总吞吐量限制为只有一个使用者的性能。此外,它给所有插槽带来了沉重的洗牌-在这种情况下,消费者读取的所有数据也会在此消费者插槽上序列化,这是额外的CPU负载。相反,使消费者并行度等于map / flat map parallelsm将允许链接源-> map操作并避免随机播放。
  2. 默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务也是如此,只要它们来自同一任务即可。结果是一个插槽可以容纳整个作业流水线。因此,在您的情况下,插槽1同时具有使用者和地图/平面地图任务,而其他插槽仅具有地图/平面地图任务。有关更多详细信息,请参见此处:https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-slots-and-resources。另外,您实际上可以在Web UI上查看每个子任务的实例。
  3. 您是否启用了检查点?如果是,并且是30分钟,则可能是对状态进行快照的时间间隔。

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。