forked from rocicorp/mono
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.ts
More file actions
139 lines (126 loc) · 4.04 KB
/
queue.ts
File metadata and controls
139 lines (126 loc) · 4.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import {resolver, type Resolver} from '@rocicorp/resolver';
import {assert} from './asserts.js';
/**
 * A Queue allows consumers to await (possibly future) values, and producers
 * to await the consumption of their values.
 *
 * Entries are handed out in FIFO order. If consumers are already waiting when
 * an entry is produced, the entry goes directly to the longest-waiting
 * consumer and is never buffered.
 */
export class Queue<T> {
  // Consumers waiting for entries to be produced.
  readonly #consumers: Consumer<T>[] = [];
  // Produced entries waiting to be consumed.
  readonly #produced: Produced<T>[] = [];

  /**
   * Hands `value` to the longest-waiting consumer, or buffers it if no
   * consumer is currently waiting.
   *
   * @returns A Promise that resolves when the value is consumed.
   */
  enqueue(value: T): Promise<void> {
    const consumer = this.#consumers.shift();
    if (consumer) {
      consumer.resolver.resolve(value);
      // The consumer has its result; its pending timeout (if any) must not fire.
      clearTimeout(consumer.timeoutID);
      return Promise.resolve();
    }
    return this.#enqueueProduced(Promise.resolve(value), value);
  }

  /**
   * Rejects the longest-waiting consumer with `reason`, or buffers the
   * rejection so that a later `dequeue()` returns a rejected Promise.
   *
   * @returns A Promise that resolves when the rejection is consumed.
   */
  enqueueRejection(reason?: unknown): Promise<void> {
    const consumer = this.#consumers.shift();
    if (consumer) {
      consumer.resolver.reject(reason);
      clearTimeout(consumer.timeoutID);
      return Promise.resolve();
    }
    const rejection: Promise<T> = Promise.reject(reason);
    // The rejection may sit in the buffer for an arbitrary amount of time
    // before anyone dequeues it. Without a handler attached, the runtime would
    // report it as an unhandled rejection in the meantime. The no-op handler
    // only marks it as handled; `rejection` itself stays rejected, so the
    // consumer still observes `reason` when it dequeues the entry.
    rejection.catch(() => {});
    return this.#enqueueProduced(rejection);
  }

  /**
   * Appends an entry to the buffer of produced-but-unconsumed entries.
   *
   * @param produced The Promise that `dequeue()` will return for this entry.
   * @param value The raw value, kept for identity matching in `delete()`.
   *     Left `undefined` for rejections.
   * @returns A Promise that resolves when the entry is consumed or deleted.
   */
  #enqueueProduced(produced: Promise<T>, value?: T): Promise<void> {
    const {promise, resolve: consumed} = resolver<void>();
    this.#produced.push({produced, value, consumed});
    return promise;
  }

  /**
   * Deletes all unconsumed entries matching the specified `value` based on
   * identity equality. The consumed callback(s) are resolved as if the values
   * were dequeued.
   *
   * Note: deletion of `undefined` values is not supported. This method will
   * assert if `value` is undefined.
   *
   * @returns The number of entries deleted.
   */
  delete(value: T): number {
    assert(value !== undefined);
    let count = 0;
    // Walk backwards so that splicing does not shift the indexes of entries
    // that have not been visited yet.
    for (let i = this.#produced.length - 1; i >= 0; i--) {
      const p = this.#produced[i];
      if (p.value === value) {
        this.#produced.splice(i, 1);
        p.consumed();
        count++;
      }
    }
    return count;
  }

  /**
   * Takes the oldest buffered entry, or waits for the next one to be
   * produced if the buffer is empty.
   *
   * @param timeoutValue An optional value to resolve if `timeoutMs` is reached.
   *     When `undefined`, no timeout is armed and the consumer waits
   *     indefinitely.
   * @param timeoutMs The milliseconds after which the `timeoutValue` is
   *     resolved if nothing is produced for the consumer.
   * @returns A Promise that resolves to the next enqueued value, or rejects
   *     if the next entry was enqueued with `enqueueRejection()`.
   */
  dequeue(timeoutValue?: T, timeoutMs: number = 0): Promise<T> {
    const produced = this.#produced.shift();
    if (produced) {
      produced.consumed();
      return produced.produced;
    }
    const r = resolver<T>();
    const timeoutID =
      timeoutValue === undefined
        ? undefined
        : setTimeout(() => {
            // The consumer may no longer be first in line (or may already
            // have been served), so locate it by its resolver.
            const i = this.#consumers.findIndex(c => c.resolver === r);
            if (i >= 0) {
              const [consumer] = this.#consumers.splice(i, 1);
              consumer.resolver.resolve(timeoutValue);
            }
          }, timeoutMs);
    this.#consumers.push({resolver: r, timeoutID});
    return r.promise;
  }

  /**
   * @returns The instantaneous number of outstanding values waiting to be
   *     dequeued. Note that if a value was enqueued while a consumer
   *     was waiting (with `await dequeue()`), the value is immediately
   *     handed to the consumer and the Queue's size remains 0.
   */
  size(): number {
    return this.#produced.length;
  }

  /**
   * Exposes the queue as an `AsyncIterable`, e.g. for `for await` loops.
   *
   * @param cleanup Invoked when an iterator is closed via `return()` or when
   *     a dequeued entry rejects.
   */
  asAsyncIterable(cleanup = NOOP): AsyncIterable<T> {
    return {[Symbol.asyncIterator]: () => this.asAsyncIterator(cleanup)};
  }

  /**
   * Exposes the queue as an `AsyncIterator` whose `next()` dequeues one entry.
   * `next()` never reports `done`; iteration ends only when the iterator is
   * closed via `return()` or when a dequeued entry rejects.
   *
   * @param cleanup Invoked when the iterator is closed via `return()` or when
   *     a dequeued entry rejects.
   */
  asAsyncIterator(cleanup = NOOP): AsyncIterator<T> {
    return {
      next: async () => {
        try {
          const value = await this.dequeue();
          return {value};
        } catch (e) {
          cleanup();
          throw e;
        }
      },
      return: value => {
        cleanup();
        return Promise.resolve({value, done: true});
      },
    };
  }
}
/** Default `cleanup` callback for `asAsyncIterable()` / `asAsyncIterator()`; does nothing. */
const NOOP = () => {};
/** A pending `dequeue()` call that is waiting for an entry to be produced. */
type Consumer<T> = {
// Settles the Promise that `dequeue()` returned to the waiting consumer.
resolver: Resolver<T>;
// Handle of the timer armed by `dequeue()`; `undefined` when no `timeoutValue` was given.
timeoutID: ReturnType<typeof setTimeout> | undefined;
};
/** A produced entry that is buffered until a consumer dequeues it. */
type Produced<T> = {
// The Promise that `dequeue()` returns for this entry (rejected for enqueued rejections).
produced: Promise<T>;
// The raw value, used for identity matching in `delete()`; not set for rejections.
value: T | undefined;
// Resolves the Promise that was returned to the producer of this entry.
consumed: () => void;
};