diff --git a/backend/crates/atlas-api/src/handlers/sse.rs b/backend/crates/atlas-api/src/handlers/sse.rs index cc850d3..794425d 100644 --- a/backend/crates/atlas-api/src/handlers/sse.rs +++ b/backend/crates/atlas-api/src/handlers/sse.rs @@ -13,6 +13,9 @@ use crate::AppState; use atlas_common::Block; use tracing::warn; +const BLOCK_COLUMNS: &str = + "number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at"; + #[derive(Serialize, Debug)] struct NewBlockEvent { block: Block, @@ -33,40 +36,25 @@ pub async fn block_events( tick.tick().await; ping_counter += 1; - // On first tick, seed with the latest block number + // On first tick, seed with the latest block if last_block_number.is_none() { - let latest: Option = match sqlx::query_scalar("SELECT MAX(number) FROM blocks") - .fetch_one(&state.pool) - .await + let block: Option = match sqlx::query_as( + &format!("SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", BLOCK_COLUMNS) + ) + .fetch_optional(&state.pool) + .await { Ok(v) => v, - Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; } + Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; } }; - if let Some(max_num) = latest { - // Emit the current latest block as the initial event. - // Only advance the cursor after a successful fetch-and-emit so the - // block is not skipped if the fetch fails. - let block: Option = match sqlx::query_as( - "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at - FROM blocks WHERE number = $1" - ) - .bind(max_num) - .fetch_optional(&state.pool) - .await - { - Ok(v) => v, - Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; } - }; - - if let Some(block) = block { - last_block_number = Some(block.number); - let event = NewBlockEvent { block }; - if let Ok(json) = serde_json::to_string(&event) { - yield Ok(Event::default().event("new_block").data(json)); - } - ping_counter = 0; + if let Some(block) = block { + last_block_number = Some(block.number); + let event = NewBlockEvent { block }; + if let Ok(json) = serde_json::to_string(&event) { + yield Ok(Event::default().event("new_block").data(json)); } + ping_counter = 0; } continue; } @@ -76,8 +64,7 @@ pub async fn block_events( // Fetch new blocks since last sent, in ascending order (capped to avoid // unbounded memory usage and to stay well within the 10s statement_timeout). let new_blocks: Vec = match sqlx::query_as( - "SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at - FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT 100" + &format!("SELECT {} FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT 100", BLOCK_COLUMNS) ) .bind(cursor) .fetch_all(&state.pool) diff --git a/frontend/src/api/client.ts b/frontend/src/api/client.ts index 5e806cf..8e13b59 100644 --- a/frontend/src/api/client.ts +++ b/frontend/src/api/client.ts @@ -2,7 +2,7 @@ import axios from 'axios'; import type { AxiosInstance, AxiosError } from 'axios'; import type { ApiError } from '../types'; -const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api'; +export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api'; const client: AxiosInstance = axios.create({ baseURL: API_BASE_URL, diff --git a/frontend/src/components/Layout.tsx b/frontend/src/components/Layout.tsx index 9d7e1d2..2d3d63b 100644 --- a/frontend/src/components/Layout.tsx +++ b/frontend/src/components/Layout.tsx @@ -12,7 +12,7 @@ export default function Layout() { const location = useLocation(); const isHome = location.pathname === '/'; const sse = useBlockSSE(); - const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000, sse.height, sse.connected, sse.bps); + const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, sse); const [now, setNow] = useState(() => Date.now()); const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false; const [displayedHeight, setDisplayedHeight] = useState(null); diff --git a/frontend/src/hooks/useBlockSSE.ts b/frontend/src/hooks/useBlockSSE.ts index 5b0c788..9d0d62c 100644 --- a/frontend/src/hooks/useBlockSSE.ts +++ b/frontend/src/hooks/useBlockSSE.ts @@ -1,7 +1,6 @@ import { useCallback, useEffect, useRef, useState } from 'react'; import type { Block } from '../types'; - -const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api'; +import { API_BASE_URL } from '../api/client'; export interface NewBlockEvent { block: Block; @@ -15,6 +14,24 @@ export interface BlockSSEState { bps: number | null; } +type BlockLog = { num: number; ts: number }[]; + +/** + * Compute bps from a rolling log of (blockNumber, blockTimestamp) samples. + * Returns the rate from the first sample whose span >= minSpan, or from the + * oldest sample if its span >= fallbackMinSpan. Returns null if insufficient data. + */ +function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: number): number | null { + if (log.length < 2) return null; + const newest = log[log.length - 1]; + for (let i = 0; i < log.length - 1; i++) { + const span = newest.ts - log[i].ts; + if (span >= minSpan) return (newest.num - log[i].num) / span; + if (i === 0 && span >= fallbackMinSpan) return (newest.num - log[0].num) / span; + } + return null; +} + /** * Connects to the SSE endpoint and delivers block events one-by-one. * @@ -44,10 +61,17 @@ export default function useBlockSSE(): BlockSSEState { // We keep up to 500 samples (~45s at 11 bps) and use two windows: // - 30s of block-time for the displayed bps (very stable) // - 10s of block-time for drain pacing (responsive enough to adapt) - const blockLogRef = useRef<{ num: number; ts: number }[]>([]); + const blockLogRef = useRef([]); // Cached drain interval in ms, derived from chain block timestamps const drainIntervalRef = useRef(90); // initial guess ~11 bps + // Kick the drain loop when new items arrive (avoids 33hz idle polling) + const kickDrain = useCallback(() => { + // If drain is already scheduled, let it run naturally + if (drainTimerRef.current !== null) return; + drainTimerRef.current = window.setTimeout(drainOne, 0); + }, []); + const connect = useCallback(function connectSSE() { if (esRef.current) { esRef.current.close(); @@ -67,52 +91,27 @@ export default function useBlockSSE(): BlockSSEState { const data: NewBlockEvent = JSON.parse(e.data); // Log block number + on-chain timestamp for true chain-rate calculation - blockLogRef.current.push({ - num: data.block.number, - ts: data.block.timestamp, // unix seconds from the chain - }); + const log = blockLogRef.current; + log.push({ num: data.block.number, ts: data.block.timestamp }); // Keep last 500 samples (~45s at 11 bps) - if (blockLogRef.current.length > 500) { - blockLogRef.current = blockLogRef.current.slice(-500); + if (log.length > 500) { + blockLogRef.current = log.slice(-500); } - // Recalculate bps and drain interval from chain timestamps. - const log = blockLogRef.current; - const newest = log[log.length - 1]; - - // Displayed bps: use 30s window for maximum stability - for (let i = 0; i < log.length - 1; i++) { - const span = newest.ts - log[i].ts; - if (span >= 30) { - const chainBps = (newest.num - log[i].num) / span; - setBps(chainBps); - break; - } - // If we don't have 30s yet, use whatever we have (≥5s) - if (i === 0 && span >= 5) { - const chainBps = (newest.num - log[0].num) / span; - setBps(chainBps); - } - } + // Displayed bps: 30s window for stability, 5s fallback while bootstrapping + const displayBps = computeBpsFromLog(blockLogRef.current, 30, 5); + if (displayBps !== null) setBps(displayBps); - // Drain pacing: use 10s window for moderate responsiveness - for (let i = 0; i < log.length - 1; i++) { - const span = newest.ts - log[i].ts; - if (span >= 10) { - const drainBps = (newest.num - log[i].num) / span; - drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps)); - break; - } - // Bootstrap: use ≥2s while building up - if (i === 0 && span >= 2) { - const drainBps = (newest.num - log[0].num) / span; - drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps)); - } + // Drain pacing: 10s window for moderate responsiveness, 2s fallback + const drainBps = computeBpsFromLog(blockLogRef.current, 10, 2); + if (drainBps !== null) { + drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps)); } // Push to ref queue — synchronous, never lost by React batching queueRef.current.push(data); + kickDrain(); } catch { // Ignore malformed events } @@ -128,51 +127,33 @@ export default function useBlockSSE(): BlockSSEState { connectSSE(); }, 2000); }; - }, []); + }, [kickDrain]); - // Drain loop: emit one block per tick at the chain's natural cadence. - useEffect(() => { - let running = true; - - const drain = () => { - if (!running) return; - const queue = queueRef.current; + // Drain one block from the queue at the chain's natural cadence. + function drainOne() { + const queue = queueRef.current; + drainTimerRef.current = null; - if (queue.length === 0) { - // Nothing to drain — check again soon - drainTimerRef.current = window.setTimeout(drain, 30); - return; - } + if (queue.length === 0) return; // idle — kickDrain will restart when items arrive - // If queue is backing up (> 50 items), skip to near the end - if (queue.length > 50) { - const skip = queue.splice(0, queue.length - 5); - const lastSkipped = skip[skip.length - 1]; - setHeight(lastSkipped.block.number); - } + // If queue is backing up (> 50 items), skip to near the end + if (queue.length > 50) { + const skip = queue.splice(0, queue.length - 5); + const lastSkipped = skip[skip.length - 1]; + setHeight(lastSkipped.block.number); + } - const next = queue.shift()!; - setLatestBlock(next); - setHeight(next.block.number); + const next = queue.shift()!; + setLatestBlock(next); + setHeight(next.block.number); - // Use the chain-rate interval, but if queue is growing speed up gently + // Schedule next drain if more items remain + if (queue.length > 0) { let interval = drainIntervalRef.current; - if (queue.length > 5) { - interval = interval * 0.7; - } - drainTimerRef.current = window.setTimeout(drain, interval); - }; - - drainTimerRef.current = window.setTimeout(drain, 30); - - return () => { - running = false; - if (drainTimerRef.current !== null) { - clearTimeout(drainTimerRef.current); - drainTimerRef.current = null; - } - }; - }, []); + if (queue.length > 5) interval = interval * 0.7; + drainTimerRef.current = window.setTimeout(drainOne, interval); + } + } useEffect(() => { connect(); diff --git a/frontend/src/hooks/useLatestBlockHeight.ts b/frontend/src/hooks/useLatestBlockHeight.ts index 52b5c21..a0ff29e 100644 --- a/frontend/src/hooks/useLatestBlockHeight.ts +++ b/frontend/src/hooks/useLatestBlockHeight.ts @@ -1,6 +1,12 @@ import { useCallback, useEffect, useRef, useState } from 'react'; import { getStatus } from '../api/status'; +export interface SSEState { + height: number | null; + connected: boolean; + bps: number | null; +} + export interface LatestHeightState { height: number | null; loading: boolean; @@ -16,12 +22,8 @@ export interface LatestHeightState { */ export default function useLatestBlockHeight( pollMs = 2000, - _windowBlocks = 1000000, - sseHeight: number | null = null, - sseConnected = false, - sseBps: number | null = null, + sse: SSEState | null = null, ): LatestHeightState { - void _windowBlocks; const [height, setHeight] = useState(null); const heightRef = useRef(null); const [loading, setLoading] = useState(true); @@ -32,6 +34,10 @@ export default function useLatestBlockHeight( const prevSampleRef = useRef<{ h: number; t: number } | null>(null); const alphaRef = useRef(0.25); // smoothing factor for EMA + const sseConnected = sse?.connected ?? false; + const sseHeight = sse?.height ?? null; + const sseBps = sse?.bps ?? null; + // When SSE provides bps from block timestamps, use it directly useEffect(() => { if (sseConnected && sseBps != null) { diff --git a/frontend/src/hooks/useStats.ts b/frontend/src/hooks/useStats.ts index 527c334..5f47d6c 100644 --- a/frontend/src/hooks/useStats.ts +++ b/frontend/src/hooks/useStats.ts @@ -7,7 +7,7 @@ export default function useStats() { const [dailyTx, setDailyTx] = useState(null); const [loading, setLoading] = useState(true); const [error, setError] = useState(null); - const { bps } = useLatestBlockHeight(4000, 1_000_000); + const { bps } = useLatestBlockHeight(4000); const fetchAll = useCallback(async () => { setLoading(true); diff --git a/frontend/src/pages/BlocksPage.tsx b/frontend/src/pages/BlocksPage.tsx index 6bbd4e6..18d3729 100644 --- a/frontend/src/pages/BlocksPage.tsx +++ b/frontend/src/pages/BlocksPage.tsx @@ -24,6 +24,12 @@ export default function BlocksPage() { const pendingSseBlocksRef = useRef([]); const sseFilterRafRef = useRef(null); + // Cache fetched block numbers to avoid recreating Sets on every effect/memo + const fetchedNumberSet = useMemo( + () => new Set(fetchedBlocks.map((b) => b.number)), + [fetchedBlocks], + ); + // Prepend new blocks from SSE on page 1 with auto-refresh. // Buffer pending blocks so that burst arrivals (e.g. 100, 101, 102 before the // next frame) are all flushed in a single RAF rather than cancelling each other. @@ -39,7 +45,6 @@ export default function BlocksPage() { pendingSseBlocksRef.current = []; setSseBlocks((prev) => { const seen = new Set(prev.map((b) => b.number)); - // pending is oldest-first; reverse so newest ends up at the top const prepend = pending.filter((b) => !seen.has(b.number)).reverse(); return [...prepend, ...prev].slice(0, 20); }); @@ -51,22 +56,20 @@ export default function BlocksPage() { // but keep any that haven't been fetched yet. useEffect(() => { if (!fetchedBlocks.length) return; - const fetched = new Set(fetchedBlocks.map((b) => b.number)); if (sseFilterRafRef.current !== null) cancelAnimationFrame(sseFilterRafRef.current); sseFilterRafRef.current = window.requestAnimationFrame(() => { - setSseBlocks((prev) => prev.filter((b) => !fetched.has(b.number))); + setSseBlocks((prev) => prev.filter((b) => !fetchedNumberSet.has(b.number))); sseFilterRafRef.current = null; }); - }, [fetchedBlocks]); + }, [fetchedBlocks, fetchedNumberSet]); // Merge: SSE blocks prepended, deduped, trimmed to page size. // Only prepend on page 1 with auto-refresh — other pages show fetched data only. const blocks = useMemo(() => { if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks; - const seen = new Set(fetchedBlocks.map((b) => b.number)); - const unique = sseBlocks.filter((b) => !seen.has(b.number)); + const unique = sseBlocks.filter((b) => !fetchedNumberSet.has(b.number)); return [...unique, ...fetchedBlocks].slice(0, 20); - }, [fetchedBlocks, sseBlocks, page, autoRefresh]); + }, [fetchedBlocks, fetchedNumberSet, sseBlocks, page, autoRefresh]); const navigate = useNavigate(); const [sort, setSort] = useState<{ key: 'number' | 'hash' | 'timestamp' | 'transaction_count' | 'gas_used' | null; direction: 'asc' | 'desc'; }>({ key: null, direction: 'desc' }); const seenBlocksRef = useRef>(new Set());