Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions extensions/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package extensions
import (
"github.com/trufnetwork/node/extensions/database-size"
"github.com/trufnetwork/node/extensions/leaderwatch"
"github.com/trufnetwork/node/extensions/tn_attestation"
"github.com/trufnetwork/node/extensions/tn_cache"
"github.com/trufnetwork/node/extensions/tn_digest"
"github.com/trufnetwork/node/extensions/tn_vacuum"
Expand All @@ -13,5 +14,6 @@ func init() {
tn_cache.InitializeExtension()
tn_digest.InitializeExtension()
tn_vacuum.InitializeExtension()
tn_attestation.InitializeExtension()
database_size.InitializeExtension()
}
3 changes: 3 additions & 0 deletions extensions/tn_attestation/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package tn_attestation

const ExtensionName = "tn_attestation"
78 changes: 78 additions & 0 deletions extensions/tn_attestation/precompile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package tn_attestation

import (
"bytes"
"fmt"

"github.com/trufnetwork/kwil-db/common"
"github.com/trufnetwork/kwil-db/core/types"
"github.com/trufnetwork/kwil-db/extensions/precompiles"
)

// registerPrecompile registers the tn_attestation precompile with queue_for_signing() method.
// This is called from InitializeExtension().
func registerPrecompile() error {
return precompiles.RegisterPrecompile(ExtensionName, precompiles.Precompile{
// No cache needed: this precompile only affects leader's in-memory state,
// which is intentionally non-deterministic. All validators return nil (deterministic).
Cache: nil,
Methods: []precompiles.Method{
{
Name: "queue_for_signing",
Parameters: []precompiles.PrecompileValue{
precompiles.NewPrecompileValue("attestation_hash", types.TextType, false),
},
AccessModifiers: []precompiles.Modifier{precompiles.SYSTEM},
Handler: func(ctx *common.EngineContext, app *common.App, inputs []any, resultFn func([]any) error) error {
// Validate inputs length
if len(inputs) == 0 {
return fmt.Errorf("expected 1 input parameter, got 0")
}

// Safe type assertion with comma-ok
attestationHash, ok := inputs[0].(string)
if !ok {
return fmt.Errorf("attestation_hash must be a string, got %T", inputs[0])
}

// Validate attestation hash is not empty
if attestationHash == "" {
return fmt.Errorf("attestation_hash cannot be empty")
}

// Check if the current node is the leader
// We check by comparing the proposer's public key with our own identity
// Treat missing context as non-leader to preserve determinism
isLeader := false
if ctx != nil &&
ctx.TxContext != nil &&
ctx.TxContext.BlockContext != nil &&
ctx.TxContext.BlockContext.Proposer != nil &&
app != nil &&
app.Service != nil &&
app.Service.Identity != nil {
proposerBytes := ctx.TxContext.BlockContext.Proposer.Bytes()
isLeader = bytes.Equal(proposerBytes, app.Service.Identity)
}

// Only queue if we are the leader
// For non-leaders, this is a no-op to maintain determinism
if isLeader {
queue := GetAttestationQueue()
queue.Enqueue(attestationHash)

if app != nil && app.Service != nil && app.Service.Logger != nil {
app.Service.Logger.Debug("Queued attestation for signing",
"hash", attestationHash,
"queue_size", queue.Len())
}
}

// Always return nil (no return value) for all validators
// This maintains determinism while only affecting leader's in-memory state
return nil
},
},
},
})
}
120 changes: 120 additions & 0 deletions extensions/tn_attestation/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package tn_attestation

import "sync"

const (
// MaxQueueSize is the maximum number of attestation hashes that can be queued.
// This prevents unbounded memory growth before the signing worker (Issue 6) is implemented.
// With deduplication, this limit applies to unique hashes only.
MaxQueueSize = 10000
)

// AttestationQueue is a thread-safe queue for managing attestation hashes that need signing.
// It is shared between the queue_for_signing() precompile and the leader signing worker.
// The queue has a maximum size to prevent unbounded memory growth.
type AttestationQueue struct {
mu sync.RWMutex
hashes map[string]struct{} // Using map for O(1) deduplication
order []string // Maintains FIFO order for eviction
}

// NewAttestationQueue creates a new attestation queue.
func NewAttestationQueue() *AttestationQueue {
return &AttestationQueue{
hashes: make(map[string]struct{}),
order: make([]string, 0),
}
}

// Enqueue adds an attestation hash to the queue if it doesn't already exist.
// Returns true if the hash was added, false if it already existed.
// If the queue is at max capacity, the oldest hash is evicted (FIFO).
func (q *AttestationQueue) Enqueue(hash string) bool {
q.mu.Lock()
defer q.mu.Unlock()

// Check if hash already exists
if _, exists := q.hashes[hash]; exists {
return false
}

// If at max capacity, evict the oldest hash
if len(q.hashes) >= MaxQueueSize {
if len(q.order) > 0 {
oldestHash := q.order[0]
delete(q.hashes, oldestHash)
q.order = q.order[1:]
}
}

// Add new hash
q.hashes[hash] = struct{}{}
q.order = append(q.order, hash)
return true
}

// DequeueAll removes and returns all attestation hashes from the queue.
// The hashes are returned in FIFO order.
func (q *AttestationQueue) DequeueAll() []string {
q.mu.Lock()
defer q.mu.Unlock()

if len(q.hashes) == 0 {
return nil
}

// Return copy of order slice
hashes := make([]string, len(q.order))
copy(hashes, q.order)

// Clear the queue
q.hashes = make(map[string]struct{})
q.order = make([]string, 0)

return hashes
}

// Len returns the current number of hashes in the queue.
func (q *AttestationQueue) Len() int {
q.mu.RLock()
defer q.mu.RUnlock()
return len(q.hashes)
}

// Clear removes all hashes from the queue.
func (q *AttestationQueue) Clear() {
q.mu.Lock()
defer q.mu.Unlock()
q.hashes = make(map[string]struct{})
q.order = make([]string, 0)
}

// Copy creates a deep copy of the queue.
func (q *AttestationQueue) Copy() *AttestationQueue {
q.mu.RLock()
defer q.mu.RUnlock()

newQueue := NewAttestationQueue()
// Copy hashes map
for hash := range q.hashes {
newQueue.hashes[hash] = struct{}{}
}
// Copy order slice
newQueue.order = make([]string, len(q.order))
copy(newQueue.order, q.order)
return newQueue
}

// attestationQueueSingleton is the global queue shared between precompile and extension.
// This will be accessed by both queue_for_signing() precompile and the leader signing worker.
var attestationQueueSingleton *AttestationQueue
var queueOnce sync.Once

// GetAttestationQueue returns the global attestation queue singleton.
// This function is exported so both the precompile and leader worker can access it.
func GetAttestationQueue() *AttestationQueue {
queueOnce.Do(func() {
attestationQueueSingleton = NewAttestationQueue()
})
return attestationQueueSingleton
}
116 changes: 116 additions & 0 deletions extensions/tn_attestation/tn_attestation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package tn_attestation

import (
"context"
"fmt"

"github.com/trufnetwork/kwil-db/common"
"github.com/trufnetwork/kwil-db/extensions/hooks"
"github.com/trufnetwork/node/extensions/leaderwatch"
)

// InitializeExtension registers the tn_attestation extension.
// This includes:
// - Registering the queue_for_signing() precompile
// - Registering leader watch callbacks for signing worker // TODO: WIP
func InitializeExtension() {
// Register the precompile for queue_for_signing() method
if err := registerPrecompile(); err != nil {
panic(fmt.Sprintf("failed to register %s precompile: %v", ExtensionName, err))
}

// Register engine ready hook
if err := hooks.RegisterEngineReadyHook(ExtensionName+"_engine_ready", engineReadyHook); err != nil {
panic(fmt.Sprintf("failed to register %s engine ready hook: %v", ExtensionName, err))
}

// Register leader watch callbacks (for Issue 6 - leader signing worker)
if err := leaderwatch.Register(ExtensionName, leaderwatch.Callbacks{
OnAcquire: onLeaderAcquire,
OnLose: onLeaderLose,
OnEndBlock: onLeaderEndBlock,
}); err != nil {
panic(fmt.Sprintf("failed to register %s leader watcher: %v", ExtensionName, err))
}
}

// engineReadyHook is called when the engine is ready.
func engineReadyHook(ctx context.Context, app *common.App) error {
if app == nil || app.Service == nil {
return nil
}

logger := app.Service.Logger.New(ExtensionName)
logger.Info("tn_attestation extension ready",
"queue_size", GetAttestationQueue().Len())

return nil
}

// onLeaderAcquire is called when the node becomes the leader.
// TODO: Start the signing worker here
func onLeaderAcquire(ctx context.Context, app *common.App, block *common.BlockContext) {
if app == nil || app.Service == nil {
return
}

logger := app.Service.Logger.New(ExtensionName)
logger.Info("tn_attestation: acquired leadership")

// TODO: Implement signing worker startup
// Reference implementation:
// ext := GetExtension()
// ext.startSigningWorker(ctx, app)
}

// onLeaderLose is called when the node loses leadership.
// TODO: Stop the signing worker here
func onLeaderLose(ctx context.Context, app *common.App, block *common.BlockContext) {
if app == nil || app.Service == nil {
return
}

logger := app.Service.Logger.New(ExtensionName)
logger.Info("tn_attestation: lost leadership")

// TODO: Implement signing worker shutdown
// Reference implementation:
// ext := GetExtension()
// ext.stopSigningWorker()
}

// onLeaderEndBlock is called on every EndBlock when the node is the leader.
// Currently dequeues and logs hashes to prevent unbounded memory growth.
// TODO: Implement actual signing and submission of attestations.
func onLeaderEndBlock(ctx context.Context, app *common.App, block *common.BlockContext) {
if app == nil || app.Service == nil {
return
}

// Dequeue all pending attestation hashes to prevent unbounded growth
queue := GetAttestationQueue()
hashes := queue.DequeueAll()

// If there are hashes, log them (signing implementation pending in Issue 6)
if len(hashes) > 0 {
logger := app.Service.Logger.New(ExtensionName)
logger.Info("tn_attestation: dequeued attestations for signing",
"count", len(hashes),
"block_height", block.Height,
"note", "signing implementation pending (Issue 6)")

// TODO: Implement actual signing and submission
// Reference implementation:
// ext := GetExtension()
// for _, hash := range hashes {
// signature, err := ext.signAttestation(ctx, app, hash)
// if err != nil {
// logger.Error("failed to sign attestation", "hash", hash, "error", err)
// continue
// }
// if err := ext.submitSignature(ctx, app, hash, signature); err != nil {
// logger.Error("failed to submit signature", "hash", hash, "error", err)
// }
// }
}
}
Loading
Loading