Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
# Required: Your L2 RPC endpoint
RPC_URL=http://localhost:8545

# Human-readable name for your chain, displayed in the explorer UI
CHAIN_NAME="My Chain"

# Optional settings (defaults shown)
START_BLOCK=0
BATCH_SIZE=100
Expand Down
5 changes: 3 additions & 2 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ pub struct AppState {

### Frontend API client
- Base URL: `/api` (proxied by nginx to `atlas-server:3000`)
- `GET /api/status` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/status` polling on disconnect.
- Fast polling endpoint: `GET /api/height` → `{ block_height, indexed_at }` — single key-value lookup from `indexer_state`, sub-ms. Used by the navbar as a polling fallback when SSE is disconnected.
- Chain status: `GET /api/status` → `{ chain_id, chain_name, block_height, total_transactions, total_addresses, indexed_at }` — full chain info, fetched once on page load.
- `GET /api/events` → SSE stream of `new_block` events, one per block in order. Primary live-update path for navbar counter and blocks page. Falls back to `/api/height` polling on disconnect.

## Important Conventions

Expand Down
58 changes: 52 additions & 6 deletions backend/crates/atlas-server/src/api/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,26 @@ pub mod transactions;

use sqlx::PgPool;

/// Get transactions table row count efficiently.
fn exact_count_sql(table_name: &str) -> Result<&'static str, sqlx::Error> {
match table_name {
"transactions" => Ok("SELECT COUNT(*) FROM transactions"),
"addresses" => Ok("SELECT COUNT(*) FROM addresses"),
_ => Err(sqlx::Error::Protocol(format!(
"unsupported table for exact count: {table_name}"
))),
}
}

fn should_use_approximate_count(approx: i64) -> bool {
approx > 100_000
}

/// Get a table's row count efficiently.
/// - For tables > 100k rows: uses PostgreSQL's approximate count (instant, ~99% accurate)
/// - For smaller tables: uses exact COUNT(*) (fast enough)
///
/// This avoids the slow COUNT(*) full table scan on large tables.
pub async fn get_table_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
let table_name = "transactions";

pub async fn get_table_count(pool: &PgPool, table_name: &str) -> Result<i64, sqlx::Error> {
// Sum approximate reltuples across partitions if any, else use parent.
// This is instant and reasonably accurate for large tables.
// Cast to float8 (f64) since reltuples is float4 and SUM returns float4
Expand Down Expand Up @@ -47,13 +59,47 @@ pub async fn get_table_count(pool: &PgPool) -> Result<i64, sqlx::Error> {
parent.0.unwrap_or(0.0) as i64
};

if approx > 100_000 {
if should_use_approximate_count(approx) {
Ok(approx)
} else {
// Exact count for small tables
let exact: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM transactions")
let exact: (i64,) = sqlx::query_as(exact_count_sql(table_name)?)
.fetch_one(pool)
.await?;
Ok(exact.0)
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn exact_count_sql_whitelists_supported_tables() {
assert_eq!(
exact_count_sql("transactions").unwrap(),
"SELECT COUNT(*) FROM transactions"
);
assert_eq!(
exact_count_sql("addresses").unwrap(),
"SELECT COUNT(*) FROM addresses"
);
}

#[test]
fn exact_count_sql_rejects_unsupported_tables() {
let err = exact_count_sql("blocks").unwrap_err();
assert!(err.to_string().contains("unsupported table"));
}

#[test]
fn should_use_approximate_count_above_threshold() {
assert!(should_use_approximate_count(100_001));
}

#[test]
fn should_use_approximate_count_uses_exact_count_at_threshold_and_below() {
assert!(!should_use_approximate_count(100_000));
assert!(!should_use_approximate_count(42));
}
}
38 changes: 36 additions & 2 deletions backend/crates/atlas-server/src/api/handlers/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,43 @@ use serde::Serialize;
use std::sync::Arc;

use crate::api::error::ApiResult;
use crate::api::handlers::get_table_count;
use crate::api::AppState;

#[derive(Serialize)]
pub struct HeightResponse {
pub block_height: i64,
pub indexed_at: String,
}

#[derive(Serialize)]
pub struct ChainStatus {
pub chain_id: u64,
pub chain_name: String,
pub block_height: i64,
pub total_transactions: i64,
pub total_addresses: i64,
pub indexed_at: String,
}

/// GET /api/status - Lightweight endpoint for current chain status
/// Returns in <1ms, optimized for frequent polling
/// GET /api/height - Lightweight endpoint for current block height.
/// Returns in <1ms, optimized for frequent polling.
pub async fn get_height(State(state): State<Arc<AppState>>) -> ApiResult<Json<HeightResponse>> {
let result: (String, chrono::DateTime<chrono::Utc>) = sqlx::query_as(
"SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is still blocked on items written to disk, if we batch write then its not really latest height but latest written height. what are your thoughts on doing a internal api on the indexer to talk to the api process to allow direct streaming before writing?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll wait for the SSE refactoring to be merged and than we can read this value directly from the latest block in the buffer 👍

)
.fetch_one(&state.pool)
.await?;

let block_height: i64 = result.0.parse().unwrap_or(0);

Ok(Json(HeightResponse {
block_height,
indexed_at: result.1.to_rfc3339(),
}))
}

/// GET /api/status - Full chain status including chain ID, name, and counts.
pub async fn get_status(State(state): State<Arc<AppState>>) -> ApiResult<Json<ChainStatus>> {
let result: (String, chrono::DateTime<chrono::Utc>) = sqlx::query_as(
"SELECT value, updated_at FROM indexer_state WHERE key = 'last_indexed_block'",
Expand All @@ -22,8 +49,15 @@ pub async fn get_status(State(state): State<Arc<AppState>>) -> ApiResult<Json<Ch

let block_height: i64 = result.0.parse().unwrap_or(0);

let total_transactions = get_table_count(&state.pool, "transactions").await?;
let total_addresses = get_table_count(&state.pool, "addresses").await?;

Ok(Json(ChainStatus {
chain_id: state.chain_id,
chain_name: state.chain_name.clone(),
block_height,
total_transactions,
total_addresses,
indexed_at: result.1.to_rfc3339(),
}))
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub async fn list_transactions(
Query(pagination): Query<Pagination>,
) -> ApiResult<Json<PaginatedResponse<Transaction>>> {
// Use optimized count (approximate for large tables, exact for small)
let total = get_table_count(&state.pool).await?;
let total = get_table_count(&state.pool, "transactions").await?;

let transactions: Vec<Transaction> = sqlx::query_as(
"SELECT hash, block_number, block_index, from_address, to_address, value, gas_price, gas_used, input_data, status, contract_created, timestamp
Expand Down
3 changes: 3 additions & 0 deletions backend/crates/atlas-server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub struct AppState {
pub pool: PgPool,
pub block_events_tx: broadcast::Sender<()>,
pub rpc_url: String,
pub chain_id: u64,
pub chain_name: String,
}

pub fn build_router(state: Arc<AppState>) -> Router {
Expand Down Expand Up @@ -132,6 +134,7 @@ pub fn build_router(state: Arc<AppState>) -> Router {
// Search
.route("/api/search", get(handlers::search::search))
// Status
.route("/api/height", get(handlers::status::get_height))
.route("/api/status", get(handlers::status::get_status))
// Health
.route("/health", get(|| async { "OK" }))
Expand Down
2 changes: 2 additions & 0 deletions backend/crates/atlas-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct Config {
// API-specific
pub api_host: String,
pub api_port: u16,
pub chain_name: String,
}

impl Config {
Expand Down Expand Up @@ -84,6 +85,7 @@ impl Config {
.unwrap_or_else(|_| "3000".to_string())
.parse()
.context("Invalid API_PORT")?,
chain_name: env::var("CHAIN_NAME").unwrap_or_else(|_| "Unknown".to_string()),
})
}
}
83 changes: 81 additions & 2 deletions backend/crates/atlas-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,32 @@ mod indexer;
const RETRY_DELAYS: &[u64] = &[5, 10, 20, 30, 60];
const MAX_RETRY_DELAY: u64 = 60;

fn parse_chain_id(hex: &str) -> Option<u64> {
u64::from_str_radix(hex.trim_start_matches("0x"), 16).ok()
}

async fn fetch_chain_id(rpc_url: &str) -> Result<u64> {
let client = reqwest::Client::new();
let resp = client
.post(rpc_url)
.json(&serde_json::json!({
"jsonrpc": "2.0",
"method": "eth_chainId",
"params": [],
"id": 1
}))
.timeout(Duration::from_secs(5))
.send()
.await?
.error_for_status()?;

let json: serde_json::Value = resp.json().await?;
let hex = json["result"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("eth_chainId result missing"))?;
parse_chain_id(hex).ok_or_else(|| anyhow::anyhow!("invalid eth_chainId hex"))
}

#[tokio::main]
async fn main() -> Result<()> {
// Initialize tracing
Expand All @@ -29,6 +55,10 @@ async fn main() -> Result<()> {
dotenvy::dotenv().ok();
let config = config::Config::from_env()?;

tracing::info!("Fetching chain ID from RPC");
let chain_id = fetch_chain_id(&config.rpc_url).await?;
tracing::info!("Chain ID: {}", chain_id);

// Run migrations once (dedicated pool, no statement_timeout)
tracing::info!("Running database migrations");
atlas_common::db::run_migrations(&config.database_url).await?;
Expand All @@ -48,6 +78,8 @@ async fn main() -> Result<()> {
pool: api_pool,
block_events_tx: block_events_tx.clone(),
rpc_url: config.rpc_url.clone(),
chain_id,
chain_name: config.chain_name.clone(),
});

// Spawn indexer task with retry logic
Expand Down Expand Up @@ -168,8 +200,32 @@ where

#[cfg(test)]
mod tests {
use super::wait_for_shutdown_signal;
use tokio::sync::oneshot;
use super::*;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::TcpListener,
sync::oneshot,
};

async fn serve_json_once(body: &'static str) -> String {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();

tokio::spawn(async move {
let (mut socket, _) = listener.accept().await.unwrap();
let mut buf = [0_u8; 1024];
let _ = socket.read(&mut buf).await.unwrap();

let response = format!(
"HTTP/1.1 200 OK\r\ncontent-type: application/json\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
body.len(),
body
);
socket.write_all(response.as_bytes()).await.unwrap();
});

format!("http://{}", addr)
}

#[tokio::test]
async fn wait_for_shutdown_signal_returns_on_ctrl_c_future() {
Expand Down Expand Up @@ -206,4 +262,27 @@ mod tests {
term_tx.send(()).unwrap();
shutdown.await.unwrap();
}

#[tokio::test]
async fn fetch_chain_id_reads_hex_result_from_rpc_response() {
let url = serve_json_once(r#"{"jsonrpc":"2.0","id":1,"result":"0xa4b1"}"#).await;
assert_eq!(fetch_chain_id(&url).await.unwrap(), 42161);
}

#[tokio::test]
async fn fetch_chain_id_returns_error_for_invalid_result() {
let url = serve_json_once(r#"{"jsonrpc":"2.0","id":1,"result":"not_hex"}"#).await;
let err = fetch_chain_id(&url).await.unwrap_err();
assert!(err.to_string().contains("invalid eth_chainId hex"));
}

#[tokio::test]
async fn fetch_chain_id_returns_error_for_http_failure() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
drop(listener);

let url = format!("http://{}", addr);
assert!(fetch_chain_id(&url).await.is_err());
}
}
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ services:
FETCH_WORKERS: ${FETCH_WORKERS:-10}
RPC_REQUESTS_PER_SECOND: ${RPC_REQUESTS_PER_SECOND:-100}
RPC_BATCH_SIZE: ${RPC_BATCH_SIZE:-20}
CHAIN_NAME: ${CHAIN_NAME:-Unknown}
API_HOST: 0.0.0.0
API_PORT: 3000
RUST_LOG: atlas_server=info,tower_http=info
Expand Down
2 changes: 2 additions & 0 deletions frontend/src/App.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
WelcomePage,
SearchResultsPage,
AddressesPage,
StatusPage,
} from './pages';
import { ThemeProvider } from './context/ThemeContext';

Expand All @@ -37,6 +38,7 @@ export default function App() {
<Route path="nfts" element={<NFTsPage />} />
<Route path="nfts/:contract" element={<NFTContractPage />} />
<Route path="nfts/:contract/:tokenId" element={<NFTTokenPage />} />
<Route path="status" element={<StatusPage />} />
<Route path="tokens" element={<TokensPage />} />
<Route path="tokens/:address" element={<TokenDetailPage />} />
<Route path="*" element={<NotFoundPage />} />
Expand Down
19 changes: 16 additions & 3 deletions frontend/src/api/status.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
import client from './client';

export interface StatusResponse {
export interface HeightResponse {
block_height: number;
indexed_at: string; // ISO timestamp
}

export async function getStatus(): Promise<StatusResponse> {
const response = await client.get<StatusResponse>('/status');
export interface ChainStatusResponse {
chain_id: number;
chain_name: string;
block_height: number;
total_transactions: number;
total_addresses: number;
indexed_at: string; // ISO timestamp
}

export async function getStatus(): Promise<HeightResponse> {
const response = await client.get<HeightResponse>('/height');
return response.data;
}

export async function getChainStatus(): Promise<ChainStatusResponse> {
const response = await client.get<ChainStatusResponse>('/status');
return response.data;
}
6 changes: 6 additions & 0 deletions frontend/src/components/Layout.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ export default function Layout() {
<NavLink to="/nfts" className={navLinkClass}>
NFTs
</NavLink>
<NavLink to="/status" className={navLinkClass}>
Status
</NavLink>
</nav>

{/* Right status: latest height + live pulse */}
Expand Down Expand Up @@ -216,6 +219,9 @@ export default function Layout() {
<NavLink to="/nfts" className={navLinkClass}>
NFTs
</NavLink>
<NavLink to="/status" className={navLinkClass}>
Status
</NavLink>
<button
type="button"
onClick={toggleTheme}
Expand Down
Loading
Loading