
fix: startup races #3172

Open
alpe wants to merge 7 commits into main from alex/fix_startup

Conversation

@alpe
Contributor

@alpe alpe commented Mar 17, 2026

Overview

E2E HA tests sometimes fail on a race when a freshly started leader waits for P2P sync to complete.
This PR also syncs the store before closing the DB.

Summary by CodeRabbit

Release Notes

  • New Features

    • Added publisher-mode synchronization option for failover scenarios with early P2P infrastructure readiness.
  • Bug Fixes

    • Enhanced stale block detection during failover to prevent resuming outdated execution states.
    • Improved store durability with synchronized graceful shutdown.
  • Tests

    • Added E2E artifact logging for improved failure diagnostics.
    • Extended test coverage for synchronization lifecycle and failover recovery scenarios.

@coderabbitai
Contributor

coderabbitai bot commented Mar 17, 2026

📝 Walkthrough


The PR adds store synchronization capabilities to the public interface, introduces publisher-mode sync initialization for raft leaders, refactors syncer lifecycle management with mutex-based state tracking, enhances EVM execution recovery against stale inputs, implements raft-aware failover sync branching, and improves E2E testing infrastructure.

Changes

Cohort / File(s) Summary
Store Sync Interface
pkg/store/types.go
Introduces new Syncer interface with Sync method and embeds it into Store interface, extending the public store API.
Store Sync Implementations
pkg/store/cached_store.go, pkg/store/store.go, pkg/store/tracing.go
Adds Sync method implementations across cached, core, and traced store variants; modifies Close to invoke Sync before shutdown with timeout handling.
Store Sync Testing
pkg/store/store_test.go, pkg/store/tracing_test.go, test/mocks/store.go
Adds test utilities and mocks for Sync behavior including verification that Close triggers Sync, traced store span recording, and comprehensive mock support.
Sync Service Publisher Mode
pkg/sync/sync_service.go, pkg/sync/sync_service_test.go
Introduces StartForPublishing method for raft-leader startup, refactors startup sequencing with prepareStart and revised initFromP2PWithRetry, updates startSyncer to return boolean indicating first-time start.
Syncer Lifecycle Management
pkg/sync/syncer_status.go, pkg/sync/syncer_status_test.go
Replaces atomic-based state with mutex-protected boolean; adds startOnce and stopIfStarted methods for coordinated lifecycle transitions with comprehensive concurrency testing.
Node Failover Leadership
node/failover.go
Adds raft leadership and aggregator mode tracking; introduces shouldStartSyncInPublisherMode to conditionally branch sync startup between publishing and standard modes based on leadership state.
EVM Execution Recovery
execution/evm/execution.go, execution/evm/execution_reconcile_test.go
Implements stale input detection by comparing ExecMeta timestamps and transaction hashes; introduces hashTxs helper and test suite validating payload resume logic under matching/mismatched input conditions.
E2E Testing Infrastructure
.github/workflows/test.yml, test/e2e/failover_e2e_test.go, test/e2e/sut_helper.go
Adds E2E artifact upload on failure, implements per-node P2P peer addressing and multiaddress computation, improves logging with processLogDir helper and enhanced process cleanup/watchLogs utilities.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested labels

T:testing

Suggested reviewers

  • julienrbrt
  • tac0turtle

Poem

🐰 A store that syncs before it sleeps,
Publisher modes the leader keeps,
Stale inputs caught, fresh executions flow,
Mutex locks guard the status below,
E2E logs now capture the tale!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: coverage is 23.53%, below the required 80.00% threshold. Resolution: write docstrings for the functions missing them.
✅ Passed checks (2 passed)
  • Title check ✅ Passed: the title 'fix: startup races' is concise and directly related to the PR's main objective of resolving race conditions during startup, as evidenced by the description and changes addressing P2P sync races.
  • Description check ✅ Passed: the PR description provides a clear overview explaining the startup race issue and the solution (syncing the store before closing the DB), though it lacks detail about implementation scope and testing.


@github-actions
Contributor

github-actions bot commented Mar 17, 2026

The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).

Build: ✅ passed · Format: ⏩ skipped · Lint: ✅ passed · Breaking: ✅ passed · Updated (UTC): Mar 18, 2026, 3:30 PM

