Use Context managers and closures for nicer locking flow vs inline branching#24
Use Context managers and closures for nicer locking flow vs inline branching#24pirate wants to merge 203 commits intobrowser-use:mainfrom
Conversation
Add support for middlewares to hook into event bus handler lifecycle
implement swappable EventHistory storage backend
Updated the description to clarify the library's functionality and similarities to JS event systems.
Revise README description for bubus library
…ead of process-until-event
There was a problem hiding this comment.
27 issues found across 199 files
Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="docs/concurrency/immediate-execution.mdx">
<violation number="1" location="docs/concurrency/immediate-execution.mdx:150">
P3: The order assertion can pass even when `child` is missing because `indexOf` returns -1, so `-1 < parent_endIndex` is true. This makes the example’s validation logic unreliable.</violation>
</file>
<file name="bubus-ts/src/bridges.ts">
<violation number="1" location="bubus-ts/src/bridges.ts:252">
P2: Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).</violation>
<violation number="2" location="bubus-ts/src/bridges.ts:265">
P2: Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.</violation>
<violation number="3" location="bubus-ts/src/bridges.ts:306">
P2: Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).</violation>
<violation number="4" location="bubus-ts/src/bridges.ts:345">
P2: The constructor treats `null` as an options object, so `new HTTPEventBridge(null, listen_on)` drops the listen_on argument and misconfigures the bridge.</violation>
</file>
<file name="bubus-ts/src/retry.ts">
<violation number="1" location="bubus-ts/src/retry.ts:94">
P2: Re-entrancy tracking is disabled when AsyncLocalStorage is unavailable, so nested calls with the same semaphore can deadlock by re-acquiring an already held AsyncLock.</violation>
<violation number="2" location="bubus-ts/src/retry.ts:271">
P2: The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to `T` which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.</violation>
</file>
<file name="docs/concurrency/handlers-parallel.mdx">
<violation number="1" location="docs/concurrency/handlers-parallel.mdx:48">
P2: Python runnable example uses top-level `await`, which is invalid in a standard script and will raise `SyntaxError`. Wrap awaited calls in an async function and run with `asyncio.run(...)`.</violation>
</file>
<file name="docs/api/eventresult.mdx">
<violation number="1" location="docs/api/eventresult.mdx:23">
P3: Documentation claims `EventResult` is awaitable, but the TypeScript implementation is not Promise-like, so `await entry` would not yield the handler result. This is misleading for TS users and should be clarified per language.</violation>
</file>
<file name="bubus-ts/src/bridge_sqlite.ts">
<violation number="1" location="bubus-ts/src/bridge_sqlite.ts:189">
P2: Polling query wraps event_created_at in COALESCE, but the only index is on the raw column, so SQLite cannot use it for the WHERE/ORDER BY expression and will scan/sort every poll as the table grows.</violation>
</file>
<file name="docs/integrations/bridges.mdx">
<violation number="1" location="docs/integrations/bridges.mdx:25">
P2: Bidirectional '*' wiring in the docs creates an infinite feedback loop between EventBus and RedisEventBridge because RedisEventBridge republishes inbound events without loop suppression.</violation>
</file>
<file name="bubus-ts/src/bridge_redis.ts">
<violation number="1" location="bubus-ts/src/bridge_redis.ts:143">
P2: Async dispatchInboundPayload is called without await/catch in the Redis message handler, so any rejection becomes an unhandled promise rejection.</violation>
</file>
<file name="bubus-ts/src/bridge_jsonl.ts">
<violation number="1" location="bubus-ts/src/bridge_jsonl.ts:87">
P2: ensureStarted can trigger multiple concurrent start() calls because running/listener_task aren’t set until after awaits, allowing duplicate listen loops and duplicate event processing.</violation>
<violation number="2" location="bubus-ts/src/bridge_jsonl.ts:156">
P2: TextDecoder is recreated per read and decode is called without streaming support, so a multi‑byte UTF‑8 character split across reads will be replaced with U+FFFD and permanently lost once the offset advances.</violation>
<violation number="3" location="bubus-ts/src/bridge_jsonl.ts:164">
P2: `dirname` only handles '/' separators; Windows paths with '\\' will return '.', so the parent directory isn’t created on Windows and file writes can fail.</violation>
</file>
<file name="bubus-ts/src/event_result.ts">
<violation number="1" location="bubus-ts/src/event_result.ts:454">
P2: Error details will be lost during JSON serialization because Error instances stringify to `{}` when `error: this.error` is included directly.</violation>
</file>
<file name="docs/features/event-history-store.mdx">
<violation number="1" location="docs/features/event-history-store.mdx:37">
P3: The lifecycle section claims events stay in `event_history` while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.</violation>
</file>
<file name="bubus-ts/src/bridge_nats.ts">
<violation number="1" location="bubus-ts/src/bridge_nats.ts:68">
P2: start() sets `running` only after awaited operations, so concurrent calls can pass the guard and create multiple NATS connections/subscriptions.</violation>
</file>
<file name="bubus-ts/src/lock_manager.ts">
<violation number="1" location="bubus-ts/src/lock_manager.ts:67">
P2: AsyncLock can oversubscribe the concurrency limit: release() decrements in_use before waking a waiter, and acquire() increments after the waiter resumes, allowing a new acquire to slip in and then the waiter increments too.</violation>
</file>
<file name="docs/integrations/bridge-nats.mdx">
<violation number="1" location="docs/integrations/bridge-nats.mdx:85">
P2: Bidirectional example wires bus and bridge in a way that can create an infinite publish/subscribe feedback loop because NATSEventBridge re-emits inbound messages without dedup or provenance checks.</violation>
</file>
<file name="bubus-ts/src/bridge_postgres.ts">
<violation number="1" location="bubus-ts/src/bridge_postgres.ts:110">
P2: dispatch() can proceed with a non-null but unconnected client because ensureStarted starts asynchronously and start() sets this.client before connect/setup; the null check doesn’t await initialization, so queries can run before connect and concurrent dispatches can create multiple clients.</violation>
</file>
<file name="README.md">
<violation number="1" location="README.md:662">
P3: Typo in documented import path (`bubus.middlwares.*`) will cause ImportError if copied; should be `bubus.middlewares.*`.</violation>
</file>
<file name="docs/integrations/bridge-socket.mdx">
<violation number="1" location="docs/integrations/bridge-socket.mdx:45">
P2: The docs suggest bidirectional forwarding without any loop prevention. With SocketEventBridge/EventBridge, inbound socket events are always dispatched and emit always forwards, so two apps using this configuration will ping‑pong events indefinitely. This should be called out or mitigated (e.g., filter origin-tagged events) to avoid infinite loops.</violation>
</file>
<file name="bubus-ts/README.md">
<violation number="1" location="bubus-ts/README.md:101">
P3: The EventBus constructor signature omits the documented `max_history_drop` option, so the API signature is inconsistent with the options table.</violation>
<violation number="2" location="bubus-ts/README.md:237">
P2: `bus.find()` returns a Promise per the documented signature, but the example assigns it without `await`, which will yield a Promise instead of the matching event. This can mislead users copying the snippet.</violation>
</file>
<file name="bubus-ts/src/optional_deps.ts">
<violation number="1" location="bubus-ts/src/optional_deps.ts:23">
P2: Catch-all import error handling masks real module initialization failures by always throwing a missing-dependency error, which can mislead debugging when the package exists but crashes.</violation>
</file>
<file name="docs/integrations/bridge-redis.mdx">
<violation number="1" location="docs/integrations/bridge-redis.mdx:85">
P2: The docs recommend bidirectional wiring (`bus.on('*', bridge.emit)` and `bridge.on('*', bus.emit)`), but RedisEventBridge republishes every inbound Redis message back to the bus without loop prevention. Because the bridge uses separate Redis pub/sub clients, messages it publishes are received by its subscriber, so this wiring can cause an infinite publish/subscribe loop (bus → bridge → Redis → bridge → bus → ...).</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
|
|
||
| let body = '' | ||
| req.setEncoding('utf8') | ||
| req.on('data', (chunk: string) => { |
There was a problem hiding this comment.
P2: Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 252:
<comment>Unbounded request body buffering in the HTTP listener can allow large payloads to exhaust memory and crash the process (DoS).</comment>
<file context>
@@ -0,0 +1,376 @@
+
+ let body = ''
+ req.setEncoding('utf8')
+ req.on('data', (chunk: string) => {
+ body += chunk
+ })
</file context>
| return | ||
| } | ||
|
|
||
| void this.handleIncomingPayload(parsed_payload) |
There was a problem hiding this comment.
P2: Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 265:
<comment>Async handleIncomingPayload is called without a rejection handler in the unix listener, which can trigger unhandled promise rejections if parsing/dispatch fails.</comment>
<file context>
@@ -0,0 +1,376 @@
+ return
+ }
+
+ void this.handleIncomingPayload(parsed_payload)
+ .then(() => {
+ res.statusCode = 202
</file context>
| constructor(options?: HTTPEventBridgeOptions) | ||
| constructor(send_to_or_options?: string | null | HTTPEventBridgeOptions, listen_on?: string | null, name?: string) { | ||
| const options: HTTPEventBridgeOptions = | ||
| typeof send_to_or_options === 'object' |
There was a problem hiding this comment.
P2: The constructor treats null as an options object, so new HTTPEventBridge(null, listen_on) drops the listen_on argument and misconfigures the bridge.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 345:
<comment>The constructor treats `null` as an options object, so `new HTTPEventBridge(null, listen_on)` drops the listen_on argument and misconfigures the bridge.</comment>
<file context>
@@ -0,0 +1,376 @@
+ constructor(options?: HTTPEventBridgeOptions)
+ constructor(send_to_or_options?: string | null | HTTPEventBridgeOptions, listen_on?: string | null, name?: string) {
+ const options: HTTPEventBridgeOptions =
+ typeof send_to_or_options === 'object'
+ ? (send_to_or_options ?? {})
+ : { send_to: send_to_or_options ?? undefined, listen_on: listen_on ?? undefined, name }
</file context>
| let buffer = '' | ||
| socket.setEncoding('utf8') | ||
| socket.on('data', (chunk: string) => { | ||
| buffer += chunk |
There was a problem hiding this comment.
P2: Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/bridges.ts, line 306:
<comment>Unix socket listener buffers incoming data indefinitely until a newline is seen, allowing a client to send an unbounded stream and exhaust memory (DoS).</comment>
<file context>
@@ -0,0 +1,376 @@
+ let buffer = ''
+ socket.setEncoding('utf8')
+ socket.on('data', (chunk: string) => {
+ buffer += chunk
+ while (true) {
+ const newline_index = buffer.indexOf('\n')
</file context>
| } | ||
|
|
||
| Object.defineProperty(retryWrapper, 'name', { value: fn_name, configurable: true }) | ||
| return retryWrapper as unknown as T |
There was a problem hiding this comment.
P2: The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to T which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/src/retry.ts, line 271:
<comment>The retry decorator always returns an async wrapper (Promise), but it casts the wrapper back to `T` which can be a sync function. This masks a return-type change and can lead to runtime misuse when callers treat the result as a scalar.</comment>
<file context>
@@ -0,0 +1,344 @@
+ }
+
+ Object.defineProperty(retryWrapper, 'name', { value: fn_name, configurable: true })
+ return retryWrapper as unknown as T
+ }
+}
</file context>
| await bus.emit(ParentEvent({})).done() | ||
| await bus.waitUntilIdle() | ||
|
|
||
| if (!(order.indexOf('child') < order.indexOf('parent_end'))) throw new Error('child should finish before parent resumes') |
There was a problem hiding this comment.
P3: The order assertion can pass even when child is missing because indexOf returns -1, so -1 < parent_endIndex is true. This makes the example’s validation logic unreliable.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/concurrency/immediate-execution.mdx, line 150:
<comment>The order assertion can pass even when `child` is missing because `indexOf` returns -1, so `-1 < parent_endIndex` is true. This makes the example’s validation logic unreliable.</comment>
<file context>
@@ -0,0 +1,181 @@
+await bus.emit(ParentEvent({})).done()
+await bus.waitUntilIdle()
+
+if (!(order.indexOf('child') < order.indexOf('parent_end'))) throw new Error('child should finish before parent resumes')
+if (!(order.indexOf('parent_end') < order.indexOf('sibling'))) throw new Error('sibling should run after parent ends')
+```
</file context>
|
|
||
| ## Await semantics | ||
|
|
||
| Awaiting an `EventResult` resolves to handler return value or raises captured failure. |
There was a problem hiding this comment.
P3: Documentation claims EventResult is awaitable, but the TypeScript implementation is not Promise-like, so await entry would not yield the handler result. This is misleading for TS users and should be clarified per language.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/api/eventresult.mdx, line 23:
<comment>Documentation claims `EventResult` is awaitable, but the TypeScript implementation is not Promise-like, so `await entry` would not yield the handler result. This is misleading for TS users and should be clarified per language.</comment>
<file context>
@@ -0,0 +1,114 @@
+
+## Await semantics
+
+Awaiting an `EventResult` resolves to handler return value or raises captured failure.
+
+<Tabs>
</file context>
| - Event is enqueued into `pending_event_queue`. | ||
| 2. Runloop begins processing: | ||
| - Event is removed from `pending_event_queue`. | ||
| - Event stays in `event_history` while handlers run. |
There was a problem hiding this comment.
P3: The lifecycle section claims events stay in event_history while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At docs/features/event-history-store.mdx, line 37:
<comment>The lifecycle section claims events stay in `event_history` while handlers run, but trimming behavior states uncompleted entries can be dropped. This contradiction leaves ambiguous whether in-flight events may disappear from history; clarify the lifecycle guarantee or add the trimming exception.</comment>
<file context>
@@ -0,0 +1,113 @@
+ - Event is enqueued into `pending_event_queue`.
+2. Runloop begins processing:
+ - Event is removed from `pending_event_queue`.
+ - Event stays in `event_history` while handlers run.
+3. Completion:
+ - Event is marked completed.
</file context>
| {"event_type": "FirstEventXyz", "event_created_at": "2025-07-10T20:39:56.462000+00:00", "some_key": "some_val", ...} | ||
| {"event_type": "SecondEventAbc", ..., "some_key": "banana"} | ||
| ... | ||
| Built-in middlwares you can import from `bubus.middlwares.*`: |
There was a problem hiding this comment.
P3: Typo in documented import path (bubus.middlwares.*) will cause ImportError if copied; should be bubus.middlewares.*.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At README.md, line 662:
<comment>Typo in documented import path (`bubus.middlwares.*`) will cause ImportError if copied; should be `bubus.middlewares.*`.</comment>
<file context>
@@ -459,36 +623,75 @@ The harsh tradeoff is less deterministic ordering as handler execution order wil
-{"event_type": "FirstEventXyz", "event_created_at": "2025-07-10T20:39:56.462000+00:00", "some_key": "some_val", ...}
-{"event_type": "SecondEventAbc", ..., "some_key": "banana"}
-...
+Built-in middlwares you can import from `bubus.middlwares.*`:
+
+- `AutoErrorEventMiddleware`: on handler error, fire-and-forget emits `OriginalEventTypeErrorEvent` with `{error, error_type}` (skips `*ErrorEvent`/`*ResultEvent` sources). Useful when downstream/remote consumers only see events and need explicit failure notifications.
</file context>
| Built-in middlwares you can import from `bubus.middlwares.*`: | |
| Built-in middlewares you can import from `bubus.middlewares.*`: |
| ```ts | ||
| new EventBus(name?: string, options?: { | ||
| id?: string | ||
| max_history_size?: number | null |
There was a problem hiding this comment.
P3: The EventBus constructor signature omits the documented max_history_drop option, so the API signature is inconsistent with the options table.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At bubus-ts/README.md, line 101:
<comment>The EventBus constructor signature omits the documented `max_history_drop` option, so the API signature is inconsistent with the options table.</comment>
<file context>
@@ -0,0 +1,838 @@
+```ts
+new EventBus(name?: string, options?: {
+ id?: string
+ max_history_size?: number | null
+ event_concurrency?: 'global-serial' | 'bus-serial' | 'parallel' | null
+ event_timeout?: number | null
</file context>
Summary by cubic
Introduces a context-based LockManager and handler lifecycle that simplifies concurrency and adds a full TypeScript runtime with bridges, middleware hooks, first() racing, and a @Retry decorator. Also splits CI into TS/Python, adds npm publish, and updates docs and examples.
New Features
Migration
Written for commit 474964e. Summary will update on new commits.