Skip to content

Use Context managers and closures for nicer locking flow vs inline branching#24

Closed
pirate wants to merge 203 commits intobrowser-use:mainfrom
ArchiveBox:context-mgr
Closed

Use Context managers and closures for nicer locking flow vs inline branching#24
pirate wants to merge 203 commits intobrowser-use:mainfrom
ArchiveBox:context-mgr

Conversation

@pirate
Copy link
Contributor

@pirate pirate commented Feb 16, 2026

Summary by cubic

Introduces a context-based LockManager and handler lifecycle that simplifies concurrency and adds a full TypeScript runtime with bridges, middleware hooks, first() racing, and a @Retry decorator. Also splits CI into TS/Python, adds npm publish, and updates docs and examples.

  • New Features

    • Context/closure-style locking with unified LockManager; queue-jump preserved.
    • Event handler completion modes: 'all' or 'first'; event.first() cancels losing handlers and their children.
    • @Retry decorator/HOF with timeouts, backoff, error filters, and semaphore scopes ('global' | 'class' | 'instance'); re-entrancy safe.
    • TypeScript runtime (bubus) with BaseEvent/EventBus, middleware hooks, context propagation (AsyncLocalStorage), and bridges (HTTP socket, JSONL, Redis, Postgres, SQLite, NATS).
    • Extensive tests (unit, perf, browser) and new examples; updated README and API docs.
  • Migration

    • Node 22 + pnpm required for TS dev; npm package name: bubus.
    • Optional deps needed for bridges (e.g., redis, nats, pg, better-sqlite3); install only what you use.
    • Prefer event.first() to race handlers; use @Retry on handlers instead of wrapping emit→done.
    • CI split into test-ts and test-py; publish via publish-npm on release.

Written for commit 474964e. Summary will update on new commits.

pirate and others added 30 commits October 15, 2025 12:51
Add support for middlewares to hook into event bus handler lifecycle
implement swappable EventHistory storage backend
Updated the description to clarify the library's functionality and similarities to JS event systems.
Revise README description for bubus library
@pirate pirate closed this Feb 16, 2026
Copy link

@cubic-dev-ai cubic-dev-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

27 issues found across 199 files

Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.

Prompt for AI agents (all issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="docs/concurrency/immediate-execution.mdx">

<violation number="1" location="docs/concurrency/immediate-execution.mdx:150">
P3: The order assertion can pass even when `child` is missing because `indexOf` returns -1, so `-1 < parent_endIndex` is true. This makes the example’s validation logic unreliable.</violation>
</file>

<file name="bubus-ts/src/bridges.ts">

<violation number="1" location="bubus-ts/src/bridges.ts:252">
P2: Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).</violation>

<violation number="2" location="bubus-ts/src/bridges.ts:265">
P2: Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.</violation>

<violation number="3" location="bubus-ts/src/bridges.ts:306">
P2: Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).</violation>

<violation number="4" location="bubus-ts/src/bridges.ts:345">
P2: The constructor treats `null` as an options object, so `new HTTPEventBridge(null, listen_on)` drops the listen_on argument and misconfigures the bridge.</violation>
</file>

<file name="bubus-ts/src/retry.ts">

<violation number="1" location="bubus-ts/src/retry.ts:94">
P2: Re-entrancy tracking is disabled when AsyncLocalStorage is unavailable, so nested calls with the same semaphore can deadlock by re-acquiring an already held AsyncLock.</violation>

<violation number="2" location="bubus-ts/src/retry.ts:271">
P2: The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to `T` which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.</violation>
</file>

<file name="docs/concurrency/handlers-parallel.mdx">

<violation number="1" location="docs/concurrency/handlers-parallel.mdx:48">
P2: Python runnable example uses top-level `await`, which is invalid in a standard script and will raise `SyntaxError`. Wrap awaited calls in an async function and run with `asyncio.run(...)`.</violation>
</file>

<file name="docs/api/eventresult.mdx">

<violation number="1" location="docs/api/eventresult.mdx:23">
P3: Documentation claims `EventResult` is awaitable, but the TypeScript implementation is not Promise-like, so `await entry` would not yield the handler result. This is misleading for TS users and should be clarified per language.</violation>
</file>

<file name="bubus-ts/src/bridge_sqlite.ts">

