
news-crawl 2.x Broken when using multiple workers (across multiple hosts) #63

Description

@jnioche

Discussed in #62

Originally posted by alextechnology December 8, 2023
I spent today merging our local news-crawl codebase into the 2.x branch. We run only 1 nimbus and 2 supervisors. While the topology seems to work fine with a single worker, it does not work at all with two workers, even though the configuration is unchanged and works fine on news-crawl 1.2.4.

The first issue seems to be that the local worker cannot connect to the remote supervisor/worker. Once the topology is submitted to Storm, the worker log scrolls for over a minute with 140+ attempts like the following:

2023-12-08 21:14:17.456 o.a.s.m.n.Client client-worker-1 [ERROR] connection attempt 147 to Netty-Client-xxxxxxxxxx failed: org.apache.storm.shade.io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: xxxxxxxxxxxxxxxx
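A "Connection refused" from the Netty client means that nothing accepted the TCP connection on the target host and port at that moment. One way to separate a network or firewall problem from a worker that has not started yet is to probe the port directly from the other host. The following is a minimal sketch, not part of news-crawl or Storm: the class name `PortProbe` and its methods are hypothetical, and the default port 6700 assumes the cluster uses Storm's default `supervisor.slots.ports` (6700-6703).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // Any I/O failure (refused, timed out, unknown host) is reported as false.
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Sanity check for the probe itself: open a listener on a free local
    // port and confirm that canConnect() reaches it over loopback.
    public static boolean probeLocalListener() {
        try (ServerSocket server = new ServerSocket(0)) {
            return canConnect("127.0.0.1", server.getLocalPort(), 1000);
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed defaults: loopback and the first default Storm slot port.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 6700;
        System.out.println(host + ":" + port + " reachable: "
                + canConnect(host, port, 2000));
    }
}
```

Running it from each supervisor host against the other host's worker port, while the topology is active, shows whether the port is reachable at all. If it is reachable by IP address but not by hostname, name resolution between the hosts would be the next thing to check.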

It eventually starts trying to crawl, but the log fills with messages like the following, indicating failures:

2023-12-08 21:14:20.720 o.a.s.m.n.Client client-worker-1 [ERROR] failed to send 1 messages to Netty-Client-xxxxxxxxx java.nio.channels.ClosedChannelException
and

2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 1 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages
2023-12-08 21:14:20.735 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 2 messages

The second issue (which I assume is related to the first) is that this inevitably leads to a java.util.ConcurrentModificationException that crashes the worker:

2023-12-08 21:14:23.316 o.a.s.m.n.Client Worker-Transfer [INFO] Dropping 3 messages
2023-12-08 21:14:23.316 o.a.s.u.Utils Thread-14-feed-executor[70, 71] [ERROR] Async loop died!
java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) [storm-client-2.5.0.jar:2.5.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	... 6 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1511) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1544) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1542) ~[?:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	... 6 more
2023-12-08 21:14:23.320 o.a.s.e.e.ReportError Thread-14-feed-executor[70, 71] [ERROR] Error
java.lang.RuntimeException: java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.utils.Utils$1.run(Utils.java:413) ~[storm-client-2.5.0.jar:2.5.0]
	at java.lang.Thread.run(Thread.java:829) [?:?]
Caused by: java.lang.RuntimeException: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at org.apache.storm.executor.Executor.accept(Executor.java:301) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
md (com.digitalpebble.stormcrawler.Metadata)
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more
Caused by: java.util.ConcurrentModificationException
	at java.util.HashMap$HashIterator.nextNode(HashMap.java:1511) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1544) ~[?:?]
	at java.util.HashMap$EntryIterator.next(HashMap.java:1542) ~[?:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:99) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.MapSerializer.write(MapSerializer.java:39) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:575) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:79) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:508) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40) ~[kryo-4.0.2.jar:?]
	at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:557) ~[kryo-4.0.2.jar:?]
	at org.apache.storm.serialization.KryoValuesSerializer.serializeInto(KryoValuesSerializer.java:38) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.serialization.KryoTupleSerializer.serialize(KryoTupleSerializer.java:40) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerTransfer.tryTransferRemote(WorkerTransfer.java:118) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.daemon.worker.WorkerState.tryTransferRemote(WorkerState.java:555) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.ExecutorTransfer.tryTransfer(ExecutorTransfer.java:68) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.boltEmit(BoltOutputCollectorImpl.java:112) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltOutputCollectorImpl.emit(BoltOutputCollectorImpl.java:65) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:93) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:42) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.task.OutputCollector.emit(OutputCollector.java:77) ~[storm-client-2.5.0.jar:2.5.0]
	at com.digitalpebble.stormcrawler.bolt.FeedParserBolt.execute(FeedParserBolt.java:104) ~[stormjar.jar:?]
	at org.apache.storm.executor.bolt.BoltExecutor.tupleActionFn(BoltExecutor.java:212) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.Executor.accept(Executor.java:294) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consumeImpl(JCQueue.java:113) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.JCQueue.consume(JCQueue.java:89) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:154) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.executor.bolt.BoltExecutor$1.call(BoltExecutor.java:140) ~[storm-client-2.5.0.jar:2.5.0]
	at org.apache.storm.utils.Utils$1.run(Utils.java:398) ~[storm-client-2.5.0.jar:2.5.0]
	... 1 more
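The innermost cause in the trace is a `HashMap` iterator inside Kryo's `MapSerializer`, reached while the `md` field of `Metadata` is serialized for transfer to the remote worker (`tryTransferRemote`). That iterator fails fast when the map is structurally modified during iteration. One possible explanation, which is an assumption and not something confirmed in this report, is that the same `Metadata` instance is still being modified elsewhere while it is being serialized. The sketch below uses a plain `HashMap` as a stand-in for the metadata map. The class `SharedMetadataDemo` and its keys are hypothetical, and the mutation is done on the same thread only to make the failure deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;

public class SharedMetadataDemo {

    // Walks every entry of `walked`, as a serializer would, and adds a key
    // to `mutated` after the first entry has been read. Returns true if
    // the walk aborts with a ConcurrentModificationException.
    public static boolean walkThrows(Map<String, String[]> walked,
                                     Map<String, String[]> mutated) {
        try {
            boolean first = true;
            for (Map.Entry<String, String[]> entry : walked.entrySet()) {
                if (first) {
                    mutated.put("added.during.walk", new String[] {"x"});
                    first = false;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Two entries, so that the iterator still has an entry left to fetch
    // after the mutation and therefore re-checks the modification count.
    public static Map<String, String[]> sample() {
        Map<String, String[]> md = new HashMap<>();
        md.put("url.path", new String[] {"https://example.com/feed"});
        md.put("depth", new String[] {"1"});
        return md;
    }

    public static void main(String[] args) {
        // Same instance walked and mutated: the walk fails.
        Map<String, String[]> shared = sample();
        System.out.println("same instance: " + walkThrows(shared, shared));

        // A private copy is walked while the original is mutated: no failure.
        Map<String, String[]> original = sample();
        Map<String, String[]> copy = new HashMap<>(original);
        System.out.println("private copy: " + walkThrows(copy, original));
    }
}
```

If shared state does turn out to be the cause, giving each emitted tuple its own copy of the metadata would be one way to avoid the exception. It would also be consistent with the single-worker setup running fine, since the failing frames are all on the remote-transfer path.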

Thankfully, a single topology worker appears to run much faster in 2.x than 2 workers did in 1.2.4, so we're OK with that for the moment, but it would be nice to find out why the topology cannot run with 2 or more workers.

EDIT:

I forgot to mention that the remote worker/supervisor experiences similar errors. It does receive the topology and tries to crawl the pages, but it fails with dropped messages and with errors during Worker-Transfer.
