@@ -2,10 +2,9 @@ import { generateKeyBetween } from "fractional-indexing"
22import { DifferenceStreamWriter , UnaryOperator } from "../graph.js"
33import { StreamBuilder } from "../d2.js"
44import { MultiSet } from "../multiset.js"
5- import { Index } from "../indexes.js"
65import { binarySearch , globalObjectIdGenerator } from "../utils.js"
76import type { DifferenceStreamReader } from "../graph.js"
8- import type { IStreamBuilder , KeyValue , PipedOperator } from "../types.js"
7+ import type { IStreamBuilder , PipedOperator } from "../types.js"
98
109export interface TopKWithFractionalIndexOptions {
1110 limit ?: number
@@ -158,31 +157,34 @@ class TopKArray<V> implements TopK<V> {
158157 * This operator maintains fractional indices for sorted elements
159158 * and only updates indices when elements move position
160159 */
161- export class TopKWithFractionalIndexOperator < K , V1 > extends UnaryOperator <
162- [ K , V1 ] ,
163- [ K , IndexedValue < V1 > ]
160+ export class TopKWithFractionalIndexOperator < K , T > extends UnaryOperator <
161+ [ K , T ] ,
162+ [ K , IndexedValue < T > ]
164163> {
165- #index = new Index < K , V1 > ( )
164+ #index: Map < K , number > = new Map ( ) // maps keys to their multiplicity
166165
167166 /**
168167 * topK data structure that supports insertions and deletions
169168 * and returns changes to the topK.
170169 */
171- #topK: TopK < TaggedValue < V1 > >
170+ #topK: TopK < TaggedValue < K , T > >
172171
173172 constructor (
174173 id : number ,
175- inputA : DifferenceStreamReader < [ K , V1 ] > ,
176- output : DifferenceStreamWriter < [ K , [ V1 , string ] ] > ,
177- comparator : ( a : V1 , b : V1 ) => number ,
174+ inputA : DifferenceStreamReader < [ K , T ] > ,
175+ output : DifferenceStreamWriter < [ K , IndexedValue < T > ] > ,
176+ comparator : ( a : T , b : T ) => number ,
178177 options : TopKWithFractionalIndexOptions
179178 ) {
180179 super ( id , inputA , output )
181180 const limit = options . limit ?? Infinity
182181 const offset = options . offset ?? 0
183- const compareTaggedValues = ( a : TaggedValue < V1 > , b : TaggedValue < V1 > ) => {
182+ const compareTaggedValues = (
183+ a : TaggedValue < K , T > ,
184+ b : TaggedValue < K , T >
185+ ) => {
184186 // First compare on the value
185- const valueComparison = comparator ( untagValue ( a ) , untagValue ( b ) )
187+ const valueComparison = comparator ( getVal ( a ) , getVal ( b ) )
186188 if ( valueComparison !== 0 ) {
187189 return valueComparison
188190 }
@@ -197,13 +199,13 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
197199 protected createTopK (
198200 offset : number ,
199201 limit : number ,
200- comparator : ( a : TaggedValue < V1 > , b : TaggedValue < V1 > ) => number
201- ) : TopK < TaggedValue < V1 > > {
202+ comparator : ( a : TaggedValue < K , T > , b : TaggedValue < K , T > ) => number
203+ ) : TopK < TaggedValue < K , T > > {
202204 return new TopKArray ( offset , limit , comparator )
203205 }
204206
205207 run ( ) : void {
206- const result : Array < [ [ K , [ V1 , string ] ] , number ] > = [ ]
208+ const result : Array < [ [ K , IndexedValue < T > ] , number ] > = [ ]
207209 for ( const message of this . inputMessages ( ) ) {
208210 for ( const [ item , multiplicity ] of message . getInner ( ) ) {
209211 const [ key , value ] = item
@@ -218,27 +220,25 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
218220
219221 processElement (
220222 key : K ,
221- value : V1 ,
223+ value : T ,
222224 multiplicity : number ,
223- result : Array < [ [ K , [ V1 , string ] ] , number ] >
225+ result : Array < [ [ K , IndexedValue < T > ] , number ] >
224226 ) : void {
225- const oldMultiplicity = this . #index. getMultiplicity ( key , value )
226- this . #index. addValue ( key , [ value , multiplicity ] )
227- const newMultiplicity = this . #index. getMultiplicity ( key , value )
227+ const { oldMultiplicity, newMultiplicity } = this . addKey ( key , multiplicity )
228228
229- let res : TopKChanges < TaggedValue < V1 > > = {
229+ let res : TopKChanges < TaggedValue < K , T > > = {
230230 moveIn : null ,
231231 moveOut : null ,
232232 }
233233 if ( oldMultiplicity <= 0 && newMultiplicity > 0 ) {
234234 // The value was invisible but should now be visible
235235 // Need to insert it into the array of sorted values
236- const taggedValue = tagValue ( value )
236+ const taggedValue = tagValue ( key , value )
237237 res = this . #topK. insert ( taggedValue )
238238 } else if ( oldMultiplicity > 0 && newMultiplicity <= 0 ) {
239239 // The value was visible but should now be invisible
240240 // Need to remove it from the array of sorted values
241- const taggedValue = tagValue ( value )
241+ const taggedValue = tagValue ( key , value )
242242 res = this . #topK. delete ( taggedValue )
243243 } else {
244244 // The value was invisible and it remains invisible
@@ -247,26 +247,45 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
247247 }
248248
249249 if ( res . moveIn ) {
250- const valueWithoutTieBreaker = mapValue ( res . moveIn , untagValue )
251- result . push ( [ [ key , valueWithoutTieBreaker ] , 1 ] )
250+ const index = getIndex ( res . moveIn )
251+ const taggedValue = getValue ( res . moveIn )
252+ const k = getKey ( taggedValue )
253+ const val = getVal ( taggedValue )
254+ result . push ( [ [ k , [ val , index ] ] , 1 ] )
252255 }
253256
254257 if ( res . moveOut ) {
255- const valueWithoutTieBreaker = mapValue ( res . moveOut , untagValue )
256- result . push ( [ [ key , valueWithoutTieBreaker ] , - 1 ] )
258+ const index = getIndex ( res . moveOut )
259+ const taggedValue = getValue ( res . moveOut )
260+ const k = getKey ( taggedValue )
261+ const val = getVal ( taggedValue )
262+ result . push ( [ [ k , [ val , index ] ] , - 1 ] )
257263 }
258264
259265 return
260266 }
267+
268+ private getMultiplicity ( key : K ) : number {
269+ return this . #index. get ( key ) ?? 0
270+ }
271+
272+ private addKey (
273+ key : K ,
274+ multiplicity : number
275+ ) : { oldMultiplicity : number ; newMultiplicity : number } {
276+ const oldMultiplicity = this . getMultiplicity ( key )
277+ const newMultiplicity = oldMultiplicity + multiplicity
278+ if ( newMultiplicity === 0 ) {
279+ this . #index. delete ( key )
280+ } else {
281+ this . #index. set ( key , newMultiplicity )
282+ }
283+ return { oldMultiplicity, newMultiplicity }
284+ }
261285}
262286
263287/**
264288 * Limits the number of results based on a comparator, with optional offset.
265- * This works on a keyed stream, where the key is the first element of the tuple.
266- * The ordering is within a key group, i.e. elements are sorted within a key group
267- * and the limit + offset is applied to that sorted group.
268- * To order the entire stream, key by the same value for all elements such as null.
269- *
270289 * Uses fractional indexing to minimize the number of changes when elements move positions.
271290 * Each element is assigned a fractional index that is lexicographically sortable.
272291 * When elements move, only the indices of the moved elements are updated, not all elements.
@@ -275,26 +294,22 @@ export class TopKWithFractionalIndexOperator<K, V1> extends UnaryOperator<
275294 * @param options - An optional object containing limit and offset properties
276295 * @returns A piped operator that orders the elements and limits the number of results
277296 */
278- export function topKWithFractionalIndex <
279- KType extends T extends KeyValue < infer K , infer _V > ? K : never ,
280- V1Type extends T extends KeyValue < KType , infer V > ? V : never ,
281- T ,
282- > (
283- comparator : ( a : V1Type , b : V1Type ) => number ,
297+ export function topKWithFractionalIndex < KType , T > (
298+ comparator : ( a : T , b : T ) => number ,
284299 options ?: TopKWithFractionalIndexOptions
285- ) : PipedOperator < T , KeyValue < KType , [ V1Type , string ] > > {
300+ ) : PipedOperator < [ KType , T ] , [ KType , IndexedValue < T > ] > {
286301 const opts = options || { }
287302
288303 return (
289- stream : IStreamBuilder < T >
290- ) : IStreamBuilder < KeyValue < KType , [ V1Type , string ] > > => {
291- const output = new StreamBuilder < KeyValue < KType , [ V1Type , string ] > > (
304+ stream : IStreamBuilder < [ KType , T ] >
305+ ) : IStreamBuilder < [ KType , IndexedValue < T > ] > => {
306+ const output = new StreamBuilder < [ KType , IndexedValue < T > ] > (
292307 stream . graph ,
293- new DifferenceStreamWriter < KeyValue < KType , [ V1Type , string ] > > ( )
308+ new DifferenceStreamWriter < [ KType , IndexedValue < T > ] > ( )
294309 )
295- const operator = new TopKWithFractionalIndexOperator < KType , V1Type > (
310+ const operator = new TopKWithFractionalIndexOperator < KType , T > (
296311 stream . graph . getNextOperatorId ( ) ,
297- stream . connectReader ( ) as DifferenceStreamReader < KeyValue < KType , V1Type > > ,
312+ stream . connectReader ( ) ,
298313 output . writer ,
299314 comparator ,
300315 opts
@@ -324,24 +339,21 @@ export function getIndex<V>(indexedVal: IndexedValue<V>): FractionalIndex {
324339 return indexedVal [ 1 ]
325340}
326341
327- function mapValue < V , W > (
328- indexedVal : IndexedValue < V > ,
329- f : ( value : V ) => W
330- ) : IndexedValue < W > {
331- return [ f ( getValue ( indexedVal ) ) , getIndex ( indexedVal ) ]
332- }
333-
334342export type Tag = number
335- export type TaggedValue < V > = [ V , Tag ]
343+ export type TaggedValue < K , V > = [ K , V , Tag ]
336344
337- function tagValue < V > ( value : V ) : TaggedValue < V > {
338- return [ value , globalObjectIdGenerator . getId ( value ) ]
345+ function tagValue < K , V > ( key : K , value : V ) : TaggedValue < K , V > {
346+ return [ key , value , globalObjectIdGenerator . getId ( key ) ]
339347}
340348
341- function untagValue < V > ( tieBreakerTaggedValue : TaggedValue < V > ) : V {
349+ function getKey < K , V > ( tieBreakerTaggedValue : TaggedValue < K , V > ) : K {
342350 return tieBreakerTaggedValue [ 0 ]
343351}
344352
345- function getTag < V > ( tieBreakerTaggedValue : TaggedValue < V > ) : Tag {
353+ function getVal < K , V > ( tieBreakerTaggedValue : TaggedValue < K , V > ) : V {
346354 return tieBreakerTaggedValue [ 1 ]
347355}
356+
357+ function getTag < K , V > ( tieBreakerTaggedValue : TaggedValue < K , V > ) : Tag {
358+ return tieBreakerTaggedValue [ 2 ]
359+ }
0 commit comments