@@ -7,6 +7,7 @@ import { Crypt } from "./Crypt";
77import { Message } from "./Messages/Message" ;
88import { Config } from "./Config" ;
99import { SigninMessage , NoderedUtil , TokenUser } from "@openiap/openflow-api" ;
10+ import { Span } from "@opentelemetry/api" ;
1011import { otel } from "./otel" ;
1112import { ValueRecorder , Counter , BaseObserver } from "@opentelemetry/api-metrics"
1213
@@ -80,113 +81,129 @@ export class WebSocketServer {
8081 setInterval ( this . pingClients , 10000 ) ;
8182 }
8283 private static async pingClients ( ) : Promise < void > {
83- let count : number = WebSocketServer . _clients . length ;
84- for ( let i = WebSocketServer . _clients . length - 1 ; i >= 0 ; i -- ) {
85- const cli : WebSocketServerClient = WebSocketServer . _clients [ i ] ;
86- try {
87- if ( ! NoderedUtil . IsNullEmpty ( cli . jwt ) ) {
88- const payload = Crypt . decryptToken ( cli . jwt ) ;
89- const clockTimestamp = Math . floor ( Date . now ( ) / 1000 ) ;
90- if ( ( payload . exp - clockTimestamp ) < 60 ) {
91- WebSocketServer . _logger . debug ( "Token for " + cli . id + "/" + cli . user . name + "/" + cli . clientagent + " expires in less than 1 minute, send new jwt to client" ) ;
92- const tuser : TokenUser = await Message . DoSignin ( cli , null ) ;
93- if ( tuser != null ) {
94- const l : SigninMessage = new SigninMessage ( ) ;
95- cli . jwt = Crypt . createToken ( tuser , Config . shorttoken_expires_in ) ;
96- l . jwt = cli . jwt ;
97- l . user = tuser ;
98- const m : Message = new Message ( ) ; m . command = "refreshtoken" ;
99- m . data = JSON . stringify ( l ) ;
100- cli . Send ( m ) ;
101- } else {
102- cli . Close ( ) ;
84+ const span : Span = otel . startSpan ( "WebSocketServer.pingClients" ) ;
85+ try {
86+ let count : number = WebSocketServer . _clients . length ;
87+ for ( let i = WebSocketServer . _clients . length - 1 ; i >= 0 ; i -- ) {
88+ const cli : WebSocketServerClient = WebSocketServer . _clients [ i ] ;
89+ try {
90+ if ( ! NoderedUtil . IsNullEmpty ( cli . jwt ) ) {
91+ const payload = Crypt . decryptToken ( cli . jwt ) ;
92+ const clockTimestamp = Math . floor ( Date . now ( ) / 1000 ) ;
93+ if ( ( payload . exp - clockTimestamp ) < 60 ) {
94+ WebSocketServer . _logger . debug ( "Token for " + cli . id + "/" + cli . user . name + "/" + cli . clientagent + " expires in less than 1 minute, send new jwt to client" ) ;
95+ const tuser : TokenUser = await Message . DoSignin ( cli , null ) ;
96+ if ( tuser != null ) {
97+ span . addEvent ( "Token for " + cli . id + "/" + cli . user . name + "/" + cli . clientagent + " expires in less than 1 minute, send new jwt to client" ) ;
98+ const l : SigninMessage = new SigninMessage ( ) ;
99+ cli . jwt = Crypt . createToken ( tuser , Config . shorttoken_expires_in ) ;
100+ l . jwt = cli . jwt ;
101+ l . user = tuser ;
102+ const m : Message = new Message ( ) ; m . command = "refreshtoken" ;
103+ m . data = JSON . stringify ( l ) ;
104+ cli . Send ( m ) ;
105+ } else {
106+ cli . Close ( ) ;
107+ }
103108 }
104109 }
110+ } catch ( error ) {
111+ span . recordException ( error ) ;
112+ console . error ( error ) ;
113+ cli . Close ( ) ;
105114 }
106- } catch ( error ) {
107- console . error ( error ) ;
108- cli . Close ( ) ;
109- }
110- const now = new Date ( ) ;
111- const seconds = ( now . getTime ( ) - cli . lastheartbeat . getTime ( ) ) / 1000 ;
112- if ( seconds >= Config . client_heartbeat_timeout ) {
113- if ( cli . user != null ) {
114- WebSocketServer . _logger . info ( "client " + cli . id + " /" + cli . user . name + "/" + cli . clientagent + " timeout, close down" ) ;
115- } else {
116- WebSocketServer . _logger . info ( "client not signed/" + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
115+ const now = new Date ( ) ;
116+ const seconds = ( now . getTime ( ) - cli . lastheartbeat . getTime ( ) ) / 1000 ;
117+ if ( seconds >= Config . client_heartbeat_timeout ) {
118+ if ( cli . user != null ) {
119+ span . addEvent ( "client " + cli . id + "/" + cli . user . name + "/" + cli . clientagent + " timeout, close down" ) ;
120+ WebSocketServer . _logger . info ( "client " + cli . id + "/" + cli . user . name + "/" + cli . clientagent + " timeout, close down" ) ;
121+ } else {
122+ span . addEvent ( "client not signed/" + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
123+ WebSocketServer . _logger . info ( "client not signed /" + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
124+ }
125+ cli . Close ( ) ;
117126 }
118- cli . Close ( ) ;
119- }
120- cli . ping ( ) ;
121- if ( ! cli . connected ( ) && cli . queuecount ( ) == 0 && cli . streamcount ( ) == 0 ) {
122- if ( cli . user != null ) {
123- WebSocketServer . _logger . info ( "removing disconnected client " + cli . id + "/" + cli . user . name + "/" + cli . clientagent ) ;
124- } else {
125- WebSocketServer . _logger . info ( "removing disconnected client " + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
127+ cli . ping ( span ) ;
128+ if ( ! cli . connected ( ) && cli . queuecount ( ) == 0 && cli . streamcount ( ) == 0 ) {
129+ if ( cli . user != null ) {
130+ WebSocketServer . _logger . info ( "removing disconnected client " + cli . id + "/" + cli . user . name + "/" + cli . clientagent ) ;
131+ span . addEvent ( "removing disconnected client " + cli . id + "/" + cli . user . name + "/" + cli . clientagent ) ;
132+ } else {
133+ WebSocketServer . _logger . info ( "removing disconnected client " + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
134+ span . addEvent ( "removing disconnected client " + cli . id + "/" + cli . clientagent + " timeout, close down" ) ;
135+ }
136+ WebSocketServer . _clients . splice ( i , 1 ) ;
126137 }
127- WebSocketServer . _clients . splice ( i , 1 ) ;
128138 }
129- }
130- if ( count !== WebSocketServer . _clients . length ) {
131- WebSocketServer . _logger . info ( "new client count: " + WebSocketServer . _clients . length ) ;
132- }
133- // let openrpa: number = 0;
134- // this.p_online_clients.labels("openrpa").set(count);
135- const p_all = { } ;
136- for ( let i = 0 ; i < WebSocketServer . _clients . length ; i ++ ) {
137- try {
138- const cli = WebSocketServer . _clients [ i ] ;
139- if ( cli . user != null ) {
140- if ( ! NoderedUtil . IsNullEmpty ( cli . clientagent ) ) {
141- if ( ! NoderedUtil . IsNullUndefinded ( WebSocketServer . p_all ) ) {
142- if ( NoderedUtil . IsNullUndefinded ( p_all [ cli . clientagent ] ) ) p_all [ cli . clientagent ] = 0 ;
143- p_all [ cli . clientagent ] += 1 ;
139+ if ( count !== WebSocketServer . _clients . length ) {
140+ WebSocketServer . _logger . info ( "new client count: " + WebSocketServer . _clients . length ) ;
141+ span . setAttribute ( "clientcount" , WebSocketServer . _clients . length )
142+ }
143+ // let openrpa: number = 0;
144+ // this.p_online_clients.labels("openrpa").set(count);
145+ const p_all = { } ;
146+ for ( let i = 0 ; i < WebSocketServer . _clients . length ; i ++ ) {
147+ try {
148+ const cli = WebSocketServer . _clients [ i ] ;
149+ if ( cli . user != null ) {
150+ if ( ! NoderedUtil . IsNullEmpty ( cli . clientagent ) ) {
151+ if ( ! NoderedUtil . IsNullUndefinded ( WebSocketServer . p_all ) ) {
152+ if ( NoderedUtil . IsNullUndefinded ( p_all [ cli . clientagent ] ) ) p_all [ cli . clientagent ] = 0 ;
153+ p_all [ cli . clientagent ] += 1 ;
154+ }
155+ }
156+ // Lets assume only robots register queues ( not true )
157+ if ( cli . clientagent == "openrpa" ) {
158+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
159+ { $set : { _rpaheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
160+ Config . prometheus_measure_onlineuser , null ) ;
161+ }
162+ if ( cli . clientagent == "nodered" ) {
163+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
164+ { $set : { _noderedheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
165+ Config . prometheus_measure_onlineuser , null ) ;
166+ }
167+ if ( cli . clientagent == "webapp" || cli . clientagent == "aiotwebapp" ) {
168+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
169+ { $set : { _webheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
170+ Config . prometheus_measure_onlineuser , null ) ;
171+ }
172+ if ( cli . clientagent == "powershell" ) {
173+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
174+ { $set : { _powershellheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
175+ Config . prometheus_measure_onlineuser , null ) ;
176+ }
177+ if ( cli . clientagent == "mobileapp" || cli . clientagent == "aiotmobileapp" ) {
178+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
179+ { $set : { _webheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _mobilheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
180+ Config . prometheus_measure_onlineuser , null ) ;
181+ }
182+ else {
183+ // Should proberly turn this a little down, so we dont update all online users every 10th second
184+ Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
185+ { $set : { _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
186+ Config . prometheus_measure_onlineuser , null ) ;
144187 }
145188 }
146- // Lets assume only robots register queues ( not true )
147- if ( cli . clientagent == "openrpa" ) {
148- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
149- { $set : { _rpaheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
150- Config . prometheus_measure_onlineuser , null ) ;
151- }
152- if ( cli . clientagent == "nodered" ) {
153- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
154- { $set : { _noderedheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
155- Config . prometheus_measure_onlineuser , null ) ;
156- }
157- if ( cli . clientagent == "webapp" || cli . clientagent == "aiotwebapp" ) {
158- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
159- { $set : { _webheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
160- Config . prometheus_measure_onlineuser , null ) ;
161- }
162- if ( cli . clientagent == "powershell" ) {
163- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
164- { $set : { _powershellheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
165- Config . prometheus_measure_onlineuser , null ) ;
166- }
167- if ( cli . clientagent == "mobileapp" || cli . clientagent == "aiotmobileapp" ) {
168- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
169- { $set : { _webheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _mobilheartbeat : new Date ( new Date ( ) . toISOString ( ) ) , _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
170- Config . prometheus_measure_onlineuser , null ) ;
171- }
172- else {
173- // Should proberly turn this a little down, so we dont update all online users every 10th second
174- Config . db . synRawUpdateOne ( "users" , { _id : cli . user . _id } ,
175- { $set : { _heartbeat : new Date ( new Date ( ) . toISOString ( ) ) } } ,
176- Config . prometheus_measure_onlineuser , null ) ;
177- }
189+ } catch ( error ) {
190+ span . recordException ( error ) ;
191+ console . error ( error ) ;
178192 }
179- } catch ( error ) {
180- console . error ( error ) ;
181193 }
182- }
183194
184- if ( ! NoderedUtil . IsNullUndefinded ( WebSocketServer . p_all ) ) {
185- WebSocketServer . p_all . clear ( ) ;
186- const keys = Object . keys ( p_all ) ;
187- keys . forEach ( key => {
188- WebSocketServer . p_all . bind ( { ...otel . defaultlabels , agent : key } ) . update ( p_all [ key ] ) ;
189- } ) ;
195+ if ( ! NoderedUtil . IsNullUndefinded ( WebSocketServer . p_all ) ) {
196+ WebSocketServer . p_all . clear ( ) ;
197+ const keys = Object . keys ( p_all ) ;
198+ keys . forEach ( key => {
199+ WebSocketServer . p_all . bind ( { ...otel . defaultlabels , agent : key } ) . update ( p_all [ key ] ) ;
200+ } ) ;
201+ }
202+ } catch ( error ) {
203+ span . recordException ( error ) ;
204+ throw error ;
205+ } finally {
206+ otel . endSpan ( span ) ;
190207 }
191208 }
192209}
0 commit comments