forked from openiap/opencore
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathamqp.test.ts
More file actions
123 lines (117 loc) · 5.75 KB
/
Copy pathamqp.test.ts
File metadata and controls
123 lines (117 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
const path = require("path");
const env = path.join(process.cwd(), 'config', '.env');
require("dotenv").config({ path: env }); // , debug: false
import { suite, test, timeout } from '@testdeck/mocha';
import { Config } from "../OpenFlow/src/Config";
import { DatabaseConnection } from '../OpenFlow/src/DatabaseConnection';
import assert = require('assert');
import { Logger } from '../OpenFlow/src/Logger';
import { NoderedUtil, User } from '@openiap/openflow-api';
import { Crypt } from '../OpenFlow/src/Crypt';
import { amqpwrapper } from '../OpenFlow/src/amqpwrapper';
@suite class amqp_test {
private rootToken: string;
private testUser: User;
private amqp: amqpwrapper;
@timeout(10000)
async before() {
Config.workitem_queue_monitoring_enabled = false;
Config.disablelogging();
Logger.configure(true, false);
Config.db = new DatabaseConnection(Config.mongodb_url, Config.mongodb_db, false);
await Config.db.connect(null);
this.rootToken = Crypt.rootToken();
this.testUser = await Logger.DBHelper.FindByUsername("testuser", this.rootToken, null)
this.amqp = new amqpwrapper(Config.amqp_url);
amqpwrapper.SetInstance(this.amqp);
Config.log_amqp = false;
await this.amqp.connect(null);
}
@timeout(5000)
async after() {
this.amqp.shutdown();
await Logger.shutdown();
}
// @test async 'connecterror'() {
// // var amqp = new amqpwrapper('bogus://url');
// // await assert.rejects( await amqp.connect(null));
// }
@timeout(5000)
@test async 'queuetest'() {
const queuename = "demotestqueue";
var q = await this.amqp.AddQueueConsumer(this.testUser, queuename, null, this.rootToken, async (msg, options, ack) => {
if (!NoderedUtil.IsNullEmpty(options.replyTo)) {
if (msg == "hi mom, i miss you") {
msg = "hi";
} else {
msg = "unknown message";
}
await this.amqp.send(options.exchange, options.replyTo, msg, 1500, options.correlationId, options.routingKey, 1);
}
ack();
}, null);
assert.ok(!NoderedUtil.IsNullUndefinded(q));
assert.ok(!NoderedUtil.IsNullEmpty(q.queuename));
reply = await this.amqp.sendWithReply(null, queuename, "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "hi");
var reply = await this.amqp.sendWithReply(null, queuename, "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "hi");
await this.amqp.RemoveQueueConsumer(this.testUser, q, null);
reply = await this.amqp.sendWithReply("", "bogusName", "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "timeout");
// why does this die ? after sending to bogusName
// reply = await this.amqp.sendWithReply(null, queuename, "hi mom, i miss you", 300, null, null);
// assert.strictEqual(reply, "timeout");
}
@timeout(5000)
@test async 'personalqueuetest'() {
var q = await this.amqp.AddQueueConsumer(this.testUser, this.testUser._id, null, this.rootToken, async (msg, options, ack) => {
if (!NoderedUtil.IsNullEmpty(options.replyTo)) {
if (msg == "hi mom, i miss you") {
msg = "hi";
} else {
msg = "unknown message";
}
await this.amqp.send(options.exchange, options.replyTo, msg, 1500, options.correlationId, options.routingKey, 1);
}
ack();
}, null);
assert.ok(!NoderedUtil.IsNullUndefinded(q));
assert.ok(!NoderedUtil.IsNullEmpty(q.queuename));
var reply = await this.amqp.sendWithReply(null, this.testUser._id, "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "hi");
await this.amqp.RemoveQueueConsumer(this.testUser, q, null);
reply = await this.amqp.sendWithReply("", "bogusName", "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "timeout");
// why does this die ? after sending to bogusName
// reply = await this.amqp.sendWithReply(null, this.testUser._id, "hi mom, i miss you", 300, null, null);
// assert.strictEqual(reply, "timeout");
}
@timeout(5000)
@test async 'exchangetest'() {
const exchangename = "demotestexchange";
var q = await this.amqp.AddExchangeConsumer(this.testUser, exchangename, "direct", "", null, this.rootToken, true, async (msg, options, ack) => {
if (!NoderedUtil.IsNullEmpty(options.replyTo)) {
if (msg == "hi mom, i miss you") {
msg = "hi";
} else {
msg = "unknown message";
}
await this.amqp.send("", options.replyTo, msg, 1500, options.correlationId, "", 1);
}
ack();
}, null);
// Give rabbitmq a little room
await new Promise(resolve => { setTimeout(resolve, 1000) })
var reply = await this.amqp.sendWithReply(exchangename, "", "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "hi");
var reply = await this.amqp.sendWithReply(exchangename, "", "hi dad, i miss you", 300, null, null);
assert.strictEqual(reply, "unknown message");
var reply = await this.amqp.sendWithReply(exchangename, "", "hi mom, i miss you", 300, null, null);
assert.strictEqual(reply, "hi");
await this.amqp.RemoveQueueConsumer(this.testUser, q.queue, null);
await assert.rejects(this.amqp.RemoveQueueConsumer(this.testUser, null, null));
}
}
// cls | ./node_modules/.bin/_mocha 'test/**/amqp.test.ts'
// cls | ts-mocha --paths -p test/tsconfig.json 'test/amqp.test.ts'