<violation number="1" location="bubus-ts/src/bridge_sqlite.ts:189">
P2: Polling query wraps event_created_at in COALESCE, but the only index is on the raw column, so SQLite cannot use it for the WHERE/ORDER BY expression and will scan/sort every poll as the table grows.</violation>
</file>

<file name="docs/integrations/bridges.mdx">

<violation number="1" location="docs/integrations/bridges.mdx:25">
P2: Bidirectional '*' wiring in the docs creates an infinite feedback loop between EventBus and RedisEventBridge because RedisEventBridge republishes inbound events without loop suppression.</violation>
</file>

<file name="bubus-ts/src/bridge_redis.ts">

<violation number="1" location="bubus-ts/src/bridge_redis.ts:143">
P2: Async dispatchInboundPayload is called without await/catch in the Redis message handler, so any rejection becomes an unhandled promise rejection.</violation>
</file>

<file name="bubus-ts/src/bridge_jsonl.ts">

<violation number="1" location="bubus-ts/src/bridge_jsonl.ts:87">
P2: ensureStarted can trigger multiple concurrent start() calls because running/listener_task aren’t set until after awaits, allowing duplicate listen loops and duplicate event processing.</violation>

<violation number="2" location="bubus-ts/src/bridge_jsonl.ts:156">
P2: TextDecoder is recreated per read and decode is called without streaming support, so a multi‑byte UTF‑8 character split across reads will be replaced with U+FFFD and permanently lost once the offset advances.</violation>

<violation number="3" location="bubus-ts/src/bridge_jsonl.ts:164">
P2: `dirname` only handles '/' separators; Windows paths with '\\' will return '.', so the parent directory isn’t created on Windows and file writes can fail.</violation>
</file>

<file name="bubus-ts/src/event_result.ts">

<violation number="1" location="bubus-ts/src/event_result.ts:454">
P2: Error details will be lost during JSON serialization because Error instances stringify to `{}` when `error: this.error` is included directly.</violation>
</file>

<file name="docs/features/event-history-store.mdx">

<violation number="1" location="docs/features/event-history-store.mdx:37">
P3: The lifecycle section claims events stay in `event_history` while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.</violation>
</file>

<file name="bubus-ts/src/bridge_nats.ts">

<violation number="1" location="bubus-ts/src/bridge_nats.ts:68">
P2: start() sets `running` only after awaited operations, so concurrent calls can pass the guard and create multiple NATS connections/subscriptions.</violation>
</file>

<file name="bubus-ts/src/lock_manager.ts">

<violation number="1" location="bubus-ts/src/lock_manager.ts:67">
P2: AsyncLock can oversubscribe the concurrency limit: release() decrements in_use before waking a waiter, and acquire() increments after the waiter resumes, allowing a new acquire to slip in and then the waiter increments too.</violation>
</file>

<file name="docs/integrations/bridge-nats.mdx">

<violation number="1" location="docs/integrations/bridge-nats.mdx:85">
P2: Bidirectional example wires bus and bridge in a way that can create an infinite publish/subscribe feedback loop because NATSEventBridge re-emits inbound messages without dedup or provenance checks.</violation>
</file>

<file name="bubus-ts/src/bridge_postgres.ts">

<violation number="1" location="bubus-ts/src/bridge_postgres.ts:110">
P2: dispatch() can proceed with a non-null but unconnected client because ensureStarted starts asynchronously and start() sets this.client before connect/setup; the null check doesn’t await initialization, so queries can run before connect and concurrent dispatches can create multiple clients.</violation>
</file>

<file name="README.md">

<violation number="1" location="README.md:662">
P3: Typo in documented import path (`bubus.middlwares.*`) will cause ImportError if copied; should be `bubus.middlewares.*`.</violation>
</file>

<file name="docs/integrations/bridge-socket.mdx">

<violation number="1" location="docs/integrations/bridge-socket.mdx:45">
P2: The docs suggest bidirectional forwarding without any loop prevention. With SocketEventBridge/EventBridge, inbound socket events are always dispatched and emit always forwards, so two apps using this configuration will ping‑pong events indefinitely. This should be called out or mitigated (e.g., filter origin-tagged events) to avoid infinite loops.</violation>
</file>

<file name="bubus-ts/README.md">

