@@ -136,6 +136,9 @@ export class Message {
136136 case "deletenoderedpod" :
137137 await this . DeleteNoderedPod ( span ) ;
138138 break ;
139+ case "getnoderedinstance" :
140+ await this . GetNoderedInstance ( span ) ;
141+ break ;
139142 default :
140143 span . recordException ( "Unknown command " + this . command ) ;
141144 this . UnknownCommand ( ) ;
@@ -431,7 +434,13 @@ export class Message {
431434 await this . GetKubeNodeLabels ( cli ) ;
432435 break ;
433436 case "getnoderedinstance" :
434- await this . GetNoderedInstance ( cli , span ) ;
437+ this . EnsureJWT ( cli ) ;
438+ if ( Config . enable_openflow_amqp ) {
439+ cli . Send ( await QueueClient . SendForProcessing ( this , this . priority ) ) ;
440+ } else {
441+ await this . RestartNoderedInstance ( span ) ;
442+ cli . Send ( this ) ;
443+ }
435444 break ;
436445 case "getnoderedinstancelog" :
437446 await this . GetNoderedInstanceLog ( cli , span ) ;
@@ -485,6 +494,9 @@ export class Message {
485494 break ;
486495 case "pushmetrics" :
487496 break ;
497+ case "housekeeping" :
498+ await this . Housekeeping ( span ) ;
499+ break ;
488500 default :
489501 span . recordException ( "Unknown command " + command ) ;
490502 this . UnknownCommand ( ) ;
@@ -2364,22 +2376,24 @@ export class Message {
23642376 }
23652377 private static detectdocker : boolean = true ;
23662378 private static usedocker : boolean = false ;
2367- private async GetNoderedInstance ( cli : WebSocketServerClient , parent : Span ) : Promise < void > {
2379+ private async GetNoderedInstance ( parent : Span ) : Promise < void > {
23682380 await this . DetectDocker ( ) ;
23692381 if ( Message . usedocker ) {
2370- this . dockerGetNoderedInstance ( cli , parent ) ;
2382+ this . dockerGetNoderedInstance ( parent ) ;
23712383 } else {
2372- this . KubeGetNoderedInstance ( cli , parent ) ;
2384+ this . KubeGetNoderedInstance ( parent ) ;
23732385 }
23742386 }
2375- private async dockerGetNoderedInstance ( cli : WebSocketServerClient , parent : Span ) : Promise < void > {
2387+ private async dockerGetNoderedInstance ( parent : Span ) : Promise < void > {
23762388 this . Reply ( ) ;
23772389 let msg : GetNoderedInstanceMessage ;
23782390 const span : Span = Logger . otel . startSubSpan ( "message.GetNoderedInstance" , parent ) ;
23792391 try {
2380- Logger . instanse . debug ( "[" + cli . user . username + "] GetNoderedInstance" ) ;
2392+ const _tuser = Crypt . verityToken ( this . jwt ) ;
2393+
2394+ Logger . instanse . debug ( "[" + _tuser . username + "] GetNoderedInstance" ) ;
23812395 msg = GetNoderedInstanceMessage . assign ( this . data ) ;
2382- const name = await this . GetInstanceName ( msg . _id , cli . user . _id , cli . user . username , cli . jwt , span ) ;
2396+ const name = await this . GetInstanceName ( msg . _id , _tuser . _id , _tuser . username , this . jwt , span ) ;
23832397
23842398 span . addEvent ( "init Docker()" ) ;
23852399 const docker = new Docker ( ) ;
@@ -2415,27 +2429,27 @@ export class Message {
24152429 } catch ( error ) {
24162430 span . recordException ( error ) ;
24172431 this . data = "" ;
2418- await handleError ( cli , error ) ;
2432+ await handleError ( null , error ) ;
24192433 if ( msg !== null && msg !== undefined ) msg . error = error . message ? error . message : error
24202434 }
24212435 try {
24222436 this . data = JSON . stringify ( msg ) ;
24232437 } catch ( error ) {
24242438 span . recordException ( error ) ;
24252439 this . data = "" ;
2426- await handleError ( cli , error ) ;
2440+ await handleError ( null , error ) ;
24272441 }
24282442 Logger . otel . endSpan ( span ) ;
2429- this . Send ( cli ) ;
24302443 }
2431- private async KubeGetNoderedInstance ( cli : WebSocketServerClient , parent : Span ) : Promise < void > {
2444+ private async KubeGetNoderedInstance ( parent : Span ) : Promise < void > {
24322445 this . Reply ( ) ;
24332446 let msg : GetNoderedInstanceMessage ;
24342447 const span : Span = Logger . otel . startSubSpan ( "message.GetNoderedInstance" , parent ) ;
24352448 try {
2436- Logger . instanse . debug ( "[" + cli . user . username + "] GetNoderedInstance" ) ;
2449+ const _tuser = Crypt . verityToken ( this . jwt ) ;
2450+ Logger . instanse . debug ( "[" + _tuser . username + "] GetNoderedInstance" ) ;
24372451 msg = GetNoderedInstanceMessage . assign ( this . data ) ;
2438- const name = await this . GetInstanceName ( msg . _id , cli . user . _id , cli . user . username , cli . jwt , span ) ;
2452+ const name = await this . GetInstanceName ( msg . _id , _tuser . _id , _tuser . username , this . jwt , span ) ;
24392453 const namespace = Config . namespace ;
24402454 const list = await KubeUtil . instance ( ) . CoreV1Api . listNamespacedPod ( namespace ) ;
24412455 msg . result = null ;
@@ -2459,7 +2473,7 @@ export class Message {
24592473 if ( ( image . indexOf ( "openflownodered" ) > - 1 || image . indexOf ( "openiap/nodered" ) > - 1 ) && ! NoderedUtil . IsNullEmpty ( userid ) ) {
24602474 try {
24612475 if ( billed != "true" && diffhours > 24 ) {
2462- Logger . instanse . debug ( "[" + cli . user . username + "] Remove un billed nodered instance " + itemname + " that has been running for " + diffhours + " hours" ) ;
2476+ Logger . instanse . debug ( "[" + _tuser . username + "] Remove un billed nodered instance " + itemname + " that has been running for " + diffhours + " hours" ) ;
24632477 await this . _DeleteNoderedInstance ( userid , rootjwt , span ) ;
24642478 }
24652479 } catch ( error ) {
@@ -2472,7 +2486,7 @@ export class Message {
24722486 }
24732487 }
24742488 }
2475- if ( ! NoderedUtil . IsNullEmpty ( msg . name ) && item . metadata . name == msg . name && cli . user . HasRoleName ( "admins" ) ) {
2489+ if ( ! NoderedUtil . IsNullEmpty ( msg . name ) && item . metadata . name == msg . name && _tuser . HasRoleName ( "admins" ) ) {
24762490 found = item ;
24772491 var metrics : any = null ;
24782492 try {
@@ -2485,7 +2499,7 @@ export class Message {
24852499 found = item ;
24862500 if ( item . status . phase != "Failed" ) {
24872501 msg . result = item ;
2488- Logger . instanse . debug ( "[" + cli . user . username + "] GetNoderedInstance: " + name + " found one" ) ;
2502+ Logger . instanse . debug ( "[" + _tuser . username + "] GetNoderedInstance: " + name + " found one" ) ;
24892503 }
24902504 var metrics : any = null ;
24912505 try {
@@ -2498,23 +2512,22 @@ export class Message {
24982512 }
24992513 if ( msg . result == null ) msg . result = found ;
25002514 } else {
2501- Logger . instanse . warn ( "[" + cli . user . username + "] GetNoderedInstance: found NO Namespaced Pods ???" ) ;
2515+ Logger . instanse . warn ( "[" + _tuser . username + "] GetNoderedInstance: found NO Namespaced Pods ???" ) ;
25022516 }
25032517 } catch ( error ) {
25042518 span . recordException ( error ) ;
25052519 this . data = "" ;
2506- await handleError ( cli , error ) ;
2520+ await handleError ( null , error ) ;
25072521 if ( msg !== null && msg !== undefined ) msg . error = error . message ? error . message : error
25082522 }
25092523 try {
25102524 this . data = JSON . stringify ( msg ) ;
25112525 } catch ( error ) {
25122526 span . recordException ( error ) ;
25132527 this . data = "" ;
2514- await handleError ( cli , error ) ;
2528+ await handleError ( null , error ) ;
25152529 }
25162530 Logger . otel . endSpan ( span ) ;
2517- this . Send ( cli ) ;
25182531 }
25192532 private async GetNoderedInstanceLog ( cli : WebSocketServerClient , parent : Span ) : Promise < void > {
25202533 await this . DetectDocker ( ) ;
@@ -3484,6 +3497,98 @@ export class Message {
34843497 }
34853498 this . Send ( cli ) ;
34863499 }
3500+ private async Housekeeping ( parent : Span ) : Promise < void > {
3501+ const span : Span = Logger . otel . startSubSpan ( "message.QueueMessage" , parent ) ;
3502+ try {
3503+ await this . GetNoderedInstance ( span )
3504+ } catch ( error ) {
3505+ }
3506+ try {
3507+ await Config . db . ensureindexes ( span ) ;
3508+ } catch ( error ) {
3509+ }
3510+ try {
3511+ const jwt : string = Crypt . rootToken ( ) ;
3512+ const timestamp = new Date ( new Date ( ) . toISOString ( ) ) ;
3513+ timestamp . setUTCHours ( 0 , 0 , 0 , 0 ) ;
3514+ const collections = await Config . db . ListCollections ( jwt ) ;
3515+ for ( let col of collections ) {
3516+ Config . db . db . collection ( "dbusage" ) . deleteMany ( { timestamp : timestamp , collection : col . name } ) ;
3517+ let aggregates : any = [
3518+ {
3519+ "$project" : {
3520+ "_modifiedbyid" : 1 ,
3521+ "_modifiedby" : 1 ,
3522+ "object_size" : { "$bsonSize" : "$$ROOT" }
3523+ }
3524+ } ,
3525+ {
3526+ "$group" : {
3527+ "_id" : "$_modifiedbyid" ,
3528+ "size" : { "$sum" : "$object_size" } ,
3529+ "name" : { "$max" : "$_modifiedby" }
3530+ }
3531+ } ,
3532+ { $addFields : { "userid" : "$_id" } } ,
3533+ { $unset : "_id" } ,
3534+ { $addFields : { "collection" : col . name } } ,
3535+ { $addFields : { timestamp : timestamp . toISOString ( ) } } ,
3536+ ] ;
3537+ if ( col . name == "fs.files" ) {
3538+ aggregates = [
3539+ {
3540+ "$project" : {
3541+ "_modifiedbyid" : "$metadata._modifiedbyid" ,
3542+ "_modifiedby" : "$metadata._modifiedby" ,
3543+ "object_size" : "$length"
3544+ }
3545+ } ,
3546+ {
3547+ "$group" : {
3548+ "_id" : "$_modifiedbyid" ,
3549+ "size" : { "$sum" : "$object_size" } ,
3550+ "name" : { "$max" : "$_modifiedby" }
3551+ }
3552+ } ,
3553+ { $addFields : { "userid" : "$_id" } } ,
3554+ { $unset : "_id" } ,
3555+ { $addFields : { "collection" : col . name } } ,
3556+ { $addFields : { timestamp : timestamp . toISOString ( ) } } ,
3557+ ]
3558+ }
3559+ if ( col . name == "fs.files" ) {
3560+ aggregates = [
3561+ {
3562+ "$project" : {
3563+ "userid" : 1 ,
3564+ "name" : 1 ,
3565+ "object_size" : { "$bsonSize" : "$$ROOT" }
3566+ }
3567+ } ,
3568+ {
3569+ "$group" : {
3570+ "_id" : "$userid" ,
3571+ "size" : { "$sum" : "$object_size" } ,
3572+ "name" : { "$max" : "$name" }
3573+ }
3574+ } ,
3575+ { $addFields : { "userid" : "$_id" } } ,
3576+ { $unset : "_id" } ,
3577+ { $addFields : { "collection" : col . name } } ,
3578+ { $addFields : { timestamp : timestamp . toISOString ( ) } } ,
3579+ ]
3580+ }
3581+
3582+ const items : any [ ] = await Config . db . db . collection ( col . name ) . aggregate ( aggregates ) . toArray ( ) ;
3583+ let bulkInsert = Config . db . db . collection ( "dbusage" ) . initializeUnorderedBulkOp ( ) ;
3584+ items . forEach ( item => bulkInsert . insert ( item ) ) ;
3585+ bulkInsert . execute ( ) ;
3586+ }
3587+ } catch ( error ) {
3588+
3589+ }
3590+ Logger . otel . endSpan ( span ) ;
3591+ }
34873592}
34883593
34893594export class JSONfn {
0 commit comments