Skip to content

Commit 468417c

Browse files
committed
Improve housekeeping
1 parent 11f9a51 commit 468417c

3 files changed

Lines changed: 71 additions & 15 deletions

File tree

OpenFlow/src/Messages/Message.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4331,7 +4331,28 @@ export class Message {
43314331
m.data = JSON.stringify(l);
43324332
cli.Send(m);
43334333
}
4334+
public static lastHouseKeeping: Date = null;
4335+
public static ReadyForHousekeeping(): boolean {
4336+
const date = new Date();
4337+
const a: number = (date as any) - (Message.lastHouseKeeping as any);
4338+
const diffminutes = a / (1000 * 60);
4339+
// const diffhours = a / (1000 * 60 * 60);
4340+
if (diffminutes < 60) return false;
4341+
return true;
4342+
}
43344343
public async Housekeeping(skipNodered: boolean, skipCalculateSize: boolean, skipUpdateUserSize: boolean, parent: Span): Promise<void> {
4344+
if (Message.lastHouseKeeping == null) {
4345+
Message.lastHouseKeeping = new Date();
4346+
Message.lastHouseKeeping.setDate(Message.lastHouseKeeping.getDate() - 1);
4347+
}
4348+
if (!Message.ReadyForHousekeeping()) {
4349+
const date = new Date();
4350+
const a: number = (date as any) - (Message.lastHouseKeeping as any);
4351+
const diffminutes = a / (1000 * 60);
4352+
Logger.instanse.debug("[housekeeping] Skipping housekeeping, to early for next run (ran " + diffminutes + " minutes ago)");
4353+
return;
4354+
}
4355+
Message.lastHouseKeeping = new Date();
43354356
const span: Span = Logger.otel.startSubSpan("message.QueueMessage", parent);
43364357
try {
43374358
if (!skipNodered) await this.GetNoderedInstance(span)

OpenFlow/src/amqpwrapper.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import { Span } from "@opentelemetry/api";
77
import { Logger } from "./Logger";
88
import events = require("events");
99
import { Auth } from "./Auth";
10+
import { Message } from "./Messages/Message";
1011
type QueueOnMessage = (msg: string, options: QueueMessageOptions, ack: any, done: any) => void;
1112
interface IHashTable<T> {
1213
[key: string]: T;
@@ -559,6 +560,10 @@ export class amqpwrapper extends events.EventEmitter {
559560
done();
560561
}, undefined);
561562
}
563+
IsMyconsumerTag(consumerTag: string) {
564+
var q = this.queues.filter(q => q.consumerTag == consumerTag);
565+
return q.length != 0;
566+
}
562567
async AddOFExchange() {
563568
if (!Config.enable_openflow_amqp) return;
564569
await this.AddExchangeConsumer("openflow", "fanout", "", null, null, async (msg: any, options: QueueMessageOptions, ack: any, done: any) => {
@@ -574,6 +579,22 @@ export class amqpwrapper extends events.EventEmitter {
574579
case "clearcache":
575580
Auth.clearCache();
576581
break;
582+
case "housekeeping":
583+
if (this.IsMyconsumerTag(options.consumerTag)) {
584+
ack();
585+
done();
586+
try {
587+
var dt = new Date(new Date().toISOString());
588+
var msg2 = new Message(); msg2.jwt = Crypt.rootToken();
589+
var skipUpdateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
590+
await msg2.Housekeeping(false, skipUpdateUsage, skipUpdateUsage, null);
591+
} catch (error) {
592+
}
593+
return;
594+
}
595+
console.log("Reset house keeping, someone else is doing it now");
596+
Message.lastHouseKeeping = new Date();
597+
break;
577598
case "shutdown":
578599
try {
579600
await Config.db.shutdown();

OpenFlow/src/index.ts

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -189,24 +189,38 @@ async function initDatabase(): Promise<boolean> {
189189
Logger.otel.endSpan(span);
190190

191191
if (Config.auto_hourly_housekeeping) {
192+
// Every 15 minutes, give and take a few minutes, send out a message to do house keeping, if ready
193+
const randomNum = (Math.floor(Math.random() * 100) + 1);
194+
console.log("Housekeeping every 15 minutes plus " + randomNum + " seconds");
192195
housekeeping = setInterval(async () => {
193-
try {
194-
var dt = new Date(new Date().toISOString());
195-
var msg = new Message(); msg.jwt = Crypt.rootToken();
196-
var skipUpdateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
197-
await msg.Housekeeping(false, skipUpdateUsage, skipUpdateUsage, null);
198-
} catch (error) {
196+
if (Config.enable_openflow_amqp) {
197+
if (Config.enable_openflow_amqp) {
198+
if (!Message.ReadyForHousekeeping()) {
199+
return;
200+
}
201+
amqpwrapper.Instance().send("openflow", "", { "command": "housekeeping" }, 20000, null, "", 1);
202+
}
203+
} else {
204+
try {
205+
var dt = new Date(new Date().toISOString());
206+
var msg = new Message(); msg.jwt = Crypt.rootToken();
207+
var skipUpdateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
208+
await msg.Housekeeping(false, skipUpdateUsage, skipUpdateUsage, null);
209+
} catch (error) {
210+
}
199211
}
200-
}, 3600000);
212+
}, (15 * 60 * 1000) + (randomNum * 1000));
213+
// If I'm first and noone else has run it, lets trigger it now
214+
const randomNum2 = (Math.floor(Math.random() * 10) + 1);
215+
console.log("Trigger first Housekeeping in " + randomNum2 + " seconds");
201216
setTimeout(async () => {
202-
var dt = new Date(new Date().toISOString());
203-
var msg = new Message(); msg.jwt = Crypt.rootToken();
204-
var skipUpdateUsage: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
205-
var skipUpdateUserSize: boolean = !(dt.getHours() == 1 || dt.getHours() == 13);
206-
if (Config.housekeeping_update_usage_hourly) skipUpdateUsage = false;
207-
if (Config.housekeeping_update_usersize_hourly) skipUpdateUserSize = false;
208-
await msg.Housekeeping(false, skipUpdateUsage, skipUpdateUserSize, null);
209-
}, 5000);
217+
if (Config.enable_openflow_amqp) {
218+
if (!Message.ReadyForHousekeeping()) {
219+
return;
220+
}
221+
amqpwrapper.Instance().send("openflow", "", { "command": "housekeeping" }, 20000, null, "", 1);
222+
}
223+
}, randomNum2 * 1000);
210224
}
211225
return true;
212226
} catch (error) {

0 commit comments

Comments
 (0)