@claude
Contributor

claude bot commented Mar 17, 2026

Claude finished @alpe's task in 2m 56s


Code Review: fix: startup races

  • Gather context and understand PR scope
  • Review changed files
    • pkg/sync/sync_service.go - Core sync service changes
    • pkg/sync/syncer_status.go - New syncer status tracking
    • node/failover.go - Failover logic changes
    • execution/evm/execution.go - EVM execution reconcile
    • pkg/store/ - Store changes
    • Tests and E2E changes
  • Analyze concurrency patterns and race conditions
  • Post comprehensive review

Summary

This PR fixes a real race in the HA startup path: when a fresh raft leader has an empty store, it was blocking in initFromP2PWithRetry waiting for peers to serve height 1 — which can't exist yet since the leader hasn't produced any blocks. The fix is correct and well-structured. The new StartForPublishing / SyncerStatus abstractions are clean.

A few issues worth addressing are noted below.


Issues

1. time.After leak in waitForCatchup — node/failover.go:321

timeoutCh = time.After(f.catchupTimeout)

time.After creates a timer that is not garbage-collected until it fires. If waitForCatchup returns early (e.g., context cancel, or daCaughtUp && p2pCaughtUp), the timer goroutine leaks until the timeout expires. Use time.NewTimer with defer timer.Stop() instead:

t := time.NewTimer(f.catchupTimeout)
defer t.Stop()
timeoutCh = t.C



2. Nil panic in Stop if Start was never called — pkg/sync/sync_service.go:450

