Skip to content

Commit f5d0f5f

Browse files
committed
Add support for messagequeues in web
1 parent 36c8bb8 commit f5d0f5f

16 files changed

Lines changed: 342 additions & 207 deletions

OpenFlow/readme.md

Lines changed: 0 additions & 3 deletions
This file was deleted.

OpenFlow/src/Messages/Message.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,11 @@ export class Message {
5353
if(this.replyto!==null && this.replyto!==undefined) {
5454
var qmsg:QueuedMessage = cli.messageQueue[this.replyto];
5555
if(qmsg!==undefined && qmsg !== null) {
56-
qmsg.message = Object.assign(qmsg.message, JSON.parse(this.data));
56+
try {
57+
qmsg.message = Object.assign(qmsg.message, JSON.parse(this.data));
58+
} catch (error) {
59+
// TODO: should we set message to data ?
60+
}
5761
if(qmsg.cb!==undefined && qmsg.cb !== null) { qmsg.cb(qmsg.message); }
5862
delete cli.messageQueue[this.id];
5963
}

OpenFlow/src/WebSocketClient.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,12 @@ export class WebSocketClient {
8383
this.consumers = [];
8484
}
8585
public async CreateConsumer(queuename:string):Promise<void> {
86+
var autoDelete:boolean = false;
87+
if(queuename===null || queuename === undefined || queuename === "") { queuename = "web." + Math.random().toString(36).substr(2, 9); autoDelete = true; }
8688
var consumer = new amqp_consumer(this._logger, Config.amqp_url, queuename);
8789
consumer.OnMessage = this.OnMessage.bind(this);
8890
this.consumers.push(consumer);
89-
await consumer.connect(false);
91+
await consumer.connect(false, true);
9092
}
9193
public async CloseConsumer(queuename:string):Promise<void> {
9294
var index = -1;
@@ -188,7 +190,6 @@ export class WebSocketClient {
188190
});
189191
}
190192
private _Send(message: Message, cb: QueuedMessageCallback):void {
191-
// console.log("SEND:::" + message.command);
192193
var messages: string[] = this.chunkString(message.data, 500);
193194
if(messages===null || messages===undefined || messages.length === 0) {
194195
var singlemessage: SocketMessage = SocketMessage.frommessage(message, "", 1, 0);
@@ -225,7 +226,7 @@ export class WebSocketClient {
225226
q.data = data; q.replyto = replyTo;
226227
q.correlationId = correlationId; q.queuename = queuename;
227228
let m: Message = Message.fromcommand("queuemessage");
228-
q.correlationId = m.id;
229+
if(q.correlationId === undefined || q.correlationId === null || q.correlationId === "") { q.correlationId = m.id; }
229230
m.data = JSON.stringify(q);
230231
q = await this.Send<QueueMessage>(m);
231232
return q.data;

OpenFlow/src/amqp_consumer.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ export class amqp_consumer {
1515
this.queue = queue;
1616
this.connectionstring = connectionstring;
1717
}
18-
async connect(autoack:boolean): Promise<void> {
18+
async connect(autoack:boolean, autoDelete:boolean): Promise<void> {
1919
var me:amqp_consumer = this;
2020
this.conn = await amqplib.connect(this.connectionstring);
2121
this.conn.on("error", () => null);
2222
this.channel = await this.conn.createChannel();
23-
this._ok = await this.channel.assertQueue(this.queue, { durable: false });
23+
this._ok = await this.channel.assertQueue(this.queue, { durable: false, autoDelete: autoDelete });
2424
await this.channel.consume(this.queue, (msg)=> { this.OnMessage(me, msg); }, { noAck: autoack });
2525
this._logger.info("Connected to " + this.connectionstring);
2626
}

OpenFlow/src/public/CommonControllers.ts

Lines changed: 73 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,69 @@
11
module openflow {
22
"use strict";
3+
4+
5+
class messagequeue {
6+
constructor(
7+
public msg: QueueMessage,
8+
public callback: any) {}
9+
}
10+
interface IHashTable<T> {
11+
[key: string]: T;
12+
}
13+
export class api {
14+
static $inject = ["$rootScope", "$location", "WebSocketClient"];
15+
public messageQueue: IHashTable<messagequeue> = {};
16+
constructor(public $rootScope:ng.IRootScopeService, public $location, public WebSocketClient:WebSocketClient) {
17+
18+
var cleanup = $rootScope.$on('queuemessage', (event, data:QueueMessage) => {
19+
if (event && data) { }
20+
if(this.messageQueue[data.correlationId] !== undefined) {
21+
this.messageQueue[data.correlationId].callback(data);
22+
delete this.messageQueue[data.correlationId];
23+
}
24+
});
25+
}
26+
async Insert(collection:string, model: any): Promise<any> {
27+
var q: InsertOneMessage = new InsertOneMessage();
28+
q.collectionname = collection; q.item = model;
29+
var msg: Message = new Message(); msg.command = "insertone"; msg.data = JSON.stringify(q);
30+
q = await this.WebSocketClient.Send<InsertOneMessage>(msg);
31+
return q.result;
32+
}
33+
async Update(collection:string, model: any): Promise<any> {
34+
var q: UpdateOneMessage = new UpdateOneMessage();
35+
q.collectionname = collection; q.item = model;
36+
var msg: Message = new Message(); msg.command = "updateone"; msg.data = JSON.stringify(q);
37+
q = await this.WebSocketClient.Send<UpdateOneMessage>(msg);
38+
return q.result;
39+
}
40+
async Delete(collection:string, model: any): Promise<void> {
41+
var q: DeleteOneMessage = new DeleteOneMessage();
42+
q.collectionname = collection; q._id = model._id;
43+
var msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
44+
q = await this.WebSocketClient.Send<DeleteOneMessage>(msg);
45+
}
46+
47+
async _QueueMessage(queuename: string, data: any):Promise<QueueMessage> {
48+
return new Promise<QueueMessage>(async (resolve, reject) => {
49+
var q: QueueMessage = new QueueMessage();
50+
q.correlationId = Math.random().toString(36).substr(2, 9);
51+
q.queuename = queuename; q.data = JSON.stringify(data);
52+
var msg:Message = new Message(); msg.command = "queuemessage"; msg.data = JSON.stringify(q);
53+
this.messageQueue[q.correlationId] = new messagequeue(q, (msgresult:QueueMessage)=> {
54+
resolve(msgresult);
55+
});
56+
await this.WebSocketClient.Send(msg);
57+
});
58+
}
59+
async QueueMessage(queuename: string, data: any):Promise<any> {
60+
var result:any = await this._QueueMessage(queuename, data);
61+
var msg = JSON.parse(result.data);
62+
return msg;
63+
}
64+
65+
}
66+
367
function _timeSince(timeStamp) {
468
var now: Date = new Date(),
569
secondsPast: number = (now.getTime() - timeStamp.getTime()) / 1000;
@@ -146,13 +210,15 @@ module openflow {
146210
"$scope",
147211
"$location",
148212
"$routeParams",
149-
"WebSocketClient"
213+
"WebSocketClient",
214+
"api"
150215
];
151216
constructor(
152217
public $scope: ng.IScope,
153218
public $location: ng.ILocationService,
154219
public $routeParams: ng.route.IRouteParamsService,
155-
public WebSocketClient: WebSocketClient
220+
public WebSocketClient: WebSocketClient,
221+
public api: api
156222
) {
157223
}
158224
async loadData(): Promise<void> {
@@ -164,30 +230,7 @@ module openflow {
164230
this.models = q.result;
165231
if (!this.$scope.$$phase) { this.$scope.$apply(); }
166232
}
167-
async Insert(model: any): Promise<any> {
168-
var q: InsertOneMessage = new InsertOneMessage();
169-
// model.name = "Find me " + Math.random().toString(36).substr(2, 9);
170-
q.collectionname = this.collection; q.item = model;
171-
var msg: Message = new Message(); msg.command = "insertone"; msg.data = JSON.stringify(q);
172-
q = await this.WebSocketClient.Send<InsertOneMessage>(msg);
173-
return q.result;
174-
}
175-
async Update(model: any): Promise<any> {
176-
var q: UpdateOneMessage = new UpdateOneMessage();
177-
// model.name = "Find me " + Math.random().toString(36).substr(2, 9);
178-
q.collectionname = this.collection; q.item = model;
179-
var msg: Message = new Message(); msg.command = "updateone"; msg.data = JSON.stringify(q);
180-
q = await this.WebSocketClient.Send<UpdateOneMessage>(msg);
181-
return q.result;
182-
}
183-
async Delete(model: any): Promise<void> {
184-
var q: DeleteOneMessage = new DeleteOneMessage();
185-
q.collectionname = this.collection; q._id = model._id;
186-
var msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
187-
q = await this.WebSocketClient.Send<DeleteOneMessage>(msg);
188-
// this.models = this.models.filter(function (m: any):boolean { return m._id!==model._id;});
189-
// if (!this.$scope.$$phase) { this.$scope.$apply(); }
190-
}
233+
191234

192235
ToggleOrder(field: string) {
193236
if (this.orderby[field] == undefined) {
@@ -218,13 +261,15 @@ module openflow {
218261
"$scope",
219262
"$location",
220263
"$routeParams",
221-
"WebSocketClient"
264+
"WebSocketClient",
265+
"api"
222266
];
223267
constructor(
224268
public $scope: ng.IScope,
225269
public $location: ng.ILocationService,
226270
public $routeParams: ng.route.IRouteParamsService,
227-
public WebSocketClient: WebSocketClient
271+
public WebSocketClient: WebSocketClient,
272+
public api: api
228273
) {
229274
this.id = $routeParams.id;
230275
this.basequery = { _id: this.id };
@@ -239,27 +284,5 @@ module openflow {
239284
this.keys = Object.keys(this.model);
240285
if (!this.$scope.$$phase) { this.$scope.$apply(); }
241286
}
242-
async Insert(model: any): Promise<any> {
243-
var q: InsertOneMessage = new InsertOneMessage();
244-
// model.name = "Find me " + Math.random().toString(36).substr(2, 9);
245-
q.collectionname = this.collection; q.item = model;
246-
var msg: Message = new Message(); msg.command = "insertone"; msg.data = JSON.stringify(q);
247-
q = await this.WebSocketClient.Send<InsertOneMessage>(msg);
248-
return q.result;
249-
}
250-
async Update(model: any): Promise<any> {
251-
var q: UpdateOneMessage = new UpdateOneMessage();
252-
// model.name = "Find me " + Math.random().toString(36).substr(2, 9);
253-
q.collectionname = this.collection; q.item = model;
254-
var msg: Message = new Message(); msg.command = "updateone"; msg.data = JSON.stringify(q);
255-
q = await this.WebSocketClient.Send<UpdateOneMessage>(msg);
256-
return q.result;
257-
}
258-
async Delete(model: any): Promise<void> {
259-
var q: DeleteOneMessage = new DeleteOneMessage();
260-
q.collectionname = this.collection; q._id = model._id;
261-
var msg: Message = new Message(); msg.command = "deleteone"; msg.data = JSON.stringify(q);
262-
q = await this.WebSocketClient.Send<DeleteOneMessage>(msg);
263-
}
264287
}
265288
}

0 commit comments

Comments
 (0)