Bin
2025-12-16 9e0b2ba2c317b1a86212f24cbae3195ad1f3dbfa
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import type { BatchProgress, TaskPriority } from "./ComputationQueue";
 
type Task = {
  id: string;
  taskFn: () => Promise<any>; // Task function remains async
};
 
// Info about a batch stored within the worker
type WorkerBatchInfo = {
  id: string;
  metadata: any;
  pendingTasks: Task[];
  totalTasks: number; // Initial total number of tasks
  processedTasks: number;
};
 
// Result returned by processChunk
export interface ChunkProcessingResult {
  tasksProcessed: number; // How many tasks were actually processed in this chunk?
  completedBatchIds: string[]; // Array of Batch IDs completed during this chunk
}
 
const DEFAULT_WORKER_CHUNK_SIZE = 5; // Renamed constant
 
// This class is now purely synchronous from the caller's perspective
// It does NOT extend Events or emit anything.
export class ComputationWorkerQueue {
  // Store batches added to this specific worker
  private pendingBatches: WorkerBatchInfo[] = [];
  private name: string;
  private chunkSize: number; // Tasks per chunk for this worker
 
  // Global counters for this worker
  private totalTasksAdded = 0;
  private totalTasksProcessed = 0;
 
  public onBatchComplete?: (batchId: string, metadata: any) => void;
  public onProgress?: (batches: BatchProgress[], priority: TaskPriority) => void;
  private readonly priority: TaskPriority;
 
  constructor(name: string, chunkSize: number = DEFAULT_WORKER_CHUNK_SIZE, priority?: TaskPriority) {
    // Removed: super();
    this.name = name;
    this.chunkSize = Math.max(1, chunkSize);
    this.priority = priority ?? (name as TaskPriority);
  }
 
  addBatch(batchId: string, tasks: Task[], metadata: any) {
    if (tasks.length === 0) return;
 
    const totalTasks = tasks.length;
 
    const batchInfo: WorkerBatchInfo = {
      id: batchId,
      metadata,
      pendingTasks: [...tasks],
      totalTasks: totalTasks,
      processedTasks: 0,
    };
    this.pendingBatches.push(batchInfo);
    this.totalTasksAdded += totalTasks;
  }
 
  /**
   * Processes up to 'chunkSize' tasks from the pending batches sequentially.
   * Can process tasks across batch boundaries within a single chunk.
   * Returns info about the work done.
   */
  async processChunk(): Promise<ChunkProcessingResult> {
    let tasksProcessedInChunk = 0;
    const completedBatchIdsInChunk: string[] = [];
 
    // Keep processing tasks as long as we haven't hit the chunk limit, AND there are batches with tasks remaining.
    while (tasksProcessedInChunk < this.chunkSize && this.hasPendingTasks()) {
      // Get the current front batch
      const currentBatch: WorkerBatchInfo = this.pendingBatches[0];
 
      // Should not happen if hasPendingTasks is true, but safeguard
      if (!currentBatch || currentBatch.pendingTasks.length === 0) {
        console.error(
          `[WorkerQueue ${this.name}] Inconsistency: hasPendingTasks() true, but no tasks found in front batch ${currentBatch?.id}. Removing problematic batch.`,
        );
        if (currentBatch) this.pendingBatches.shift(); // Remove an empty / problematic batch
        continue; // Try the next batch if any
      }
 
      // Get the next task from the current batch
      const taskToStart = currentBatch.pendingTasks.shift(); // Take a task from the front
 
      // Safeguard if shift returns undefined (shouldn't happen if length > 0)
      if (!taskToStart) {
        console.error(
          `[WorkerQueue ${this.name}] Logic Error: Failed to shift task from non-empty pendingTasks of batch ${currentBatch.id}.`,
        );
        continue; // Skip to the next iteration / batch
      }
 
      tasksProcessedInChunk++;
      this.totalTasksProcessed++;
      currentBatch.processedTasks++;
 
      try {
        await taskToStart.taskFn();
      } catch (error) {
        console.error(`[WorkerQueue ${this.name}] Task ${taskToStart.id} (batch ${currentBatch.id}) failed:`, error);
      }
      // Task function is awaited, errors logged, but processing continues.
 
      // Check if the current batch is now complete *after* processing the task
      if (currentBatch.pendingTasks.length === 0) {
        completedBatchIdsInChunk.push(currentBatch.id);
        if (this.onBatchComplete) {
          this.onBatchComplete(currentBatch.id, currentBatch.metadata);
        }
        this.pendingBatches.shift(); // Remove completed batch from the front
      }
      // Continue the while loop to potentially process more tasks up to chunkSize
    }
 
    this.emitProgress();
    return {
      tasksProcessed: tasksProcessedInChunk,
      completedBatchIds: completedBatchIdsInChunk,
    };
  }
 
  /** Checks if there are any tasks remaining across all batches */
  hasPendingTasks(): boolean {
    // Efficiently check if any batch in the queue still has tasks left
    return this.pendingBatches.some((batch) => batch.pendingTasks.length > 0);
  }
 
  /** Removes all pending batches and resets counters */
  clear() {
    this.pendingBatches = [];
    this.totalTasksAdded = 0;
    this.totalTasksProcessed = 0;
    this.emitProgress();
  }
 
  // Removed: pause, _checkIdle, isIdle, activeTaskCount
 
  // --- Getters ---
  get pendingBatchCount(): number {
    // Count batches that actually have tasks left
    return this.pendingBatches.filter((b) => b.pendingTasks.length > 0).length;
  }
 
  get pendingTaskCount(): number {
    // Sum up tasks across all batches
    return this.pendingBatches.reduce((sum, batch) => sum + batch.pendingTasks.length, 0);
  }
 
  get globalTasksAdded(): number {
    return this.totalTasksAdded;
  }
 
  get globalTasksProcessed(): number {
    return this.totalTasksProcessed;
  }
 
  public cancelBatchesByPriority(priorities: TaskPriority[]) {
    this.pendingBatches = this.pendingBatches.filter((batch) => !priorities.includes(batch.metadata?.priority));
  }
 
  public getAllBatchProgresses(priority: TaskPriority): BatchProgress[] {
    return this.pendingBatches.map((batch) => ({
      id: batch.id,
      priority,
      metadata: batch.metadata,
      totalTasks: batch.totalTasks,
      activeTasks: batch.pendingTasks.length,
      completedTasks: batch.processedTasks,
      queuedTasks: batch.pendingTasks.length,
      percentage: batch.totalTasks > 0 ? Math.round((batch.processedTasks / batch.totalTasks) * 100) : 0,
    }));
  }
 
  private emitProgress() {
    if (this.onProgress) {
      this.onProgress(this.getAllBatchProgresses(this.priority), this.priority);
    }
  }
}