chore: enable leader check & retrigger digest #1143
Conversation
…dling and logging

This commit introduces several key improvements to the `auto_digest` process, including:

- Enhanced transaction broadcasting with error handling for transaction results.
- Added structured logging for better monitoring of processed days and deleted rows.
- Introduced a new `DigestTxResult` type to encapsulate results from the auto_digest transactions.
- Implemented a new method for parsing digest results from transaction logs, improving clarity and reliability.

These changes aim to optimize the digest operation's performance and provide better insights into its execution outcomes.
…ed processing

This commit introduces significant updates to the digest actions, focusing on leader-only authorization and optimizations in the `batch_digest` and `auto_digest` processes. Key changes include:

- Added leader authorization checks to ensure only the current block leader can execute digest operations, enhancing security and control.
- Refined the `batch_digest` action to improve performance and clarity, utilizing optimized SQL queries for bulk processing.
- Enhanced the `auto_digest` action to leverage the improved `batch_digest`, streamlining the processing of multiple pending days.
- Introduced comprehensive tests to validate leader authorization functionality, ensuring robust error handling and expected behavior.

These changes aim to improve the reliability and efficiency of the digest system while maintaining strict access controls for critical operations.
Actionable comments posted: 2
🧹 Nitpick comments (13)
internal/migrations/020-digest-actions.sql (5)
27-32: `$preserve_past_days` in `batch_digest` is unused; clarify intent or remove.

Either document it as intentionally unused (safety handled by `auto_digest`) or drop it to avoid confusion.
Apply this minimal doc tweak:
```diff
 CREATE OR REPLACE ACTION batch_digest(
     $stream_refs INT[],
     $day_indexes INT[],
     $delete_cap INT DEFAULT 10000,
-    $preserve_past_days INT DEFAULT 2
+    -- NOTE: preserve_past_days is ignored here; safety window enforced by auto_digest
+    $preserve_past_days INT DEFAULT 2
 )
```
84-135: Reduce repeated scans over primitive_events by materializing or consolidating CTEs.

The same targets/windows/day_events/ranked are rebuilt multiple times. Consider materializing windows/day_events into a temp relation or consolidating steps into one statement to cut I/O.
Example of lightweight materialization (if allowed by the action engine):
```diff
+-- Materialize once for reuse within this action execution
+CREATE TEMP TABLE _windows AS
+WITH targets AS (
+    SELECT ord, sr AS stream_ref, di AS day_index,
+           (di * 86400) AS day_start, (di * 86400) + 86400 AS day_end
+    FROM UNNEST($stream_refs, $day_indexes) WITH ORDINALITY AS u(sr, di, ord)
+),
+windows AS (
+    SELECT stream_ref, MIN(day_index) AS lo, MAX(day_index) AS hi
+    FROM (
+        SELECT stream_ref, day_index,
+               day_index - ROW_NUMBER() OVER (PARTITION BY stream_ref ORDER BY day_index) AS grp
+        FROM targets
+    ) s
+    GROUP BY stream_ref, grp
+)
+SELECT * FROM windows;
```

Then replace later `JOIN windows w ...` with `JOIN _windows w ...`.
Also applies to: 141-206, 208-286, 290-373, 376-404, 408-439, 441-509
514-527: Pending-queue cleanup only when no leftovers: confirm behavior vs. desired resumability.

This deletes all batch targets from pending_prune_days only if no leftovers. Verify this matches the intended semantics (avoid re-queuing days that truly need no work, while retaining those with leftover deletes). If needed, delete only fully-cleared (stream_ref, day_index) rows.
I can draft a per-day leftover detector to delete only cleared days if the engine supports grouping during probes.
645-657: Progress logging granularity may not meet "every 10k records" acceptance.

`emit_auto_digest_notice` logs once per auto_digest call. If DigestDeleteCap is 100k, progress isn't reported per 10k. Consider emitting intermediate notices after the events and markers delete passes when thresholds are crossed.
```diff
-for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) {
+for $result in batch_digest($stream_refs, $day_indexes, $delete_cap, $preserve_past_days) {
     $processed := $result.processed_days;
     $total_deleted := $result.total_deleted_rows;
     if $result.has_more_to_delete {
         emit_auto_digest_notice($processed, $total_deleted, true);
         $has_more := true;
         RETURN $processed, $total_deleted, $has_more;
     }
 }
```

Optionally, expose partials from batch_digest (e.g., deleted_events/deleted_markers) to log mid-run milestones.
14-26: Docstring claims "single-statement" but implementation is multi-statement for engine compatibility.

Tighten wording to avoid confusion.
```diff
- * batch_digest: Single-statement bulk processing with CTEs for maximum performance
+ * batch_digest: Bulk processing with CTEs; consolidated where possible, with multiple statements for engine compatibility
```

extensions/tn_digest/engine_ops_integration_test.go (1)
61-66: Stabilize time-dependent behavior in tests.

If future seeds depend on @block_timestamp, consider setting BlockContext.Timestamp for determinism.
```diff
-BlockContext: &common.BlockContext{Height: 1},
+BlockContext: &common.BlockContext{Height: 1, Timestamp: 1_725_552_000}, // 2024-09-06T00:00:00Z
```

extensions/tn_digest/broadcast.go (1)
53-58: Avoid magic value for sync; map explicitly and document.

Using sync == 1 is brittle. Prefer a small mapper helper and package constant.
```diff
-if sync == uint8(rpcclient.BroadcastWaitCommit) || sync == 1 {
+if sync == uint8(rpcclient.BroadcastWaitCommit) || sync == uint8(1) { // TODO: replace 1 with a local const WaitCommitSync
     mode = rpcclient.BroadcastWaitCommit
 }
```

Or introduce a NormalizeSyncToBroadcastMode function.
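The suggested NormalizeSyncToBroadcastMode helper could look roughly like this sketch. The type and constant names here are stand-ins for the real rpcclient constants in kwil-db (not the actual API), shown only to illustrate replacing the magic `sync == 1` comparison with a named mapping:

```go
package main

import "fmt"

// BroadcastMode mirrors the rpcclient sync modes; values are assumed for
// illustration and should be taken from the real rpcclient package.
type BroadcastMode uint8

const (
	BroadcastSync       BroadcastMode = 0 // fire-and-forget (assumed default)
	BroadcastWaitCommit BroadcastMode = 1 // assumed to correspond to sync == 1
)

// normalizeSyncToBroadcastMode maps a raw sync flag to a typed mode,
// so callers never compare against bare integers.
func normalizeSyncToBroadcastMode(sync uint8) BroadcastMode {
	if sync == uint8(BroadcastWaitCommit) {
		return BroadcastWaitCommit
	}
	return BroadcastSync
}

func main() {
	fmt.Println(normalizeSyncToBroadcastMode(1) == BroadcastWaitCommit) // true
	fmt.Println(normalizeSyncToBroadcastMode(0) == BroadcastWaitCommit) // false
}
```

Callers in broadcast.go would then switch on the returned mode instead of repeating the comparison at each call site.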
extensions/tn_digest/internal/engine_ops_test.go (1)
5-20: Parser tests cover true/false and missing-entry; add multi-entry case.

Consider asserting the parser picks the last auto_digest entry when multiple exist.
```diff
+func TestParseDigestResultFromTxLog_MultipleEntries_LastWins(t *testing.T) {
+    log := "auto_digest:{\"processed_days\":1,\"total_deleted_rows\":10,\"has_more_to_delete\":true}\n" +
+        "auto_digest:{\"processed_days\":3,\"total_deleted_rows\":30,\"has_more_to_delete\":false}"
+    res, err := parseDigestResultFromTxLog(log)
+    if err != nil {
+        t.Fatalf("unexpected error: %v", err)
+    }
+    if res.ProcessedDays != 3 || res.TotalDeletedRows != 30 || res.HasMoreToDelete {
+        t.Fatalf("unexpected parse result: %+v", res)
+    }
+}
```

Also applies to: 22-37, 39-45
extensions/tn_digest/scheduler/constants.go (1)
5-13: Validate operational safety of DigestDeleteCap=100k and make tunable.

100k deletes per run is aggressive; confirm DB capacity and add a config override (env/flag) in scheduler.go.
tests/streams/digest/digest_actions_test.go (2)
67-134: Test name could be more precise

The test name `TestDigestActionsLeaderAuthorization` could be more descriptive about what aspect of leader authorization is being tested. Consider renaming to `TestDigestActionsLeaderOnlyEnforcement` to clarify that it tests the enforcement of leader-only restrictions, not the authorization mechanism itself.
103-107: Consider improving error assertion readability

The nested conditions for checking the error message could be more readable. Also, the error wrapping might mask the original error message that's being tested.
```diff
-    if r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, platform.Deployer, coreauth.EthPersonalSignAuth); err != nil {
-        return errors.Wrap(err, "batch_digest non-leader call error")
-    } else if r == nil || r.Error == nil || !strings.Contains(r.Error.Error(), "Only the current block leader") {
-        return errors.New("expected leader-only error for batch_digest when not leader")
-    }
+    r, err := callWithCtx("batch_digest", []any{[]int{}, []int{}}, platform.Deployer, coreauth.EthPersonalSignAuth)
+    if err != nil {
+        return errors.Wrap(err, "batch_digest non-leader call error")
+    }
+    if r == nil || r.Error == nil {
+        return errors.New("expected error for non-leader batch_digest call, but got success")
+    }
+    if !strings.Contains(r.Error.Error(), "Only the current block leader") {
+        return errors.Errorf("expected 'Only the current block leader' error, got: %s", r.Error.Error())
+    }
```
180-183: Consider making max runs configurable

While the current implementation logs when max runs is reached, consider whether `DrainMaxRuns` should be configurable via environment variable or config file for different deployment scenarios. For production deployments, you might want to make these drain parameters configurable through environment variables or configuration files, allowing operators to tune them based on their specific data volumes and performance requirements without requiring code changes.
extensions/tn_digest/internal/engine_ops.go (1)
144-161: Duplicate nonce resolution logic

The nonce resolution logic is duplicated between `BuildAndBroadcastAutoDigestTx` and `BroadcastAutoDigestWithArgsAndParse`. Consider extracting this into a helper function to follow DRY principles.

```diff
+// getNextNonce retrieves the next nonce for the signer account
+func (e *EngineOperations) getNextNonce(ctx context.Context, signer auth.Signer) (uint64, error) {
+    signerAccountID, err := ktypes.GetSignerAccount(signer)
+    if err != nil {
+        return 0, fmt.Errorf("failed to get signer account: %w", err)
+    }
+
+    account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
+    if err != nil {
+        // Account doesn't exist yet - use nonce 1 for first transaction
+        e.logger.Info("DEBUG: Account not found, using nonce 1 for first transaction",
+            "account", fmt.Sprintf("%x", signerAccountID.Identifier), "error", err)
+        return uint64(1), nil
+    }
+
+    // Account exists - use next nonce
+    nextNonce := uint64(account.Nonce + 1)
+    e.logger.Info("DEBUG: Account found, using next nonce",
+        "account", fmt.Sprintf("%x", signerAccountID.Identifier),
+        "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance)
+    return nextNonce, nil
+}
```

Then update both methods to use this helper:
```diff
-    // Get the signer account ID
-    signerAccountID, err := ktypes.GetSignerAccount(signer)
-    if err != nil {
-        return nil, fmt.Errorf("failed to get signer account: %w", err)
-    }
-
-    // Get account information using the accounts service directly on database
-    account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
-    var nextNonce uint64
-    if err != nil {
-        // Account doesn't exist yet - use nonce 1 for first transaction
-        e.logger.Info("DEBUG: Account not found, using nonce 1 for first transaction", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "error", err)
-        nextNonce = uint64(1)
-    } else {
-        // Account exists - use next nonce
-        nextNonce = uint64(account.Nonce + 1)
-        e.logger.Info("DEBUG: Account found, using next nonce", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance)
-    }
+    nextNonce, err := e.getNextNonce(ctx, signer)
+    if err != nil {
+        return nil, err
+    }
```
📒 Files selected for processing (8)
- extensions/tn_digest/broadcast.go (2 hunks)
- extensions/tn_digest/engine_ops_integration_test.go (2 hunks)
- extensions/tn_digest/internal/engine_ops.go (3 hunks)
- extensions/tn_digest/internal/engine_ops_test.go (1 hunks)
- extensions/tn_digest/scheduler/constants.go (1 hunks)
- extensions/tn_digest/scheduler/scheduler.go (2 hunks)
- internal/migrations/020-digest-actions.sql (1 hunks)
- tests/streams/digest/digest_actions_test.go (2 hunks)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/README.md:9-14
Timestamp: 2025-09-03T16:45:52.165Z
Learning: In the trufnetwork/node digest system, the backfill migration (internal/migrations/backfill-digest/) intentionally includes all days including the current day as candidates for pending_prune_days. The safety window logic for excluding recent days is handled by the auto_digest action via its preserve_past_days parameter, not during the backfill phase. This separation ensures the backfill creates a comprehensive queue while the processor handles recency filtering.
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/candidates.sql:1-8
Timestamp: 2025-09-03T16:44:35.205Z
Learning: In the trufnetwork/node repository digest system, the 24-hour safety window to exclude current partial days is enforced at the auto_digest action level via the preserve_past_days parameter (default 2), not in backfill preparation scripts like candidates.sql. The backfill scripts only identify candidate days for queueing, while auto_digest handles the actual processing with appropriate safety boundaries.
📚 Learning: 2025-09-03T19:06:50.873Z
Learnt from: MicBun
PR: trufnetwork/node#1140
File: extensions/tn_digest/scheduler/scheduler.go:75-85
Timestamp: 2025-09-03T19:06:50.873Z
Learning: In the tn_digest scheduler (extensions/tn_digest/scheduler/scheduler.go), the digest jobs intentionally use context.Background() instead of derived contexts to avoid cancellation issues during job execution. Using context.WithTimeout or derived contexts from s.ctx causes errors in this implementation.
Applied to files:
- extensions/tn_digest/scheduler/constants.go
- extensions/tn_digest/scheduler/scheduler.go
📚 Learning: 2025-08-25T15:28:08.459Z
Learnt from: MicBun
PR: trufnetwork/node#1123
File: internal/migrations/020-digest-actions.sql:379-399
Timestamp: 2025-08-25T15:28:08.459Z
Learning: In the batch_digest action in internal/migrations/020-digest-actions.sql, the cleanup step intentionally removes all valid candidates from pending_prune_days using $valid_stream_refs, including days that were skipped due to insufficient records. This prevents repeatedly considering days that don't need OHLC processing in future auto_digest runs, as confirmed by MicBun.
Applied to files:
internal/migrations/020-digest-actions.sql
📚 Learning: 2025-08-20T12:22:14.250Z
Learnt from: outerlook
PR: trufnetwork/node#1113
File: internal/migrations/020-digest-init.sql:3-11
Timestamp: 2025-08-20T12:22:14.250Z
Learning: In the trufnetwork/node repository, for the digest_config table in internal/migrations/020-digest-init.sql, the maintainer prefers to leave the table without initial seed rows, allowing the extension to handle the empty state case at runtime rather than seeding default configuration values in the migration.
Applied to files:
internal/migrations/020-digest-actions.sql
📚 Learning: 2025-08-26T07:04:04.029Z
Learnt from: MicBun
PR: trufnetwork/node#1121
File: internal/migrations/020-digest-actions.sql:107-107
Timestamp: 2025-08-26T07:04:04.029Z
Learning: Single-record daily streams should be processed in the digest system with all OHLC flags set (bitwise 15), meaning the single record serves as open, high, low, and close simultaneously. The current HAVING COUNT(pe.event_time) > 1 condition incorrectly excludes these streams from processing.
Applied to files:
internal/migrations/020-digest-actions.sql
📚 Learning: 2025-09-03T16:44:29.508Z
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/candidates_no_table.sql:1-26
Timestamp: 2025-09-03T16:44:29.508Z
Learning: In the digest migration system, the 24h safety window to exclude current partial days is enforced at the auto_digest action level via the preserve_past_days parameter (default 2 days), not in the backfill candidate generation queries. The candidate queries generate all possible (stream_ref, day_index) pairs, and auto_digest applies the safety constraints during processing.
Applied to files:
internal/migrations/020-digest-actions.sql
🧬 Code graph analysis (2)
tests/streams/digest/digest_actions_test.go (3)
- tests/streams/utils/utils.go (2): RunSchemaTest (372-407), GetTestOptionsWithCache (223-237)
- internal/migrations/migration.go (1): GetSeedScriptPaths (13-39)
- tests/streams/digest/digest_helpers_test.go (1): WithSignerAndProvider (59-68)

extensions/tn_digest/scheduler/scheduler.go (1)
- extensions/tn_digest/scheduler/constants.go (6): DigestDeleteCap (7-7), DigestExpectedRecordsPerStream (8-8), DigestPreservePastDays (9-9), DrainRunDelay (10-10), DrainMaxRuns (11-11), DrainMaxConsecutiveFailures (12-12)
⏰ Context from checks skipped due to timeout (1):
- GitHub Check: acceptance-test
🔇 Additional comments (18)
internal/migrations/020-digest-actions.sql (4)
133-134: Including single-record days fixes prior exclusion bug (bitwise 15 case).

HAVING COUNT(*) >= 1 correctly includes single-record days for OHLC. Matches prior guidance.
Also applies to: 191-192, 258-259, 339-340, 490-491
659-669: Leader check variable name: validate @leader_sender vs @leader.

Comment mentions @leader, code uses @leader_sender. Confirm the correct system variable; otherwise the check may misfire.
Proposed fix if @leader is the canonical var:
```diff
-IF @leader_sender != @signer {
-    $leader_sender_hex := encode(@leader_sender, 'hex');
+IF @leader != @signer {
+    $leader_sender_hex := encode(@leader, 'hex');
     $signer_hex := encode(@signer, 'hex');
     ERROR('Only the current block leader can execute this operation: leader_sender: ' || $leader_sender_hex::TEXT || ' signer: ' || $signer_hex::TEXT);
 }
```
693-706: get_daily_ohlc digested/raw paths look solid; markers verified against events.

Joins prevent stale markers; ordering matches OHLC semantics.
Also applies to: 713-778
541-549: Verify backfill action, drain loop, and scheduler integration

- No `CREATE OR REPLACE ACTION backfill_pending_days` found; ensure the backfill action is defined and documented.
- Drain loop re-runs until `has_more=false` as intended (see scheduler.go:100).
- Confirm scheduler supports progress reporting and cancellation.
extensions/tn_digest/engine_ops_integration_test.go (2)
58-69: Good in-process engine execution path with auth override.

This validates action wiring without network; appropriate for integration scope.
70-71: TxResult.Log payload matches parser expectation.

The `auto_digest:{...}` format aligns with the SQL NOTICE helper.
tests/streams/digest/digest_actions_test.go (3)
73-83: Good test setup with explicit key generation

The test properly generates a secp256k1 key for the leader and validates the type assertion. This approach ensures the test is self-contained and doesn't rely on external state.
85-100: Well-structured helper function for testing contexts

The `callWithCtx` helper function properly constructs the execution context with explicit `BlockContext.Proposer` and signer/auth settings, making the test scenarios clear and maintainable. The error handling for identifier extraction is appropriately defensive.
116-128: Confirm external `EthereumAddressFromPubKey` implementation

The test correctly derives the leader's Ethereum address from the public key for signing, but since `EthereumAddressFromPubKey` lives in github.com/trufnetwork/kwil-db/core/crypto, please manually verify it implements standard secp256k1-based Ethereum address derivation.
75-85: Proper context management for cancellation support

Good implementation of using the scheduler's context (`s.ctx`) for job execution, which enables proper cancellation when leadership is lost. The comment clearly documents this important design decision.

100-106: Clear drain mode initialization with comprehensive logging

The drain mode implementation starts with excellent logging that includes all critical parameters. This makes debugging and monitoring much easier.

112-120: Robust cancellation checks throughout the drain loop

The implementation properly checks for cancellation at the start of each iteration using a non-blocking select. This ensures responsive shutdown behavior.

134-146: Well-designed failure handling with consecutive failure tracking

The consecutive failure tracking and threshold-based abortion provide good resilience against transient failures while preventing infinite retry loops. The error logging includes all relevant context for debugging.

147-168: Comprehensive success handling with cumulative tracking

Excellent implementation of:
- Resetting consecutive failures on success
- Tracking cumulative totals across runs
- Detailed logging at each step
- Clean exit when digestion is complete
170-178: Proper interruptible sleep between runs

The sleep implementation correctly uses select to allow cancellation during the delay period, maintaining responsiveness to shutdown signals.
extensions/tn_digest/internal/engine_ops.go (3)
16-22: Clear result structure for digest operations

The `DigestTxResult` struct properly encapsulates the key metrics from digest operations. The naming is clear and the boolean flag for continuation is intuitive.

114-119: Proper transaction result validation

Good practice to check the transaction result code before attempting to parse logs. The error message includes both the actual and expected codes, making debugging easier.

163-176: Correct argument encoding for digest parameters

The implementation properly encodes all three digest parameters as int64 values and constructs the Arguments array correctly for the ActionExecution payload.
This commit updates the `kwil-db`, `kwil-db/core`, and `sdk-go` dependencies in the `go.mod` and `go.sum` files to their latest versions, ensuring compatibility and access to the latest features and fixes. The updated versions are:

- `kwil-db` updated to v0.10.3-0.20250904210327-3e5f363152f3
- `kwil-db/core` updated to v0.4.3-0.20250904210327-3e5f363152f3
- `sdk-go` updated to v0.3.2-0.20250630062504-841b40cdb709

These changes aim to maintain the project's dependency health and improve overall stability.
…rations

This commit introduces several enhancements to the digest operations, focusing on transaction handling and JSON parsing. Key changes include:

- Implemented short polling for transaction results in `makeBroadcasterFromURL`, improving reliability in `BroadcastWaitAccept` mode.
- Enhanced error handling for transaction queries, ensuring robust responses in case of failures.
- Refactored JSON parsing in `parseDigestResultFromTxLog` to utilize `json.Unmarshal`, simplifying the extraction of `processed_days`, `total_deleted_rows`, and `has_more_to_delete` values.

These changes aim to optimize the digest operation's performance and provide clearer insights into transaction outcomes.
Actionable comments posted: 1
🧹 Nitpick comments (1)
go.mod (1)
22-23: Good: kwil-db and core bumped together to the same pseudo-version

Updating both `kwil-db` and `kwil-db/core` to matching commits reduces incompatibility risk and aligns with the PR goal of enabling leader checks and digest changes. No issues here. If upstream has tagged releases for these commits, consider switching from pseudo-versions to tags for clearer provenance and easier dependency hygiene.
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (1)
go.mod (1 hunks)
⏰ Context from checks skipped due to timeout (2):
- GitHub Check: acceptance-test
- GitHub Check: lint
This commit modifies the `go.mod` and `go.sum` files to ensure the `golang.org/x/text` dependency is correctly specified without the indirect comment. This change aims to maintain clarity and consistency in the dependency management of the project.
This commit updates the `kwil-db` and `kwil-db/core` dependencies in the `go.mod` and `go.sum` files to their latest versions, ensuring compatibility and access to the latest features and fixes. The updated versions are:

- `kwil-db` updated to v0.10.3-0.20250905175054-602e824e33c2
- `kwil-db/core` updated to v0.4.3-0.20250905175054-602e824e33c2

These changes aim to maintain the project's dependency health and improve overall stability.
… tests

This commit refactors the `metadata_test.go` file by improving the formatting of the test cases and enhancing the assertions for expected results. Key changes include:

- Removed unnecessary whitespace and aligned the input parameters for better readability.
- Updated the expected result tables to ensure clarity and accuracy in the test assertions.
- Adjusted the excluded columns in the assertions to reflect the correct indices.

These changes aim to enhance the maintainability and clarity of the test suite, ensuring more reliable test outcomes.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
extensions/tn_digest/internal/engine_ops.go (1)
81-93: Differentiate "account not found" from real errors to avoid wrong nonce

Treating every error as "not found" can mask DB failures and cause repeated tx failures.
```diff
     account, err := e.accounts.GetAccount(ctx, e.db, signerAccountID)
     var nextNonce uint64
-    if err != nil {
-        // Account doesn't exist yet - use nonce 1 for first transaction
-        e.logger.Info("DEBUG: Account not found, using nonce 1 for first transaction", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "error", err)
-        nextNonce = uint64(1)
+    if err != nil {
+        // Treat only explicit "not found" as first-tx case; surface other errors
+        msg := strings.ToLower(err.Error())
+        if !strings.Contains(msg, "not found") && !strings.Contains(msg, "no rows") {
+            return fmt.Errorf("get account: %w", err)
+        }
+        e.logger.Info("Account not found, using nonce 1 for first transaction", "account", fmt.Sprintf("%x", signerAccountID.Identifier))
+        nextNonce = 1
     } else {
         // Account exists - use next nonce
         nextNonce = uint64(account.Nonce + 1)
-        e.logger.Info("DEBUG: Account found, using next nonce", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance)
+        e.logger.Info("Account found, using next nonce", "account", fmt.Sprintf("%x", signerAccountID.Identifier), "currentNonce", account.Nonce, "nextNonce", nextNonce, "balance", account.Balance)
     }
```
♻️ Duplicate comments (1)
extensions/tn_digest/broadcast.go (1)
64-95: Accept-mode polling + TxQuery propagation looks solid

Addresses prior concern about nil/not-yet-committed results; now returns a non-nil TxResult or a clear error.
🧹 Nitpick comments (12)
extensions/tn_digest/scheduler_lifecycle.go (1)
43-44: Add a one-line rationale comment to prevent future regressions

A brief note explaining why ctx is ignored will save future refactors from "fixing" this.
Apply:
```diff
 func (e *Extension) startScheduler(_ context.Context) error {
-    return e.Scheduler().Start(context.Background(), e.Schedule())
+    // Intentionally use context.Background(): scheduler manages its own cancellation; upstream ctx can cause premature stops.
+    return e.Scheduler().Start(context.Background(), e.Schedule())
 }
```

tests/streams/query/metadata_test.go (2)
208-216: Stabilize ordering to avoid flaky comparison

The expected rows assume a specific order, but no SortColumns are provided. Add a stable sort on value_i, created_at.
Apply:
```diff
 table.AssertResultRowsEqualMarkdownTable(t, table.AssertResultRowsEqualMarkdownTableInput{
     Actual:   result,
     Expected: expected,
-    ExcludedColumns: []int{0, 1, 2},
+    ExcludedColumns: []int{0, 1, 2},
+    SortColumns:     []string{"value_i", "created_at"},
 })
```
214-216: Index-based ExcludedColumns is brittle

Consider excluding by header name in the helper to decouple from column positions. If not feasible now, add a comment listing the excluded headers.
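The stable-sort idea above can be illustrated with a small sketch: sort result rows on the chosen key columns before comparing, so assertions do not depend on engine ordering. The `row` shape and `sortRows` helper here are illustrative, not the test helper's actual API.

```go
package main

import (
	"fmt"
	"sort"
)

// row is a stand-in for one result row; only the sort keys matter here.
type row struct {
	ValueI    int
	CreatedAt int64
}

// sortRows orders rows by (value_i, created_at), giving the comparison a
// deterministic order regardless of how the engine returned them.
func sortRows(rows []row) {
	sort.SliceStable(rows, func(i, j int) bool {
		if rows[i].ValueI != rows[j].ValueI {
			return rows[i].ValueI < rows[j].ValueI
		}
		return rows[i].CreatedAt < rows[j].CreatedAt
	})
}

func main() {
	rows := []row{{2, 10}, {1, 30}, {1, 20}}
	sortRows(rows)
	fmt.Println(rows) // [{1 20} {1 30} {2 10}]
}
```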
extensions/tn_digest/broadcast.go (3)
55-58: Clarify the comment; keep typed constants preferred

Suggest noting that sync==1 is supported for backward compatibility, but callers should prefer rpcclient.BroadcastWaitCommit.
Apply:
```diff
-	// Map sync flag to broadcast mode (callers should pass 1 for WaitCommit)
+	// Map sync flag to broadcast mode.
+	// Prefer using rpcclient.BroadcastWaitCommit; sync==1 is accepted for backward compatibility.
```
76-79: Return the tx hash on ctx cancellation to aid diagnostics

Helps upstream correlate the broadcast even if the follow-up query is canceled.
Apply:
```diff
-	case <-ctx.Done():
-		return types.Hash{}, nil, ctx.Err()
+	case <-ctx.Done():
+		return h, nil, ctx.Err()
```
81-93: Improve the “nil result” error to indicate polling exhaustion

Differentiate “no commit observed after retries” from other query failures for easier triage.
Apply:
```diff
-	if queryErr != nil {
-		return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr)
-	}
+	if queryErr != nil {
+		return types.Hash{}, nil, fmt.Errorf("failed to query transaction result: %w", queryErr)
+	}
+	if txQueryResp == nil || txQueryResp.Result == nil {
+		return types.Hash{}, nil, fmt.Errorf("transaction result not observed after %d attempts (~%dms)", 10, 10*200)
+	}
 @@
-	if txQueryResp == nil || txQueryResp.Result == nil {
-		return types.Hash{}, nil, fmt.Errorf("transaction result is nil")
-	}
+	if txQueryResp == nil || txQueryResp.Result == nil {
+		return types.Hash{}, nil, fmt.Errorf("transaction result is nil")
+	}
```
extensions/tn_digest/internal/engine_ops.go (6)
16-21: Add JSON tags to DigestTxResult and unmarshal directly into it

Removes the duplicate struct and simplifies parsing.
```diff
 type DigestTxResult struct {
-	ProcessedDays    int
-	TotalDeletedRows int
-	HasMoreToDelete  bool
+	ProcessedDays    int  `json:"processed_days"`
+	TotalDeletedRows int  `json:"total_deleted_rows"`
+	HasMoreToDelete  bool `json:"has_more_to_delete"`
 }
 @@
-	// Parse JSON properly
-	var jsonResult struct {
-		ProcessedDays    int  `json:"processed_days"`
-		TotalDeletedRows int  `json:"total_deleted_rows"`
-		HasMoreToDelete  bool `json:"has_more_to_delete"`
-	}
-
-	if err := json.Unmarshal([]byte(digestJSON), &jsonResult); err != nil {
-		return nil, fmt.Errorf("failed to parse digest JSON: %w", err)
-	}
-
-	result := &DigestTxResult{
-		ProcessedDays:    jsonResult.ProcessedDays,
-		TotalDeletedRows: jsonResult.TotalDeletedRows,
-		HasMoreToDelete:  jsonResult.HasMoreToDelete,
-	}
-
-	return result, nil
+	// Parse JSON directly into the result type
+	var result DigestTxResult
+	if err := json.Unmarshal([]byte(digestJSON), &result); err != nil {
+		return nil, fmt.Errorf("failed to parse digest JSON: %w", err)
+	}
+	return &result, nil
```
Also applies to: 252-270
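The tagged-struct approach suggested above can be shown end to end. This is a self-contained sketch; the field names mirror the diff, while the sample payload and `parseDigest` helper name are illustrative.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// DigestTxResult carries JSON tags so the tx log payload can be unmarshaled
// directly, without an intermediate anonymous struct.
type DigestTxResult struct {
	ProcessedDays    int  `json:"processed_days"`
	TotalDeletedRows int  `json:"total_deleted_rows"`
	HasMoreToDelete  bool `json:"has_more_to_delete"`
}

// parseDigest unmarshals the digest JSON straight into the result type.
func parseDigest(digestJSON string) (*DigestTxResult, error) {
	var result DigestTxResult
	if err := json.Unmarshal([]byte(digestJSON), &result); err != nil {
		return nil, fmt.Errorf("failed to parse digest JSON: %w", err)
	}
	return &result, nil
}

func main() {
	r, err := parseDigest(`{"processed_days":3,"total_deleted_rows":120,"has_more_to_delete":true}`)
	fmt.Println(r.ProcessedDays, r.TotalDeletedRows, r.HasMoreToDelete, err) // 3 120 true <nil>
}
```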
22-22: Replace magic number “1” with a named constant for broadcaster mode

Improves readability and self-documentation.
```diff
+const defaultBroadcastMode uint8 = 1
 @@
-	hash, txResult, err := broadcaster(ctx, tx, 1)
+	hash, txResult, err := broadcaster(ctx, tx, defaultBroadcastMode)
 @@
-	hash, txResult, err := broadcaster(ctx, tx, 1)
+	hash, txResult, err := broadcaster(ctx, tx, defaultBroadcastMode)
```
Also applies to: 109-109, 194-194
144-149: Validate digest args early

Avoids constructing and signing obviously bad txs.
```diff
 func (e *EngineOperations) BroadcastAutoDigestWithArgsAndParse(
 @@
 ) (*DigestTxResult, error) {
+	if deleteCap <= 0 {
+		return nil, fmt.Errorf("deleteCap must be > 0")
+	}
+	if expectedRecords < 0 {
+		return nil, fmt.Errorf("expectedRecords must be >= 0")
+	}
+	if preserveDays < 0 {
+		return nil, fmt.Errorf("preserveDays must be >= 0")
+	}
```
73-75: Consider returning DigestTxResult from BuildAndBroadcastAutoDigestTx for symmetry

Callers may want parsed results just like the args variant.
```diff
-func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error)) error {
+func (e *EngineOperations) BuildAndBroadcastAutoDigestTx(ctx context.Context, chainID string, signer auth.Signer, broadcaster func(context.Context, *ktypes.Transaction, uint8) (ktypes.Hash, *ktypes.TxResult, error)) (*DigestTxResult, error) {
 @@
-	return fmt.Errorf("broadcast tx: %w", err)
+	return nil, fmt.Errorf("broadcast tx: %w", err)
 @@
-	return fmt.Errorf("transaction failed with code %d (expected %d): %s",
+	return nil, fmt.Errorf("transaction failed with code %d (expected %d): %s",
 		txResult.Code, uint32(ktypes.CodeOk), txResult.Log)
 @@
 	result, err := parseDigestResultFromTxLog(txResult.Log)
 	if err != nil {
-		return fmt.Errorf("failed to parse digest result: %w", err)
+		return nil, fmt.Errorf("failed to parse digest result: %w", err)
 	}
 @@
-	return nil
+	return result, nil
```
Also applies to: 132-132
59-66: Tighten “digest_config not found” detection

Substring checks can hide other errors. Prefer a typed/structured error or a helper (e.g., errors.Is(..., ErrTableNotFound)).
101-109: Add deadlines/retries around broadcaster and DB calls

Bounded contexts and a simple retry/backoff for transient errors will make the cron/drain loop more robust.
Also applies to: 185-195
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (5)
- extensions/tn_digest/broadcast.go (3 hunks)
- extensions/tn_digest/internal/engine_ops.go (3 hunks)
- extensions/tn_digest/scheduler_lifecycle.go (1 hunks)
- go.mod (2 hunks)
- tests/streams/query/metadata_test.go (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- go.mod
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/README.md:9-14
Timestamp: 2025-09-03T16:45:52.165Z
Learning: In the trufnetwork/node digest system, the backfill migration (internal/migrations/backfill-digest/) intentionally includes all days including the current day as candidates for pending_prune_days. The safety window logic for excluding recent days is handled by the auto_digest action via its preserve_past_days parameter, not during the backfill phase. This separation ensures the backfill creates a comprehensive queue while the processor handles recency filtering.
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/candidates.sql:1-8
Timestamp: 2025-09-03T16:44:35.205Z
Learning: In the trufnetwork/node repository digest system, the 24-hour safety window to exclude current partial days is enforced at the auto_digest action level via the preserve_past_days parameter (default 2), not in backfill preparation scripts like candidates.sql. The backfill scripts only identify candidate days for queueing, while auto_digest handles the actual processing with appropriate safety boundaries.
📚 Learning: 2025-09-03T19:06:50.873Z
Learnt from: MicBun
PR: trufnetwork/node#1140
File: extensions/tn_digest/scheduler/scheduler.go:75-85
Timestamp: 2025-09-03T19:06:50.873Z
Learning: In the tn_digest scheduler (extensions/tn_digest/scheduler/scheduler.go), the digest jobs intentionally use context.Background() instead of derived contexts to avoid cancellation issues during job execution. Using context.WithTimeout or derived contexts from s.ctx causes errors in this implementation.
Applied to files:
extensions/tn_digest/scheduler_lifecycle.go
🧬 Code graph analysis (2)
extensions/tn_digest/scheduler_lifecycle.go (1)
extensions/tn_digest/extension.go (1)
Extension(24-44)
tests/streams/query/metadata_test.go (3)
tests/streams/utils/procedure/execute.go (1)
- ListMetadataByHeight (848-888)

tests/streams/utils/procedure/types.go (1)

- ListMetadataByHeightInput (163-172)

tests/streams/utils/table/assert.go (2)

- AssertResultRowsEqualMarkdownTable (19-93)
- AssertResultRowsEqualMarkdownTableInput (11-17)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: acceptance-test
🔇 Additional comments (10)
extensions/tn_digest/scheduler_lifecycle.go (1)
43-44: LGTM: using Background for scheduler start is consistent with scheduler constraints

Matches prior learning that a derived ctx causes cancellations in the scheduler; the underscore param also clarifies intent.
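The lifecycle pattern endorsed above, a scheduler that owns its own cancellation rather than inheriting a caller's request context, can be sketched as follows. The `scheduler` type here is illustrative, not the project's actual API.

```go
package main

import (
	"context"
	"fmt"
)

// scheduler derives its own cancellable context from Background so an
// upstream request context cannot cancel long-lived jobs mid-run.
type scheduler struct {
	ctx    context.Context
	cancel context.CancelFunc
}

// Start intentionally ignores any caller-scoped context; the scheduler
// manages its own lifetime and is stopped only via Stop.
func (s *scheduler) Start() {
	s.ctx, s.cancel = context.WithCancel(context.Background())
}

// Stop cancels the scheduler-owned context.
func (s *scheduler) Stop() { s.cancel() }

func main() {
	s := &scheduler{}
	s.Start()
	fmt.Println(s.ctx.Err()) // <nil> while running
	s.Stop()
	fmt.Println(s.ctx.Err()) // context canceled
}
```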
tests/streams/query/metadata_test.go (7)
198-202: LGTM: query inputs look correct

Platform, Key, and Height are passed as expected.

249-252: LGTM: no-key path

Exercising the no-key filter is good coverage.

292-300: LGTM: pagination params

Limit/offset wiring is correct.

311-316: LGTM: second page pagination

Covers tail page behavior.

356-364: LGTM: invalid range check

Good to assert the error path and message content.

409-413: LGTM: negative limit behavior

Matches the “convert negative to zero” expectation.

423-428: LGTM: negative offset behavior

Validates clamping or acceptance without error.
extensions/tn_digest/broadcast.go (1)
9-9: LGTM: time import

Required for the short polling backoff.
extensions/tn_digest/internal/engine_ops.go (1)
176-183: No changes needed: arg order matches SQL signature and leader gating is enforced.
deleteCap, expectedRecordsPerStream, preservePastDays map to $delete_cap, $expected_records_per_stream, $preserve_past_days in internal/migrations/020-digest-actions.sql (lines 540–560), and `check_leader_authorization()` protects the action at runtime.
This commit enhances the `BroadcastAutoDigestWithArgsAndParse` function in `engine_ops.go` by refining the error handling for account retrieval. Key changes include: - Updated the error handling to only treat "not found" or "no rows" as missing accounts, failing fast on other errors. - Improved logging messages for both account not found and account found scenarios, enhancing clarity and consistency. Additionally, the `metadata_test.go` file was modified to replace a wrapped error with a simpler error message for expected empty results, improving test clarity. These changes aim to enhance the robustness and maintainability of the codebase.
Actionable comments posted: 0
♻️ Duplicate comments (1)
tests/streams/query/metadata_test.go (1)
257-258: Fix validated: no longer wrapping a nil error

Replacing errors.Wrapf(err, ...) with errors.New avoids returning nil when err is nil. Good catch.
🧹 Nitpick comments (5)
tests/streams/query/metadata_test.go (5)
214-216: Stabilize comparison with explicit sort; double-check excluded indices

Index-based ExcludedColumns is brittle across engine/schema changes. Also, ordering may vary. Sort on stable columns to avoid flaky diffs.
Apply:
```diff
 table.AssertResultRowsEqualMarkdownTable(t, table.AssertResultRowsEqualMarkdownTableInput{
 	Actual:          result,
 	Expected:        expected,
 	ExcludedColumns: []int{0, 1, 2},
+	SortColumns:     []string{"value_i", "created_at"},
 })
```
If any of the first three actual columns aren’t the ones you intend to drop, adjust the indices accordingly.
295-299: Pagination smoke-check is fine; consider asserting payload too

Length checks are OK, but adding a quick assert on the returned value_i (or created_at) would detect ordering regressions.
311-315: Second page length check OK; same note on content validation

Optional: assert the single row’s value_i to tighten the test.
356-364: Invalid range: make error assertion resilient to wording/casing

Relying on the exact “Invalid height range” casing can be brittle across engine versions.
Apply:
```diff
-	expectedError := "Invalid height range"
-	if !strings.Contains(err.Error(), expectedError) {
-		return errors.Errorf("expected error message to contain '%s', got: %s", expectedError, err.Error())
-	}
+	if !strings.Contains(strings.ToLower(err.Error()), "invalid height range") {
+		return errors.Errorf("expected error message to contain 'invalid height range', got: %s", err.Error())
+	}
```
423-427: Negative offset: clarify expected semantics

You assert only “no error.” Consider asserting behavior (e.g., negative offset treated as 0) to lock semantics.
Example follow-up (optional):
```go
zero := 0
baseline, _ := procedure.ListMetadataByHeight(ctx, procedure.ListMetadataByHeightInput{
	Platform: platform,
	Key:      metadataKey,
	Offset:   &zero,
	Height:   1,
})
// now compare lengths or first row with negative offset result
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (2)
- extensions/tn_digest/internal/engine_ops.go (3 hunks)
- tests/streams/query/metadata_test.go (7 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- extensions/tn_digest/internal/engine_ops.go
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/README.md:9-14
Timestamp: 2025-09-03T16:45:52.165Z
Learning: In the trufnetwork/node digest system, the backfill migration (internal/migrations/backfill-digest/) intentionally includes all days including the current day as candidates for pending_prune_days. The safety window logic for excluding recent days is handled by the auto_digest action via its preserve_past_days parameter, not during the backfill phase. This separation ensures the backfill creates a comprehensive queue while the processor handles recency filtering.
Learnt from: outerlook
PR: trufnetwork/node#1135
File: internal/migrations/backfill-digest/candidates.sql:1-8
Timestamp: 2025-09-03T16:44:35.205Z
Learning: In the trufnetwork/node repository digest system, the 24-hour safety window to exclude current partial days is enforced at the auto_digest action level via the preserve_past_days parameter (default 2), not in backfill preparation scripts like candidates.sql. The backfill scripts only identify candidate days for queueing, while auto_digest handles the actual processing with appropriate safety boundaries.
🧬 Code graph analysis (1)
tests/streams/query/metadata_test.go (3)
tests/streams/utils/procedure/execute.go (1)
- ListMetadataByHeight (848-888)

tests/streams/utils/procedure/types.go (1)

- ListMetadataByHeightInput (163-172)

tests/streams/utils/table/assert.go (2)

- AssertResultRowsEqualMarkdownTable (19-93)
- AssertResultRowsEqualMarkdownTableInput (11-17)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: acceptance-test
🔇 Additional comments (5)
tests/streams/query/metadata_test.go (5)
197-197: No-op change

Whitespace-only; nothing to review.

199-201: LGTM: correct input wiring for ListMetadataByHeight

Passing Platform, Key, and Height is consistent with the procedure signature.

248-251: Behavior without Key: confirm intended semantics

The call omits Key and expects zero rows later. Ensure the underlying procedure treats an empty Key as “no results” rather than “no filter.”
Use the script above to inspect the procedure’s WHERE clause around key handling; update the expectation if the engine returns all keys by default.
208-211: No change needed: projection and test alignment confirmed

After excluding the first three columns (stream_id, data_provider, row_id), the test’s header (`value_i | value_f | value_b | value_s | value_ref | created_at`) matches the SQL SELECT order in `list_metadata_by_height`.
409-413: No changes needed for negative limit normalization
The `list_metadata_by_height` action explicitly coerces a negative `$limit` (and `$offset`) to `0` in its SQL definition, so the test’s assumption of empty results without error is correct.
This commit introduces a new task in the Taskfile for cleaning up lingering Kwil DB containers and processes, aimed at improving CI reliability. Additionally, a new script `ci-cleanup.sh` is added to handle the cleanup operations, including stopping and removing Docker containers and images associated with Kwil DB, as well as killing any processes that may bind to specific ports. The CI workflow is updated to invoke this cleanup script before each test attempt, ensuring a clean environment for retries.
Description
Related Problem
How Has This Been Tested?
Summary by CodeRabbit
New Features
Bug Fixes
Tests
Chores