chore: optimize digest system with UNNEST batch processing #1121
Walkthrough

Replaces the single-day digest workflow with an UNNEST-driven batch pipeline.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    actor Leader as Leader (@leader)
    participant Auto as auto_digest
    participant DB as DB (events, pending_prune_days)
    participant Batch as batch_digest
    Leader->>Auto: call auto_digest(batch_size)
    Auto->>DB: select up to batch_size candidate rows -> arrays (stream_refs[], day_indexes[])
    alt no candidates
        Auto-->>Leader: processed_days=0, total_deleted_rows=0
    else candidates found
        Auto->>Batch: batch_digest(stream_refs[], day_indexes[])
        Batch->>DB: UNNEST + compute OHLC, delete extra primitive_events (batched/capped), update type markers, remove pending_prune_days
        Batch-->>Auto: processed_days, total_deleted_rows, total_preserved_rows
        Auto-->>Leader: processed_days, total_deleted_rows
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 0
🧹 Nitpick comments (5)
tests/streams/digest/digest_actions_test.go (1)
934-939: Consider extracting NOTICE log printing into a helper function

The NOTICE log printing logic is duplicated between `callBatchDigest` and `callAutoDigest`. Consider extracting this into a helper function to follow DRY principles.

```diff
+// printNoticeLogs prints NOTICE logs from engine call results
+func printNoticeLogs(r *common.CallResult) {
+	if r != nil && r.Logs != nil && len(r.Logs) > 0 {
+		for i, log := range r.Logs {
+			fmt.Println("NOTICE log", i, ":", log)
+		}
+	}
+}

 // In callBatchDigest function:
-	// Print debug information via NOTICE log
-	if r != nil && r.Logs != nil && len(r.Logs) > 0 {
-		for i, log := range r.Logs {
-			fmt.Println("NOTICE log", i, ":", log)
-		}
-	}
+	// Print debug information via NOTICE log
+	printNoticeLogs(r)

 // In callAutoDigest function:
-	// Print debug information via NOTICE log
-	if r != nil && r.Logs != nil && len(r.Logs) > 0 {
-		for i, log := range r.Logs {
-			fmt.Println("NOTICE log", i, ":", log)
-		}
-	}
+	// Print debug information via NOTICE log
+	printNoticeLogs(r)
```

Also applies to: 982-987
internal/migrations/020-digest-actions.sql (4)
79-82: Leader authorization checks are commented out

The leader authorization checks are currently commented out. This aligns with the PR objectives mentioning that the implementation is blocked by issue #1586. However, these should be enabled once the blocking issue is resolved.
Would you like me to create a follow-up issue to track the re-enabling of leader authorization checks once the blocking dependency (trufnetwork/kwil-db#1586) is resolved?
Also applies to: 285-288
93-95: Initialize counter variables with explicit types for clarity

Consider explicitly typing the counter variables for better code clarity and to avoid potential type inference issues.

```diff
-    $total_processed := 0;
-    $total_deleted := 0;
-    $total_preserved := 0;
+    $total_processed INT := 0;
+    $total_deleted INT := 0;
+    $total_preserved INT := 0;
```
114-120: Consider using EXISTS for record count check optimization

For better performance when only checking if records exist (>1), consider using EXISTS with LIMIT 2 instead of COUNT(*).

```diff
-    -- Count records for this candidate
-    $record_count := 0;
-    for $count_row in SELECT COUNT(*) as cnt FROM primitive_events
-        WHERE stream_ref = $stream_ref
-        AND event_time >= $day_start AND event_time <= $day_end {
-        $record_count := $count_row.cnt;
-    }
+    -- Check if candidate has sufficient records (>1)
+    $record_count := 0;
+    for $row in SELECT 1 FROM primitive_events
+        WHERE stream_ref = $stream_ref
+        AND event_time >= $day_start AND event_time <= $day_end
+        LIMIT 2 {
+        $record_count := $record_count + 1;
+    }
```
266-268: Consider implementing configurable logging for debugging

The commented debug NOTICE is useful. Consider making it configurable through a debug flag parameter or system setting.

```diff
+        -- Optional: Add a debug parameter to the action
+        -- if $debug_mode {
+        --     NOTICE('Skipping digest for day ' || $day_index::TEXT || ' in stream ' || $stream_ref::TEXT || ': only ' || $record_count::TEXT || ' records found');
+        -- }
         } else {
             -- Uncomment for debugging if needed
             -- NOTICE('Skipping digest for day ' || $day_index::TEXT || ' in stream ' || $stream_ref::TEXT || ': only ' || $record_count::TEXT || ' records found');
         }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (3)
- `go.mod` (1 hunks)
- `internal/migrations/020-digest-actions.sql` (2 hunks)
- `tests/streams/digest/digest_actions_test.go` (6 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
tests/streams/digest/digest_actions_test.go (5)
- tests/streams/utils/procedure/execute.go (1): `WithSigner` (432-436)
- tests/streams/utils/setup/common.go (1): `CreateDataProvider` (192-233)
- tests/streams/utils/setup/primitive.go (2): `SetupPrimitiveFromMarkdown` (122-128), `MarkdownPrimitiveSetupInput` (31-36)
- tests/streams/utils/table/assert.go (2): `AssertResultRowsEqualMarkdownTable` (18-81), `AssertResultRowsEqualMarkdownTableInput` (11-16)
- tests/streams/utils/procedure/types.go (1): `ResultRow` (32-32)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
- GitHub Check: lint
- GitHub Check: acceptance-test
🔇 Additional comments (14)
go.mod (1)
20-21: Verify compatibility and stability of updated kwil-db dependencies

go.mod (lines 20–21) now pins:

```
github.com/trufnetwork/kwil-db v0.10.3-0.20250823041522-ec54fdc11d4f
github.com/trufnetwork/kwil-db/core v0.4.3-0.20250823041522-ec54fdc11d4f
```

go.sum contains matching entries for both modules.

Our shallow-fetch inspection didn’t reveal any commit messages (including “UNNEST”) between the previous main commit (26effaee3b2a3a117b2b95d9fe014f57db2afdaa) and the featUnnest commit (ec54fd11d4f51efa2420518b1302e441954826c). To ensure this update truly includes the table-valued UNNEST support and introduces no regressions, please:
- Manually review the kwil-db repo’s commit history between those two SHAs to confirm the UNNEST implementation.
- Run integration tests that exercise batch-processing paths relying on UNNEST.
- Check for any breaking changes or regressions in existing functionality.
tests/streams/digest/digest_actions_test.go (5)
6-6: Good addition of the strings import for error validation

The `strings` import is appropriately added to support error message validation in the new test cases.

36-40: Well-structured test coverage for the new batch digest functionality

The addition of comprehensive test cases for batch digest operations provides excellent coverage of various scenarios including single/multiple candidates, empty arrays, mismatched arrays, and the optimized auto_digest function.

110-121: Update reflects the new batch processing return structure

The test correctly validates the new return structure from `auto_digest`, which now returns `processed_days` and `total_deleted_rows` columns, aligning with the batch processing implementation.

663-731: Comprehensive test setup for batch digest scenarios

The `WithBatchDigestTestSetup` function provides excellent test data setup with multiple streams and varied data patterns, which effectively exercises the batch processing capabilities.

843-862: Robust error handling validation for mismatched arrays

Good test coverage for the error case when array lengths don't match. The validation using substring matching is appropriate.
internal/migrations/020-digest-actions.sql (8)
1-19: Excellent documentation of the migration purpose and dependencies

The header comment clearly explains the optimization strategy, performance improvements, and dependencies. The mention of the UNNEST table-valued function requirement is particularly helpful.

31-51: Clever optimization using UNNEST with JOIN for bulk candidate validation

The `get_valid_digest_candidates` function effectively separates the UNNEST operation from the main processing logic, avoiding nested query issues while maintaining performance benefits. The use of INNER JOIN ensures only valid candidates are processed.

85-87: Good input validation for array length mismatch

The validation ensures data integrity by checking that both input arrays have the same length before processing.

175-196: Efficient deletion tracking and batch counting

Good implementation of tracking deleted and preserved records. The calculation of deleted count by comparing initial and remaining counts is accurate and efficient.

204-258: Excellent implementation of combination flags for OHLC markers

The logic for handling combination flags when OHLC values overlap (e.g., when OPEN equals LOW) is well-implemented. The bitwise flag system (OPEN=1, HIGH=2, LOW=4, CLOSE=8) allows for efficient representation of multiple types in a single record.
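The combination-flag idea is easy to check outside SQL. A minimal Go sketch of the bitwise scheme described above (OPEN=1, HIGH=2, LOW=4, CLOSE=8); the constant and function names are invented for illustration, not taken from the repository:

```go
package main

import "fmt"

// Bit flags as described in the migration: OPEN=1, HIGH=2, LOW=4, CLOSE=8.
const (
	FlagOpen  = 1
	FlagHigh  = 2
	FlagLow   = 4
	FlagClose = 8
)

// hasFlag reports whether a combined type marker includes the given role.
func hasFlag(marker, flag int) bool {
	return marker&flag != 0
}

func main() {
	// A single record that is both the day's OPEN and its LOW
	// carries the combined marker 1|4 = 5.
	marker := FlagOpen | FlagLow
	fmt.Println(marker)                     // 5
	fmt.Println(hasFlag(marker, FlagLow))   // true
	fmt.Println(hasFlag(marker, FlagClose)) // false
}
```

Decoding a marker is then a single AND per role, which is why a lone record can represent up to all four OHLC positions at once.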
290-305: Optimized candidate collection using ARRAY_AGG

Excellent use of ARRAY_AGG to collect candidates in a single query, maintaining order and respecting batch size limits. This is a significant performance improvement over iterative processing.

307-310: Proper null handling for empty result case

Good defensive programming by checking for both NULL and empty array cases.

329-433: Comprehensive dual-path OHLC retrieval implementation

The `get_daily_ohlc` function maintains backward compatibility by supporting both digested and raw data retrieval paths. The bitwise operations for extracting individual OHLC flags from combination values are correct.
Actionable comments posted: 5
🧹 Nitpick comments (5)
internal/migrations/020-digest-actions.sql (5)
205-218: Bit-flag scheme differs from acceptance criteria; confirm and either align or update retrieval logic accordingly

Your writes imply OPEN=1, HIGH=2, LOW=4, CLOSE=8 and your reads match that. The acceptance criteria say CLOSE=1, HIGH=2, LOW=4 (no OPEN specified). This can break downstream consumers.
Options:
- Keep OPEN=1/H=2/L=4/C=8 and update the acceptance spec/docs.
- Switch to C=1/H=2/L=4 and encode OPEN=8 (or omit OPEN markers). Then update all INSERT type computations and get_daily_ohlc predicates accordingly. I can supply a full, safe diff once you confirm the desired mapping.
Also applies to: 221-235, 237-249, 251-259, 357-393, 401-434
291-306: auto_digest should deduplicate candidates and propagate preserved count for observability
- If pending_prune_days contains duplicate pairs, ARRAY_AGG will forward duplicates to batch_digest.
- Returning preserved rows from auto_digest helps monitoring and test assertions.
Apply this diff to dedupe and pass through total_preserved_rows:
```diff
@@
-    SELECT
-        ARRAY_AGG(stream_ref) as stream_refs,
-        ARRAY_AGG(day_index) as day_indexes
+    SELECT
+        ARRAY_AGG(stream_ref) as stream_refs,
+        ARRAY_AGG(day_index) as day_indexes
     FROM (
-        SELECT stream_ref, day_index FROM pending_prune_days
+        SELECT DISTINCT stream_ref, day_index FROM pending_prune_days
         ORDER BY day_index ASC, stream_ref ASC
         LIMIT $batch_size
     ) AS ordered_candidates {
@@
-) PUBLIC RETURNS TABLE(
-    processed_days INT,
-    total_deleted_rows INT
+) PUBLIC RETURNS TABLE(
+    processed_days INT,
+    total_deleted_rows INT,
+    total_preserved_rows INT
 ) {
@@
-    $processed := 0;
-    $total_deleted := 0;
+    $processed := 0;
+    $total_deleted := 0;
+    $total_preserved := 0;
@@
-    for $result in batch_digest($stream_refs, $day_indexes) {
-        $processed := $result.processed_days;
-        $total_deleted := $result.total_deleted_rows;
-    }
+    for $result in batch_digest($stream_refs, $day_indexes) {
+        $processed := $result.processed_days;
+        $total_deleted := $result.total_deleted_rows;
+        $total_preserved := $result.total_preserved_rows;
+    }
@@
-    RETURN $processed, $total_deleted;
+    RETURN $processed, $total_deleted, $total_preserved;
```

Also applies to: 313-323
351-364: get_daily_ohlc: tie-breakers in raw path should mirror digested logic for consistency

Raw-path HIGH/LOW use plain MAX/MIN without the tie-breakers used during digestion (event_time ASC, created_at DESC). This can produce different OHLC values pre- vs post-digest for the same day.

Apply this diff to mirror tie-breaking:

```diff
@@
-    -- HIGH: Maximum value
-    for $row in SELECT MAX(value) as max_val FROM primitive_events
-        WHERE stream_ref = $stream_ref
-        AND event_time >= $day_start AND event_time <= $day_end {
-        $high_value := $row.max_val;
-    }
+    -- HIGH: Maximum value, tie-break by earliest time and created_at DESC
+    for $row in SELECT value FROM primitive_events
+        WHERE stream_ref = $stream_ref
+        AND event_time >= $day_start AND event_time < $day_end
+        ORDER BY value DESC, event_time ASC, created_at DESC
+        LIMIT 1 {
+        $high_value := $row.value;
+    }
@@
-    -- LOW: Minimum value
-    for $row in SELECT MIN(value) as min_val FROM primitive_events
-        WHERE stream_ref = $stream_ref
-        AND event_time >= $day_start AND event_time <= $day_end {
-        $low_value := $row.min_val;
-    }
+    -- LOW: Minimum value, tie-break by earliest time and created_at DESC
+    for $row in SELECT value FROM primitive_events
+        WHERE stream_ref = $stream_ref
+        AND event_time >= $day_start AND event_time < $day_end
+        ORDER BY value ASC, event_time ASC, created_at DESC
+        LIMIT 1 {
+        $low_value := $row.value;
+    }
```

Also applies to: 366-391
200-203: Idempotency is good, but consider a unique constraint or upsert semantics for primitive_event_type

You delete all markers then insert anew. If concurrent runs race, duplicates could appear without a unique key, and there is a window during which no markers exist. If primitive_event_type lacks a unique constraint on (stream_ref, event_time), add one, or switch to INSERT … ON CONFLICT DO UPDATE (if supported).
I can author a small migration to add the unique constraint or adjust inserts to be idempotent-upserts.
Also applies to: 262-266
295-306: Observability: add minimal notices to aid ops while keeping noise controlled

Given the emphasis on “comprehensive logging”, add bounded notices summarizing the batch and per-day actions when thresholds or caps are hit.
Example insertions (commented now, easy to toggle):
```diff
@@
-    $stream_refs := $candidates.stream_refs;
-    $day_indexes := $candidates.day_indexes;
+    $stream_refs := $candidates.stream_refs;
+    $day_indexes := $candidates.day_indexes;
+    -- NOTICE('auto_digest batch size=' || array_length($stream_refs)::TEXT);
@@
-        $processed := $result.processed_days;
-        $total_deleted := $result.total_deleted_rows;
+        $processed := $result.processed_days;
+        $total_deleted := $result.total_deleted_rows;
+        -- NOTICE('batch_digest processed=' || $processed::TEXT || ', deleted=' || $total_deleted::TEXT);
```

Also applies to: 317-320
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- `internal/migrations/020-digest-actions.sql` (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: acceptance-test
🔇 Additional comments (1)
internal/migrations/020-digest-actions.sql (1)
111-121: Off-by-one day-boundary: use half-open [start, end) instead of inclusive end

All per-day scans include records where event_time = $day_end via “<= $day_end”. That mistakenly pulls in the first event of the next day (00:00). Use “event_time < $day_end”.

Apply this diff across all day-range predicates:

```diff
-AND event_time >= $day_start AND event_time <= $day_end
+AND event_time >= $day_start AND event_time < $day_end
```

Do this consistently in:
- sentinel/COUNT queries,
- OHLC selection queries,
- DELETEs of primitive_events,
- DELETEs of primitive_event_type,
- get_daily_ohlc (both digested and raw branches).
Also applies to: 129-137, 141-149, 151-160, 163-172, 178-186, 189-194, 200-203, 344-348, 358-364, 366-373, 375-382, 401-407, 411-417, 419-424, 426-431
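The two boundary conventions can be contrasted in a quick sketch. This is illustrative only: 86400-second day windows derived from a day index are an assumption for the example, not the actual schema.

```go
package main

import "fmt"

const secondsPerDay = 86400

// inDayInclusive mirrors "event_time >= day_start AND event_time <= day_end".
func inDayInclusive(t, dayStart, dayEnd int64) bool {
	return t >= dayStart && t <= dayEnd
}

// inDayHalfOpen mirrors the suggested "event_time >= day_start AND event_time < day_end".
func inDayHalfOpen(t, dayStart, dayEnd int64) bool {
	return t >= dayStart && t < dayEnd
}

func main() {
	// Day 1's window, assuming day boundaries at dayIndex * 86400.
	dayStart := int64(1 * secondsPerDay)
	dayEnd := int64(2 * secondsPerDay)
	nextMidnight := dayEnd // first instant of day 2

	fmt.Println(inDayInclusive(nextMidnight, dayStart, dayEnd)) // true: next day's first event is pulled in
	fmt.Println(inDayHalfOpen(nextMidnight, dayStart, dayEnd))  // false: day 2's open stays in day 2
}
```

Whether the inclusive overlap is a bug or intentional (the learning below argues it is deliberate for 24/7 crypto markets) is exactly the difference these two predicates encode.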
⛔ Skipped due to learnings
Learnt from: MicBun
PR: trufnetwork/node#1118
File: internal/migrations/020-digest-actions.sql:47-56
Timestamp: 2025-08-20T14:54:34.405Z
Learning: In the OHLC digest system for crypto markets, day boundaries are intentionally overlapping using `<= $day_end` so that today's close becomes tomorrow's open, reflecting the continuous 24/7 nature of crypto trading.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/migrations/020-digest-actions.sql (1)
347-452: Apply half-open day-window boundaries

To prevent next-day bleed-through, replace every `<= $day_end` with `< $day_end` in both the “digested” and “raw” paths of the action. Kwil’s SQL dialect does not include a bitwise-AND operator, so continue using the existing arithmetic bit-checks.

Locations needing updates (in internal/migrations/020-digest-actions.sql around lines 347–452):

All occurrences of

```
AND event_time >= $day_start AND event_time <= $day_end
```

should become

```
AND event_time >= $day_start AND event_time < $day_end
```

All occurrences of

```
AND p.event_time >= $day_start AND p.event_time <= $day_end
```

should become

```
AND p.event_time >= $day_start AND p.event_time < $day_end
```
♻️ Duplicate comments (4)
internal/migrations/020-digest-actions.sql (4)
39-40: Avoid dialect-specific empty-array casts; rely on auto-initialized arrays

Per our recent learning for Kwil, declaring `$var INT[];` initializes it to an empty array. The explicit `ARRAY[]::INT[]` cast may be unnecessary or dialect-sensitive.

Apply this diff to simplify:

```diff
-    $valid_stream_refs INT[] := ARRAY[]::INT[];
-    $valid_day_indexes INT[] := ARRAY[]::INT[];
+    $valid_stream_refs INT[];
+    $valid_day_indexes INT[];
```
297-307: Enforce leader-only authorization in auto_digest as well

Same requirement as batch_digest. Apply this diff:

```diff
-    -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is
-    -- if @caller != @leader {
-    --     ERROR('Only the leader node can execute auto digest operations');
-    -- }
+    -- Enforce leader-only execution
+    if @caller != @leader {
+        ERROR('Only the leader node can execute auto digest operations');
+    }
```
73-85: Enforce leader-only authorization as required by acceptance criteria

Leader-only checks are still commented out. The objectives require leader-only execution for digest actions.

Apply this diff:

```diff
-    -- Leader authorization check, keep it commented out for now so test passing and until we can inject how leader is
-    -- if @caller != @leader {
-    --     ERROR('Only the leader node can execute batch digest operations');
-    -- }
+    -- Enforce leader-only execution
+    if @caller != @leader {
+        ERROR('Only the leader node can execute batch digest operations');
+    }
```
192-201: Implement the 10,000 soft-delete cap and chunk deletions to avoid long locks

The current DELETE can remove unbounded rows in one statement, violating the cap and risking long-running transactions. Chunk deletes and stop at 10,000 per transaction, as per acceptance criteria.

Apply this diff for the per-day deletion loop:

```diff
-    -- Delete excess records in single operation (keep only OHLC)
-    DELETE FROM primitive_events
-    WHERE stream_ref = $stream_ref
-    AND event_time >= $day_start AND event_time <= $day_end
-    AND NOT (
-        (event_time = $open_time AND created_at = $open_created_at) OR
-        (event_time = $close_time AND created_at = $close_created_at) OR
-        (event_time = $high_time AND created_at = $high_created_at) OR
-        (event_time = $low_time AND created_at = $low_created_at)
-    );
+    -- Delete excess records in chunks (keep only OHLC), respecting a 10,000 cap for the whole action
+    $batch_deleted := 0;
+    loop {
+        -- Collect a batch of keys to delete for this day (avoid touching OHLC rows)
+        for $k in
+            SELECT event_time, created_at
+            FROM primitive_events
+            WHERE stream_ref = $stream_ref
+            AND event_time >= $day_start AND event_time < $day_end
+            AND NOT (
+                (event_time = $open_time AND created_at = $open_created_at) OR
+                (event_time = $close_time AND created_at = $close_created_at) OR
+                (event_time = $high_time AND created_at = $high_created_at) OR
+                (event_time = $low_time AND created_at = $low_created_at)
+            )
+            LIMIT LEAST(1000, 10000 - $deleted_in_tx)
+        {
+            DELETE FROM primitive_events
+            WHERE stream_ref = $stream_ref
+            AND event_time = $k.event_time
+            AND created_at = $k.created_at;
+            $batch_deleted := $batch_deleted + 1;
+            $deleted_in_tx := $deleted_in_tx + 1;
+            if $deleted_in_tx >= 10000 { break; }
+        }
+        if $batch_deleted = 0 OR $deleted_in_tx >= 10000 { break; }
+        $deleted_this_day := $deleted_this_day + $batch_deleted;
+        $total_deleted := $total_deleted + $batch_deleted;
+        $batch_deleted := 0;
+    }
```

And declare a transaction-scoped counter near your totals:

```sql
-- add near other totals at the top of the action
$deleted_in_tx := 0;
```

If Kwil requires explicit typing: `$deleted_in_tx INT := 0;`.
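The cap arithmetic in the suggested loop (chunks of at most 1,000, stopping at 10,000 per transaction) can be sanity-checked in isolation. A hedged Go sketch; `deleteChunk` stands in for a keyed DELETE and all names are invented for illustration:

```go
package main

import "fmt"

const txCap = 10000    // hard cap on soft-deletes per transaction
const chunkSize = 1000 // rows deleted per chunk

// deleteChunk stands in for one chunked DELETE: it consumes up to
// limit rows from a remaining-rows counter and reports how many
// it "deleted". In the migration this would be a keyed DELETE.
func deleteChunk(remaining *int, limit int) int {
	n := limit
	if *remaining < n {
		n = *remaining
	}
	*remaining -= n
	return n
}

// pruneDay drains one day's excess rows, stopping at the transaction cap.
// It returns rows deleted for the day and the updated per-tx total.
func pruneDay(rowsToDelete, deletedInTx int) (int, int) {
	deletedThisDay := 0
	for deletedInTx < txCap {
		// Equivalent of LIMIT LEAST(1000, 10000 - $deleted_in_tx).
		limit := chunkSize
		if txCap-deletedInTx < limit {
			limit = txCap - deletedInTx
		}
		n := deleteChunk(&rowsToDelete, limit)
		if n == 0 {
			break
		}
		deletedThisDay += n
		deletedInTx += n
	}
	return deletedThisDay, deletedInTx
}

func main() {
	// A day with 12,500 excess rows hits the 10,000 cap and stops.
	day, tx := pruneDay(12500, 0)
	fmt.Println(day, tx) // 10000 10000
}
```

The invariant being tested is that no single transaction ever exceeds the cap, even when one day alone holds more excess rows than the cap allows.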
🧹 Nitpick comments (4)
internal/migrations/020-digest-actions.sql (4)
139-185: OHLC tie-breakers: logic is solid; align comments and ensure consistent ordering

- OPEN/CLOSE queries and HIGH/LOW tie-breakers look deterministic and sensible.
- Nit: the LOW comment says “tie-break by earliest time and created_at” — the code uses `created_at DESC` (latest). Update the comment for accuracy.

Apply this diff to fix the LOW comment:

```diff
-    -- LOW: Minimum value, tie-break by earliest time and created_at
+    -- LOW: Minimum value, tie-break by earliest time and created_at DESC
```
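The deterministic ordering under discussion (value DESC, then event_time ASC, then created_at DESC for HIGH) can be mirrored with an in-memory sort. A sketch under stated assumptions: the `event` struct and field names are illustrative, not the actual schema.

```go
package main

import (
	"fmt"
	"sort"
)

// event mirrors the columns used by the tie-breaker.
type event struct {
	value     float64
	eventTime int64
	createdAt int64
}

// pickHigh selects the day's HIGH using the ordering from the review:
// value DESC, then event_time ASC, then created_at DESC.
func pickHigh(events []event) event {
	sorted := append([]event(nil), events...)
	sort.Slice(sorted, func(i, j int) bool {
		if sorted[i].value != sorted[j].value {
			return sorted[i].value > sorted[j].value
		}
		if sorted[i].eventTime != sorted[j].eventTime {
			return sorted[i].eventTime < sorted[j].eventTime
		}
		return sorted[i].createdAt > sorted[j].createdAt
	})
	return sorted[0]
}

func main() {
	// Three records share the max value 10: the earliest event_time wins,
	// and among equal times the latest created_at wins.
	evts := []event{
		{value: 10, eventTime: 200, createdAt: 1},
		{value: 10, eventTime: 100, createdAt: 1},
		{value: 10, eventTime: 100, createdAt: 5},
		{value: 9, eventTime: 50, createdAt: 9},
	}
	h := pickHigh(evts)
	fmt.Println(h.eventTime, h.createdAt) // 100 5
}
```

Because every comparison level is total, repeated runs over the same rows always pick the same record, which is the property the review asks the raw path to share with the digested path.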
220-230: Bit flags OPEN=1, HIGH=2, LOW=4, CLOSE=8 — aligned with updated rationale; please sync the spec

The mapping matches real-world OHLC needs (OPEN added for gap detection). Please update issue #1106 acceptance criteria to avoid future confusion.

I can open a follow-up issue/PR to update the spec/checks if you want.
289-289: Consider minimal structured logging for operability

Add INFO-level notices for: day processed, rows deleted, preserved markers count, and when skip conditions trigger. This helps trace batch runs without being verbose.

Example:

```sql
-- After finishing a day
-- RAISE INFO 'digested stream %, day %, deleted %, preserved %', $stream_ref, $day_index, $deleted_this_day, ($total_preserved - $prev_total_preserved);
```
1-19: Meta: digest_daily removal vs objectives — align ticket/PR description

The objectives ask for a digest_daily action. This migration replaces it with batch_digest. If that’s the approved direction, please update #1106/PR description and any downstream references; otherwise consider reintroducing a thin digest_daily wrapper that delegates to batch_digest for compatibility.

I can draft either an issue update or a backward-compat wrapper action if desired.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- `internal/migrations/020-digest-actions.sql` (2 hunks)
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-08-20T12:22:14.250Z
Learnt from: outerlook
PR: trufnetwork/node#1113
File: internal/migrations/020-digest-init.sql:3-11
Timestamp: 2025-08-20T12:22:14.250Z
Learning: In the trufnetwork/node repository, for the digest_config table in internal/migrations/020-digest-init.sql, the maintainer prefers to leave the table without initial seed rows, allowing the extension to handle the empty state case at runtime rather than seeding default configuration values in the migration.
Applied to files:
internal/migrations/020-digest-actions.sql
📚 Learning: 2025-08-22T12:03:10.476Z
Learnt from: outerlook
PR: trufnetwork/node#1119
File: internal/migrations/018-fix-array-ordering-get-stream-ids.sql:5-5
Timestamp: 2025-08-22T12:03:10.476Z
Learning: In Kwil, declaring an array variable like `$stream_refs int[];` automatically initializes it as an empty array, not NULL. This means array_append operations will work correctly without explicit initialization to `ARRAY[]::INT[]`.
Applied to files:
internal/migrations/020-digest-actions.sql
📚 Learning: 2025-08-23T07:55:06.669Z
Learnt from: MicBun
PR: trufnetwork/node#1121
File: internal/migrations/020-digest-actions.sql:4-19
Timestamp: 2025-08-23T07:55:06.669Z
Learning: Markets with trading hours (like US stock markets) require OPEN values in OHLC data to properly capture gaps-up and gaps-down between trading sessions. The bit-flag mapping OPEN=1, HIGH=2, LOW=4, CLOSE=8 is correct for real-world financial data processing.
Applied to files:
internal/migrations/020-digest-actions.sql
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: acceptance-test
🔇 Additional comments (9)
internal/migrations/020-digest-actions.sql (9)
31-53: Good helper action; aggregated UNNEST filter is clear and efficient

The single-pass COALESCE(array_agg(...)) + UNNEST+JOIN pattern is a solid way to pre-filter candidates and avoids nested-query pitfalls.

86-94: Input validation and early-return guard look good

Length parity check and empty-array early return are appropriate defenses here.

108-114: Nice: null/empty guard before iterating valid candidates

Prevents out-of-bounds/NULL issues when no candidates remain after filtering.

215-234: Type-marker rebuild sequence is reasonable and idempotent-friendly

Wiping same-day markers then inserting combined-flag markers for OPEN/CLOSE/HIGH/LOW is a clean approach and will converge on repeat runs.

271-275: Good: LOW-only insert guarded to avoid duplicates across overlaps

Prevents redundant markers when LOW coincides with OPEN/HIGH/CLOSE.

277-281: pending_prune_days cleanup is correct; placed after successful marker writes

Deletion happens after marker reinserts, which preserves idempotency if failures occur earlier.

325-339: Empty-batch handling and delegation to batch_digest look good

Early return and aggregation of results are straightforward.

18-19: UNNEST zip semantics and blocking dependency

UNNEST with multiple arrays (zip semantics) is required here. Please confirm the kwil-db version in this PR includes support (the linked blocker #1586). If support is partial, we may need a different approach (e.g., UNNEST one array + index, or JOIN LATERAL).
Would you like me to prepare a fallback version that does not rely on multi-arg UNNEST?
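The fallback mentioned here (UNNEST one array plus an index) amounts to a positional zip after a length-parity check. A hedged Go sketch of that pairing; the `candidate` type and function names are invented for illustration:

```go
package main

import (
	"errors"
	"fmt"
)

type candidate struct {
	streamRef int
	dayIndex  int
}

// zipCandidates pairs streamRefs[i] with dayIndexes[i] — the same
// positional "zip" that multi-array UNNEST performs — after the
// length-parity check batch_digest relies on.
func zipCandidates(streamRefs, dayIndexes []int) ([]candidate, error) {
	if len(streamRefs) != len(dayIndexes) {
		return nil, errors.New("array lengths must match")
	}
	out := make([]candidate, 0, len(streamRefs))
	for i := range streamRefs {
		out = append(out, candidate{streamRefs[i], dayIndexes[i]})
	}
	return out, nil
}

func main() {
	pairs, err := zipCandidates([]int{7, 7, 9}, []int{1, 2, 1})
	fmt.Println(pairs, err)

	// A length mismatch is rejected up front, mirroring the action's guard.
	_, err = zipCandidates([]int{7}, []int{1, 2})
	fmt.Println(err)
}
```

This also illustrates why the arrays must be aggregated with a consistent ordering: the pairing is purely positional, so any reordering of one array without the other silently mismatches candidates.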
118-137: I’m running scripts to locate where `$day_end` is defined and used, and to find all `event_time` predicates (including in `get_daily_ohlc`) so we can confirm whether switching to a half-open interval (`< $day_end`) is required consistently.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
internal/migrations/020-digest-actions.sql (1)
402-407: Apply exclusive day_end in get_daily_ohlc as well

Same off-by-one applies to read paths; switch to `event_time < $day_end`.

```diff
-AND p.event_time >= $day_start AND p.event_time <= $day_end
+AND p.event_time >= $day_start AND p.event_time < $day_end
@@
-AND p.event_time >= $day_start AND p.event_time <= $day_end
+AND p.event_time >= $day_start AND p.event_time < $day_end
@@
-AND p.event_time >= $day_start AND p.event_time <= $day_end
+AND p.event_time >= $day_start AND p.event_time < $day_end
@@
-AND p.event_time >= $day_start AND p.event_time <= $day_end
+AND p.event_time >= $day_start AND p.event_time < $day_end
@@
-AND event_time >= $day_start AND event_time <= $day_end
+AND event_time >= $day_start AND event_time < $day_end
@@
-AND event_time >= $day_start AND event_time <= $day_end
+AND event_time >= $day_start AND event_time < $day_end
@@
-AND event_time >= $day_start AND event_time <= $day_end
+AND event_time >= $day_start AND event_time < $day_end
@@
-AND event_time >= $day_start AND event_time <= $day_end
+AND event_time >= $day_start AND event_time < $day_end
```

Also applies to: 411-416, 420-426, 429-435, 446-452, 455-461, 464-469, 471-475
♻️ Duplicate comments (3)
internal/migrations/020-digest-actions.sql (3)
340-348: Deterministic ARRAY_AGG ordering to keep paired arrays aligned

Add explicit ORDER BY inside ARRAY_AGG to guarantee index alignment across both arrays regardless of planner behavior.

```diff
-    SELECT
-        ARRAY_AGG(stream_ref) as stream_refs,
-        ARRAY_AGG(day_index) as day_indexes
+    SELECT
+        ARRAY_AGG(stream_ref ORDER BY day_index ASC, stream_ref ASC) as stream_refs,
+        ARRAY_AGG(day_index ORDER BY day_index ASC, stream_ref ASC) as day_indexes
```
42-45: Enforce leader-only authorization (acceptance criteria) in both actions

Leader checks are still commented out in batch_digest and auto_digest. The linked issue requires leader-only execution.

```diff
--- a/internal/migrations/020-digest-actions.sql
+++ b/internal/migrations/020-digest-actions.sql
@@
-    -- if @caller != @leader {
-    --     ERROR('Only the leader node can execute batch digest operations');
-    -- }
+    if @caller != @leader {
+        ERROR('Only the leader node can execute batch digest operations');
+    }
@@
-    -- if @caller != @leader {
-    --     ERROR('Only the leader node can execute auto digest operations');
-    -- }
+    if @caller != @leader {
+        ERROR('Only the leader node can execute auto digest operations');
+    }
```

Also applies to: 331-334
217-235: Implement the 10,000-row soft-delete cap and avoid removing pending_prune_days unless the day is fully pruned

Acceptance criteria require a cap of 10,000 deletions per transaction. The current DELETE is unbounded and pending_prune_days is removed unconditionally, which can leave days partially pruned but marked done.

```diff
@@
-    -- Count initial records BEFORE deletion
+    -- Count initial records BEFORE deletion
     $initial_record_count := 0;
@@
-    -- Delete excess records (keep only OHLC)
-    DELETE FROM primitive_events
-    WHERE stream_ref = $stream_ref
-    AND event_time >= $day_start AND event_time <= $day_end
-    AND NOT (
-        (event_time = $open_time AND created_at = $open_created_at) OR
-        (event_time = $close_time AND created_at = $close_created_at) OR
-        (event_time = $high_time AND created_at = $high_created_at) OR
-        (event_time = $low_time AND created_at = $low_created_at)
-    );
+    -- Delete excess records (keep only OHLC), capped and chunked
+    if NOT DEFINED($deleted_in_tx) {
+        $deleted_in_tx := 0;
+    }
+    while $deleted_in_tx < 10000 {
+        $batch_deleted := 0;
+        for $d in SELECT event_time, created_at
+            FROM primitive_events
+            WHERE stream_ref = $stream_ref
+            AND event_time >= $day_start AND event_time < $day_end
+            AND NOT (
+                (event_time = $open_time AND created_at = $open_created_at) OR
+                (event_time = $close_time AND created_at = $close_created_at) OR
+                (event_time = $high_time AND created_at = $high_created_at) OR
+                (event_time = $low_time AND created_at = $low_created_at)
+            )
+            LIMIT LEAST(10000 - $deleted_in_tx, 1000) {
+            DELETE FROM primitive_events
+            WHERE stream_ref = $stream_ref
+            AND event_time = $d.event_time
+            AND created_at = $d.created_at;
+            $batch_deleted := $batch_deleted + 1;
+        }
+        if $batch_deleted = 0 { break; }
+        $deleted_in_tx := $deleted_in_tx + $batch_deleted;
+        if $deleted_in_tx >= 10000 {
+            -- Optional: RAISE INFO('Soft-delete cap reached (10,000). Stopping further deletions.');
+            break;
+        }
+    }
@@
-    -- Calculate actual deletions
+    -- Calculate actual deletions
     $deleted_this_stream := $initial_record_count - $remaining_count;
     $total_deleted := $total_deleted + $deleted_this_stream;
@@
-    -- Delete old type markers
+    -- Delete old type markers
     DELETE FROM primitive_event_type
     WHERE stream_ref = $stream_ref
-    AND event_time >= $day_start AND event_time <= $day_end;
+    AND event_time >= $day_start AND event_time < $day_end;
@@
-    -- Insert OPEN marker with combined flags
+    -- Insert OPEN marker with combined flags
+    $preserved_for_day := 0;
@@
     INSERT INTO primitive_event_type (stream_ref, event_time, type)
     VALUES ($stream_ref, $open_time, $open_type);
@@
-    $total_preserved := $total_preserved + 1;
+    $preserved_for_day := $preserved_for_day + 1;
+    $total_preserved := $total_preserved + 1;
@@
-    INSERT INTO primitive_event_type (stream_ref, event_time, type)
+    INSERT INTO primitive_event_type (stream_ref, event_time, type)
     VALUES ($stream_ref, $close_time, $close_type);
@@
-    $total_preserved := $total_preserved + 1;
+    $preserved_for_day := $preserved_for_day + 1;
+    $total_preserved := $total_preserved + 1;
@@
-    INSERT INTO primitive_event_type (stream_ref, event_time, type)
+    INSERT INTO primitive_event_type (stream_ref, event_time, type)
     VALUES ($stream_ref, $high_time, $high_type);
@@
-    $total_preserved := $total_preserved + 1;
+    $preserved_for_day := $preserved_for_day + 1;
+    $total_preserved := $total_preserved + 1;
@@
-    INSERT INTO primitive_event_type (stream_ref, event_time, type)
+    INSERT INTO primitive_event_type (stream_ref, event_time, type)
     VALUES ($stream_ref, $low_time, 4); -- LOW flag only
@@
-    $total_preserved := $total_preserved + 1;
+    $preserved_for_day := $preserved_for_day + 1;
+    $total_preserved := $total_preserved + 1;
@@
-    -- Remove from pending queue
-    DELETE FROM pending_prune_days
-    WHERE stream_ref = $stream_ref AND day_index = $day_index;
+    -- Remove from pending queue only if fully pruned
```
Stopping further deletions.'); + break; + } + } @@ - -- Calculate actual deletions + -- Calculate actual deletions $deleted_this_stream := $initial_record_count - $remaining_count; $total_deleted := $total_deleted + $deleted_this_stream; @@ - -- Delete old type markers + -- Delete old type markers DELETE FROM primitive_event_type WHERE stream_ref = $stream_ref - AND event_time >= $day_start AND event_time <= $day_end; + AND event_time >= $day_start AND event_time < $day_end; @@ - -- Insert OPEN marker with combined flags + -- Insert OPEN marker with combined flags + $preserved_for_day := 0; @@ INSERT INTO primitive_event_type (stream_ref, event_time, type) VALUES ($stream_ref, $open_time, $open_type); @@ - $total_preserved := $total_preserved + 1; + $preserved_for_day := $preserved_for_day + 1; + $total_preserved := $total_preserved + 1; @@ - INSERT INTO primitive_event_type (stream_ref, event_time, type) + INSERT INTO primitive_event_type (stream_ref, event_time, type) VALUES ($stream_ref, $close_time, $close_type); @@ - $total_preserved := $total_preserved + 1; + $preserved_for_day := $preserved_for_day + 1; + $total_preserved := $total_preserved + 1; @@ - INSERT INTO primitive_event_type (stream_ref, event_time, type) + INSERT INTO primitive_event_type (stream_ref, event_time, type) VALUES ($stream_ref, $high_time, $high_type); @@ - $total_preserved := $total_preserved + 1; + $preserved_for_day := $preserved_for_day + 1; + $total_preserved := $total_preserved + 1; @@ - INSERT INTO primitive_event_type (stream_ref, event_time, type) + INSERT INTO primitive_event_type (stream_ref, event_time, type) VALUES ($stream_ref, $low_time, 4); -- LOW flag only @@ - $total_preserved := $total_preserved + 1; + $preserved_for_day := $preserved_for_day + 1; + $total_preserved := $total_preserved + 1; @@ - -- Remove from pending queue - DELETE FROM pending_prune_days - WHERE stream_ref = $stream_ref AND day_index = $day_index; + -- Remove from pending queue only if fully pruned 
(remaining == preserved) + if $remaining_count <= $preserved_for_day { + DELETE FROM pending_prune_days + WHERE stream_ref = $stream_ref AND day_index = $day_index; + } + -- Stop processing more days in this transaction if cap reached + if $deleted_in_tx >= 10000 { break; }Also applies to: 310-313
🧹 Nitpick comments (7)
internal/migrations/020-digest-actions.sql (7)
5-7: Design drift: digest_daily removed but still an acceptance criterion—confirm intent and update scope

The PR objectives require implementing digest_daily, but this migration replaces it with batch_digest and routes auto_digest to it. If digest_daily is intentionally deprecated, please update issue #1106 and dependent docs/tests to reflect the new API contract; otherwise, reintroduce a thin digest_daily wrapper that delegates to batch_digest for a single (stream_ref, day_index).

```diff
+-- Optional compatibility wrapper if digest_daily must remain addressable:
+CREATE OR REPLACE ACTION digest_daily($stream_ref INT, $day_index INT)
+PUBLIC RETURNS TABLE(processed_days INT, total_deleted_rows INT, total_preserved_rows INT) {
+    for $r in batch_digest(ARRAY[$stream_ref], ARRAY[$day_index]) {
+        RETURN $r.processed_days, $r.total_deleted_rows, $r.total_preserved_rows;
+    }
+    RETURN 0, 0, 0;
+};
```
61-73: Optional: aggregate valid candidates in one shot instead of row-by-row array_append

You already initialize arrays—good. For larger batches, prefer a single SELECT with ARRAY_AGG to reduce per-row appends and improve readability.

```diff
-    for $valid_candidate in
-        SELECT u.stream_ref, u.day_index
-        FROM UNNEST($stream_refs, $day_indexes) AS u(stream_ref, day_index)
-        INNER JOIN pending_prune_days ppd
-            ON ppd.stream_ref = u.stream_ref AND ppd.day_index = u.day_index {
-
-        $valid_stream_refs := array_append($valid_stream_refs, $valid_candidate.stream_ref);
-        $valid_day_indexes := array_append($valid_day_indexes, $valid_candidate.day_index);
-    }
+    for $row in
+        SELECT
+            COALESCE(ARRAY_AGG(u.stream_ref), ARRAY[]::INT[]) AS vs,
+            COALESCE(ARRAY_AGG(u.day_index), ARRAY[]::INT[]) AS vd
+        FROM UNNEST($stream_refs, $day_indexes) AS u(stream_ref, day_index)
+        INNER JOIN pending_prune_days ppd
+            ON ppd.stream_ref = u.stream_ref AND ppd.day_index = u.day_index {
+        $valid_stream_refs := $row.vs;
+        $valid_day_indexes := $row.vd;
+    }
```
92-99: WITH RECURSIVE not needed (no recursion used)

Use WITH instead of WITH RECURSIVE for clarity.

```diff
-WITH RECURSIVE stream_days AS (
+WITH stream_days AS (
```
81-88: Type width: INT[] may be too narrow for event_time/created_at

If primitive_events.event_time/created_at are BIGINT/TIMESTAMP-derived epoch values, INT can overflow (2038 problem) or truncate. Recommend BIGINT[] for times and created_ats.

```diff
-    $ohlc_open_times INT[] := ARRAY[]::INT[];
-    $ohlc_open_created_ats INT[] := ARRAY[]::INT[];
-    $ohlc_close_times INT[] := ARRAY[]::INT[];
-    $ohlc_close_created_ats INT[] := ARRAY[]::INT[];
-    $ohlc_high_times INT[] := ARRAY[]::INT[];
-    $ohlc_high_created_ats INT[] := ARRAY[]::INT[];
-    $ohlc_low_times INT[] := ARRAY[]::INT[];
-    $ohlc_low_created_ats INT[] := ARRAY[]::INT[];
+    $ohlc_open_times BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_open_created_ats BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_close_times BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_close_created_ats BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_high_times BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_high_created_ats BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_low_times BIGINT[] := ARRAY[]::BIGINT[];
+    $ohlc_low_created_ats BIGINT[] := ARRAY[]::BIGINT[];
```

Also applies to: 124-131, 142-149, 158-165, 176-183
402-435: Optional: simplify flag tests with bitwise checks (if supported by kwil-db)

Current modulo/division approach works but is less clear than bitwise AND. If supported, prefer (t.type & 1) = 1, (t.type & 2) = 2, (t.type & 4) = 4, (t.type & 8) = 8.

```diff
-AND (t.type % 2) = 1
+AND (t.type & 1) = 1
@@
-AND ((t.type / 2) % 2) = 1
+AND (t.type & 2) = 2
@@
-AND ((t.type / 4) % 2) = 1
+AND (t.type & 4) = 4
@@
-AND ((t.type / 8) % 2) = 1
+AND (t.type & 8) = 8
```
317-318: Add structured logging for observability (counts, caps, durations)

Acceptance asks for comprehensive logging; currently silent. Recommend RAISE INFO logs summarizing processed_days, total_deleted_rows, total_preserved_rows, whether the 10,000 cap was hit, and per-day actions.

```diff
@@
-    RETURN $total_processed, $total_deleted, $total_preserved;
+    -- RAISE INFO('batch_digest processed=% total_deleted=% total_preserved=%', $total_processed, $total_deleted, $total_preserved);
+    RETURN $total_processed, $total_deleted, $total_preserved;
@@
-    RETURN $processed, $total_deleted;
+    -- RAISE INFO('auto_digest processed=% total_deleted=%', $processed, $total_deleted);
+    RETURN $processed, $total_deleted;
```

Also applies to: 367-368
318: Idempotency and test coverage—verify edge cases

Please ensure tests cover: day boundary at exact day_end second, ties on created_at for OHLC, days with only one record (skipped), partial-delete scenario when hitting the cap (pending_prune_days should remain), and re-running digest on an already digested day. I can draft tests if helpful.
Also applies to: 367-368
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
internal/migrations/020-digest-actions.sql (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-23T07:55:06.669Z
Learnt from: MicBun
PR: trufnetwork/node#1121
File: internal/migrations/020-digest-actions.sql:4-19
Timestamp: 2025-08-23T07:55:06.669Z
Learning: Markets with trading hours (like US stock markets) require OPEN values in OHLC data to properly capture gaps-up and gaps-down between trading sessions. The bit-flag mapping OPEN=1, HIGH=2, LOW=4, CLOSE=8 is correct for real-world financial data processing.
Applied to files:
internal/migrations/020-digest-actions.sql
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
- GitHub Check: acceptance-test
🔇 Additional comments (1)
internal/migrations/020-digest-actions.sql (1)
253-284: Thumbs up: OPEN/HIGH/LOW/CLOSE flag composition is correct and market-ready

OPEN=1, HIGH=2, LOW=4, CLOSE=8 with composite markers handles session gaps and shared extrema deterministically. Good alignment with the clarified requirement for OPEN.
Also applies to: 289-306
I think I will create a separate PR after this issue to limit the scope of this PR. cc @outerlook
resolves: #1106
Summary by CodeRabbit

- New Features
- Refactor
- Tests
- Chores