ralph/collaboration-and-character-variables #7
|
|
@ -14,7 +14,8 @@
|
|||
"next": "16.1.4",
|
||||
"react": "19.2.3",
|
||||
"react-dom": "19.2.3",
|
||||
"reactflow": "^11.11.4"
|
||||
"reactflow": "^11.11.4",
|
||||
"yjs": "^13.6.29"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tailwindcss/postcss": "^4",
|
||||
|
|
@ -4975,6 +4976,16 @@
|
|||
"dev": true,
|
||||
"license": "ISC"
|
||||
},
|
||||
"node_modules/isomorphic.js": {
|
||||
"version": "0.2.5",
|
||||
"resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz",
|
||||
"integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==",
|
||||
"license": "MIT",
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/iterator.prototype": {
|
||||
"version": "1.1.5",
|
||||
"resolved": "https://registry.npmjs.org/iterator.prototype/-/iterator.prototype-1.1.5.tgz",
|
||||
|
|
@ -5130,6 +5141,27 @@
|
|||
"node": ">= 0.8.0"
|
||||
}
|
||||
},
|
||||
"node_modules/lib0": {
|
||||
"version": "0.2.117",
|
||||
"resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.117.tgz",
|
||||
"integrity": "sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"isomorphic.js": "^0.2.4"
|
||||
},
|
||||
"bin": {
|
||||
"0ecdsa-generate-keypair": "bin/0ecdsa-generate-keypair.js",
|
||||
"0gentesthtml": "bin/gentesthtml.js",
|
||||
"0serve": "bin/0serve.js"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16"
|
||||
},
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/lightningcss": {
|
||||
"version": "1.30.2",
|
||||
"resolved": "https://registry.npmjs.org/lightningcss/-/lightningcss-1.30.2.tgz",
|
||||
|
|
@ -7186,6 +7218,23 @@
|
|||
"dev": true,
|
||||
"license": "ISC"
|
||||
},
|
||||
"node_modules/yjs": {
|
||||
"version": "13.6.29",
|
||||
"resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.29.tgz",
|
||||
"integrity": "sha512-kHqDPdltoXH+X4w1lVmMtddE3Oeqq48nM40FD5ojTd8xYhQpzIDcfE2keMSU5bAgRPJBe225WTUdyUgj1DtbiQ==",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"lib0": "^0.2.99"
|
||||
},
|
||||
"engines": {
|
||||
"node": ">=16.0.0",
|
||||
"npm": ">=8.0.0"
|
||||
},
|
||||
"funding": {
|
||||
"type": "GitHub Sponsors ❤",
|
||||
"url": "https://github.com/sponsors/dmonad"
|
||||
}
|
||||
},
|
||||
"node_modules/yocto-queue": {
|
||||
"version": "0.1.0",
|
||||
"resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz",
|
||||
|
|
|
|||
|
|
@ -16,7 +16,8 @@
|
|||
"next": "16.1.4",
|
||||
"react": "19.2.3",
|
||||
"react-dom": "19.2.3",
|
||||
"reactflow": "^11.11.4"
|
||||
"reactflow": "^11.11.4",
|
||||
"yjs": "^13.6.29"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tailwindcss/postcss": "^4",
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ import ExportValidationModal, { type ValidationIssue } from '@/components/editor
|
|||
import { EditorProvider } from '@/components/editor/EditorContext'
|
||||
import Toast from '@/components/Toast'
|
||||
import { RealtimeConnection, type ConnectionState, type PresenceUser } from '@/lib/collaboration/realtime'
|
||||
import { CRDTManager } from '@/lib/collaboration/crdt'
|
||||
import { createClient } from '@/lib/supabase/client'
|
||||
import ShareModal from '@/components/editor/ShareModal'
|
||||
import type { FlowchartData, FlowchartNode, FlowchartEdge, Character, Variable, Condition } from '@/types/flowchart'
|
||||
|
||||
|
|
@ -241,12 +243,53 @@ function FlowchartEditorInner({ projectId, userId, userDisplayName, isOwner, ini
|
|||
const [connectionState, setConnectionState] = useState<ConnectionState>('disconnected')
|
||||
const [presenceUsers, setPresenceUsers] = useState<PresenceUser[]>([])
|
||||
const realtimeRef = useRef<RealtimeConnection | null>(null)
|
||||
const crdtRef = useRef<CRDTManager | null>(null)
|
||||
const isRemoteUpdateRef = useRef(false)
|
||||
|
||||
// Connect to Supabase Realtime channel on mount, disconnect on unmount
|
||||
// Initialize CRDT manager and connect to Supabase Realtime channel on mount
|
||||
useEffect(() => {
|
||||
const supabase = createClient()
|
||||
|
||||
const crdtManager = new CRDTManager({
|
||||
onNodesChange: (crdtNodes: FlowchartNode[]) => {
|
||||
isRemoteUpdateRef.current = true
|
||||
setNodes(toReactFlowNodes(crdtNodes))
|
||||
isRemoteUpdateRef.current = false
|
||||
},
|
||||
onEdgesChange: (crdtEdges: FlowchartEdge[]) => {
|
||||
isRemoteUpdateRef.current = true
|
||||
setEdges(toReactFlowEdges(crdtEdges))
|
||||
isRemoteUpdateRef.current = false
|
||||
},
|
||||
onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => {
|
||||
try {
|
||||
await supabase
|
||||
.from('projects')
|
||||
.update({
|
||||
flowchart_data: {
|
||||
nodes: persistNodes,
|
||||
edges: persistEdges,
|
||||
characters,
|
||||
variables,
|
||||
},
|
||||
})
|
||||
.eq('id', projectId)
|
||||
} catch {
|
||||
// Persistence failure is non-critical; will retry on next change
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
// Initialize CRDT document from initial data
|
||||
crdtManager.initializeFromData(migratedData.nodes, migratedData.edges)
|
||||
crdtRef.current = crdtManager
|
||||
|
||||
const connection = new RealtimeConnection(projectId, userId, userDisplayName, {
|
||||
onConnectionStateChange: setConnectionState,
|
||||
onPresenceSync: setPresenceUsers,
|
||||
onChannelSubscribed: (channel) => {
|
||||
crdtManager.connectChannel(channel)
|
||||
},
|
||||
})
|
||||
realtimeRef.current = connection
|
||||
connection.connect()
|
||||
|
|
@ -254,9 +297,43 @@ function FlowchartEditorInner({ projectId, userId, userDisplayName, isOwner, ini
|
|||
return () => {
|
||||
connection.disconnect()
|
||||
realtimeRef.current = null
|
||||
crdtManager.destroy()
|
||||
crdtRef.current = null
|
||||
}
|
||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||
}, [projectId, userId, userDisplayName])
|
||||
|
||||
// Sync local React Flow state changes to CRDT (skip remote-originated updates)
|
||||
const nodesForCRDT = useMemo(() => {
|
||||
return nodes.map((node) => ({
|
||||
id: node.id,
|
||||
type: node.type as 'dialogue' | 'choice' | 'variable',
|
||||
position: node.position,
|
||||
data: node.data,
|
||||
})) as FlowchartNode[]
|
||||
}, [nodes])
|
||||
|
||||
const edgesForCRDT = useMemo(() => {
|
||||
return edges.map((edge) => ({
|
||||
id: edge.id,
|
||||
source: edge.source,
|
||||
sourceHandle: edge.sourceHandle,
|
||||
target: edge.target,
|
||||
targetHandle: edge.targetHandle,
|
||||
data: edge.data,
|
||||
})) as FlowchartEdge[]
|
||||
}, [edges])
|
||||
|
||||
useEffect(() => {
|
||||
if (isRemoteUpdateRef.current) return
|
||||
crdtRef.current?.updateNodes(nodesForCRDT)
|
||||
}, [nodesForCRDT])
|
||||
|
||||
useEffect(() => {
|
||||
if (isRemoteUpdateRef.current) return
|
||||
crdtRef.current?.updateEdges(edgesForCRDT)
|
||||
}, [edgesForCRDT])
|
||||
|
||||
const handleAddCharacter = useCallback(
|
||||
(name: string, color: string): string => {
|
||||
const id = nanoid()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,198 @@
|
|||
import * as Y from 'yjs'
|
||||
import type { RealtimeChannel } from '@supabase/supabase-js'
|
||||
import type { FlowchartNode, FlowchartEdge } from '@/types/flowchart'
|
||||
|
||||
const PERSIST_DEBOUNCE_MS = 2000
|
||||
const BROADCAST_EVENT = 'yjs-update'
|
||||
|
||||
export type CRDTCallbacks = {
|
||||
onNodesChange: (nodes: FlowchartNode[]) => void
|
||||
onEdgesChange: (edges: FlowchartEdge[]) => void
|
||||
onPersist: (nodes: FlowchartNode[], edges: FlowchartEdge[]) => void
|
||||
}
|
||||
|
||||
export class CRDTManager {
|
||||
private doc: Y.Doc
|
||||
private nodesMap: Y.Map<string> // node ID -> JSON string of FlowchartNode
|
||||
private edgesMap: Y.Map<string> // edge ID -> JSON string of FlowchartEdge
|
||||
private channel: RealtimeChannel | null = null
|
||||
private callbacks: CRDTCallbacks
|
||||
private persistTimer: ReturnType<typeof setTimeout> | null = null
|
||||
private isApplyingRemote = false
|
||||
private isDestroyed = false
|
||||
|
||||
constructor(callbacks: CRDTCallbacks) {
|
||||
this.doc = new Y.Doc()
|
||||
this.nodesMap = this.doc.getMap('nodes')
|
||||
this.edgesMap = this.doc.getMap('edges')
|
||||
this.callbacks = callbacks
|
||||
|
||||
// Listen for remote Yjs document changes
|
||||
this.nodesMap.observe(() => {
|
||||
if (this.isApplyingRemote) return
|
||||
this.notifyNodesChange()
|
||||
this.schedulePersist()
|
||||
})
|
||||
|
||||
this.edgesMap.observe(() => {
|
||||
if (this.isApplyingRemote) return
|
||||
this.notifyEdgesChange()
|
||||
this.schedulePersist()
|
||||
})
|
||||
|
||||
// Broadcast local updates to other clients
|
||||
this.doc.on('update', (update: Uint8Array, origin: unknown) => {
|
||||
if (origin === 'remote') return // Don't re-broadcast remote updates
|
||||
this.broadcastUpdate(update)
|
||||
})
|
||||
}
|
||||
|
||||
/** Initialize the Yjs document from database state */
|
||||
initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void {
|
||||
this.doc.transact(() => {
|
||||
nodes.forEach((node) => {
|
||||
this.nodesMap.set(node.id, JSON.stringify(node))
|
||||
})
|
||||
edges.forEach((edge) => {
|
||||
this.edgesMap.set(edge.id, JSON.stringify(edge))
|
||||
})
|
||||
}, 'init')
|
||||
}
|
||||
|
||||
/** Connect to a Supabase Realtime channel for syncing updates */
|
||||
connectChannel(channel: RealtimeChannel): void {
|
||||
this.channel = channel
|
||||
|
||||
// Listen for broadcast updates from other clients
|
||||
channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => {
|
||||
if (this.isDestroyed) return
|
||||
const data = payload.payload as { update?: number[] } | undefined
|
||||
if (data?.update) {
|
||||
const update = new Uint8Array(data.update)
|
||||
this.isApplyingRemote = true
|
||||
Y.applyUpdate(this.doc, update, 'remote')
|
||||
this.isApplyingRemote = false
|
||||
// Notify React state of remote changes
|
||||
this.notifyNodesChange()
|
||||
this.notifyEdgesChange()
|
||||
this.schedulePersist()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/** Apply local node changes to the Yjs document */
|
||||
updateNodes(nodes: FlowchartNode[]): void {
|
||||
if (this.isApplyingRemote) return
|
||||
|
||||
this.doc.transact(() => {
|
||||
const currentIds = new Set(nodes.map((n) => n.id))
|
||||
|
||||
// Remove nodes no longer present
|
||||
const existingIds = Array.from(this.nodesMap.keys())
|
||||
existingIds.forEach((id) => {
|
||||
if (!currentIds.has(id)) {
|
||||
this.nodesMap.delete(id)
|
||||
}
|
||||
})
|
||||
|
||||
// Add or update nodes
|
||||
nodes.forEach((node) => {
|
||||
const serialized = JSON.stringify(node)
|
||||
const existing = this.nodesMap.get(node.id)
|
||||
if (existing !== serialized) {
|
||||
this.nodesMap.set(node.id, serialized)
|
||||
}
|
||||
})
|
||||
}, 'local')
|
||||
}
|
||||
|
||||
/** Apply local edge changes to the Yjs document */
|
||||
updateEdges(edges: FlowchartEdge[]): void {
|
||||
if (this.isApplyingRemote) return
|
||||
|
||||
this.doc.transact(() => {
|
||||
const currentIds = new Set(edges.map((e) => e.id))
|
||||
|
||||
// Remove edges no longer present
|
||||
const existingIds = Array.from(this.edgesMap.keys())
|
||||
existingIds.forEach((id) => {
|
||||
if (!currentIds.has(id)) {
|
||||
this.edgesMap.delete(id)
|
||||
}
|
||||
})
|
||||
|
||||
// Add or update edges
|
||||
edges.forEach((edge) => {
|
||||
const serialized = JSON.stringify(edge)
|
||||
const existing = this.edgesMap.get(edge.id)
|
||||
if (existing !== serialized) {
|
||||
this.edgesMap.set(edge.id, serialized)
|
||||
}
|
||||
})
|
||||
}, 'local')
|
||||
}
|
||||
|
||||
/** Get current nodes from the Yjs document */
|
||||
getNodes(): FlowchartNode[] {
|
||||
const nodes: FlowchartNode[] = []
|
||||
this.nodesMap.forEach((value) => {
|
||||
try {
|
||||
nodes.push(JSON.parse(value) as FlowchartNode)
|
||||
} catch {
|
||||
// Skip malformed entries
|
||||
}
|
||||
})
|
||||
return nodes
|
||||
}
|
||||
|
||||
/** Get current edges from the Yjs document */
|
||||
getEdges(): FlowchartEdge[] {
|
||||
const edges: FlowchartEdge[] = []
|
||||
this.edgesMap.forEach((value) => {
|
||||
try {
|
||||
edges.push(JSON.parse(value) as FlowchartEdge)
|
||||
} catch {
|
||||
// Skip malformed entries
|
||||
}
|
||||
})
|
||||
return edges
|
||||
}
|
||||
|
||||
/** Clean up resources */
|
||||
destroy(): void {
|
||||
this.isDestroyed = true
|
||||
if (this.persistTimer) {
|
||||
clearTimeout(this.persistTimer)
|
||||
this.persistTimer = null
|
||||
}
|
||||
this.doc.destroy()
|
||||
this.channel = null
|
||||
}
|
||||
|
||||
private notifyNodesChange(): void {
|
||||
this.callbacks.onNodesChange(this.getNodes())
|
||||
}
|
||||
|
||||
private notifyEdgesChange(): void {
|
||||
this.callbacks.onEdgesChange(this.getEdges())
|
||||
}
|
||||
|
||||
private broadcastUpdate(update: Uint8Array): void {
|
||||
if (!this.channel || this.isDestroyed) return
|
||||
this.channel.send({
|
||||
type: 'broadcast',
|
||||
event: BROADCAST_EVENT,
|
||||
payload: { update: Array.from(update) },
|
||||
})
|
||||
}
|
||||
|
||||
private schedulePersist(): void {
|
||||
if (this.persistTimer) {
|
||||
clearTimeout(this.persistTimer)
|
||||
}
|
||||
this.persistTimer = setTimeout(() => {
|
||||
if (this.isDestroyed) return
|
||||
this.callbacks.onPersist(this.getNodes(), this.getEdges())
|
||||
}, PERSIST_DEBOUNCE_MS)
|
||||
}
|
||||
}
|
||||
|
|
@ -11,6 +11,7 @@ export type PresenceUser = {
|
|||
type RealtimeCallbacks = {
|
||||
onConnectionStateChange: (state: ConnectionState) => void
|
||||
onPresenceSync?: (users: PresenceUser[]) => void
|
||||
onChannelSubscribed?: (channel: RealtimeChannel) => void
|
||||
}
|
||||
|
||||
const HEARTBEAT_INTERVAL_MS = 30_000
|
||||
|
|
@ -75,6 +76,10 @@ export class RealtimeConnection {
|
|||
userId: this.userId,
|
||||
displayName: this.displayName,
|
||||
})
|
||||
// Notify that the channel is ready for CRDT sync
|
||||
if (this.channel) {
|
||||
this.callbacks.onChannelSubscribed?.(this.channel)
|
||||
}
|
||||
} else if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT') {
|
||||
this.callbacks.onConnectionStateChange('reconnecting')
|
||||
this.scheduleReconnect()
|
||||
|
|
|
|||
Loading…
Reference in New Issue