Conversation
Add Server-Sent Events endpoint (GET /api/events) that streams new blocks to the frontend in real-time, replacing polling for block height and blocks list updates. Backend: - New SSE handler polls DB every 200ms, emits one event per block in order - SSE route excluded from 10s TimeoutLayer so connections stay alive - Nginx configured to proxy SSE with buffering disabled Frontend: - useBlockSSE hook buffers events in a ref-based queue and drains at the chain's natural block rate (computed from on-chain timestamps) - Navbar counter updates instantly via SSE, block time derived from 30s rolling window of block timestamps for stability - BlocksPage prepends new blocks from SSE on page 1 with auto-refresh - Polling falls back automatically when SSE disconnects Closes #11
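The adaptive drain behavior described above can be sketched as two pure helpers — hypothetical names, not the actual hook code in `frontend/src/hooks/useBlockSSE.ts`: estimate blocks-per-second from a rolling window of on-chain timestamps, then derive the queue drain interval from that rate.

```typescript
// Sketch of the drain-rate computation described above (assumed helper names;
// the real implementation lives in frontend/src/hooks/useBlockSSE.ts).

const WINDOW_SECS = 30;

// Estimate blocks-per-second from on-chain block timestamps (seconds)
// inside a rolling window ending at nowSecs.
function blocksPerSecond(timestamps: number[], nowSecs: number): number {
  const recent = timestamps.filter((t) => nowSecs - t <= WINDOW_SECS);
  if (recent.length < 2) return 0;
  const span = Math.max(...recent) - Math.min(...recent);
  if (span <= 0) return 0;
  return (recent.length - 1) / span;
}

// Derive how often the buffered SSE queue should release one block to the UI.
// Falls back to 1s when the rate is unknown; clamped so a burst never drains
// faster than one block per 100ms.
function drainIntervalMs(bps: number): number {
  if (bps <= 0) return 1000;
  return Math.min(5000, Math.max(100, 1000 / bps));
}
```

With one block every two seconds (bps = 0.5), the queue drains one event every 2000ms, matching the chain's natural cadence.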
Note: Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings, and use the review commands to resume or manage reviews.
📝 Walkthrough

Adds server-sent events for new blocks: the backend exposes `GET /api/events` and the frontend consumes it via the `useBlockSSE` hook.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Browser as Browser (Client)
    participant Nginx as Nginx
    participant API as Atlas API
    participant Indexer as Indexer/DB
    participant Hook as useBlockSSE
    participant UI as UI

    Browser->>Nginx: GET /api/events (SSE)
    Nginx->>API: Proxy /api/events (HTTP/1.1, no buffering)
    API->>Indexer: Query latest block (seed)
    Indexer-->>API: latest block
    API->>Browser: SSE new_block (initial)
    Note over Indexer,API: Indexer emits NOTIFY "atlas_new_blocks" on new blocks
    Indexer->>API: NOTIFY payload
    API->>API: broadcast via tokio::broadcast
    API->>Browser: SSE new_block (streamed JSON)
    API-->>Browser: SSE keep-alive (~15s)
    Browser->>Hook: receive SSE events
    Hook->>Hook: enqueue, compute bps, adapt drain interval
    loop Drain (adaptive)
        Hook->>UI: latestBlock, height, connected, bps
        UI->>UI: render counter and indicators
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Suggested reviewers
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed

❌ Failed checks (1 warning)

✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)
219-225: ⚠️ Potential issue | 🟠 Major — The SSE routes are not wrapped by the TimeoutLayer and violate the coding guidelines.

In axum 0.8, `.layer()` only affects routes that exist at the time it is called. Since `TimeoutLayer` is applied on lines 219–222 and `.merge(sse_routes)` occurs on line 225, the SSE routes do not receive the timeout middleware. Per coding guidelines, all routes in `main.rs` must be wrapped with `TimeoutLayer::with_status_code(StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10))`.

Move the `.layer(TimeoutLayer...)` call after `.merge(sse_routes)`, or apply `TimeoutLayer` to the `sse_routes` router before merging if SSE connections require a different timeout policy.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/main.rs` around lines 219 - 225, The TimeoutLayer is applied before merging sse_routes so SSE routes miss the timeout; move the call to TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT, Duration::from_secs(10)) so it wraps the final router that includes sse_routes (i.e., call .layer(TimeoutLayer::with_status_code(...)) after .merge(sse_routes)), or alternatively apply the same TimeoutLayer to the sse_routes Router itself before calling .merge so all routes in the combined router receive the timeout middleware while preserving long-lived SSE behavior if needed.
🧹 Nitpick comments (3)
docker-compose.yml (1)
69-70: Document first-run volume bootstrap for local/dev environments.

Marking `pgdata` as external is fine, but new environments will fail to start unless `atlas_pgdata` already exists. Consider documenting `docker volume create atlas_pgdata` in setup docs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@docker-compose.yml` around lines 69 - 70, The docker-compose file marks the Postgres volume as external ("external: true" with name "atlas_pgdata"), which will cause new local/dev environments to fail unless that Docker volume already exists; add a short setup note instructing developers to run the Docker volume creation command (e.g., docker volume create atlas_pgdata) before first docker-compose up, and include this instruction in the project's README or local setup docs alongside the postgres/pgdata volume mention so new contributors know to bootstrap the volume.

backend/crates/atlas-api/src/handlers/sse.rs (1)
26-33: Per-client 200ms DB polling will not scale well with concurrent SSE clients.

Consider a shared broadcaster (single poller + fanout) so DB polling is O(1) per server instance instead of O(clients).
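A minimal fan-out sketch of the single-poller idea, in TypeScript for illustration only — the actual fix would use `tokio::sync::broadcast` in Rust, and these names are hypothetical:

```typescript
// One poller calls publish(); every SSE connection subscribes, so DB load
// stays O(1) per server instance regardless of client count.

type Listener<T> = (value: T) => void;

class Broadcaster<T> {
  private listeners = new Set<Listener<T>>();

  // Register a subscriber; returns an unsubscribe handle.
  subscribe(fn: Listener<T>): () => void {
    this.listeners.add(fn);
    return () => {
      this.listeners.delete(fn);
    };
  }

  // Fan a value out to all current subscribers.
  publish(value: T): void {
    for (const fn of this.listeners) fn(value);
  }
}
```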
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 26 - 33, The current per-client async_stream (symbols: stream, last_block_number, tick, ping_counter) polls the DB every 200ms for each SSE connection; change this to a shared broadcaster pattern: create a single background task (spawned once at server start or lazy-init) that owns last_block_number and tick, polls the DB and sends updates on a tokio::sync::broadcast::channel (or similar) and include periodic ping events; in the SSE handler, replace the per-client loop with subscribing to that broadcast channel (channel.subscribe()), receiving updates (with a timeout or handling lagged subscribers) and forwarding them to the client, falling back to the channel's ping/events instead of each client running its own interval loop.

frontend/src/components/Layout.tsx (1)
59-63: Inconsistent state update pattern in SSE branch.

Line 62 calls `setDisplayedHeight` directly, while all other branches (initialization, polling) wrap setState in `requestAnimationFrame`. For consistency with the rest of the effect, consider applying the same RAF-wrapping pattern here.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/components/Layout.tsx` around lines 59 - 63, The SSE-connected branch updates displayedRef.current and calls setDisplayedHeight directly; change it to match the initialization/polling branches by wrapping the displayedRef.current assignment and setDisplayedHeight call inside requestAnimationFrame (use the same raf id tracking/cancellation pattern your effect uses), e.g., when sse.connected is true schedule the update via requestAnimationFrame and store/clear the raf id to ensure consistent rendering and proper cleanup; target the displayedRef, setDisplayedHeight, and sse.connected code paths for this change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 37-43: Replace the silent .ok().flatten() and unwrap_or_default()
patterns in the SSE loop with explicit error handling: use
sqlx::query_scalar::<_, Option<i64>>("SELECT MAX(number) FROM
blocks").fetch_optional(&state.pool).await and similarly switch the other block
fetch (currently using query_as with a tuple) to an appropriate query_scalar or
query_as for the concrete row type, then match on the Result; on Err(log the
error via the existing logger/tracing) and perform a short backoff (e.g.,
tokio::time::sleep) then continue the loop instead of treating errors as missing
data, and on Ok(None/Some) handle the null/Some case as before—this ensures
database errors are logged and the loop recovers rather than silently stalling.
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 126-131: The TS2448 self-reference error comes from calling
connect() inside its own arrow-function initializer; change the anonymous arrow
callback used to define connect into a named function expression (e.g., function
connectInner(...) { ... } or const connect = function connect(...) { ... }) so
internal recursive/timeout references use the function's internal name rather
than the outer const binding; update any references from
reconnectTimeoutRef.current = window.setTimeout(() => { ... connect(); ... },
2000) to call the internal name (or keep calling connect() since the named
function expression allows self-reference) and ensure the connect symbol remains
exported/used the same elsewhere.
In `@frontend/src/hooks/useLatestBlockHeight.ts`:
- Around line 74-82: The polling fallback in useLatestBlockHeight's fetchHeight
currently calls getStatus() which is too heavy; change fetchHeight to call the
lightweight height endpoint (e.g., getHeight or an equivalent function that
performs GET /api/height) and use the returned { block_height, indexed_at }
instead of status?.block_height, then pass the numeric block_height into
processHeight(latestHeight, false); ensure fetchingRef logic remains unchanged
and update any imports/usages of getStatus inside this module to the new
getHeight function.
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-40: The useEffect that unconditionally calls setSseBlocks([])
when fetchedBlocks changes drops streamed blocks prematurely; instead, in the
effect inside the BlocksPage component compare current sseBlocks with
fetchedBlocks (e.g., by unique id or timestamp) and only remove/merge those SSE
entries that already appear in fetchedBlocks, leaving remaining sseBlocks
intact; update the effect to iterate over sseBlocks and filter out items present
in fetchedBlocks, then call setSseBlocks(filteredSseBlocks) so streamed blocks
are preserved until they are present in fetched data.
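The reconciliation suggested above can be sketched as a pure helper — hypothetical names and a simplified block shape; the real component is `frontend/src/pages/BlocksPage.tsx`:

```typescript
// Instead of clearing the SSE buffer wholesale when fetched data arrives,
// drop only the streamed blocks the fetch already contains, preserving the
// rest until they appear in fetched data.

interface BlockRow {
  number: number;
}

function reconcileSseBuffer(sseBlocks: BlockRow[], fetchedBlocks: BlockRow[]): BlockRow[] {
  const fetched = new Set(fetchedBlocks.map((b) => b.number));
  return sseBlocks.filter((b) => !fetched.has(b.number));
}
```

The effect would then call `setSseBlocks(reconcileSseBuffer(sseBlocks, fetchedBlocks))` instead of `setSseBlocks([])`.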
---
Outside diff comments:
In `@backend/crates/atlas-api/src/main.rs`:
- Around line 219-225: The TimeoutLayer is applied before merging sse_routes so
SSE routes miss the timeout; move the call to
TimeoutLayer::with_status_code(axum::http::StatusCode::REQUEST_TIMEOUT,
Duration::from_secs(10)) so it wraps the final router that includes sse_routes
(i.e., call .layer(TimeoutLayer::with_status_code(...)) after
.merge(sse_routes)), or alternatively apply the same TimeoutLayer to the
sse_routes Router itself before calling .merge so all routes in the combined
router receive the timeout middleware while preserving long-lived SSE behavior
if needed.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 26-33: The current per-client async_stream (symbols: stream,
last_block_number, tick, ping_counter) polls the DB every 200ms for each SSE
connection; change this to a shared broadcaster pattern: create a single
background task (spawned once at server start or lazy-init) that owns
last_block_number and tick, polls the DB and sends updates on a
tokio::sync::broadcast::channel (or similar) and include periodic ping events;
in the SSE handler, replace the per-client loop with subscribing to that
broadcast channel (channel.subscribe()), receiving updates (with a timeout or
handling lagged subscribers) and forwarding them to the client, falling back to
the channel’s ping/events instead of each client running its own interval loop.
In `@docker-compose.yml`:
- Around line 69-70: The docker-compose file marks the Postgres volume as
external ("external: true" with name "atlas_pgdata"), which will cause new
local/dev environments to fail unless that Docker volume already exists; add a
short setup note instructing developers to run the Docker volume creation
command (e.g., docker volume create atlas_pgdata) before first docker-compose
up, and include this instruction in the project's README or local setup docs
alongside the postgres/pgdata volume mention so new contributors know to
bootstrap the volume.
In `@frontend/src/components/Layout.tsx`:
- Around line 59-63: The SSE-connected branch updates displayedRef.current and
calls setDisplayedHeight directly; change it to match the initialization/polling
branches by wrapping the displayedRef.current assignment and setDisplayedHeight
call inside requestAnimationFrame (use the same raf id tracking/cancellation
pattern your effect uses), e.g., when sse.connected is true schedule the update
via requestAnimationFrame and store/clear the raf id to ensure consistent
rendering and proper cleanup; target the displayedRef, setDisplayedHeight, and
sse.connected code paths for this change.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (13)
- backend/Cargo.toml
- backend/crates/atlas-api/Cargo.toml
- backend/crates/atlas-api/src/handlers/mod.rs
- backend/crates/atlas-api/src/handlers/sse.rs
- backend/crates/atlas-api/src/main.rs
- docker-compose.yml
- frontend/nginx.conf
- frontend/src/components/Layout.tsx
- frontend/src/components/SmoothCounter.tsx
- frontend/src/context/BlockStatsContext.tsx
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/hooks/useLatestBlockHeight.ts
- frontend/src/pages/BlocksPage.tsx
Actionable comments posted: 1
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)
93-105: Redundant keep-alive mechanisms.

The manual `ping_counter` logic (lines 93-97) sends a keep-alive comment every ~15 seconds, while axum's `KeepAlive` configuration (lines 101-105) independently sends a keep-alive text at the same interval. Consider removing one to reduce redundancy.

♻️ Proposed simplification — remove manual keep-alive

```diff
-        // Send keep-alive ping every ~15s (75 ticks * 200ms)
-        if ping_counter >= 75 {
-            ping_counter = 0;
-            yield Ok(Event::default().comment("keep-alive"));
-        }
     }
 };

 Sse::new(stream).keep_alive(
     axum::response::sse::KeepAlive::new()
         .interval(Duration::from_secs(15))
         .text("keep-alive"),
 )
```

You can also remove `ping_counter` and its updates at lines 29, 33, 62, and 80 if adopting this change.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 93 - 105, Remove the redundant manual keep-alive: delete the ping_counter-based send (the block that yields Ok(Event::default().comment("keep-alive")) guarded by ping_counter >= 75) and any updates to ping_counter elsewhere (updates at the locations that increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to emit pings; this eliminates duplicate keep-alive messages while keeping the existing Sse::keep_alive configuration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 43-64: The code sets last_block_number = Some(max_num) before
ensuring the initial block was fetched and emitted, which can permanently skip
the block if the fetch fails; modify the logic in the block handling using
latest/max_num so that last_block_number is only updated after a successful
fetch-and-emit: perform the sqlx::query_as(...) fetch_optional, check and
serialize the fetched Block into NewBlockEvent (using serde_json::to_string) and
yield the Event first, and only then assign last_block_number = Some(max_num)
(and reset ping_counter) so the cursor does not advance on failed fetches;
update the code paths around latest, block, NewBlockEvent and the yield
Ok(Event::default().event("new_block").data(...)) accordingly.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 93-105: Remove the redundant manual keep-alive: delete the
ping_counter-based send (the block that yields
Ok(Event::default().comment("keep-alive")) guarded by ping_counter >= 75) and
any updates to ping_counter elsewhere (updates at the locations that
increment/reset ping_counter), and rely solely on Sse::new(stream).keep_alive
with axum::response::sse::KeepAlive::new().interval(...).text("keep-alive") to
emit pings; this eliminates duplicate keep-alive messages while keeping the
existing Sse::keep_alive configuration.
- sse.rs: replace silent error swallowing with tracing::warn! and continue - useBlockSSE: use named function expression to fix TS2448 self-reference - BlocksPage: filter SSE buffer instead of unconditional clear on fetch
Actionable comments posted: 1
♻️ Duplicate comments (2)
backend/crates/atlas-api/src/handlers/sse.rs (2)
46-68: ⚠️ Potential issue | 🟠 Major — Advance the SSE cursor only after a successful initial event emit.

`last_block_number` is set before the initial block is fetched/emitted. If that fetch/emit path fails, the stream cursor advances and that block can be skipped for this client.

Suggested fix

```diff
 if let Some(max_num) = latest {
-    last_block_number = Some(max_num);
     // Emit the current latest block as the initial event
     let block: Option<Block> = match sqlx::query_as(
         "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at FROM blocks WHERE number = $1"
     )
@@
     if let Some(block) = block {
         let event = NewBlockEvent { block };
         if let Ok(json) = serde_json::to_string(&event) {
             yield Ok(Event::default().event("new_block").data(json));
+            last_block_number = Some(max_num);
+            ping_counter = 0;
         }
     }
-    ping_counter = 0;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 46 - 68, The SSE cursor advancement currently sets last_block_number = Some(max_num) before fetching/emitting the initial block (in the branch handling latest), which can skip a block if fetch or emit fails; move the assignment of last_block_number so it only happens after a successful fetch and successful serde_json::to_string + yield of the NewBlockEvent (i.e., only after the block is Some, NewBlockEvent is constructed, serialization succeeds, and Event::default().event("new_block").data(json) is yielded); update the code around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block handling, serde_json::to_string(&event), and the yield to ensure last_block_number is set after emission succeeds and remains unchanged on errors.
43-44: ⚠️ Potential issue | 🟠 Major — Add a short backoff on repeated DB errors.

Current error branches immediately `continue`, so outages can produce a tight warn/retry loop every 200ms per SSE client.

Suggested fix

```diff
-use tokio::time::interval;
+use tokio::time::{interval, sleep};
@@
-            Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
+            Err(e) => {
+                warn!(error = ?e, "sse: failed to query latest block number");
+                sleep(Duration::from_millis(500)).await;
+                continue;
+            }
@@
-            Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
+            Err(e) => {
+                warn!(error = ?e, "sse: failed to fetch initial block");
+                sleep(Duration::from_millis(500)).await;
+                continue;
+            }
@@
-            Err(e) => { warn!(error = ?e, cursor, "sse: failed to fetch new blocks"); continue; }
+            Err(e) => {
+                warn!(error = ?e, cursor, "sse: failed to fetch new blocks");
+                sleep(Duration::from_millis(500)).await;
+                continue;
+            }
         };
```

Also applies to: 58-59, 84-85
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 43 - 44, The error branches currently `continue` immediately (e.g. the branches that warn!("sse: failed to query latest block number") and the other similar warn! messages), causing a tight retry loop on DB outages; add a short async backoff (e.g., tokio::time::sleep for a few hundred milliseconds) before each `continue` so the SSE handler loop yields and avoids tight retries. Update the three error branches referenced (the Err(e) warn! handling around the latest block query and the two other warn! sites at the noted locations) to await a small delay and then continue; keep the warn! logs as-is and only add the sleep+await just prior to the continue. Ensure you import tokio::time::Duration and tokio::time::sleep in the sse.rs scope if not already present.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-51: The merge logic in the useMemo for blocks (using sseBlocks
and fetchedBlocks) runs on every page, causing newer page views to show items
belonging to page 1; change the logic to only prepend/merge sseBlocks when the
current page is 1: add the page (or currentPage) value to the dependency array
and, inside the useMemo, if page !== 1 return fetchedBlocks directly; otherwise
perform the existing Set/filter merge of sseBlocks and fetchedBlocks and
slice(0,20). Ensure you reference and update the blocks useMemo dependencies to
include the page/currentPage variable so updates happen correctly.
---
Duplicate comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 46-68: The SSE cursor advancement currently sets last_block_number
= Some(max_num) before fetching/emitting the initial block (in the branch
handling latest), which can skip a block if fetch or emit fails; move the
assignment of last_block_number so it only happens after a successful fetch and
successful serde_json::to_string + yield of the NewBlockEvent (i.e., only after
the block is Some, NewBlockEvent is constructed, serialization succeeds, and
Event::default().event("new_block").data(json) is yielded); update the code
around latest, sqlx::query_as(...).bind(max_num).fetch_optional(...), the block
handling, serde_json::to_string(&event), and the yield to ensure
last_block_number is set after emission succeeds and remains unchanged on
errors.
- Around line 43-44: The error branches currently `continue` immediately (e.g.
the branches that warn!("sse: failed to query latest block number") and the
other similar warn! messages), causing a tight retry loop on DB outages; add a
short async backoff (e.g., tokio::time::sleep for a few hundred milliseconds)
before each `continue` so the SSE handler loop yields and avoids tight retries.
Update the three error branches referenced (the Err(e) warn! handling around the
latest block query and the two other warn! sites at the noted locations) to
await a small delay and then continue; keep the warn! logs as-is and only add
the sleep+await just prior to the continue. Ensure you import
tokio::time::Duration and tokio::time::sleep in the sse.rs scope if not already
present.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- backend/crates/atlas-api/src/handlers/sse.rs
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
- frontend/src/hooks/useBlockSSE.ts
…-in-effect Wrap setSseBlocks calls in BlocksPage and setDisplayedHeight in Layout in requestAnimationFrame to avoid calling setState synchronously inside useEffect bodies, which is flagged by eslint-plugin-react-hooks v7.
Stop polling every 10s as a safety net when SSE is active — periodic refetches disrupt the smooth live flow. Instead, refetch immediately when SSE disconnects to catch any missed blocks during the gap.
♻️ Duplicate comments (1)
frontend/src/pages/BlocksPage.tsx (1)
55-60: ⚠️ Potential issue | 🟠 Major — Gate SSE merge to page 1 (and live mode) to avoid cross-page contamination.

`blocks` still prepends `sseBlocks` on every page. After navigating away from page 1, buffered newest blocks can leak into page 2+ and break pagination correctness.

Suggested fix

```diff
 const blocks = useMemo(() => {
-    if (!sseBlocks.length) return fetchedBlocks;
+    if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
     const seen = new Set(fetchedBlocks.map((b) => b.number));
     const unique = sseBlocks.filter((b) => !seen.has(b.number));
     return [...unique, ...fetchedBlocks].slice(0, 20);
-}, [fetchedBlocks, sseBlocks]);
+}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/pages/BlocksPage.tsx` around lines 55 - 60, The merge logic in the useMemo for blocks currently prepends sseBlocks on every page and must be restricted to live mode on page 1; modify the useMemo so it first checks the page and live-mode flags (e.g., currentPage === 1 and isLive / liveMode) and if not both true simply return fetchedBlocks, otherwise run the existing dedup/merge (seen Set, unique, return [...unique, ...fetchedBlocks].slice(0,20)). Update the dependency array to include the page and live-mode identifiers so the memo recalculates when those change.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 55-60: The merge logic in the useMemo for blocks currently
prepends sseBlocks on every page and must be restricted to live mode on page 1;
modify the useMemo so it first checks the page and live-mode flags (e.g.,
currentPage === 1 and isLive / liveMode) and if not both true simply return
fetchedBlocks, otherwise run the existing dedup/merge (seen Set, unique, return
[...unique, ...fetchedBlocks].slice(0,20)). Update the dependency array to
include the page and live-mode identifiers so the memo recalculates when those
change.
- BlocksPage: skip API polling entirely while SSE is connected to avoid disrupting the live block flow; trigger an immediate refetch when SSE drops to recover any missed blocks - BlocksPage: guard SSE block merge to page 1 + autoRefresh only, so navigating to page 2+ never shows page-1 SSE blocks - sse.rs: only advance last_block_number after the initial block is successfully fetched and emitted, preventing the cursor from skipping the block on a transient fetch failure
@coderabbitai review |
✅ Actions performed: Review triggered.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@CLAUDE.md`:
- Around line 82-83: The docs in CLAUDE.md describe /api/status as a lightweight
fast-poll endpoint but the frontend (frontend/src/pages/*.tsx) expects
/api/status to return full chain info on initial page load; update the contract
to avoid drift by either (A) restoring /api/status to always return full chain
info (block_height, indexed_at plus the full chain payload the frontend needs)
and mark a new endpoint (e.g., /api/status/quick or /api/poll) as the
lightweight single-key fast-poll, or (B) explicitly document in CLAUDE.md that
/api/status returns only the full chain snapshot (used on load) and add a
separate documented fast-poll endpoint name to be implemented by the backend and
used by the navbar SSE fallback; ensure references in frontend/src/pages/*.tsx
match the chosen endpoint name.
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 26-40: The current useEffect cancels any existing ssePrependRafRef
scheduled RAF whenever a new latestBlockEvent arrives, which drops intermediate
blocks during bursts; change the logic to never cancel a pending RAF but instead
accumulate incoming blocks into a pending buffer (e.g., pendingSseBlocksRef) on
each arrival, dedupe by block.number and preserve arrival order, and ensure the
single scheduled RAF (tracked by ssePrependRafRef) flushes the whole buffer once
by calling setSseBlocks(prev => { merge buffer + prev, dedupe, slice(0,20) })
and then clears the buffer and RAF ref; update uses of lastSseBlockRef,
latestBlockEvent, and setSseBlocks accordingly so lastSseBlockRef is advanced
when blocks are actually prepended.
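The pending-buffer flush described above can be sketched as a pure merge step — hypothetical names and a simplified block shape; the real code lives in `frontend/src/pages/BlocksPage.tsx`, with a single `requestAnimationFrame` draining the buffer:

```typescript
// Bursts of blocks (e.g. 100, 101, 102 arriving before a frame fires)
// accumulate in `pending`; one scheduled flush merges them all into the
// current list, deduped by block number, newest first, capped at 20.

interface BlockRow {
  number: number;
}

function flushPending(pending: BlockRow[], current: BlockRow[]): BlockRow[] {
  const seen = new Set<number>();
  const merged: BlockRow[] = [];
  // Pending blocks first (newest stream data), then the existing list.
  for (const b of [...pending, ...current]) {
    if (!seen.has(b.number)) {
      seen.add(b.number);
      merged.push(b);
    }
  }
  return merged.slice(0, 20);
}
```

The RAF callback would call `setSseBlocks((prev) => flushPending(pendingRef.current, prev))`, then clear the buffer and the RAF id, so no intermediate block is dropped.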
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- CLAUDE.md
- backend/crates/atlas-api/src/handlers/sse.rs
- frontend/src/pages/BlocksPage.tsx
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/crates/atlas-api/src/handlers/sse.rs
Replace cancel-and-reschedule RAF pattern with a pending buffer so that burst arrivals (e.g. blocks 100, 101, 102 before a frame fires) are all flushed in a single requestAnimationFrame rather than only the last block surviving. Also clarify CLAUDE.md /api/status contract.
- Add LIMIT 100 to the SSE block polling query to prevent unbounded memory usage and avoid infinite retry loops from statement_timeout - Add unit tests for NewBlockEvent serialization - Remove external: true from pgdata volume so docker compose up works on fresh systems
@coderabbitai resume |
✅ Actions performed: Reviews resumed.
@coderabbitai review |
✅ Actions performed: Review triggered.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/crates/atlas-api/src/main.rs (1)
18-24: ⚠️ Potential issue | 🟠 Major — `AppState` is missing required `chain_id` and `chain_name` fields.

This main entrypoint should fetch `eth_chainId` once at startup and store it in `AppState.chain_id` (u64), and populate `chain_name` from `CHAIN_NAME` (default `"Unknown"`).

As per coding guidelines, `**/atlas-api/**/main.rs`: fetch chain ID from RPC at startup into `AppState.chain_id` and use `CHAIN_NAME` (default `"Unknown"`) for chain identification.

Also applies to: 60-66
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/main.rs` around lines 18 - 24, AppState currently lacks chain identification; add fields chain_id: u64 and chain_name: String to the AppState struct and initialize them at startup: call the RPC eth_chainId once (e.g., in main or your app initialization function that constructs AppState) and parse the returned hex/number into a u64 to set AppState.chain_id, and read CHAIN_NAME (defaulting to "Unknown") to set AppState.chain_name; ensure any code that constructs AppState (e.g., main or create_app_state) is updated to perform the RPC call and populate these new fields.
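The startup bootstrapping this comment asks for can be illustrated with two small helpers — TypeScript for illustration only, since the real code is Rust in atlas-api's `main.rs`, and these names are hypothetical. `eth_chainId` returns a hex quantity like `"0x1"`:

```typescript
// Parse the hex quantity returned by eth_chainId into a plain number,
// mirroring the one-time parse into AppState.chain_id at startup.
function parseChainId(hexQuantity: string): number {
  const n = Number.parseInt(hexQuantity, 16);
  if (Number.isNaN(n) || n < 0) throw new Error(`bad eth_chainId: ${hexQuantity}`);
  return n;
}

// Resolve the chain name from the environment, defaulting to "Unknown"
// as the coding guidelines require for CHAIN_NAME.
function chainNameFromEnv(env: Record<string, string | undefined>): string {
  return env.CHAIN_NAME ?? 'Unknown';
}
```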
♻️ Duplicate comments (2)
frontend/src/hooks/useLatestBlockHeight.ts (1)
2-2: ⚠️ Potential issue | 🟠 Major — Polling fallback should use the lightweight `/api/height` endpoint, not `/api/status`.

`getStatus()` here keeps the fast-poll path tied to the wrong contract.

Suggested fix

```diff
-import { getStatus } from '../api/status';
+import { getHeight } from '../api/height';
@@
-      const status = await getStatus();
-      const latestHeight = status?.block_height;
+      const status = await getHeight();
+      const latestHeight = status?.block_height;
```

As per coding guidelines, `frontend/src/hooks/**/*.ts`: Fast polling endpoint is `GET /api/height` returning `{ block_height, indexed_at }`, while `GET /api/status` is full chain info fetched once on page load.

Also applies to: 80-88
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/useLatestBlockHeight.ts` at line 2, The hook is using getStatus() for fast polling but should call the lightweight height endpoint; replace usages of getStatus() in useLatestBlockHeight (and the similar block around the code referenced at lines 80-88) with a function that hits GET /api/height (e.g., getHeight or getBlockHeight) which returns { block_height, indexed_at }, and update the hook to poll that instead of getStatus() so the fast-poll path is tied to the correct lightweight endpoint.

CLAUDE.md (1)
82-83: ⚠️ Potential issue | 🟠 Major

The endpoint contract documented here conflicts with the frontend contract.

These lines redefine `/api/status` as the fast polling endpoint and remove the separate full-status contract, which creates drift with existing frontend expectations.

As per coding guidelines, `frontend/src/hooks/**/*.ts`: Fast polling endpoint is `GET /api/height` returning `{ block_height, indexed_at }`, and `GET /api/status` returns full chain info fetched once on page load.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@CLAUDE.md` around lines 82 - 83, The documented endpoint contract in CLAUDE.md conflicts with the frontend: restore the original contracts so the fast-polling endpoint is GET /api/height returning { block_height, indexed_at } and GET /api/status remains the full chain-info endpoint fetched once on load; update the doc text that currently describes `/api/status` as the sub-ms polling endpoint and `/api/events` so that `/api/height` is described as the single-key fast poll and `/api/status` is described as the full-chain info endpoint to match frontend/hooks expectations.
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)
20-20: Avoid duplicating the NOTIFY channel name across crates.
`BLOCK_EVENT_CHANNEL` is hardcoded here and also separately in `backend/crates/atlas-indexer/src/indexer.rs:27`. This creates silent break risk if one side is renamed. Move the channel constant to a shared crate/module and import it in both places.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` at line 20, The hardcoded NOTIFY channel constant BLOCK_EVENT_CHANNEL is duplicated and should be centralized; extract BLOCK_EVENT_CHANNEL into a shared module/crate (e.g., a common constants module) and replace local declarations with a public import where currently used (the constant referenced in sse.rs and the one in indexer.rs); update the uses of BLOCK_EVENT_CHANNEL to import from the new shared module and remove the local constant to ensure both crates reference the single shared symbol.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 288-348: Add unit tests in the existing #[cfg(test)] mod tests
that exercise the new fanout/catch-up logic (not just JSON shape) by using
sample_block and NewBlockEvent to simulate sequences: (1) a gap-catchup test
that simulates missing block(s) and asserts the catch_up logic retrieves/emits
the missing blocks; (2) a lagged-fallback test that simulates the node being
behind the live head beyond the fallback threshold and asserts the code takes
the lagged fallback path; and (3) a broadcast-replay-bounds test that verifies
replay windows respect configured min/max bounds and do not request blocks
outside those limits; call the actual functions/structs that implement these
behaviors in sse.rs (the fanout/catch-up entrypoints) from the tests, assert
emitted events or return values accordingly, keep tests in the same file under
mod tests and run with cargo test --workspace.
In `@backend/crates/atlas-indexer/src/indexer.rs`:
- Around line 788-793: Add a unit test in a #[cfg(test)] mod tests block in this
file that exercises the new write-path so it actually calls the code path which
executes pg_tx.execute("SELECT pg_notify($1, $2)", &[&BLOCK_EVENT_CHANNEL,
&last_value]) (e.g., by invoking collect_block or the higher-level function that
triggers the watermark update), then open a Postgres LISTEN on
BLOCK_EVENT_CHANNEL (or use a test connection/transaction) to capture
notifications and assert a notification with the expected channel and payload
(last_value) was emitted; include setup/teardown in the test and run it with
cargo test --workspace.
In `@frontend/src/api/client.ts`:
- Line 5: The constant API_BASE_URL falls back to an absolute localhost URL;
change its fallback to the proxied path '/api'. Update the export const
API_BASE_URL assignment so that API_BASE_URL uses
import.meta.env.VITE_API_BASE_URL if present, otherwise the string '/api'
(modify the symbol API_BASE_URL in frontend/src/api/client.ts accordingly) to
ensure requests go through the nginx proxy.
In `@frontend/src/components/Layout.tsx`:
- Around line 59-65: When sse.connected is true the code only updates
displayedRef.current and returns, which can leave the displayedHeight state
stale when height moves downward; update the React state synchronously as well
to avoid overshoot. In the SSE branch (where sse.connected is checked) call the
setter that updates displayedHeight (the same setDisplayedHeight used elsewhere)
in addition to setting displayedRef.current = height so the UI immediately
reflects decreases/corrections; reference displayedRef.current,
displayedHeight/setDisplayedHeight, sse.connected and height to locate and fix
the logic.
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 120-129: The onerror handler currently ignores the SSE event so
the error state never gets set; update the es.onerror callback in useBlockSSE
(and the corresponding handler around line ~177) to accept the event param (e)
and call setError with a useful value (e.g., e or e.type or a stringified
message) before closing and scheduling reconnect; ensure you clear/set error
appropriately in onopen (or where you setConnected(true)) so error is reset on
successful reconnect and use the same pattern in connectSSE to keep error state
consistent.
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 46-50: The prepend logic in setSseBlocks (inside the state updater
using prev and pending) only deduplicates against existing prev, allowing
duplicate block.number values within the pending batch; update the routine to
first deduplicate pending by block.number (e.g., iterate pending and build a Map
or Set to keep the first/last occurrence per number) and then filter that
dedupedPending against seen before reversing and prepending so the final
[...prepend, ...prev].slice(0, 20) never contains duplicates from the incoming
batch; touch the setSseBlocks updater where pending, seen, prepend are used.
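The batch-dedup step suggested above can be sketched as a pure helper. This is an illustrative TypeScript sketch, not the actual `BlocksPage` code: `BlockSummary` and the `prependNewBlocks` name are assumptions, and the real updater operates inside `setSseBlocks`.

```typescript
// Hypothetical minimal shape for a block row in the list.
type BlockSummary = { number: number; hash: string };

// Dedup the pending batch by block number (later events win), drop anything
// already rendered, then prepend newest-first and cap to the page size.
function prependNewBlocks(
  prev: BlockSummary[],
  pending: BlockSummary[],
  limit = 20,
): BlockSummary[] {
  // Dedup within the incoming batch: Map keeps the last occurrence per number.
  const byNumber = new Map<number, BlockSummary>();
  for (const b of pending) byNumber.set(b.number, b);
  // Drop blocks already present in the rendered list.
  const seen = new Set(prev.map((b) => b.number));
  const fresh = [...byNumber.values()].filter((b) => !seen.has(b.number));
  // Newest first, capped to the page size.
  fresh.sort((a, b) => b.number - a.number);
  return [...fresh, ...prev].slice(0, limit);
}
```

Because the dedup happens before the prepend, a reorg event that re-emits the same block number within one batch can no longer produce duplicate keys in the rendered list.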
---
Outside diff comments:
In `@backend/crates/atlas-api/src/main.rs`:
- Around line 18-24: AppState currently lacks chain identification; add fields
chain_id: u64 and chain_name: String to the AppState struct and initialize them
at startup: call the RPC eth_chainId once (e.g., in main or your app
initialization function that constructs AppState) and parse the returned
hex/number into a u64 to set AppState.chain_id, and read CHAIN_NAME (defaulting
to "Unknown") to set AppState.chain_name; ensure any code that constructs
AppState (e.g., main or create_app_state) is updated to perform the RPC call and
populate these new fields.
---
Duplicate comments:
In `@CLAUDE.md`:
- Around line 82-83: The documented endpoint contract in CLAUDE.md conflicts
with the frontend: restore the original contracts so the fast-polling endpoint
is GET /api/height returning { block_height, indexed_at } and GET /api/status
remains the full chain-info endpoint fetched once on load; update the doc text
that currently describes `/api/status` as the sub-ms polling endpoint and
`/api/events` so that `/api/height` is described as the single-key fast poll and
`/api/status` is described as the full-chain info endpoint to match
frontend/hooks expectations.
In `@frontend/src/hooks/useLatestBlockHeight.ts`:
- Line 2: The hook is using getStatus() for fast polling but should call the
lightweight height endpoint; replace usages of getStatus() in
useLatestBlockHeight (and the similar block around the code referenced at lines
80-88) with a function that hits GET /api/height (e.g., getHeight or
getBlockHeight) which returns { block_height, indexed_at }, and update the hook
to poll that instead of getStatus() so the fast-poll path is tied to the correct
lightweight endpoint.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Line 20: The hardcoded NOTIFY channel constant BLOCK_EVENT_CHANNEL is
duplicated and should be centralized; extract BLOCK_EVENT_CHANNEL into a shared
module/crate (e.g., a common constants module) and replace local declarations
with a public import where currently used (the constant referenced in sse.rs and
the one in indexer.rs); update the uses of BLOCK_EVENT_CHANNEL to import from
the new shared module and remove the local constant to ensure both crates
reference the single shared symbol.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 37f05432-3709-4931-bc43-74fae8662192
📒 Files selected for processing (12)
- CLAUDE.md
- backend/.dockerignore
- backend/crates/atlas-api/src/handlers/sse.rs
- backend/crates/atlas-api/src/main.rs
- backend/crates/atlas-indexer/src/indexer.rs
- docker-compose.yml
- frontend/src/api/client.ts
- frontend/src/components/Layout.tsx
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/hooks/useLatestBlockHeight.ts
- frontend/src/hooks/useStats.ts
- frontend/src/pages/BlocksPage.tsx
✅ Files skipped from review due to trivial changes (1)
- backend/.dockerignore
🚧 Files skipped from review as they are similar to previous changes (1)
- docker-compose.yml
```rust
pg_tx
    .execute(
        "SELECT pg_notify($1, $2)",
        &[&BLOCK_EVENT_CHANNEL, &last_value],
    )
    .await?;
```
Add a test that verifies pg_notify is emitted with watermark updates.
This introduces new write-path behavior, but the current tests only cover collect_block logic and do not validate notification emission.
As per coding guidelines, **/*.rs: Add unit tests for new logic in a #[cfg(test)] mod tests block in the same file and run with cargo test --workspace.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-indexer/src/indexer.rs` around lines 788 - 793, Add a
unit test in a #[cfg(test)] mod tests block in this file that exercises the new
write-path so it actually calls the code path which executes
pg_tx.execute("SELECT pg_notify($1, $2)", &[&BLOCK_EVENT_CHANNEL, &last_value])
(e.g., by invoking collect_block or the higher-level function that triggers the
watermark update), then open a Postgres LISTEN on BLOCK_EVENT_CHANNEL (or use a
test connection/transaction) to capture notifications and assert a notification
with the expected channel and payload (last_value) was emitted; include
setup/teardown in the test and run it with cargo test --workspace.
```diff
 import type { ApiError } from '../types';

-const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
```
Default base URL should be /api, not http://localhost:3000/api.
Using localhost as fallback can break non-local environments and bypass the intended nginx proxy contract.
Suggested fix

```diff
-export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = (import.meta.env.VITE_API_BASE_URL || '/api').replace(/\/+$/, '');
```

As per coding guidelines, `frontend/src/api/**/*.ts`: Frontend API client base URL is `/api` (proxied by nginx to atlas-api:3000).
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
+export const API_BASE_URL = (import.meta.env.VITE_API_BASE_URL || '/api').replace(/\/+$/, '');
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@frontend/src/api/client.ts` at line 5, The constant API_BASE_URL falls back
to an absolute localhost URL; change its fallback to the proxied path '/api'.
Update the export const API_BASE_URL assignment so that API_BASE_URL uses
import.meta.env.VITE_API_BASE_URL if present, otherwise the string '/api'
(modify the symbol API_BASE_URL in frontend/src/api/client.ts accordingly) to
ensure requests go through the nginx proxy.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 24-33: The current computeBpsFromLog implementation scans forward
and ends up using log[0] whenever the buffer exceeds minSpan, causing windows to
expand; change the selection to use the newest prior sample that satisfies the
span by iterating backwards from newest toward older entries. In
computeBpsFromLog, loop i from log.length-2 down to 0, compute span = newest.ts
- log[i].ts and if span >= minSpan return (newest.num - log[i].num)/span; after
the loop check fallback: if newest.ts - log[0].ts >= fallbackMinSpan return
(newest.num - log[0].num)/(newest.ts - log[0].ts); otherwise return null. This
preserves newest as the reference and picks the closest sample meeting minSpan
(and similarly for fallbackMinSpan).
In `@frontend/src/pages/BlocksPage.tsx`:
- Around line 37-59: When autoRefresh is turned back on while sseConnected is
still true the component skips the catch-up refetch and misses blocks; modify
the logic that observes latestBlockEvent/page/autoRefresh (symbols:
latestBlockEvent, page, autoRefresh, sseConnected, lastSseBlockRef,
pendingSseBlocksRef, setSseBlocks, ssePrependRafRef) to detect the autoRefresh
transition from false->true (only for page===1) and trigger the same catch-up
behavior used when sseConnected toggles (i.e. perform the refetch of missed
blocks or invoke the existing catch-up function) or otherwise reset
lastSseBlockRef so pending blocks are fetched/processed; apply the same change
to the other similar block referenced around lines 114-129.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f8acc341-cd83-405a-a054-c6525b5129c5
📒 Files selected for processing (2)
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/pages/BlocksPage.tsx
Actionable comments posted: 1
♻️ Duplicate comments (1)
frontend/src/hooks/useBlockSSE.ts (1)
31-40: ⚠️ Potential issue | 🟠 Major

Forward scan selects the oldest qualifying sample instead of the newest.

The loop iterates from index 0 (oldest) and returns immediately when `span >= minSpan`. After a rate change, this always picks the oldest sample exceeding the threshold rather than the nearest one, causing `bps` and drain pacing to reflect stale history instead of the advertised 30s/10s rolling windows.

🔧 Proposed fix: iterate backward to find the nearest qualifying sample

```diff
 function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: number): number | null {
   if (log.length < 2) return null;
   const newest = log[log.length - 1];
-  for (let i = 0; i < log.length - 1; i++) {
+  for (let i = log.length - 2; i >= 0; i--) {
     const span = newest.ts - log[i].ts;
     if (span >= minSpan) return (newest.num - log[i].num) / span;
-    if (i === 0 && span >= fallbackMinSpan) return (newest.num - log[0].num) / span;
   }
+  // Fallback: use oldest if it meets fallbackMinSpan
+  const oldestSpan = newest.ts - log[0].ts;
+  if (oldestSpan >= fallbackMinSpan) {
+    return (newest.num - log[0].num) / oldestSpan;
+  }
   return null;
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/src/hooks/useBlockSSE.ts` around lines 31 - 40, computeBpsFromLog currently scans forward and returns the oldest qualifying sample; change it to scan backward from the newest to pick the nearest qualifying sample. In computeBpsFromLog(log, minSpan, fallbackMinSpan) iterate i from log.length-2 down to 0 and if newest.ts - log[i].ts >= minSpan return (newest.num - log[i].num)/span; if none qualify for minSpan, do a second backward pass (or continue same loop tracking fallback) to find the nearest sample with span >= fallbackMinSpan and return using that sample; otherwise return null. Target the computeBpsFromLog function and the variables log, minSpan, fallbackMinSpan, newest when editing.
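For reference, a self-contained sketch of the backward-scanning estimator described in this comment. The `BlockSample`/`BlockLog` shapes are assumptions inferred from the review diff; the real hook's types may differ.

```typescript
// Hypothetical sample shape: block number plus arrival timestamp in ms.
type BlockSample = { num: number; ts: number };
type BlockLog = BlockSample[];

function computeBpsFromLog(
  log: BlockLog,
  minSpan: number,
  fallbackMinSpan: number,
): number | null {
  if (log.length < 2) return null;
  const newest = log[log.length - 1];
  // Walk backward from the newest entry so we pick the *nearest* prior
  // sample whose span satisfies minSpan, keeping the effective window
  // close to minSpan instead of expanding toward the oldest sample.
  for (let i = log.length - 2; i >= 0; i--) {
    const span = newest.ts - log[i].ts;
    if (span >= minSpan) return (newest.num - log[i].num) / span;
  }
  // Fallback: accept the oldest sample if it at least meets fallbackMinSpan.
  const oldestSpan = newest.ts - log[0].ts;
  if (oldestSpan >= fallbackMinSpan) {
    return (newest.num - log[0].num) / oldestSpan;
  }
  return null;
}
```

With one sample per second and `minSpan = 1500`, the scan skips the 1000 ms-old sample and lands on the 2000 ms-old one, so the estimate tracks the most recent ~`minSpan` of history after a rate change.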
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 52-87: Rewrite the outer match-based loop over rx.recv().await
into a while-let to satisfy Clippy: replace the loop { match rx.recv().await {
Ok(()) | Err(broadcast::error::RecvError::Lagged(_)) => { ... },
Err(broadcast::error::RecvError::Closed) => break, } } with while let Ok(()) |
Err(broadcast::error::RecvError::Lagged(_)) = rx.recv().await { /* keep the
existing inner loop that uses last_block_number, fetch_blocks_after,
FETCH_BATCH_SIZE, block_to_event, etc. unchanged */ } and let the while-let
naturally exit on the Closed variant (no special Closed arm needed).
---
Duplicate comments:
In `@frontend/src/hooks/useBlockSSE.ts`:
- Around line 31-40: computeBpsFromLog currently scans forward and returns the
oldest qualifying sample; change it to scan backward from the newest to pick the
nearest qualifying sample. In computeBpsFromLog(log, minSpan, fallbackMinSpan)
iterate i from log.length-2 down to 0 and if newest.ts - log[i].ts >= minSpan
return (newest.num - log[i].num)/span; if none qualify for minSpan, do a second
backward pass (or continue same loop tracking fallback) to find the nearest
sample with span >= fallbackMinSpan and return using that sample; otherwise
return null. Target the computeBpsFromLog function and the variables log,
minSpan, fallbackMinSpan, newest when editing.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e0115fb8-70d6-4d68-86b2-25d49d2abe67
📒 Files selected for processing (4)
- backend/crates/atlas-api/src/handlers/sse.rs
- backend/crates/atlas-api/src/main.rs
- frontend/src/hooks/useBlockSSE.ts
- frontend/src/pages/BlocksPage.tsx
Actionable comments posted: 1
♻️ Duplicate comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)
160-220: ⚠️ Potential issue | 🟡 Minor

Core SSE behavior is still under-tested.

At lines 160-220, tests only validate JSON shape. Please add behavior tests for: gap catch-up progression, the `Lagged(_)` recovery path, and reconnect wake-up replay behavior.

As per coding guidelines: "Add unit tests for new logic in a `#[cfg(test)] mod tests` block in the same file and run with `cargo test --workspace`."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 160 - 220, Add unit tests in the existing #[cfg(test)] mod tests in sse.rs that exercise SSE behavior beyond JSON shape: (1) simulate gap catch-up progression by sending a sequence of blocks with a missing intermediate number and assert the catch-up logic advances through the missing numbers (target functions/structs: the SSE handler function that processes incoming Block events and any gap/catch-up state machine used), (2) test the Lagged(_) recovery path by forcing the handler to encounter a lag condition and assert it emits the Lagged variant and then recovers to normal processing (reference enum/variant names like Lagged and the recovery method used), and (3) test reconnect wake-up replay by simulating a reconnect event and verifying that the replayed blocks are reprocessed in order and no duplicates are emitted (use NewBlockEvent, sample_block helper, and the SSE reconnect/replay entry point). Add assertions for emitted events/state transitions and run cargo test --workspace to validate.
🧹 Nitpick comments (1)
backend/crates/atlas-api/src/handlers/sse.rs (1)
97-129: Use exponential backoff with jitter for listener reconnects.

At lines 97-129, reconnects use a fixed 1s sleep. Under DB instability, synchronized retries can amplify load. Prefer capped exponential backoff + jitter.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 97 - 129, Replace the fixed 1s sleeps around PgListener::connect and listener.listen (and after the inner listener.recv error break) with a capped exponential backoff with jitter: start with a base delay (e.g., 1s), double on each consecutive failure up to a max (e.g., 30s), add randomized jitter (+/- a small fraction or uniform 0..base) to avoid sync, and reset the backoff to the base delay immediately when PgListener::connect and listener.listen succeed (i.e., before sending the first tx.send and entering the inner recv loop). Ensure the same backoff instance is used for consecutive connection attempts and that the backoff is cleared/reset after a successful reconnect so subsequent failures start from the base again.
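The backoff schedule described above can be sketched as a pure function. This is an illustrative sketch in TypeScript (the listener loop itself is Rust), and the jitter fraction and default bounds are assumptions; `rand` is injected so the schedule is deterministic in tests.

```typescript
// Capped exponential backoff with jitter for reconnect attempts.
// attempt: consecutive failures so far, starting at 0. Reset to 0 on success.
function backoffDelayMs(
  attempt: number,
  baseMs = 1_000,
  maxMs = 30_000,
  rand: () => number = Math.random,
): number {
  // Double the delay per failure: 1s, 2s, 4s, ... capped at maxMs.
  const exp = Math.min(maxMs, baseMs * 2 ** attempt);
  // Add up to +50% random spread so many clients don't retry in lockstep.
  const jitter = rand() * exp * 0.5;
  return Math.min(maxMs, exp + jitter);
}
```

The caller keeps one counter per connection, sleeps `backoffDelayMs(attempt)` on each failure, and zeroes the counter as soon as connect + LISTEN succeed so the next outage starts again from the base delay.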
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 63-67: The loop advances cursor and last_block_number for every
fetched block which can skip earlier gaps; change the logic in the for block in
blocks loop so you only advance cursor when the block is contiguous to the
previous tracked number: compute block_number, then if let Some(event) =
block_to_event(block) handle the event, and only set cursor = Some(block_number)
(and update last_block_number) when either cursor.is_none() or block_number ==
cursor.unwrap() + 1 (or similarly when last_block_number + 1 == block_number);
if a gap is detected do not advance cursor so subsequent queries can still fetch
the missing lower-numbered blocks. Ensure you reference and update the same
variables (cursor, last_block_number, block_to_event) in this conditional flow.
---
Duplicate comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 160-220: Add unit tests in the existing #[cfg(test)] mod tests in
sse.rs that exercise SSE behavior beyond JSON shape: (1) simulate gap catch-up
progression by sending a sequence of blocks with a missing intermediate number
and assert the catch-up logic advances through the missing numbers (target
functions/structs: the SSE handler function that processes incoming Block events
and any gap/catch-up state machine used), (2) test the Lagged(_) recovery path
by forcing the handler to encounter a lag condition and assert it emits the
Lagged variant and then recovers to normal processing (reference enum/variant
names like Lagged and the recovery method used), and (3) test reconnect wake-up
replay by simulating a reconnect event and verifying that the replayed blocks
are reprocessed in order and no duplicates are emitted (use NewBlockEvent,
sample_block helper, and the SSE reconnect/replay entry point). Add assertions
for emitted events/state transitions and run cargo test --workspace to validate.
---
Nitpick comments:
In `@backend/crates/atlas-api/src/handlers/sse.rs`:
- Around line 97-129: Replace the fixed 1s sleeps around PgListener::connect and
listener.listen (and after the inner listener.recv error break) with a capped
exponential backoff with jitter: start with a base delay (e.g., 1s), double on
each consecutive failure up to a max (e.g., 30s), add randomized jitter (+/- a
small fraction or uniform 0..base) to avoid sync, and reset the backoff to the
base delay immediately when PgListener::connect and listener.listen succeed
(i.e., before sending the first tx.send and entering the inner recv loop).
Ensure the same backoff instance is used for consecutive connection attempts and
that the backoff is cleared/reset after a successful reconnect so subsequent
failures start from the base again.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: c40a19f6-b670-4b97-935f-30a914b7bbbd
📒 Files selected for processing (1)
backend/crates/atlas-api/src/handlers/sse.rs
```rust
for block in blocks {
    let block_number = block.number;
    last_block_number = Some(block_number);
    cursor = Some(block_number);
    if let Some(event) = block_to_event(block) {
```
Avoid advancing the cursor across gaps; this can permanently skip blocks.
At Lines 63-67, cursor state advances to each fetched block immediately. If number=102 is observed before number=101, subsequent queries (WHERE number > cursor) can permanently miss 101.
🔧 Proposed fix
```diff
 let batch_len = blocks.len();
 for block in blocks {
+    if let Some(prev) = cursor {
+        if block.number > prev + 1 {
+            warn!(
+                expected = prev + 1,
+                seen = block.number,
+                "sse: detected block gap, delaying cursor advance"
+            );
+            break;
+        }
+    }
     let block_number = block.number;
     last_block_number = Some(block_number);
     cursor = Some(block_number);
     if let Some(event) = block_to_event(block) {
         yield Ok(event);
     }
 }
```
Verify each finding against the current code and only fix it if needed.
In `@backend/crates/atlas-api/src/handlers/sse.rs` around lines 63 - 67, The loop
advances cursor and last_block_number for every fetched block which can skip
earlier gaps; change the logic in the for block in blocks loop so you only
advance cursor when the block is contiguous to the previous tracked number:
compute block_number, then if let Some(event) = block_to_event(block) handle the
event, and only set cursor = Some(block_number) (and update last_block_number)
when either cursor.is_none() or block_number == cursor.unwrap() + 1 (or
similarly when last_block_number + 1 == block_number); if a gap is detected do
not advance cursor so subsequent queries can still fetch the missing
lower-numbered blocks. Ensure you reference and update the same variables
(cursor, last_block_number, block_to_event) in this conditional flow.
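The contiguous-cursor rule from this comment reduces to a small invariant, sketched here in TypeScript as a stand-in for the Rust loop (function name and shapes are hypothetical): the cursor only advances while block numbers arrive in sequence, so an out-of-order fetch cannot permanently skip a gap.

```typescript
// fetched: block numbers as returned by the `WHERE number > cursor` query.
// Emit contiguous blocks and stop at the first gap so the missing number
// is still covered by the next query.
function advanceCursor(
  cursor: number | null,
  fetched: number[],
): { cursor: number | null; emitted: number[] } {
  const emitted: number[] = [];
  for (const n of fetched) {
    if (cursor !== null && n !== cursor + 1) {
      // Gap detected (e.g. 102 observed before 101): do not advance, so a
      // later query with the unchanged cursor can still pick up the hole.
      break;
    }
    emitted.push(n);
    cursor = n;
  }
  return { cursor, emitted };
}
```

A trade-off to note: if the gap is real (the block was never indexed), the cursor stalls until the hole is filled, so production code would pair this with a timeout or max-wait before skipping ahead.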
Summary

- Server-Sent Events endpoint (`GET /api/events`) that streams new block metadata in real-time

Closes #11
Summary by CodeRabbit
New Features
UI
Chores
Style