
chore: normalized tables#1084

Merged
MicBun merged 43 commits into main from feat/normalized-tables
Aug 12, 2025

Conversation

Contributor

@williamrusdyputra williamrusdyputra commented Jul 21, 2025

Summary by CodeRabbit

  • New Features

    • Added a data provider registration API and unified stream identification for more consistent handling.
  • Database & Schema Updates

    • Introduced a normalized data_providers table, new references and indexes, and migrated streams/events/metadata to use internal IDs.
  • Bug Fixes

    • Consolidated stream identity in queries and permission checks to reduce ambiguity and errors.
  • Tests

    • Updated many tests to register data providers during setup for more reliable test initialization.
  • Chores

    • Enhanced database size estimation, added migration tooling and CI workflow token configuration.

resolves: #1039

## How Has This Been Tested?
Local node migration; if approved, will migrate on `tn-v2`.
resolves: #1057

Note: keeping the old columns' data populated to avoid downtime.

## How Has This Been Tested?

<img width="496" height="215" alt="Screenshot 2025-07-15 at 16 20 02"
src="https://github.com/user-attachments/assets/a3bccc3e-f80a-466a-bf3d-acba998a573b"
/>
resolves: #1063

## How Has This Been Tested?
Batch insertion in the `query_test` file.

<img width="473" height="51" alt="Screenshot 2025-07-15 at 20 34 16"
src="https://github.com/user-attachments/assets/b00dec7c-ae94-4504-a4be-1c474a5faca9"
/>
* chore: adjust common actions to normalized tables

* fix: join data providers

* chore: debug
* chore: adjust authorization actions to normalized tables

* chore: update actions

* chore: update actions

* fix: logic
* chore: adjust taxonomy actions to normalized tables

* chore: avoid rate limiting on github actions
* chore: adjust composed query actions to normalized tables

* chore: update derivates

* fix: stream ref for parent
* chore: adjust utilities actions to normalized tables

* chore: reduce database size expectation on test
@williamrusdyputra williamrusdyputra marked this pull request as ready for review July 30, 2025 07:08

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 9

🔭 Outside diff range comments (4)
tests/streams/multi_level_composed_test.go (1)

135-165: Fix inconsistent data provider references in test function.

The setup function now uses deployerMultiLevel but the test function still references deployerSharedComplex in lines 135, 142, and 165. This inconsistency will likely cause test failures.

Apply this diff to fix the inconsistent references:

-	multiLevelLocator := types.StreamLocator{StreamId: parentComposed, DataProvider: deployerSharedComplex}
+	multiLevelLocator := types.StreamLocator{StreamId: parentComposed, DataProvider: deployerMultiLevel}
-		StreamLocator: types.StreamLocator{StreamId: composed1, DataProvider: deployerSharedComplex},
+		StreamLocator: types.StreamLocator{StreamId: composed1, DataProvider: deployerMultiLevel},
-		StreamLocator: types.StreamLocator{StreamId: composed2, DataProvider: deployerSharedComplex},
+		StreamLocator: types.StreamLocator{StreamId: composed2, DataProvider: deployerMultiLevel},
internal/migrations/006-composed-query.sql (1)

290-299: Missing composite index for the new join predicate

primitive_events is now joined via:

ON pe.stream_ref = pw.primitive_stream_ref
AND pe.event_time BETWEEN pw.group_sequence_start AND pw.group_sequence_end

Create a multi-column index to keep this join selective:

+CREATE INDEX IF NOT EXISTS idx_pe_stream_ref_event_time
+  ON primitive_events (stream_ref, event_time);
internal/migrations/002-authorization.sql (2)

12-18: Lower-case variable is ignored – case-sensitive checks may fail

$lowercase_wallet_address is computed but never used in the subsequent equality checks (extension_agent, owner check, etc.). This leaves those comparisons case-sensitive and inconsistent with the rest of the codebase that lower-cases wallet addresses up-front.

-    $lowercase_wallet_address TEXT := LOWER($wallet_address);
+    $wallet_address := LOWER($wallet_address);       -- normalise in-place

Update the later usages (is_stream_owner, visibility checks, …) accordingly or switch everything to the local variable to avoid subtle permission mismatches.
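The normalize-once-then-compare idea carries over to any layer that touches wallet addresses. A minimal Go sketch of the pattern (a hypothetical helper for illustration, not part of the migration):

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeAddress lower-cases a wallet address once, up front, so every
// later equality check is case-insensitive by construction — the same fix
// the review suggests for the SQL action.
func normalizeAddress(addr string) string {
	return strings.ToLower(addr)
}

func main() {
	owner := normalizeAddress("0xAbC0000000000000000000000000000000000123")
	caller := normalizeAddress("0xabc0000000000000000000000000000000000123")
	fmt.Println(owner == caller) // true: both sides were normalized up front
}
```

Normalizing at the boundary means downstream checks never have to remember to lower-case again, which is exactly the "subtle permission mismatch" the review warns about.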


66-71: Typo in error message references an undefined variable

$child_stream_id is not in scope; the intent is clearly $composing_stream_id.

-        ERROR('Stream does not exist: data_provider=' || $data_provider || ' stream_id=' || $child_stream_id);
+        ERROR('Stream does not exist: data_provider=' || $data_provider || ' stream_id=' || $composing_stream_id);

Without the fix the action will raise an unbound-variable error at runtime.

♻️ Duplicate comments (7)
tests/streams/auth/auth_test.go (7)

153-153: Variable shadowing in error handling.

Similar to the previous instance, the err variable is being reused, which shadows the previous declaration.

Same variable shadowing issue as noted in line 90.


200-200: Variable reuse in stream creation.

Again, the err variable is being reused for the CreateStream call.

Same variable shadowing pattern noted throughout the file.


359-359: Variable reuse in taxonomy setup.

The err variable continues to be reused throughout the function for different operations.

Consistent with the variable shadowing pattern observed throughout the file.


480-480: Variable reuse continues.

The pattern of reusing the err variable for CreateStream call is consistent throughout the file.

Same variable shadowing pattern as noted elsewhere.


572-572: Variable reuse in metadata insertion.

The err variable reuse pattern continues in the InsertMetadata call.

Consistent with the variable shadowing pattern throughout the file.


884-884: Variable reuse in stream creation for deletion test.

The err variable reuse pattern continues in the deletion test's CreateStream call.

Consistent with the variable shadowing pattern noted throughout the file.


963-963: Variable reuse in filter test stream creation.

The final instance of err variable reuse in the CreateStream call for the filter test.

Final instance of the variable shadowing pattern observed throughout the file.

🧹 Nitpick comments (13)
tests/streams/aggregation/aggr07_inexistent_streams_test.go (1)

113-121: Consider removing redundant data provider recreation and registration.

The deployer is being recreated with the same address and the data provider is being registered again, which appears redundant since the same deployer and platform are being used from the previous test section.

Consider simplifying this section by removing the redundant operations:

-	deployer, err = util.NewEthereumAddressFromString("0x0000000000000000000000000000000000000123")
-	if err != nil {
-		return errors.Wrap(err, "error creating ethereum address")
-	}
-	platform = procedure.WithSigner(platform, deployer.Bytes())
-	err = setup.CreateDataProvider(ctx, platform, deployer.Address())
-	if err != nil {
-		return errors.Wrap(err, "error registering data provider")
-	}

The deployer and platform from the previous section should still be valid for the second test case.

internal/migrations/011-get-database-size.sql (2)

30-32: Validate primitive_events composite primary key size estimation.

The comment indicates a "Composite PK index (stream_ref + event_time + created_at)" with 60 bytes, but the calculation shows only 50 bytes for row data. Composite indexes typically require more storage than individual columns due to key ordering overhead.

Consider whether the 60-byte estimate for the composite primary key index is sufficient, as composite indexes often have higher overhead than the sum of individual column sizes.


37-39: Review metadata table variable-length field estimates.

The calculation uses fixed estimates for variable-length fields: metadata_key(~20) + value_text(~20). These are approximations that may not reflect actual data distribution.

Consider implementing dynamic size calculation based on actual data samples, or document these as conservative estimates that may need periodic review as data patterns change.

tests/streams/auth/auth_test.go (1)

90-90: Variable shadowing with error handling.

The variable err is being reused for the TransferStreamOwnership call, which shadows the previous error variable. While this works correctly, it could be clearer.

Consider using a different variable name or adding explicit assignment to improve code clarity:

-		err = procedure.TransferStreamOwnership(ctx, procedure.TransferStreamOwnershipInput{
+		err := procedure.TransferStreamOwnership(ctx, procedure.TransferStreamOwnershipInput{
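The reuse-vs-shadowing distinction the review keeps flagging can be shown in a minimal Go sketch (hypothetical helpers standing in for the real test calls):

```go
package main

import (
	"errors"
	"fmt"
)

func createStream() error       { return nil }
func transferOwnership() error  { return errors.New("not allowed") }

// setupStreams reuses one err variable across calls. Reassignment with
// `=` is correct; the subtle bug would be `err :=` inside an inner block,
// which declares a NEW variable that shadows the outer err and can let
// the inner error escape unchecked.
func setupStreams() error {
	err := createStream()
	if err != nil {
		return fmt.Errorf("create stream: %w", err)
	}
	// Reassign, don't redeclare: `err =` updates the existing variable.
	err = transferOwnership()
	if err != nil {
		return fmt.Errorf("transfer ownership: %w", err)
	}
	return nil
}

func main() {
	fmt.Println(setupStreams())
}
```

Plain reuse with `=` is idiomatic Go; the review's concern is readability, and the real hazard only appears when `:=` creates a shadow in a nested scope.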
internal/migrations/009-truflation-query.sql (1)

106-115: Redundant storage of (data_provider, stream_id) once stream_ref is present

The new insert now writes all three identifiers:

(stream_id, data_provider, …, stream_ref)

While this keeps legacy columns alive, it doubles the storage footprint and increases the risk of drift if any back-fill process updates one column but not the other.

If backward compatibility is no longer required downstream, consider:

-INSERT INTO primitive_events (stream_id, data_provider, …, stream_ref)
+INSERT INTO primitive_events (stream_ref, event_time, value, created_at, truflation_created_at)

and add a view for consumers still expecting the old columns.

internal/migrations/006-composed-query.sql (1)

57-58: Move get_stream_id after existence / permission checks

Calling get_stream_id before verifying the caller’s read / compose rights exposes an unnecessary execution path (and potential exception) for requests that will be rejected anyway.

- $stream_ref := get_stream_id($data_provider, $stream_id);
- IF !is_allowed_to_read_all(…)
+ IF !is_allowed_to_read_all(…)
+     ERROR('Not allowed to read stream');
+ END IF;
+ $stream_ref := get_stream_id($data_provider, $stream_id);
internal/migrations/004-composed-taxonomy.sql (2)

52-60: Potential duplicate state – both legacy and normalised columns kept

insert_taxonomy now writes data_provider, stream_id as well as stream_ref.
Keeping both increases the risk of future divergence (e.g., manual UPDATE that touches one set but not the other).

If backward compatibility is still required, mark the string columns as generated or introduce a view; otherwise drop them to enforce single-source-of-truth.


102-109: Missing composite index – get_current_group_sequence will scan full table

The new filter WHERE stream_ref = $stream_ref AND disabled_at IS NULL is performance-critical.
Add a covering index:

CREATE INDEX IF NOT EXISTS tax_streamref_disabled_gs_idx
ON taxonomies(stream_ref, disabled_at, group_sequence DESC);
internal/migrations/007-composed-query-derivate.sql (1)

513-514: Hard-coded recursion depth (10) may truncate deep hierarchies

Streams composed >10 levels deep will silently drop grandchildren.
Either remove the limit or expose it as a configurable constant validated against the max_recursion_depth setting.
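The silent-truncation failure mode can be illustrated with a small Go sketch (a hypothetical tree walk standing in for the SQL recursion):

```go
package main

import "fmt"

// node is a hypothetical composed-stream tree used only to illustrate
// the review point: a hard-coded depth cap silently drops descendants.
type node struct {
	id       string
	children []*node
}

// collect gathers stream ids down to maxDepth levels below the root;
// anything deeper is skipped without any error — the failure mode the
// review warns about for hierarchies more than 10 levels deep.
func collect(n *node, depth, maxDepth int, out *[]string) {
	if depth > maxDepth {
		return
	}
	*out = append(*out, n.id)
	for _, c := range n.children {
		collect(c, depth+1, maxDepth, out)
	}
}

func main() {
	// Build a chain 12 levels deep: s0 -> s1 -> ... -> s11.
	root := &node{id: "s0"}
	cur := root
	for i := 1; i <= 11; i++ {
		child := &node{id: fmt.Sprintf("s%d", i)}
		cur.children = []*node{child}
		cur = child
	}
	var out []string
	collect(root, 0, 10, &out) // cap at 10: s11 is silently dropped
	fmt.Println(len(out))      // 11 of the 12 nodes reached
}
```

Raising the cap, or erroring when it is hit, turns a silent data gap into a detectable failure.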

internal/migrations/001-common-actions.sql (4)

131-140: Inefficient & brittle lookup for $data_provider_id

The FOR-loop is unnecessary; a single SELECT … INTO suffices.
More importantly, if another transaction inserts the provider between the create_data_provider call and this lookup, you’ll still hit the NOT FOUND error. Consider:

SELECT id INTO $data_provider_id
FROM data_providers
WHERE address = $data_provider;

IF $data_provider_id IS NULL THEN
    ERROR('Data provider not found: ' || $data_provider);
END IF;

167-183: Race condition when allocating stream ids

ROW_NUMBER() + COALESCE(MAX(id),0) has the same concurrency flaw as the provider table: overlapping transactions can hand out duplicate ids.
PostgreSQL sequences (or identity columns) are the canonical fix.
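The read-max-then-write race translates directly into Go terms (an analogy, not the migration code): a single atomic increment, the moral equivalent of a database sequence, can never hand out the same id twice, whereas two concurrent MAX(id)+1 readers can.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// seq plays the role of a PostgreSQL sequence: allocation is a single
// atomic operation, so no two callers can ever observe the same value.
var seq atomic.Int64

func nextIDSafe() int64 { return seq.Add(1) }

func main() {
	const n = 1000
	ids := make(map[int64]bool)
	var mu sync.Mutex
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			id := nextIDSafe()
			mu.Lock()
			ids[id] = true
			mu.Unlock()
		}()
	}
	wg.Wait()
	fmt.Println(len(ids)) // 1000 distinct ids: no duplicates under concurrency
}
```

A MAX(id)+1 scheme splits allocation into a read followed by a write; two transactions that interleave between those steps both compute the same "next" id, which is exactly the duplicate-key hazard the review describes.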


367-387: Redundant double call to get_stream_id

$stream_ref is computed at Line 367 and reassigned at Line 386. Besides wasted work, it risks accidental divergence if the underlying data changes mid-transaction. Drop the first assignment.

-    $stream_ref := get_stream_id($data_provider, $stream_id);
-    $stream_ref INT := get_stream_id($data_provider, $stream_id);
+    $stream_ref := get_stream_id($data_provider, $stream_id);

539-542: delete_stream orphaning metadata & taxonomy rows

Only the streams row is deleted; related rows in metadata, taxonomies, etc., remain and reference a nonexistent stream. Either:

  1. Add ON DELETE CASCADE foreign keys, or
  2. Explicitly delete dependent rows inside the action.
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 545db39 and 823cf78.

📒 Files selected for processing (49)
  • .github/workflows/ci.yaml (1 hunks)
  • extensions/tn_cache/internal/engine_ops_integration_test.go (3 hunks)
  • extensions/tn_cache/internal/engine_ops_permissions_test.go (3 hunks)
  • internal/migrations/000-initial-data.sql (1 hunks)
  • internal/migrations/001-common-actions.sql (34 hunks)
  • internal/migrations/002-authorization.sql (21 hunks)
  • internal/migrations/003-primitive-insertion.sql (3 hunks)
  • internal/migrations/004-composed-taxonomy.sql (7 hunks)
  • internal/migrations/005-primitive-query.sql (6 hunks)
  • internal/migrations/006-composed-query.sql (18 hunks)
  • internal/migrations/007-composed-query-derivate.sql (4 hunks)
  • internal/migrations/008-public-query.sql (1 hunks)
  • internal/migrations/009-truflation-query.sql (3 hunks)
  • internal/migrations/010-get-latest-write-activity.sql (1 hunks)
  • internal/migrations/011-get-database-size.sql (1 hunks)
  • internal/migrations/017-normalize-tables.sql (1 hunks)
  • tests/extensions/tn_cache/cache_height_tracking_test.go (2 hunks)
  • tests/extensions/tn_cache/cache_integration_test.go (3 hunks)
  • tests/extensions/tn_cache/cache_observability_test.go (2 hunks)
  • tests/extensions/tn_cache/resolution_transaction_test.go (2 hunks)
  • tests/streams/aggregation/aggr01_basic_aggregation_test.go (1 hunks)
  • tests/streams/aggregation/aggr02_weighted_contributions_test.go (1 hunks)
  • tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1 hunks)
  • tests/streams/aggregation/aggr04_missing_data_handling_test.go (1 hunks)
  • tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1 hunks)
  • tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1 hunks)
  • tests/streams/aggregation/aggr07_inexistent_streams_test.go (2 hunks)
  • tests/streams/aggregation/aggr08_weight_change_test.go (1 hunks)
  • tests/streams/aggregation/aggr09_duplicate_values_test.go (1 hunks)
  • tests/streams/aggregation/get_all_weights_for_query_test.go (2 hunks)
  • tests/streams/aggregation/get_category_test.go (1 hunks)
  • tests/streams/auth/auth_test.go (14 hunks)
  • tests/streams/common_test.go (5 hunks)
  • tests/streams/complex_composed_test.go (3 hunks)
  • tests/streams/composed_test.go (4 hunks)
  • tests/streams/comprehensive_shared_path_independence_test.go (1 hunks)
  • tests/streams/database_size_test.go (2 hunks)
  • tests/streams/gamefi_index_test.go (2 hunks)
  • tests/streams/index_change_test.go (1 hunks)
  • tests/streams/multi_level_composed_test.go (1 hunks)
  • tests/streams/other/other_test.go (5 hunks)
  • tests/streams/other/stream_exists_batch_test.go (1 hunks)
  • tests/streams/primitive_test.go (1 hunks)
  • tests/streams/query/metadata_test.go (1 hunks)
  • tests/streams/query/query_test.go (2 hunks)
  • tests/streams/roles/permission_gates_test.go (1 hunks)
  • tests/streams/truflation_composed_frozen_test.go (1 hunks)
  • tests/streams/truflation_primitive_frozen_test.go (1 hunks)
  • tests/streams/utils/setup/common.go (1 hunks)
🧰 Additional context used
🧠 Learnings (6)
tests/extensions/tn_cache/resolution_transaction_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

extensions/tn_cache/internal/engine_ops_permissions_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

tests/extensions/tn_cache/cache_height_tracking_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

tests/extensions/tn_cache/cache_integration_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

extensions/tn_cache/internal/engine_ops_integration_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

tests/extensions/tn_cache/cache_observability_test.go (1)

Learnt from: outerlook
PR: #1055
File: extensions/tn_cache/internal/db_ops.go:40-43
Timestamp: 2025-07-14T18:18:20.905Z
Learning: The SetTx method in extensions/tn_cache/internal/db_ops.go is only used in tests that run sequentially, so concurrency concerns about race conditions don't apply to this method.

🧬 Code Graph Analysis (33)
tests/streams/aggregation/aggr04_missing_data_handling_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr08_weight_change_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/extensions/tn_cache/resolution_transaction_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr01_basic_aggregation_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
extensions/tn_cache/internal/engine_ops_permissions_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/query/metadata_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr02_weighted_contributions_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/roles/permission_gates_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/get_all_weights_for_query_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/get_category_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr09_duplicate_values_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/extensions/tn_cache/cache_height_tracking_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/gamefi_index_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/primitive.go (2)
  • SetupPrimitive (44-64)
  • SetupPrimitiveInput (38-42)
tests/streams/other/stream_exists_batch_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/comprehensive_shared_path_independence_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/index_change_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/truflation_primitive_frozen_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/extensions/tn_cache/cache_integration_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/primitive_test.go (1)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/query/query_test.go (3)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/primitive.go (2)
  • SetupPrimitiveFromMarkdown (122-128)
  • MarkdownPrimitiveSetupInput (31-36)
tests/streams/utils/setup/composed.go (2)
  • SetupComposedFromMarkdown (186-192)
  • MarkdownComposedSetupInput (73-80)
tests/streams/common_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/procedure/metadata.go (2)
  • InsertMetadata (293-328)
  • InsertMetadataInput (283-290)
tests/streams/truflation_composed_frozen_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/common.go (1)
tests/streams/utils/setup/roles.go (1)
  • AddMemberToRoleBypass (73-98)
extensions/tn_cache/internal/engine_ops_integration_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/other/other_test.go (1)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/auth/auth_test.go (3)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/utils/procedure/metadata.go (2)
  • TransferStreamOwnership (338-371)
  • TransferStreamOwnershipInput (330-335)
tests/streams/utils/procedure/execute.go (2)
  • SetTaxonomy (487-533)
  • WithSigner (432-436)
tests/streams/complex_composed_test.go (3)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/composed.go (2)
  • SetupComposedFromMarkdown (186-192)
  • MarkdownComposedSetupInput (73-80)
tests/streams/utils/procedure/execute.go (1)
  • WithSigner (432-436)
tests/streams/multi_level_composed_test.go (2)
tests/streams/utils/procedure/execute.go (1)
  • WithSigner (432-436)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr07_inexistent_streams_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/procedure/execute.go (1)
  • WithSigner (432-436)
tests/extensions/tn_cache/cache_observability_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: acceptance-test
🔇 Additional comments (84)
internal/migrations/000-initial-data.sql (1)

14-25: Well-designed data_providers table with proper constraints.

The new data_providers table is well-structured for the normalization effort:

  • Uses INT primary key for efficient internal referencing
  • Proper Ethereum address validation with check constraint
  • Unique index on address for fast lookups and uniqueness enforcement
  • Includes created_at timestamp for audit trail

The design effectively supports the migration from address/stream_id pairs to internal stream references.
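The address check constraint can be illustrated with a Go sketch of the equivalent validation (the exact pattern in the migration may differ, e.g. in case handling; this regex is an assumption for illustration):

```go
package main

import (
	"fmt"
	"regexp"
)

// addressPattern mirrors the kind of CHECK constraint the data_providers
// table applies: "0x" followed by exactly 40 hexadecimal characters.
var addressPattern = regexp.MustCompile(`^0x[0-9a-fA-F]{40}$`)

func isValidAddress(addr string) bool {
	return addressPattern.MatchString(addr)
}

func main() {
	fmt.Println(isValidAddress("0x0000000000000000000000000000000000000123")) // true
	fmt.Println(isValidAddress("not-an-address"))                             // false
}
```

Enforcing the shape at the table level means every stream_ref join can assume well-formed provider addresses without re-validating.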

tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1)

52-55: Consistent data provider registration implementation.

The addition of setup.CreateDataProvider call is properly placed and handled:

  • Correctly positioned after signer setup but before stream operations
  • Proper error wrapping with descriptive message
  • Follows the pattern established across other test files for the normalization effort

This change ensures the test environment aligns with the new schema requiring explicit data provider registration.
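The registration-before-stream-ops invariant can be sketched in Go with hypothetical stand-ins for the test utilities (the real `CreateDataProvider` lives in tests/streams/utils/setup/common.go; these types and functions are illustrative only):

```go
package main

import (
	"errors"
	"fmt"
)

// platform is a minimal stand-in for the test platform object.
type platform struct{ registered map[string]bool }

// createDataProvider mimics setup.CreateDataProvider: it records the
// address in the normalized data_providers table before any stream work.
func createDataProvider(p *platform, address string) error {
	if address == "" {
		return errors.New("empty address")
	}
	p.registered[address] = true
	return nil
}

// createStream fails unless the provider was registered first — the
// invariant the normalized schema introduces and every test setup now
// has to respect.
func createStream(p *platform, provider string) error {
	if !p.registered[provider] {
		return fmt.Errorf("unknown data provider: %s", provider)
	}
	return nil
}

func main() {
	p := &platform{registered: map[string]bool{}}
	deployer := "0x0000000000000000000000000000000000000123"
	if err := createDataProvider(p, deployer); err != nil {
		panic(err)
	}
	fmt.Println(createStream(p, deployer)) // <nil>: registration happened first
}
```

This is why the ordering in the tests matters: signer setup, then provider registration, then stream creation — reversing the last two would fail under the new schema.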

tests/streams/aggregation/aggr04_missing_data_handling_test.go (1)

55-58: Consistent data provider registration pattern.

The setup.CreateDataProvider call follows the same pattern as other test files:

  • Proper placement after signer setup
  • Appropriate error handling with descriptive message
  • Maintains consistency across the test suite for the normalization effort
tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1)

59-62: Maintains consistent data provider registration pattern.

The implementation matches the pattern established in other test files:

  • Same logical placement in test setup flow
  • Consistent error handling and messaging
  • Supports the schema normalization requirements
tests/streams/aggregation/aggr08_weight_change_test.go (1)

60-63: Systematic implementation of data provider registration.

The setup.CreateDataProvider call maintains the established pattern across all test files:

  • Consistent placement and error handling
  • Systematic approach to supporting schema normalization
  • Proper integration with existing test setup flow

This demonstrates a well-coordinated migration effort across the test suite.

tests/streams/aggregation/aggr01_basic_aggregation_test.go (1)

53-56: LGTM! Data provider registration follows the expected pattern.

The addition of setup.CreateDataProvider call is correctly placed after setting the signer and includes proper error handling with descriptive error wrapping. This aligns with the broader normalization effort for data provider management.

tests/streams/aggregation/aggr09_duplicate_values_test.go (1)

62-65: LGTM! Consistent data provider registration implementation.

The setup.CreateDataProvider call is properly implemented with appropriate error handling and placement in the test flow, consistent with the pattern used across other test files.

tests/streams/truflation_primitive_frozen_test.go (1)

577-580: LGTM! Data provider registration properly placed in setup function.

The setup.CreateDataProvider call is correctly positioned in the setup function after setting the signer and before deploying the primitive stream. This ensures the data provider is registered before any stream operations, which aligns with the new data provider requirements.

.github/workflows/ci.yaml (1)

59-60: LGTM! Standard GitHub Actions authentication configuration.

Adding the repo-token parameter with ${{ secrets.GITHUB_TOKEN }} is a standard practice for GitHub Actions that need repository access. This ensures the arduino/setup-task action has the necessary permissions.

tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1)

47-50: LGTM! Consistent implementation of data provider registration.

The setup.CreateDataProvider call is properly implemented with correct error handling and placement, maintaining consistency with the pattern established across other test files in this normalization effort.

tests/streams/roles/permission_gates_test.go (1)

43-47: LGTM! Proper data provider registration implementation.

The addition of explicit data provider registration for both authorizedWriter and managerWallet addresses is correctly implemented and aligns with the database normalization changes. The error handling appropriately fails the test early if registration fails, and the timing before role setup is correct.

internal/migrations/008-public-query.sql (2)

117-117: LGTM! Proper stream reference normalization.

The introduction of $stream_ref variable using get_stream_id($data_provider, $stream_id) correctly implements the migration to internal stream references, aligning with the broader normalization effort described in the PR objectives.


123-123: LGTM! Consistent use of normalized stream reference.

The metadata query filter now correctly uses stream_ref = $stream_ref instead of separate provider/stream_id fields, which should improve query performance and maintains consistency with the normalized schema.

tests/extensions/tn_cache/resolution_transaction_test.go (2)

7-7: LGTM! Necessary import for error handling.

The addition of the errors package import supports the error wrapping used in the data provider registration.


34-37: LGTM! Consistent data provider registration implementation.

The explicit registration of the deployer as a data provider follows the established pattern across the codebase. The error handling correctly fails the test early if registration fails, which is appropriate for the test context.

tests/extensions/tn_cache/cache_observability_test.go (2)

9-9: LGTM! Necessary import for error handling.

The addition of the errors package import supports the error wrapping used in the data provider registration.


43-46: LGTM! Consistent data provider registration implementation.

The explicit registration of the deployer as a data provider follows the established pattern across the codebase. The error handling correctly fails the test early if registration fails, and the timing after setting the signer is appropriate.

extensions/tn_cache/internal/engine_ops_permissions_test.go (3)

9-9: LGTM! Necessary import for error handling.

The addition of the errors package import supports the error wrapping used in the data provider registration.


23-25: LGTM! Improved test documentation.

The updated comments provide clearer description of the test purpose and the SQL authorization mechanism being tested.


43-46: LGTM! Consistent data provider registration implementation.

The explicit registration of the deployer as a data provider follows the established pattern across the codebase. The error handling correctly fails the test early if registration fails, and the timing after setting the signer is appropriate.

tests/streams/index_change_test.go (1)

38-41: LGTM! Proper data provider registration added.

The explicit call to setup.CreateDataProvider aligns with the database normalization changes and follows the consistent pattern across test files. The error handling is appropriate.

tests/streams/aggregation/get_all_weights_for_query_test.go (1)

458-461: LGTM! Data provider registration properly integrated.

The placement of setup.CreateDataProvider is correct - after setting the signer and before stream operations. The error handling follows the established pattern across the codebase.

tests/streams/query/metadata_test.go (1)

48-51: LGTM! Consistent data provider registration implementation.

The setup.CreateDataProvider call is correctly positioned between signer setup and stream creation, with proper error handling that matches the pattern used throughout the codebase.

tests/streams/comprehensive_shared_path_independence_test.go (1)

51-54: LGTM! Data provider registration correctly positioned.

The setup.CreateDataProvider call is appropriately placed after signer setup and before primitive deployment, with consistent error handling that aligns with the normalization changes.

tests/streams/gamefi_index_test.go (2)

117-120: LGTM! Proper data provider registration in test setup.

The setup.CreateDataProvider call is correctly positioned after signer setup and before primitive stream creation, with appropriate error handling.


280-283: LGTM! Consistent data provider registration pattern.

The data provider registration follows the same correct pattern as the other setup function, ensuring proper initialization before stream operations.

tests/streams/aggregation/get_category_test.go (1)

125-128: LGTM: Proper data provider registration implementation

The data provider registration follows the established pattern correctly. The call to setup.CreateDataProvider is appropriately placed after setting the signer and before stream creation, with proper error handling and descriptive error wrapping.

tests/streams/aggregation/aggr02_weighted_contributions_test.go (1)

53-56: LGTM: Consistent data provider registration

The data provider registration is correctly implemented following the established pattern. The placement before stream setup operations and the error handling are appropriate.

tests/streams/truflation_composed_frozen_test.go (1)

52-55: LGTM: Appropriate data provider registration for Truflation tests

The data provider registration correctly uses the Truflation-specific deployer address and follows the established pattern. The placement in the setup function before primitive deployment is appropriate.

tests/streams/primitive_test.go (1)

41-45: LGTM: Proper data provider registration in primitive test

The data provider registration is correctly implemented with the appropriate address and proper error variable reuse. The placement before stream creation follows the established pattern.

tests/streams/other/stream_exists_batch_test.go (1)

40-43: LGTM: Consistent data provider registration implementation

The data provider registration follows the established pattern correctly with proper error handling and appropriate placement before stream operations.

tests/streams/multi_level_composed_test.go (2)

46-46: LGTM! Consistent signer change.

The change from deployerSharedComplex to deployerMultiLevel aligns with the test's purpose and variable naming.


49-52: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the broader normalization changes and includes proper error handling.

tests/streams/database_size_test.go (3)

40-43: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the normalization changes and includes proper error handling.


46-46: LGTM! Improved error handling.

The explicit error assignment makes the error handling more clear and consistent with Go best practices.


77-77: LGTM! Updated expected database size.

The change from 2118 to 1029 reflects the updated database size calculation logic introduced in the migration 011-get-database-size.sql.

tests/extensions/tn_cache/cache_height_tracking_test.go (2)

9-9: LGTM! Added necessary import.

The github.com/pkg/errors import is required for the error wrapping used in the data provider registration.


43-46: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the normalization changes and includes proper error handling.

extensions/tn_cache/internal/engine_ops_integration_test.go (3)

9-9: LGTM! Added necessary import.

The github.com/pkg/errors import is required for the error wrapping used in the data provider registration.


49-52: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the normalization changes and includes proper error handling.


347-350: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the normalization changes and includes proper error handling.

tests/streams/aggregation/aggr07_inexistent_streams_test.go (1)

50-53: LGTM! Proper data provider registration.

The explicit registration of the data provider using setup.CreateDataProvider is consistent with the normalization changes and includes proper error handling.

tests/streams/query/query_test.go (2)

95-98: LGTM! Proper data provider registration in test setup.

The addition of explicit data provider registration using setup.CreateDataProvider is well-placed and correctly implemented. The error handling appropriately wraps and returns any registration failures, preventing further test execution with incomplete setup.


912-915: LGTM! Consistent data provider registration pattern.

The same pattern is correctly applied to the composed query test setup function, maintaining consistency across test helpers and ensuring proper data provider registration before stream operations.

tests/streams/composed_test.go (4)

46-49: LGTM! Proper data provider registration in test wrapper.

The explicit registration of the data provider is correctly placed after setting the platform signer and before stream creation. The error handling appropriately prevents test execution if registration fails.


72-75: LGTM! Consistent registration pattern in standalone test.

The data provider registration follows the same correct pattern as the wrapper function, ensuring proper setup for tests that don't use the wrapper.


133-136: LGTM! Proper registration before stream operations.

The registration is correctly positioned before stream creation operations, maintaining the required setup sequence.


238-241: LGTM! Registration for non-owner account properly handled.

The registration is correctly applied for the non-owner account, ensuring the test can proceed with proper data provider setup even when testing authorization restrictions.

tests/extensions/tn_cache/cache_integration_test.go (3)

10-10: LGTM! Necessary import for error wrapping.

The addition of the errors package import is required to support the error wrapping used in the data provider registration calls.


55-58: LGTM! Proper data provider registration for cache tests.

The data provider registration is correctly positioned after setting the platform signer and before stream setup operations, ensuring proper initialization for cache functionality tests.


252-255: LGTM! Consistent registration pattern in cache include children test.

The registration follows the same correct pattern, ensuring proper setup for hierarchical cache testing scenarios.

tests/streams/common_test.go (4)

58-61: LGTM! Proper data provider registration before stream creation.

The explicit registration ensures the data provider is recognized before stream operations, aligning with the normalized data provider model.


132-135: LGTM! Consistent registration pattern for metadata tests.

The registration is correctly positioned to ensure proper setup before testing read-only metadata constraints.


193-196: LGTM! Registration for batch creation path.

The data provider registration is properly included for the batch stream creation test path, ensuring consistency across different creation methods.


254-257: LGTM! Registration for visibility settings test.

The registration is correctly positioned before stream creation and visibility testing operations.

internal/migrations/010-get-latest-write-activity.sql (1)

21-48: LGTM! Excellent normalization of data provider references.

The migration correctly updates the get_last_transactions action to use the normalized data_providers table structure. Key improvements:

  1. Proper referential integrity: All queries now join through the data_providers table using foreign keys instead of direct string comparisons
  2. Consistent pattern: Each UNION ALL query follows the same join pattern - through streams table to data_providers table
  3. Maintained functionality: The action signature and core logic remain unchanged while leveraging the normalized schema

The joins are correctly structured:

  • streams → data_providers via data_provider_id
  • primitive_events/taxonomies/metadata → streams → data_providers via stream_ref and data_provider_id

This change significantly improves data consistency and integrity compared to the previous string-based filtering approach.

internal/migrations/005-primitive-query.sql (4)

37-66: LGTM! Excellent normalization pattern.

The migration correctly implements the stream reference normalization by:

  • Computing $stream_ref once using get_stream_id($data_provider, $stream_id)
  • Consistently replacing dual conditions with single stream_ref lookups
  • Maintaining the original query logic while improving performance

107-115: LGTM! Consistent with normalization pattern.

The get_last_record_primitive action correctly applies the same stream reference normalization pattern as the other actions.


142-150: LGTM! Consistent normalization implementation.

The get_first_record_primitive action properly implements the stream reference normalization pattern.


177-194: LGTM! Proper normalization across both primitive_events and metadata tables.

The get_index_primitive action correctly:

  • Computes $stream_ref once
  • Uses it in both primitive_events and metadata table queries
  • Maintains the existing logic flow

This demonstrates the comprehensive nature of the normalization effort.

tests/streams/utils/setup/common.go (1)

192-233: LGTM! Well-implemented helper function for data provider registration.

The CreateDataProvider function correctly:

  • Validates Ethereum address format before processing
  • Grants necessary "network_writer" role via bypass mechanism
  • Constructs proper transaction context with the address as signer/caller
  • Calls the create_data_provider engine action with appropriate parameters
  • Provides comprehensive error handling with wrapped error messages

This helper function properly supports the new normalized schema requirements where data providers must be explicitly registered before stream operations.
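As a rough illustration of the kind of address validation described above — a hypothetical Python re-creation, not the actual Go helper in tests/streams/utils/setup/common.go:

```python
import re

# Hypothetical sketch of the format check described above; the real helper is
# Go code in tests/streams/utils/setup/common.go, not this function.
ETH_ADDRESS_RE = re.compile(r"0x[0-9a-fA-F]{40}")

def is_valid_eth_address(addr: str) -> bool:
    """True only for a 0x-prefixed, 40-hex-character (20-byte) address."""
    return ETH_ADDRESS_RE.fullmatch(addr) is not None

print(is_valid_eth_address("0x" + "ab" * 20))  # True
print(is_valid_eth_address("0x123"))           # False
```

Rejecting malformed addresses up front keeps bad rows out of the normalized data_providers table.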

tests/streams/complex_composed_test.go (3)

75-78: LGTM! Proper data provider registration in test setup.

The addition of CreateDataProvider call ensures the data provider is registered before stream operations, which is required by the new normalized schema. Error handling is appropriate.


603-606: LGTM! Necessary data provider registration.

The CreateDataProvider call ensures the initial deployer is registered before proceeding with test operations.


613-619: LGTM! Excellent test isolation pattern.

The creation of a localDeployer and separate data provider registration provides proper test isolation by:

  • Using unique deployer addresses per test
  • Preventing interference with other tests
  • Creating isolated stream namespaces

This is a good testing practice that aligns with the new normalized schema requirements.

Also applies to: 655-655

internal/migrations/003-primitive-insertion.sql (2)

34-38: LGTM! Proper stream reference inclusion in single record insertion.

The insert_record action correctly:

  • Computes $stream_ref using the get_stream_id helper function
  • Includes stream_ref in the INSERT statement alongside existing fields
  • Maintains referential integrity with the normalized schema

84-86: LGTM! Correct batch insertion with stream reference normalization.

The insert_records action properly implements stream reference normalization for batch operations:

  • Uses get_stream_ids for efficient batch processing of stream references
  • Correctly integrates stream_refs_array into the WITH RECURSIVE pattern
  • Properly maps individual stream references in the arguments CTE
  • Includes stream_ref in the final INSERT statement

The implementation maintains the existing efficient batch processing while adding the required normalization.

Also applies to: 100-102, 108-110, 114-114, 121-122
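The per-row mapping can be pictured with a small Python analogue (names and shapes are illustrative, not the engine's actual types): get_stream_ids returns one ref per input pair, and each batch row picks up its positional stream_ref.

```python
def attach_stream_refs(rows, stream_refs):
    """Zip each batch row with its pre-resolved stream_ref, position by position."""
    if len(rows) != len(stream_refs):
        raise ValueError("stream_refs must align one-to-one with the batch rows")
    return [dict(row, stream_ref=ref) for row, ref in zip(rows, stream_refs)]

rows = [{"event_time": 1, "value": 10}, {"event_time": 2, "value": 20}]
out = attach_stream_refs(rows, [101, 102])
print([r["stream_ref"] for r in out])  # [101, 102]
```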

tests/streams/other/other_test.go (1)

45-46: LGTM! Consistent data provider registration across all test cases.

All test functions correctly add CreateDataProvider calls with:

  • Proper placement after role setup but before stream operations
  • Appropriate error handling with descriptive error messages
  • Correct address parameters for each test scenario

These changes ensure compliance with the new normalized schema requirements where data providers must be explicitly registered before stream operations.

Also applies to: 102-103, 164-165, 234-237, 271-274

internal/migrations/011-get-database-size.sql (1)

14-18: Review updated streams table calculations.

The calculation shows UUID(16) + UUID(16) + stream_id(32) + stream_type(9) + created_at(8) = 81 bytes. This reflects the normalized schema with data_provider_id as a foreign key UUID instead of the original provider/address pair.

The byte calculation appears consistent with the normalization changes described in the PR summary.
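The arithmetic can be spot-checked directly (field widths as assumed by the migration, not measured from Postgres):

```python
# Per-row size estimate for the normalized streams table, mirroring the
# migration's assumed field widths (not actual Postgres on-disk sizes).
field_bytes = {
    "id": 16,                # UUID
    "data_provider_id": 16,  # UUID foreign key replacing the provider/address pair
    "stream_id": 32,
    "stream_type": 9,
    "created_at": 8,
}
row_bytes = sum(field_bytes.values())
print(row_bytes)  # 81
```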

tests/streams/auth/auth_test.go (11)

70-74: LGTM: Proper data provider registration.

The addition of setup.CreateDataProvider call with proper error handling aligns with the new normalized data provider system. The error wrapping provides clear context for debugging.


134-138: Consistent data provider registration pattern.

The same data provider registration pattern is correctly applied in the invalid address test, maintaining consistency across test functions.


193-197: Data provider registration for stream locator.

The test correctly registers the data provider using the address from streamInfo.Locator.DataProvider.Address(), ensuring consistency with the stream creation process.


343-346: Comprehensive data provider registration for nested test.

The nested test correctly registers the default deployer data provider before creating multiple streams with different owners.


474-478: Proper data provider registration in write permissions test.

The test correctly registers the data provider using the stream's data provider address, maintaining consistency with the ownership model.


547-551: Data provider registration in compose permissions test.

Correct registration of the data provider for the contract before stream creation and permission testing.


644-648: Default deployer registration for nested compose test.

The test properly registers the default deployer as a data provider before proceeding with the complex nested permission testing.


654-666: Multiple data provider registrations for nested hierarchy.

The test correctly registers all three data providers (parent, child, grandchild) before creating the stream hierarchy, ensuring proper setup for the nested permission testing.

This comprehensive setup ensures all data providers are properly registered before stream operations.


872-876: Data provider registration in deletion test.

Proper registration of the default deployer as a data provider before creating and testing stream deletion functionality.


939-943: Data provider registration in filter test.

The test correctly registers the default deployer before proceeding with stream existence filtering tests.


947-951: Additional data provider registration for different owner.

The test properly registers a second data provider for testing with a different owner, maintaining consistency with the normalized data provider system.

internal/migrations/009-truflation-query.sql (1)

75-77: get_stream_ids() must guarantee stable ordering

$stream_refs := get_stream_ids($data_provider, $stream_id);

The subsequent array-slicing logic assumes that the returned array preserves the exact ordering of the input pairs.
If get_stream_ids internally performs a GROUP BY or any set-based join, the order might be lost, causing silent data mis-alignment during insertion.

Please confirm that the helper preserves order (e.g. with ORDER BY ordinality) or sort explicitly here.
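The hazard is easy to reproduce outside SQL. In this Python analogue (illustrative names), a set-based resolution returns refs in lookup-table order, while an ordinal-preserving one follows the caller's input order — roughly what ORDER BY ordinality guarantees:

```python
def resolve_refs_unordered(pairs, lookup):
    """Set-based join analogue: output follows the lookup table, not the input."""
    wanted = set(pairs)
    return [ref for key, ref in lookup.items() if key in wanted]

def resolve_refs_ordered(pairs, lookup):
    """WITH ORDINALITY analogue: output order follows the input pairs."""
    return [lookup[key] for key in pairs]

lookup = {("dp1", "s1"): 10, ("dp1", "s2"): 11, ("dp2", "s1"): 12}
pairs = [("dp2", "s1"), ("dp1", "s1")]  # the caller's order

print(resolve_refs_unordered(pairs, lookup))  # [10, 12] — silently reordered
print(resolve_refs_ordered(pairs, lookup))    # [12, 10] — matches the input
```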

internal/migrations/006-composed-query.sql (2)

185-194: Recursion depth raised to 1000 — verify stack & performance impact

AND h.level < 1000

Previously this guard was 100 (earlier migrations). Raising it by an order of magnitude risks runaway recursion on maliciously crafted taxonomy graphs and may hurt planner performance.

Consider keeping a conservative limit (e.g. 200) or making it configurable.
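The cost of the higher guard is easiest to see on a cyclic graph. A minimal Python sketch of a level-bounded walk, where the bound plays the role of the h.level < N guard:

```python
def expand_taxonomy(children, root, max_level):
    """Breadth-first walk of a parent -> children map, bounded by max_level."""
    frontier, level, visited = [root], 0, []
    while frontier and level < max_level:
        visited.extend(frontier)
        frontier = [c for parent in frontier for c in children.get(parent, [])]
        level += 1
    return visited

cyclic = {"A": ["B"], "B": ["A"]}  # a malicious taxonomy: A -> B -> A forever
print(len(expand_taxonomy(cyclic, "A", 100)))   # 100 rows before the guard stops it
print(len(expand_taxonomy(cyclic, "A", 1000)))  # 1000 — ten times the wasted work
```

A legitimate hierarchy terminates on its own well before either limit; only pathological graphs feel the difference.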


430-457: Potential off-by-one at interval closure

Negative weight delta is emitted at group_sequence_end + 1.

If a child segment ends exactly at $effective_to, the closing event is pushed outside the query window and the weight remains active for the final point, skewing aggregation.

Double-check whether the + 1 is still required now that timestamps are discrete integers; you might instead emit at group_sequence_end.
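A toy prefix-sum model (timestamps as discrete integers, values illustrative) shows the difference between the two closing points; which one is correct depends on whether segment ends are meant to be inclusive:

```python
def active_weight_at(deltas, t):
    """Running prefix sum: total of all weight deltas emitted at or before t."""
    return sum(w for ts, w in deltas if ts <= t)

seg_start, seg_end, weight = 0, 10, 5
effective_to = 10  # the child segment ends exactly at the query window's edge

close_after_end = [(seg_start, weight), (seg_end + 1, -weight)]  # current behaviour
close_at_end = [(seg_start, weight), (seg_end, -weight)]         # proposed alternative

print(active_weight_at(close_after_end, effective_to))  # 5 — still active at the edge
print(active_weight_at(close_at_end, effective_to))     # 0 — already closed
```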

internal/migrations/017-normalize-tables.sql (1)

21-31: Existing metadata rows will violate the new FK

stream_ref is added and immediately constrained, but no back-fill statement populates it from (data_provider, stream_id).
Either:

  1. Populate before adding the constraint, or
  2. DEFERRABLE INITIALLY DEFERRED plus a follow-up UPDATE.

Without this, the migration will fail on databases with existing data.
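In pseudo-SQL terms, option 1 is an UPDATE ... FROM streams followed by the ALTER TABLE ... ADD CONSTRAINT. A small Python analogue of that back-fill-then-verify step (table shapes are illustrative):

```python
def backfill_and_check(metadata_rows, streams):
    """Populate stream_ref from (data_provider, stream_id), then report whether
    every row resolved — i.e. whether the FK constraint could be added safely."""
    index = {(s["data_provider"], s["stream_id"]): s["id"] for s in streams}
    for row in metadata_rows:
        row["stream_ref"] = index.get((row["data_provider"], row["stream_id"]))
    return all(row["stream_ref"] is not None for row in metadata_rows)

streams = [{"id": 7, "data_provider": "dp1", "stream_id": "s1"}]
good = [{"data_provider": "dp1", "stream_id": "s1"}]
orphan = [{"data_provider": "dp9", "stream_id": "sX"}]

print(backfill_and_check(good, streams))    # True  — constraint can be added
print(backfill_and_check(orphan, streams))  # False — migration would fail here
```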

internal/migrations/001-common-actions.sql (1)

42-48: Off-by-one: last element of the input arrays is never processed

idx < array_length($data_providers) stops at length-1, skipping the final pair.
Use <= (or iterate to array_upper) and add a defensive length-equality check to avoid silent mismatches.

-WHERE idx < array_length($data_providers)
+WHERE idx <= array_length($data_providers)

Likely an incorrect or invalid review comment.
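Whether < really skips the final pair depends on where the index starts and when it is incremented — hence the caveat above. Assuming a 1-based index tested before each step, a toy walk shows what each guard visits:

```python
def visited_indices(n, inclusive):
    """1-based walk mimicking the recursive CTE: the guard is checked per step."""
    idx, seen = 1, []
    while (idx <= n) if inclusive else (idx < n):
        seen.append(idx)
        idx += 1
    return seen

n = 3  # stands in for array_length($data_providers)
print(visited_indices(n, inclusive=False))  # [1, 2] — last pair skipped
print(visited_indices(n, inclusive=True))   # [1, 2, 3] — all pairs visited
```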

@holdex

holdex bot commented Jul 30, 2025

Time Submission Status

Member Status Time Action Last Update
williamrusdyputra ✅ Submitted 8h Update time Aug 12, 2025, 4:09 AM
MicBun ✅ Submitted 2h 15min Update time Aug 12, 2025, 3:57 AM

@williamrusdyputra williamrusdyputra marked this pull request as draft July 30, 2025 07:42
* feat: add migration scripts for normalization process

- Introduced multiple SQL migration scripts to transition the database schema from UUID to INT types for IDs.
- Implemented actions to handle both old and new schemas during the migration.
- Added scripts to populate `data_provider_id` and `id` columns in the `streams` table.
- Created necessary indexes and constraints to optimize performance and maintain data integrity.
- Included a shell script for executing migrations and logging execution times.
- Documented the migration process and performance recommendations in the README.

* chore: remove optimized index for gap-filling queries in primitive_events table

* refactor: streamline SQL migration script for foreign key constraints

- Added unique index on streams.id to ensure data integrity before creating foreign key references.
- Simplified the addition of foreign key constraints for metadata and taxonomies tables.
- Removed commented-out code related to partial indexes for primitive_events, as it is not currently supported.
@MicBun
Member

MicBun commented Aug 12, 2025

@williamrusdyputra can this be merged? The migration is already done. Or is there anything left?

@williamrusdyputra
Contributor Author

@williamrusdyputra can this be merged? The migration is already done. Or is there anything left?

nothing, we can merge it

@williamrusdyputra williamrusdyputra marked this pull request as ready for review August 12, 2025 03:35
Member

@MicBun MicBun left a comment

I see no documentation or README related to this code block; please create it in a new PR or a new GitHub issue for tracking.

err := setup.CreateDataProvider(ctx, platform, complexComposedDeployer.Address()

Waiting for CI check...

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 12

🧹 Nitpick comments (9)
internal/migrations/normalization_migration/README.md (4)

5-5: Typo/style: use “etc.” with a period.

Minor nit per LanguageTool suggestion.

Apply:

-It took 14 hours to complete (running non-stop). The unknown here is how it will behave in a network with heterogeneous nodes (not always same provider, etc).
+It took 14 hours to complete (running non-stop). The unknown here is how it will behave in a network with heterogeneous nodes (not always same provider, etc.).

39-64: Specify language for fenced code blocks.

Helps editors and linters; satisfies markdownlint MD040.

Apply:

-```
+```ini
 # Memory settings
 shared_buffers (recommend 1/4 of RAM)
 effective_cache_size (recommend 3/4 of RAM)
 work_mem (recommend 32MB)
 maintenance_work_mem (recommend 512MB up to 2GB for large migrations)
@@
 log_autovacuum_min_duration = -1            # Back to default (disabled)

69-80: Specify language for fenced code blocks.

Same as above; use INI for postgresql.conf examples.

Apply:

-```
+```ini
 # Aggressive settings for bulk migration workload
 autovacuum_max_workers = 5                  # vs default 3
@@
 log_autovacuum_min_duration = 0             # Log all autovacuum activity

86-100: Tag SQL code blocks with language.

Aids formatting and readability.

Apply:

-```sql
+```sql
 -- Check autovacuum activity
 SELECT schemaname, tablename, last_autovacuum, last_autoanalyze, 
@@
 WHERE tablename IN ('primitive_events', 'streams');

internal/migrations/normalization_migration/02_insert_actions.sql (5)

265-267: Typo in comment (“catched” → “caught”).

Minor grammar fix for clarity.

-    -- catched a bug where it's expected to have the same order of columns
+    -- caught a bug where it's expected to have the same order of columns

450-461: Redundant zero-children check and inconsistent ERROR casing in insert_taxonomy.

  • The first condition already includes $num_children == 0; then the next if block checks it again. Remove redundancy.
  • Use consistent ERROR(...) casing across the file.
-    if $num_children == 0 OR $num_children != array_length($child_data_providers) OR $num_children != array_length($weights) {
-        error('All child arrays must be of the same length');
-    }
+    if $num_children != array_length($child_data_providers) OR $num_children != array_length($weights) {
+        ERROR('All child arrays must be of the same length');
+    }
@@
-    if $num_children == 0 {
-        error('There must be at least 1 child');
-    }
+    if $num_children == 0 {
+        ERROR('There must be at least 1 child');
+    }

108-118: Stream type validation LGTM; consider centralizing allowed values.

Validation for 'primitive' vs 'composed' is correct. To avoid drift, consider deriving allowed values from a single metadata table or enum in a future cleanup.


120-136: Data provider presence check is good; consider explicit hint on remediation.

Erroring when the data provider isn’t found is correct. Optionally, suggest running create_data_provider in the message to improve UX.


158-175: Review stream ID allocation in 02_insert_actions.sql

The current pattern in lines 158–175—using MAX(id) + ROW_NUMBER() to assign streams.id—is race-prone if streams.id isn’t backed by a database sequence. I wasn’t able to locate the table definition to confirm whether streams.id is already defined as an IDENTITY/SEQUENCE.

Please verify the schema and, if it’s not sequence-based, address this as follows:

  • Long-term: migrate streams.id to use an IDENTITY or SEQUENCE (similar to data_providers.id).
  • Short-term: wrap this INSERT in a lock (e.g., application-level mutex or FOR UPDATE on a control row) or serialize creations by data_provider_id.
  • Optionally, add ON CONFLICT handling to detect duplicate IDs and surface clear errors.
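The race the short-term options guard against is the classic read-then-write on MAX(id): two transactions that snapshot the same maximum will mint the same ids. A minimal Python illustration (no real database involved):

```python
def allocate_ids(snapshot_max, batch_size):
    """MAX(id) + ROW_NUMBER() style allocation from a point-in-time snapshot."""
    return [snapshot_max + i for i in range(1, batch_size + 1)]

# Two concurrent inserts both read MAX(id) = 5 before either commits.
txn_a = allocate_ids(5, 2)
txn_b = allocate_ids(5, 2)

print(txn_a, txn_b)              # [6, 7] [6, 7]
print(set(txn_a) & set(txn_b))   # {6, 7} — duplicate ids at commit time
```

A database sequence (or IDENTITY column) hands out values atomically, which is why the long-term fix avoids the problem entirely.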
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 75ad0bb and e127c70.

📒 Files selected for processing (10)
  • internal/migrations/000-initial-data.sql (1 hunks)
  • internal/migrations/normalization_migration/01migrate-structure.sql (1 hunks)
  • internal/migrations/normalization_migration/02_insert_actions.sql (1 hunks)
  • internal/migrations/normalization_migration/03migrate-streams_dpid.sql (1 hunks)
  • internal/migrations/normalization_migration/04migrate-streams_id.sql (1 hunks)
  • internal/migrations/normalization_migration/05migrate-structure_2.sql (1 hunks)
  • internal/migrations/normalization_migration/06migrate-primitives.sql (1 hunks)
  • internal/migrations/normalization_migration/07-after-primitives.sql (1 hunks)
  • internal/migrations/normalization_migration/README.md (1 hunks)
  • internal/migrations/normalization_migration/migrate_and_report.sh (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • internal/migrations/000-initial-data.sql
🧰 Additional context used
🪛 LanguageTool
internal/migrations/normalization_migration/README.md

[style] ~5-~5: In American English, abbreviations like “etc.” require a period.
Context: ...eneous nodes (not always same provider, etc). Requirements - Postgres image with f...

(ETC_PERIOD)

🪛 markdownlint-cli2 (0.17.2)
internal/migrations/normalization_migration/README.md

39-39: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


69-69: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: lint
  • GitHub Check: acceptance-test
🔇 Additional comments (9)
internal/migrations/normalization_migration/01migrate-structure.sql (2)

30-38: Confirm that rekeying data_providers preserves referential integrity and ordering.

Assigning integer IDs via ROW_NUMBER() over old_id is deterministic but arbitrary. Ensure downstream processes do not rely on stable numeric ordering between nodes.

If ordering matters, consider ordering by a stable business key (e.g., address) instead:

WITH with_ids AS (
  SELECT old_id, ROW_NUMBER() OVER (ORDER BY address) AS id
  FROM data_providers
)
UPDATE data_providers d
SET id = w.id
FROM with_ids w
WHERE d.old_id = w.old_id;

46-53: Dropping existing stream_ref columns will delete data; validate backfill plan before this step.

This irreversibly removes stream_ref in metadata/taxonomies/primitive_events. Make sure you have verified the backfill scripts and backups.

Would you like a preflight SQL checklist to assert row counts and nullability before/after this step?

internal/migrations/normalization_migration/07-after-primitives.sql (1)

2-9: LGTM: finalizing indexes and FK for primitive_events.

Indexes match the intended access patterns, and the FK to streams(id) is valid given the unique index added earlier.

internal/migrations/normalization_migration/05migrate-structure_2.sql (1)

1-9: Good: establish uniqueness on streams.id then add FKs. Validate timing in your runbook.

Ensure this file runs after streams.id is fully populated (04migrate-streams_id.sql), else FK creation may fail.

I can generate a readiness query to assert there are no NULL streams.id before this step if useful.

Also applies to: 10-19

internal/migrations/normalization_migration/06migrate-primitives.sql (1)

1-22: LGTM: batched backfill of primitive_events.stream_ref with deterministic key.

The 10k batching and match on provider/stream/event_time/created_at are appropriate for idempotent re-runs.

internal/migrations/normalization_migration/README.md (1)

17-24: Update run order filenames to match the repo (avoid confusion during execution).

Listed filenames don’t match the actual files committed in this PR.

Apply:

-01migrate-structure.sql: 12 seconds
-02_insert_actions.sql: instant
-02migrate-streams_dpid.sql: 12 seconds / 100K rows (total: ~200K)
-03migrate-streams_id.sql: 6 seconds / 100K rows (total: ~200K)
-04migrate-structure_2.sql: 2 seconds
-05migrate-primitives.sql: 6~10 seconds / 10K rows (total: >60M)
-06-after-primitives.sql: 12 seconds
+01migrate-structure.sql: 12 seconds
+02_insert_actions.sql: instant
+03migrate-streams_dpid.sql: 12 seconds / 100K rows (total: ~200K)
+04migrate-streams_id.sql: 6 seconds / 100K rows (total: ~200K)
+05migrate-structure_2.sql: 2 seconds
+06migrate-primitives.sql: 6–10 seconds / 10K rows (total: >60M)
+07-after-primitives.sql: 12 seconds

Additionally, either remove the partial-index prerequisite or adjust the migrations to avoid creating one (see 05migrate-structure_2.sql comment).

internal/migrations/normalization_migration/02_insert_actions.sql (3)

300-335: insert_record validations LGTM; zero-value skip is explicit.

Permissions, existence, and type checks are in place. The migration-aware stream_ref handling is appropriate.


639-724: disable_metadata/disable_taxonomy migration gates are correct and safe.

Only operating when stream_ref is present avoids ambiguous matches during migration. Read-only enforcement for metadata is also correct.


67-74: create_stream wrapper delegates cleanly to create_streams.

Good reuse pattern; keeps single source of validation/logic.

@MicBun
Member

MicBun commented Aug 12, 2025

@coderabbitai full review

@coderabbitai

coderabbitai bot commented Aug 12, 2025

✅ Actions performed

Full review triggered.

@MicBun
Member

MicBun commented Aug 12, 2025

@williamrusdyputra the test is failing, and this PR is already too big; I think it is better to split the remaining work into a new GitHub issue and a new PR

--- FAIL: TestTaxonomyQueryActions (28.56s)
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/000-extensions.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/000-initial-data.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/001-common-actions.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/002-authorization.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/003-primitive-insertion.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/004-composed-taxonomy.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/005-primitive-query.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/006-composed-query.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/007-composed-query-derivate.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/008-public-query.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/009-truflation-query.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/010-get-latest-write-activity.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/011-get-database-size.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/012-roles-schema.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/013-role-actions.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/014-role-manager-actions.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/015-system-roles-bootstrap.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/016-taxonomy-query-actions.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/017-normalize-tables.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/901-utilities.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/test-migrations/get-all-weights.sql"
    testing.go:122: reading seed script "/home/runner/work/node/node/internal/migrations/test-migrations/get-composed-data.sql"
    testing.go:591: running test container: c24a2ef8abdcfc787c1c8800092e64ea480447b1488a77a1f2b810b2286e817b
    testing.go:156: running test TestCase.Function-0
    testing.go:156: running test TestCase.Function-1
    testing.go:156: running test TestCase.Function-2
    testing.go:156: running test TestCase.Function-3
    testing.go:156: running test TestCase.Function-4
    testing.go:156: running test TestCase.Function-5
    testing.go:156: running test TestCase.Function-6
    testing.go:156: running test TestCase.Function-7
    testing.go:156: running test TestCase.Function-8
    testing.go:51: test failed: test "Function-0" failed: error setting up composed stream: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000123
        test "Function-1" failed: error setting up composed stream: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000124
        test "Function-2" failed: error setting up composed stream 1: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000125
        test "Function-3" failed: error setting up first composed stream: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000126
        test "Function-4" failed: error setting up composed stream: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000127
        test "Function-5" failed: error setting up composed stream: error in createStream: Data provider not found: 0x0000000000000000000000000000000000000199
FAIL
FAIL	github.com/trufnetwork/node/tests/streams	233.651s

@williamrusdyputra
Contributor Author

@MicBun it's regarding the data provider. I just added createDataProvider() for it; I'll do the documentation in a separate PR.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 15

🔭 Outside diff range comments (1)
internal/migrations/009-truflation-query.sql (1)

124-186: Follow-up: migrate remaining DP/SID filters to stream_ref for perf consistency

Large CTEs still join primitive_events on data_provider/stream_id. After stream_ref backfill and indexing, refactoring these to stream_ref will:

  • Reduce predicate width and join cost.
  • Align with newly added stream_ref indexes.
♻️ Duplicate comments (21)
internal/migrations/001-common-actions.sql (3)

27-33: Concurrent inserts can silently generate duplicate ids.

This issue was previously flagged and remains unresolved. The MAX(id) + 1 approach is not safe under concurrent transactions – two writers can read the same max value and attempt to insert the same next ID.

Replace with a sequence or GENERATED BY DEFAULT AS IDENTITY:

-INSERT INTO data_providers (id, address, created_at) 
-SELECT 
-    COALESCE(MAX(id), 0) + 1,
-    $lower_address,
-    @height
-FROM data_providers
-ON CONFLICT DO NOTHING;
+INSERT INTO data_providers (address, created_at)
+VALUES ($lower_address, @height)
+ON CONFLICT DO NOTHING;

This assumes the id column is changed to use GENERATED BY DEFAULT AS IDENTITY.


306-323: get_stream_id returns NULL silently.

This issue was previously flagged and remains unresolved. Downstream callers assume a valid integer; a NULL propagates into joins and comparisons, leading to empty results instead of clear errors.

Add error handling after the loop:

 FOR $stream_row IN SELECT s.id
     FROM streams s
     JOIN data_providers dp ON s.data_provider_id = dp.id
     WHERE s.stream_id = $stream_id 
     AND dp.address = $data_provider_address
     LIMIT 1 {
     $found := true;
     $id := $stream_row.id;
 }

+IF NOT $found THEN
+    ERROR('Stream not found: ' || $data_provider_address || '/' || $stream_id);
+END IF;
 return $id;

1290-1303: list_streams ignores NULL $block_height and may filter out all rows.

This issue was previously flagged and remains unresolved. The WHERE clause s.created_at > $block_height with NULL $block_height evaluates to UNKNOWN, filtering out all rows.

Fix the NULL handling:

-            AND s.created_at > $block_height
+            AND s.created_at > COALESCE($block_height, -1)
internal/migrations/normalization_migration/05migrate-structure_2.sql (1)

20-23: Replace unsupported partial index; it will fail under Kwil.

Comment already notes partial indexes aren’t supported. This DDL will break the migration at runtime. Use a supported alternative.

Apply:

--- note: this won't work until kwil supports partial indexes
-create index pe_missing_ref_by_created_idx
-    on primitive_events (created_at)
-    where (stream_ref IS NULL);
+-- Fallback: regular index to help batches ordered by created_at
+CREATE INDEX IF NOT EXISTS pe_created_at_idx ON primitive_events (created_at);

Alternatively, if your queries filter by stream_ref frequently, prefer a composite:

CREATE INDEX IF NOT EXISTS pe_stream_ref_created_at_idx ON primitive_events (stream_ref, created_at);
internal/migrations/009-truflation-query.sql (1)

24-29: Indexing for fast last_deployed_date by stream_ref

Switching to stream_ref for truflation_last_deployed_date is correct; ensure an index supports the lookup to avoid full scans as the table grows. Prior feedback recommended indexing by stream_ref for these paths.

You already added pe_stream_created_idx and pe_gap_fill_idx in a later migration, which will help several queries. If possible, consider a generated/parsed numeric column for truflation_created_at or rely on created_at-based semantics; ordering by parse_unix_timestamp(truflation_created_at) cannot use a simple index.

internal/migrations/normalization_migration/04migrate-streams_id.sql (2)

3-13: Deterministic batching: add ORDER BY before LIMIT

Without ORDER BY, LIMIT yields nondeterministic batches across runs. Order by created_at (and id as a tie-breaker if present).

 WITH rows_to_update AS (
     SELECT
         data_provider,
         stream_id,
         id
     FROM
         streams
     WHERE
         id IS NULL
-    LIMIT
+    ORDER BY
+        created_at,
+        data_provider,
+        stream_id
+    LIMIT
         100000
 ),

15-19: Fix off-by-one when assigning new ids

Using COALESCE(MAX(id), 0) + 1 as the baseline and then adding ROW_NUMBER() makes the first new id MAX+2. Compute the baseline as MAX(id) instead and add ROW_NUMBER(), so ids start at MAX+1.

-), latest_id AS (
+), latest_id AS (
     SELECT
-        COALESCE(MAX(id), 0) + 1 as next_id
+        COALESCE(MAX(id), 0) as next_id
     FROM
         streams
 ),
 -- add the new id
 rows_with_id AS (
     SELECT
         data_provider,
         stream_id,
-        next_id + ROW_NUMBER() OVER (
+        next_id + ROW_NUMBER() OVER (
             ORDER BY
                 data_provider,
                 stream_id
         ) as id

Also applies to: 21-29

internal/migrations/normalization_migration/03migrate-streams_dpid.sql (1)

18-26: Restrict UPDATE to NULL targets only

You already select NULL targets in the CTE, but adding this predicate in the UPDATE makes re-runs safer and clearer.

 WHERE
     data_provider = target_data_provider
-    AND stream_id = target_stream_id;
+    AND stream_id = target_stream_id
+    AND streams.data_provider_id IS NULL;
internal/migrations/normalization_migration/migrate_and_report.sh (2)

4-5: The hardcoded PRIVATE_KEY and unquoted paths have been flagged in previous reviews and remain unaddressed.


16-28: The infinite loop lacks safety nets and error handling, as previously identified. The script needs robust error handling, quoted variables, and exit conditions.

internal/migrations/011-get-database-size.sql (2)

7-12: The byte-size estimates for data_providers table are incorrect based on the schema changes. The id column is INT (4 bytes) not UUID (16 bytes), requiring adjustment to avg_row_bytes and avg_pk_index_bytes.


23-25: The taxonomies table comments and byte calculations reference fields and indexes that don't exist at this migration stage, requiring updates to reflect the actual schema at migration 011.

internal/migrations/002-authorization.sql (1)

606-606: The allow_compose_stream permission check still only matches on stream_id text, which can cause cross-provider collisions as previously identified.

internal/migrations/007-composed-query-derivate.sql (1)

486-493: The correlated sub-query for sibling sum calculation inside the recursive CTE still poses quadratic complexity concerns as previously identified.

internal/migrations/017-normalize-tables.sql (1)

5-15: The streams.id column is added without NOT NULL or auto-generated identity, creating foreign key integrity risks as previously identified.

internal/migrations/normalization_migration/02_insert_actions.sql (4)

378-417: Batch mapping relies on fixed-order stream_refs; ensure get_stream_ids fix is applied

arguments.stream_ref indexes into stream_refs_array by idx. Without the get_stream_ids LEFT JOIN + ORDER BY idx fix, refs can be misordered/misaligned.

After applying the get_stream_ids fix, verify that lengths align and NULL placeholders are preserved for non-migrated streams.


28-60: Fix ordering/left join/alias in get_stream_ids to preserve positional mapping

The current implementation drops non-migrated entries (INNER JOIN), doesn't guarantee ordering, and returns the column alias stream_refs while the signature declares stream_ids. This misaligns indices for batch inserts.

 CREATE OR REPLACE ACTION get_stream_ids(
     $data_providers TEXT[],
     $stream_ids TEXT[]
 ) PRIVATE VIEW RETURNS (stream_ids INT[]) {
@@
-    stream_lookups AS (
-        SELECT
-            s.id AS stream_ref
-        FROM indexes
-        JOIN input_arrays ON 1=1
-        JOIN data_providers dp ON dp.address = input_arrays.data_providers[idx]
-        JOIN streams s ON s.data_provider_id = dp.id 
-                      AND s.stream_id = input_arrays.stream_ids[idx]
-                      AND s.id IS NOT NULL  -- Only include migrated streams
-    )
-    SELECT ARRAY_AGG(stream_ref) AS stream_refs
-    FROM stream_lookups {
-      return $row.stream_refs;
-    }
+    stream_lookups AS (
+        SELECT
+            idx,
+            s.id AS stream_ref
+        FROM indexes
+        LEFT JOIN input_arrays ON TRUE
+        LEFT JOIN data_providers dp ON LOWER(dp.address) = LOWER(input_arrays.data_providers[idx])
+        LEFT JOIN streams s ON s.data_provider_id = dp.id
+                           AND s.stream_id = input_arrays.stream_ids[idx]
+                           AND s.id IS NOT NULL  -- Only include migrated streams
+    )
+    SELECT ARRAY_AGG(stream_ref ORDER BY idx) AS stream_ids
+    FROM stream_lookups {
+      return $row.stream_ids;
+    }
 };

551-573: Duplicate $stream_ref declaration in insert_metadata

$stream_ref is assigned at Line 551, then re-declared at Line 572, causing shadowing/redeclaration.

-    $stream_ref := get_stream_id($data_provider, $stream_id);
+    $stream_ref := get_stream_id($data_provider, $stream_id);
@@
-    $stream_ref INT := get_stream_id($data_provider, $stream_id);
+    -- reuse $stream_ref computed above
+    $stream_ref := get_stream_id($data_provider, $stream_id);

Alternatively, remove the second assignment entirely if not needed.


628-636: Potential ID race in create_data_provider (MAX(id)+1 with DO NOTHING)

Using COALESCE(MAX(id),0)+1 can collide under concurrency; ON CONFLICT DO NOTHING hides the race.

Consider:

  • Switch data_providers.id to a SEQUENCE/IDENTITY and use nextval().
  • Or perform INSERT ... ON CONFLICT DO NOTHING RETURNING id; if no id returned, SELECT id by address; if neither, raise a retriable error.

Example pattern (conceptual):

INSERT INTO data_providers (id, address, created_at)
SELECT nextval('data_providers_id_seq'), $lower_address, @height
ON CONFLICT (address) DO NOTHING;

-- Ensure existence
-- SELECT id FROM data_providers WHERE address = $lower_address LIMIT 1;

And add a follow-up migration to introduce the sequence if not present.

internal/migrations/normalization_migration/01migrate-structure.sql (2)

5-5: Create streams_provider_stream_idx after column re-add; current timing drops it

You create the index on data_provider_id before later dropping and re-adding that column (Lines 48, 56), which will discard the index.

Move/recreate the index after the new columns are added:

--- this index is necessary for the migration
-CREATE UNIQUE INDEX IF NOT EXISTS streams_provider_stream_idx ON streams(data_provider_id, stream_id);
@@
 CREATE INDEX IF NOT EXISTS tax_latest_sequence_idx ON taxonomies (stream_ref, start_time, group_sequence);
 
+-- (re)create provider+stream unique index after columns are present
+CREATE UNIQUE INDEX IF NOT EXISTS streams_provider_stream_idx ON streams(data_provider_id, stream_id);

22-23: Use IF EXISTS when dropping primary key to avoid brittle failures

DROP CONSTRAINT without IF EXISTS can halt the migration if the constraint name differs.

-ALTER TABLE data_providers DROP CONSTRAINT data_providers_pkey;
+ALTER TABLE data_providers DROP CONSTRAINT IF EXISTS data_providers_pkey;
🧹 Nitpick comments (14)
.github/workflows/ci.yaml (1)

59-60: Pin third‑party action to a commit SHA and review token scope

  • Consider pinning arduino/setup-task@v2 to a specific commit SHA to reduce supply-chain risk from mutable tags.
  • The token is scoped by the workflow’s permissions: contents: read, which is good. If the action only needs to read releases, this is sufficient. If you later broaden permissions, re‑verify the minimum scope needed.
tests/streams/complex_composed_test.go (1)

603-607: Avoid duplicate provider registration in wrapped test

This test is wrapped by WithTestSetup, which already registers the provider. The additional registration here is redundant (albeit likely idempotent) and adds noise/overhead.

Apply this diff to drop the duplication:

- platform = procedure.WithSigner(platform, complexComposedDeployer.Bytes())
- err := setup.CreateDataProvider(ctx, platform, complexComposedDeployer.Address())
- if err != nil {
-     return errors.Wrap(err, "error registering data provider")
- }
+ platform = procedure.WithSigner(platform, complexComposedDeployer.Bytes())

Also applies to: 617-617

tests/streams/common_test.go (1)

58-62: Consistent provider registration; consider decoupling role grant

  • Adding setup.CreateDataProvider before stream creation across these tests is correct under the normalized model.
  • If you need to validate read permissions for non‑owners or negative cases in future tests, it would help to have a way to register providers without automatically granting network_writer, to avoid confounding roles.

As a follow-up, consider switching to CreateDataProviderWithOptions(ctx, platform, addr, CreateDataProviderOptions{GrantNetworkWriter: false}) in tests that need a registered but unauthorized identity (once such an API exists).

Also applies to: 132-136, 193-197, 254-257, 288-296

internal/migrations/normalization_migration/README.md (2)

5-5: Fix grammatical issue.

Add a period after "etc" as required in American English.

-This migration plan was executed against an isolated instance at AWS that is less performant than our nodes, but should be the base size of other nodes. It took 14 hours to complete (running non-stop). The unknown here is how it will behave in a network with heterogeneous nodes (not always same provider, etc).
+This migration plan was executed against an isolated instance at AWS that is less performant than our nodes, but should be the base size of other nodes. It took 14 hours to complete (running non-stop). The unknown here is how it will behave in a network with heterogeneous nodes (not always same provider, etc.).

39-39: Add language identifiers to fenced code blocks.

For better markdown rendering and syntax highlighting, specify language identifiers for the PostgreSQL configuration code blocks.

-```
+```ini
# Memory settings
shared_buffers (recommend 1/4 of RAM)
-```
+```ini
# Aggressive settings for bulk migration workload
autovacuum_max_workers = 5                  # vs default 3

Also applies to: 69-69

internal/migrations/008-public-query.sql (1)

117-124: Switch to stream_ref is correct; consider explicit typing for $stream_ref

Using stream_ref avoids (data_provider, stream_id) joins and matches the normalization. For clarity and to avoid implicit typing gotchas, consider declaring $stream_ref with an explicit type.

Apply this diff to make the type explicit (adjust INT vs INT8 to match get_stream_id’s return and metadata.stream_ref type):

-    $stream_ref := get_stream_id($data_provider, $stream_id);
+    $stream_ref INT := get_stream_id($data_provider, $stream_id);
tests/streams/aggregation/get_category_test.go (1)

125-128: Reduce duplication: factor provider registration into a small setup helper.

Many tests repeat the same pattern of setting the signer and registering the provider. Consider introducing a wrapper helper to centralize this, improving readability and maintainability.

Example helper (to place in tests/streams/utils/setup/common.go or a new wrappers.go):

// WithDeployerAndRegisteredProvider wraps a test function with signer + provider registration.
func WithDeployerAndRegisteredProvider(addr util.EthereumAddress, inner kwilTesting.TestFunc) kwilTesting.TestFunc {
	return func(ctx context.Context, platform *kwilTesting.Platform) error {
		platform = procedure.WithSigner(platform, addr.Bytes())
		if err := setup.CreateDataProvider(ctx, platform, addr.Address()); err != nil {
			return errors.Wrap(err, "error registering data provider")
		}
		return inner(ctx, platform)
	}
}

Usage in this test:

wrapTestWithCacheModes(t, "CategoryStreams", func(t *testing.T, useCache bool) kwilTesting.TestFunc {
  return WithDeployerAndRegisteredProvider(
    util.Unsafe_NewEthereumAddressFromString("0x0000000000000000000000000000000000000000"),
    testCategoryStreams(t, useCache),
  )
})
internal/migrations/005-primitive-query.sql (1)

175-186: Add supporting metadata index for default_base_time lookup

This path now filters metadata by stream_ref + metadata_key with created_at ordering. Add an index to avoid scans as data grows.

For example, in a follow-up migration:

CREATE INDEX IF NOT EXISTS metadata_stream_key_created_idx
  ON metadata (stream_ref, metadata_key, created_at DESC)
  WHERE disabled_at IS NULL;
internal/migrations/normalization_migration/07-after-primitives.sql (1)

6-9: Minimize lock time when adding FK (optional)

If supported by the engine, consider adding the constraint as NOT VALID and validating afterwards to reduce lock pressure on hot tables.

Example:

ALTER TABLE primitive_events
  ADD CONSTRAINT fk_primitive_stream_ref
  FOREIGN KEY (stream_ref) REFERENCES streams(id) ON DELETE CASCADE NOT VALID;

ALTER TABLE primitive_events
  VALIDATE CONSTRAINT fk_primitive_stream_ref;
internal/migrations/009-truflation-query.sql (1)

106-115: Consider adding an invariant to prevent mismatched identities (optional)

While inserting both (data_provider, stream_id) and stream_ref is transitional, adding a check or trigger to enforce stream_ref = get_stream_id(data_provider, stream_id) would prevent drift during the migration window.

internal/migrations/010-get-latest-write-activity.sql (1)

21-48: LGTM: unify provider filter via data_providers join across sources

  • Filters consistently via dp.address when provided.
  • Reuses stream_ref to join primitive_events/taxonomies/metadata to streams → data_providers.

Add supporting indexes if not already present for taxonomies/metadata:

  • taxonomies(stream_ref, created_at)
  • metadata(stream_ref, created_at)
internal/migrations/normalization_migration/04migrate-streams_id.sql (1)

34-42: Guard UPDATE to NULL targets (idempotency safety)

Add a defensive predicate so re-runs don’t attempt to “update” already-populated rows.

 WHERE
     streams.data_provider = rows_with_id.data_provider
     AND streams.stream_id = rows_with_id.stream_id
+    AND streams.id IS NULL;
internal/migrations/normalization_migration/02_insert_actions.sql (2)

7-23: Harden get_stream_id against mixed-case addresses

Callers mostly lowercase addresses, but hardening here avoids footguns. Normalize the input and predicate to LOWER() for robustness.

-CREATE OR REPLACE ACTION get_stream_id(
-  $data_provider_address TEXT,
-  $stream_id TEXT
-) PRIVATE returns (id INT) {
+CREATE OR REPLACE ACTION get_stream_id(
+  $data_provider_address TEXT,
+  $stream_id TEXT
+) PRIVATE returns (id INT) {
   $id INT;
   FOR $stream_row IN SELECT s.id
       FROM streams s
       JOIN data_providers dp ON s.data_provider_id = dp.id
       WHERE s.stream_id = $stream_id 
-      AND dp.address = $data_provider_address
+      AND LOWER(dp.address) = LOWER($data_provider_address)
       AND s.id IS NOT NULL  -- Only return streams that have been migrated
       LIMIT 1 {
       $id := $stream_row.id;
   }
 
   return $id;
 };

451-462: Redundant zero-children check; simplify validation

You already error when $num_children == 0 in the previous condition. The second check duplicates this.

-    if $num_children == 0 OR $num_children != array_length($child_data_providers) OR $num_children != array_length($weights) {
-        error('All child arrays must be of the same length');
-    }
-
-    -- ensure there is at least 1 child, otherwise we might have silent bugs, with the user thinking he added something
-    if $num_children == 0 {
-        error('There must be at least 1 child');
-    }
+    if $num_children == 0 {
+        error('There must be at least 1 child');
+    }
+    if $num_children != array_length($child_data_providers) OR $num_children != array_length($weights) {
+        error('All child arrays must be of the same length');
+    }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4894e6 and e127c70.

📒 Files selected for processing (58)
  • .github/workflows/ci.yaml (1 hunks)
  • extensions/tn_cache/internal/engine_ops_integration_test.go (3 hunks)
  • extensions/tn_cache/internal/engine_ops_permissions_test.go (3 hunks)
  • internal/migrations/000-initial-data.sql (1 hunks)
  • internal/migrations/001-common-actions.sql (34 hunks)
  • internal/migrations/002-authorization.sql (21 hunks)
  • internal/migrations/003-primitive-insertion.sql (3 hunks)
  • internal/migrations/004-composed-taxonomy.sql (7 hunks)
  • internal/migrations/005-primitive-query.sql (6 hunks)
  • internal/migrations/006-composed-query.sql (18 hunks)
  • internal/migrations/007-composed-query-derivate.sql (4 hunks)
  • internal/migrations/008-public-query.sql (1 hunks)
  • internal/migrations/009-truflation-query.sql (3 hunks)
  • internal/migrations/010-get-latest-write-activity.sql (1 hunks)
  • internal/migrations/011-get-database-size.sql (1 hunks)
  • internal/migrations/017-normalize-tables.sql (1 hunks)
  • internal/migrations/normalization_migration/01migrate-structure.sql (1 hunks)
  • internal/migrations/normalization_migration/02_insert_actions.sql (1 hunks)
  • internal/migrations/normalization_migration/03migrate-streams_dpid.sql (1 hunks)
  • internal/migrations/normalization_migration/04migrate-streams_id.sql (1 hunks)
  • internal/migrations/normalization_migration/05migrate-structure_2.sql (1 hunks)
  • internal/migrations/normalization_migration/06migrate-primitives.sql (1 hunks)
  • internal/migrations/normalization_migration/07-after-primitives.sql (1 hunks)
  • internal/migrations/normalization_migration/README.md (1 hunks)
  • internal/migrations/normalization_migration/migrate_and_report.sh (1 hunks)
  • tests/extensions/tn_cache/cache_height_tracking_test.go (2 hunks)
  • tests/extensions/tn_cache/cache_integration_test.go (3 hunks)
  • tests/extensions/tn_cache/cache_observability_test.go (2 hunks)
  • tests/extensions/tn_cache/resolution_transaction_test.go (2 hunks)
  • tests/streams/aggregation/aggr01_basic_aggregation_test.go (1 hunks)
  • tests/streams/aggregation/aggr02_weighted_contributions_test.go (1 hunks)
  • tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1 hunks)
  • tests/streams/aggregation/aggr04_missing_data_handling_test.go (1 hunks)
  • tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1 hunks)
  • tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1 hunks)
  • tests/streams/aggregation/aggr07_inexistent_streams_test.go (2 hunks)
  • tests/streams/aggregation/aggr08_weight_change_test.go (1 hunks)
  • tests/streams/aggregation/aggr09_duplicate_values_test.go (1 hunks)
  • tests/streams/aggregation/get_all_weights_for_query_test.go (2 hunks)
  • tests/streams/aggregation/get_category_test.go (1 hunks)
  • tests/streams/auth/auth_test.go (14 hunks)
  • tests/streams/common_test.go (5 hunks)
  • tests/streams/complex_composed_test.go (3 hunks)
  • tests/streams/composed_test.go (4 hunks)
  • tests/streams/comprehensive_shared_path_independence_test.go (1 hunks)
  • tests/streams/database_size_test.go (2 hunks)
  • tests/streams/gamefi_index_test.go (2 hunks)
  • tests/streams/index_change_test.go (1 hunks)
  • tests/streams/multi_level_composed_test.go (1 hunks)
  • tests/streams/other/other_test.go (5 hunks)
  • tests/streams/other/stream_exists_batch_test.go (1 hunks)
  • tests/streams/primitive_test.go (1 hunks)
  • tests/streams/query/metadata_test.go (1 hunks)
  • tests/streams/query/query_test.go (2 hunks)
  • tests/streams/roles/permission_gates_test.go (1 hunks)
  • tests/streams/truflation_composed_frozen_test.go (1 hunks)
  • tests/streams/truflation_primitive_frozen_test.go (1 hunks)
  • tests/streams/utils/setup/common.go (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (35)
tests/streams/aggregation/aggr08_weight_change_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr02_weighted_contributions_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
extensions/tn_cache/internal/engine_ops_integration_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr01_basic_aggregation_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/get_category_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/get_all_weights_for_query_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/comprehensive_shared_path_independence_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
extensions/tn_cache/internal/engine_ops_permissions_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/multi_level_composed_test.go (2)
tests/streams/utils/procedure/execute.go (1)
  • WithSigner (432-436)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr04_missing_data_handling_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/other/stream_exists_batch_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr09_duplicate_values_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/extensions/tn_cache/cache_height_tracking_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/common.go (1)
tests/streams/utils/setup/roles.go (1)
  • AddMemberToRoleBypass (73-98)
tests/extensions/tn_cache/resolution_transaction_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/auth/auth_test.go (4)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/utils/procedure/metadata.go (2)
  • TransferStreamOwnership (338-371)
  • TransferStreamOwnershipInput (330-335)
tests/streams/utils/procedure/execute.go (2)
  • SetTaxonomy (487-533)
  • WithSigner (432-436)
tests/streams/utils/procedure/types.go (1)
  • SetTaxonomyInput (64-72)
tests/streams/aggregation/aggr07_inexistent_streams_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/procedure/execute.go (1)
  • WithSigner (432-436)
tests/streams/truflation_composed_frozen_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/primitive_test.go (1)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/query/query_test.go (3)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/primitive.go (2)
  • SetupPrimitiveFromMarkdown (122-128)
  • MarkdownPrimitiveSetupInput (31-36)
tests/streams/utils/setup/composed.go (2)
  • SetupComposedFromMarkdown (186-192)
  • MarkdownComposedSetupInput (73-80)
tests/streams/composed_test.go (4)
tests/streams/utils/setup/common.go (2)
  • CreateDataProvider (192-233)
  • StreamInfo (20-23)
tests/streams/utils/setup/composed.go (2)
  • SetupComposedFromMarkdown (186-192)
  • MarkdownComposedSetupInput (73-80)
tests/streams/utils/procedure/execute.go (1)
  • SetTaxonomy (487-533)
tests/streams/utils/procedure/types.go (1)
  • SetTaxonomyInput (64-72)
tests/streams/database_size_test.go (1)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/extensions/tn_cache/cache_integration_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/roles/permission_gates_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/complex_composed_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/composed.go (2)
  • SetupComposedFromMarkdown (186-192)
  • MarkdownComposedSetupInput (73-80)
tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/extensions/tn_cache/cache_observability_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/query/metadata_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/truflation_primitive_frozen_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/other/other_test.go (1)
tests/streams/utils/setup/common.go (3)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
  • StreamInfo (20-23)
tests/streams/gamefi_index_test.go (2)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
tests/streams/utils/setup/primitive.go (2)
  • SetupPrimitive (44-64)
  • SetupPrimitiveInput (38-42)
tests/streams/common_test.go (2)
tests/streams/utils/setup/common.go (2)
  • CreateDataProvider (192-233)
  • CreateStream (30-32)
tests/streams/utils/procedure/metadata.go (2)
  • InsertMetadata (293-328)
  • InsertMetadataInput (283-290)
tests/streams/index_change_test.go (1)
tests/streams/utils/setup/common.go (1)
  • CreateDataProvider (192-233)
🪛 LanguageTool
internal/migrations/normalization_migration/README.md

[style] ~5-~5: In American English, abbreviations like “etc.” require a period.
Context: ...eneous nodes (not always same provider, etc). Requirements - Postgres image with f...

(ETC_PERIOD)

🪛 markdownlint-cli2 (0.17.2)
internal/migrations/normalization_migration/README.md

39-39: Fenced code blocks should have a language specified

(MD040, fenced-code-language)


69-69: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: acceptance-test
🔇 Additional comments (92)
tests/streams/truflation_primitive_frozen_test.go (1)

577-581: LGTM: provider registration placed correctly before deployment

Registering the deployer as a data provider before setting up the primitive aligns with the new normalized model and ensures later calls don’t fail due to missing provider state.

tests/streams/multi_level_composed_test.go (1)

49-53: LGTM: explicit data provider registration

Registering the signer as a data provider prior to deploying primitives/composed streams is consistent with the normalized schema and avoids setup flakiness.

tests/streams/truflation_composed_frozen_test.go (1)

52-56: LGTM: provider registration added before setup

This aligns the test with the new data provider normalization and prevents “missing provider” errors later.

tests/streams/complex_composed_test.go (1)

75-79: LGTM: register provider once and reuse err

Good placement of provider registration just after setting the signer; reusing the existing err variable keeps the code clean.

Also applies to: 81-81

internal/migrations/000-initial-data.sql (2)

8-8: LGTM! Accurate table documentation.

The updated table documentation correctly reflects the addition of the data_providers table.


14-24: Well-designed data_providers table.

The table design follows good practices:

  • Uses INT for primary key enabling efficient joins
  • Enforces Ethereum address format with appropriate CHECK constraint
  • Includes creation timestamp for auditability
  • Has proper unique index on address
internal/migrations/001-common-actions.sql (3)

36-67: LGTM! Efficient batch stream lookup implementation.

The get_stream_ids function uses an efficient WITH RECURSIVE approach to avoid expensive for-loop roundtrips, properly joining through the normalized data_providers table.


131-144: LGTM! Proper data provider validation.

The code correctly fetches and validates the data provider ID before creating streams, with appropriate error handling for missing providers.


175-183: LGTM! Updated streams insertion with normalization.

The INSERT correctly includes both the normalized data_provider_id and maintains the original data_provider address for backward compatibility.

tests/streams/aggregation/aggr04_missing_data_handling_test.go (1)

55-58: LGTM! Proper data provider registration.

The test correctly registers the deployer as a data provider before proceeding with stream setup. This follows the established pattern and includes appropriate error wrapping.

tests/streams/aggregation/aggr01_basic_aggregation_test.go (1)

53-56: LGTM! Consistent data provider registration pattern.

The test properly registers the data provider using the new helper function with appropriate error handling. This change aligns with the normalized data provider model introduced in the PR.

tests/streams/aggregation/aggr08_weight_change_test.go (1)

60-63: LGTM! Proper data provider registration in test setup.

The addition of data provider registration follows the consistent pattern across the test suite and includes proper error wrapping.

tests/streams/aggregation/aggr03_taxonomy_validity_periods_test.go (1)

59-62: LGTM! Consistent test pattern implementation.

The data provider registration is properly implemented following the established pattern with appropriate error handling and wrapping.

tests/streams/aggregation/aggr09_duplicate_values_test.go (1)

62-65: LGTM! Proper data provider setup.

The test correctly implements the data provider registration pattern consistent with other tests in the suite, including appropriate error handling.

tests/streams/aggregation/aggr06_single_active_taxonomy_test.go (1)

52-55: Registering the data provider at setup is correct and necessary

Calling setup.CreateDataProvider right after setting the signer aligns with the new normalized model and prevents later permission/lookup issues. Error wrapping is consistent.

extensions/tn_cache/internal/engine_ops_permissions_test.go (3)

9-9: Import added for error wrapping — good

The added github.com/pkg/errors import is required for errors.Wrap usage below.


23-24: Comment-only change

No functional impact.


43-46: Provider registration added correctly

Registering the deployer as a data provider after signer setup matches the new workflow and ensures subsequent stream operations work under normalized tables. Good error wrapping.

tests/streams/aggregation/aggr05_no_duplicate_child_streams_test.go (1)

47-50: Good: data provider registration aligns with normalized schema

Creating the data provider after setting the signer is correct and keeps tests consistent with the new stream_ref/provider model.

internal/migrations/008-public-query.sql (1)

117-124: Index Already in Place for metadata(stream_ref, metadata_key)

The meta_stream_to_key_idx index covers (stream_ref, metadata_key, created_at) and is defined in:

  • internal/migrations/normalization_migration/01migrate-structure.sql (lines 68–69)
  • internal/migrations/017-normalize-tables.sql (lines 28–29)

This index satisfies filtering on stream_ref + metadata_key and supports the ORDER BY created_at DESC in your query. Since partial indexes on disabled_at IS NULL aren’t currently supported in our environment, no additional migration is required at this time.

tests/streams/gamefi_index_test.go (1)

117-121: No action needed for provider registration: create_data_provider is idempotent

The create_data_provider action in both 001-common-actions.sql and normalization_migration/02_insert_actions.sql uses INSERT … ON CONFLICT DO NOTHING, so repeated calls won’t error if the provider already exists. No changes are required here.

tests/streams/other/stream_exists_batch_test.go (1)

40-44: Good addition: register provider before existence checks

This ensures the provider exists in the normalized model so stream_exists_batch can resolve references correctly. Error wrapping is consistent.

tests/streams/comprehensive_shared_path_independence_test.go (1)

51-55: Provider registration step looks correct

Done immediately after setting the signer and before deploying streams/taxonomies; consistent with the normalized provider model.

tests/extensions/tn_cache/cache_observability_test.go (2)

9-9: Import for error wrapping is appropriate.

Adding github.com/pkg/errors is aligned with the new error handling in this test.


43-46: Registering the data provider upfront is correct and necessary.

Calling setup.CreateDataProvider after setting the signer ensures the deployer is authorized before stream operations. The wrapped error message is clear and consistent.

tests/streams/query/metadata_test.go (1)

48-51: LGTM: Data provider registration added to setup.

This ensures the provider exists before stream creation and metadata ops. Error wrapping provides useful context.

tests/streams/aggregation/get_category_test.go (1)

125-128: LGTM: Registering data provider prior to taxonomy/stream setup.

The sequence (set signer → register provider → create streams) is correct and will avoid permission issues.

tests/extensions/tn_cache/resolution_transaction_test.go (2)

7-7: Import for pkg/errors is appropriate.

Needed for the added error wrapping.


34-37: LGTM: Provider registration added before transactional stream setup.

Registering the deployer as a data provider prior to opening the transaction ensures authorization during in-tx stream creation and resolution.

tests/streams/aggregation/aggr02_weighted_contributions_test.go (1)

53-56: LGTM: Provider registration before composed stream setup.

Correctly registers the deployer, preventing permission failures during SetupComposedFromMarkdown. Error message is clear.

tests/streams/primitive_test.go (1)

41-47: LGTM: Registering provider and avoiding err shadowing.

  • Registering the provider before CreateStream is correct.
  • Reusing the existing err (err = …) for CreateStream avoids shadowing bugs (good).
tests/extensions/tn_cache/cache_height_tracking_test.go (2)

9-9: Import change is correct.

github.com/pkg/errors is required for the new error wrapping.


43-46: LGTM: Provider registration included in setup.

Ensures deployer permissions are in place before composed stream creation and cache refresh operations.

tests/streams/database_size_test.go (2)

40-44: LGTM: Provider registration added.

Properly sets signer and registers provider before stream creation. Error wrapping is consistent.


46-46: Avoiding err shadowing is correct.

Switching to err = … instead of err := … prevents shadowing and keeps error handling consistent in this scope.

extensions/tn_cache/internal/engine_ops_integration_test.go (2)

9-9: Good addition of error wrapping dependency.

Importing github.com/pkg/errors is appropriate for consistent error context.


49-53: Registering the data provider early is correct and prevents permission issues.

Creating the provider right after setting the signer avoids downstream write failures in setup.

tests/streams/aggregation/get_all_weights_for_query_test.go (2)

458-462: Correct: ensure the signer is a registered data provider before any writes.

This aligns with the new model and prevents write-authorization errors.


505-505: Nice: reuse the existing err variable to avoid shadowing.

Prevents accidental shadowing and improves readability.

tests/extensions/tn_cache/cache_integration_test.go (3)

10-10: Good import for error wrapping.

Consistent with other tests using errors.Wrap for context.


55-58: Correct: register the data provider post-signer setup.

Prevents permission issues when creating streams and setting taxonomies.


252-255: Same here: correct provider registration prior to stream setup.

Keeps the test setup robust under the normalized provider model.

tests/streams/composed_test.go (4)

46-52: Solid setup sequence: signer → provider registration → stream creation.

Avoids “wallet not allowed to write” errors during subsequent operations.


72-92: Good: explicit provider registration and consistent err reuse.

Ensures composed setup runs with correct permissions; err reuse avoids shadowing.


133-142: Correct: register provider and reuse err before creating composed stream.

Keeps ownership and writer checks satisfied under the new model.


238-252: Non-owner registration is fine; permission check still enforced by SetTaxonomy.

Registering the non-owner as a provider won’t bypass owner-only write checks; assertions below are valid.

tests/streams/query/query_test.go (3)

95-99: Right move: register the deployer as a data provider before stream setup.

Prevents authorization failures during inserts and taxonomy ops in setup.


101-141: LGTM: setup continues to use the same err, avoiding shadowing.

The primitive and composed setup blocks remain clear and consistent.


912-931: Good consistency: provider registration included for composed-query setup as well.

Prevents surprises when running query tests independently.

tests/streams/aggregation/aggr07_inexistent_streams_test.go (2)

50-53: Correct: ensure provider exists before creating the composed stream.

Prevents setup from failing on permission checks.


113-121: Duplicated pattern applied correctly for the second scenario.

Reusing the same address and registering it again is harmless and keeps the test deterministic.

internal/migrations/003-primitive-insertion.sql (2)

34-39: Adopting stream_ref in insert_record is the right direction.

Using get_stream_id(...) and persisting stream_ref ensures consistency with normalized relations.


84-86: Good: batch path resolves stream_refs once up front.

Fetching all stream_refs via get_stream_ids(...) avoids per-row roundtrips.

internal/migrations/005-primitive-query.sql (1)

37-51: LGTM: moved primitive lookups to stream_ref consistently

Using a single computed $stream_ref and filtering primitive_events via pe.stream_ref greatly reduces predicate width and aligns with the normalization path. The access checks happen before the lookup, so NULL stream_ref scenarios are guarded.

Also applies to: 61-66, 104-115, 142-150

tests/streams/other/other_test.go (1)

234-237: LGTM: provider registration before stream creation (non-transactional test)

In this test helper, using the platform outside a WithTx context is appropriate. Error wrapping is clean.

internal/migrations/normalization_migration/06migrate-primitives.sql (1)

1-22: LGTM: batched, idempotent backfill for primitive_events.stream_ref

  • Batches 10k rows with deterministic ORDER BY pe.created_at.
  • Joins to streams by (data_provider, stream_id) to derive s.id.

This is safe to re-run and sets you up for the FK in the next step.

internal/migrations/normalization_migration/07-after-primitives.sql (1)

2-5: Good coverage for primitive_events query patterns

The two indexes (stream_ref, event_time, created_at) and (stream_ref, created_at) match hot paths in primitive queries.

tests/streams/auth/auth_test.go (9)

70-73: LGTM: Essential data provider registration added.

The addition of setup.CreateDataProvider ensures the data provider exists before stream creation, which is necessary for the normalized schema's stream_ref-based operations.


134-137: LGTM: Consistent data provider setup.

Proper registration pattern maintained across test functions.


193-196: LGTM: Stream-specific data provider registration.

Correctly registers the data provider that matches the stream's DataProvider field.


343-346: LGTM: Nested test setup with proper registration.

The data provider registration precedes stream creation, maintaining the correct dependency order.


474-477: LGTM: Write permission test properly initialized.

Data provider registration aligns with the stream's DataProvider field.


547-550: LGTM: Compose permission test setup.

Consistent pattern for registering the stream's data provider.


644-665: LGTM: Multiple data provider registration in nested compose test.

All four data providers (default, parent, child, grandchild) are properly registered before stream operations.


872-875: LGTM: Stream deletion test initialization.

Default deployer address changed and corresponding data provider registered.


939-950: LGTM: Comprehensive data provider setup for existence filtering test.

Both default deployer and primary data provider addresses are registered, supporting the stream existence filtering logic.

internal/migrations/004-composed-taxonomy.sql (5)

52-58: LGTM: Stream reference normalization implemented correctly.

The addition of $stream_ref and $child_stream_ref variables using get_stream_id() properly normalizes stream identification across the taxonomy system.


73-88: LGTM: INSERT statement updated for normalized schema.

The addition of stream_ref and child_stream_ref columns to the INSERT statement aligns with the normalized table structure.


102-117: LGTM: Query filters migrated to stream_ref.

The get_current_group_sequence function correctly uses stream_ref instead of separate data_provider/stream_id filtering.


141-180: LGTM: Complex JOIN logic updated for normalization.

The describe_taxonomies function properly joins streams and data_providers tables using the new stream_ref and child_stream_ref foreign keys, maintaining correct data provider and stream ID resolution.


196-200: LGTM: Disable function updated consistently.

The disable_taxonomy function correctly uses stream_ref for filtering, maintaining consistency with other functions.

internal/migrations/002-authorization.sql (5)

129-129: LGTM: Stream reference normalization in authorization.

The addition of $stream_ref := get_stream_id($data_provider, $stream_id) properly centralizes stream identification for authorization checks.


144-290: LGTM: Comprehensive migration to stream_ref-based queries.

The recursive CTE logic has been correctly updated to use stream_ref and child_stream_ref throughout, including:

  • Partitioning and grouping by stream_ref
  • JOIN conditions using stream_ref
  • WHERE clauses filtering on stream_ref
  • All taxonomy hierarchy traversal logic

This maintains the same authorization semantics while operating on the normalized schema.


298-369: LGTM: Stream existence and permission checks updated.

The refactoring of stream existence and permission validation to use stream_ref with proper JOINs to streams and data_providers tables maintains data integrity while leveraging the normalized structure.


405-642: LGTM: Compose authorization migrated to stream_ref.

The is_allowed_to_compose_all function correctly adopts the same stream_ref-based approach as the read authorization, maintaining consistency across permission systems.


733-788: LGTM: Batch write authorization updated for normalization.

The is_wallet_allowed_to_write_batch function properly uses JOINs with the normalized streams and data_providers tables via stream_ref, maintaining efficient batch processing while operating on the new schema.

internal/migrations/006-composed-query.sql (6)

57-57: LGTM: Stream reference initialization.

The addition of $stream_ref := get_stream_id($data_provider, $stream_id) establishes the normalized stream identifier early in the function.


106-125: LGTM: Taxonomy segment queries migrated to stream_ref.

The parent_distinct_start_times and parent_next_starts CTEs correctly use stream_ref for partitioning and filtering, maintaining the same overshadowing logic with normalized identifiers.


141-193: LGTM: Hierarchy resolution updated for normalization.

The taxonomy hierarchy resolution logic properly uses stream_ref and child_stream_ref throughout, including:

  • Base case filtering on root stream_ref
  • Recursive joins using child/parent stream_ref relationships
  • Anchor time calculations using stream_ref

The logic preserves the same hierarchical traversal semantics.


247-257: LGTM: Primitive stream filtering updated.

The hierarchy_primitive_paths CTE correctly joins with the streams table using stream_ref to identify primitive leaves.


296-340: LGTM: Event time gathering migrated to stream_ref.

The cleaned_event_times CTE properly uses stream_ref for joining primitive events with weight intervals, maintaining temporal accuracy across the normalized schema.


347-456: LGTM: Delta calculation method updated comprehensively.

The new delta calculation pipeline correctly uses stream_ref throughout:

  • Initial primitive states partitioned by stream_ref
  • Event changes calculated per stream_ref
  • Weight changes associated with stream_ref
  • Timeline calculations maintaining stream_ref partitioning

This preserves calculation accuracy while operating on normalized identifiers.

internal/migrations/007-composed-query-derivate.sql (6)

21-21: LGTM: Stream reference established early.

Consistent with other normalized functions, $stream_ref is computed once and used throughout.


57-85: LGTM: Recursive taxonomy traversal updated.

The recursive CTE correctly uses stream_ref and child_stream_ref for hierarchy traversal, maintaining the same discovery logic with normalized identifiers.


200-236: LGTM: First record function consistently updated.

The get_first_record_composed function mirrors the same normalization patterns as get_last_record_composed, using stream_ref throughout the recursive taxonomy resolution.


314-380: LGTM: Index function initialization and base time handling.

The get_index_composed function properly:

  • Establishes $stream_ref early
  • Maintains the same base time resolution logic
  • Preserves cache and permission check ordering

455-532: LGTM: Hierarchical weight calculation with effective weights.

The hierarchy CTE properly implements hierarchical normalization using stream_ref, though the correlated sub-query performance issue noted in previous reviews remains.


547-941: LGTM: Complete index calculation pipeline updated.

The extensive CTE pipeline for index calculation has been comprehensively updated to use stream_ref, including:

  • Event time gathering using stream_ref
  • Primitive state calculations partitioned by stream_ref
  • Weight changes associated with stream_ref
  • Final aggregation maintaining temporal accuracy

The indexed value calculations and LOCF logic remain functionally equivalent while operating on the normalized schema.

internal/migrations/017-normalize-tables.sql (3)

21-31: LGTM: Metadata table normalization with comprehensive indexing.

The addition of stream_ref column with foreign key constraint and well-designed indexes supports efficient metadata lookups across different access patterns:

  • meta_stream_to_key_idx: Stream-specific metadata queries
  • meta_stream_to_ref_idx: Reference-based lookups
  • meta_key_ref_to_stream_idx: Reverse lookups from metadata values

36-52: LGTM: Taxonomies normalization with proper constraints.

The addition of both stream_ref and child_stream_ref with foreign key constraints establishes proper referential integrity. The indexes are well-designed:

  • tax_child_unique_idx: Prevents duplicate child relationships
  • tax_stream_start_idx: Supports temporal taxonomy queries
  • tax_child_stream_idx: Enables efficient child lookup
  • tax_latest_sequence_idx: Optimizes overshadowing logic

57-65: LGTM: Primitive events normalization completed.

The stream_ref column addition with foreign key constraint and indexes supports efficient primitive event queries:

  • pe_gap_fill_idx: Time-based event queries
  • pe_stream_created_idx: Creation time-based operations
internal/migrations/normalization_migration/01migrate-structure.sql (1)

46-61: Confirm re-add of all dependent FKs; only streams->data_providers FK is restored here

You re-established fk_streams_data_provider, but not FKs on metadata/taxonomies/primitive_events to streams.stream_ref (or equivalent). If deferral is intentional (later migration), fine—otherwise add them here.

Action: confirm subsequent migration reinstates:

  • fk_metadata_stream_ref
  • fk_taxonomies_stream_ref
  • fk_taxonomies_child_stream_ref
  • fk_primitive_stream_ref

If not, add them after new INT columns are created.

Comment on lines +114 to 123
 INSERT INTO primitive_events (stream_id, data_provider, event_time, value, created_at, truflation_created_at, stream_ref)
 SELECT
     stream_id,
     data_provider,
     event_time,
     value,
     $current_block,
-    NULL
+    NULL,
+    stream_ref
 FROM arguments;

💡 Verification agent

🧩 Analysis chain

Confirm migration ordering: stream_ref must exist on primitive_events before these actions are invoked.

If tests or downstream actions call insert_record/insert_records before the column is added, migrations will fail at runtime. Please verify the seed/migration ordering guarantees primitive_events.stream_ref exists prior to executing these action bodies.

Run this script to validate presence and ordering hints for stream_ref on primitive_events:


🏁 Script executed:

#!/bin/bash
set -euo pipefail

echo "Search for stream_ref additions on primitive_events:"
rg -n --no-heading $'primitive_events\\b.*stream_ref|stream_ref\\b.*primitive_events' internal -A 2 -B 2 || true

echo
echo "List all migration files referencing primitive_events:"
fd -a -t f '.*\.sql$' internal | xargs rg -n --no-heading 'primitive_events' -A 1 -B 1 || true

echo
echo "List definitions of get_stream_id(s) helpers to ensure availability before 003:"
rg -n --no-heading 'CREATE OR REPLACE (FUNCTION|ACTION) .*get_stream_id' internal -A 3 -B 2 || true

Length of output: 37760


Ensure stream_ref Is Created Before It’s Referenced
Our grep shows that the primitive_events table is first given a stream_ref column only in 017-normalize-tables.sql (and in the separate normalization_migration/01migrate-structure.sql), but 003-primitive-insertion.sql already INSERTs into stream_ref. Running migrations from scratch will error with “column stream_ref does not exist.”

Please adjust the migration order or schema so that stream_ref exists by the time you hit migration 003:

• internal/migrations/000-initial-data.sql
– Add stream_ref INT to the CREATE TABLE primitive_events (…) definition.
• OR insert a new migration (e.g. 002-add-stream-ref.sql) before 003-primitive-insertion.sql:

ALTER TABLE primitive_events
  ADD COLUMN IF NOT EXISTS stream_ref INT;

• OR move the ALTER TABLE primitive_events ADD COLUMN stream_ref INT; from 017-normalize-tables.sql up ahead of migration 003.

Pinpointed locations needing changes:

  • internal/migrations/003-primitive-insertion.sql (lines 37 & 114): references stream_ref
  • internal/migrations/017-normalize-tables.sql (lines 57–58): current first ADD COLUMN of stream_ref
🤖 Prompt for AI Agents
In internal/migrations/003-primitive-insertion.sql (lines 37 & 114) the
migration INSERTs into stream_ref but that column is only added later in
internal/migrations/017-normalize-tables.sql (lines 57–58), causing failures
when running migrations from scratch; fix by ensuring stream_ref exists before
migration 003 — either add stream_ref INT to the primitive_events CREATE TABLE
in internal/migrations/000-initial-data.sql, or add a new migration (e.g.,
internal/migrations/002-add-stream-ref.sql) that runs before 003 with an ALTER
TABLE primitive_events ADD COLUMN IF NOT EXISTS stream_ref INT, or move the
ALTER TABLE that adds stream_ref from 017 to a migration placed before 003;
update migration order accordingly and verify migrations run cleanly from an
empty DB.

Comment on lines +75 to +77
-- Get stream reference for all streams
$stream_refs := get_stream_ids($data_provider, $stream_id);


⚠️ Potential issue

Use the lowercased $data_providers everywhere; fix array assembly

You compute $data_providers := helper_lowercase_array($data_provider) but then pass the original $data_provider to get_stream_ids and record_arrays, defeating normalization for mixed-case input. Use $data_providers to guarantee mapping consistency.

Apply:

-    $stream_refs := get_stream_ids($data_provider, $stream_id);
+    $stream_refs := get_stream_ids($data_providers, $stream_id);

And in record_arrays:

-        SELECT
-            $stream_id AS stream_ids,
-            $data_provider AS data_providers,
+        SELECT
+            $stream_id AS stream_ids,
+            $data_providers AS data_providers,

This ensures 1:1 alignment between normalized provider addresses and computed stream_refs.

Also applies to: 92-103

🤖 Prompt for AI Agents
internal/migrations/009-truflation-query.sql around lines 75-77 (and similarly
92-103): you compute $data_providers := helper_lowercase_array($data_provider)
but still call get_stream_ids and record_arrays with the original
$data_provider, breaking normalization and 1:1 alignment; replace uses of
$data_provider in those calls and in the array assembly with the normalized
$data_providers variable so get_stream_ids($data_providers) and
record_arrays(...) operate on lowercased provider addresses and maintain
consistent indexing between providers and stream_refs.

Comment on lines +54 to +61
-- Create new INT columns
ALTER TABLE streams ADD COLUMN id INT;
ALTER TABLE streams ADD COLUMN data_provider_id INT;
ALTER TABLE metadata ADD COLUMN stream_ref INT;
ALTER TABLE taxonomies ADD COLUMN stream_ref INT;
ALTER TABLE taxonomies ADD COLUMN child_stream_ref INT;
ALTER TABLE primitive_events ADD COLUMN stream_ref INT;


🛠️ Refactor suggestion

Recreate streams_provider_stream_idx after column re-add (companion to earlier comment)

Ensure the unique index on (data_provider_id, stream_id) exists after altering columns to keep invariants for create_streams and lookups.

 CREATE INDEX IF NOT EXISTS tax_latest_sequence_idx ON taxonomies (stream_ref, start_time, group_sequence);
 
+-- Ensure unique provider+stream key after new columns exist
+CREATE UNIQUE INDEX IF NOT EXISTS streams_provider_stream_idx ON streams(data_provider_id, stream_id);

Also applies to: 66-76

🤖 Prompt for AI Agents
In internal/migrations/normalization_migration/01migrate-structure.sql around
lines 54 to 61 (and similarly for lines 66 to 76), after re-adding the INT
columns you must recreate the unique index that enforces the (data_provider_id,
stream_id) invariant; add a statement to create the unique index if it does not
exist (streams_provider_stream_idx on streams (data_provider_id, stream_id))
immediately after the ALTER TABLE ... ADD COLUMN statements so create_streams
and lookup logic keep the required uniqueness constraint.

Comment on lines +120 to +175
$base_uuid := uuid_generate_kwil('create_streams_' || @txid);

-- Get the data provider id
$data_provider_id INT;
$dp_found BOOL := false;
for $data_provider_row in SELECT id
    FROM data_providers
    WHERE address = $data_provider
    LIMIT 1 {
    $dp_found := true;
    $data_provider_id := $data_provider_row.id;
}

if $dp_found = false {
    ERROR('Data provider not found: ' || $data_provider);
}

-- Create the streams with proper INT IDs
WITH RECURSIVE
indexes AS (
    SELECT 1 AS idx
    UNION ALL
    SELECT idx + 1 FROM indexes
    WHERE idx < array_length($stream_ids)
),
stream_arrays AS (
    SELECT
        $stream_ids AS stream_ids,
        $stream_types AS stream_types
),
arguments AS (
    SELECT
        idx,
        stream_arrays.stream_ids[idx] AS stream_id,
        stream_arrays.stream_types[idx] AS stream_type
    FROM indexes
    JOIN stream_arrays ON 1=1
),
sequential_ids AS (
    SELECT
        idx,
        stream_id,
        stream_type,
        ROW_NUMBER() OVER (ORDER BY idx) + COALESCE((SELECT MAX(id) FROM streams), 0) AS id
    FROM arguments
)
INSERT INTO streams (id, data_provider_id, data_provider, stream_id, stream_type, created_at)
SELECT
    id,
    $data_provider_id,
    $data_provider,
    stream_id,
    stream_type,
    @height
FROM sequential_ids;


🛠️ Refactor suggestion

Stream ID allocation via MAX(id)+row_number is race-prone under concurrency

Two concurrent create_streams calls can compute the same MAX(id), leading to duplicate ids. Prefer sequence/identity-backed ids or serialize allocation.

Options:

  • Introduce a streams.id SEQUENCE/IDENTITY and use nextval() per row.
  • If sequence isn’t available in this DSL, allocate an id range in a serialized step (e.g., table-based allocator) and offset ROW_NUMBER() from the reserved base.
  • At minimum, detect conflicts and retry/fail with a clear error.

Would you like a follow-up migration to convert streams.id to IDENTITY and update this action accordingly?

Comment on lines +245 to +249
-- Add row number to be able to create deterministic UUIDs
args_with_row_number AS (
    SELECT all_arguments.*, row_number() OVER () AS row_number
    FROM all_arguments
),

⚠️ Potential issue

Deterministic UUIDs require deterministic ordering for row_number()

row_number() OVER () without ORDER BY is nondeterministic. This can produce different UUIDs across runs.

-    args_with_row_number AS (
-        SELECT all_arguments.*, row_number() OVER () AS row_number
-        FROM all_arguments
-    ),
+    args_with_row_number AS (
+        SELECT
+            all_arguments.*,
+            row_number() OVER (ORDER BY stream_id, metadata_key, COALESCE(value_s, ''), COALESCE(value_i::TEXT, ''), COALESCE(value_ref, '')) AS row_number
+        FROM all_arguments
+    ),
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-- Add row number to be able to create deterministic UUIDs
args_with_row_number AS (
SELECT all_arguments.*, row_number() OVER () AS row_number
FROM all_arguments
),
-- Add row number to be able to create deterministic UUIDs
args_with_row_number AS (
SELECT
all_arguments.*,
row_number() OVER (
ORDER BY
stream_id,
metadata_key,
COALESCE(value_s, ''),
COALESCE(value_i::TEXT, ''),
COALESCE(value_ref, '')
) AS row_number
FROM all_arguments
),
🤖 Prompt for AI Agents
In internal/migrations/normalization_migration/02_insert_actions.sql around
lines 245-249, the row_number() window is missing an ORDER BY which makes UUID
generation nondeterministic; update the window to include a deterministic ORDER
BY using a stable unique column (e.g., the primary key or a combination of
columns that uniquely and stably identify rows such as id, created_at) so that
row_number() produces the same sequence every run—if no single unique column
exists, order by a deterministic composite of columns that guarantees consistent
ordering across executions.

Comment on lines +45 to +47
err = setup.CreateDataProvider(ctx, platform, defaultStreamLocator.DataProvider.Address())
require.NoError(t, err, "error registering data provider")


⚠️ Potential issue

Use txPlatform in transactional test block

Inside WithTx, use the transactional handle for CreateDataProvider to keep all operations within the same transaction context.

- err = setup.CreateDataProvider(ctx, platform, defaultStreamLocator.DataProvider.Address())
+ err = setup.CreateDataProvider(ctx, txPlatform, defaultStreamLocator.DataProvider.Address())
📝 Committable suggestion


Suggested change
err = setup.CreateDataProvider(ctx, platform, defaultStreamLocator.DataProvider.Address())
require.NoError(t, err, "error registering data provider")
err = setup.CreateDataProvider(ctx, txPlatform, defaultStreamLocator.DataProvider.Address())
require.NoError(t, err, "error registering data provider")
🤖 Prompt for AI Agents
In tests/streams/other/other_test.go around lines 45 to 47, the call to
setup.CreateDataProvider uses the top-level platform instead of the
transactional handle; change the call inside the WithTx block to use
txPlatform.CreateDataProvider(ctx, ..., ...) so the provider registration runs
inside the same transaction context and ensure you still check the returned
error with require.NoError(t, err, "...").

Comment on lines +102 to +104
err = setup.CreateDataProvider(ctx, platform, defaultStreamLocator.DataProvider.Address())
require.NoError(t, err, "error registering data provider")


⚠️ Potential issue

Same fix: pass txPlatform to CreateDataProvider

Remain consistent with the WithTx transactional context.

- err = setup.CreateDataProvider(ctx, platform, defaultStreamLocator.DataProvider.Address())
+ err = setup.CreateDataProvider(ctx, txPlatform, defaultStreamLocator.DataProvider.Address())
🤖 Prompt for AI Agents
In tests/streams/other/other_test.go around lines 102 to 104, the call to
setup.CreateDataProvider uses the non-transactional platform variable instead of
the transactional context (txPlatform) created with WithTx; update the call to
pass txPlatform as the first argument so the provider registration runs inside
the same transaction/WithTx context and remains consistent with surrounding
transactional calls.

Comment on lines +164 to +166
err := setup.CreateDataProvider(ctx, platform, owner1)
require.NoError(t, err, "error registering data provider")


⚠️ Potential issue

Same fix: pass txPlatform to CreateDataProvider in NonDuplicateStreamID

- err := setup.CreateDataProvider(ctx, platform, owner1)
+ err := setup.CreateDataProvider(ctx, txPlatform, owner1)
📝 Committable suggestion


Suggested change
err := setup.CreateDataProvider(ctx, platform, owner1)
require.NoError(t, err, "error registering data provider")
err := setup.CreateDataProvider(ctx, txPlatform, owner1)
require.NoError(t, err, "error registering data provider")
🤖 Prompt for AI Agents
In tests/streams/other/other_test.go around lines 164 to 166, the call to
setup.CreateDataProvider uses the wrong platform variable; update the call to
pass txPlatform instead of platform (i.e. setup.CreateDataProvider(ctx,
txPlatform, owner1)), and ensure txPlatform is in scope for the
NonDuplicateStreamID test so the transaction-bound platform is used.

Comment on lines +43 to +48
err := setup.CreateDataProvider(ctx, platform, authorizedWriter)
require.NoError(t, err, "error registering data provider")

err = setup.CreateDataProvider(ctx, platform, managerWallet)
require.NoError(t, err, "error registering data provider")


💡 Verification agent

🧩 Analysis chain

Avoid over‑privileging test identities via helper

setup.CreateDataProvider currently also grants network_writer via bypass. Registering managerWallet this way implicitly gives it writer privileges, which isn’t necessary for the manager role and could mask permission gate behaviors.

  • Prefer a helper that registers the provider without granting writer, or add an option to skip the role grant.
  • For negative cases, it’s often desirable to pre-register the “unauthorized” provider without granting writer, ensuring failures are due solely to role gates.

Would you like a follow-up patch to introduce CreateDataProviderWithOptions and update this test to register unauthorizedUser without granting writer?


🏁 Script executed:

#!/bin/bash
# Locate the CreateDataProvider implementation and references to network_writer
rg -n "func CreateDataProvider" -n -A30
rg -n "network_writer" -n -A5

Length of output: 34752


Introduce SkipAutoRoleGrant for CreateDataProvider to avoid over-privileging

The current setup.CreateDataProvider helper always grants system:network_writer, so using it for managerWallet (or any “unauthorized” test identity) inadvertently gives them write privileges and masks the real permission-gate behavior.

• In tests/streams/utils/setup/common.go, add a new helper or option, e.g.:

type CreateDataProviderOptions struct {
  SkipAutoRoleGrant bool
}

func CreateDataProviderWithOptions(
  ctx context.Context,
  platform *kwilTesting.Platform,
  address string,
  opts CreateDataProviderOptions,
) error {
  addr, err := util.NewEthereumAddressFromString(address)
  if err != nil {
    return errors.Wrap(err, "invalid data provider address")
  }
  if !opts.SkipAutoRoleGrant {
    if err := AddMemberToRoleBypass(ctx, platform, "system", "network_writer", addr.Address()); err != nil {
      return errors.Wrap(err, "failed to grant network_writer")
    }
  }
  // ... call engine “create_data_provider” as before ...
}

• In tests/streams/roles/permission_gates_test.go (lines 43–48), switch to the new helper for the manager and unauthorized cases:

 // Register the authorized writer (needs write privileges)
-  err := setup.CreateDataProvider(ctx, platform, authorizedWriter)
+  require.NoError(t, setup.CreateDataProvider(ctx, platform, authorizedWriter))

 // Register the manager without writer privileges
-  err = setup.CreateDataProvider(ctx, platform, managerWallet)
+  require.NoError(t, setup.CreateDataProviderWithOptions(
+    ctx, platform, managerWallet,
+    setup.CreateDataProviderOptions{SkipAutoRoleGrant: true},
+  ))

• For negative-case tests, pre-register providers with SkipAutoRoleGrant: true so failures reflect only missing network_writer access.

Would you like a follow-up patch to add CreateDataProviderWithOptions (or a SkipAutoRoleGrant flag) and update these tests?

🤖 Prompt for AI Agents
In tests/streams/roles/permission_gates_test.go around lines 43–48, the test
uses setup.CreateDataProvider which currently auto-grants the
system:network_writer role and thus over-privileges managerWallet and other
“unauthorized” identities; add a new helper or option in
tests/streams/utils/setup/common.go (e.g., CreateDataProviderWithOptions or a
CreateDataProvider option struct with SkipAutoRoleGrant bool) that parses the
address like the current helper, calls the engine “create_data_provider” as
before, but only performs the
AddMemberToRoleBypass("system","network_writer",...) when SkipAutoRoleGrant is
false; then change the test at lines 43–48 to call the new helper with
SkipAutoRoleGrant: true for managerWallet/unauthorized cases (and leave default
behavior for cases that should receive the role) so negative tests actually
reflect missing network_writer permissions.

Comment on lines +191 to +233

func CreateDataProvider(ctx context.Context, platform *kwilTesting.Platform, address string) (error) {
addr, err := util.NewEthereumAddressFromString(address)
if err != nil {
return errors.Wrap(err, "invalid data provider address")
}

// Grant the data provider the network_writer role
err = AddMemberToRoleBypass(ctx, platform, "system", "network_writer", addr.Address())
if err != nil {
return errors.Wrap(err, "failed to enable stream deployer")
}

txContext := &common.TxContext{
Ctx: ctx,
BlockContext: &common.BlockContext{Height: 1},
Signer: addr.Bytes(),
Caller: addr.Address(),
TxID: platform.Txid(),
}

engineContext := &common.EngineContext{
TxContext: txContext,
}

r, err := platform.Engine.Call(engineContext,
platform.DB,
"",
"create_data_provider",
[]any{addr.Address()},
func(row *common.Row) error {
return nil
},
)
if err != nil {
return errors.Wrap(err, "error in createDataProvider")
}
if r.Error != nil {
return errors.Wrap(r.Error, "error in createDataProvider")
}

return nil
}

🛠️ Refactor suggestion

Split concerns (register vs. grant role) and clean up signature

  • The helper currently both registers a data provider and grants the network_writer role via a bypass. This couples two distinct concerns and can unintentionally over‑privilege addresses (e.g., managers). Recommend adding an options parameter to control role grants, defaulting to true for backward compatibility.
  • Minor: remove unnecessary parentheses from the return type.

Apply this diff for the signature nit:

-func CreateDataProvider(ctx context.Context, platform *kwilTesting.Platform, address string) (error) {
+func CreateDataProvider(ctx context.Context, platform *kwilTesting.Platform, address string) error {

Outside this hunk, consider introducing:

// New API (outside this change range)
type CreateDataProviderOptions struct {
    GrantNetworkWriter bool // default true
}

func CreateDataProviderWithOptions(ctx context.Context, platform *kwilTesting.Platform, address string, opts CreateDataProviderOptions) error {
    if opts.GrantNetworkWriter {
        // grant role (same as current)
    }
    // call create_data_provider (same as current)
    return nil
}

Then have CreateDataProvider call CreateDataProviderWithOptions with GrantNetworkWriter=true.

@trufnetwork trufnetwork deleted a comment from coderabbitai bot Aug 12, 2025
MicBun
MicBun previously approved these changes Aug 12, 2025
Member

@MicBun MicBun left a comment


@williamrusdyputra I approve. Also take a look at the AI reviewer comments; if they make sense, open a follow-up issue so they can be dealt with in another PR.

@MicBun MicBun merged commit 81b3227 into main Aug 12, 2025
5 checks passed
@MicBun MicBun deleted the feat/normalized-tables branch August 12, 2025 03:57
@coderabbitai

coderabbitai bot commented Aug 12, 2025

✅ Actions performed

Reviews paused.
