diff --git a/src/app/editor/[projectId]/FlowchartEditor.tsx b/src/app/editor/[projectId]/FlowchartEditor.tsx index 7179df9..5c8f969 100644 --- a/src/app/editor/[projectId]/FlowchartEditor.tsx +++ b/src/app/editor/[projectId]/FlowchartEditor.tsx @@ -577,6 +577,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, }, }) .eq('id', projectId) + // Notify other clients to refresh after successful auto-persist + realtimeRef.current?.broadcastStateRefresh() } catch { // Persistence failure is non-critical; will retry on next change } @@ -637,6 +639,26 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, onCRDTUpdate: (update: number[]) => { crdtManager.applyRemoteUpdate(update) }, + onStateRefresh: async () => { + try { + const sb = createClient() + const { data } = await sb + .from('projects') + .select('flowchart_data') + .eq('id', projectId) + .single() + if (data?.flowchart_data) { + const fd = data.flowchart_data as FlowchartData + setNodes(toReactFlowNodes(fd.nodes)) + setEdges(toReactFlowEdges(fd.edges)) + setCharacters(fd.characters) + setVariables(fd.variables) + crdtManager.refreshFromData(fd.nodes, fd.edges) + } + } catch { + // Non-critical: user can still manually refresh + } + }, onChannelSubscribed: (channel) => { crdtManager.connectChannel(channel) }, @@ -1143,8 +1165,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, // Update last saved data ref to mark as not dirty lastSavedDataRef.current = flowchartData - // Broadcast full CRDT state so other connected clients sync up - crdtRef.current?.broadcastFullState() + // Notify other connected clients to refresh from the database + realtimeRef.current?.broadcastStateRefresh() setToast({ message: 'Project saved successfully', type: 'success' }) } catch (error) { diff --git a/src/lib/collaboration/crdt.ts b/src/lib/collaboration/crdt.ts index d99478d..7c7fcdb 100644 --- a/src/lib/collaboration/crdt.ts +++ b/src/lib/collaboration/crdt.ts @@ -19,6 +19,7 @@ export class CRDTManager { private callbacks: CRDTCallbacks private persistTimer: ReturnType | null = null private isApplyingRemote = false + private isSuppressed = false // suppress broadcast/persist during init or refresh private isDestroyed = false constructor(callbacks: CRDTCallbacks) { @@ -29,24 +30,25 @@ export class CRDTManager { // Schedule persistence on local Yjs document changes this.nodesMap.observe(() => { - if (this.isApplyingRemote) return + if (this.isApplyingRemote || this.isSuppressed) return this.schedulePersist() }) this.edgesMap.observe(() => { - if (this.isApplyingRemote) return + if (this.isApplyingRemote || this.isSuppressed) return this.schedulePersist() }) // Broadcast local updates to other clients this.doc.on('update', (update: Uint8Array, origin: unknown) => { - if (origin === 'remote') return // Don't re-broadcast remote updates + if (origin === 'remote' || this.isSuppressed) return this.broadcastUpdate(update) }) } /** Initialize the Yjs document from database state */ initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { + this.isSuppressed = true this.doc.transact(() => { nodes.forEach((node) => { this.nodesMap.set(node.id, JSON.stringify(node)) @@ -55,6 +57,7 @@ export class CRDTManager { this.edgesMap.set(edge.id, JSON.stringify(edge)) }) }, 'init') + this.isSuppressed = false } /** Connect to a Supabase Realtime channel for outbound broadcasts */ @@ -76,6 +79,37 @@ export class CRDTManager { // persists its own changes to avoid write races and stale data overwrites. } + /** Replace CRDT state from a database refresh without broadcasting */ + refreshFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { + this.isSuppressed = true + this.doc.transact(() => { + // Sync nodes + const nodeIds = new Set(nodes.map((n) => n.id)) + Array.from(this.nodesMap.keys()).forEach((id) => { + if (!nodeIds.has(id)) this.nodesMap.delete(id) + }) + nodes.forEach((node) => { + const serialized = JSON.stringify(node) + if (this.nodesMap.get(node.id) !== serialized) { + this.nodesMap.set(node.id, serialized) + } + }) + + // Sync edges + const edgeIds = new Set(edges.map((e) => e.id)) + Array.from(this.edgesMap.keys()).forEach((id) => { + if (!edgeIds.has(id)) this.edgesMap.delete(id) + }) + edges.forEach((edge) => { + const serialized = JSON.stringify(edge) + if (this.edgesMap.get(edge.id) !== serialized) { + this.edgesMap.set(edge.id, serialized) + } + }) + }, 'remote') + this.isSuppressed = false + } + /** Apply local node changes to the Yjs document */ updateNodes(nodes: FlowchartNode[]): void { if (this.isApplyingRemote) return @@ -173,17 +207,6 @@ export class CRDTManager { this.callbacks.onEdgesChange(this.getEdges()) } - /** Broadcast the full document state to sync all connected clients */ - broadcastFullState(): void { - if (!this.channel || this.isDestroyed) return - const fullState = Y.encodeStateAsUpdate(this.doc) - this.channel.send({ - type: 'broadcast', - event: BROADCAST_EVENT, - payload: { update: Array.from(fullState) }, - }) - } - private broadcastUpdate(update: Uint8Array): void { if (!this.channel || this.isDestroyed) return this.channel.send({ diff --git a/src/lib/collaboration/realtime.ts b/src/lib/collaboration/realtime.ts index 90a5762..5efbdd5 100644 --- a/src/lib/collaboration/realtime.ts +++ b/src/lib/collaboration/realtime.ts @@ -36,6 +36,7 @@ type RealtimeCallbacks = { onCursorUpdate?: (cursor: RemoteCursor) => void onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void onCRDTUpdate?: (update: number[]) => void + onStateRefresh?: () => void } const HEARTBEAT_INTERVAL_MS = 30_000 @@ -137,6 +138,9 @@ export class RealtimeConnection { this.callbacks.onCRDTUpdate?.(payload.update) } }) + .on('broadcast', { event: 'state-refresh' }, () => { + this.callbacks.onStateRefresh?.() + }) .subscribe(async (status) => { if (this.isDestroyed) return @@ -195,6 +199,15 @@ export class RealtimeConnection { }) } + broadcastStateRefresh(): void { + if (!this.channel) return + this.channel.send({ + type: 'broadcast', + event: 'state-refresh', + payload: {}, + }) + } + broadcastNodeLock(nodeId: string | null): void { if (!this.channel) return this.channel.send({