forked from openiap/opencore
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWebSocketServer.ts
More file actions
118 lines (116 loc) · 6.13 KB
/
Copy pathWebSocketServer.ts
File metadata and controls
118 lines (116 loc) · 6.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import * as winston from "winston";
import * as http from "http";
import * as WebSocket from "ws";
import { WebSocketServerClient } from "./WebSocketServerClient";
import { DatabaseConnection } from "./DatabaseConnection";
import { Crypt } from "./Crypt";
import { Message } from "./Messages/Message";
import { Config } from "./Config";
import { SigninMessage, NoderedUtil } from "openflow-api";
export class WebSocketServer {
private static _logger: winston.Logger;
private static _socketserver: WebSocket.Server;
private static _server: http.Server;
public static _clients: WebSocketServerClient[];
private static _db: DatabaseConnection;
static configure(logger: winston.Logger, server: http.Server): void {
this._clients = [];
this._logger = logger;
this._server = server;
this._socketserver = new WebSocket.Server({ server: server });
this._socketserver.on("connection", (socketObject: WebSocket): void => {
this._clients.push(new WebSocketServerClient(logger, socketObject));
});
this._socketserver.on("error", (error: Error): void => {
this._logger.error(error);
});
// this._socketserver.on("listening", (cb: () => void):void => {
// this._logger.debug("WebSocketServer is listening");
// });
// this._socketserver.on("headers", (headers: string[], request: http.IncomingMessage):void => {
// this._logger.debug("headers" + headers.join(","));
// });
setInterval(this.pingClients, 10000);
}
private static async pingClients(): Promise<void> {
let count: number = WebSocketServer._clients.length;
WebSocketServer._clients = WebSocketServer._clients.filter(function (cli: WebSocketServerClient): boolean {
try {
if (!NoderedUtil.IsNullEmpty(cli.jwt)) {
var tuser = Crypt.verityToken(cli.jwt);
var payload = Crypt.decryptToken(cli.jwt);
var clockTimestamp = Math.floor(Date.now() / 1000);
// WebSocketServer._logger.silly((payload.exp - clockTimestamp))
if ((payload.exp - clockTimestamp) < 60) {
WebSocketServer._logger.debug("Token for " + tuser.username + " expires in less than 1 minute, send new jwt to client");
var l: SigninMessage = new SigninMessage();
cli.jwt = Crypt.createToken(tuser, Config.shorttoken_expires_in);
l.jwt = cli.jwt;
l.user = tuser;
var m: Message = new Message(); m.command = "refreshtoken";
m.data = JSON.stringify(l);
cli.Send(m);
}
}
} catch (error) {
console.error(error);
cli.Close();
}
var now = new Date();
var seconds = (now.getTime() - cli.lastheartbeat.getTime()) / 1000;
if (seconds >= Config.client_heartbeat_timeout) {
if (cli.user != null) {
WebSocketServer._logger.info("client " + cli.user.name + "/" + cli.clientagent + " timeout, close down");
} else {
WebSocketServer._logger.info("client not signed/" + cli.clientagent + " timeout, close down");
}
cli.Close();
return false;
}
return cli.ping();
});
if (count !== WebSocketServer._clients.length) {
WebSocketServer._logger.info("new client count: " + WebSocketServer._clients.length);
}
for (var i = 0; i < WebSocketServer._clients.length; i++) {
try {
var cli = WebSocketServer._clients[i];
if (cli.user != null) {
// Lets assume only robots register queues ( not true )
if (cli.clientagent == "openrpa") {
Config.db.db.collection("users").updateOne({ _id: cli.user._id },
{ $set: { _rpaheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
console.error(err);
});
}
if (cli.clientagent == "nodered") {
Config.db.db.collection("users").updateOne({ _id: cli.user._id },
{ $set: { _noderedheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
console.error(err);
});
}
if (cli.clientagent == "webapp" || cli.clientagent == "aiotwebapp") {
Config.db.db.collection("users").updateOne({ _id: cli.user._id },
{ $set: { _webheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
console.error(err);
});
}
if (cli.clientagent == "mobileapp" || cli.clientagent == "aiotmobileapp") {
Config.db.db.collection("users").updateOne({ _id: cli.user._id },
{ $set: { _webheartbeat: new Date(new Date().toISOString()), _mobilheartbeat: new Date(new Date().toISOString()), _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
console.error(err);
});
}
else {
// Should proberly turn this a little down, so we dont update all online users every 10th second
Config.db.db.collection("users").updateOne({ _id: cli.user._id }, { $set: { _heartbeat: new Date(new Date().toISOString()) } }).catch((err) => {
console.error(err);
});
}
}
} catch (error) {
console.error(error);
}
}
}
}