feat: notify collaborators to refresh from DB after save

Instead of relying on Yjs broadcast serialization (which has delivery
issues), use a lightweight state-refresh broadcast event. When any
client persists (manual save or CRDT auto-persist), it broadcasts
state-refresh. Other clients fetch the latest flowchart_data from the
database and update their local state and CRDT.

Added isSuppressed flag to CRDTManager to prevent broadcast/persist
loops during initialization and refresh operations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Gustavo Henrique Santos Souza de Miranda 2026-01-24 16:36:43 -03:00
parent cdaae6b965
commit eb86ccd291
3 changed files with 74 additions and 16 deletions

View File

@ -577,6 +577,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
}, },
}) })
.eq('id', projectId) .eq('id', projectId)
// Notify other clients to refresh after successful auto-persist
realtimeRef.current?.broadcastStateRefresh()
} catch { } catch {
// Persistence failure is non-critical; will retry on next change // Persistence failure is non-critical; will retry on next change
} }
@ -637,6 +639,26 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
onCRDTUpdate: (update: number[]) => { onCRDTUpdate: (update: number[]) => {
crdtManager.applyRemoteUpdate(update) crdtManager.applyRemoteUpdate(update)
}, },
onStateRefresh: async () => {
try {
const sb = createClient()
const { data } = await sb
.from('projects')
.select('flowchart_data')
.eq('id', projectId)
.single()
if (data?.flowchart_data) {
const fd = data.flowchart_data as FlowchartData
setNodes(toReactFlowNodes(fd.nodes))
setEdges(toReactFlowEdges(fd.edges))
setCharacters(fd.characters)
setVariables(fd.variables)
crdtManager.refreshFromData(fd.nodes, fd.edges)
}
} catch {
// Non-critical: user can still manually refresh
}
},
onChannelSubscribed: (channel) => { onChannelSubscribed: (channel) => {
crdtManager.connectChannel(channel) crdtManager.connectChannel(channel)
}, },
@ -1143,8 +1165,8 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
// Update last saved data ref to mark as not dirty // Update last saved data ref to mark as not dirty
lastSavedDataRef.current = flowchartData lastSavedDataRef.current = flowchartData
// Broadcast full CRDT state so other connected clients sync up // Notify other connected clients to refresh from the database
crdtRef.current?.broadcastFullState() realtimeRef.current?.broadcastStateRefresh()
setToast({ message: 'Project saved successfully', type: 'success' }) setToast({ message: 'Project saved successfully', type: 'success' })
} catch (error) { } catch (error) {

View File

@ -19,6 +19,7 @@ export class CRDTManager {
private callbacks: CRDTCallbacks private callbacks: CRDTCallbacks
private persistTimer: ReturnType<typeof setTimeout> | null = null private persistTimer: ReturnType<typeof setTimeout> | null = null
private isApplyingRemote = false private isApplyingRemote = false
private isSuppressed = false // suppress broadcast/persist during init or refresh
private isDestroyed = false private isDestroyed = false
constructor(callbacks: CRDTCallbacks) { constructor(callbacks: CRDTCallbacks) {
@ -29,24 +30,25 @@ export class CRDTManager {
// Schedule persistence on local Yjs document changes // Schedule persistence on local Yjs document changes
this.nodesMap.observe(() => { this.nodesMap.observe(() => {
if (this.isApplyingRemote) return if (this.isApplyingRemote || this.isSuppressed) return
this.schedulePersist() this.schedulePersist()
}) })
this.edgesMap.observe(() => { this.edgesMap.observe(() => {
if (this.isApplyingRemote) return if (this.isApplyingRemote || this.isSuppressed) return
this.schedulePersist() this.schedulePersist()
}) })
// Broadcast local updates to other clients // Broadcast local updates to other clients
this.doc.on('update', (update: Uint8Array, origin: unknown) => { this.doc.on('update', (update: Uint8Array, origin: unknown) => {
if (origin === 'remote') return // Don't re-broadcast remote updates if (origin === 'remote' || this.isSuppressed) return
this.broadcastUpdate(update) this.broadcastUpdate(update)
}) })
} }
/** Initialize the Yjs document from database state */ /** Initialize the Yjs document from database state */
initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
this.isSuppressed = true
this.doc.transact(() => { this.doc.transact(() => {
nodes.forEach((node) => { nodes.forEach((node) => {
this.nodesMap.set(node.id, JSON.stringify(node)) this.nodesMap.set(node.id, JSON.stringify(node))
@ -55,6 +57,7 @@ export class CRDTManager {
this.edgesMap.set(edge.id, JSON.stringify(edge)) this.edgesMap.set(edge.id, JSON.stringify(edge))
}) })
}, 'init') }, 'init')
this.isSuppressed = false
} }
/** Connect to a Supabase Realtime channel for outbound broadcasts */ /** Connect to a Supabase Realtime channel for outbound broadcasts */
@ -76,6 +79,37 @@ export class CRDTManager {
// persists its own changes to avoid write races and stale data overwrites. // persists its own changes to avoid write races and stale data overwrites.
} }
/** Replace CRDT state from a database refresh without broadcasting */
refreshFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
this.isSuppressed = true
this.doc.transact(() => {
// Sync nodes
const nodeIds = new Set(nodes.map((n) => n.id))
Array.from(this.nodesMap.keys()).forEach((id) => {
if (!nodeIds.has(id)) this.nodesMap.delete(id)
})
nodes.forEach((node) => {
const serialized = JSON.stringify(node)
if (this.nodesMap.get(node.id) !== serialized) {
this.nodesMap.set(node.id, serialized)
}
})
// Sync edges
const edgeIds = new Set(edges.map((e) => e.id))
Array.from(this.edgesMap.keys()).forEach((id) => {
if (!edgeIds.has(id)) this.edgesMap.delete(id)
})
edges.forEach((edge) => {
const serialized = JSON.stringify(edge)
if (this.edgesMap.get(edge.id) !== serialized) {
this.edgesMap.set(edge.id, serialized)
}
})
}, 'remote')
this.isSuppressed = false
}
/** Apply local node changes to the Yjs document */ /** Apply local node changes to the Yjs document */
updateNodes(nodes: FlowchartNode[]): void { updateNodes(nodes: FlowchartNode[]): void {
if (this.isApplyingRemote) return if (this.isApplyingRemote) return
@ -173,17 +207,6 @@ export class CRDTManager {
this.callbacks.onEdgesChange(this.getEdges()) this.callbacks.onEdgesChange(this.getEdges())
} }
/** Broadcast the full document state to sync all connected clients */
broadcastFullState(): void {
if (!this.channel || this.isDestroyed) return
const fullState = Y.encodeStateAsUpdate(this.doc)
this.channel.send({
type: 'broadcast',
event: BROADCAST_EVENT,
payload: { update: Array.from(fullState) },
})
}
private broadcastUpdate(update: Uint8Array): void { private broadcastUpdate(update: Uint8Array): void {
if (!this.channel || this.isDestroyed) return if (!this.channel || this.isDestroyed) return
this.channel.send({ this.channel.send({

View File

@ -36,6 +36,7 @@ type RealtimeCallbacks = {
onCursorUpdate?: (cursor: RemoteCursor) => void onCursorUpdate?: (cursor: RemoteCursor) => void
onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void
onCRDTUpdate?: (update: number[]) => void onCRDTUpdate?: (update: number[]) => void
onStateRefresh?: () => void
} }
const HEARTBEAT_INTERVAL_MS = 30_000 const HEARTBEAT_INTERVAL_MS = 30_000
@ -137,6 +138,9 @@ export class RealtimeConnection {
this.callbacks.onCRDTUpdate?.(payload.update) this.callbacks.onCRDTUpdate?.(payload.update)
} }
}) })
.on('broadcast', { event: 'state-refresh' }, () => {
this.callbacks.onStateRefresh?.()
})
.subscribe(async (status) => { .subscribe(async (status) => {
if (this.isDestroyed) return if (this.isDestroyed) return
@ -195,6 +199,15 @@ export class RealtimeConnection {
}) })
} }
broadcastStateRefresh(): void {
if (!this.channel) return
this.channel.send({
type: 'broadcast',
event: 'state-refresh',
payload: {},
})
}
broadcastNodeLock(nodeId: string | null): void { broadcastNodeLock(nodeId: string | null): void {
if (!this.channel) return if (!this.channel) return
this.channel.send({ this.channel.send({