Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,18 @@ jobs:
with:
go-version-file: go.mod
- run: go build -o bin/apex ./cmd/apex

e2e-submission:
name: E2E Submission
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version-file: e2e/go.mod
cache-dependency-path: |
go.sum
e2e/go.sum
- working-directory: e2e
run: go test -race -count=1 -timeout 20m -run TestSubmissionViaJSONRPC ./...
128 changes: 128 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Apex — Celestia Namespace Indexer

Lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via JSON-RPC, gRPC, and REST health endpoints. Includes Prometheus observability, a CLI client, and multi-stage Docker build.

## Build Commands

```bash
just build # compile to bin/apex
just test # go test -race ./...
just lint # golangci-lint run
just fmt # gofumpt -w .
just check # tidy + lint + test + build (CI equivalent)
just run # build and run
just clean # remove bin/
just tidy # go mod tidy
```

## Architecture

### Data Flow

```text
Celestia Node → Fetcher → Sync Coordinator → Store (SQLite)
→ Notifier → Subscribers
API (JSON-RPC + gRPC + Health)
```

The sync coordinator runs in two phases: **backfill** (historical blocks in batches) then **streaming** (live via header subscription). Height observers publish events to the notifier which fans out to API subscribers.

### File Structure

```text
cmd/apex/
main.go CLI entrypoint, server wiring, graceful shutdown
client.go Thin HTTP JSON-RPC client for CLI commands
status.go `apex status` command (health endpoint)
blob_cmd.go `apex blob get|list` commands
config_cmd.go `apex config validate|show` commands

config/
config.go Config structs (DataSource, Storage, RPC, Sync, Metrics, Log)
load.go YAML loading, validation, env var override, template generation

pkg/types/
types.go Domain types: Namespace, Blob, Header, SyncState, SyncStatus

pkg/store/
store.go Store interface (PutBlobs, GetBlobs, PutHeader, GetHeader, sync state)
sqlite.go SQLite implementation with metrics instrumentation
migrations/ SQL migration files

pkg/fetch/
fetcher.go DataFetcher + ProofForwarder interfaces
celestia_node.go Celestia node-api client (headers, blobs, subscriptions, proofs)
celestia_app.go Celestia-app gRPC client (headers, blobs, polling subscription)

pkg/sync/
coordinator.go Sync lifecycle: initialize → backfill → stream, tracks heights
backfill.go Concurrent batch backfill with configurable batch size/concurrency
subscription.go Header subscription manager for live streaming

pkg/api/
service.go API service layer (blob/header queries, proof forwarding, subscriptions)
notifier.go Event fan-out to subscribers with bounded buffers
health.go /health and /health/ready HTTP endpoints, HealthStatus JSON
jsonrpc/ JSON-RPC server (go-jsonrpc), blob/header/subscription handlers
grpc/ gRPC server, protobuf service implementations
gen/apex/v1/ Generated protobuf Go code
gen/cosmos/base/tendermint/v1beta1/ Generated Cosmos CometBFT service client

pkg/metrics/
metrics.go Recorder interface (nil-safe), nopRecorder, PromRecorder (Prometheus)
server.go HTTP server for /metrics endpoint

proto/apex/v1/ Protobuf definitions (blob, header, types)
proto/cosmos/base/tendermint/v1beta1/ Minimal Cosmos SDK CometBFT service proto

Dockerfile Multi-stage build (golang builder + distroless runtime)
```

### Key Interfaces

- **`store.Store`** — persistence (SQLite impl, instrumented with metrics)
- **`fetch.DataFetcher`** — block data retrieval (Celestia node JSON-RPC or celestia-app gRPC)
- **`fetch.ProofForwarder`** — proof/inclusion forwarding to upstream node
- **`metrics.Recorder`** — nil-safe metrics abstraction (Prometheus or no-op)
- **`api.StatusProvider`** — sync status for health endpoints (implemented by coordinator)

### Ports (defaults)

| Port | Protocol | Purpose |
|-------|----------|------------------|
| :8080 | HTTP | JSON-RPC + health|
| :9090 | TCP | gRPC |
| :9091 | HTTP | Prometheus /metrics |

### Config

YAML with strict unknown-field rejection. Auth token via `APEX_AUTH_TOKEN` env var only (not in config file). See `config/config.go` for all fields and `DefaultConfig()` for defaults.

## Conventions

- Go 1.25+ (`go.mod` specifies 1.25.0)
- SQLite via `modernc.org/sqlite` (CGo-free)
- Config: YAML (`gopkg.in/yaml.v3`), strict unknown-field rejection
- Logging: `rs/zerolog`
- CLI: `spf13/cobra`
- Metrics: `prometheus/client_golang` behind nil-safe `Recorder` interface
- JSON-RPC: `filecoin-project/go-jsonrpc`
- gRPC: `google.golang.org/grpc` + `google.golang.org/protobuf`
- Protobuf codegen: `buf` (`buf.yaml` + `buf.gen.yaml`)
- Linter: golangci-lint v2 (.golangci.yml v2 format), gocyclo max 15
- Formatter: gofumpt
- Build runner: just (justfile)

## Dependencies

- Only add deps that are strictly necessary
- Prefer stdlib where reasonable
- No CGo dependencies (cross-compilation constraint)

## Testing

- All tests use `-race`
- Table-driven tests preferred
- Test files alongside source (`_test.go`)
- No test frameworks beyond stdlib `testing`
3 changes: 3 additions & 0 deletions buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ modules:
lint:
use:
- DEFAULT
ignore_only:
PACKAGE_VERSION_SUFFIX:
- proto/cosmos/crypto/secp256k1/keys.proto
breaking:
use:
- FILE
122 changes: 99 additions & 23 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
"github.com/evstack/apex/pkg/types"
)
Expand Down Expand Up @@ -113,6 +114,7 @@ func startCmd() *cobra.Command {
startLog := log.Info().
Str("version", version).
Str("datasource_type", cfg.DataSource.Type).
Bool("submission_enabled", cfg.Submission.Enabled).
Int("namespaces", len(cfg.DataSource.Namespaces))
if cfg.DataSource.Type == dataSourceTypeApp {
startLog = startLog.Str("app_grpc_addr", cfg.DataSource.CelestiaAppGRPCAddr)
Expand Down Expand Up @@ -171,7 +173,12 @@ func setupProfiling(cfg *config.Config) *profile.Server {
func openDataSource(ctx context.Context, cfg *config.Config) (fetch.DataFetcher, fetch.ProofForwarder, error) {
switch cfg.DataSource.Type {
case dataSourceTypeApp:
appFetcher, err := fetch.NewCelestiaAppFetcher(cfg.DataSource.CelestiaAppGRPCAddr, cfg.DataSource.AuthToken, log.Logger)
appFetcher, err := fetch.NewCelestiaAppFetcher(
cfg.DataSource.CelestiaAppGRPCAddr,
cfg.DataSource.AuthToken,
cfg.DataSource.CelestiaAppGRPCInsecure,
log.Logger,
)
if err != nil {
return nil, nil, fmt.Errorf("create celestia-app fetcher: %w", err)
}
Expand Down Expand Up @@ -275,33 +282,18 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
defer dataFetcher.Close() //nolint:errcheck

// Set up API layer.
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger)

// Build and run the sync coordinator with observer hook.
coordOpts := []syncer.Option{
syncer.WithStartHeight(cfg.Sync.StartHeight),
syncer.WithBatchSize(cfg.Sync.BatchSize),
syncer.WithConcurrency(cfg.Sync.Concurrency),
syncer.WithLogger(log.Logger),
syncer.WithMetrics(rec),
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
}),
svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
if err != nil {
return err
}
defer closeSubmitter()

backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
if err != nil {
return err
}
if closeBackfill != nil {
defer closeBackfill()
}
if backfillOpt != nil {
coordOpts = append(coordOpts, backfillOpt)
}
defer closeBackfill()

coord := syncer.New(db, dataFetcher, coordOpts...)

Expand Down Expand Up @@ -360,6 +352,90 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
return nil
}

func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
if !cfg.Submission.Enabled {
return nil, nil
}

appClient, err := submit.NewGRPCAppClient(
cfg.Submission.CelestiaAppGRPCAddr,
cfg.Submission.CelestiaAppGRPCInsecure,
)
if err != nil {
return nil, fmt.Errorf("create submission app client: %w", err)
}

signer, err := submit.LoadSigner(cfg.Submission.SignerKey)
if err != nil {
_ = appClient.Close()
return nil, fmt.Errorf("load submission signer: %w", err)
}

blobSubmitter, err := submit.NewDirectSubmitter(appClient, signer, submit.DirectConfig{
ChainID: cfg.Submission.ChainID,
GasPrice: cfg.Submission.GasPrice,
MaxGasPrice: cfg.Submission.MaxGasPrice,
ConfirmationTimeout: time.Duration(cfg.Submission.ConfirmationTimeout) * time.Second,
})
if err != nil {
_ = appClient.Close()
return nil, fmt.Errorf("configure submission backend: %w", err)
}

return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
}

notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

svcOpts := make([]api.ServiceOption, 0, 1)
if blobSubmitter != nil {
svcOpts = append(svcOpts, api.WithBlobSubmitter(blobSubmitter))
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
coordOpts := []syncer.Option{
syncer.WithStartHeight(cfg.Sync.StartHeight),
syncer.WithBatchSize(cfg.Sync.BatchSize),
syncer.WithConcurrency(cfg.Sync.Concurrency),
syncer.WithLogger(log.Logger),
syncer.WithMetrics(rec),
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
}),
}

backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
if err != nil {
return nil, nil, err
}
if closeBackfill == nil {
closeBackfill = func() {}
}
if backfillOpt != nil {
coordOpts = append(coordOpts, backfillOpt)
}

return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
stopped := make(chan struct{})
go func() {
Expand Down
44 changes: 35 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,23 @@ type Config struct {
Metrics MetricsConfig `yaml:"metrics"`
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
}

// DataSourceConfig configures the Celestia data source.
// Type selects the backend: "node" (default) uses a Celestia DA node,
// "app" uses a celestia-app consensus node via Cosmos SDK gRPC.
type DataSourceConfig struct {
Type string `yaml:"type"` // "node" (default) or "app"
CelestiaNodeURL string `yaml:"celestia_node_url"`
CelestiaAppGRPCAddr string `yaml:"celestia_app_grpc_addr"`
BackfillSource string `yaml:"backfill_source"` // "rpc" (default) or "db" for app mode
CelestiaAppDBPath string `yaml:"celestia_app_db_path"` // required when backfill_source=db
CelestiaAppDBBackend string `yaml:"celestia_app_db_backend"` // auto|pebble|leveldb
CelestiaAppDBLayout string `yaml:"celestia_app_db_layout"` // auto|v1|v2
AuthToken string `yaml:"-"` //nolint:gosec // populated only via APEX_AUTH_TOKEN env var; not a hardcoded credential
Namespaces []string `yaml:"namespaces"`
Type string `yaml:"type"` // "node" (default) or "app"
CelestiaNodeURL string `yaml:"celestia_node_url"`
CelestiaAppGRPCAddr string `yaml:"celestia_app_grpc_addr"`
CelestiaAppGRPCInsecure bool `yaml:"celestia_app_grpc_insecure"` // allow plaintext gRPC to non-loopback celestia-app endpoints
BackfillSource string `yaml:"backfill_source"` // "rpc" (default) or "db" for app mode
CelestiaAppDBPath string `yaml:"celestia_app_db_path"` // required when backfill_source=db
CelestiaAppDBBackend string `yaml:"celestia_app_db_backend"` // auto|pebble|leveldb
CelestiaAppDBLayout string `yaml:"celestia_app_db_layout"` // auto|v1|v2
AuthToken string `yaml:"-"` //nolint:gosec // populated only via APEX_AUTH_TOKEN env var; not a hardcoded credential
Namespaces []string `yaml:"namespaces"`
}

// StorageConfig configures the persistence backend.
Expand Down Expand Up @@ -92,6 +94,18 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
CelestiaAppGRPCAddr string `yaml:"app_grpc_addr"`
CelestiaAppGRPCInsecure bool `yaml:"app_grpc_insecure"` // allow plaintext gRPC to non-loopback celestia-app endpoints
ChainID string `yaml:"chain_id"`
SignerKey string `yaml:"signer_key"` // path to a file containing the hex-encoded secp256k1 key
GasPrice float64 `yaml:"gas_price"` // 0 means unset; callers must provide gas_price per request
MaxGasPrice float64 `yaml:"max_gas_price"` // 0 disables the max gas price cap
ConfirmationTimeout int `yaml:"confirmation_timeout"` // seconds
}

// DefaultConfig returns a Config with sensible defaults.
func DefaultConfig() Config {
return Config{
Expand Down Expand Up @@ -120,6 +134,15 @@ func DefaultConfig() Config {
BufferSize: 64,
MaxSubscribers: 1024,
},
Submission: SubmissionConfig{
Enabled: false,
CelestiaAppGRPCAddr: "",
ChainID: "",
SignerKey: "",
GasPrice: 0,
MaxGasPrice: 0,
ConfirmationTimeout: 30,
},
Metrics: MetricsConfig{
Enabled: true,
ListenAddr: ":9091",
Expand All @@ -143,6 +166,9 @@ func (c *Config) ParsedNamespaces() ([]types.Namespace, error) {
if err != nil {
return nil, fmt.Errorf("invalid namespace %q: %w", hex, err)
}
if err := ns.ValidateForBlob(); err != nil {
return nil, fmt.Errorf("invalid namespace %q: %w", hex, err)
}
namespaces = append(namespaces, ns)
}
return namespaces, nil
Expand Down
Loading
Loading