fix: resolve CRDT collaboration sync by registering broadcast listener before channel subscribe

The yjs-update broadcast listener was added after the Supabase channel
was already subscribed, which meant it never received messages. Moved
the listener registration to the builder chain before .subscribe()
(matching how cursor/node-lock listeners work), and removed the broken
isRemoteUpdateRef guard that caused ReferenceErrors preventing local
changes from reaching the CRDT.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gustavo Henrique Santos Souza de Miranda 2026-01-24 16:08:32 -03:00
parent dc7875eeea
commit 34815d70ee
3 changed files with 31 additions and 25 deletions

View File

@ -527,6 +527,12 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const [characters, setCharacters] = useState<Character[]>(migratedData.characters)
const [variables, setVariables] = useState<Variable[]>(migratedData.variables)
// Refs to always have the latest characters/variables for the CRDT persist callback
const charactersRef = useRef(characters)
const variablesRef = useRef(variables)
charactersRef.current = characters
variablesRef.current = variables
const [showSettings, setShowSettings] = useState(false)
const [showShare, setShowShare] = useState(false)
const [showHistory, setShowHistory] = useState(false)
@ -540,7 +546,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const realtimeRef = useRef<RealtimeConnection | null>(null)
const crdtRef = useRef<CRDTManager | null>(null)
const auditRef = useRef<AuditTrailRecorder | null>(null)
const isRemoteUpdateRef = useRef(false)
const cursorThrottleRef = useRef<number>(0)
const [collaborationNotifications, setCollaborationNotifications] = useState<CollaborationNotification[]>([])
const [nodeLocks, setNodeLocks] = useState<Map<string, NodeLockInfo>>(new Map())
@ -554,14 +559,10 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const crdtManager = new CRDTManager({
onNodesChange: (crdtNodes: FlowchartNode[]) => {
isRemoteUpdateRef.current = true
setNodes(toReactFlowNodes(crdtNodes))
isRemoteUpdateRef.current = false
},
onEdgesChange: (crdtEdges: FlowchartEdge[]) => {
isRemoteUpdateRef.current = true
setEdges(toReactFlowEdges(crdtEdges))
isRemoteUpdateRef.current = false
},
onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => {
try {
@ -571,8 +572,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
flowchart_data: {
nodes: persistNodes,
edges: persistEdges,
characters,
variables,
characters: charactersRef.current,
variables: variablesRef.current,
},
})
.eq('id', projectId)
@ -633,6 +634,9 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
return next
})
},
onCRDTUpdate: (update: number[]) => {
crdtManager.applyRemoteUpdate(update)
},
onChannelSubscribed: (channel) => {
crdtManager.connectChannel(channel)
},
@ -678,7 +682,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
}, [edges])
useEffect(() => {
if (isRemoteUpdateRef.current) return
crdtRef.current?.updateNodes(nodesForCRDT)
if (!isRevertingRef.current) {
auditRef.current?.recordNodeChanges(nodesForCRDT)
@ -686,7 +689,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
}, [nodesForCRDT])
useEffect(() => {
if (isRemoteUpdateRef.current) return
crdtRef.current?.updateEdges(edgesForCRDT)
if (!isRevertingRef.current) {
auditRef.current?.recordEdgeChanges(edgesForCRDT)

View File

@ -57,25 +57,23 @@ export class CRDTManager {
}, 'init')
}
/** Connect to a Supabase Realtime channel for syncing updates */
/** Connect to a Supabase Realtime channel for outbound broadcasts */
connectChannel(channel: RealtimeChannel): void {
this.channel = channel
}
// Listen for broadcast updates from other clients
channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => {
/** Apply a remote CRDT update received via broadcast */
applyRemoteUpdate(updateData: number[]): void {
if (this.isDestroyed) return
const data = payload.payload as { update?: number[] } | undefined
if (data?.update) {
const update = new Uint8Array(data.update)
const update = new Uint8Array(updateData)
this.isApplyingRemote = true
Y.applyUpdate(this.doc, update, 'remote')
this.isApplyingRemote = false
// Notify React state of remote changes
this.notifyNodesChange()
this.notifyEdgesChange()
this.schedulePersist()
}
})
// Note: we do NOT schedulePersist here. Only the originating client
// persists its own changes to avoid write races and stale data overwrites.
}
/** Apply local node changes to the Yjs document */

View File

@ -35,6 +35,7 @@ type RealtimeCallbacks = {
onChannelSubscribed?: (channel: RealtimeChannel) => void
onCursorUpdate?: (cursor: RemoteCursor) => void
onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void
onCRDTUpdate?: (update: number[]) => void
}
const HEARTBEAT_INTERVAL_MS = 30_000
@ -131,6 +132,11 @@ export class RealtimeConnection {
this.callbacks.onNodeLockUpdate?.(null, payload.userId)
}
})
.on('broadcast', { event: 'yjs-update' }, ({ payload }) => {
if (payload?.update) {
this.callbacks.onCRDTUpdate?.(payload.update)
}
})
.subscribe(async (status) => {
if (this.isDestroyed) return