-
Notifications
You must be signed in to change notification settings - Fork 3
feat: enable attestation queue for validator signing workflow #1204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| package tn_attestation | ||
|
|
||
| const ExtensionName = "tn_attestation" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| package tn_attestation | ||
|
|
||
| import ( | ||
| "bytes" | ||
| "fmt" | ||
|
|
||
| "github.com/trufnetwork/kwil-db/common" | ||
| "github.com/trufnetwork/kwil-db/core/types" | ||
| "github.com/trufnetwork/kwil-db/extensions/precompiles" | ||
| ) | ||
|
|
||
| // registerPrecompile registers the tn_attestation precompile with queue_for_signing() method. | ||
| // This is called from InitializeExtension(). | ||
| func registerPrecompile() error { | ||
| return precompiles.RegisterPrecompile(ExtensionName, precompiles.Precompile{ | ||
| // No cache needed: this precompile only affects leader's in-memory state, | ||
| // which is intentionally non-deterministic. All validators return nil (deterministic). | ||
| Cache: nil, | ||
| Methods: []precompiles.Method{ | ||
| { | ||
| Name: "queue_for_signing", | ||
| Parameters: []precompiles.PrecompileValue{ | ||
| precompiles.NewPrecompileValue("attestation_hash", types.TextType, false), | ||
| }, | ||
| AccessModifiers: []precompiles.Modifier{precompiles.SYSTEM}, | ||
| Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error { | ||
| // Validate inputs length | ||
| if len(inputs) == 0 { | ||
| return fmt.Errorf("expected 1 input parameter, got 0") | ||
| } | ||
|
|
||
| // Safe type assertion with comma-ok | ||
| attestationHash, ok := inputs[0].(string) | ||
| if !ok { | ||
| return fmt.Errorf("attestation_hash must be a string, got %T", inputs[0]) | ||
| } | ||
|
|
||
| // Validate attestation hash is not empty | ||
| if attestationHash == "" { | ||
| return fmt.Errorf("attestation_hash cannot be empty") | ||
| } | ||
|
|
||
| // Check if the current node is the leader | ||
| // We check by comparing the proposer's public key with our own identity | ||
| // Treat missing context as non-leader to preserve determinism | ||
| isLeader := false | ||
| if ctx != nil && | ||
| ctx.TxContext != nil && | ||
| ctx.TxContext.BlockContext != nil && | ||
| ctx.TxContext.BlockContext.Proposer != nil && | ||
| app != nil && | ||
| app.Service != nil && | ||
| app.Service.Identity != nil { | ||
| proposerBytes := ctx.TxContext.BlockContext.Proposer.Bytes() | ||
| isLeader = bytes.Equal(proposerBytes, app.Service.Identity) | ||
| } | ||
|
|
||
| // Only queue if we are the leader | ||
| // For non-leaders, this is a no-op to maintain determinism | ||
| if isLeader { | ||
| queue := GetAttestationQueue() | ||
| queue.Enqueue(attestationHash) | ||
|
|
||
| if app != nil && app.Service != nil && app.Service.Logger != nil { | ||
| app.Service.Logger.Debug("Queued attestation for signing", | ||
| "hash", attestationHash, | ||
| "queue_size", queue.Len()) | ||
| } | ||
| } | ||
|
|
||
| // Always return nil (no return value) for all validators | ||
| // This maintains determinism while only affecting leader's in-memory state | ||
| return nil | ||
| }, | ||
| }, | ||
| }, | ||
| }) | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,120 @@ | ||
| package tn_attestation | ||
|
|
||
| import "sync" | ||
|
|
||
| const ( | ||
| // MaxQueueSize is the maximum number of attestation hashes that can be queued. | ||
| // This prevents unbounded memory growth before the signing worker (Issue 6) is implemented. | ||
| // With deduplication, this limit applies to unique hashes only. | ||
| MaxQueueSize = 10000 | ||
| ) | ||
|
|
||
| // AttestationQueue is a thread-safe queue for managing attestation hashes that need signing. | ||
| // It is shared between the queue_for_signing() precompile and the leader signing worker. | ||
| // The queue has a maximum size to prevent unbounded memory growth. | ||
| type AttestationQueue struct { | ||
| mu sync.RWMutex | ||
| hashes map[string]struct{} // Using map for O(1) deduplication | ||
| order []string // Maintains FIFO order for eviction | ||
| } | ||
|
|
||
| // NewAttestationQueue creates a new attestation queue. | ||
| func NewAttestationQueue() *AttestationQueue { | ||
| return &AttestationQueue{ | ||
| hashes: make(map[string]struct{}), | ||
| order: make([]string, 0), | ||
| } | ||
| } | ||
|
|
||
| // Enqueue adds an attestation hash to the queue if it doesn't already exist. | ||
| // Returns true if the hash was added, false if it already existed. | ||
| // If the queue is at max capacity, the oldest hash is evicted (FIFO). | ||
| func (q *AttestationQueue) Enqueue(hash string) bool { | ||
| q.mu.Lock() | ||
| defer q.mu.Unlock() | ||
|
|
||
| // Check if hash already exists | ||
| if _, exists := q.hashes[hash]; exists { | ||
| return false | ||
| } | ||
|
|
||
| // If at max capacity, evict the oldest hash | ||
| if len(q.hashes) >= MaxQueueSize { | ||
| if len(q.order) > 0 { | ||
| oldestHash := q.order[0] | ||
| delete(q.hashes, oldestHash) | ||
| q.order = q.order[1:] | ||
| } | ||
| } | ||
|
|
||
| // Add new hash | ||
| q.hashes[hash] = struct{}{} | ||
| q.order = append(q.order, hash) | ||
| return true | ||
| } | ||
|
|
||
| // DequeueAll removes and returns all attestation hashes from the queue. | ||
| // The hashes are returned in FIFO order. | ||
| func (q *AttestationQueue) DequeueAll() []string { | ||
| q.mu.Lock() | ||
| defer q.mu.Unlock() | ||
|
|
||
| if len(q.hashes) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| // Return copy of order slice | ||
| hashes := make([]string, len(q.order)) | ||
| copy(hashes, q.order) | ||
|
|
||
| // Clear the queue | ||
| q.hashes = make(map[string]struct{}) | ||
| q.order = make([]string, 0) | ||
|
|
||
| return hashes | ||
| } | ||
|
|
||
| // Len returns the current number of hashes in the queue. | ||
| func (q *AttestationQueue) Len() int { | ||
| q.mu.RLock() | ||
| defer q.mu.RUnlock() | ||
| return len(q.hashes) | ||
| } | ||
|
|
||
| // Clear removes all hashes from the queue. | ||
| func (q *AttestationQueue) Clear() { | ||
| q.mu.Lock() | ||
| defer q.mu.Unlock() | ||
| q.hashes = make(map[string]struct{}) | ||
| q.order = make([]string, 0) | ||
| } | ||
|
|
||
| // Copy creates a deep copy of the queue. | ||
| func (q *AttestationQueue) Copy() *AttestationQueue { | ||
| q.mu.RLock() | ||
| defer q.mu.RUnlock() | ||
|
|
||
| newQueue := NewAttestationQueue() | ||
| // Copy hashes map | ||
| for hash := range q.hashes { | ||
| newQueue.hashes[hash] = struct{}{} | ||
| } | ||
| // Copy order slice | ||
| newQueue.order = make([]string, len(q.order)) | ||
| copy(newQueue.order, q.order) | ||
| return newQueue | ||
| } | ||
|
|
||
| // attestationQueueSingleton is the global queue shared between precompile and extension. | ||
| // This will be accessed by both queue_for_signing() precompile and the leader signing worker. | ||
| var attestationQueueSingleton *AttestationQueue | ||
| var queueOnce sync.Once | ||
|
|
||
| // GetAttestationQueue returns the global attestation queue singleton. | ||
| // This function is exported so both the precompile and leader worker can access it. | ||
| func GetAttestationQueue() *AttestationQueue { | ||
| queueOnce.Do(func() { | ||
| attestationQueueSingleton = NewAttestationQueue() | ||
| }) | ||
| return attestationQueueSingleton | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,116 @@ | ||
| package tn_attestation | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
|
|
||
| "github.com/trufnetwork/kwil-db/common" | ||
| "github.com/trufnetwork/kwil-db/extensions/hooks" | ||
| "github.com/trufnetwork/node/extensions/leaderwatch" | ||
| ) | ||
|
|
||
| // InitializeExtension registers the tn_attestation extension. | ||
| // This includes: | ||
| // - Registering the queue_for_signing() precompile | ||
| // - Registering leader watch callbacks for signing worker // TODO: WIP | ||
| func InitializeExtension() { | ||
| // Register the precompile for queue_for_signing() method | ||
| if err := registerPrecompile(); err != nil { | ||
| panic(fmt.Sprintf("failed to register %s precompile: %v", ExtensionName, err)) | ||
| } | ||
|
|
||
| // Register engine ready hook | ||
| if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil { | ||
| panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err)) | ||
| } | ||
|
|
||
| // Register leader watch callbacks (for Issue 6 - leader signing worker) | ||
| if err := leaderwatch.Register(ExtensionName, leaderwatch.Callbacks{ | ||
| OnAcquire: onLeaderAcquire, | ||
| OnLose: onLeaderLose, | ||
| OnEndBlock: onLeaderEndBlock, | ||
| }); err != nil { | ||
| panic(fmt.Sprintf("failed to register %s leader watcher: %v", ExtensionName, err)) | ||
| } | ||
| } | ||
|
|
||
| // engineReadyHook is called when the engine is ready. | ||
| func engineReadyHook(ctx context.Context, app *common.App) error { | ||
| if app == nil || app.Service == nil { | ||
| return nil | ||
| } | ||
|
|
||
| logger := app.Service.Logger.New(ExtensionName) | ||
| logger.Info("tn_attestation extension ready", | ||
| "queue_size", GetAttestationQueue().Len()) | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| // onLeaderAcquire is called when the node becomes the leader. | ||
| // TODO: Start the signing worker here | ||
| func onLeaderAcquire(ctx context.Context, app *common.App, block *common.BlockContext) { | ||
| if app == nil || app.Service == nil { | ||
| return | ||
| } | ||
|
|
||
| logger := app.Service.Logger.New(ExtensionName) | ||
| logger.Info("tn_attestation: acquired leadership") | ||
|
|
||
| // TODO: Implement signing worker startup | ||
| // Reference implementation: | ||
| // ext := GetExtension() | ||
| // ext.startSigningWorker(ctx, app) | ||
| } | ||
|
|
||
| // onLeaderLose is called when the node loses leadership. | ||
| // TODO: Stop the signing worker here | ||
| func onLeaderLose(ctx context.Context, app *common.App, block *common.BlockContext) { | ||
| if app == nil || app.Service == nil { | ||
| return | ||
| } | ||
|
|
||
| logger := app.Service.Logger.New(ExtensionName) | ||
| logger.Info("tn_attestation: lost leadership") | ||
|
|
||
| // TODO: Implement signing worker shutdown | ||
| // Reference implementation: | ||
| // ext := GetExtension() | ||
| // ext.stopSigningWorker() | ||
| } | ||
|
|
||
| // onLeaderEndBlock is called on every EndBlock when the node is the leader. | ||
| // Currently dequeues and logs hashes to prevent unbounded memory growth. | ||
| // TODO: Implement actual signing and submission of attestations. | ||
| func onLeaderEndBlock(ctx context.Context, app *common.App, block *common.BlockContext) { | ||
| if app == nil || app.Service == nil { | ||
| return | ||
| } | ||
|
|
||
| // Dequeue all pending attestation hashes to prevent unbounded growth | ||
| queue := GetAttestationQueue() | ||
| hashes := queue.DequeueAll() | ||
|
|
||
| // If there are hashes, log them (signing implementation pending in Issue 6) | ||
| if len(hashes) > 0 { | ||
| logger := app.Service.Logger.New(ExtensionName) | ||
| logger.Info("tn_attestation: dequeued attestations for signing", | ||
| "count", len(hashes), | ||
| "block_height", block.Height, | ||
| "note", "signing implementation pending (Issue 6)") | ||
|
|
||
| // TODO: Implement actual signing and submission | ||
| // Reference implementation: | ||
| // ext := GetExtension() | ||
| // for _, hash := range hashes { | ||
| // signature, err := ext.signAttestation(ctx, app, hash) | ||
| // if err != nil { | ||
| // logger.Error("failed to sign attestation", "hash", hash, "error", err) | ||
| // continue | ||
| // } | ||
| // if err := ext.submitSignature(ctx, app, hash, signature); err != nil { | ||
| // logger.Error("failed to submit signature", "hash", hash, "error", err) | ||
| // } | ||
| // } | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.