如何解决java.util.ConcurrentModificationException 在 Stormcrawler 中向元数据添加一些键时发生
我在元数据中添加了一个字段,用于在状态索引中传输和持久化。该字段是一个字符串列表,其名称为 input_keywords。在 Strom 集群中运行拓扑后,拓扑停止并显示以下日志:
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchWhenAvailable(disruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndobject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
at org.apache.storm.serialization.KryovaluesSerializer.serializeInto(KryovaluesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
... 6 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndobject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
at org.apache.storm.serialization.KryovaluesSerializer.serializeInto(KryovaluesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
... 6 more
2021-02-27 08:03:34.276 o.a.s.d.executor Thread-20-disruptor-executor[45 45]-send-queue [ERROR]
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:522) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchWhenAvailable(disruptorQueue.java:487) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:74) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$consume_loop_STAR_$fn__4132.invoke(disruptor.clj:84) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:484) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndobject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
at org.apache.storm.serialization.KryovaluesSerializer.serializeInto(KryovaluesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
... 6 more
Caused by: java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) ~[?:1.8.0_112]
at java.util.HashMap$EntryIterator.next(HashMap.java:1471) ~[?:1.8.0_112]
at java.util.HashMap$EntryIterator.next(HashMap.java:1469) ~[?:1.8.0_112]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeClassAndobject(Kryo.java:628) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-3.0.3.jar:?]
at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534) ~[kryo-3.0.3.jar:?]
at org.apache.storm.serialization.KryovaluesSerializer.serializeInto(KryovaluesSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:44) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.worker$mk_transfer_fn$transfer_fn__10378.invoke(worker.clj:203) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.executor$start_batch_transfer_GT_worker_handler_BANG$fn__10056.invoke(executor.clj:314) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.disruptor$clojure_handler$reify__4115.onEvent(disruptor.clj:41) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.utils.disruptorQueue.consumeBatchToCursor(disruptorQueue.java:509) ~[storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
... 6 more
2021-02-27 08:03:34.327 o.a.s.util Thread-20-disruptor-executor[45 45]-send-queue [ERROR] Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at org.apache.storm.util$exit_process_BANG_.doInvoke(util.clj:341) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.7.0.jar:?]
at org.apache.storm.daemon.worker$fn_10827$fn_10828.invoke(worker.clj:781) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.daemon.executor$mk_executor_data$fn_10034$fn_10035.invoke(executor.clj:281) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at org.apache.storm.util$async_loop$fn__1221.invoke(util.clj:494) [storm-core-1.2.1.3.1.4.0-315.jar:1.2.1.3.1.4.0-315]
at clojure.lang.AFn.run(AFn.java:22) [clojure-1.7.0.jar:?]
at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]
我们对拓扑的每个组件都有不同的并行提示。将 input_keywords 添加到 元数据 后,我们得到了错误。错误的主要原因是什么?
解决方法
您正在修改正在序列化的元数据实例。你不能那样做,见Storm troubleshooting page。
如 release notes of 1.16 中所述,您可以锁定元数据。这不会解决问题,但会告诉您在代码中写入元数据的位置。
In our topology,we emitted the same metadata to multiple bolts at the same time.
神秘解释。不要那样做。
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。