func (syncService *SyncService[H]) Stop(ctx context.Context) error {
    syncService.topicSubscription.Cancel()  // panics if nil
    ...
    syncService.p2pServer.Stop(ctx)         // panics if nil
    syncService.ex.Stop(ctx)               // panics if nil
    syncService.sub.Stop(ctx)              // panics if nil

If Stop is called after a partial Start failure (e.g., setupP2PInfrastructure succeeds but startSubscriber fails), topicSubscription is nil and this panics. The current callers (failover.go) only defer Stop after a successful syncWg.Wait(), so the happy path is safe. But this is fragile under error paths.

A nil guard on topicSubscription before calling Cancel() would make this robust. The same applies to p2pServer, ex, and sub in the errors.Join(...) block.


3. storeInitialized flag is set to true before initStore completes — pkg/sync/sync_service.go

In both WriteToStoreAndBroadcast (line ~137) and initFromP2PWithRetry (line ~395):

if syncService.storeInitialized.CompareAndSwap(false, true) {
    // CAS sets flag=true here, but store is NOT yet ready
    _, err = syncService.initStore(ctx, headerOrData)
    if err != nil {
        syncService.storeInitialized.Store(false) // reset on failure
    }
}
// startSyncer is called below regardless...

Between the CAS and initStore completing there is a window where another goroutine can observe storeInitialized = true and proceed directly to startSyncer. If initFromP2PWithRetry's retry loop runs concurrently with a WriteToStoreAndBroadcast call (e.g., during the 30-second P2P timeout window when DA sync starts delivering blocks), the syncer could start against an uninitialised store.

In practice, StartForPublishing skips initFromP2PWithRetry entirely, so this race does not occur in the publisher mode introduced by this PR. Consider adding a comment explaining this invariant so future maintainers understand why the pattern is safe.


4. SyncerStatus.startOnce holds lock during the entire startFn execution — pkg/sync/syncer_status.go:19

func (syncerStatus *SyncerStatus) startOnce(startFn func() error) (bool, error) {
    syncerStatus.mu.Lock()
    defer syncerStatus.mu.Unlock()

    if syncerStatus.started {
        return false, nil
    }
    if err := startFn(); err != nil { ... }   // lock held here
    ...
}

startFn calls syncService.syncer.Start(ctx), which involves P2P network I/O. Holding mu throughout means all concurrent callers that lose the race (startSyncer called from WriteToStoreAndBroadcast and from initFromP2PWithRetry) block for the full duration of syncer startup rather than returning immediately once started=true.

This is intentional and functionally correct, but it's a design difference from sync.Once. A brief comment explaining why the lock is held across the start function body would help reviewers.


5. E2E test: unnecessary time.Sleep after last retry — test/e2e/failover_e2e_test.go:454

for attempt := 1; attempt <= maxRestartAttempts; attempt++ {
    ...
    time.Sleep(restartRetryInterval)  // sleeps even after final attempt
}

The sleep after the last iteration adds restartRetryInterval (3 seconds) of unnecessary test time. Guard the sleep so it only runs when another attempt follows:

if len(deadNodes) == 0 { break }
...restart...
if attempt < maxRestartAttempts {
    time.Sleep(restartRetryInterval)
}

6. CachedStore.Sync is redundant — pkg/store/cached_store.go:179

func (cs *CachedStore) Sync(ctx context.Context) error {
    return cs.Store.Sync(ctx)
}

CachedStore embeds Store as an interface, which already promotes all methods including Sync. This explicit delegation is a no-op and only adds noise. (The equivalent in tracedStore is necessary because tracedStore uses a named field inner Store, not embedding.)


Positive Notes

  • StartForPublishing design is clean — the comment at the function head clearly explains the invariant (leader with empty store) and why initFromP2PWithRetry would deadlock in this scenario.
  • SyncerStatus correctly allows retrying a failed start while preventing double-starts — this is subtler than a plain sync.Once and the new tests (TestSyncerStatusStartOnce, TestSyncerStatusStopIfStarted) cover both paths well.
  • initFromP2PWithRetry graceful timeout (30 s → continue with DA sync) is a good fallback that avoids blocking startup indefinitely.
  • shouldStartSyncInPublisherMode is correctly guarded: only fires for aggregator + raft leader + empty store. The early-return logging is helpful.
  • reconcileExecutionAtHeight adds proper crash-recovery for in-flight payload builds. The table-driven tests cover the key matching/rejection cases.
  • Starting both header and data sync services concurrently via syncWg is a meaningful latency improvement.

@codecov

codecov bot commented Mar 17, 2026

Codecov Report

❌ Patch coverage is 54.62963% with 49 lines in your changes missing coverage. Please review.
✅ Project coverage is 60.78%. Comparing base (0dc2767) to head (4736118).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
pkg/sync/sync_service.go 47.36% 13 Missing and 7 partials ⚠️
node/failover.go 37.03% 11 Missing and 6 partials ⚠️
pkg/store/tracing.go 0.00% 8 Missing ⚠️
pkg/store/cached_store.go 0.00% 2 Missing ⚠️
pkg/sync/syncer_status.go 90.47% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3172      +/-   ##
==========================================
+ Coverage   60.75%   60.78%   +0.02%     
==========================================
  Files         117      117              
  Lines       11988    12068      +80     
==========================================
+ Hits         7283     7335      +52     
- Misses       3886     3906      +20     
- Partials      819      827       +8     
Flag Coverage Δ
combined 60.78% <54.62%> (+0.02%) ⬆️


@alpe alpe marked this pull request as ready for review March 18, 2026 15:30
@alpe alpe requested a review from julienrbrt March 18, 2026 15:33
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
test/e2e/sut_helper.go (1)

189-195: ⚠️ Potential issue | 🟡 Minor

Close the per-process logfile.

os.Create opens a descriptor that is never closed here. The E2E restart tests can spawn a lot of processes, so this leaks one fd per process and can eventually trip EMFILE or delay log flushing. Please close the file once both reader goroutines finish.

🧹 Nitpick comments (1)
execution/evm/execution_reconcile_test.go (1)

122-130: Track Eth RPC mock call counts to harden path guarantees.

The Eth mock currently cannot assert whether HeaderByNumber/GetTxs were invoked. Adding counters would make it easier to lock down expected short-circuit behavior in resume_when_inputs_match.

🔍 Suggested enhancement
 type mockReconcileEthRPCClient struct{}
+type mockReconcileEthRPCClient struct {
+	headerByNumberCalls int
+	getTxsCalls         int
+}

-func (mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
+func (m *mockReconcileEthRPCClient) HeaderByNumber(_ context.Context, _ *big.Int) (*types.Header, error) {
+	m.headerByNumberCalls++
 	return nil, errors.New("header not found")
 }

-func (mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) {
+func (m *mockReconcileEthRPCClient) GetTxs(_ context.Context) ([]string, error) {
+	m.getTxsCalls++
 	return nil, errors.New("unexpected GetTxs call")
 }
- client := &EngineClient{
+ ethRPC := &mockReconcileEthRPCClient{}
+ client := &EngineClient{
    engineClient: engineRPC,
-   ethClient:    mockReconcileEthRPCClient{},
+   ethClient:    ethRPC,
    store:        store,
    logger:       zerolog.Nop(),
  }
+ if spec.expectPayloadID {
+   require.Equal(t, 0, ethRPC.headerByNumberCalls)
+   require.Equal(t, 0, ethRPC.getTxsCalls)
+ }

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 0c7b7655-2bb3-4eaf-a6d5-ecbe513092f3

📥 Commits

Reviewing files that changed from the base of the PR and between 0dc2767 and 4736118.

📒 Files selected for processing (17)
  • .github/workflows/test.yml
  • execution/evm/execution.go
  • execution/evm/execution_reconcile_test.go
  • node/failover.go
  • pkg/store/cached_store.go
  • pkg/store/store.go
  • pkg/store/store_test.go
  • pkg/store/tracing.go
  • pkg/store/tracing_test.go
  • pkg/store/types.go
  • pkg/sync/sync_service.go
  • pkg/sync/sync_service_test.go
  • pkg/sync/syncer_status.go
  • pkg/sync/syncer_status_test.go
  • test/e2e/failover_e2e_test.go
  • test/e2e/sut_helper.go
  • test/mocks/store.go

Comment on lines +59 to +61
for name, spec := range specs {
	t.Run(name, func(t *testing.T) {
		t.Parallel()
Contributor


🛠️ Refactor suggestion | 🟠 Major



Use a slice-backed table to keep subtest execution order deterministic.

Map iteration in Go is randomized, so line 59's for name, spec := range specs produces non-deterministic subtest ordering. Refactor to use a []struct{...} table with a name field to satisfy the test determinism requirement.

Suggested refactor
- specs := map[string]struct {
+ specs := []struct {
+   name              string
    execMetaTimestamp int64
    execMetaTxs       [][]byte
    requestedTxs      [][]byte
    requestedTime     time.Time
    expectFound       bool
    expectPayloadID   bool
    expectGetPayloads int
- }{
-   "resume_when_inputs_match": {
+ }{
+   {
+     name:              "resume_when_inputs_match",
      execMetaTimestamp: 1700000012,
      execMetaTxs:       [][]byte{[]byte("tx-1")},
      requestedTxs:      [][]byte{[]byte("tx-1")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       true,
      expectPayloadID:   true,
      expectGetPayloads: 1,
    },
-   "ignore_when_timestamp_differs": {
+   {
+     name:              "ignore_when_timestamp_differs",
      execMetaTimestamp: 1700000010,
      execMetaTxs:       [][]byte{[]byte("tx-1")},
      requestedTxs:      [][]byte{[]byte("tx-1")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       false,
      expectPayloadID:   false,
      expectGetPayloads: 0,
    },
-   "ignore_when_txs_differ": {
+   {
+     name:              "ignore_when_txs_differ",
      execMetaTimestamp: 1700000012,
      execMetaTxs:       [][]byte{[]byte("tx-old")},
      requestedTxs:      [][]byte{[]byte("tx-new")},
      requestedTime:     time.Unix(1700000012, 0),
      expectFound:       false,
      expectPayloadID:   false,
      expectGetPayloads: 0,
    },
  }

- for name, spec := range specs {
-   t.Run(name, func(t *testing.T) {
+ for _, spec := range specs {
+   t.Run(spec.name, func(t *testing.T) {

Comment on lines +759 to +760
requestedTxHash := hashTxs(txs)
if execMeta.Timestamp != timestamp.Unix() || !bytes.Equal(execMeta.TxHash, requestedTxHash) {
Contributor


⚠️ Potential issue | 🟠 Major

Make the resume fingerprint unambiguous.

hashTxs() hashes the raw byte concatenation, so different ordered transaction slices can share the same digest ([ab,c] vs [a,bc]). The equality check on Line 760 can then resume a payload built from different inputs. Hash a length-delimited encoding instead.

Suggested fix
+import "encoding/binary"
 ...
 func hashTxs(txs [][]byte) []byte {
 	if len(txs) == 0 {
 		return nil
 	}
 
 	h := sha256.New()
+	var lenBuf [8]byte
 	for _, tx := range txs {
-		h.Write(tx)
+		binary.LittleEndian.PutUint64(lenBuf[:], uint64(len(tx)))
+		_, _ = h.Write(lenBuf[:])
+		_, _ = h.Write(tx)
 	}
 
 	return h.Sum(nil)
 }

Also applies to: 1048-1058


Comment on lines +38 to +52
type syncingBatchingDatastore struct {
	ds.Batching
	syncCalled  bool
	closeCalled bool
}

func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
	m.syncCalled = true
	return m.Batching.Sync(ctx, key)
}

func (m *syncingBatchingDatastore) Close() error {
	m.closeCalled = true
	return m.Batching.Close()
}
Contributor


⚠️ Potential issue | 🟡 Minor

This test doesn't verify the Sync()-before-Close() contract.

Both booleans become true even if the implementation calls Close() first, so the regression this test is named after would still pass. Record the call sequence and assert the exact order.

Suggested fix
+import "sync"
 ...
 type syncingBatchingDatastore struct {
 	ds.Batching
-	syncCalled  bool
-	closeCalled bool
+	mu    sync.Mutex
+	calls []string
 }
 
 func (m *syncingBatchingDatastore) Sync(ctx context.Context, key ds.Key) error {
-	m.syncCalled = true
+	m.mu.Lock()
+	m.calls = append(m.calls, "sync")
+	m.mu.Unlock()
 	return m.Batching.Sync(ctx, key)
 }
 
 func (m *syncingBatchingDatastore) Close() error {
-	m.closeCalled = true
+	m.mu.Lock()
+	m.calls = append(m.calls, "close")
+	m.mu.Unlock()
 	return m.Batching.Close()
 }
 ...
 	require.NoError(t, s.Close())
-	require.True(t, mock.syncCalled)
-	require.True(t, mock.closeCalled)
+	mock.mu.Lock()
+	defer mock.mu.Unlock()
+	require.Equal(t, []string{"sync", "close"}, mock.calls)
 }

Also applies to: 160-171


Comment on lines +34 to +47
done := make(chan error, 1)
go func() {
	syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	_ = s.Sync(syncCtx)
	done <- s.db.Close()
}()

select {
case err := <-done:
	return err
case <-time.After(4 * time.Second):
	return nil
Contributor


⚠️ Potential issue | 🟠 Major

Don't turn an incomplete close into a successful one.

This path drops Sync() failures and returns nil after 4 seconds even if the goroutine is still flushing or closing s.db. That lets callers reopen the same store while the old handle is still live and hides the durability failure this change is supposed to prevent.

Suggested fix
 func (s *DefaultStore) Close() error {
 	done := make(chan error, 1)
 	go func() {
 		syncCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
 		defer cancel()
 
-		_ = s.Sync(syncCtx)
-		done <- s.db.Close()
+		syncErr := s.Sync(syncCtx)
+		closeErr := s.db.Close()
+
+		switch {
+		case syncErr != nil && closeErr != nil:
+			done <- errors.Join(
+				fmt.Errorf("sync store before close: %w", syncErr),
+				fmt.Errorf("close datastore: %w", closeErr),
+			)
+		case syncErr != nil:
+			done <- fmt.Errorf("sync store before close: %w", syncErr)
+		default:
+			done <- closeErr
+		}
 	}()
 
 	select {
 	case err := <-done:
 		return err
 	case <-time.After(4 * time.Second):
-		return nil
+		return fmt.Errorf("closing datastore timed out after 4s")
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/store/store.go` around lines 34 - 47, The current close path spawns a
goroutine that discards the result of s.Sync and can return nil after 4s even if
Sync or s.db.Close failed or is still running; change the goroutine to capture
both the Sync error and Close error (e.g., syncErr := s.Sync(syncCtx); closeErr
:= s.db.Close()), send a combined/non-nil error on done (prefer returning
syncErr if non-nil, else closeErr), and update the select timeout branch to
return a non-nil error (e.g., context.DeadlineExceeded or a wrapped error
indicating close timed out) instead of nil so callers cannot reopen while the
old store may still be closing. Ensure you reference the goroutine closure where
s.Sync, s.db.Close and the done channel are used.

Comment on lines +22 to +47
var calls atomic.Int32
started := make(chan struct{})
release := make(chan struct{})
var wg sync.WaitGroup

for range 8 {
	wg.Add(1)
	go func() {
		defer wg.Done()
		_, err := status.startOnce(func() error {
			if calls.Add(1) == 1 {
				close(started)
			}
			<-release
			return nil
		})
		require.NoError(t, err)
	}()
}

<-started
close(release)
wg.Wait()

require.Equal(t, int32(1), calls.Load())
require.True(t, status.isStarted())
⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the test file to confirm the code and understand the context
find . -name "syncer_status_test.go" -type f

Repository: evstack/ev-node


🏁 Script executed:

# Read the test file to see the actual code
cat -n pkg/sync/syncer_status_test.go | head -100



🏁 Script executed:

# Let's also check if there's any custom error collection mechanism or if require is really being used in goroutines
rg "require\.(NoError|Error)" pkg/sync/syncer_status_test.go -B 5 -A 2



Use a channel to collect worker errors instead of calling require inside goroutines.

require.NoError (line 38) calls FailNow(), which exits only the spawned goroutine. If an error occurs before the first goroutine closes started, the main test hangs indefinitely on line 42. Collect errors in a channel from each worker and assert after wg.Wait() completes to ensure the test is deterministic.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/sync/syncer_status_test.go` around lines 22 - 47, The test spawns
goroutines that call status.startOnce and currently use require.NoError inside
each goroutine which can call FailNow only in that goroutine and cause the main
test to hang; instead, create an errors channel (buffered to the number of
workers), have each goroutine send its error into that channel after calling
status.startOnce (referencing the startOnce call, calls counter, started and
release channels, and wg), close the channel after wg.Wait(), then range over
the collected errors in the main goroutine and use require.NoError (or
require.Empty) to assert no worker returned an error; keep the final checks for
calls.Load() == 1 and status.isStarted() after error checks.

Comment on lines 179 to +180
 	oldDetails := clusterNodes.Details(oldLeader)
-	restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
+	restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr, env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster, clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr, oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
⚠️ Potential issue | 🟠 Major

Use the restarted node's own JWT secret here.

oldLeader is elected dynamically, so this branch can restart node2 or node3 with env.SequencerJWT. When that happens the engine auth no longer matches that node's reth instance, and the test either flakes or drops into the later "did not recover on restart" path. Reuse the per-node JWT selection you already added in TestHASequencerRollingRestartE2E.

Suggested fix
 	oldDetails := clusterNodes.Details(oldLeader)
+	oldLeaderJWT := map[string]string{
+		"node1": env.SequencerJWT,
+		"node2": env.FullNodeJWT,
+		"node3": jwtSecret3,
+	}[oldLeader]
 	restartedNodeProcess := setupRaftSequencerNode(t, sut, workDir, oldLeader, oldDetails.raftAddr,
-		env.SequencerJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster,
+		oldLeaderJWT, env.GenesisHash, env.Endpoints.GetDAAddress(), "", raftCluster,
 		clusterNodes.Details(newLeader).p2pPeerAddr, oldDetails.rpcAddr, oldDetails.p2pAddr,
 		oldDetails.engineURL, oldDetails.ethAddr, false, passphraseFile)
As per coding guidelines, "Ensure tests are deterministic".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@test/e2e/failover_e2e_test.go` around lines 179 - 180, The restart call is
using the global env.SequencerJWT which can mismatch when oldLeader is
node2/node3; replace env.SequencerJWT with the restarted node's own JWT secret
(the per-node JWT selection used in TestHASequencerRollingRestartE2E) when
calling setupRaftSequencerNode for oldLeader so engine auth matches that node's
reth instance (i.e., obtain the JWT for oldLeader via the same per-node
selection you added earlier and pass it instead of env.SequencerJWT).
