import * as Y from 'yjs' import type { RealtimeChannel } from '@supabase/supabase-js' import type { FlowchartNode, FlowchartEdge } from '@/types/flowchart' const PERSIST_DEBOUNCE_MS = 30_000 // 30s - DB persist is for crash recovery only; CRDT handles real-time sync const BROADCAST_EVENT = 'yjs-update' export type CRDTCallbacks = { onNodesChange: (nodes: FlowchartNode[]) => void onEdgesChange: (edges: FlowchartEdge[]) => void onPersist: (nodes: FlowchartNode[], edges: FlowchartEdge[]) => void } export class CRDTManager { private doc: Y.Doc private nodesMap: Y.Map // node ID -> JSON string of FlowchartNode private edgesMap: Y.Map // edge ID -> JSON string of FlowchartEdge private channel: RealtimeChannel | null = null private callbacks: CRDTCallbacks private persistTimer: ReturnType | null = null private isApplyingRemote = false private isSuppressed = false // suppress broadcast/persist during init or refresh private isDestroyed = false constructor(callbacks: CRDTCallbacks) { this.doc = new Y.Doc() this.nodesMap = this.doc.getMap('nodes') this.edgesMap = this.doc.getMap('edges') this.callbacks = callbacks // Schedule persistence on local Yjs document changes this.nodesMap.observe(() => { if (this.isApplyingRemote || this.isSuppressed) return this.schedulePersist() }) this.edgesMap.observe(() => { if (this.isApplyingRemote || this.isSuppressed) return this.schedulePersist() }) // Broadcast local updates to other clients this.doc.on('update', (update: Uint8Array, origin: unknown) => { if (origin === 'remote' || this.isSuppressed) return this.broadcastUpdate(update) }) } /** Initialize the Yjs document from database state */ initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { this.isSuppressed = true this.doc.transact(() => { nodes.forEach((node) => { this.nodesMap.set(node.id, JSON.stringify(node)) }) edges.forEach((edge) => { this.edgesMap.set(edge.id, JSON.stringify(edge)) }) }, 'init') this.isSuppressed = false } /** Connect to a Supabase Realtime channel for outbound broadcasts */ connectChannel(channel: RealtimeChannel): void { this.channel = channel } /** Apply a remote CRDT update received via broadcast */ applyRemoteUpdate(updateData: number[]): void { if (this.isDestroyed) return const update = new Uint8Array(updateData) this.isApplyingRemote = true Y.applyUpdate(this.doc, update, 'remote') this.isApplyingRemote = false // Notify React state of remote changes this.notifyNodesChange() this.notifyEdgesChange() // Note: we do NOT schedulePersist here. Only the originating client // persists its own changes to avoid write races and stale data overwrites. } /** Replace CRDT state from a database refresh without broadcasting */ refreshFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { this.isSuppressed = true this.doc.transact(() => { // Sync nodes const nodeIds = new Set(nodes.map((n) => n.id)) Array.from(this.nodesMap.keys()).forEach((id) => { if (!nodeIds.has(id)) this.nodesMap.delete(id) }) nodes.forEach((node) => { const serialized = JSON.stringify(node) if (this.nodesMap.get(node.id) !== serialized) { this.nodesMap.set(node.id, serialized) } }) // Sync edges const edgeIds = new Set(edges.map((e) => e.id)) Array.from(this.edgesMap.keys()).forEach((id) => { if (!edgeIds.has(id)) this.edgesMap.delete(id) }) edges.forEach((edge) => { const serialized = JSON.stringify(edge) if (this.edgesMap.get(edge.id) !== serialized) { this.edgesMap.set(edge.id, serialized) } }) }, 'remote') this.isSuppressed = false } /** Apply local node changes to the Yjs document */ updateNodes(nodes: FlowchartNode[]): void { if (this.isApplyingRemote) return this.doc.transact(() => { const currentIds = new Set(nodes.map((n) => n.id)) // Remove nodes no longer present const existingIds = Array.from(this.nodesMap.keys()) existingIds.forEach((id) => { if (!currentIds.has(id)) { this.nodesMap.delete(id) } }) // Add or update nodes nodes.forEach((node) => { const serialized = JSON.stringify(node) const existing = this.nodesMap.get(node.id) if (existing !== serialized) { this.nodesMap.set(node.id, serialized) } }) }, 'local') } /** Apply local edge changes to the Yjs document */ updateEdges(edges: FlowchartEdge[]): void { if (this.isApplyingRemote) return this.doc.transact(() => { const currentIds = new Set(edges.map((e) => e.id)) // Remove edges no longer present const existingIds = Array.from(this.edgesMap.keys()) existingIds.forEach((id) => { if (!currentIds.has(id)) { this.edgesMap.delete(id) } }) // Add or update edges edges.forEach((edge) => { const serialized = JSON.stringify(edge) const existing = this.edgesMap.get(edge.id) if (existing !== serialized) { this.edgesMap.set(edge.id, serialized) } }) }, 'local') } /** Get current nodes from the Yjs document */ getNodes(): FlowchartNode[] { const nodes: FlowchartNode[] = [] this.nodesMap.forEach((value) => { try { nodes.push(JSON.parse(value) as FlowchartNode) } catch { // Skip malformed entries } }) return nodes } /** Get current edges from the Yjs document */ getEdges(): FlowchartEdge[] { const edges: FlowchartEdge[] = [] this.edgesMap.forEach((value) => { try { edges.push(JSON.parse(value) as FlowchartEdge) } catch { // Skip malformed entries } }) return edges } /** Clean up resources */ destroy(): void { this.isDestroyed = true if (this.persistTimer) { clearTimeout(this.persistTimer) this.persistTimer = null } this.doc.destroy() this.channel = null } private notifyNodesChange(): void { this.callbacks.onNodesChange(this.getNodes()) } private notifyEdgesChange(): void { this.callbacks.onEdgesChange(this.getEdges()) } private broadcastUpdate(update: Uint8Array): void { if (!this.channel || this.isDestroyed) return this.channel.send({ type: 'broadcast', event: BROADCAST_EVENT, payload: { update: Array.from(update) }, }) } private schedulePersist(): void { if (this.persistTimer) { clearTimeout(this.persistTimer) } this.persistTimer = setTimeout(() => { if (this.isDestroyed) return this.callbacks.onPersist(this.getNodes(), this.getEdges()) }, PERSIST_DEBOUNCE_MS) } }