Skip to content

Commit 6d71a32

Browse files
author
Dmytro Kulieshov
authored
Heap on master filled with JSON-RPC message Strings (eclipse-che#6676)
1 parent 1bea871 commit 6d71a32

2 files changed

Lines changed: 50 additions & 26 deletions

File tree

core/che-core-api-core/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@
8686
<groupId>org.eclipse.che.core</groupId>
8787
<artifactId>che-core-commons-lang</artifactId>
8888
</dependency>
89+
<dependency>
90+
<groupId>org.eclipse.che.core</groupId>
91+
<artifactId>che-core-commons-schedule</artifactId>
92+
</dependency>
8993
<dependency>
9094
<groupId>org.everrest</groupId>
9195
<artifactId>everrest-core</artifactId>

core/che-core-api-core/src/main/java/org/eclipse/che/api/core/websocket/impl/MessagesReSender.java

Lines changed: 46 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,15 @@
1010
*/
1111
package org.eclipse.che.api.core.websocket.impl;
1212

13-
import java.util.ArrayList;
14-
import java.util.HashMap;
15-
import java.util.LinkedList;
16-
import java.util.List;
13+
import com.google.common.collect.EvictingQueue;
1714
import java.util.Map;
1815
import java.util.Optional;
16+
import java.util.Queue;
17+
import java.util.concurrent.ConcurrentHashMap;
1918
import javax.inject.Inject;
2019
import javax.inject.Singleton;
2120
import javax.websocket.Session;
21+
import org.eclipse.che.commons.schedule.ScheduleDelay;
2222

2323
/**
2424
* Instance is responsible for re-sending messages that were not sent during the period when WEB
@@ -29,57 +29,77 @@
2929
*/
3030
@Singleton
3131
public class MessagesReSender {
32+
3233
private static final int MAX_MESSAGES = 100;
3334

3435
private final WebSocketSessionRegistry registry;
3536

36-
private final Map<String, List<String>> messagesMap = new HashMap<>();
37+
private final Map<String, Queue<DelayedMessage>> delayedMessageRegistry =
38+
new ConcurrentHashMap<>();
3739

3840
@Inject
3941
public MessagesReSender(WebSocketSessionRegistry registry) {
4042
this.registry = registry;
4143
}
4244

43-
public void add(String endpointId, String message) {
44-
List<String> messages = messagesMap.get(endpointId);
45+
@ScheduleDelay(initialDelay = 60, delay = 60)
46+
void cleanStaleMessages() {
47+
long currentTimeMillis = System.currentTimeMillis();
4548

46-
if (messages == null) {
47-
messages = new LinkedList<>();
48-
messagesMap.put(endpointId, messages);
49-
}
49+
delayedMessageRegistry
50+
.values()
51+
.forEach(it -> it.removeIf(m -> currentTimeMillis - m.timeMillis > 60_000));
5052

51-
if (messages.size() <= MAX_MESSAGES) {
52-
messages.add(message);
53-
}
53+
delayedMessageRegistry.values().removeIf(Queue::isEmpty);
54+
}
55+
56+
public void add(String endpointId, String message) {
57+
58+
delayedMessageRegistry
59+
.computeIfAbsent(endpointId, k -> EvictingQueue.create(MAX_MESSAGES))
60+
.offer(new DelayedMessage(message));
5461
}
5562

5663
public void resend(String endpointId) {
57-
final List<String> messages = messagesMap.remove(endpointId);
64+
Queue<DelayedMessage> delayedMessages = delayedMessageRegistry.remove(endpointId);
5865

59-
if (messages == null || messages.isEmpty()) {
66+
if (delayedMessages == null || delayedMessages.isEmpty()) {
6067
return;
6168
}
6269

63-
final Optional<Session> sessionOptional = registry.get(endpointId);
70+
Optional<Session> sessionOptional = registry.get(endpointId);
6471

6572
if (!sessionOptional.isPresent()) {
6673
return;
6774
}
6875

69-
final Session session = sessionOptional.get();
70-
71-
final List<String> backing = new ArrayList<>(messages);
72-
messages.clear();
73-
74-
for (String message : backing) {
76+
Queue<DelayedMessage> backingQueue = EvictingQueue.create(delayedMessages.size());
77+
while (!delayedMessages.isEmpty()) {
78+
backingQueue.offer(delayedMessages.poll());
79+
}
7580

81+
Session session = sessionOptional.get();
82+
for (DelayedMessage delayedMessage : backingQueue) {
7683
if (session.isOpen()) {
77-
session.getAsyncRemote().sendText(message);
84+
session.getAsyncRemote().sendText(delayedMessage.message);
7885
} else {
79-
messages.add(message);
86+
delayedMessages.add(delayedMessage);
8087
}
8188
}
8289

83-
messagesMap.put(endpointId, messages);
90+
if (!delayedMessages.isEmpty()) {
91+
delayedMessageRegistry.put(endpointId, delayedMessages);
92+
}
93+
}
94+
95+
private static class DelayedMessage {
96+
97+
private final long timeMillis;
98+
private final String message;
99+
100+
private DelayedMessage(String message) {
101+
this.message = message;
102+
this.timeMillis = System.currentTimeMillis();
103+
}
84104
}
85105
}

0 commit comments

Comments
 (0)