A synchronization layer for konserve key-value stores. Enables real-time replication from a primary store to multiple subscribers over pluggable transports.
- Single-writer replication: Primary store broadcasts updates to subscribers in real-time
- Differential sync: On reconnection, only keys with newer server timestamps are transferred
- Batched initial sync: Backpressure via acknowledgments prevents overwhelming slow clients
- Pluggable transports: Channel-based (testing) and kabel (network) transports included
- Custom key discovery: Walk functions for tree-structured data (e.g., Datahike)
;; deps.edn
{:deps {io.replikativ/konserve-sync {:mvn/version "LATEST"}}}(require '[konserve-sync.sync :as sync]
'[konserve-sync.protocol :as proto]
'[superv.async :refer [S]])
;; Create context and register store
(def ctx (sync/make-context S {:batch-size 20}))
(def store-config {:scope #uuid "12345678-1234-1234-1234-123456789abc"
:backend :memory})
(sync/register-store! ctx my-store store-config {});; Use SAME store config as server
(def store-id (proto/store-id store-config))
(<!! (sync/subscribe! ctx transport store-id local-store
{:on-error #(println "Error:" %)
:on-complete #(println "Sync complete")}))Stores are identified by a UUID computed from their config via proto/store-id. Client and server must use identical configs for the same logical store.
The :scope key represents the store's logical identity:
(def store-config
{:scope #uuid "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
:backend :file
:path "/tmp/my-store"})(require '[konserve-sync.transport.channels :as ch])
(let [[transport-a transport-b] (ch/channel-pair S)]
;; Messages sent on A arrive at B and vice versa
...)(require '[konserve-sync.transport.kabel :as kabel-sync])
;; Server: Add sync middleware to kabel peer
(peer/server-peer S handler server-id
(fn [peer-config]
(when (nil? @sync-server)
(reset! sync-server (kabel-sync/kabel-sync-server S ctx nil)))
((kabel-sync/sync-server-middleware @sync-server) peer-config))
identity)
;; Client: Add sync middleware and use transport for subscribe!
(peer/client-peer S client-id
(kabel-sync/sync-client-middleware ctx transport-atom)
identity)(sync/make-context S
{:batch-size 20 ;; Keys per batch during initial sync
:batch-timeout-ms 30000 ;; Timeout waiting for batch ack
})(sync/register-store! ctx store store-config
{:filter-fn (fn [key value] true) ;; Filter which keys to sync
:walk-fn (fn [store opts] ...) ;; Custom key discovery (returns channel of keys)
:key-sort-fn (fn [key] 0)}) ;; Sort keys for sync order (higher = later)For tree-structured data, syncing ALL keys is inefficient. The :walk-fn discovers only reachable keys:
;; Walk function signature: (fn [store opts] -> channel-yielding-set-of-keys)
(sync/register-store! ctx store config
{:walk-fn my-walk-fn})When some keys depend on others, use :key-sort-fn to control sync order. Keys with higher sort values are sent later:
;; Send :db last (after its dependencies)
(sync/register-store! ctx store config
{:key-sort-fn (fn [k] (if (= k :db) 1 0))})This prevents "not found" errors when callbacks try to use a key before its dependencies are synced.
Datahike stores its database as a :db key containing BTSet indices, where each BTSet node is stored as a separate UUID key. The konserve-sync.walkers.datahike namespace provides a walker that discovers only reachable keys.
(require '[konserve-sync.walkers.datahike :as dh-walker])
(def sync-store-config
{:scope #uuid "a1b2c3d4-e5f6-7890-abcd-ef1234567890"
:backend :file
:path "/tmp/my-datahike-store"})
;; Register with walker and key-sort-fn
;; - walk-fn: only sync keys reachable from :db
;; - key-sort-fn: send :db last so BTSet nodes arrive first
(sync/register-store! ctx (-> conn d/db :store) sync-store-config
{:walk-fn dh-walker/datahike-walk-fn
:key-sort-fn (fn [k] (if (= k :db) 1 0))})(require '[konserve-sync.walkers.datahike :as dh-walker]
'[konserve.tiered :as tiered])
;; Create TieredStore: memory (fast) + IndexedDB (persistent)
(let [frontend (<! (memory/new-mem-store))
backend (<! (indexeddb/connect-idb-store "my-datahike"))
store (<! (tiered/connect-tiered-store
frontend backend
:write-policy :write-through
:read-policy :frontend-only))]
;; Sync only reachable keys from IndexedDB to memory
(<! (tiered/perform-walk-sync frontend backend [:db]
(dh-walker/make-tiered-walk-fn) {}))
;; Subscribe to server updates
(<! (sync/subscribe! ctx @transport store-id store opts))
;; Register callback for :db updates
(sync/register-callback! ctx store-id :db
(fn [{:keys [value]}]
(go (<! (refresh-local-db!)))))):db- The stored database root- All BTSet node UUIDs from
eavt,aevt,avetindices - Temporal index nodes (if
keep-history?is true) :schema-meta-key- Schema metadata
The walker requires datahike on the classpath (not a dependency of konserve-sync):
{:deps {io.replikativ/datahike {:mvn/version "0.6.1610"}}}- Client sends
{key -> last-write-timestamp}for local keys - Server identifies keys where client is missing or stale
- Keys sent in batches with flow control
- Server sends completion message
After initial sync, writes broadcast automatically:
;; Server write -> automatic broadcast
(<!! (k/assoc server-store :key "value"))
;; Client receives via callback
(sync/register-callback! ctx store-id :key
(fn [{:keys [value]}] (println "Updated:" value)))| Function | Description |
|---|---|
make-context |
Create sync context |
register-store! |
Register store (server) |
subscribe! |
Subscribe to remote store (client) |
register-callback! |
Register key update callback |
unregister-store! |
Unregister store |
unsubscribe! |
Unsubscribe from store |
Copyright 2025 Christian Weilbach. Apache License 2.0.