<violation number="1" location="bubus-ts/README.md:101">
P3: The EventBus constructor signature omits the documented `max_history_drop` option, so the API signature is inconsistent with the options table.</violation>

<violation number="2" location="bubus-ts/README.md:237">
P2: `bus.find()` returns a Promise per the documented signature, but the example assigns it without `await`, which will yield a Promise instead of the matching event. This can mislead users copying the snippet.</violation>
</file>

<file name="bubus-ts/src/optional_deps.ts">

<violation number="1" location="bubus-ts/src/optional_deps.ts:23">
P2: Catch-all import error handling masks real module initialization failures by always throwing a missing-dependency error, which can mislead debugging when the package exists but crashes.</violation>
</file>

<file name="docs/integrations/bridge-redis.mdx">

<violation number="1" location="docs/integrations/bridge-redis.mdx:85">
P2: The docs recommend bidirectional wiring (`bus.on('*', bridge.emit)` and `bridge.on('*', bus.emit)`), but RedisEventBridge republishes every inbound Redis message back to the bus without loop prevention. Because the bridge uses separate Redis pub/sub clients, messages it publishes are received by its subscriber, so this wiring can cause an infinite publish/subscribe loop (bus → bridge → Redis → bridge → bus → ...).</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.


let body = ''
req.setEncoding('utf8')
req.on('data', (chunk: string) => {
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 252:

<comment>Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).</comment>

<file context>
@@ -0,0 +1,376 @@
+
+      let body = ''
+      req.setEncoding('utf8')
+      req.on('data', (chunk: string) => {
+        body += chunk
+      })
</file context>
Fix with Cubic

return
}

void this.handleIncomingPayload(parsed_payload)
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 265:

<comment>Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.</comment>

