diff --git a/CHANGELOG.md b/CHANGELOG.md index 488fde511..4e507bdab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) - Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167) +- Retry fetching the timestamp on error in da-client [#3166](https://github.com/evstack/ev-node/pull/3166) ### Changes diff --git a/apps/evm/server/force_inclusion_test.go b/apps/evm/server/force_inclusion_test.go index a107a10d1..8b05cad68 100644 --- a/apps/evm/server/force_inclusion_test.go +++ b/apps/evm/server/force_inclusion_test.go @@ -34,6 +34,10 @@ func (m *mockDA) Retrieve(ctx context.Context, height uint64, namespace []byte) return da.ResultRetrieve{} } +func (m *mockDA) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve { + return da.ResultRetrieve{} +} + func (m *mockDA) RetrieveHeaders(ctx context.Context, height uint64) da.ResultRetrieve { return da.ResultRetrieve{} } diff --git a/block/internal/da/client.go b/block/internal/da/client.go index 0b30ed646..4db8e7716 100644 --- a/block/internal/da/client.go +++ b/block/internal/da/client.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "strings" + "sync" "time" "github.com/celestiaorg/go-square/v3/share" @@ -37,11 +38,81 @@ type client struct { dataNamespaceBz []byte forcedNamespaceBz []byte hasForcedNamespace bool + timestampCache *blockTimestampCache } // Ensure client implements the FullClient interface (Client + BlobGetter + Verifier). var _ FullClient = (*client)(nil) +const ( + blockTimestampFetchMaxAttempts = 3 + blockTimestampFetchBackoff = 100 * time.Millisecond + blockTimestampCacheWindow = 2048 +) + +type blockTimestampCache struct { + mu sync.RWMutex + byHeight map[uint64]time.Time + highest uint64 + window uint64 +} + +func newBlockTimestampCache(window uint64) *blockTimestampCache { + if window == 0 { + window = blockTimestampCacheWindow + } + return &blockTimestampCache{ + byHeight: make(map[uint64]time.Time), + window: window, + } +} + +func (c *blockTimestampCache) get(height uint64) (time.Time, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + blockTime, ok := c.byHeight[height] + return blockTime, ok +} + +func (c *blockTimestampCache) put(height uint64, blockTime time.Time) { + if c == nil || blockTime.IsZero() { + return + } + + blockTime = blockTime.UTC() + + c.mu.Lock() + defer c.mu.Unlock() + + minRetained := c.minRetainedHeightLocked() + if minRetained > 0 && height < minRetained { + return + } + + if height > c.highest { + c.highest = height + } + c.byHeight[height] = blockTime + + minRetained = c.minRetainedHeightLocked() + if minRetained == 0 { + return + } + for cachedHeight := range c.byHeight { + if cachedHeight < minRetained { + delete(c.byHeight, cachedHeight) + } + } +} + +func (c *blockTimestampCache) minRetainedHeightLocked() uint64 { + if c.window == 0 || c.highest < c.window-1 { + return 0 + } + return c.highest - c.window + 1 +} + // NewClient creates a new blob client wrapper with pre-calculated namespace bytes. func NewClient(cfg Config) FullClient { if cfg.DA == nil { @@ -66,6 +137,7 @@ func NewClient(cfg Config) FullClient { dataNamespaceBz: datypes.NamespaceFromString(cfg.DataNamespace).Bytes(), forcedNamespaceBz: forcedNamespaceBz, hasForcedNamespace: hasForcedNamespace, + timestampCache: newBlockTimestampCache(blockTimestampCacheWindow), } } @@ -184,21 +256,76 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace // getBlockTimestamp fetches the block timestamp from the DA layer header. func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) { - headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) - defer cancel() + var lastErr error + backoff := blockTimestampFetchBackoff + + for attempt := 1; attempt <= blockTimestampFetchMaxAttempts; attempt++ { + headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + header, err := c.headerAPI.GetByHeight(headerCtx, height) + cancel() + if err == nil { + blockTime := header.Time().UTC() + c.storeBlockTimestamp(height, blockTime) + return blockTime, nil + } + lastErr = err - header, err := c.headerAPI.GetByHeight(headerCtx, height) - if err != nil { - return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err) + if attempt == blockTimestampFetchMaxAttempts { + break + } + + c.logger.Info(). + Uint64("height", height). + Int("attempt", attempt). + Int("max_attempts", blockTimestampFetchMaxAttempts). + Dur("retry_in", backoff). + Err(err). + Msg("failed to get block timestamp, retrying") + + select { + case <-ctx.Done(): + return time.Time{}, fmt.Errorf("fetching header timestamp for block %d: %w", height, ctx.Err()) + case <-time.After(backoff): + } + + backoff *= 2 } - return header.Time(), nil + return time.Time{}, fmt.Errorf("get header timestamp for block %d after %d attempts: %w", height, blockTimestampFetchMaxAttempts, lastErr) +} + +func (c *client) cachedBlockTimestamp(height uint64) (time.Time, bool) { + return c.timestampCache.get(height) +} + +func (c *client) storeBlockTimestamp(height uint64, blockTime time.Time) { + c.timestampCache.put(height, blockTime) +} + +func (c *client) resolveBlockTimestamp(ctx context.Context, height uint64, strict bool) (time.Time, error) { + if !strict { + if blockTime, ok := c.cachedBlockTimestamp(height); ok { + return blockTime, nil + } + return time.Time{}, nil + } + + return c.getBlockTimestamp(ctx, height) +} + +// RetrieveBlobs retrieves blobs without blocking on DA header timestamps. +func (c *client) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, false) } // Retrieve retrieves blobs from the DA layer at the specified height and namespace. // It uses GetAll to fetch all blobs at once. // The timestamp is derived from the DA block header to ensure determinism. func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return c.retrieve(ctx, height, namespace, true) +} + +func (c *client) retrieve(ctx context.Context, height uint64, namespace []byte, strictTimestamp bool) datypes.ResultRetrieve { ns, err := share.NewNamespaceFromBytes(namespace) if err != nil { return datypes.ResultRetrieve{ @@ -220,12 +347,16 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) switch { case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()): c.logger.Debug().Uint64("height", height).Msg("No blobs found at height") - // Fetch block timestamp for deterministic responses using parent context - blockTime, err := c.getBlockTimestamp(ctx, height) + blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp) if err != nil { c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp") - blockTime = time.Now() - // TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers. + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()), + Height: height, + }, + } } return datypes.ResultRetrieve{ @@ -258,12 +389,16 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) } } - // Fetch block timestamp for deterministic responses using parent context - blockTime, err := c.getBlockTimestamp(ctx, height) + blockTime, err := c.resolveBlockTimestamp(ctx, height, strictTimestamp) if err != nil { c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp") - blockTime = time.Now() - // TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers. + return datypes.ResultRetrieve{ + BaseResult: datypes.BaseResult{ + Code: datypes.StatusError, + Message: fmt.Sprintf("failed to get block timestamp: %s", err.Error()), + Height: height, + }, + } } if len(blobs) == 0 { @@ -386,14 +521,14 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte, includeTimesta var blockTime time.Time // Use header time if available (celestia-node v0.21.0+) if resp.Header != nil && !resp.Header.Time.IsZero() { - blockTime = resp.Header.Time + blockTime = resp.Header.Time.UTC() + c.storeBlockTimestamp(resp.Height, blockTime) } else if includeTimestamp { // Fallback to fetching timestamp for older nodes blockTime, err = c.getBlockTimestamp(ctx, resp.Height) if err != nil { c.logger.Error().Uint64("height", resp.Height).Err(err).Msg("failed to get DA block timestamp for subscription event") - blockTime = time.Now() - // TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers. + blockTime = time.Time{} } } select { diff --git a/block/internal/da/client_test.go b/block/internal/da/client_test.go index ed92a7727..ea185e9fe 100644 --- a/block/internal/da/client_test.go +++ b/block/internal/da/client_test.go @@ -156,6 +156,205 @@ func TestClient_Retrieve_Success(t *testing.T) { require.Equal(t, expectedTime, res.Timestamp) } +func TestClient_Retrieve_TimestampFetchRetry(t *testing.T) { + ns := share.MustNewV0Namespace([]byte("ns")) + nsBz := ns.Bytes() + fixedTime := time.Date(2024, 1, 2, 12, 0, 0, 0, time.UTC) + + specs := map[string]struct { + getAllErr error + headerFailures int + wantStatus datypes.StatusCode + wantTimestamp time.Time + wantMessageSubstr string + wantHeaderCalls int + }{ + "success_retries_timestamp_fetch": { + getAllErr: nil, + headerFailures: 2, + wantStatus: datypes.StatusSuccess, + wantTimestamp: fixedTime, + wantHeaderCalls: 3, + }, + "not_found_fails_hard_when_timestamp_unavailable": { + getAllErr: datypes.ErrBlobNotFound, + headerFailures: blockTimestampFetchMaxAttempts, + wantStatus: datypes.StatusError, + wantMessageSubstr: "failed to get block timestamp", + wantHeaderCalls: blockTimestampFetchMaxAttempts, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + blobModule := mocks.NewMockBlobModule(t) + headerModule := mocks.NewMockHeaderModule(t) + + if spec.getAllErr != nil { + blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob(nil), spec.getAllErr).Once() + } else { + b, err := blobrpc.NewBlobV0(ns, []byte("payload")) + require.NoError(t, err) + blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{b}, nil).Once() + } + + headerCalls := 0 + headerModule.EXPECT().GetByHeight(mock.Anything, uint64(7)).RunAndReturn(func(context.Context, uint64) (*blobrpc.Header, error) { + headerCalls++ + if headerCalls <= spec.headerFailures { + return nil, errors.New("header unavailable") + } + return &blobrpc.Header{Header: blobrpc.RawHeader{Time: fixedTime}}, nil + }) + + cl := NewClient(Config{ + DA: makeBlobRPCClient(blobModule, headerModule), + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + + res := cl.Retrieve(context.Background(), 7, nsBz) + require.Equal(t, spec.wantStatus, res.Code) + require.Equal(t, spec.wantHeaderCalls, headerCalls) + + if !spec.wantTimestamp.IsZero() { + require.Equal(t, spec.wantTimestamp, res.Timestamp) + } + if spec.wantMessageSubstr != "" { + require.Contains(t, res.Message, spec.wantMessageSubstr) + } + }) + } +} + +func TestClient_RetrieveBlobs_TimestampBehavior(t *testing.T) { + ns := share.MustNewV0Namespace([]byte("ns")) + nsBz := ns.Bytes() + fixedTime := time.Date(2024, 1, 3, 12, 0, 0, 0, time.UTC) + + specs := map[string]struct { + primeCache bool + getAllErr error + wantStatus datypes.StatusCode + wantTimestamp time.Time + wantHeaderCalls int + }{ + "uncached_skips_header_fetch": { + wantStatus: datypes.StatusSuccess, + wantTimestamp: time.Time{}, + wantHeaderCalls: 0, + }, + "cached_reuses_timestamp": { + primeCache: true, + getAllErr: datypes.ErrBlobNotFound, + wantStatus: datypes.StatusNotFound, + wantTimestamp: fixedTime, + wantHeaderCalls: 1, + }, + } + + for name, spec := range specs { + t.Run(name, func(t *testing.T) { + blobModule := mocks.NewMockBlobModule(t) + headerModule := mocks.NewMockHeaderModule(t) + + payloadBlob, err := blobrpc.NewBlobV0(ns, []byte("payload")) + require.NoError(t, err) + + headerCalls := 0 + headerModule.EXPECT().GetByHeight(mock.Anything, uint64(7)).RunAndReturn(func(context.Context, uint64) (*blobrpc.Header, error) { + headerCalls++ + return &blobrpc.Header{Header: blobrpc.RawHeader{Time: fixedTime}}, nil + }).Maybe() + + cl := NewClient(Config{ + DA: makeBlobRPCClient(blobModule, headerModule), + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + + if spec.primeCache { + blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{payloadBlob}, nil).Once() + res := cl.Retrieve(context.Background(), 7, nsBz) + require.Equal(t, datypes.StatusSuccess, res.Code) + require.Equal(t, fixedTime, res.Timestamp) + } + + if spec.getAllErr != nil { + blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob(nil), spec.getAllErr).Once() + } else { + blobModule.On("GetAll", mock.Anything, uint64(7), mock.Anything).Return([]*blobrpc.Blob{payloadBlob}, nil).Once() + } + + res := cl.RetrieveBlobs(context.Background(), 7, nsBz) + require.Equal(t, spec.wantStatus, res.Code) + require.Equal(t, spec.wantTimestamp, res.Timestamp) + require.Equal(t, spec.wantHeaderCalls, headerCalls) + }) + } +} + +func TestBlockTimestampCache_Bounded(t *testing.T) { + cache := newBlockTimestampCache(2) + t1 := time.Date(2024, 1, 4, 12, 0, 0, 0, time.UTC) + t2 := t1.Add(time.Second) + t3 := t2.Add(time.Second) + + cache.put(10, t1) + cache.put(11, t2) + cache.put(12, t3) + + _, ok := cache.get(10) + require.False(t, ok) + + got, ok := cache.get(11) + require.True(t, ok) + require.Equal(t, t2, got) + + got, ok = cache.get(12) + require.True(t, ok) + require.Equal(t, t3, got) +} + +func TestClient_Subscribe_PrimesTimestampCache(t *testing.T) { + ns := share.MustNewV0Namespace([]byte("ns")) + nsBz := ns.Bytes() + fixedTime := time.Date(2024, 1, 5, 12, 0, 0, 0, time.UTC) + + blobModule := mocks.NewMockBlobModule(t) + subCh := make(chan *blobrpc.SubscriptionResponse, 1) + blobModule.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan *blobrpc.SubscriptionResponse)(subCh), nil).Once() + blobModule.On("GetAll", mock.Anything, uint64(10), mock.Anything).Return([]*blobrpc.Blob(nil), datypes.ErrBlobNotFound).Once() + + cl := NewClient(Config{ + DA: makeBlobRPCClient(blobModule, nil), + Logger: zerolog.Nop(), + Namespace: "ns", + DataNamespace: "ns", + }) + + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + events, err := cl.Subscribe(ctx, nsBz, false) + require.NoError(t, err) + + subCh <- &blobrpc.SubscriptionResponse{ + Height: 10, + Header: &blobrpc.RawHeader{Time: fixedTime}, + } + + ev := <-events + require.Equal(t, uint64(10), ev.Height) + require.Equal(t, fixedTime, ev.Timestamp) + + res := cl.RetrieveBlobs(t.Context(), 10, nsBz) + require.Equal(t, datypes.StatusNotFound, res.Code) + require.Equal(t, fixedTime, res.Timestamp) +} + func TestClient_SubmitOptionsMerge(t *testing.T) { ns := share.MustNewV0Namespace([]byte("ns")).Bytes() blobModule := mocks.NewMockBlobModule(t) diff --git a/block/internal/da/interface.go b/block/internal/da/interface.go index 812d12847..f1272087f 100644 --- a/block/internal/da/interface.go +++ b/block/internal/da/interface.go @@ -14,6 +14,11 @@ type Client interface { // Retrieve retrieves blobs from the DA layer at the specified height and namespace. Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + // RetrieveBlobs retrieves blobs from the DA layer at the specified height and namespace + // without requiring a DA header timestamp. Callers that need deterministic DA time should + // use Retrieve instead. + RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + // Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs. Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) diff --git a/block/internal/da/tracing.go b/block/internal/da/tracing.go index 3da79fedd..c41c92049 100644 --- a/block/internal/da/tracing.go +++ b/block/internal/da/tracing.go @@ -67,6 +67,26 @@ func (t *tracedClient) Retrieve(ctx context.Context, height uint64, namespace [] return res } +func (t *tracedClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + ctx, span := t.tracer.Start(ctx, "DA.RetrieveBlobs", + trace.WithAttributes( + attribute.Int("ns.length", len(namespace)), + attribute.String("da.namespace", hex.EncodeToString(namespace)), + ), + ) + defer span.End() + + res := t.inner.RetrieveBlobs(ctx, height, namespace) + + if res.Code != datypes.StatusSuccess && res.Code != datypes.StatusNotFound { + span.RecordError(&submitError{msg: res.Message}) + span.SetStatus(codes.Error, res.Message) + } else { + span.SetAttributes(attribute.Int("blob.count", len(res.Data))) + } + return res +} + func (t *tracedClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { ctx, span := t.tracer.Start(ctx, "DA.Get", trace.WithAttributes( diff --git a/block/internal/da/tracing_test.go b/block/internal/da/tracing_test.go index fc1ae72f9..9ea344e3a 100644 --- a/block/internal/da/tracing_test.go +++ b/block/internal/da/tracing_test.go @@ -17,12 +17,13 @@ import ( // mockFullClient provides function hooks for testing the tracing decorator. type mockFullClient struct { - submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit - retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve - getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) - getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) - validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) - subscribeFn func(ctx context.Context, namespace []byte, ts bool) (<-chan datypes.SubscriptionEvent, error) + submitFn func(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit + retrieveFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + retrieveBlobsFn func(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve + getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) + getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error) + validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) + subscribeFn func(ctx context.Context, namespace []byte, ts bool) (<-chan datypes.SubscriptionEvent, error) } func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte, ts bool) (<-chan datypes.SubscriptionEvent, error) { @@ -44,6 +45,12 @@ func (m *mockFullClient) Retrieve(ctx context.Context, height uint64, namespace } return datypes.ResultRetrieve{} } +func (m *mockFullClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + if m.retrieveBlobsFn != nil { + return m.retrieveBlobsFn(ctx, height, namespace) + } + return datypes.ResultRetrieve{} +} func (m *mockFullClient) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) { if m.getFn != nil { return m.getFn(ctx, ids, namespace) @@ -164,6 +171,26 @@ func TestTracedDA_Retrieve_Error(t *testing.T) { require.Equal(t, "oops", span.Status().Description) } +func TestTracedDA_RetrieveBlobs_Success(t *testing.T) { + mock := &mockFullClient{ + retrieveBlobsFn: func(ctx context.Context, height uint64, _ []byte) datypes.ResultRetrieve { + return datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Height: height}, Data: []datypes.Blob{{}}} + }, + } + client, sr := setupDATrace(t, mock) + ctx := context.Background() + + _ = client.RetrieveBlobs(ctx, 11, []byte{0x01}) + + spans := sr.Ended() + require.Len(t, spans, 1) + span := spans[0] + require.Equal(t, "DA.RetrieveBlobs", span.Name()) + attrs := span.Attributes() + testutil.RequireAttribute(t, attrs, "ns.length", 1) + testutil.RequireAttribute(t, attrs, "blob.count", 1) +} + func TestTracedDA_Get_Success(t *testing.T) { mock := &mockFullClient{ getFn: func(ctx context.Context, ids []datypes.ID, _ []byte) ([]datypes.Blob, error) { diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index ff1a9e3d5..f1ea3ba64 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -84,14 +84,14 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co // fetchBlobs retrieves blobs from both header and data namespaces func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (datypes.ResultRetrieve, error) { // Retrieve from both namespaces using the DA client - headerRes := r.client.Retrieve(ctx, daHeight, r.client.GetHeaderNamespace()) + headerRes := r.client.RetrieveBlobs(ctx, daHeight, r.client.GetHeaderNamespace()) // If namespaces are the same, return header result if bytes.Equal(r.client.GetHeaderNamespace(), r.client.GetDataNamespace()) { return headerRes, r.validateBlobResponse(headerRes, daHeight) } - dataRes := r.client.Retrieve(ctx, daHeight, r.client.GetDataNamespace()) + dataRes := r.client.RetrieveBlobs(ctx, daHeight, r.client.GetDataNamespace()) // Validate responses headerErr := r.validateBlobResponse(headerRes, daHeight) diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index cbb527488..d5e5a7ab6 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -85,8 +85,8 @@ func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { client.On("GetDataNamespace").Return([]byte("test-data-ns")).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "just invalid"}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: "just invalid"}}).Once() r := newTestDARetriever(t, client, config.DefaultConfig(), genesis.Genesis{}) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -100,8 +100,8 @@ func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { client.On("GetDataNamespace").Return([]byte("test-data-ns")).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Message: fmt.Sprintf("%s: whatever", datypes.ErrBlobNotFound.Error())}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Message: fmt.Sprintf("%s: whatever", datypes.ErrBlobNotFound.Error())}}).Once() r := newTestDARetriever(t, client, config.DefaultConfig(), genesis.Genesis{}) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -115,8 +115,8 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { client.On("GetDataNamespace").Return([]byte("test-data-ns")).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() - client.On("Retrieve", mock.Anything, uint64(1000), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() - client.On("Retrieve", mock.Anything, uint64(1000), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture, Message: fmt.Sprintf("%s: later", datypes.ErrHeightFromFuture.Error())}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(1000), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(1000), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture, Message: fmt.Sprintf("%s: later", datypes.ErrHeightFromFuture.Error())}}).Once() r := newTestDARetriever(t, client, config.DefaultConfig(), genesis.Genesis{}) events, derr := r.RetrieveFromDA(context.Background(), 1000) @@ -131,8 +131,8 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { client.On("GetDataNamespace").Return([]byte("test-data-ns")).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: context.DeadlineExceeded.Error()}}).Once() - client.On("Retrieve", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusContextDeadline, Message: context.DeadlineExceeded.Error()}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusError, Message: context.DeadlineExceeded.Error()}}).Once() + client.On("RetrieveBlobs", mock.Anything, uint64(42), []byte("test-data-ns")).Return(datypes.ResultRetrieve{BaseResult: datypes.BaseResult{Code: datypes.StatusContextDeadline, Message: context.DeadlineExceeded.Error()}}).Once() r := newTestDARetriever(t, client, config.DefaultConfig(), genesis.Genesis{}) @@ -259,11 +259,11 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { client.On("GetDataNamespace").Return([]byte("nsData")).Maybe() client.On("GetForcedInclusionNamespace").Return([]byte(nil)).Maybe() client.On("HasForcedInclusionNamespace").Return(false).Maybe() - client.On("Retrieve", mock.Anything, uint64(1234), []byte("nsHdr")).Return(datypes.ResultRetrieve{ + client.On("RetrieveBlobs", mock.Anything, uint64(1234), []byte("nsHdr")).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("h1")}, Timestamp: time.Now()}, Data: [][]byte{hdrBin}, }).Once() - client.On("Retrieve", mock.Anything, uint64(1234), []byte("nsData")).Return(datypes.ResultRetrieve{ + client.On("RetrieveBlobs", mock.Anything, uint64(1234), []byte("nsData")).Return(datypes.ResultRetrieve{ BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("d1")}, Timestamp: time.Now()}, Data: [][]byte{dataBin}, }).Once() diff --git a/test/mocks/da.go b/test/mocks/da.go index f3a4e960d..e1daf7003 100644 --- a/test/mocks/da.go +++ b/test/mocks/da.go @@ -417,6 +417,69 @@ func (_c *MockClient_Retrieve_Call) RunAndReturn(run func(ctx context.Context, h return _c } +// RetrieveBlobs provides a mock function for the type MockClient +func (_mock *MockClient) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve { + ret := _mock.Called(ctx, height, namespace) + + if len(ret) == 0 { + panic("no return value specified for RetrieveBlobs") + } + + var r0 da.ResultRetrieve + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64, []byte) da.ResultRetrieve); ok { + r0 = returnFunc(ctx, height, namespace) + } else { + r0 = ret.Get(0).(da.ResultRetrieve) + } + return r0 +} + +// MockClient_RetrieveBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveBlobs' +type MockClient_RetrieveBlobs_Call struct { + *mock.Call +} + +// RetrieveBlobs is a helper method to define mock.On call +// - ctx context.Context +// - height uint64 +// - namespace []byte +func (_e *MockClient_Expecter) RetrieveBlobs(ctx interface{}, height interface{}, namespace interface{}) *MockClient_RetrieveBlobs_Call { + return &MockClient_RetrieveBlobs_Call{Call: _e.mock.On("RetrieveBlobs", ctx, height, namespace)} +} + +func (_c *MockClient_RetrieveBlobs_Call) Run(run func(ctx context.Context, height uint64, namespace []byte)) *MockClient_RetrieveBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + var arg2 []byte + if args[2] != nil { + arg2 = args[2].([]byte) + } + run( + arg0, + arg1, + arg2, + ) + }) + return _c +} + +func (_c *MockClient_RetrieveBlobs_Call) Return(resultRetrieve da.ResultRetrieve) *MockClient_RetrieveBlobs_Call { + _c.Call.Return(resultRetrieve) + return _c +} + +func (_c *MockClient_RetrieveBlobs_Call) RunAndReturn(run func(ctx context.Context, height uint64, namespace []byte) da.ResultRetrieve) *MockClient_RetrieveBlobs_Call { + _c.Call.Return(run) + return _c +} + // Submit provides a mock function for the type MockClient func (_mock *MockClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) da.ResultSubmit { ret := _mock.Called(ctx, data, gasPrice, namespace, options) diff --git a/test/testda/dummy.go b/test/testda/dummy.go index 2f92e47e6..e1f93642c 100644 --- a/test/testda/dummy.go +++ b/test/testda/dummy.go @@ -224,6 +224,13 @@ func (d *DummyDA) Retrieve(_ context.Context, height uint64, namespace []byte) d } } +// RetrieveBlobs returns blobs stored at the given height and namespace without +// requiring a timestamp lookup. DummyDA already serves timestamps from memory, +// so this shares the same implementation. +func (d *DummyDA) RetrieveBlobs(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve { + return d.Retrieve(ctx, height, namespace) +} + // GetHeaderNamespace returns the header namespace. func (d *DummyDA) GetHeaderNamespace() []byte { return []byte("hdr") }