Compare commits

...

3 Commits

Author SHA1 Message Date
Gustavo Henrique Santos Souza de Miranda eb86ccd291 feat: notify collaborators to refresh from DB after save
Instead of relying on Yjs broadcast serialization (which has delivery
issues), use a lightweight state-refresh broadcast event. When any
client persists (manual save or CRDT auto-persist), it broadcasts
state-refresh. Other clients fetch the latest flowchart_data from the
database and update their local state and CRDT.

Added isSuppressed flag to CRDTManager to prevent broadcast/persist
loops during initialization and refresh operations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 16:36:43 -03:00
Gustavo Henrique Santos Souza de Miranda cdaae6b965 feat: broadcast full CRDT state to other clients after manual save
When a user saves, the full Yjs document state is broadcast so all
connected clients converge, even if they missed incremental updates.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 16:12:58 -03:00
Gustavo Henrique Santos Souza de Miranda 34815d70ee fix: resolve CRDT collaboration sync by registering broadcast listener before channel subscribe
The yjs-update broadcast listener was added after the Supabase channel
was already subscribed, which meant it never received messages. Moved
the listener registration to the builder chain before .subscribe()
(matching how cursor/node-lock listeners work), and removed the broken
isRemoteUpdateRef guard that caused ReferenceErrors preventing local
changes from reaching the CRDT.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-24 16:08:32 -03:00
3 changed files with 106 additions and 28 deletions

View File

