Skip to content

Commit 2bc196c

Browse files
authored
Merge pull request openiap#5 from skadefro/master
fixes
2 parents bac82ea + 9dd1ca3 commit 2bc196c

12 files changed

Lines changed: 83 additions & 69 deletions

File tree

.vscode/launch.json

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,6 @@
11
{
22
"version": "0.2.0",
33
"configurations": [
4-
{
5-
"type": "node",
6-
"request": "attach",
7-
"name": "Attach to aiotdevred.openrpa.dk",
8-
"address": "aiotdevred.openrpa.dk",
9-
"port": 9229,
10-
"localRoot": "${workspaceFolder}",
11-
"remoteRoot": "/data"
12-
},
13-
{
14-
"type": "node",
15-
"request": "attach",
16-
"name": "Attach to Remote",
17-
"address": "aiotdevred.openrpa.dk",
18-
"port": 5858,
19-
"localRoot": "${workspaceFolder}/OpenFlowNodeRED/dist",
20-
"remoteRoot": "/data"
21-
},
22-
{
23-
"name": "Launch index.html",
24-
"type": "chrome",
25-
"request": "launch",
26-
"url": "https://localhost.openrpa.dk:3000/",
27-
"webRoot": "${workspaceFolder}/dist/Public/index.html"
28-
},
294
{
305
"args": [],
316
"cwd": "${workspaceRoot}",
@@ -47,6 +22,13 @@
4722
"stopOnEntry": false,
4823
"type": "node"
4924
},
25+
{
26+
"name": "Launch index.html",
27+
"type": "chrome",
28+
"request": "launch",
29+
"url": "https://localhost.openrpa.dk:3000/",
30+
"webRoot": "${workspaceFolder}/dist/Public/index.html"
31+
},
5032
{
5133
"args": [],
5234
"cwd": "${workspaceRoot}",

OpenFlow/src/LoginProvider.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,19 @@ export class LoginProvider {
131131
res.end(error);
132132
}
133133
});
134+
app.get("/config", (req: any, res: any, next: any): void => {
135+
var _url:string = "";
136+
if(url.parse(baseurl).protocol == "http:") {
137+
_url = "ws://" + url.parse(baseurl).host;
138+
} else {
139+
_url = "wss://" + url.parse(baseurl).host;
140+
}
141+
_url += "/";
142+
var res2 = {
143+
wshost: _url
144+
}
145+
res.end(JSON.stringify(res2));
146+
});
134147
app.get("/loginproviders", async (req: any, res: any, next: any): Promise<void> => {
135148
LoginProvider.login_providers = await Config.db.query<Provider>({_type: "provider"}, null, 10, 0, null, "config", TokenUser.rootToken());
136149
var result:any[] = [];

OpenFlow/src/WebSocketClient.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ export class WebSocketClient {
115115
}
116116
public async sendToQueue(msg:QueueMessage) {
117117
if(this.consumers.length === 0) { throw new Error("No consumers for client available to send message through") }
118-
this.consumers[0].sendToQueue(msg.queuename, msg.correlationId, msg.data);
118+
var result = this.consumers[0].sendToQueue(msg.queuename, msg.correlationId,{payload: msg.data, jwt: this.jwt});
119119
}
120120
async OnMessage(sender: amqp_consumer, msg: amqplib.ConsumeMessage ) {
121121
try {
@@ -222,8 +222,9 @@ export class WebSocketClient {
222222

223223

224224
async Queue(data:string, replyTo: string, correlationId:string, queuename:string):Promise<any[]> {
225+
var d:any = JSON.parse(data);
225226
var q: QueueMessage = new QueueMessage();
226-
q.data = data; q.replyto = replyTo;
227+
q.data = d.payload; q.replyto = replyTo;
227228
q.correlationId = correlationId; q.queuename = queuename;
228229
let m: Message = Message.fromcommand("queuemessage");
229230
if(q.correlationId === undefined || q.correlationId === null || q.correlationId === "") { q.correlationId = m.id; }

OpenFlow/src/amqp_consumer.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@ export class amqp_consumer {
3232
sender._logger.info("OnMessage " + msg.content.toString());
3333
}
3434
sendToQueue(replyto: string, correlationId:string, data: any) {
35+
if (typeof data !== 'string' && !(data instanceof String)) {
36+
data = JSON.stringify(data);
37+
}
3538
this._logger.info("SendMessage " + data);
3639
//this.channel.publish( this.exchange, "", Buffer.from(msg));
3740
this.channel.sendToQueue(replyto, Buffer.from(data), { correlationId: correlationId, replyTo: this._ok.queue });

OpenFlow/src/public/CommonControllers.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,12 @@ module openflow {
5151
var msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
5252
q = await this.WebSocketClient.Send<DeleteOneMessage>(msg);
5353
}
54-
54+
async RegisterQueue(queuename:string = undefined): Promise<void> {
55+
var q: RegisterQueueMessage = new RegisterQueueMessage();
56+
q.queuename = queuename;
57+
var msg:Message = new Message(); msg.command = "registerqueue"; msg.data = JSON.stringify(q);
58+
await this.WebSocketClient.Send(msg);
59+
}
5560
async _QueueMessage(queuename: string, data: any):Promise<QueueMessage> {
5661
return new Promise<QueueMessage>(async (resolve, reject) => {
5762
var q: QueueMessage = new QueueMessage();

OpenFlow/src/public/Controllers.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -406,16 +406,18 @@ module openflow {
406406
public api:api
407407
) {
408408
WebSocketClient.onSignedin(async (user: TokenUser) => {
409-
var q: RegisterQueueMessage = new RegisterQueueMessage();
410-
var msg:Message = new Message(); msg.command = "registerqueue"; msg.data = JSON.stringify(q);
411-
await this.WebSocketClient.Send(msg);
409+
await api.RegisterQueue();
412410
});
413411
}
414412

415413
async SendOne():Promise<void> {
416-
var result:any = await this.api.QueueMessage("webtest", {payload: "hi mom"});
417-
var msg = JSON.parse(result.data);
418-
this.messages += msg.payload + "\n";
414+
var result:any = await this.api.QueueMessage("webtest", {"payload": "hi mom"});
415+
console.log(result);
416+
try {
417+
result = JSON.parse(result);
418+
} catch (error) {
419+
}
420+
this.messages += result.payload + "\n";
419421
if (!this.$scope.$$phase) { this.$scope.$apply(); }
420422
}
421423
}

OpenFlow/src/public/WebSocketClient.ts

Lines changed: 14 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -36,25 +36,18 @@ module openflow {
3636
private _sendQueue: SocketMessage[] = [];
3737
public user:TokenUser = null;
3838
public jwt:string = null;
39-
static $inject = ["$rootScope", "$location"];
39+
static $inject = ["$rootScope", "$location", "$window"];
4040
public messageQueue: IHashTable<QueuedMessage> = {};
41-
constructor(public $rootScope:ng.IRootScopeService, public $location) {
42-
var url:string = window.location.href;
43-
var arr:string[] = url.split("/");
44-
var result:string = arr[0] + "//" + arr[2];
45-
if(arr[0] === "http:") {
46-
result = "ws://" + arr[2];
47-
} else {
48-
result = "wss://" + arr[2];
49-
}
50-
this._url = result;
51-
console.log("WebSocketClient::onopen: connecting to " + result);
52-
this._socketObject = new ReconnectingWebSocket(result);
53-
this._socketObject.onopen = (this.onopen).bind(this);
54-
this._socketObject.onmessage = (this.onmessage).bind(this);
55-
this._socketObject.onclose = (this.onclose).bind(this);
56-
this._socketObject.onerror = (this.onerror).bind(this);
57-
WebSocketClient.instance = this;
41+
constructor(public $rootScope:ng.IRootScopeService, public $location, public $window:any) {
42+
this.getJSON("/config", async (error:any, data:any) => {
43+
console.debug("WebSocketClient::onopen: connecting to " + data.wshost);
44+
this._socketObject = new ReconnectingWebSocket(data.wshost);
45+
this._socketObject.onopen = (this.onopen).bind(this);
46+
this._socketObject.onmessage = (this.onmessage).bind(this);
47+
this._socketObject.onclose = (this.onclose).bind(this);
48+
this._socketObject.onerror = (this.onerror).bind(this);
49+
WebSocketClient.instance = this;
50+
});
5851
}
5952
public connect():void {
6053
}
@@ -86,20 +79,13 @@ module openflow {
8679
private async onopen(evt: Event):Promise<void> {
8780
console.log("WebSocketClient::onopen: connected");
8881
var me:WebSocketClient = WebSocketClient.instance;
89-
// me.communication = "Conenction opened, signing in" + "<br/>" + me.communication;
90-
// var message:any = { command: "signin", data: {username: "az", password: "az"}};
91-
// me._socketObject.send(JSON.stringify(message));
9282
var q:SigninMessage = new SigninMessage();
9383
this.getJSON("/jwt", async (error:any, data:any) => {
9484
try {
9585
if(data===null || data ===undefined || data.jwt === "") {
9686
if(this.$location.path() !=="/Login") {
9787
console.log("path: " + this.$location.path());
9888
console.log("WebSocketClient::onopen: Not signed in, redirect /Login");
99-
// var url:string = window.location.href;
100-
// var arr:string[] = url.split("/");
101-
// var result:string = arr[0] + "//" + arr[2];
102-
// top.location.href = result + "/#Login";
10389
this.$location.path("/Login");
10490
this.$rootScope.$apply();
10591
}
@@ -111,6 +97,7 @@ module openflow {
11197
} catch (error) {
11298
}
11399
q.jwt = data.jwt;
100+
q.rawAssertion = data.rawAssertion;
114101
q.realm = "browser";
115102
console.log("WebSocketClient::onopen: Validate jwt");
116103
if(_android!=null) {
@@ -136,16 +123,14 @@ module openflow {
136123
console.error(error);
137124
this.$location.path("/Login");
138125
this.$rootScope.$apply();
139-
}
126+
}
140127
});
141128
}
142129
private onclose(evt: CloseEvent):void {
143130
var me:WebSocketClient = WebSocketClient.instance;
144-
// me.communication = "Conenction Closed " + evt.code + " " + evt.type + "<br/>" + me.communication;
145131
}
146132
private onerror(evt: ErrorEvent):void {
147133
var me:WebSocketClient = WebSocketClient.instance;
148-
// me.communication = "Error Occured " + evt.message + "<br/>" + me.communication;
149134
}
150135
private onmessage(evt: MessageEvent):void {
151136
var me:WebSocketClient = WebSocketClient.instance;
@@ -156,7 +141,7 @@ module openflow {
156141
public async Send<T>(message: Message):Promise<T> {
157142
return new Promise<T>(async (resolve, reject) => {
158143
this._Send(message, ((msg)=> {
159-
if(msg.error!==null && msg.error !== undefined) { return reject(msg.error); }
144+
if(msg.error!==null && msg.error !== undefined) { console.log(message); return reject(msg.error); }
160145
resolve(msg);
161146
}).bind(this));
162147
});
@@ -179,9 +164,6 @@ module openflow {
179164
if(message.replyto === null || message.replyto === undefined) {
180165
this.messageQueue[message.id] = new QueuedMessage(message, cb);
181166
}
182-
// setTimeout(() => {
183-
// this.ProcessQueue();
184-
// }, 500);
185167
this.ProcessQueue();
186168
}
187169
public chunkString(str: string, length: number): string[] {

OpenFlowNodeRED/src/amqp_consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ export class amqp_consumer {
3333
}
3434
private _OnMessage(sender: amqp_consumer, msg: amqplib.ConsumeMessage): void {
3535
try {
36-
sender._logger.info("OnMessage " + msg.content.toString());
36+
// sender._logger.info("OnMessage " + msg.content.toString());
3737
if(this.OnMessage!==null && this.OnMessage!==undefined) {
3838
if(!this.noAck || (msg.properties.replyTo!== null && msg.properties.replyTo!== undefined)) {
3939
this.OnMessage(msg, (result) => {

OpenFlowNodeRED/src/nodered/nodes/amqp_nodes.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,10 @@ export class amqp_consumer_node {
8181
result.amqpacknowledgment = ack;
8282

8383
var data = JSON.parse(msg.content.toString());
84+
try {
85+
data.payload = JSON.parse(data.payload);
86+
} catch (error) {
87+
}
8488
result.payload = data.payload;
8589
result.jwt = data.jwt;
8690
this.node.send(result);

VERSION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
0.0.71
1+
0.0.73

0 commit comments

Comments
 (0)