-
Notifications
You must be signed in to change notification settings - Fork 0
add test cases to build robustness #48
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,7 @@ import ( | |
| "context" | ||
| "errors" | ||
| "fmt" | ||
| "strconv" | ||
| "strings" | ||
| "sync" | ||
| "time" | ||
|
|
@@ -19,7 +20,10 @@ const ( | |
| maxSequenceRetryRounds = 2 | ||
| ) | ||
|
|
||
| var errSequenceMismatch = errors.New("account sequence mismatch") | ||
| var ( | ||
| errSequenceMismatch = errors.New("account sequence mismatch") | ||
| errTooManyInFlight = errors.New("too many in-flight submissions") | ||
| ) | ||
|
|
||
| // DirectConfig contains the fixed submission settings Apex owns for direct | ||
| // celestia-app writes. | ||
|
|
@@ -42,6 +46,12 @@ type DirectSubmitter struct { | |
| pollInterval time.Duration | ||
| feeDenom string | ||
| mu sync.Mutex | ||
| inFlight int | ||
| accountNumber uint64 | ||
| nextSequence uint64 | ||
| sequenceReady bool | ||
| pendingSequences map[string]uint64 | ||
| maxInFlight int | ||
| } | ||
|
|
||
| // NewDirectSubmitter builds a concrete single-account submitter. | ||
|
|
@@ -77,6 +87,7 @@ func NewDirectSubmitter(app AppClient, signer *Signer, cfg DirectConfig) (*Direc | |
| confirmationTimeout: cfg.ConfirmationTimeout, | ||
| pollInterval: defaultPollInterval, | ||
| feeDenom: defaultFeeDenom, | ||
| pendingSequences: make(map[string]uint64), | ||
| }, nil | ||
| } | ||
|
|
||
|
|
@@ -87,29 +98,23 @@ func (s *DirectSubmitter) Close() error { | |
| return s.app.Close() | ||
| } | ||
|
|
||
| // Submit serializes submissions for the configured signer so sequence handling | ||
| // stays bounded and explicit in v1. | ||
| // Submit serializes sequence reservation and broadcast for the configured | ||
| // signer, then waits for confirmation without blocking the next nonce. | ||
| func (s *DirectSubmitter) Submit(ctx context.Context, req *Request) (*Result, error) { | ||
| if err := validateSubmitRequest(req); err != nil { | ||
| return nil, err | ||
| } | ||
| if err := s.startSubmission(); err != nil { | ||
| return nil, err | ||
| } | ||
| defer s.finishSubmission() | ||
|
|
||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| var lastErr error | ||
| for range maxSequenceRetryRounds { | ||
| result, err := s.submitOnce(ctx, req) | ||
| if err == nil { | ||
| return result, nil | ||
| } | ||
| lastErr = err | ||
| if !errors.Is(err, errSequenceMismatch) { | ||
| return nil, err | ||
| } | ||
| broadcast, err := s.broadcastTx(ctx, req) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return nil, lastErr | ||
| return s.waitForConfirmation(ctx, broadcast.Hash) | ||
| } | ||
|
|
||
| func validateSubmitRequest(req *Request) error { | ||
|
|
@@ -127,32 +132,154 @@ func validateSubmitRequest(req *Request) error { | |
| return nil | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) submitOnce(ctx context.Context, req *Request) (*Result, error) { | ||
| account, err := s.app.AccountInfo(ctx, s.signer.Address()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("query submission account: %w", err) | ||
| func (s *DirectSubmitter) broadcastTx(ctx context.Context, req *Request) (*TxStatus, error) { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| var lastErr error | ||
| for range maxSequenceRetryRounds { | ||
| account, err := s.nextAccountLocked(ctx) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| txBytes, err := s.buildBlobTx(req, account) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| broadcast, err := s.app.BroadcastTx(ctx, txBytes) | ||
| if err != nil { | ||
| if isSequenceMismatchText(err.Error()) { | ||
| s.recoverSequenceLocked(account, err.Error()) | ||
| lastErr = fmt.Errorf("%w: %w", errSequenceMismatch, err) | ||
| continue | ||
| } | ||
| return nil, fmt.Errorf("broadcast blob tx: %w", err) | ||
| } | ||
| if err := checkTxStatus("broadcast", broadcast); err != nil { | ||
| if errors.Is(err, errSequenceMismatch) { | ||
| s.recoverSequenceLocked(account, err.Error()) | ||
| lastErr = err | ||
| continue | ||
| } | ||
| return nil, err | ||
| } | ||
|
|
||
| if broadcast.Hash != "" { | ||
| s.rememberPendingLocked(broadcast.Hash, account.Sequence) | ||
| } | ||
| s.nextSequence = account.Sequence + 1 | ||
| s.sequenceReady = true | ||
| return broadcast, nil | ||
| } | ||
| if account == nil { | ||
| return nil, errors.New("query submission account: empty response") | ||
|
|
||
| return nil, lastErr | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) nextAccountLocked(ctx context.Context) (*AccountInfo, error) { | ||
| if !s.sequenceReady { | ||
| account, err := s.app.AccountInfo(ctx, s.signer.Address()) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("query submission account: %w", err) | ||
| } | ||
| if account == nil { | ||
| return nil, errors.New("query submission account: empty response") | ||
| } | ||
|
|
||
| s.accountNumber = account.AccountNumber | ||
| s.nextSequence = account.Sequence | ||
| s.sequenceReady = true | ||
| if err := s.reconcilePendingLocked(ctx); err != nil { | ||
| return nil, err | ||
| } | ||
| } | ||
|
|
||
| txBytes, err := s.buildBlobTx(req, account) | ||
| if err != nil { | ||
| return nil, err | ||
| return &AccountInfo{ | ||
| Address: s.signer.Address(), | ||
| AccountNumber: s.accountNumber, | ||
| Sequence: s.nextSequence, | ||
| }, nil | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) invalidateSequenceLocked() { | ||
| s.accountNumber = 0 | ||
| s.nextSequence = 0 | ||
| s.sequenceReady = false | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) startSubmission() error { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| if s.maxInFlight > 0 && s.inFlight >= s.maxInFlight { | ||
| return errTooManyInFlight | ||
| } | ||
| s.inFlight++ | ||
| return nil | ||
| } | ||
|
|
||
| broadcast, err := s.app.BroadcastTx(ctx, txBytes) | ||
| if err != nil { | ||
| if isSequenceMismatchText(err.Error()) { | ||
| return nil, fmt.Errorf("%w: %w", errSequenceMismatch, err) | ||
| func (s *DirectSubmitter) finishSubmission() { | ||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
|
|
||
| if s.inFlight > 0 { | ||
| s.inFlight-- | ||
| } | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) recoverSequenceLocked(account *AccountInfo, errText string) { | ||
| expected, ok := expectedSequenceFromMismatchText(errText) | ||
| if !ok { | ||
| s.invalidateSequenceLocked() | ||
| return | ||
| } | ||
|
|
||
| s.accountNumber = account.AccountNumber | ||
| s.nextSequence = expected | ||
| s.sequenceReady = true | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) reconcilePendingLocked(ctx context.Context) error { | ||
| if len(s.pendingSequences) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| nextSequence := s.nextSequence | ||
| for hash, sequence := range s.pendingSequences { | ||
| _, err := s.app.GetTx(ctx, hash) | ||
| if err == nil { | ||
| delete(s.pendingSequences, hash) | ||
| continue | ||
| } | ||
| return nil, fmt.Errorf("broadcast blob tx: %w", err) | ||
| if isTxNotFound(err) { | ||
| if sequence >= nextSequence { | ||
| nextSequence = sequence + 1 | ||
| } | ||
| continue | ||
| } | ||
| return fmt.Errorf("reconcile pending blob tx %s: %w", hash, err) | ||
| } | ||
| if err := checkTxStatus("broadcast", broadcast); err != nil { | ||
| return nil, err | ||
|
|
||
| s.nextSequence = nextSequence | ||
| return nil | ||
| } | ||
|
Comment on lines
+243
to
+266
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Potential unbounded growth of When Consider adding a cleanup mechanism, such as removing entries older than a certain age or limiting the map size. 🤖 Prompt for AI Agents |
||
|
|
||
| func (s *DirectSubmitter) rememberPendingLocked(hash string, sequence uint64) { | ||
| if hash == "" { | ||
| return | ||
| } | ||
| s.pendingSequences[hash] = sequence | ||
| } | ||
|
|
||
| return s.waitForConfirmation(ctx, broadcast.Hash) | ||
| func (s *DirectSubmitter) clearPending(hash string) { | ||
| if hash == "" { | ||
| return | ||
| } | ||
|
|
||
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| delete(s.pendingSequences, hash) | ||
| } | ||
|
|
||
| func (s *DirectSubmitter) buildBlobTx(req *Request, account *AccountInfo) ([]byte, error) { | ||
|
|
@@ -297,6 +424,7 @@ func (s *DirectSubmitter) waitForConfirmation(parent context.Context, hash strin | |
| for { | ||
| tx, err := s.app.GetTx(ctx, hash) | ||
| if err == nil { | ||
| s.clearPending(hash) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Calling Useful? React with 👍 / 👎. |
||
| if err := checkTxStatus("confirm", tx); err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -339,6 +467,29 @@ func isSequenceMismatchText(text string) bool { | |
| return strings.Contains(text, "account sequence mismatch") || strings.Contains(text, "incorrect account sequence") | ||
| } | ||
|
|
||
| func expectedSequenceFromMismatchText(text string) (uint64, bool) { | ||
| lower := strings.ToLower(text) | ||
| idx := strings.Index(lower, "expected ") | ||
| if idx < 0 { | ||
| return 0, false | ||
| } | ||
|
|
||
| start := idx + len("expected ") | ||
| end := start | ||
| for end < len(lower) && lower[end] >= '0' && lower[end] <= '9' { | ||
| end++ | ||
| } | ||
| if end == start { | ||
| return 0, false | ||
| } | ||
|
|
||
| sequence, err := strconv.ParseUint(lower[start:end], 10, 64) | ||
| if err != nil { | ||
| return 0, false | ||
| } | ||
| return sequence, true | ||
| } | ||
|
|
||
| func isTxNotFound(err error) bool { | ||
| return status.Code(err) == codes.NotFound | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nextAccountLockedsetssequenceReadyto true before runningreconcilePendingLocked, so a transientGetTxerror during reconciliation returns fromSubmitwhile leaving the cache marked ready. After that, later submissions skip both account refresh and reconciliation, continuing with potentially stalenextSequence/pending state instead of retrying reconciliation on the next request.Useful? React with 👍 / 👎.