diff --git a/package-lock.json b/package-lock.json index 1e138bc..356f8a1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -14,7 +14,8 @@ "next": "16.1.4", "react": "19.2.3", "react-dom": "19.2.3", - "reactflow": "^11.11.4" + "reactflow": "^11.11.4", + "yjs": "^13.6.29" }, "devDependencies": { "@tailwindcss/postcss": "^4", @@ -4975,6 +4976,16 @@ "dev": true, "license": "ISC" }, + "node_modules/isomorphic.js": { + "version": "0.2.5", + "resolved": "https://registry.npmjs.org/isomorphic.js/-/isomorphic.js-0.2.5.tgz", + "integrity": "sha512-PIeMbHqMt4DnUP3MA/Flc0HElYjMXArsw1qwJZcm9sqR8mq3l8NYizFMty0pWwE/tzIGH3EKK5+jes5mAr85yw==", + "license": "MIT", + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/iterator.prototype": { "version": "1.1.5", "resolved": "https://registry.npmjs.org/iterator.prototype/-/iterator.prototype-1.1.5.tgz", @@ -5130,6 +5141,27 @@ "node": ">= 0.8.0" } }, + "node_modules/lib0": { + "version": "0.2.117", + "resolved": "https://registry.npmjs.org/lib0/-/lib0-0.2.117.tgz", + "integrity": "sha512-DeXj9X5xDCjgKLU/7RR+/HQEVzuuEUiwldwOGsHK/sfAfELGWEyTcf0x+uOvCvK3O2zPmZePXWL85vtia6GyZw==", + "license": "MIT", + "dependencies": { + "isomorphic.js": "^0.2.4" + }, + "bin": { + "0ecdsa-generate-keypair": "bin/0ecdsa-generate-keypair.js", + "0gentesthtml": "bin/gentesthtml.js", + "0serve": "bin/0serve.js" + }, + "engines": { + "node": ">=16" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/lightningcss": { "version": "1.30.2", "resolved": "https://registry.npmjs.org/lightningcss/-/lightningcss-1.30.2.tgz", @@ -7186,6 +7218,23 @@ "dev": true, "license": "ISC" }, + "node_modules/yjs": { + "version": "13.6.29", + "resolved": "https://registry.npmjs.org/yjs/-/yjs-13.6.29.tgz", + "integrity": "sha512-kHqDPdltoXH+X4w1lVmMtddE3Oeqq48nM40FD5ojTd8xYhQpzIDcfE2keMSU5bAgRPJBe225WTUdyUgj1DtbiQ==", + "license": "MIT", + "dependencies": { + "lib0": "^0.2.99" + }, + "engines": { + "node": ">=16.0.0", + "npm": ">=8.0.0" + }, + "funding": { + "type": "GitHub Sponsors ❤", + "url": "https://github.com/sponsors/dmonad" + } + }, "node_modules/yocto-queue": { "version": "0.1.0", "resolved": "https://registry.npmjs.org/yocto-queue/-/yocto-queue-0.1.0.tgz", diff --git a/package.json b/package.json index 839d947..ca26b26 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,8 @@ "next": "16.1.4", "react": "19.2.3", "react-dom": "19.2.3", - "reactflow": "^11.11.4" + "reactflow": "^11.11.4", + "yjs": "^13.6.29" }, "devDependencies": { "@tailwindcss/postcss": "^4", diff --git a/src/app/editor/[projectId]/FlowchartEditor.tsx b/src/app/editor/[projectId]/FlowchartEditor.tsx index f755efe..d9b0b9e 100644 --- a/src/app/editor/[projectId]/FlowchartEditor.tsx +++ b/src/app/editor/[projectId]/FlowchartEditor.tsx @@ -28,6 +28,8 @@ import ExportValidationModal, { type ValidationIssue } from '@/components/editor import { EditorProvider } from '@/components/editor/EditorContext' import Toast from '@/components/Toast' import { RealtimeConnection, type ConnectionState, type PresenceUser } from '@/lib/collaboration/realtime' +import { CRDTManager } from '@/lib/collaboration/crdt' +import { createClient } from '@/lib/supabase/client' import ShareModal from '@/components/editor/ShareModal' import type { FlowchartData, FlowchartNode, FlowchartEdge, Character, Variable, Condition } from '@/types/flowchart' @@ -241,12 +243,53 @@ function FlowchartEditorInner({ projectId, userId, userDisplayName, isOwner, ini const [connectionState, setConnectionState] = useState('disconnected') const [presenceUsers, setPresenceUsers] = useState([]) const realtimeRef = useRef(null) + const crdtRef = useRef(null) + const isRemoteUpdateRef = useRef(false) - // Connect to Supabase Realtime channel on mount, disconnect on unmount + // Initialize CRDT manager and connect to Supabase Realtime channel on mount useEffect(() => { + const supabase = createClient() + + const crdtManager = new CRDTManager({ + onNodesChange: (crdtNodes: FlowchartNode[]) => { + isRemoteUpdateRef.current = true + setNodes(toReactFlowNodes(crdtNodes)) + isRemoteUpdateRef.current = false + }, + onEdgesChange: (crdtEdges: FlowchartEdge[]) => { + isRemoteUpdateRef.current = true + setEdges(toReactFlowEdges(crdtEdges)) + isRemoteUpdateRef.current = false + }, + onPersist: async (persistNodes: FlowchartNode[], persistEdges: FlowchartEdge[]) => { + try { + await supabase + .from('projects') + .update({ + flowchart_data: { + nodes: persistNodes, + edges: persistEdges, + characters, + variables, + }, + }) + .eq('id', projectId) + } catch { + // Persistence failure is non-critical; will retry on next change + } + }, + }) + + // Initialize CRDT document from initial data + crdtManager.initializeFromData(migratedData.nodes, migratedData.edges) + crdtRef.current = crdtManager + const connection = new RealtimeConnection(projectId, userId, userDisplayName, { onConnectionStateChange: setConnectionState, onPresenceSync: setPresenceUsers, + onChannelSubscribed: (channel) => { + crdtManager.connectChannel(channel) + }, }) realtimeRef.current = connection connection.connect() @@ -254,9 +297,43 @@ function FlowchartEditorInner({ projectId, userId, userDisplayName, isOwner, ini return () => { connection.disconnect() realtimeRef.current = null + crdtManager.destroy() + crdtRef.current = null } + // eslint-disable-next-line react-hooks/exhaustive-deps }, [projectId, userId, userDisplayName]) + // Sync local React Flow state changes to CRDT (skip remote-originated updates) + const nodesForCRDT = useMemo(() => { + return nodes.map((node) => ({ + id: node.id, + type: node.type as 'dialogue' | 'choice' | 'variable', + position: node.position, + data: node.data, + })) as FlowchartNode[] + }, [nodes]) + + const edgesForCRDT = useMemo(() => { + return edges.map((edge) => ({ + id: edge.id, + source: edge.source, + sourceHandle: edge.sourceHandle, + target: edge.target, + targetHandle: edge.targetHandle, + data: edge.data, + })) as FlowchartEdge[] + }, [edges]) + + useEffect(() => { + if (isRemoteUpdateRef.current) return + crdtRef.current?.updateNodes(nodesForCRDT) + }, [nodesForCRDT]) + + useEffect(() => { + if (isRemoteUpdateRef.current) return + crdtRef.current?.updateEdges(edgesForCRDT) + }, [edgesForCRDT]) + const handleAddCharacter = useCallback( (name: string, color: string): string => { const id = nanoid() diff --git a/src/lib/collaboration/crdt.ts b/src/lib/collaboration/crdt.ts new file mode 100644 index 0000000..71974a7 --- /dev/null +++ b/src/lib/collaboration/crdt.ts @@ -0,0 +1,198 @@ +import * as Y from 'yjs' +import type { RealtimeChannel } from '@supabase/supabase-js' +import type { FlowchartNode, FlowchartEdge } from '@/types/flowchart' + +const PERSIST_DEBOUNCE_MS = 2000 +const BROADCAST_EVENT = 'yjs-update' + +export type CRDTCallbacks = { + onNodesChange: (nodes: FlowchartNode[]) => void + onEdgesChange: (edges: FlowchartEdge[]) => void + onPersist: (nodes: FlowchartNode[], edges: FlowchartEdge[]) => void +} + +export class CRDTManager { + private doc: Y.Doc + private nodesMap: Y.Map // node ID -> JSON string of FlowchartNode + private edgesMap: Y.Map // edge ID -> JSON string of FlowchartEdge + private channel: RealtimeChannel | null = null + private callbacks: CRDTCallbacks + private persistTimer: ReturnType | null = null + private isApplyingRemote = false + private isDestroyed = false + + constructor(callbacks: CRDTCallbacks) { + this.doc = new Y.Doc() + this.nodesMap = this.doc.getMap('nodes') + this.edgesMap = this.doc.getMap('edges') + this.callbacks = callbacks + + // Listen for remote Yjs document changes + this.nodesMap.observe(() => { + if (this.isApplyingRemote) return + this.notifyNodesChange() + this.schedulePersist() + }) + + this.edgesMap.observe(() => { + if (this.isApplyingRemote) return + this.notifyEdgesChange() + this.schedulePersist() + }) + + // Broadcast local updates to other clients + this.doc.on('update', (update: Uint8Array, origin: unknown) => { + if (origin === 'remote') return // Don't re-broadcast remote updates + this.broadcastUpdate(update) + }) + } + + /** Initialize the Yjs document from database state */ + initializeFromData(nodes: FlowchartNode[], edges: FlowchartEdge[]): void { + this.doc.transact(() => { + nodes.forEach((node) => { + this.nodesMap.set(node.id, JSON.stringify(node)) + }) + edges.forEach((edge) => { + this.edgesMap.set(edge.id, JSON.stringify(edge)) + }) + }, 'init') + } + + /** Connect to a Supabase Realtime channel for syncing updates */ + connectChannel(channel: RealtimeChannel): void { + this.channel = channel + + // Listen for broadcast updates from other clients + channel.on('broadcast', { event: BROADCAST_EVENT }, (payload) => { + if (this.isDestroyed) return + const data = payload.payload as { update?: number[] } | undefined + if (data?.update) { + const update = new Uint8Array(data.update) + this.isApplyingRemote = true + Y.applyUpdate(this.doc, update, 'remote') + this.isApplyingRemote = false + // Notify React state of remote changes + this.notifyNodesChange() + this.notifyEdgesChange() + this.schedulePersist() + } + }) + } + + /** Apply local node changes to the Yjs document */ + updateNodes(nodes: FlowchartNode[]): void { + if (this.isApplyingRemote) return + + this.doc.transact(() => { + const currentIds = new Set(nodes.map((n) => n.id)) + + // Remove nodes no longer present + const existingIds = Array.from(this.nodesMap.keys()) + existingIds.forEach((id) => { + if (!currentIds.has(id)) { + this.nodesMap.delete(id) + } + }) + + // Add or update nodes + nodes.forEach((node) => { + const serialized = JSON.stringify(node) + const existing = this.nodesMap.get(node.id) + if (existing !== serialized) { + this.nodesMap.set(node.id, serialized) + } + }) + }, 'local') + } + + /** Apply local edge changes to the Yjs document */ + updateEdges(edges: FlowchartEdge[]): void { + if (this.isApplyingRemote) return + + this.doc.transact(() => { + const currentIds = new Set(edges.map((e) => e.id)) + + // Remove edges no longer present + const existingIds = Array.from(this.edgesMap.keys()) + existingIds.forEach((id) => { + if (!currentIds.has(id)) { + this.edgesMap.delete(id) + } + }) + + // Add or update edges + edges.forEach((edge) => { + const serialized = JSON.stringify(edge) + const existing = this.edgesMap.get(edge.id) + if (existing !== serialized) { + this.edgesMap.set(edge.id, serialized) + } + }) + }, 'local') + } + + /** Get current nodes from the Yjs document */ + getNodes(): FlowchartNode[] { + const nodes: FlowchartNode[] = [] + this.nodesMap.forEach((value) => { + try { + nodes.push(JSON.parse(value) as FlowchartNode) + } catch { + // Skip malformed entries + } + }) + return nodes + } + + /** Get current edges from the Yjs document */ + getEdges(): FlowchartEdge[] { + const edges: FlowchartEdge[] = [] + this.edgesMap.forEach((value) => { + try { + edges.push(JSON.parse(value) as FlowchartEdge) + } catch { + // Skip malformed entries + } + }) + return edges + } + + /** Clean up resources */ + destroy(): void { + this.isDestroyed = true + if (this.persistTimer) { + clearTimeout(this.persistTimer) + this.persistTimer = null + } + this.doc.destroy() + this.channel = null + } + + private notifyNodesChange(): void { + this.callbacks.onNodesChange(this.getNodes()) + } + + private notifyEdgesChange(): void { + this.callbacks.onEdgesChange(this.getEdges()) + } + + private broadcastUpdate(update: Uint8Array): void { + if (!this.channel || this.isDestroyed) return + this.channel.send({ + type: 'broadcast', + event: BROADCAST_EVENT, + payload: { update: Array.from(update) }, + }) + } + + private schedulePersist(): void { + if (this.persistTimer) { + clearTimeout(this.persistTimer) + } + this.persistTimer = setTimeout(() => { + if (this.isDestroyed) return + this.callbacks.onPersist(this.getNodes(), this.getEdges()) + }, PERSIST_DEBOUNCE_MS) + } +} diff --git a/src/lib/collaboration/realtime.ts b/src/lib/collaboration/realtime.ts index 1d038fc..0c8fe84 100644 --- a/src/lib/collaboration/realtime.ts +++ b/src/lib/collaboration/realtime.ts @@ -11,6 +11,7 @@ export type PresenceUser = { type RealtimeCallbacks = { onConnectionStateChange: (state: ConnectionState) => void onPresenceSync?: (users: PresenceUser[]) => void + onChannelSubscribed?: (channel: RealtimeChannel) => void } const HEARTBEAT_INTERVAL_MS = 30_000 @@ -75,6 +76,10 @@ export class RealtimeConnection { userId: this.userId, displayName: this.displayName, }) + // Notify that the channel is ready for CRDT sync + if (this.channel) { + this.callbacks.onChannelSubscribed?.(this.channel) + } } else if (status === 'CHANNEL_ERROR' || status === 'TIMED_OUT') { this.callbacks.onConnectionStateChange('reconnecting') this.scheduleReconnect()