
feat: add SSE for live block updates #19

Open
pthmas wants to merge 19 commits into main from pthmas/websocket-live-updates

Conversation


@pthmas pthmas commented Feb 26, 2026

Summary

  • Adds an SSE endpoint (GET /api/events) that streams new block metadata in real time
  • Frontend drains events at the chain's natural block rate (derived from on-chain timestamps) for a smooth visual cadence
  • Blocks page prepends new blocks instantly via SSE; polling remains as fallback
  • Block time display uses a 30s rolling window of block timestamps for stability
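The 30s rolling-window computation above reduces to a pure function over recent block timestamps. A minimal TypeScript sketch of the idea (the function name, signature, and window handling are illustrative, not the PR's actual code):

```typescript
// Estimate average block time from a rolling window of block timestamps.
// Timestamps are Unix seconds, oldest first. Illustrative sketch only.
function rollingBlockTime(timestamps: number[], windowSecs = 30): number | null {
  if (timestamps.length < 2) return null;
  const newest = timestamps[timestamps.length - 1];
  // Keep only blocks whose timestamp falls inside the window.
  const window = timestamps.filter((t) => newest - t <= windowSecs);
  if (window.length < 2) return null;
  // Average spacing between consecutive blocks in the window.
  return (window[window.length - 1] - window[0]) / (window.length - 1);
}
```

Averaging over a window rather than the last two blocks keeps the displayed block time stable when individual blocks arrive slightly early or late.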

Closes #11

Summary by CodeRabbit

  • New Features

    • Real-time SSE stream for live block events; frontend hook exposes latest event, connection state, height and bps with automatic reconnect and buffering; polling fallback via a new fast status endpoint.
  • UI

    • Live status indicator (green when SSE connected, red when polling); block height uses exact SSE values when live and smoothed prediction when polling; blocks list prepends live blocks with deduplication.
  • Chores

    • Database data moved to a named persistent volume.
  • Style

    • Simplified, non-animated block counter for consistent rendering.
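The live-vs-polling height selection described above is essentially one small pure function. A hedged sketch (names are illustrative; the real logic lives across Layout and useLatestBlockHeight):

```typescript
// Pick the height to display: exact SSE value when live,
// smoothed prediction while falling back to polling. Illustrative sketch.
function displayHeight(
  sseConnected: boolean,
  sseHeight: number | null,
  predictedHeight: number,
): number {
  if (sseConnected && sseHeight !== null) return sseHeight; // exact, live
  return predictedHeight; // smoothed estimate while polling
}
```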

Add a Server-Sent Events endpoint (GET /api/events) that streams new blocks
to the frontend in real time, replacing polling for block-height and
blocks-list updates.

Backend:
- New SSE handler polls DB every 200ms, emits one event per block in order
- SSE route excluded from 10s TimeoutLayer so connections stay alive
- Nginx configured to proxy SSE with buffering disabled
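An SSE location block of this shape typically needs HTTP/1.1 with the Connection header cleared and buffering disabled so nginx flushes events to the client immediately. A sketch of common directives (the timeout value is an assumption, not necessarily the PR's exact config):

```nginx
location /api/events {
    proxy_pass http://atlas-api:3000/api/events;
    proxy_http_version 1.1;
    proxy_set_header Connection "";   # allow a persistent upstream connection
    proxy_buffering off;              # flush SSE events to the client immediately
    proxy_cache off;
    proxy_read_timeout 1h;            # keep the long-lived stream open
}
```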

Frontend:
- useBlockSSE hook buffers events in a ref-based queue and drains at the
  chain's natural block rate (computed from on-chain timestamps)
- Navbar counter updates instantly via SSE, block time derived from 30s
  rolling window of block timestamps for stability
- BlocksPage prepends new blocks from SSE on page 1 with auto-refresh
- Polling falls back automatically when SSE disconnects
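The buffer-and-drain behaviour of the hook can be sketched as a plain queue: events are enqueued as they arrive over SSE and drained one per tick at the chain's block rate, skipping ahead when the buffer grows. Names and the catch-up policy here are illustrative, not the hook's actual code:

```typescript
// Illustrative sketch of the buffer-and-drain idea behind useBlockSSE.
// In the real hook the queue lives in a React ref and the drain timer is
// set to the chain's average block time.
interface BlockEvent { number: number; timestamp: number }

class BlockQueue {
  private queue: BlockEvent[] = [];

  enqueue(ev: BlockEvent): void {
    this.queue.push(ev);
  }

  // Drain one event per tick for a smooth cadence; when the buffer grows
  // too large, skip ahead so the display never lags far behind the chain.
  drain(maxLag = 5): BlockEvent | undefined {
    if (this.queue.length > maxLag) {
      this.queue = this.queue.slice(-1); // jump to the newest event
    }
    return this.queue.shift();
  }

  get pending(): number {
    return this.queue.length;
  }
}
```

Draining at the chain's own cadence decouples the visual update rate from network jitter, while the catch-up branch bounds how far the display can lag.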

Closes #11

coderabbitai bot commented Feb 26, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough


Adds server-sent events for new blocks: backend exposes /api/events with a Postgres NOTIFY fanout; nginx proxies the stream; frontend adds useBlockSSE, integrates SSE into height polling, UI, and Blocks page (backfill/dedup); indexer emits NOTIFY; streaming deps added to manifests.

Changes

Cohort / File(s): Summary

  • Workspace Manifests (backend/Cargo.toml, backend/crates/atlas-api/Cargo.toml): added streaming dependencies tokio-stream, futures, and async-stream as workspace deps and crate workspace refs.
  • Backend SSE module (backend/crates/atlas-api/src/handlers/mod.rs, backend/crates/atlas-api/src/handlers/sse.rs): new sse module exposing the block_events SSE handler, the fanout task run_block_event_fanout, DB backfill, batch fetching, broadcast logic, event serialization, keep-alives, and unit tests.
  • Backend router / state (backend/crates/atlas-api/src/main.rs): added block_events_tx: broadcast::Sender<()> to AppState, spawned the fanout task, registered the /api/events route and excluded it from TimeoutLayer; adjusted router state wiring.
  • Indexer notify (backend/crates/atlas-indexer/src/indexer.rs): introduced BLOCK_EVENT_CHANNEL and emits a PostgreSQL NOTIFY (atlas_new_blocks) after updating last_indexed_block.
  • Docker / build (docker-compose.yml, backend/.dockerignore): named the pgdata volume atlas_pgdata; added backend/.dockerignore to exclude target/, .git/, and .context/.
  • Nginx proxy (frontend/nginx.conf): added an /api/events proxy to atlas-api:3000/api/events with HTTP/1.1 and buffering disabled for streaming.
  • Frontend SSE hook (frontend/src/hooks/useBlockSSE.ts): new hook connecting to /api/events; buffers events, runs an adaptive drain loop with reconnection/backoff, computes bps, and exposes latestBlock, height, connected, error, and bps, plus the NewBlockEvent type.
  • Frontend integration (frontend/src/components/Layout.tsx, frontend/src/hooks/useLatestBlockHeight.ts, frontend/src/context/BlockStatsContext.tsx): integrated SSE into layout/context; useLatestBlockHeight signature updated to accept SSE state; context now includes latestBlockEvent and sseConnected.
  • Pages & UI (frontend/src/pages/BlocksPage.tsx, frontend/src/components/SmoothCounter.tsx): BlocksPage prepends/deduplicates SSE blocks on page 1 with buffering and RAF batching and adjusts polling; SmoothCounter simplified DOM/CSS (removed the animated wrapper).
  • API / client (frontend/src/api/client.ts): exported API_BASE_URL.
  • Docs / notes (CLAUDE.md): documented the SSE /api/events endpoint and /api/status as a polling fallback.

Sequence Diagram(s)

sequenceDiagram
    participant Browser as Browser (Client)
    participant Nginx as Nginx
    participant API as Atlas API
    participant Indexer as Indexer/DB
    participant Hook as useBlockSSE
    participant UI as UI

    Browser->>Nginx: GET /api/events (SSE)
    Nginx->>API: Proxy /api/events (HTTP/1.1, no buffering)
    API->>Indexer: Query latest block (seed)
    Indexer-->>API: latest block
    API->>Browser: SSE new_block (initial)

    Note over Indexer,API: Indexer emits NOTIFY "atlas_new_blocks" on new blocks
    Indexer->>API: NOTIFY payload
    API->>API: broadcast via tokio::broadcast
    API->>Browser: SSE new_block (streamed JSON)
    API-->>Browser: SSE keep-alive (~15s)

    Browser->>Hook: receive SSE events
    Hook->>Hook: enqueue, compute bps, adapt drain interval
    loop Drain (adaptive)
      Hook->>UI: latestBlock, height, connected, bps
      UI->>UI: render counter and indicators
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers

  • tac0turtle

Poem

🐰 I nibble logs by moonlit streams,
New blocks hop in tiny beams,
Queues I tend with careful paws,
Keep-alives hum and fanouts cause,
I twitch my nose and cheer the streams.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage: ⚠️ Warning. Docstring coverage is 52.63%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (4 passed)

  • Description Check: ✅ Passed. Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check: ✅ Passed. The title 'feat: add SSE for live block updates' accurately and concisely describes the main change: implementing Server-Sent Events for real-time block updates.
  • Linked Issues check: ✅ Passed. The pull request implements the core requirement from issue #11: an SSE endpoint for immediate frontend notification on new block production, with full streaming integration.
  • Out of Scope Changes check: ✅ Passed. All changes directly support SSE implementation and live block updates: the SSE endpoint, frontend integration, docker/nginx config, and supporting infrastructure, all scoped to the objective.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings (stacked PR)
  • 📝 Generate docstrings (commit on current branch)
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch pthmas/websocket-live-updates
📝 Coding Plan
  • Generate coding plan for human review comments

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)

219-225: ⚠️ Potential issue | 🟠 Major

The SSE routes are not wrapped by the TimeoutLayer and violate the coding guidelines.

In axum 0.8, .layer() only affects routes that exist at the time it is called. Since TimeoutLayer is applied on lines 219–222 and .merge(sse_routes) occurs on line 225, the SSE routes do not receive the timeout middleware. Per coding guidelines, all routes in main.rs must be wrapped with TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10)).

Move the .layer(TimeoutLayer...) call after .merge(sse_routes), or apply TimeoutLayer to the sse_routes router before merging if SSE connections require a different timeout policy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/main.rs` around lines 219 - 225, The
TimeoutLayer is applied before merging sse_routes so SSE routes miss the
timeout; move the call to
TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10)) so it wraps the final router that includes sse_routes
(i.e., call .layer(TimeoutLayer::with_status_code(...)) after
.merge(sse_routes)), or alternatively apply the same TimeoutLayer to the
sse_routes Router itself before calling .merge so all routes in the combined
router receive the timeout middleware while preserving long-lived SSE behavior
if needed.
🧹 Nitpick comments (3)
docker-compose.yml (1)

69-70: Document first-run volume bootstrap for local/dev environments.

Marking pgdata as external is fine, but new environments will fail to start unless atlas_pgdata already exists. Consider documenting docker volume create atlas_pgdata in setup docs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docker-compose.yml` around lines 69 - 70, The docker-compose file marks the
Postgres volume as external ("external: true" with name "atlas_pgdata"), which
will cause new local/dev environments to fail unless that Docker volume already
exists; add a short setup note instructing developers to run the Docker volume
creation command (e.g., docker volume create atlas_pgdata) before first
docker-compose up, and include this instruction in the project's README or local
setup docs alongside the postgres/pgdata volume mention so new contributors know
to bootstrap the volume.
backend/crates/atlas-api/src/handlers/sse.rs (1)

26-33: Per-client 200ms DB polling will not scale well with concurrent SSE clients.

Consider a shared broadcaster (single poller + fanout) so DB polling is O(1) per server instance instead of O(clients).
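The shared-broadcaster shape is independent of the Rust specifics: one background poller watches the DB and fans events out to every subscriber, so polling cost stays constant in the number of connected clients. A language-neutral sketch in TypeScript (the PR's backend uses tokio::sync::broadcast; these names are illustrative):

```typescript
// Single-poller fanout: one source poll regardless of subscriber count.
type Listener<T> = (event: T) => void;

class Broadcaster<T> {
  private listeners = new Set<Listener<T>>();

  subscribe(fn: Listener<T>): () => void {
    this.listeners.add(fn);
    return () => { this.listeners.delete(fn); }; // unsubscribe handle
  }

  // Called by the single background poller when it sees a new block.
  publish(event: T): void {
    for (const fn of this.listeners) fn(event);
  }

  get subscriberCount(): number {
    return this.listeners.size;
  }
}
```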

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 26 - 33, The
current per-client async_stream (symbols: stream, last_block_number, tick,
ping_counter) polls the DB every 200ms for each SSE connection; change this to a
shared broadcaster pattern: create a single background task (spawned once at
server start or lazy-init) that owns last_block_number and tick, polls the DB
and sends updates on a tokio::sync::broadcast::channel (or similar) and include
periodic ping events; in the SSE handler, replace the per-client loop with
subscribing to that broadcast channel (channel.subscribe()), receiving updates
(with a timeout or handling lagged subscribers) and forwarding them to the
client, falling back to the channel’s ping/events instead of each client running
its own interval loop.
frontend/src/components/Layout.tsx (1)

59-63: Inconsistent state update pattern in SSE branch.

Line 62 calls setDisplayedHeight directly, while all other branches (initialization, polling) wrap setState in requestAnimationFrame. For consistency with the rest of the effect, consider applying the same RAF-wrapping pattern here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/components/Layout.tsx` around lines 59 - 63, The SSE-connected
branch updates displayedRef.current and calls setDisplayedHeight directly;
change it to match the initialization/polling branches by wrapping the
displayedRef.current assignment and setDisplayedHeight call inside
requestAnimationFrame (use the same raf id tracking/cancellation pattern your
effect uses), e.g., when sse.connected is true schedule the update via
requestAnimationFrame and store/clear the raf id to ensure consistent rendering
and proper cleanup; target the displayedRef, setDisplayedHeight, and
sse.connected code paths for this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 37-43: Replace the silent .ok().flatten() and unwrap_or_default()
patterns in the SSE loop with explicit error handling: use
sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(number) FROM
blocks").fetch_optional(&state.pool).await and similarly switch the other block
fetch (currently using query_as with a tuple) to an appropriate query_scalar or
query_as for the concrete row type, then match on the Result; on Err(log the
error via the existing logger/tracing) and perform a short backoff (e.g.,
tokio::time::sleep) then continue the loop instead of treating errors as missing
data, and on Ok(None/Some) handle the null/Some case as before—this ensures
database errors are logged and the loop recovers rather than silently stalling.

In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 126-131: The TS2448 self-reference error comes from calling
connect() inside its own arrow-function initializer; change the anonymous arrow
callback used to define connect into a named function expression (e.g., function
connectInner(...) { ... } or const connect = function connect(...) { ... }) so
internal recursive/timeout references use the function's internal name rather
than the outer const binding; update any references from
reconnectTimeoutRef.current = window.setTimeout(() => { ... connect(); ... },
2000) to call the internal name (or keep calling connect() since the named
function expression allows self-reference) and ensure the connect symbol remains
exported/used the same elsewhere.

In `@frontend/src/hooks/useLatestBlockHeight.ts`:
- Around line 74-82: The polling fallback in useLatestBlockHeight's fetchHeight
currently calls getStatus() which is too heavy; change fetchHeight to call the
lightweight height endpoint (e.g., getHeight or an equivalent function that
performs GET /api/height) and use the returned { block_height, indexed_at }
instead of status?.block_height, then pass the numeric block_height into
processHeight(latestHeight, false); ensure fetchingRef logic remains unchanged
and update any imports/usages of getStatus inside this module to the new
getHeight function.

In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-40: The useEffect that unconditionally calls setSseBlocks([])
when fetchedBlocks changes drops streamed blocks prematurely; instead, in the
effect inside the BlocksPage component compare current sseBlocks with
fetchedBlocks (e.g., by unique id or timestamp) and only remove/merge those SSE
entries that already appear in fetchedBlocks, leaving remaining sseBlocks
intact; update the effect to iterate over sseBlocks and filter out items present
in fetchedBlocks, then call setSseBlocks(filteredSseBlocks) so streamed blocks
are preserved until they are present in fetched data.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a488e55 and f2d30cd.

📒 Files selected for processing (13)
  • backend/Cargo.toml
  • backend/crates/atlas-api/Cargo.toml
  • backend/crates/atlas-api/src/handlers/mod.rs
  • backend/crates/atlas-api/src/handlers/sse.rs
  • backend/crates/atlas-api/src/main.rs
  • docker-compose.yml
  • frontend/nginx.conf
  • frontend/src/components/Layout.tsx
  • frontend/src/components/SmoothCounter.tsx
  • frontend/src/context/BlockStatsContext.tsx
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/hooks/useLatestBlockHeight.ts
  • frontend/src/pages/BlocksPage.tsx


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)

93-105: Redundant keep-alive mechanisms.

The manual ping_counter logic (lines 93-97) sends a keep-alive comment every ~15 seconds, while axum's KeepAlive configuration (lines 101-105) independently sends a keep-alive text at the same interval. Consider removing one to reduce redundancy.

♻️ Proposed simplification — remove manual keep-alive
-            // Send keep-alive ping every ~15s (75 ticks * 200ms)
-            if ping_counter >= 75 {
-                ping_counter = 0;
-                yield Ok(Event::default().comment("keep-alive"));
-            }
         }
     };

     Sse::new(stream).keep_alive(
         axum::response::sse::KeepAlive::new()
             .interval(Duration::from_secs(15))
             .text("keep-alive"),
     )

You can also remove ping_counter and its updates at lines 29, 33, 62, and 80 if adopting this change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 93 - 105, Remove
the redundant manual keep-alive: delete the ping_counter-based send (the block
that yields Ok(Event::default().comment("keep-alive")) guarded by ping_counter
>= 75) and any updates to ping_counter elsewhere (updates at the locations that
increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive
with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to
emit pings; this eliminates duplicate keep-alive messages while keeping the
existing Sse::keep_alive configuration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 43-64: The code sets last_block_number = Some(max_num) before
ensuring the initial block was fetched and emitted, which can permanently skip
the block if the fetch fails; modify the logic in the block handling using
latest/max_num so that last_block_number is only updated after a successful
fetch-and-emit: perform the sqlx::query_as(...) fetch_optional, check and
serialize the fetched Block into NewBlockEvent (using serde_json::to_string) and
yield the Event first, and only then assign last_block_number = Some(max_num)
(and reset ping_counter) so the cursor does not advance on failed fetches;
update the code paths around latest, block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f2d30cd and cc1a48e.

📒 Files selected for processing (1)
  • backend/crates/atlas-api/src/handlers/sse.rs

- sse.rs: replace silent error swallowing with tracing::warn! and continue
- useBlockSSE: use named function expression to fix TS2448 self-reference
- BlocksPage: filter SSE buffer instead of unconditional clear on fetch
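The "filter instead of clear" fix from the commit above amounts to a set difference over block numbers: keep only the SSE-buffered blocks that are not yet present in the freshly fetched page. A hedged sketch (names are illustrative, not the page's actual code):

```typescript
// Keep only SSE-buffered blocks not yet present in the fetched page,
// so streamed blocks survive a refetch until the server has caught up.
interface BlockRow { number: number }

function filterSseBuffer(sseBlocks: BlockRow[], fetchedBlocks: BlockRow[]): BlockRow[] {
  const fetched = new Set(fetchedBlocks.map((b) => b.number));
  return sseBlocks.filter((b) => !fetched.has(b.number));
}
```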

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
backend/crates/atlas-api/src/handlers/sse.rs (2)

46-68: ⚠️ Potential issue | 🟠 Major

Advance the SSE cursor only after a successful initial event emit.

last_block_number is set before the initial block is fetched/emitted. If that fetch/emit path fails, the stream cursor advances and that block can be skipped for this client.

Suggested fix
                 if let Some(max_num) = latest {
-                    last_block_number = Some(max_num);
                     // Emit the current latest block as the initial event
                     let block: Option<Block> = match sqlx::query_as(
                         "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
                          FROM blocks WHERE number = $1"
                     )
@@
                     if let Some(block) = block {
                         let event = NewBlockEvent { block };
                         if let Ok(json) = serde_json::to_string(&event) {
                             yield Ok(Event::default().event("new_block").data(json));
+                            last_block_number = Some(max_num);
+                            ping_counter = 0;
                         }
                     }
-                    ping_counter = 0;
                 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 46 - 68, The SSE
cursor advancement currently sets last_block_number = Some(max_num) before
fetching/emitting the initial block (in the branch handling latest), which can
skip a block if fetch or emit fails; move the assignment of last_block_number so
it only happens after a successful fetch and successful serde_json::to_string +
yield of the NewBlockEvent (i.e., only after the block is Some, NewBlockEvent is
constructed, serialization succeeds, and
Event::default().event("new_block").data(json) is yielded); update the code
around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block
handling, serde_json::to_string(&event), and the yield to ensure
last_block_number is set after emission succeeds and remains unchanged on
errors.

43-44: ⚠️ Potential issue | 🟠 Major

Add a short backoff on repeated DB errors.

Current error branches immediately continue, so outages can produce a tight warn/retry loop every 200ms per SSE client.

Suggested fix
-use tokio::time::interval;
+use tokio::time::{interval, sleep};
@@
-                    Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
+                    Err(e) => {
+                        warn!(error = ?e, "sse: failed to query latest block number");
+                        sleep(Duration::from_millis(500)).await;
+                        continue;
+                    }
@@
-                        Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
+                        Err(e) => {
+                            warn!(error = ?e, "sse: failed to fetch initial block");
+                            sleep(Duration::from_millis(500)).await;
+                            continue;
+                        }
@@
-                Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
+                Err(e) => {
+                    warn!(error = ?e, cursor, "sse: failed to fetch new blocks");
+                    sleep(Duration::from_millis(500)).await;
+                    continue;
+                }
             };

Also applies to: 58-59, 84-85

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 44, The error
branches currently `continue` immediately (e.g. the branches that warn!("sse:
failed to query latest block number") and the other similar warn! messages),
causing a tight retry loop on DB outages; add a short async backoff (e.g.,
tokio::time::sleep for a few hundred milliseconds) before each `continue` so the
SSE handler loop yields and avoids tight retries. Update the three error
branches referenced (the Err(e) warn! handling around the latest block query and
the two other warn! sites at the noted locations) to await a small delay and
then continue; keep the warn! logs as-is and only add the sleep+await just prior
to the continue. Ensure you import tokio::time::Duration and tokio::time::sleep
in the sse.rs scope if not already present.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-51: The merge logic in the useMemo for blocks (using sseBlocks
and fetchedBlocks) runs on every page, causing newer page views to show items
belonging to page 1; change the logic to only prepend/merge sseBlocks when the
current page is 1: add the page (or currentPage) value to the dependency array
and, inside the useMemo, if page !== 1 return fetchedBlocks directly; otherwise
perform the existing Set/filter merge of sseBlocks and fetchedBlocks and
slice(0,20). Ensure you reference and update the blocks useMemo dependencies to
include the page/currentPage variable so updates happen correctly.


ℹ️ Review info

Configuration used: defaults

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cc1a48e and bc193c3.

📒 Files selected for processing (3)
  • backend/crates/atlas-api/src/handlers/sse.rs
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
  • frontend/src/hooks/useBlockSSE.ts

…-in-effect

Wrap setSseBlocks calls in BlocksPage and setDisplayedHeight in Layout
in requestAnimationFrame to avoid calling setState synchronously inside
useEffect bodies, which is flagged by eslint-plugin-react-hooks v7.
Stop polling every 10s as a safety net when SSE is active — periodic
refetches disrupt the smooth live flow. Instead, refetch immediately
when SSE disconnects to catch any missed blocks during the gap.

@coderabbitai coderabbitai bot left a comment


♻️ Duplicate comments (1)
frontend/src/pages/BlocksPage.tsx (1)

55-60: ⚠️ Potential issue | 🟠 Major

Gate SSE merge to page 1 (and live mode) to avoid cross-page contamination.

blocks still prepends sseBlocks on every page. After navigating away from page 1, buffered newest blocks can leak into page 2+ and break pagination correctness.

Suggested fix
   const blocks = useMemo(() => {
-    if (!sseBlocks.length) return fetchedBlocks;
+    if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
     const seen = new Set(fetchedBlocks.map((b) => b.number));
     const unique = sseBlocks.filter((b) => !seen.has(b.number));
     return [...unique, ...fetchedBlocks].slice(0, 20);
-  }, [fetchedBlocks, sseBlocks]);
+  }, [fetchedBlocks, sseBlocks, page, autoRefresh]);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/pages/BlocksPage.tsx` around lines 55 - 60, The merge logic in
the useMemo for blocks currently prepends sseBlocks on every page and must be
restricted to live mode on page 1; modify the useMemo so it first checks the
page and live-mode flags (e.g., currentPage === 1 and isLive / liveMode) and if
not both true simply return fetchedBlocks, otherwise run the existing
dedup/merge (seen Set, unique, return [...unique,
...fetchedBlocks].slice(0,20)). Update the dependency array to include the page
and live-mode identifiers so the memo recalculates when those change.

ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 2a0cea0 and 5a61a1f.

📒 Files selected for processing (1)
  • frontend/src/pages/BlocksPage.tsx

- BlocksPage: skip API polling entirely while SSE is connected to avoid
  disrupting the live block flow; trigger an immediate refetch when SSE
  drops to recover any missed blocks
- BlocksPage: guard SSE block merge to page 1 + autoRefresh only, so
  navigating to page 2+ never shows page-1 SSE blocks
- sse.rs: only advance last_block_number after the initial block is
  successfully fetched and emitted, preventing the cursor from skipping
  the block on a transient fetch failure
@pthmas
Collaborator Author

pthmas commented Feb 27, 2026

@coderabbitai review

@coderabbitai

coderabbitai bot commented Feb 27, 2026

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@CLAUDE.md`:
- Around line 82-83: The docs in CLAUDE.md describe /api/status as a lightweight
fast-poll endpoint but the frontend (frontend/src/pages/*.tsx) expects
/api/status to return full chain info on initial page load; update the contract
to avoid drift by either (A) restoring /api/status to always return full chain
info (block_height, indexed_at plus the full chain payload the frontend needs)
and mark a new endpoint (e.g., /api/status/quick or /api/poll) as the
lightweight single-key fast-poll, or (B) explicitly document in CLAUDE.md that
/api/status returns only the full chain snapshot (used on load) and add a
separate documented fast-poll endpoint name to be implemented by the backend and
used by the navbar SSE fallback; ensure references in frontend/src/pages/*.tsx
match the chosen endpoint name.

In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 26-40: The current useEffect cancels any existing ssePrependRafRef
scheduled RAF whenever a new latestBlockEvent arrives, which drops intermediate
blocks during bursts; change the logic to never cancel a pending RAF but instead
accumulate incoming blocks into a pending buffer (e.g., pendingSseBlocksRef) on
each arrival, dedupe by block.number and preserve arrival order, and ensure the
single scheduled RAF (tracked by ssePrependRafRef) flushes the whole buffer once
by calling setSseBlocks(prev => { merge buffer + prev, dedupe, slice(0,20) })
and then clears the buffer and RAF ref; update uses of lastSseBlockRef,
latestBlockEvent, and setSseBlocks accordingly so lastSseBlockRef is advanced
when blocks are actually prepended.
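The buffer-then-flush merge described above can be sketched as a pure function (names and the 20-row cap follow the prompt and are assumptions about the real hook; the component would call this once inside its single scheduled requestAnimationFrame):

```typescript
type Block = { number: number };

// `pending` holds SSE events in arrival order (oldest first). One flush
// dedupes them against each other and against the rendered list, then
// prepends newest-first, capped at `limit` rows.
function mergePending(pending: Block[], prev: Block[], limit = 20): Block[] {
  const seen = new Set(prev.map((b) => b.number));
  const fresh: Block[] = [];
  for (const b of pending) {
    if (!seen.has(b.number)) {
      seen.add(b.number); // also drops duplicates within the batch itself
      fresh.push(b);
    }
  }
  // Arrival order is oldest-first; the list renders newest-first.
  return [...fresh.reverse(), ...prev].slice(0, limit);
}
```

Because the buffer is only cleared after the flush, a burst of blocks 100, 101, 102 arriving before the frame fires all survive into one state update.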

ℹ️ Review info


📥 Commits

Reviewing files that changed from the base of the PR and between 5a61a1f and 67bfb14.

📒 Files selected for processing (3)
  • CLAUDE.md
  • backend/crates/atlas-api/src/handlers/sse.rs
  • frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/crates/atlas-api/src/handlers/sse.rs

Replace cancel-and-reschedule RAF pattern with a pending buffer so that
burst arrivals (e.g. blocks 100, 101, 102 before a frame fires) are all
flushed in a single requestAnimationFrame rather than only the last block
surviving. Also clarify CLAUDE.md /api/status contract.
- Add LIMIT 100 to the SSE block polling query to prevent unbounded
  memory usage and avoid infinite retry loops from statement_timeout
- Add unit tests for NewBlockEvent serialization
- Remove external: true from pgdata volume so docker compose up works
  on fresh systems
@pthmas
Collaborator Author

pthmas commented Mar 12, 2026

@coderabbitai resume

@coderabbitai

coderabbitai bot commented Mar 12, 2026

✅ Actions performed

Reviews resumed.

@pthmas
Collaborator Author

pthmas commented Mar 12, 2026

@coderabbitai review

@coderabbitai

coderabbitai bot commented Mar 12, 2026

✅ Actions performed

Review triggered.



@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)

18-24: ⚠️ Potential issue | 🟠 Major

AppState is missing required chain_id and chain_name fields.

This main entrypoint should fetch eth_chainId once at startup and store it in AppState.chain_id (u64), and populate chain_name from CHAIN_NAME (default "Unknown").

As per coding guidelines, **/atlas-api/**/main.rs: fetch chain ID from RPC at startup into AppState.chain_id and use CHAIN_NAME (default "Unknown") for chain identification.

Also applies to: 60-66

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/main.rs` around lines 18 - 24, AppState
currently lacks chain identification; add fields chain_id: u64 and chain_name:
String to the AppState struct and initialize them at startup: call the RPC
eth_chainId once (e.g., in main or your app initialization function that
constructs AppState) and parse the returned hex/number into a u64 to set
AppState.chain_id, and read CHAIN_NAME (defaulting to "Unknown") to set
AppState.chain_name; ensure any code that constructs AppState (e.g., main or
create_app_state) is updated to perform the RPC call and populate these new
fields.
♻️ Duplicate comments (2)
frontend/src/hooks/useLatestBlockHeight.ts (1)

2-2: ⚠️ Potential issue | 🟠 Major

Polling fallback should use the lightweight /api/height endpoint, not /api/status.

getStatus() here keeps the fast-poll path tied to the wrong contract.

Suggested fix
-import { getStatus } from '../api/status';
+import { getHeight } from '../api/height';
@@
-      const status = await getStatus();
-      const latestHeight = status?.block_height;
+      const status = await getHeight();
+      const latestHeight = status?.block_height;

As per coding guidelines, frontend/src/hooks/**/*.ts: Fast polling endpoint is GET /api/height returning { block_height, indexed_at }, while GET /api/status is full chain info fetched once on page load.

Also applies to: 80-88

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/hooks/useLatestBlockHeight.ts` at line 2, The hook is using
getStatus() for fast polling but should call the lightweight height endpoint;
replace usages of getStatus() in useLatestBlockHeight (and the similar block
around the code referenced at lines 80-88) with a function that hits GET
/api/height (e.g., getHeight or getBlockHeight) which returns { block_height,
indexed_at }, and update the hook to poll that instead of getStatus() so the
fast-poll path is tied to the correct lightweight endpoint.
CLAUDE.md (1)

82-83: ⚠️ Potential issue | 🟠 Major

The endpoint contract documented here conflicts with the frontend contract.

These lines redefine /api/status as the fast polling endpoint and remove the separate full-status contract, which creates drift with existing frontend expectations.

As per coding guidelines, frontend/src/hooks/**/*.ts: Fast polling endpoint is GET /api/height returning { block_height, indexed_at }, and GET /api/status returns full chain info fetched once on page load.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@CLAUDE.md` around lines 82 - 83, The documented endpoint contract in
CLAUDE.md conflicts with the frontend: restore the original contracts so the
fast-polling endpoint is GET /api/height returning { block_height, indexed_at }
and GET /api/status remains the full chain-info endpoint fetched once on load;
update the doc text that currently describes `/api/status` as the sub-ms polling
endpoint and `/api/events` so that `/api/height` is described as the single-key
fast poll and `/api/status` is described as the full-chain info endpoint to
match frontend/hooks expectations.
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)

20-20: Avoid duplicating the NOTIFY channel name across crates.

BLOCK_EVENT_CHANNEL is hardcoded here and also separately in backend/crates/atlas-indexer/src/indexer.rs:27. This creates silent break risk if one side is renamed. Move the channel constant to a shared crate/module and import it in both places.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` at line 20, The hardcoded
NOTIFY channel constant BLOCK_EVENT_CHANNEL is duplicated and should be
centralized; extract BLOCK_EVENT_CHANNEL into a shared module/crate (e.g., a
common constants module) and replace local declarations with a public import
where currently used (the constant referenced in sse.rs and the one in
indexer.rs); update the uses of BLOCK_EVENT_CHANNEL to import from the new
shared module and remove the local constant to ensure both crates reference the
single shared symbol.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 288-348: Add unit tests in the existing #[cfg(test)] mod tests
that exercise the new fanout/catch-up logic (not just JSON shape) by using
sample_block and NewBlockEvent to simulate sequences: (1) a gap-catchup test
that simulates missing block(s) and asserts the catch_up logic retrieves/emits
the missing blocks; (2) a lagged-fallback test that simulates the node being
behind the live head beyond the fallback threshold and asserts the code takes
the lagged fallback path; and (3) a broadcast-replay-bounds test that verifies
replay windows respect configured min/max bounds and do not request blocks
outside those limits; call the actual functions/structs that implement these
behaviors in sse.rs (the fanout/catch-up entrypoints) from the tests, assert
emitted events or return values accordingly, keep tests in the same file under
mod tests and run with cargo test --workspace.

In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 788-793: Add a unit test in a #[cfg(test)] mod tests block in this
file that exercises the new write-path so it actually calls the code path which
executes pg_tx.execute("SELECT pg_notify($1, $2)", &[&BLOCK_EVENT_CHANNEL,
&last_value]) (e.g., by invoking collect_block or the higher-level function that
triggers the watermark update), then open a Postgres LISTEN on
BLOCK_EVENT_CHANNEL (or use a test connection/transaction) to capture
notifications and assert a notification with the expected channel and payload
(last_value) was emitted; include setup/teardown in the test and run it with
cargo test --workspace.

In `@frontend/src/api/client.ts`:
- Line 5: The constant API_BASE_URL falls back to an absolute localhost URL;
change its fallback to the proxied path '/api'. Update the export const
API_BASE_URL assignment so that API_BASE_URL uses
import.meta.env.VITE_API_BASE_URL if present, otherwise the string '/api'
(modify the symbol API_BASE_URL in frontend/src/api/client.ts accordingly) to
ensure requests go through the nginx proxy.

In `@frontend/src/components/Layout.tsx`:
- Around line 59-65: When sse.connected is true the code only updates
displayedRef.current and returns, which can leave the displayedHeight state
stale when height moves downward; update the React state synchronously as well
to avoid overshoot. In the SSE branch (where sse.connected is checked) call the
setter that updates displayedHeight (the same setDisplayedHeight used elsewhere)
in addition to setting displayedRef.current = height so the UI immediately
reflects decreases/corrections; reference displayedRef.current,
displayedHeight/setDisplayedHeight, sse.connected and height to locate and fix
the logic.

In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 120-129: The onerror handler currently ignores the SSE event so
the error state never gets set; update the es.onerror callback in useBlockSSE
(and the corresponding handler around line ~177) to accept the event param (e)
and call setError with a useful value (e.g., e or e.type or a stringified
message) before closing and scheduling reconnect; ensure you clear/set error
appropriately in onopen (or where you setConnected(true)) so error is reset on
successful reconnect and use the same pattern in connectSSE to keep error state
consistent.

In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-50: The prepend logic in setSseBlocks (inside the state updater
using prev and pending) only deduplicates against existing prev, allowing
duplicate block.number values within the pending batch; update the routine to
first deduplicate pending by block.number (e.g., iterate pending and build a Map
or Set to keep the first/last occurrence per number) and then filter that
dedupedPending against seen before reversing and prepending so the final
[...prepend, ...prev].slice(0, 20) never contains duplicates from the incoming
batch; touch the setSseBlocks updater where pending, seen, prepend are used.


ℹ️ Review info
⚙️ Run configuration


Run ID: 37f05432-3709-4931-bc43-74fae8662192

📥 Commits

Reviewing files that changed from the base of the PR and between 67bfb14 and 6ddf26e.

📒 Files selected for processing (12)
  • CLAUDE.md
  • backend/.dockerignore
  • backend/crates/atlas-api/src/handlers/sse.rs
  • backend/crates/atlas-api/src/main.rs
  • backend/crates/atlas-indexer/src/indexer.rs
  • docker-compose.yml
  • frontend/src/api/client.ts
  • frontend/src/components/Layout.tsx
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/hooks/useLatestBlockHeight.ts
  • frontend/src/hooks/useStats.ts
  • frontend/src/pages/BlocksPage.tsx
✅ Files skipped from review due to trivial changes (1)
  • backend/.dockerignore
🚧 Files skipped from review as they are similar to previous changes (1)
  • docker-compose.yml

Comment on lines +788 to +793
pg_tx
    .execute(
        "SELECT pg_notify($1, $2)",
        &[&BLOCK_EVENT_CHANNEL, &last_value],
    )
    .await?;

⚠️ Potential issue | 🟠 Major

Add a test that verifies pg_notify is emitted with watermark updates.

This introduces new write-path behavior, but the current tests only cover collect_block logic and do not validate notification emission.

As per coding guidelines, **/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file and run with cargo test --workspace.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 788 - 793, Add a
unit test in a #[cfg(test)] mod tests block in this file that exercises the new
write-path so it actually calls the code path which executes
pg_tx.execute("SELECT pg_notify($1, $2)", &[&BLOCK_EVENT_CHANNEL, &last_value])
(e.g., by invoking collect_block or the higher-level function that triggers the
watermark update), then open a Postgres LISTEN on BLOCK_EVENT_CHANNEL (or use a
test connection/transaction) to capture notifications and assert a notification
with the expected channel and payload (last_value) was emitted; include
setup/teardown in the test and run it with cargo test --workspace.

import type { ApiError } from '../types';

-const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';

⚠️ Potential issue | 🟠 Major

Default base URL should be /api, not http://localhost:3000/api.

Using localhost as fallback can break non-local environments and bypass the intended nginx proxy contract.

Suggested fix
-export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = (import.meta.env.VITE_API_BASE_URL || '/api').replace(/\/+$/, '');

As per coding guidelines, frontend/src/api/**/*.ts: Frontend API client base URL is /api (proxied by nginx to atlas-api:3000).

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = (import.meta.env.VITE_API_BASE_URL || '/api').replace(/\/+$/, '');
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/api/client.ts` at line 5, The constant API_BASE_URL falls back
to an absolute localhost URL; change its fallback to the proxied path '/api'.
Update the export const API_BASE_URL assignment so that API_BASE_URL uses
import.meta.env.VITE_API_BASE_URL if present, otherwise the string '/api'
(modify the symbol API_BASE_URL in frontend/src/api/client.ts accordingly) to
ensure requests go through the nginx proxy.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 24-33: The current computeBpsFromLog implementation scans forward
and ends up using log[0] whenever the buffer exceeds minSpan, causing windows to
expand; change the selection to use the newest prior sample that satisfies the
span by iterating backwards from newest toward older entries. In
computeBpsFromLog, loop i from log.length-2 down to 0, compute span = newest.ts
- log[i].ts and if span >= minSpan return (newest.num - log[i].num)/span; after
the loop check fallback: if newest.ts - log[0].ts >= fallbackMinSpan return
(newest.num - log[0].num)/(newest.ts - log[0].ts); otherwise return null. This
preserves newest as the reference and picks the closest sample meeting minSpan
(and similarly for fallbackMinSpan).

In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-59: When autoRefresh is turned back on while sseConnected is
still true the component skips the catch-up refetch and misses blocks; modify
the logic that observes latestBlockEvent/page/autoRefresh (symbols:
latestBlockEvent, page, autoRefresh, sseConnected, lastSseBlockRef,
pendingSseBlocksRef, setSseBlocks, ssePrependRafRef) to detect the autoRefresh
transition from false->true (only for page===1) and trigger the same catch-up
behavior used when sseConnected toggles (i.e. perform the refetch of missed
blocks or invoke the existing catch-up function) or otherwise reset
lastSseBlockRef so pending blocks are fetched/processed; apply the same change
to the other similar block referenced around lines 114-129.

ℹ️ Review info
⚙️ Run configuration


Run ID: f8acc341-cd83-405a-a054-c6525b5129c5

📥 Commits

Reviewing files that changed from the base of the PR and between 6ddf26e and b601d91.

📒 Files selected for processing (2)
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/pages/BlocksPage.tsx


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
frontend/src/hooks/useBlockSSE.ts (1)

31-40: ⚠️ Potential issue | 🟠 Major

Forward scan selects the oldest qualifying sample instead of the newest.

The loop iterates from index 0 (oldest) and returns immediately when span >= minSpan. After a rate change, this always picks the oldest sample exceeding the threshold rather than the nearest one, causing bps and drain pacing to reflect stale history instead of the advertised 30s/10s rolling windows.

🔧 Proposed fix: iterate backward to find the nearest qualifying sample
 function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: number): number | null {
   if (log.length < 2) return null;
   const newest = log[log.length - 1];
-  for (let i = 0; i < log.length - 1; i++) {
+  for (let i = log.length - 2; i >= 0; i--) {
     const span = newest.ts - log[i].ts;
     if (span >= minSpan) return (newest.num - log[i].num) / span;
-    if (i === 0 && span >= fallbackMinSpan) return (newest.num - log[0].num) / span;
   }
+  // Fallback: use oldest if it meets fallbackMinSpan
+  const oldestSpan = newest.ts - log[0].ts;
+  if (oldestSpan >= fallbackMinSpan) {
+    return (newest.num - log[0].num) / oldestSpan;
+  }
   return null;
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@frontend/src/hooks/useBlockSSE.ts` around lines 31 - 40, computeBpsFromLog
currently scans forward and returns the oldest qualifying sample; change it to
scan backward from the newest to pick the nearest qualifying sample. In
computeBpsFromLog(log, minSpan, fallbackMinSpan) iterate i from log.length-2
down to 0 and if newest.ts - log[i].ts >= minSpan return (newest.num -
log[i].num)/span; if none qualify for minSpan, do a second backward pass (or
continue same loop tracking fallback) to find the nearest sample with span >=
fallbackMinSpan and return using that sample; otherwise return null. Target the
computeBpsFromLog function and the variables log, minSpan, fallbackMinSpan,
newest when editing.
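As a standalone sketch of the backward scan (`Sample` mirrors the hook's assumed BlockLog entry shape), the loop now picks the *nearest* sample that is at least minSpan old:

```typescript
type Sample = { ts: number; num: number };

function computeBpsFromLog(log: Sample[], minSpan: number, fallbackMinSpan: number): number | null {
  if (log.length < 2) return null;
  const newest = log[log.length - 1];
  // Walk backward so the first qualifying sample is the nearest one,
  // keeping the measurement window close to minSpan after rate changes.
  for (let i = log.length - 2; i >= 0; i--) {
    const span = newest.ts - log[i].ts;
    if (span >= minSpan) return (newest.num - log[i].num) / span;
  }
  // No sample is minSpan old yet; fall back to the oldest entry if it
  // at least meets the shorter fallback window.
  const oldestSpan = newest.ts - log[0].ts;
  if (oldestSpan >= fallbackMinSpan) return (newest.num - log[0].num) / oldestSpan;
  return null;
}
```

With a log of `(ts, num)` pairs `(0,0), (20,10), (40,50), (60,60)` and `minSpan = 30`, the backward scan measures over the 40-unit window ending at the newest sample instead of the full 60-unit history, so a recent rate change shows up immediately.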
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 52-87: Rewrite the outer match-based loop over rx.recv().await
into a while-let to satisfy Clippy: replace the loop { match rx.recv().await {
Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) => { ... },
Err(broadcast::error::RecvError::Closed) => break, } } with while let Ok(()) |
Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await { /* keep the
existing inner loop that uses last_block_number, fetch_blocks_after,
FETCH_BATCH_SIZE, block_to_event, etc. unchanged */ } and let the while-let
naturally exit on the Closed variant (no special Closed arm needed).
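A std-only illustration of that while-let shape, with an iterator standing in for `rx.recv().await` and a local `RecvError` mirroring tokio's broadcast error type:

```rust
// Local mirror of tokio::sync::broadcast::error::RecvError for the sketch.
enum RecvError {
    Lagged(u64),
    Closed,
}

// `events` stands in for successive `rx.recv().await` results. The
// or-pattern keeps the loop running on Ok and Lagged; Closed (or end of
// input) fails the pattern and exits the loop with no explicit break.
fn drain(events: Vec<Result<(), RecvError>>) -> usize {
    let mut handled = 0;
    let mut rx = events.into_iter();
    while let Some(Ok(()) | Err(RecvError::Lagged(_))) = rx.next() {
        handled += 1; // the existing catch-up inner loop runs here, unchanged
    }
    handled
}
```

The or-pattern in a while-let is exactly what Clippy's lint is steering toward: the Closed arm disappears because a non-matching value terminates the loop on its own.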


ℹ️ Review info
⚙️ Run configuration


Run ID: e0115fb8-70d6-4d68-86b2-25d49d2abe67

📥 Commits

Reviewing files that changed from the base of the PR and between b601d91 and b7043c1.

📒 Files selected for processing (4)
  • backend/crates/atlas-api/src/handlers/sse.rs
  • backend/crates/atlas-api/src/main.rs
  • frontend/src/hooks/useBlockSSE.ts
  • frontend/src/pages/BlocksPage.tsx


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)

160-220: ⚠️ Potential issue | 🟡 Minor

Core SSE behavior is still under-tested.

At Lines 160-220, tests only validate JSON shape. Please add behavior tests for: gap catch-up progression, Lagged(_) recovery path, and reconnect wake-up replay behavior.

As per coding guidelines "Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file and run with cargo test --workspace."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 160 - 220, Add
unit tests in the existing #[cfg(test)] mod tests in sse.rs that exercise SSE
behavior beyond JSON shape: (1) simulate gap catch-up progression by sending a
sequence of blocks with a missing intermediate number and assert the catch-up
logic advances through the missing numbers (target functions/structs: the SSE
handler function that processes incoming Block events and any gap/catch-up state
machine used), (2) test the Lagged(_) recovery path by forcing the handler to
encounter a lag condition and assert it emits the Lagged variant and then
recovers to normal processing (reference enum/variant names like Lagged and the
recovery method used), and (3) test reconnect wake-up replay by simulating a
reconnect event and verifying that the replayed blocks are reprocessed in order
and no duplicates are emitted (use NewBlockEvent, sample_block helper, and the
SSE reconnect/replay entry point). Add assertions for emitted events/state
transitions and run cargo test --workspace to validate.
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)

97-129: Use exponential backoff with jitter for listener reconnects.

At Lines 97-129, reconnects use a fixed 1s sleep. Under DB instability, synchronized retries can amplify load. Prefer capped exponential backoff + jitter.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 97 - 129, Replace
the fixed 1s sleeps around PgListener::connect and listener.listen (and after
the inner listener.recv error break) with a capped exponential backoff with
jitter: start with a base delay (e.g., 1s), double on each consecutive failure
up to a max (e.g., 30s), add randomized jitter (+/- a small fraction or uniform
0..base) to avoid sync, and reset the backoff to the base delay immediately when
PgListener::connect and listener.listen succeed (i.e., before sending the first
tx.send and entering the inner recv loop). Ensure the same backoff instance is
used for consecutive connection attempts and that the backoff is cleared/reset
after a successful reconnect so subsequent failures start from the base again.
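A deterministic sketch of the capped backoff (constants and the 1s/30s bounds follow the comment; `jitter` is passed in as a fraction so the helper stays testable — the real code would draw it from a RNG on each attempt):

```rust
use std::time::Duration;

const BASE_DELAY_MS: u64 = 1_000;
const MAX_DELAY_MS: u64 = 30_000;

// Delay for the nth consecutive failure: base * 2^n, capped at the max,
// plus `jitter` (a fraction in [0, 1)) of the capped value so clients
// spread their reconnects instead of retrying in lockstep.
fn backoff_delay(consecutive_failures: u32, jitter: f64) -> Duration {
    let exp = BASE_DELAY_MS.saturating_mul(2u64.saturating_pow(consecutive_failures));
    let capped = exp.min(MAX_DELAY_MS);
    let jittered = capped + (capped as f64 * jitter) as u64;
    Duration::from_millis(jittered)
}
```

On a successful connect + listen, the caller resets the failure count to zero so the next outage starts from the base delay again.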

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c40a19f6-b670-4b97-935f-30a914b7bbbd

📥 Commits

Reviewing files that changed from the base of the PR and between b7043c1 and 7aa2162.

📒 Files selected for processing (1)
  • backend/crates/atlas-api/src/handlers/sse.rs

Comment on lines +63 to +67
for block in blocks {
    let block_number = block.number;
    last_block_number = Some(block_number);
    cursor = Some(block_number);
    if let Some(event) = block_to_event(block) {

⚠️ Potential issue | 🟠 Major

Avoid advancing the cursor across gaps; this can permanently skip blocks.

At Lines 63-67, cursor state advances to each fetched block immediately. If number=102 is observed before number=101, subsequent queries (WHERE number > cursor) can permanently miss 101.

🔧 Proposed fix
                         let batch_len = blocks.len();
                         for block in blocks {
+                            if let Some(prev) = cursor {
+                                if block.number > prev + 1 {
+                                    warn!(
+                                        expected = prev + 1,
+                                        seen = block.number,
+                                        "sse: detected block gap, delaying cursor advance"
+                                    );
+                                    break;
+                                }
+                            }
                             let block_number = block.number;
                             last_block_number = Some(block_number);
                             cursor = Some(block_number);
                             if let Some(event) = block_to_event(block) {
                                 yield Ok(event);
                             }
                         }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 63 - 67, The loop
advances cursor and last_block_number for every fetched block which can skip
earlier gaps; change the logic in the for block in blocks loop so you only
advance cursor when the block is contiguous to the previous tracked number:
compute block_number, then if let Some(event) = block_to_event(block) handle the
event, and only set cursor = Some(block_number) (and update last_block_number)
when either cursor.is_none() or block_number == cursor.unwrap() + 1 (or
similarly when last_block_number + 1 == block_number); if a gap is detected do
not advance cursor so subsequent queries can still fetch the missing
lower-numbered blocks. Ensure you reference and update the same variables
(cursor, last_block_number, block_to_event) in this conditional flow.
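The cursor rule the prompt describes can be demonstrated in isolation. This is a hedged sketch, not the PR's code: `drain_contiguous` is an invented helper, `i64` block numbers and a slice stand in for the real DB rows, and the point is only that a cursor which refuses to jump a gap lets the next `WHERE number > cursor` query recover the missing block.

```rust
// Advance the cursor only across contiguous block numbers; stop at a gap so
// the missing lower-numbered block can still be fetched on a later poll.
fn drain_contiguous(cursor: &mut Option<i64>, fetched: &[i64]) -> Vec<i64> {
    let mut emitted = Vec::new();
    for &number in fetched {
        match *cursor {
            // Gap detected: do not advance; retry on the next query.
            Some(prev) if number > prev + 1 => break,
            _ => {
                *cursor = Some(number);
                emitted.push(number);
            }
        }
    }
    emitted
}

fn main() {
    let mut cursor = Some(100);
    // 102 arrives before 101: nothing past the gap is emitted,
    // and the cursor stays at 100.
    assert_eq!(drain_contiguous(&mut cursor, &[102, 103]), Vec::<i64>::new());
    assert_eq!(cursor, Some(100));
    // The next poll includes the missing block, so everything drains in order.
    assert_eq!(drain_contiguous(&mut cursor, &[101, 102, 103]), vec![101, 102, 103]);
    assert_eq!(cursor, Some(103));
}
```

With the buggy version (unconditional `cursor = Some(block_number)`), the first call would leave the cursor at 103 and block 101 would never match `number > cursor` again.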



Development

Successfully merging this pull request may close these issues.

Add SSE for block production

2 participants