From 34815d70eee3932877c031977648bb95c0f34f03 Mon Sep 17 00:00:00 2001 From: Gustavo Henrique Santos Souza de Miranda Date: Sat, 24 Jan 2026 16:08:32 -0300 Subject: [PATCH] fix: resolve CRDT collaboration sync by registering broadcast listener before channel subscribe The yjs-update broadcast listener was added after the Supabase channel was already subscribed, which meant it never received messages. Moved the listener registration to the builder chain before .subscribe() (matching how cursor/node-lock listeners work), and removed the broken isRemoteUpdateRef guard that caused ReferenceErrors preventing local changes from reaching the CRDT. Co-Authored-By: Claude Opus 4.5 --- .../editor/[projectId]/FlowchartEditor.tsx | 20 +++++++------ src/lib/collaboration/crdt.ts | 30 +++++++++---------- src/lib/collaboration/realtime.ts | 6 ++++ 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/app/editor/[projectId]/FlowchartEditor.tsx b/src/app/editor/[projectId]/FlowchartEditor.tsx index 31b2c39..0c0f5e6 100644 --- a/src/app/editor/[projectId]/FlowchartEditor.tsx +++ b/src/app/editor/[projectId]/FlowchartEditor.tsx @@ -527,6 +527,12 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, const [characters, setCharacters] = useState(migratedData.characters) const [variables, setVariables] = useState(migratedData.variables) + + // Refs to always have the latest characters/variables for the CRDT persist callback + const charactersRef = useRef(characters) + const variablesRef = useRef(variables) + charactersRef.current = characters + variablesRef.current = variables const [showSettings, setShowSettings] = useState(false) const [showShare, setShowShare] = useState(false) const [showHistory, setShowHistory] = useState(false) @@ -540,7 +546,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, const realtimeRef = useRef(null) const crdtRef = useRef(null) const auditRef = useRef(null) - const isRemoteUpdateRef = useRef(false) const cursorThrottleRef = useRef(0) const [collaborationNotifications, setCollaborationNotifications] = useState([]) const [nodeLocks, setNodeLocks] = useState>(new Map()) @@ -554,14 +559,10 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, const crdtManager = new CRDTManager({ onNodesChange: (crdtNodes: FlowchartNode[]) => { - isRemoteUpdateRef.current = true setNodes(toReactFlowNodes(crdtNodes)) - isRemoteUpdateRef.current = false }, onEdgesChange: (crdtEdges: FlowchartEdge[]) => { - isRemoteUpdateRef.current = true setEdges(toReactFlowEdges(crdtEdges)) - isRemoteUpdateRef.current = false }, onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => { try { @@ -571,8 +572,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, flowchart_data: { nodes: persistNodes, edges: persistEdges, - characters, - variables, + characters: charactersRef.current, + variables: variablesRef.current, }, }) .eq('id', projectId) @@ -633,6 +634,9 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, return next }) }, + onCRDTUpdate: (update: number[]) => { + crdtManager.applyRemoteUpdate(update) + }, onChannelSubscribed: (channel) => { crdtManager.connectChannel(channel) }, @@ -678,7 +682,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, }, [edges]) useEffect(() => { - if (isRemoteUpdateRef.current) return crdtRef.current?.updateNodes(nodesForCRDT) if (!isRevertingRef.current) { auditRef.current?.recordNodeChanges(nodesForCRDT) @@ -686,7 +689,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName, }, [nodesForCRDT]) useEffect(() => { - if (isRemoteUpdateRef.current) return crdtRef.current?.updateEdges(edgesForCRDT) if (!isRevertingRef.current) { auditRef.current?.recordEdgeChanges(edgesForCRDT) diff --git a/src/lib/collaboration/crdt.ts b/src/lib/collaboration/crdt.ts index e27a3c5..57ac014 100644 --- a/src/lib/collaboration/crdt.ts +++ b/src/lib/collaboration/crdt.ts @@ -57,25 +57,23 @@ export class CRDTManager { }, 'init') } - /** Connect to a Supabase Realtime channel for syncing updates */ + /** Connect to a Supabase Realtime channel for outbound broadcasts */ connectChannel(channel: RealtimeChannel): void { this.channel = channel + } - // Listen for broadcast updates from other clients - channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => { - if (this.isDestroyed) return - const data = payload.payload as { update?: number[] } | undefined - if (data?.update) { - const update = new Uint8Array(data.update) - this.isApplyingRemote = true - Y.applyUpdate(this.doc, update, 'remote') - this.isApplyingRemote = false - // Notify React state of remote changes - this.notifyNodesChange() - this.notifyEdgesChange() - this.schedulePersist() - } - }) + /** Apply a remote CRDT update received via broadcast */ + applyRemoteUpdate(updateData: number[]): void { + if (this.isDestroyed) return + const update = new Uint8Array(updateData) + this.isApplyingRemote = true + Y.applyUpdate(this.doc, update, 'remote') + this.isApplyingRemote = false + // Notify React state of remote changes + this.notifyNodesChange() + this.notifyEdgesChange() + // Note: we do NOT schedulePersist here. Only the originating client + // persists its own changes to avoid write races and stale data overwrites. } /** Apply local node changes to the Yjs document */ diff --git a/src/lib/collaboration/realtime.ts b/src/lib/collaboration/realtime.ts index d1e892a..90a5762 100644 --- a/src/lib/collaboration/realtime.ts +++ b/src/lib/collaboration/realtime.ts @@ -35,6 +35,7 @@ type RealtimeCallbacks = { onChannelSubscribed?: (channel: RealtimeChannel) => void onCursorUpdate?: (cursor: RemoteCursor) => void onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void + onCRDTUpdate?: (update: number[]) => void } const HEARTBEAT_INTERVAL_MS = 30_000 @@ -131,6 +132,11 @@ export class RealtimeConnection { this.callbacks.onNodeLockUpdate?.(null, payload.userId) } }) + .on('broadcast', { event: 'yjs-update' }, ({ payload }) => { + if (payload?.update) { + this.callbacks.onCRDTUpdate?.(payload.update) + } + }) .subscribe(async (status) => { if (this.isDestroyed) return