@@ -11,7 +11,7 @@ import { Readable, Stream } from "stream";
1111import { GridFSBucket , ObjectID , Db , Cursor , MongoNetworkError } from "mongodb" ;
1212import * as path from "path" ;
1313import { DatabaseConnection } from "../DatabaseConnection" ;
14- import { StripeMessage , EnsureStripeCustomerMessage , NoderedUtil , QueuedMessage , RegisterQueueMessage , QueueMessage , CloseQueueMessage , ListCollectionsMessage , DropCollectionMessage , QueryMessage , AggregateMessage , InsertOneMessage , UpdateOneMessage , Base , UpdateManyMessage , InsertOrUpdateOneMessage , DeleteOneMessage , MapReduceMessage , SigninMessage , TokenUser , User , Rights , EnsureNoderedInstanceMessage , DeleteNoderedInstanceMessage , DeleteNoderedPodMessage , RestartNoderedInstanceMessage , GetNoderedInstanceMessage , GetNoderedInstanceLogMessage , SaveFileMessage , WellknownIds , GetFileMessage , UpdateFileMessage , CreateWorkflowInstanceMessage , RegisterUserMessage , NoderedUser , WatchMessage , GetDocumentVersionMessage , DeleteManyMessage , InsertManyMessage } from "openflow-api" ;
14+ import { StripeMessage , EnsureStripeCustomerMessage , NoderedUtil , QueuedMessage , RegisterQueueMessage , QueueMessage , CloseQueueMessage , ListCollectionsMessage , DropCollectionMessage , QueryMessage , AggregateMessage , InsertOneMessage , UpdateOneMessage , Base , UpdateManyMessage , InsertOrUpdateOneMessage , DeleteOneMessage , MapReduceMessage , SigninMessage , TokenUser , User , Rights , EnsureNoderedInstanceMessage , DeleteNoderedInstanceMessage , DeleteNoderedPodMessage , RestartNoderedInstanceMessage , GetNoderedInstanceMessage , GetNoderedInstanceLogMessage , SaveFileMessage , WellknownIds , GetFileMessage , UpdateFileMessage , CreateWorkflowInstanceMessage , RegisterUserMessage , NoderedUser , WatchMessage , GetDocumentVersionMessage , DeleteManyMessage , InsertManyMessage , GetKubeNodeLabels } from "openflow-api" ;
1515import { Billing , stripe_customer , stripe_base , stripe_list , StripeAddPlanMessage , StripeCancelPlanMessage , stripe_subscription , stripe_subscription_item , stripe_plan , stripe_coupon } from "openflow-api" ;
1616import { V1ResourceRequirements , V1Deployment } from "@kubernetes/client-node" ;
1717import { amqpwrapper } from "../amqpwrapper" ;
@@ -167,6 +167,9 @@ export class Message {
167167 case "restartnoderedinstance" :
168168 this . RestartNoderedInstance ( cli ) ;
169169 break ;
170+ case "getkubenodelabels" :
171+ this . GetKubeNodeLabels ( cli ) ;
172+ break ;
170173 case "getnoderedinstance" :
171174 this . GetNoderedInstance ( cli ) ;
172175 break ;
@@ -951,7 +954,7 @@ export class Message {
951954 let msg : EnsureNoderedInstanceMessage ;
952955 try {
953956 msg = EnsureNoderedInstanceMessage . assign ( this . data ) ;
954- await this . _EnsureNoderedInstance ( cli , msg . _id , false ) ;
957+ await this . _EnsureNoderedInstance ( cli , msg . _id , false , msg . labels ) ;
955958 } catch ( error ) {
956959 this . data = "" ;
957960 cli . _logger . error ( error ) ;
@@ -966,7 +969,7 @@ export class Message {
966969 }
967970 this . Send ( cli ) ;
968971 }
969- private async _EnsureNoderedInstance ( cli : WebSocketServerClient , _id : string , skipcreate : boolean ) : Promise < void > {
972+ private async _EnsureNoderedInstance ( cli : WebSocketServerClient , _id : string , skipcreate : boolean , labels : any ) : Promise < void > {
970973 let user : NoderedUser ;
971974 cli . _logger . debug ( "[" + cli . user . username + "] EnsureNoderedInstance" ) ;
972975 if ( _id === null || _id === undefined || _id === "" ) _id = cli . user . _id ;
@@ -1105,6 +1108,17 @@ export class Message {
11051108 }
11061109 }
11071110 }
1111+ if ( _deployment && labels && Config . nodered_allow_nodeselector ) {
1112+ if ( typeof labels === "string" ) {
1113+ var item = JSON . parse ( labels ) ;
1114+ var spec : any = _deployment . spec . template . spec ;
1115+ const keys = Object . keys ( item ) ;
1116+ if ( spec . nodeSelector == null ) spec . nodeSelector = { } ;
1117+ keys . forEach ( key => {
1118+ spec . nodeSelector [ key ] = item [ key ] ;
1119+ } )
1120+ }
1121+ }
11081122 try {
11091123 await KubeUtil . instance ( ) . AppsV1Api . createNamespacedDeployment ( namespace , ( _deployment as any ) ) ;
11101124 Audit . NoderedAction ( TokenUser . From ( cli . user ) , true , name , "createdeployment" , Config . nodered_image , null ) ;
@@ -1304,7 +1318,7 @@ export class Message {
13041318 }
13051319 }
13061320 } else {
1307- cli . _logger . warn ( "[" + cli . user . username + "] GetNoderedInstance : found NO Namespaced Pods ???" ) ;
1321+ cli . _logger . warn ( "[" + cli . user . username + "] DeleteNoderedPod : found NO Namespaced Pods ???" ) ;
13081322 Audit . NoderedAction ( TokenUser . From ( cli . user ) , false , null , "deletepod" , image , msg . name ) ;
13091323 }
13101324 } catch ( error ) {
@@ -1360,6 +1374,38 @@ export class Message {
13601374 }
13611375 this . Send ( cli ) ;
13621376 }
1377+ private async GetKubeNodeLabels ( cli : WebSocketServerClient ) : Promise < void > {
1378+ this . Reply ( ) ;
1379+ let msg : GetKubeNodeLabels ;
1380+ try {
1381+ cli . _logger . debug ( "[" + cli . user . username + "] GetKubeNodeLabels" ) ;
1382+ msg = GetKubeNodeLabels . assign ( this . data ) ;
1383+ const list = await KubeUtil . instance ( ) . CoreV1Api . listNode ( ) ;
1384+ const result : any = { } ;
1385+ if ( list != null ) {
1386+ list . body . items . forEach ( node => {
1387+ if ( node . metadata && node . metadata . labels ) {
1388+ const keys = Object . keys ( node . metadata . labels ) ;
1389+ keys . forEach ( key => {
1390+ result [ key ] = node . metadata . labels [ key ] ;
1391+ } ) ;
1392+ }
1393+ } ) ;
1394+ }
1395+ msg . result = result ;
1396+ } catch ( error ) {
1397+ this . data = "" ;
1398+ cli . _logger . error ( error ) ;
1399+ if ( msg !== null && msg !== undefined ) msg . error = error . message ? error . message : error
1400+ }
1401+ try {
1402+ this . data = JSON . stringify ( msg ) ;
1403+ } catch ( error ) {
1404+ this . data = "" ;
1405+ cli . _logger . error ( error ) ;
1406+ }
1407+ this . Send ( cli ) ;
1408+ }
13631409 private async GetNoderedInstance ( cli : WebSocketServerClient ) : Promise < void > {
13641410 this . Reply ( ) ;
13651411 let msg : GetNoderedInstanceMessage ;
@@ -1435,7 +1481,7 @@ export class Message {
14351481 this . Reply ( ) ;
14361482 let msg : GetNoderedInstanceLogMessage ;
14371483 try {
1438- cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstance " ) ;
1484+ cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstanceLog " ) ;
14391485 msg = GetNoderedInstanceLogMessage . assign ( this . data ) ;
14401486 const name = await this . GetInstanceName ( msg . _id , cli . user . _id , cli . user . username , cli . jwt ) ;
14411487 const namespace = Config . namespace ;
@@ -1452,12 +1498,12 @@ export class Message {
14521498
14531499 }
14541500 if ( ! NoderedUtil . IsNullEmpty ( msg . name ) && item . metadata . name == msg . name && cli . user . HasRoleName ( "admins" ) ) {
1455- cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstance :" + name + " found one as " + item . metadata . name ) ;
1501+ cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstanceLog :" + name + " found one as " + item . metadata . name ) ;
14561502 const obj = await await KubeUtil . instance ( ) . CoreV1Api . readNamespacedPodLog ( item . metadata . name , namespace , "" , false ) ;
14571503 msg . result = obj . body ;
14581504 Audit . NoderedAction ( TokenUser . From ( cli . user ) , true , name , "readpodlog" , image , item . metadata . name ) ;
14591505 } else if ( item . metadata . labels . app === name ) {
1460- cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstance :" + name + " found one as " + item . metadata . name ) ;
1506+ cli . _logger . debug ( "[" + cli . user . username + "] GetNoderedInstanceLog :" + name + " found one as " + item . metadata . name ) ;
14611507 const obj = await await KubeUtil . instance ( ) . CoreV1Api . readNamespacedPodLog ( item . metadata . name , namespace , "" , false ) ;
14621508 msg . result = obj . body ;
14631509 Audit . NoderedAction ( TokenUser . From ( cli . user ) , true , name , "readpodlog" , image , item . metadata . name ) ;
@@ -2090,7 +2136,7 @@ export class Message {
20902136 if ( billing . memory != newmemory ) {
20912137 billing . memory = newmemory ;
20922138 billing = await Config . db . _UpdateOne ( null , billing , "users" , 3 , true , rootjwt ) ;
2093- this . _EnsureNoderedInstance ( cli , msg . userid , true ) ;
2139+ this . _EnsureNoderedInstance ( cli , msg . userid , true , null ) ;
20942140 }
20952141 if ( customer != null && ! NoderedUtil . IsNullEmpty ( billing . coupon ) && customer . discount != null ) {
20962142 if ( billing . coupon != customer . discount . coupon . name ) {
0 commit comments