@ -527,6 +527,12 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const [characters, setCharacters] = useState<Character[]>(migratedData.characters)
const [variables, setVariables] = useState<Variable[]>(migratedData.variables)
// Refs to always have the latest characters/variables for the CRDT persist callback
const charactersRef = useRef(characters)
const variablesRef = useRef(variables)
charactersRef.current = characters
variablesRef.current = variables
const [showSettings, setShowSettings] = useState(false)
const [showShare, setShowShare] = useState(false)
const [showHistory, setShowHistory] = useState(false)
@ -540,7 +546,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const realtimeRef = useRef<RealtimeConnection | null>(null)
const crdtRef = useRef<CRDTManager | null>(null)
const auditRef = useRef<AuditTrailRecorder | null>(null)
const isRemoteUpdateRef = useRef(false)
const cursorThrottleRef = useRef<number>(0)
const [collaborationNotifications, setCollaborationNotifications] = useState<CollaborationNotification[]>([])
const [nodeLocks, setNodeLocks] = useState<Map<string, NodeLockInfo>>(new Map())
@ -554,14 +559,10 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
const crdtManager = new CRDTManager({
onNodesChange: (crdtNodes: FlowchartNode[]) => {
isRemoteUpdateRef.current = true
setNodes(toReactFlowNodes(crdtNodes))
isRemoteUpdateRef.current = false
},
onEdgesChange: (crdtEdges: FlowchartEdge[]) => {
isRemoteUpdateRef.current = true
setEdges(toReactFlowEdges(crdtEdges))
isRemoteUpdateRef.current = false
},
onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => {
try {
@ -571,11 +572,13 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
flowchart_data: {
nodes: persistNodes,
edges: persistEdges,
characters,
variables,
characters: charactersRef.current,
variables: variablesRef.current,
},
})
.eq('id', projectId)
// Notify other clients to refresh after successful auto-persist
realtimeRef.current?.broadcastStateRefresh()
} catch {
// Persistence failure is non-critical; will retry on next change
}
@ -633,6 +636,29 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
return next
})
},
onCRDTUpdate: (update: number[]) => {
crdtManager.applyRemoteUpdate(update)
},
onStateRefresh: async () => {
try {
const sb = createClient()
const { data } = await sb
.from('projects')
.select('flowchart_data')
.eq('id', projectId)
.single()
if (data?.flowchart_data) {
const fd = data.flowchart_data as FlowchartData
setNodes(toReactFlowNodes(fd.nodes))
setEdges(toReactFlowEdges(fd.edges))
setCharacters(fd.characters)
setVariables(fd.variables)
crdtManager.refreshFromData(fd.nodes, fd.edges)
}
} catch {
// Non-critical: user can still manually refresh
}
},
onChannelSubscribed: (channel) => {
crdtManager.connectChannel(channel)
},
@ -678,7 +704,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
}, [edges])
useEffect(() => {
if (isRemoteUpdateRef.current) return
crdtRef.current?.updateNodes(nodesForCRDT)
if (!isRevertingRef.current) {
auditRef.current?.recordNodeChanges(nodesForCRDT)
@ -686,7 +711,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
}, [nodesForCRDT])
useEffect(() => {
if (isRemoteUpdateRef.current) return
crdtRef.current?.updateEdges(edgesForCRDT)
if (!isRevertingRef.current) {
auditRef.current?.recordEdgeChanges(edgesForCRDT)
@ -1141,6 +1165,9 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
// Update last saved data ref to mark as not dirty
lastSavedDataRef.current = flowchartData
// Notify other connected clients to refresh from the database
realtimeRef.current?.broadcastStateRefresh()
setToast({ message: 'Project saved successfully', type: 'success' })
} catch (error) {
console.error('Failed to save project:', error)

View File

@ -19,6 +19,7 @@ export class CRDTManager {
private callbacks: CRDTCallbacks
private persistTimer: ReturnType<typeof setTimeout> | null = null
private isApplyingRemote = false
private isSuppressed = false // suppress broadcast/persist during init or refresh
private isDestroyed = false
constructor(callbacks: CRDTCallbacks) {
@ -29,24 +30,25 @@ export class CRDTManager {
// Schedule persistence on local Yjs document changes
this.nodesMap.observe(() => {
if (this.isApplyingRemote) return
if (this.isApplyingRemote || this.isSuppressed) return
this.schedulePersist()
})
this.edgesMap.observe(() => {
if (this.isApplyingRemote) return
if (this.isApplyingRemote || this.isSuppressed) return
this.schedulePersist()
})
// Broadcast local updates to other clients
this.doc.on('update', (update: Uint8Array, origin: unknown) => {
if (origin === 'remote') return // Don't re-broadcast remote updates
if (origin === 'remote' || this.isSuppressed) return
this.broadcastUpdate(update)
})
}
/** Initialize the Yjs document from database state */
initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
this.isSuppressed = true
this.doc.transact(() => {
nodes.forEach((node) => {
this.nodesMap.set(node.id, JSON.stringify(node))
@ -55,27 +57,57 @@ export class CRDTManager {
this.edgesMap.set(edge.id, JSON.stringify(edge))
})
}, 'init')
this.isSuppressed = false
}
/** Connect to a Supabase Realtime channel for syncing updates */
/** Connect to a Supabase Realtime channel for outbound broadcasts */
connectChannel(channel: RealtimeChannel): void {
this.channel = channel
}
// Listen for broadcast updates from other clients
channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => {
if (this.isDestroyed) return
const data = payload.payload as { update?: number[] } | undefined
if (data?.update) {
const update = new Uint8Array(data.update)
this.isApplyingRemote = true
Y.applyUpdate(this.doc, update, 'remote')
this.isApplyingRemote = false
// Notify React state of remote changes
this.notifyNodesChange()
this.notifyEdgesChange()
this.schedulePersist()
}
})
/** Apply a remote CRDT update received via broadcast */
applyRemoteUpdate(updateData: number[]): void {
if (this.isDestroyed) return
const update = new Uint8Array(updateData)
this.isApplyingRemote = true
Y.applyUpdate(this.doc, update, 'remote')
this.isApplyingRemote = false
// Notify React state of remote changes
this.notifyNodesChange()
this.notifyEdgesChange()
// Note: we do NOT schedulePersist here. Only the originating client
// persists its own changes to avoid write races and stale data overwrites.
}
/** Replace CRDT state from a database refresh without broadcasting */
refreshFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
this.isSuppressed = true
this.doc.transact(() => {
// Sync nodes
const nodeIds = new Set(nodes.map((n) => n.id))
Array.from(this.nodesMap.keys()).forEach((id) => {
if (!nodeIds.has(id)) this.nodesMap.delete(id)
})
nodes.forEach((node) => {
const serialized = JSON.stringify(node)
if (this.nodesMap.get(node.id) !== serialized) {
this.nodesMap.set(node.id, serialized)
}
})
// Sync edges
const edgeIds = new Set(edges.map((e) => e.id))
Array.from(this.edgesMap.keys()).forEach((id) => {
if (!edgeIds.has(id)) this.edgesMap.delete(id)
})
edges.forEach((edge) => {
const serialized = JSON.stringify(edge)
if (this.edgesMap.get(edge.id) !== serialized) {
this.edgesMap.set(edge.id, serialized)
}
})
}, 'remote')
this.isSuppressed = false
}
/** Apply local node changes to the Yjs document */

View File

@ -35,6 +35,8 @@ type RealtimeCallbacks = {
onChannelSubscribed?: (channel: RealtimeChannel) => void
onCursorUpdate?: (cursor: RemoteCursor) => void
onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void
onCRDTUpdate?: (update: number[]) => void
onStateRefresh?: () => void
}
const HEARTBEAT_INTERVAL_MS = 30_000
@ -131,6 +133,14 @@ export class RealtimeConnection {
this.callbacks.onNodeLockUpdate?.(null, payload.userId)
}
})
.on('broadcast', { event: 'yjs-update' }, ({ payload }) => {
if (payload?.update) {
this.callbacks.onCRDTUpdate?.(payload.update)
}
})
.on('broadcast', { event: 'state-refresh' }, () => {
this.callbacks.onStateRefresh?.()
})
.subscribe(async (status) => {
if (this.isDestroyed) return
@ -189,6 +199,15 @@ export class RealtimeConnection {
})
}
broadcastStateRefresh(): void {
if (!this.channel) return
this.channel.send({
type: 'broadcast',
event: 'state-refresh',
payload: {},
})
}
broadcastNodeLock(nodeId: string | null): void {
if (!this.channel) return
this.channel.send({