Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 17 additions & 30 deletions backend/crates/atlas-api/src/handlers/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ use crate::AppState;
use atlas_common::Block;
use tracing::warn;

const BLOCK_COLUMNS: &str =
"number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at";

#[derive(Serialize, Debug)]
struct NewBlockEvent {
block: Block,
Expand All @@ -33,40 +36,25 @@ pub async fn block_events(
tick.tick().await;
ping_counter += 1;

// On first tick, seed with the latest block number
// On first tick, seed with the latest block
if last_block_number.is_none() {
let latest: Option<i64> = match sqlx::query_scalar("SELECT MAX(number) FROM blocks")
.fetch_one(&state.pool)
.await
let block: Option<Block> = match sqlx::query_as(
&format!("SELECT {} FROM blocks ORDER BY number DESC LIMIT 1", BLOCK_COLUMNS)
)
.fetch_optional(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to query latest block number"); continue; }
Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
};

if let Some(max_num) = latest {
// Emit the current latest block as the initial event.
// Only advance the cursor after a successful fetch-and-emit so the
// block is not skipped if the fetch fails.
let block: Option<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number = $1"
)
.bind(max_num)
.fetch_optional(&state.pool)
.await
{
Ok(v) => v,
Err(e) => { warn!(error = ?e, "sse: failed to fetch initial block"); continue; }
};

if let Some(block) = block {
last_block_number = Some(block.number);
let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
ping_counter = 0;
if let Some(block) = block {
last_block_number = Some(block.number);
let event = NewBlockEvent { block };
if let Ok(json) = serde_json::to_string(&event) {
yield Ok(Event::default().event("new_block").data(json));
}
ping_counter = 0;
}
continue;
}
Expand All @@ -76,8 +64,7 @@ pub async fn block_events(
// Fetch new blocks since last sent, in ascending order (capped to avoid
// unbounded memory usage and to stay well within the 10s statement_timeout).
let new_blocks: Vec<Block> = match sqlx::query_as(
"SELECT number, hash, parent_hash, timestamp, gas_used, gas_limit, transaction_count, indexed_at
FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT 100"
&format!("SELECT {} FROM blocks WHERE number > $1 ORDER BY number ASC LIMIT 100", BLOCK_COLUMNS)
)
.bind(cursor)
.fetch_all(&state.pool)
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import axios from 'axios';
import type { AxiosInstance, AxiosError } from 'axios';
import type { ApiError } from '../types';

const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
export const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';

const client: AxiosInstance = axios.create({
baseURL: API_BASE_URL,
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export default function Layout() {
const location = useLocation();
const isHome = location.pathname === '/';
const sse = useBlockSSE();
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, 1000000, sse.height, sse.connected, sse.bps);
const { height, lastUpdatedAt, bps } = useLatestBlockHeight(2000, sse);
const [now, setNow] = useState(() => Date.now());
const recentlyUpdated = lastUpdatedAt ? (now - lastUpdatedAt) < 10000 : false;
const [displayedHeight, setDisplayedHeight] = useState<number | null>(null);
Expand Down
139 changes: 60 additions & 79 deletions frontend/src/hooks/useBlockSSE.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { useCallback, useEffect, useRef, useState } from 'react';
import type { Block } from '../types';

const API_BASE_URL = import.meta.env.VITE_API_BASE_URL || 'http://localhost:3000/api';
import { API_BASE_URL } from '../api/client';

export interface NewBlockEvent {
block: Block;
Expand All @@ -15,6 +14,24 @@ export interface BlockSSEState {
bps: number | null;
}

type BlockLog = { num: number; ts: number }[];

/**
* Compute bps from a rolling log of (blockNumber, blockTimestamp) samples.
* Returns the rate from the first sample whose span >= minSpan, or from the
* oldest sample if its span >= fallbackMinSpan. Returns null if insufficient data.
*/
function computeBpsFromLog(log: BlockLog, minSpan: number, fallbackMinSpan: number): number | null {
if (log.length < 2) return null;
const newest = log[log.length - 1];
for (let i = 0; i < log.length - 1; i++) {
const span = newest.ts - log[i].ts;
if (span >= minSpan) return (newest.num - log[i].num) / span;
if (i === 0 && span >= fallbackMinSpan) return (newest.num - log[0].num) / span;
}
return null;
}

/**
* Connects to the SSE endpoint and delivers block events one-by-one.
*
Expand Down Expand Up @@ -44,10 +61,17 @@ export default function useBlockSSE(): BlockSSEState {
// We keep up to 500 samples (~45s at 11 bps) and use two windows:
// - 30s of block-time for the displayed bps (very stable)
// - 10s of block-time for drain pacing (responsive enough to adapt)
const blockLogRef = useRef<{ num: number; ts: number }[]>([]);
const blockLogRef = useRef<BlockLog>([]);
// Cached drain interval in ms, derived from chain block timestamps
const drainIntervalRef = useRef<number>(90); // initial guess ~11 bps

// Kick the drain loop when new items arrive (avoids 33hz idle polling)
const kickDrain = useCallback(() => {
// If drain is already scheduled, let it run naturally
if (drainTimerRef.current !== null) return;
drainTimerRef.current = window.setTimeout(drainOne, 0);
}, []);

const connect = useCallback(function connectSSE() {
if (esRef.current) {
esRef.current.close();
Expand All @@ -67,52 +91,27 @@ export default function useBlockSSE(): BlockSSEState {
const data: NewBlockEvent = JSON.parse(e.data);

// Log block number + on-chain timestamp for true chain-rate calculation
blockLogRef.current.push({
num: data.block.number,
ts: data.block.timestamp, // unix seconds from the chain
});
const log = blockLogRef.current;
log.push({ num: data.block.number, ts: data.block.timestamp });

// Keep last 500 samples (~45s at 11 bps)
if (blockLogRef.current.length > 500) {
blockLogRef.current = blockLogRef.current.slice(-500);
if (log.length > 500) {
blockLogRef.current = log.slice(-500);
}

// Recalculate bps and drain interval from chain timestamps.
const log = blockLogRef.current;
const newest = log[log.length - 1];

// Displayed bps: use 30s window for maximum stability
for (let i = 0; i < log.length - 1; i++) {
const span = newest.ts - log[i].ts;
if (span >= 30) {
const chainBps = (newest.num - log[i].num) / span;
setBps(chainBps);
break;
}
// If we don't have 30s yet, use whatever we have (≥5s)
if (i === 0 && span >= 5) {
const chainBps = (newest.num - log[0].num) / span;
setBps(chainBps);
}
}
// Displayed bps: 30s window for stability, 5s fallback while bootstrapping
const displayBps = computeBpsFromLog(blockLogRef.current, 30, 5);
if (displayBps !== null) setBps(displayBps);

// Drain pacing: use 10s window for moderate responsiveness
for (let i = 0; i < log.length - 1; i++) {
const span = newest.ts - log[i].ts;
if (span >= 10) {
const drainBps = (newest.num - log[i].num) / span;
drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps));
break;
}
// Bootstrap: use ≥2s while building up
if (i === 0 && span >= 2) {
const drainBps = (newest.num - log[0].num) / span;
drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps));
}
// Drain pacing: 10s window for moderate responsiveness, 2s fallback
const drainBps = computeBpsFromLog(blockLogRef.current, 10, 2);
if (drainBps !== null) {
drainIntervalRef.current = Math.max(30, Math.min(500, 1000 / drainBps));
}

// Push to ref queue — synchronous, never lost by React batching
queueRef.current.push(data);
kickDrain();
} catch {
// Ignore malformed events
}
Expand All @@ -128,51 +127,33 @@ export default function useBlockSSE(): BlockSSEState {
connectSSE();
}, 2000);
};
}, []);
}, [kickDrain]);

// Drain loop: emit one block per tick at the chain's natural cadence.
useEffect(() => {
let running = true;

const drain = () => {
if (!running) return;
const queue = queueRef.current;
// Drain one block from the queue at the chain's natural cadence.
function drainOne() {
const queue = queueRef.current;
drainTimerRef.current = null;

if (queue.length === 0) {
// Nothing to drain — check again soon
drainTimerRef.current = window.setTimeout(drain, 30);
return;
}
if (queue.length === 0) return; // idle — kickDrain will restart when items arrive

// If queue is backing up (> 50 items), skip to near the end
if (queue.length > 50) {
const skip = queue.splice(0, queue.length - 5);
const lastSkipped = skip[skip.length - 1];
setHeight(lastSkipped.block.number);
}
// If queue is backing up (> 50 items), skip to near the end
if (queue.length > 50) {
const skip = queue.splice(0, queue.length - 5);
const lastSkipped = skip[skip.length - 1];
setHeight(lastSkipped.block.number);
}

const next = queue.shift()!;
setLatestBlock(next);
setHeight(next.block.number);
const next = queue.shift()!;
setLatestBlock(next);
setHeight(next.block.number);

// Use the chain-rate interval, but if queue is growing speed up gently
// Schedule next drain if more items remain
if (queue.length > 0) {
let interval = drainIntervalRef.current;
if (queue.length > 5) {
interval = interval * 0.7;
}
drainTimerRef.current = window.setTimeout(drain, interval);
};

drainTimerRef.current = window.setTimeout(drain, 30);

return () => {
running = false;
if (drainTimerRef.current !== null) {
clearTimeout(drainTimerRef.current);
drainTimerRef.current = null;
}
};
}, []);
if (queue.length > 5) interval = interval * 0.7;
drainTimerRef.current = window.setTimeout(drainOne, interval);
}
}

useEffect(() => {
connect();
Expand Down
16 changes: 11 additions & 5 deletions frontend/src/hooks/useLatestBlockHeight.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import { useCallback, useEffect, useRef, useState } from 'react';
import { getStatus } from '../api/status';

export interface SSEState {
height: number | null;
connected: boolean;
bps: number | null;
}

export interface LatestHeightState {
height: number | null;
loading: boolean;
Expand All @@ -16,12 +22,8 @@ export interface LatestHeightState {
*/
export default function useLatestBlockHeight(
pollMs = 2000,
_windowBlocks = 1000000,
sseHeight: number | null = null,
sseConnected = false,
sseBps: number | null = null,
sse: SSEState | null = null,
): LatestHeightState {
void _windowBlocks;
const [height, setHeight] = useState<number | null>(null);
const heightRef = useRef<number | null>(null);
const [loading, setLoading] = useState<boolean>(true);
Expand All @@ -32,6 +34,10 @@ export default function useLatestBlockHeight(
const prevSampleRef = useRef<{ h: number; t: number } | null>(null);
const alphaRef = useRef<number>(0.25); // smoothing factor for EMA

const sseConnected = sse?.connected ?? false;
const sseHeight = sse?.height ?? null;
const sseBps = sse?.bps ?? null;

// When SSE provides bps from block timestamps, use it directly
useEffect(() => {
if (sseConnected && sseBps != null) {
Expand Down
2 changes: 1 addition & 1 deletion frontend/src/hooks/useStats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ export default function useStats() {
const [dailyTx, setDailyTx] = useState<number | null>(null);
const [loading, setLoading] = useState(true);
const [error, setError] = useState<string | null>(null);
const { bps } = useLatestBlockHeight(4000, 1_000_000);
const { bps } = useLatestBlockHeight(4000);

const fetchAll = useCallback(async () => {
setLoading(true);
Expand Down
17 changes: 10 additions & 7 deletions frontend/src/pages/BlocksPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ export default function BlocksPage() {
const pendingSseBlocksRef = useRef<typeof fetchedBlocks>([]);
const sseFilterRafRef = useRef<number | null>(null);

// Cache fetched block numbers to avoid recreating Sets on every effect/memo
const fetchedNumberSet = useMemo(
() => new Set(fetchedBlocks.map((b) => b.number)),
[fetchedBlocks],
);

// Prepend new blocks from SSE on page 1 with auto-refresh.
// Buffer pending blocks so that burst arrivals (e.g. 100, 101, 102 before the
// next frame) are all flushed in a single RAF rather than cancelling each other.
Expand All @@ -39,7 +45,6 @@ export default function BlocksPage() {
pendingSseBlocksRef.current = [];
setSseBlocks((prev) => {
const seen = new Set(prev.map((b) => b.number));
// pending is oldest-first; reverse so newest ends up at the top
const prepend = pending.filter((b) => !seen.has(b.number)).reverse();
return [...prepend, ...prev].slice(0, 20);
});
Expand All @@ -51,22 +56,20 @@ export default function BlocksPage() {
// but keep any that haven't been fetched yet.
useEffect(() => {
if (!fetchedBlocks.length) return;
const fetched = new Set(fetchedBlocks.map((b) => b.number));
if (sseFilterRafRef.current !== null) cancelAnimationFrame(sseFilterRafRef.current);
sseFilterRafRef.current = window.requestAnimationFrame(() => {
setSseBlocks((prev) => prev.filter((b) => !fetched.has(b.number)));
setSseBlocks((prev) => prev.filter((b) => !fetchedNumberSet.has(b.number)));
sseFilterRafRef.current = null;
});
}, [fetchedBlocks]);
}, [fetchedBlocks, fetchedNumberSet]);

// Merge: SSE blocks prepended, deduped, trimmed to page size.
// Only prepend on page 1 with auto-refresh — other pages show fetched data only.
const blocks = useMemo(() => {
if (page !== 1 || !autoRefresh || !sseBlocks.length) return fetchedBlocks;
const seen = new Set(fetchedBlocks.map((b) => b.number));
const unique = sseBlocks.filter((b) => !seen.has(b.number));
const unique = sseBlocks.filter((b) => !fetchedNumberSet.has(b.number));
return [...unique, ...fetchedBlocks].slice(0, 20);
}, [fetchedBlocks, sseBlocks, page, autoRefresh]);
}, [fetchedBlocks, fetchedNumberSet, sseBlocks, page, autoRefresh]);
const navigate = useNavigate();
const [sort, setSort] = useState<{ key: 'number' | 'hash' | 'timestamp' | 'transaction_count' | 'gas_used' | null; direction: 'asc' | 'desc'; }>({ key: null, direction: 'desc' });
const seenBlocksRef = useRef<Set<number>>(new Set());
Expand Down