Compare commits
No commits in common. "eb86ccd29197c6a6876151d8de829cdb1e77a2f4" and "dc7875eeeae1abb44b54b5ec1013e5bb60f85ad4" have entirely different histories.
eb86ccd291
...
dc7875eeea
|
|
@ -527,12 +527,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
|
|
||||||
const [characters, setCharacters] = useState<Character[]>(migratedData.characters)
|
const [characters, setCharacters] = useState<Character[]>(migratedData.characters)
|
||||||
const [variables, setVariables] = useState<Variable[]>(migratedData.variables)
|
const [variables, setVariables] = useState<Variable[]>(migratedData.variables)
|
||||||
|
|
||||||
// Refs to always have the latest characters/variables for the CRDT persist callback
|
|
||||||
const charactersRef = useRef(characters)
|
|
||||||
const variablesRef = useRef(variables)
|
|
||||||
charactersRef.current = characters
|
|
||||||
variablesRef.current = variables
|
|
||||||
const [showSettings, setShowSettings] = useState(false)
|
const [showSettings, setShowSettings] = useState(false)
|
||||||
const [showShare, setShowShare] = useState(false)
|
const [showShare, setShowShare] = useState(false)
|
||||||
const [showHistory, setShowHistory] = useState(false)
|
const [showHistory, setShowHistory] = useState(false)
|
||||||
|
|
@ -546,6 +540,7 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
const realtimeRef = useRef<RealtimeConnection | null>(null)
|
const realtimeRef = useRef<RealtimeConnection | null>(null)
|
||||||
const crdtRef = useRef<CRDTManager | null>(null)
|
const crdtRef = useRef<CRDTManager | null>(null)
|
||||||
const auditRef = useRef<AuditTrailRecorder | null>(null)
|
const auditRef = useRef<AuditTrailRecorder | null>(null)
|
||||||
|
const isRemoteUpdateRef = useRef(false)
|
||||||
const cursorThrottleRef = useRef<number>(0)
|
const cursorThrottleRef = useRef<number>(0)
|
||||||
const [collaborationNotifications, setCollaborationNotifications] = useState<CollaborationNotification[]>([])
|
const [collaborationNotifications, setCollaborationNotifications] = useState<CollaborationNotification[]>([])
|
||||||
const [nodeLocks, setNodeLocks] = useState<Map<string, NodeLockInfo>>(new Map())
|
const [nodeLocks, setNodeLocks] = useState<Map<string, NodeLockInfo>>(new Map())
|
||||||
|
|
@ -559,10 +554,14 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
|
|
||||||
const crdtManager = new CRDTManager({
|
const crdtManager = new CRDTManager({
|
||||||
onNodesChange: (crdtNodes: FlowchartNode[]) => {
|
onNodesChange: (crdtNodes: FlowchartNode[]) => {
|
||||||
|
isRemoteUpdateRef.current = true
|
||||||
setNodes(toReactFlowNodes(crdtNodes))
|
setNodes(toReactFlowNodes(crdtNodes))
|
||||||
|
isRemoteUpdateRef.current = false
|
||||||
},
|
},
|
||||||
onEdgesChange: (crdtEdges: FlowchartEdge[]) => {
|
onEdgesChange: (crdtEdges: FlowchartEdge[]) => {
|
||||||
|
isRemoteUpdateRef.current = true
|
||||||
setEdges(toReactFlowEdges(crdtEdges))
|
setEdges(toReactFlowEdges(crdtEdges))
|
||||||
|
isRemoteUpdateRef.current = false
|
||||||
},
|
},
|
||||||
onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => {
|
onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => {
|
||||||
try {
|
try {
|
||||||
|
|
@ -572,13 +571,11 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
flowchart_data: {
|
flowchart_data: {
|
||||||
nodes: persistNodes,
|
nodes: persistNodes,
|
||||||
edges: persistEdges,
|
edges: persistEdges,
|
||||||
characters: charactersRef.current,
|
characters,
|
||||||
variables: variablesRef.current,
|
variables,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
.eq('id', projectId)
|
.eq('id', projectId)
|
||||||
// Notify other clients to refresh after successful auto-persist
|
|
||||||
realtimeRef.current?.broadcastStateRefresh()
|
|
||||||
} catch {
|
} catch {
|
||||||
// Persistence failure is non-critical; will retry on next change
|
// Persistence failure is non-critical; will retry on next change
|
||||||
}
|
}
|
||||||
|
|
@ -636,29 +633,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
return next
|
return next
|
||||||
})
|
})
|
||||||
},
|
},
|
||||||
onCRDTUpdate: (update: number[]) => {
|
|
||||||
crdtManager.applyRemoteUpdate(update)
|
|
||||||
},
|
|
||||||
onStateRefresh: async () => {
|
|
||||||
try {
|
|
||||||
const sb = createClient()
|
|
||||||
const { data } = await sb
|
|
||||||
.from('projects')
|
|
||||||
.select('flowchart_data')
|
|
||||||
.eq('id', projectId)
|
|
||||||
.single()
|
|
||||||
if (data?.flowchart_data) {
|
|
||||||
const fd = data.flowchart_data as FlowchartData
|
|
||||||
setNodes(toReactFlowNodes(fd.nodes))
|
|
||||||
setEdges(toReactFlowEdges(fd.edges))
|
|
||||||
setCharacters(fd.characters)
|
|
||||||
setVariables(fd.variables)
|
|
||||||
crdtManager.refreshFromData(fd.nodes, fd.edges)
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
// Non-critical: user can still manually refresh
|
|
||||||
}
|
|
||||||
},
|
|
||||||
onChannelSubscribed: (channel) => {
|
onChannelSubscribed: (channel) => {
|
||||||
crdtManager.connectChannel(channel)
|
crdtManager.connectChannel(channel)
|
||||||
},
|
},
|
||||||
|
|
@ -704,6 +678,7 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
}, [edges])
|
}, [edges])
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
|
if (isRemoteUpdateRef.current) return
|
||||||
crdtRef.current?.updateNodes(nodesForCRDT)
|
crdtRef.current?.updateNodes(nodesForCRDT)
|
||||||
if (!isRevertingRef.current) {
|
if (!isRevertingRef.current) {
|
||||||
auditRef.current?.recordNodeChanges(nodesForCRDT)
|
auditRef.current?.recordNodeChanges(nodesForCRDT)
|
||||||
|
|
@ -711,6 +686,7 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
}, [nodesForCRDT])
|
}, [nodesForCRDT])
|
||||||
|
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
|
if (isRemoteUpdateRef.current) return
|
||||||
crdtRef.current?.updateEdges(edgesForCRDT)
|
crdtRef.current?.updateEdges(edgesForCRDT)
|
||||||
if (!isRevertingRef.current) {
|
if (!isRevertingRef.current) {
|
||||||
auditRef.current?.recordEdgeChanges(edgesForCRDT)
|
auditRef.current?.recordEdgeChanges(edgesForCRDT)
|
||||||
|
|
@ -1165,9 +1141,6 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
// Update last saved data ref to mark as not dirty
|
// Update last saved data ref to mark as not dirty
|
||||||
lastSavedDataRef.current = flowchartData
|
lastSavedDataRef.current = flowchartData
|
||||||
|
|
||||||
// Notify other connected clients to refresh from the database
|
|
||||||
realtimeRef.current?.broadcastStateRefresh()
|
|
||||||
|
|
||||||
setToast({ message: 'Project saved successfully', type: 'success' })
|
setToast({ message: 'Project saved successfully', type: 'success' })
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Failed to save project:', error)
|
console.error('Failed to save project:', error)
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ export class CRDTManager {
|
||||||
private callbacks: CRDTCallbacks
|
private callbacks: CRDTCallbacks
|
||||||
private persistTimer: ReturnType<typeof setTimeout> | null = null
|
private persistTimer: ReturnType<typeof setTimeout> | null = null
|
||||||
private isApplyingRemote = false
|
private isApplyingRemote = false
|
||||||
private isSuppressed = false // suppress broadcast/persist during init or refresh
|
|
||||||
private isDestroyed = false
|
private isDestroyed = false
|
||||||
|
|
||||||
constructor(callbacks: CRDTCallbacks) {
|
constructor(callbacks: CRDTCallbacks) {
|
||||||
|
|
@ -30,25 +29,24 @@ export class CRDTManager {
|
||||||
|
|
||||||
// Schedule persistence on local Yjs document changes
|
// Schedule persistence on local Yjs document changes
|
||||||
this.nodesMap.observe(() => {
|
this.nodesMap.observe(() => {
|
||||||
if (this.isApplyingRemote || this.isSuppressed) return
|
if (this.isApplyingRemote) return
|
||||||
this.schedulePersist()
|
this.schedulePersist()
|
||||||
})
|
})
|
||||||
|
|
||||||
this.edgesMap.observe(() => {
|
this.edgesMap.observe(() => {
|
||||||
if (this.isApplyingRemote || this.isSuppressed) return
|
if (this.isApplyingRemote) return
|
||||||
this.schedulePersist()
|
this.schedulePersist()
|
||||||
})
|
})
|
||||||
|
|
||||||
// Broadcast local updates to other clients
|
// Broadcast local updates to other clients
|
||||||
this.doc.on('update', (update: Uint8Array, origin: unknown) => {
|
this.doc.on('update', (update: Uint8Array, origin: unknown) => {
|
||||||
if (origin === 'remote' || this.isSuppressed) return
|
if (origin === 'remote') return // Don't re-broadcast remote updates
|
||||||
this.broadcastUpdate(update)
|
this.broadcastUpdate(update)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Initialize the Yjs document from database state */
|
/** Initialize the Yjs document from database state */
|
||||||
initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
|
initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
|
||||||
this.isSuppressed = true
|
|
||||||
this.doc.transact(() => {
|
this.doc.transact(() => {
|
||||||
nodes.forEach((node) => {
|
nodes.forEach((node) => {
|
||||||
this.nodesMap.set(node.id, JSON.stringify(node))
|
this.nodesMap.set(node.id, JSON.stringify(node))
|
||||||
|
|
@ -57,57 +55,27 @@ export class CRDTManager {
|
||||||
this.edgesMap.set(edge.id, JSON.stringify(edge))
|
this.edgesMap.set(edge.id, JSON.stringify(edge))
|
||||||
})
|
})
|
||||||
}, 'init')
|
}, 'init')
|
||||||
this.isSuppressed = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Connect to a Supabase Realtime channel for outbound broadcasts */
|
/** Connect to a Supabase Realtime channel for syncing updates */
|
||||||
connectChannel(channel: RealtimeChannel): void {
|
connectChannel(channel: RealtimeChannel): void {
|
||||||
this.channel = channel
|
this.channel = channel
|
||||||
}
|
|
||||||
|
|
||||||
/** Apply a remote CRDT update received via broadcast */
|
// Listen for broadcast updates from other clients
|
||||||
applyRemoteUpdate(updateData: number[]): void {
|
channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => {
|
||||||
if (this.isDestroyed) return
|
if (this.isDestroyed) return
|
||||||
const update = new Uint8Array(updateData)
|
const data = payload.payload as { update?: number[] } | undefined
|
||||||
this.isApplyingRemote = true
|
if (data?.update) {
|
||||||
Y.applyUpdate(this.doc, update, 'remote')
|
const update = new Uint8Array(data.update)
|
||||||
this.isApplyingRemote = false
|
this.isApplyingRemote = true
|
||||||
// Notify React state of remote changes
|
Y.applyUpdate(this.doc, update, 'remote')
|
||||||
this.notifyNodesChange()
|
this.isApplyingRemote = false
|
||||||
this.notifyEdgesChange()
|
// Notify React state of remote changes
|
||||||
// Note: we do NOT schedulePersist here. Only the originating client
|
this.notifyNodesChange()
|
||||||
// persists its own changes to avoid write races and stale data overwrites.
|
this.notifyEdgesChange()
|
||||||
}
|
this.schedulePersist()
|
||||||
|
}
|
||||||
/** Replace CRDT state from a database refresh without broadcasting */
|
})
|
||||||
refreshFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
|
|
||||||
this.isSuppressed = true
|
|
||||||
this.doc.transact(() => {
|
|
||||||
// Sync nodes
|
|
||||||
const nodeIds = new Set(nodes.map((n) => n.id))
|
|
||||||
Array.from(this.nodesMap.keys()).forEach((id) => {
|
|
||||||
if (!nodeIds.has(id)) this.nodesMap.delete(id)
|
|
||||||
})
|
|
||||||
nodes.forEach((node) => {
|
|
||||||
const serialized = JSON.stringify(node)
|
|
||||||
if (this.nodesMap.get(node.id) !== serialized) {
|
|
||||||
this.nodesMap.set(node.id, serialized)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
// Sync edges
|
|
||||||
const edgeIds = new Set(edges.map((e) => e.id))
|
|
||||||
Array.from(this.edgesMap.keys()).forEach((id) => {
|
|
||||||
if (!edgeIds.has(id)) this.edgesMap.delete(id)
|
|
||||||
})
|
|
||||||
edges.forEach((edge) => {
|
|
||||||
const serialized = JSON.stringify(edge)
|
|
||||||
if (this.edgesMap.get(edge.id) !== serialized) {
|
|
||||||
this.edgesMap.set(edge.id, serialized)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}, 'remote')
|
|
||||||
this.isSuppressed = false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Apply local node changes to the Yjs document */
|
/** Apply local node changes to the Yjs document */
|
||||||
|
|
|
||||||
|
|
@ -35,8 +35,6 @@ type RealtimeCallbacks = {
|
||||||
onChannelSubscribed?: (channel: RealtimeChannel) => void
|
onChannelSubscribed?: (channel: RealtimeChannel) => void
|
||||||
onCursorUpdate?: (cursor: RemoteCursor) => void
|
onCursorUpdate?: (cursor: RemoteCursor) => void
|
||||||
onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void
|
onNodeLockUpdate?: (lock: NodeLock | null, userId: string) => void
|
||||||
onCRDTUpdate?: (update: number[]) => void
|
|
||||||
onStateRefresh?: () => void
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const HEARTBEAT_INTERVAL_MS = 30_000
|
const HEARTBEAT_INTERVAL_MS = 30_000
|
||||||
|
|
@ -133,14 +131,6 @@ export class RealtimeConnection {
|
||||||
this.callbacks.onNodeLockUpdate?.(null, payload.userId)
|
this.callbacks.onNodeLockUpdate?.(null, payload.userId)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.on('broadcast', { event: 'yjs-update' }, ({ payload }) => {
|
|
||||||
if (payload?.update) {
|
|
||||||
this.callbacks.onCRDTUpdate?.(payload.update)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.on('broadcast', { event: 'state-refresh' }, () => {
|
|
||||||
this.callbacks.onStateRefresh?.()
|
|
||||||
})
|
|
||||||
.subscribe(async (status) => {
|
.subscribe(async (status) => {
|
||||||
if (this.isDestroyed) return
|
if (this.isDestroyed) return
|
||||||
|
|
||||||
|
|
@ -199,15 +189,6 @@ export class RealtimeConnection {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
broadcastStateRefresh(): void {
|
|
||||||
if (!this.channel) return
|
|
||||||
this.channel.send({
|
|
||||||
type: 'broadcast',
|
|
||||||
event: 'state-refresh',
|
|
||||||
payload: {},
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
broadcastNodeLock(nodeId: string | null): void {
|
broadcastNodeLock(nodeId: string | null): void {
|
||||||
if (!this.channel) return
|
if (!this.channel) return
|
||||||
this.channel.send({
|
this.channel.send({
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue