Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162)
- Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167)
- Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166)

### Changes

Expand Down
4 changes: 4 additions & 0 deletions apps/evm/server/force_inclusion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (m *mockDA) Retrieve(ctx context.Context, height uint64, namespace []byte)
return da.ResultRetrieve{}
}

func (m *mockDA) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve {
return da.ResultRetrieve{}
}

func (m *mockDA) RetrieveHeaders(ctx context.Context, height uint64) da.ResultRetrieve {
return da.ResultRetrieve{}
}
Expand Down
169 changes: 152 additions & 17 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"strings"
"sync"
"time"

"github.com/celestiaorg/go-square/v3/share"
Expand Down Expand Up @@ -37,11 +38,81 @@ type client struct {
dataNamespaceBz []byte
forcedNamespaceBz []byte
hasForcedNamespace bool
timestampCache *blockTimestampCache
}

// Ensure client implements the FullClient interface (Client + BlobGetter + Verifier).
var _ FullClient = (*client)(nil)

const (
blockTimestampFetchMaxAttempts = 3
blockTimestampFetchBackoff = 100 * time.Millisecond
blockTimestampCacheWindow = 2048
)

type blockTimestampCache struct {
mu sync.RWMutex
byHeight map[uint64]time.Time
highest uint64
window uint64
}

func newBlockTimestampCache(window uint64) *blockTimestampCache {
if window == 0 {
window = blockTimestampCacheWindow
}
return &blockTimestampCache{
byHeight: make(map[uint64]time.Time),
window: window,
}
}

func (c *blockTimestampCache) get(height uint64) (time.Time, bool) {
c.mu.RLock()
defer c.mu.RUnlock()

blockTime, ok := c.byHeight[height]
return blockTime, ok
}

func (c *blockTimestampCache) put(height uint64, blockTime time.Time) {
if c == nil || blockTime.IsZero() {
return
}

blockTime = blockTime.UTC()

c.mu.Lock()
defer c.mu.Unlock()

minRetained := c.minRetainedHeightLocked()
if minRetained > 0 && height < minRetained {
return
}

if height > c.highest {
c.highest = height
}
c.byHeight[height] = blockTime

minRetained = c.minRetainedHeightLocked()
if minRetained == 0 {
return
}
for cachedHeight := range c.byHeight {
if cachedHeight < minRetained {
delete(c.byHeight, cachedHeight)
}
}
}

func (c *blockTimestampCache) minRetainedHeightLocked() uint64 {
if c.window == 0 || c.highest < c.window-1 {
return 0
}
return c.highest - c.window + 1
}

// NewClient creates a new blob client wrapper with pre-calculated namespace bytes.
func NewClient(cfg Config) FullClient {
if cfg.DA == nil {
Expand All @@ -66,6 +137,7 @@ func NewClient(cfg Config) FullClient {
dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(),
forcedNamespaceBz: forcedNamespaceBz,
hasForcedNamespace: hasForcedNamespace,
timestampCache: newBlockTimestampCache(blockTimestampCacheWindow),
}
}

Expand Down Expand Up @@ -184,21 +256,76 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace

// getBlockTimestamp fetches the block timestamp from the DA layer header.
func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) {
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
defer cancel()
var lastErr error
backoff := blockTimestampFetchBackoff

for attempt := 1; attempt <= blockTimestampFetchMaxAttempts; attempt++ {
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
header, err := c.headerAPI.GetByHeight(headerCtx, height)
cancel()
if err == nil {
blockTime := header.Time().UTC()
c.storeBlockTimestamp(height, blockTime)
return blockTime, nil
}
lastErr = err

header, err := c.headerAPI.GetByHeight(headerCtx, height)
if err != nil {
return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)
if attempt == blockTimestampFetchMaxAttempts {
break
}

c.logger.Info().
Uint64("height", height).
Int("attempt", attempt).
Int("max_attempts", blockTimestampFetchMaxAttempts).
Dur("retry_in", backoff).
Err(err).
Msg("failed to get block timestamp, retrying")

select {
case <-ctx.Done():
return time.Time{}, fmt.Errorf("fetching header timestamp for block %d: %w", height, ctx.Err())
case <-time.After(backoff):
}

backoff *= 2
}

return header.Time(), nil
return time.Time{}, fmt.Errorf("get header timestamp for block %d after %d attempts: %w", height, blockTimestampFetchMaxAttempts, lastErr)
}

func (c *client) cachedBlockTimestamp(height uint64) (time.Time, bool) {
return c.timestampCache.get(height)
}

func (c *client) storeBlockTimestamp(height uint64, blockTime time.Time) {
c.timestampCache.put(height, blockTime)
}

func (c *client) resolveBlockTimestamp(ctx context.Context, height uint64, strict bool) (time.Time, error) {
if !strict {
if blockTime, ok := c.cachedBlockTimestamp(height); ok {
return blockTime, nil
}
return time.Time{}, nil
}

return c.getBlockTimestamp(ctx, height)
}

// RetrieveBlobs retrieves blobs without blocking on DA header timestamps.
func (c *client) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
return c.retrieve(ctx, height, namespace, false)
}

// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
// It uses GetAll to fetch all blobs at once.
// The timestamp is derived from the DA block header to ensure determinism.
func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
return c.retrieve(ctx, height, namespace, true)
}

func (c *client) retrieve(ctx context.Context, height uint64, namespace []byte, strictTimestamp bool) datypes.ResultRetrieve {
ns, err := share.NewNamespaceFromBytes(namespace)
if err != nil {
return datypes.ResultRetrieve{
Expand All @@ -220,12 +347,16 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
switch {
case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()):
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
// Fetch block timestamp for deterministic responses using parent context
blockTime, err := c.getBlockTimestamp(ctx, height)
blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp)
if err != nil {
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
blockTime = time.Now()
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
return datypes.ResultRetrieve{
BaseResult: datypes.BaseResult{
Code: datypes.StatusError,
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
Height: height,
},
}
}

return datypes.ResultRetrieve{
Expand Down Expand Up @@ -258,12 +389,16 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
}
}

// Fetch block timestamp for deterministic responses using parent context
blockTime, err := c.getBlockTimestamp(ctx, height)
blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp)
if err != nil {
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
blockTime = time.Now()
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
return datypes.ResultRetrieve{
BaseResult: datypes.BaseResult{
Code: datypes.StatusError,
Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()),
Height: height,
},
}
}

if len(blobs) == 0 {
Expand Down Expand Up @@ -386,14 +521,14 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte, includeTimesta
var blockTime time.Time
// Use header time if available (celestia-node v0.21.0+)
if resp.Header != nil && !resp.Header.Time.IsZero() {
blockTime = resp.Header.Time
blockTime = resp.Header.Time.UTC()
c.storeBlockTimestamp(resp.Height, blockTime)
} else if includeTimestamp {
// Fallback to fetching timestamp for older nodes
blockTime, err = c.getBlockTimestamp(ctx, resp.Height)
if err != nil {
c.logger.Error().Uint64("height", resp.Height).Err(err).Msg("failed to get DA block timestamp for subscription event")
blockTime = time.Now()
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
blockTime = time.Time{}
}
}
select {
Expand Down
Loading
Loading