<file context>
@@ -0,0 +1,376 @@
+          return
+        }
+
+        void this.handleIncomingPayload(parsed_payload)
+          .then(() => {
+            res.statusCode = 202
</file context>
Fix with Cubic

constructor(options?: HTTPEventBridgeOptions)
constructor(send_to_or_options?: string | null | HTTPEventBridgeOptions, listen_on?: string | null, name?: string) {
const options: HTTPEventBridgeOptions =
typeof send_to_or_options === 'object'
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: The constructor treats null as an options object, so new HTTPEventBridge(null, listen_on) drops the listen_on argument and misconfigures the bridge.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 345:

<comment>The constructor treats `null` as an options object, so `new HTTPEventBridge(null, listen_on)` drops the listen_on argument and misconfigures the bridge.</comment>

<file context>
@@ -0,0 +1,376 @@
+  constructor(options?: HTTPEventBridgeOptions)
+  constructor(send_to_or_options?: string | null | HTTPEventBridgeOptions, listen_on?: string | null, name?: string) {
+    const options: HTTPEventBridgeOptions =
+      typeof send_to_or_options === 'object'
+        ? (send_to_or_options ?? {})
+        : { send_to: send_to_or_options ?? undefined, listen_on: listen_on ?? undefined, name }
</file context>
Fix with Cubic

let buffer = ''
socket.setEncoding('utf8')
socket.on('data', (chunk: string) => {
buffer += chunk
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 306:

<comment>Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).</comment>

<file context>
@@ -0,0 +1,376 @@
+      let buffer = ''
+      socket.setEncoding('utf8')
+      socket.on('data', (chunk: string) => {
+        buffer += chunk
+        while (true) {
+          const newline_index = buffer.indexOf('\n')
</file context>
Fix with Cubic

}

Object.defineProperty(retryWrapper, 'name', { value: fn_name, configurable: true })
return retryWrapper as unknown as T
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to T which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/retry.ts, line 271:

<comment>The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to `T` which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.</comment>

<file context>
@@ -0,0 +1,344 @@
+    }
+
+    Object.defineProperty(retryWrapper, 'name', { value: fn_name, configurable: true })
+    return retryWrapper as unknown as T
+  }
+}
</file context>
Fix with Cubic

await bus.emit(ParentEvent({})).done()
await bus.waitUntilIdle()

if (!(order.indexOf('child') < order.indexOf('parent_end'))) throw new Error('child should finish before parent resumes')
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: The order assertion can pass even when child is missing because indexOf returns -1, so -1 < parent_endIndex is true. This makes the example’s validation logic unreliable.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/concurrency/immediate-execution.mdx, line 150:

<comment>The order assertion can pass even when `child` is missing because `indexOf` returns -1, so `-1 < parent_endIndex` is true. This makes the example’s validation logic unreliable.</comment>

<file context>
@@ -0,0 +1,181 @@
+await bus.emit(ParentEvent({})).done()
+await bus.waitUntilIdle()
+
+if (!(order.indexOf('child') < order.indexOf('parent_end'))) throw new Error('child should finish before parent resumes')
+if (!(order.indexOf('parent_end') < order.indexOf('sibling'))) throw new Error('sibling should run after parent ends')
+```
</file context>
Fix with Cubic


## Await semantics

Awaiting an `EventResult` resolves to handler return value or raises captured failure.
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: Documentation claims EventResult is awaitable, but the TypeScript implementation is not Promise-like, so await entry would not yield the handler result. This is misleading for TS users and should be clarified per language.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/api/eventresult.mdx, line 23:

<comment>Documentation claims `EventResult` is awaitable, but the TypeScript implementation is not Promise-like, so `await entry` would not yield the handler result. This is misleading for TS users and should be clarified per language.</comment>

<file context>
@@ -0,0 +1,114 @@
+
+## Await semantics
+
+Awaiting an `EventResult` resolves to handler return value or raises captured failure.
+
+<Tabs>
</file context>
Fix with Cubic

- Event is enqueued into `pending_event_queue`.
2. Runloop begins processing:
- Event is removed from `pending_event_queue`.
- Event stays in `event_history` while handlers run.
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: The lifecycle section claims events stay in event_history while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/features/event-history-store.mdx, line 37:

<comment>The lifecycle section claims events stay in `event_history` while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.</comment>

<file context>
@@ -0,0 +1,113 @@
+   - Event is enqueued into `pending_event_queue`.
+2. Runloop begins processing:
+   - Event is removed from `pending_event_queue`.
+   - Event stays in `event_history` while handlers run.
+3. Completion:
+   - Event is marked completed.
</file context>
Fix with Cubic

{"event_type": "FirstEventXyz", "event_created_at": "2025-07-10T20:39:56.462000+00:00", "some_key": "some_val", ...}
{"event_type": "SecondEventAbc", ..., "some_key": "banana"}
...
Built-in middlwares you can import from `bubus.middlwares.*`:
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: Typo in documented import path (bubus.middlwares.*) will cause ImportError if copied; should be bubus.middlewares.*.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At README.md, line 662:

<comment>Typo in documented import path (`bubus.middlwares.*`) will cause ImportError if copied; should be `bubus.middlewares.*`.</comment>

<file context>
@@ -459,36 +623,75 @@ The harsh tradeoff is less deterministic ordering as handler execution order wil
-{"event_type": "FirstEventXyz", "event_created_at": "2025-07-10T20:39:56.462000+00:00", "some_key": "some_val", ...}
-{"event_type": "SecondEventAbc", ..., "some_key": "banana"}
-...
+Built-in middlwares you can import from `bubus.middlwares.*`:
+
+- `AutoErrorEventMiddleware`: on handler error, fire-and-forget emits `OriginalEventTypeErrorEvent` with `{error, error_type}` (skips `*ErrorEvent`/`*ResultEvent` sources). Useful when downstream/remote consumers only see events and need explicit failure notifications.
</file context>
Suggested change
Built-in middlwares you can import from `bubus.middlwares.*`:
Built-in middlewares you can import from `bubus.middlewares.*`:
Fix with Cubic

```ts
new EventBus(name?: string, options?: {
id?: string
max_history_size?: number | null
Copy link

@cubic-dev-ai cubic-dev-ai bot Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P3: The EventBus constructor signature omits the documented max_history_drop option, so the API signature is inconsistent with the options table.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/README.md, line 101:

<comment>The EventBus constructor signature omits the documented `max_history_drop` option, so the API signature is inconsistent with the options table.</comment>

<file context>
@@ -0,0 +1,838 @@
+```ts
+new EventBus(name?: string, options?: {
+  id?: string
+  max_history_size?: number | null
+  event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
+  event_timeout?: number | null
</file context>
Fix with Cubic

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants