developing #10
|
|
@ -682,6 +682,38 @@ function FlowchartEditorInner({ projectId, projectName, userId, userDisplayName,
|
||||||
// eslint-disable-next-line react-hooks/exhaustive-deps
|
// eslint-disable-next-line react-hooks/exhaustive-deps
|
||||||
}, [projectId, userId, userDisplayName])
|
}, [projectId, userId, userDisplayName])
|
||||||
|
|
||||||
|
// Manage connection lifecycle based on visibility and user activity
|
||||||
|
useEffect(() => {
|
||||||
|
const handleVisibilityChange = () => {
|
||||||
|
if (!document.hidden) {
|
||||||
|
realtimeRef.current?.notifyActivity()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Throttle activity notifications to avoid excessive calls
|
||||||
|
let activityThrottled = false
|
||||||
|
const throttledActivity = () => {
|
||||||
|
if (activityThrottled) return
|
||||||
|
activityThrottled = true
|
||||||
|
realtimeRef.current?.notifyActivity()
|
||||||
|
setTimeout(() => { activityThrottled = false }, 10_000)
|
||||||
|
}
|
||||||
|
|
||||||
|
document.addEventListener('visibilitychange', handleVisibilityChange)
|
||||||
|
document.addEventListener('mousedown', throttledActivity)
|
||||||
|
document.addEventListener('keydown', throttledActivity)
|
||||||
|
document.addEventListener('scroll', throttledActivity, true)
|
||||||
|
document.addEventListener('mousemove', throttledActivity)
|
||||||
|
|
||||||
|
return () => {
|
||||||
|
document.removeEventListener('visibilitychange', handleVisibilityChange)
|
||||||
|
document.removeEventListener('mousedown', throttledActivity)
|
||||||
|
document.removeEventListener('keydown', throttledActivity)
|
||||||
|
document.removeEventListener('scroll', throttledActivity, true)
|
||||||
|
document.removeEventListener('mousemove', throttledActivity)
|
||||||
|
}
|
||||||
|
}, [])
|
||||||
|
|
||||||
// Sync local React Flow state changes to CRDT (skip remote-originated updates)
|
// Sync local React Flow state changes to CRDT (skip remote-originated updates)
|
||||||
const nodesForCRDT = useMemo(() => {
|
const nodesForCRDT = useMemo(() => {
|
||||||
return nodes.map((node) => ({
|
return nodes.map((node) => ({
|
||||||
|
|
|
||||||
|
|
@ -42,17 +42,22 @@ type RealtimeCallbacks = {
|
||||||
const HEARTBEAT_INTERVAL_MS = 30_000
|
const HEARTBEAT_INTERVAL_MS = 30_000
|
||||||
const RECONNECT_BASE_DELAY_MS = 1000
|
const RECONNECT_BASE_DELAY_MS = 1000
|
||||||
const RECONNECT_MAX_DELAY_MS = 30_000
|
const RECONNECT_MAX_DELAY_MS = 30_000
|
||||||
|
const INACTIVITY_TIMEOUT_MS = 5 * 60_000 // 5 minutes of inactivity before pausing
|
||||||
|
const CONNECTION_TIMEOUT_MS = 15_000 // 15s timeout for initial connection
|
||||||
|
|
||||||
export class RealtimeConnection {
|
export class RealtimeConnection {
|
||||||
private channel: RealtimeChannel | null = null
|
private channel: RealtimeChannel | null = null
|
||||||
private heartbeatTimer: ReturnType<typeof setInterval> | null = null
|
private heartbeatTimer: ReturnType<typeof setInterval> | null = null
|
||||||
private reconnectTimer: ReturnType<typeof setTimeout> | null = null
|
private reconnectTimer: ReturnType<typeof setTimeout> | null = null
|
||||||
|
private inactivityTimer: ReturnType<typeof setTimeout> | null = null
|
||||||
|
private connectionTimer: ReturnType<typeof setTimeout> | null = null
|
||||||
private reconnectAttempts = 0
|
private reconnectAttempts = 0
|
||||||
private projectId: string
|
private projectId: string
|
||||||
private userId: string
|
private userId: string
|
||||||
private displayName: string
|
private displayName: string
|
||||||
private callbacks: RealtimeCallbacks
|
private callbacks: RealtimeCallbacks
|
||||||
private isDestroyed = false
|
private isDestroyed = false
|
||||||
|
private isPaused = false
|
||||||
private supabase = createClient()
|
private supabase = createClient()
|
||||||
|
|
||||||
constructor(projectId: string, userId: string, displayName: string, callbacks: RealtimeCallbacks) {
|
constructor(projectId: string, userId: string, displayName: string, callbacks: RealtimeCallbacks) {
|
||||||
|
|
@ -64,7 +69,20 @@ export class RealtimeConnection {
|
||||||
|
|
||||||
async connect(): Promise<void> {
|
async connect(): Promise<void> {
|
||||||
if (this.isDestroyed) return
|
if (this.isDestroyed) return
|
||||||
|
this.isPaused = false
|
||||||
this.callbacks.onConnectionStateChange('connecting')
|
this.callbacks.onConnectionStateChange('connecting')
|
||||||
|
this.resetInactivityTimer()
|
||||||
|
this.clearConnectionTimer()
|
||||||
|
|
||||||
|
// Set a timeout: if we don't connect within CONNECTION_TIMEOUT_MS, retry
|
||||||
|
this.connectionTimer = setTimeout(() => {
|
||||||
|
if (this.isDestroyed || this.isPaused) return
|
||||||
|
if (this.channel) {
|
||||||
|
this.supabase.removeChannel(this.channel)
|
||||||
|
this.channel = null
|
||||||
|
}
|
||||||
|
this.scheduleReconnect()
|
||||||
|
}, CONNECTION_TIMEOUT_MS)
|
||||||
|
|
||||||
this.channel = this.supabase.channel(`project:${this.projectId}`, {
|
this.channel = this.supabase.channel(`project:${this.projectId}`, {
|
||||||
config: { presence: { key: this.userId } },
|
config: { presence: { key: this.userId } },
|
||||||
|
|
@ -143,6 +161,7 @@ export class RealtimeConnection {
|
||||||
})
|
})
|
||||||
.subscribe(async (status) => {
|
.subscribe(async (status) => {
|
||||||
if (this.isDestroyed) return
|
if (this.isDestroyed) return
|
||||||
|
this.clearConnectionTimer()
|
||||||
|
|
||||||
if (status === 'SUBSCRIBED') {
|
if (status === 'SUBSCRIBED') {
|
||||||
this.reconnectAttempts = 0
|
this.reconnectAttempts = 0
|
||||||
|
|
@ -162,15 +181,24 @@ export class RealtimeConnection {
|
||||||
this.callbacks.onConnectionStateChange('reconnecting')
|
this.callbacks.onConnectionStateChange('reconnecting')
|
||||||
this.scheduleReconnect()
|
this.scheduleReconnect()
|
||||||
} else if (status === 'CLOSED') {
|
} else if (status === 'CLOSED') {
|
||||||
this.callbacks.onConnectionStateChange('disconnected')
|
if (!this.isPaused) {
|
||||||
|
// Unexpected close - attempt to reconnect
|
||||||
|
this.callbacks.onConnectionStateChange('reconnecting')
|
||||||
|
this.scheduleReconnect()
|
||||||
|
} else {
|
||||||
|
this.callbacks.onConnectionStateChange('disconnected')
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async disconnect(): Promise<void> {
|
async disconnect(): Promise<void> {
|
||||||
this.isDestroyed = true
|
this.isDestroyed = true
|
||||||
|
this.isPaused = false
|
||||||
this.stopHeartbeat()
|
this.stopHeartbeat()
|
||||||
this.clearReconnectTimer()
|
this.clearReconnectTimer()
|
||||||
|
this.clearInactivityTimer()
|
||||||
|
this.clearConnectionTimer()
|
||||||
|
|
||||||
if (this.channel) {
|
if (this.channel) {
|
||||||
await this.deleteSession()
|
await this.deleteSession()
|
||||||
|
|
@ -181,6 +209,54 @@ export class RealtimeConnection {
|
||||||
this.callbacks.onConnectionStateChange('disconnected')
|
this.callbacks.onConnectionStateChange('disconnected')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pause the connection (e.g. on inactivity or tab hidden).
|
||||||
|
* Unlike disconnect(), this allows resuming later.
|
||||||
|
*/
|
||||||
|
async pause(): Promise<void> {
|
||||||
|
if (this.isDestroyed || this.isPaused) return
|
||||||
|
this.isPaused = true
|
||||||
|
this.stopHeartbeat()
|
||||||
|
this.clearReconnectTimer()
|
||||||
|
this.clearInactivityTimer()
|
||||||
|
this.clearConnectionTimer()
|
||||||
|
|
||||||
|
if (this.channel) {
|
||||||
|
await this.deleteSession()
|
||||||
|
this.supabase.removeChannel(this.channel)
|
||||||
|
this.channel = null
|
||||||
|
}
|
||||||
|
|
||||||
|
this.callbacks.onConnectionStateChange('disconnected')
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resume the connection after it was paused.
|
||||||
|
* Re-establishes the channel and presence.
|
||||||
|
*/
|
||||||
|
async resume(): Promise<void> {
|
||||||
|
if (this.isDestroyed || !this.isPaused) return
|
||||||
|
this.isPaused = false
|
||||||
|
this.reconnectAttempts = 0
|
||||||
|
await this.connect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Notify that user activity has occurred, resetting the inactivity timer.
|
||||||
|
* If the connection was paused due to inactivity, it will resume.
|
||||||
|
*/
|
||||||
|
notifyActivity(): void {
|
||||||
|
if (this.isDestroyed) return
|
||||||
|
this.resetInactivityTimer()
|
||||||
|
if (this.isPaused) {
|
||||||
|
this.resume()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getIsPaused(): boolean {
|
||||||
|
return this.isPaused
|
||||||
|
}
|
||||||
|
|
||||||
getChannel(): RealtimeChannel | null {
|
getChannel(): RealtimeChannel | null {
|
||||||
return this.channel
|
return this.channel
|
||||||
}
|
}
|
||||||
|
|
@ -254,19 +330,44 @@ export class RealtimeConnection {
|
||||||
|
|
||||||
private startHeartbeat(): void {
|
private startHeartbeat(): void {
|
||||||
this.stopHeartbeat()
|
this.stopHeartbeat()
|
||||||
|
let consecutiveFailures = 0
|
||||||
this.heartbeatTimer = setInterval(async () => {
|
this.heartbeatTimer = setInterval(async () => {
|
||||||
if (this.isDestroyed) {
|
if (this.isDestroyed || this.isPaused) {
|
||||||
this.stopHeartbeat()
|
this.stopHeartbeat()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if the channel is still in a healthy state
|
||||||
|
if (this.channel) {
|
||||||
|
const state = (this.channel as unknown as { state?: string }).state
|
||||||
|
if (state && state !== 'joined' && state !== 'joining') {
|
||||||
|
// Channel is in an unhealthy state - trigger reconnect
|
||||||
|
this.callbacks.onConnectionStateChange('reconnecting')
|
||||||
|
this.supabase.removeChannel(this.channel)
|
||||||
|
this.channel = null
|
||||||
|
this.stopHeartbeat()
|
||||||
|
this.scheduleReconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.supabase
|
await this.supabase
|
||||||
.from('collaboration_sessions')
|
.from('collaboration_sessions')
|
||||||
.update({ last_heartbeat: new Date().toISOString() })
|
.update({ last_heartbeat: new Date().toISOString() })
|
||||||
.eq('project_id', this.projectId)
|
.eq('project_id', this.projectId)
|
||||||
.eq('user_id', this.userId)
|
.eq('user_id', this.userId)
|
||||||
|
consecutiveFailures = 0
|
||||||
} catch {
|
} catch {
|
||||||
// Heartbeat failure is non-critical
|
consecutiveFailures++
|
||||||
|
// If heartbeat fails 3 times in a row, the connection is likely dead
|
||||||
|
if (consecutiveFailures >= 3 && this.channel) {
|
||||||
|
this.callbacks.onConnectionStateChange('reconnecting')
|
||||||
|
this.supabase.removeChannel(this.channel)
|
||||||
|
this.channel = null
|
||||||
|
this.stopHeartbeat()
|
||||||
|
this.scheduleReconnect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}, HEARTBEAT_INTERVAL_MS)
|
}, HEARTBEAT_INTERVAL_MS)
|
||||||
}
|
}
|
||||||
|
|
@ -307,4 +408,28 @@ export class RealtimeConnection {
|
||||||
this.reconnectTimer = null
|
this.reconnectTimer = null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private resetInactivityTimer(): void {
|
||||||
|
this.clearInactivityTimer()
|
||||||
|
if (this.isDestroyed) return
|
||||||
|
this.inactivityTimer = setTimeout(() => {
|
||||||
|
if (!this.isDestroyed && !this.isPaused) {
|
||||||
|
this.pause()
|
||||||
|
}
|
||||||
|
}, INACTIVITY_TIMEOUT_MS)
|
||||||
|
}
|
||||||
|
|
||||||
|
private clearInactivityTimer(): void {
|
||||||
|
if (this.inactivityTimer) {
|
||||||
|
clearTimeout(this.inactivityTimer)
|
||||||
|
this.inactivityTimer = null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private clearConnectionTimer(): void {
|
||||||
|
if (this.connectionTimer) {
|
||||||
|
clearTimeout(this.connectionTimer)
|
||||||
|
this.connectionTimer = null
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue