Skip to content

Commit 42b101c

Browse files
committed
Add rate limits to socket messages
1 parent 5b52504 commit 42b101c

11 files changed

Lines changed: 47 additions & 22 deletions

OpenFlow/src/Config.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ export class Config {
4343
Config.api_credential_cache_seconds = parseInt(Config.getEnv("api_credential_cache_seconds", "60000"));
4444
Config.api_rate_limit_points = parseInt(Config.getEnv("api_rate_limit_points", "40"));
4545
Config.api_rate_limit_duration = parseInt(Config.getEnv("api_rate_limit_duration", "5"));
46+
Config.socket_rate_limit_points = parseInt(Config.getEnv("socket_rate_limit_points", "10"));
47+
Config.socket_rate_limit_duration = parseInt(Config.getEnv("socket_rate_limit_duration", "1"));
4648

4749
Config.client_heartbeat_timeout = parseInt(Config.getEnv("client_heartbeat_timeout", "60"));
4850

@@ -119,6 +121,8 @@ export class Config {
119121
public static api_credential_cache_seconds: number = parseInt(Config.getEnv("api_credential_cache_seconds", "60000"));
120122
public static api_rate_limit_points: number = parseInt(Config.getEnv("api_rate_limit_points", "40"));
121123
public static api_rate_limit_duration: number = parseInt(Config.getEnv("api_rate_limit_duration", "5"));
124+
public static socket_rate_limit_points: number = parseInt(Config.getEnv("socket_rate_limit_points", "10"));
125+
public static socket_rate_limit_duration: number = parseInt(Config.getEnv("socket_rate_limit_duration", "1"));
122126

123127
public static client_heartbeat_timeout: number = parseInt(Config.getEnv("client_heartbeat_timeout", "60"));
124128

OpenFlow/src/LoginProvider.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { Audit } from "./Audit";
2020
import * as saml from "saml20";
2121
const multer = require('multer');
2222
const GridFsStorage = require('multer-gridfs-storage');
23-
const { RateLimiterMemory, RateLimiterRes } = require('rate-limiter-flexible')
23+
const { RateLimiterMemory } = require('rate-limiter-flexible')
2424
import { GridFSBucket, ObjectID, Db, Cursor, Binary } from "mongodb";
2525
import { Base, User, NoderedUtil, TokenUser, WellknownIds, Rights, Role } from "openflow-api";
2626
import { DBHelper } from "./DBHelper";
@@ -35,11 +35,11 @@ const rateLimiter = (req: express.Request, res: express.Response, next: express.
3535
BaseRateLimiter
3636
.consume(req.ip)
3737
.then((e) => {
38-
// console.log("NO_RATE_LIMIT consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints);
38+
// console.log("API_O_RATE_LIMIT consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints);
3939
next();
4040
})
4141
.catch((e) => {
42-
console.log("RATE_LIMIT consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints + " msBeforeNext: " + e.msBeforeNext);
42+
console.log("API_RATE_LIMIT consumedPoints: " + e.consumedPoints + " remainingPoints: " + e.remainingPoints + " msBeforeNext: " + e.msBeforeNext);
4343
res.status(429).json({ response: 'RATE_LIMIT' });
4444
});
4545
};

OpenFlow/src/WebSocketServerClient.ts

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import { Config } from "./Config";
66
import { amqpwrapper, QueueMessageOptions, amqpqueue } from "./amqpwrapper";
77
import { NoderedUtil, Base, InsertOneMessage, QueueMessage, MapReduceMessage, QueryMessage, UpdateOneMessage, UpdateManyMessage, DeleteOneMessage, User, mapFunc, reduceFunc, finalizeFunc, QueuedMessage, QueuedMessageCallback, WatchEventMessage } from "openflow-api";
88
import { ChangeStream } from "mongodb";
9+
const { RateLimiterMemory } = require('rate-limiter-flexible')
10+
11+
const BaseRateLimiter = new RateLimiterMemory({
12+
points: Config.socket_rate_limit_points,
13+
duration: Config.socket_rate_limit_duration,
14+
});
915

1016
interface IHashTable<T> {
1117
[key: string]: T;
@@ -70,7 +76,7 @@ export class WebSocketServerClient {
7076
}
7177
logger.info("new client " + this.id + " from " + this.remoteip);
7278
socketObject.on("open", (e: Event): void => this.open(e));
73-
socketObject.on("message", (e: string): void => this.message(e)); // e: MessageEvent
79+
socketObject.on("message", (e: string): void => (this.message(e) as any)); // e: MessageEvent
7480
socketObject.on("error", (e: Event): void => this.error(e));
7581
socketObject.on("close", (e: CloseEvent): void => this.close(e));
7682
}
@@ -129,15 +135,30 @@ export class WebSocketServerClient {
129135
}
130136
}
131137
}
132-
private message(message: string): void { // e: MessageEvent
138+
private _message(message: string): void {
139+
//this._logger.silly("WebSocket message received " + message);
140+
let msg: SocketMessage = SocketMessage.fromjson(message);
141+
this._logger.silly("WebSocket message received id: " + msg.id + " index: " + msg.index + " count: " + msg.count);
142+
this._receiveQueue.push(msg);
143+
if ((msg.index + 1) >= msg.count) this.ProcessQueue();
144+
}
145+
private async message(message: string): Promise<void> {
146+
let username: string = "Unknown";
133147
try {
134-
//this._logger.silly("WebSocket message received " + message);
135-
let msg: SocketMessage = SocketMessage.fromjson(message);
136-
this._logger.silly("WebSocket message received id: " + msg.id + " index: " + msg.index + " count: " + msg.count);
137-
this._receiveQueue.push(msg);
138-
this.ProcessQueue();
148+
try {
149+
150+
if (!NoderedUtil.IsNullUndefinded(this.user)) { username = this.user.username; }
151+
152+
// await this.consume("me");
153+
var res = await BaseRateLimiter.consume("me");
154+
// console.log("SOCKET_NO_RATE_LIMIT consumedPoints: " + res.consumedPoints + " remainingPoints: " + res.remainingPoints);
155+
this._message(message);
156+
} catch (error) {
157+
this._logger.debug("[" + username + "/" + this.clientagent + "/" + this.id + "] SOCKET_RATE_LIMIT consumedPoints: " + error.consumedPoints + " remainingPoints: " + error.remainingPoints + " msBeforeNext: " + error.msBeforeNext);
158+
setTimeout(() => { this.message(message); }, Math.floor(Math.random() * 10) * 100);
159+
}
139160
} catch (error) {
140-
this._logger.error("WebSocket error encountered " + (error.message ? error.message : error));
161+
this._logger.error("[" + username + "/" + this.clientagent + "/" + this.id + "] WebSocket error encountered " + (error.message ? error.message : error));
141162
const errormessage: Message = new Message(); errormessage.command = "error"; errormessage.data = (error.message ? error.message : error);
142163
this._socketObject.send(JSON.stringify(errormessage));
143164
}
@@ -163,7 +184,7 @@ export class WebSocketServerClient {
163184
if (this._socketObject != null) {
164185
try {
165186
this._socketObject.removeListener("open", (e: Event): void => this.open(e));
166-
this._socketObject.removeListener("message", (e: string): void => this.message(e)); // e: MessageEvent
187+
this._socketObject.removeListener("message", (e: string): void => (this.message(e) as any)); // e: MessageEvent
167188
this._socketObject.removeListener("error", (e: Event): void => this.error(e));
168189
this._socketObject.removeListener("close", (e: CloseEvent): void => this.close(e));
169190
} catch (error) {

OpenFlowNodeRED/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "openflow-nodered",
3-
"version": "1.1.60",
3+
"version": "1.1.62",
44
"description": "Simple wrapper around NodeRed, RabbitMQ and MongoDB to support a more scaleable NodeRed implementation.\r Also the \"backend\" for [OpenRPA](https://github.com/skadefro/OpenRPA)",
55
"main": "index.js",
66
"scripts": {

OpenFlowNodeRED/src/nodered/nodes/api_nodes.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ export class api_update {
351351
}
352352
Promises.push(NoderedUtil.UpdateOne(this.config.collection, null, element, this.config.writeconcern, this.config.journal, msg.jwt));
353353
}
354-
this.node.status({ fill: "blue", shape: "dot", text: "processing " + y + " to " + (y + 49) });
354+
this.node.status({ fill: "blue", shape: "dot", text: y + " to " + (y + 49) + " of " + data.length });
355355
const tempresults = await Promise.all(Promises.map(p => p.catch(e => e)));
356356
results = results.concat(tempresults);
357357
Promises = [];
@@ -430,7 +430,7 @@ export class api_addorupdate {
430430
}
431431
Promises.push(NoderedUtil.InsertOrUpdateOne(this.config.collection, element, this.config.uniqeness, this.config.writeconcern, this.config.journal, msg.jwt));
432432
}
433-
this.node.status({ fill: "blue", shape: "dot", text: "processing " + y + " to " + (y + 49) });
433+
this.node.status({ fill: "blue", shape: "dot", text: y + " to " + (y + 49) + " of " + data.length });
434434
const tempresults = await Promise.all(Promises.map(p => p.catch(e => e)));
435435
results = results.concat(tempresults);
436436
Promises = [];
@@ -496,7 +496,7 @@ export class api_delete {
496496
if (NoderedUtil.isObject(element)) { id = element._id; }
497497
Promises.push(NoderedUtil.DeleteOne(this.config.collection, id, msg.jwt));
498498
}
499-
this.node.status({ fill: "blue", shape: "dot", text: "processing " + y + " to " + (y + 49) });
499+
this.node.status({ fill: "blue", shape: "dot", text: y + " to " + (y + 49) + " of " + data.length });
500500
const tempresults = await Promise.all(Promises.map(p => p.catch(e => e)));
501501
results = results.concat(tempresults);
502502
Promises = [];

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.1.60
1+
1.1.62

docker-compose-toolbox.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ services:
8282
- "traefik.http.routers.nodered.rule=Host(`nodered1.toolbox.openrpa.dk`)"
8383
- "traefik.http.routers.nodered.entrypoints=web"
8484
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
85-
image: "cloudhack/openflownodered:1.1.60"
85+
image: "cloudhack/openflownodered:1.1.62"
8686
container_name: "nodered"
8787
environment:
8888
# - nodered_id=1

docker-compose-traefik-letsencrypt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ services:
107107
- "traefik.http.routers.nodered.entrypoints=web,websecure"
108108
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
109109
- "traefik.http.routers.nodered.tls.certresolver=myresolver"
110-
image: "cloudhack/openflownodered:1.1.60"
110+
image: "cloudhack/openflownodered:1.1.62"
111111
container_name: "nodered"
112112
environment:
113113
# - nodered_id=1

docker-compose-traefik.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ services:
8282
- "traefik.http.routers.nodered.rule=Host(`nodered1.localhost.openrpa.dk`)"
8383
- "traefik.http.routers.nodered.entrypoints=web"
8484
- "traefik.http.services.nodered.loadbalancer.server.port=1880"
85-
image: "cloudhack/openflownodered:1.1.60"
85+
image: "cloudhack/openflownodered:1.1.62"
8686
container_name: "nodered"
8787
environment:
8888
# - nodered_id=1

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ services:
5252
- "80:80"
5353
- "5858:5858"
5454
nodered:
55-
image: "cloudhack/openflownodered:1.1.60"
55+
image: "cloudhack/openflownodered:1.1.62"
5656
environment:
5757
# - nodered_id=1
5858
- nodered_sa=nodered1

0 commit comments

Comments
 (0)