Skip to content

Commit 161d6ec

Browse files
authored
Merge pull request #1197 from ably/AIT-208/partial-sync
[AIT-208] feat: partial sync implementation
2 parents c686b8f + f8555a9 commit 161d6ec

6 files changed

Lines changed: 337 additions & 36 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
315315
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
316316
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
317317
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
318-
objectsManager.clearSyncObjectsDataPool() // RTO4b3
318+
objectsManager.clearSyncObjectsPool() // RTO4b3
319319
// RTO4b5 removed — buffer already cleared by RTO4d above
320320
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
321321
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
@@ -340,7 +340,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
340340
if (state != ChannelState.suspended) {
341341
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
342342
objectsPool.clearObjectsData(false)
343-
objectsManager.clearSyncObjectsDataPool()
343+
objectsManager.clearSyncObjectsPool()
344344
}
345345
}
346346
else -> {

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ import kotlinx.coroutines.CompletableDeferred
1515
internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObjects): ObjectsStateCoordinator() {
1616
private val tag = "ObjectsManager"
1717
/**
18-
* @spec RTO5 - Sync objects data pool for collecting sync messages
18+
* @spec RTO5 - Sync objects pool for collecting sync messages
1919
*/
20-
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
20+
private val syncObjectsPool = mutableMapOf<String, ObjectMessage>()
2121
private var currentSyncId: String? = null
2222
/**
2323
* @spec RTO7 - Buffered object operations during sync
@@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
5959
}
6060

6161
// RTO5a3 - continue current sync sequence
62-
applyObjectSyncMessages(objectMessages) // RTO5b
62+
applyObjectSyncMessages(objectMessages) // RTO5f
6363

6464
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
6565
if (syncTracker.hasSyncEnded()) {
@@ -77,7 +77,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
7777
internal fun startNewSync(syncId: String?) {
7878
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
7979

80-
syncObjectsDataPool.clear() // RTO5a2a
80+
syncObjectsPool.clear() // RTO5a2a
8181
currentSyncId = syncId
8282
syncCompletionWaiter = CompletableDeferred()
8383
stateChange(ObjectsState.Syncing)
@@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
9393
applySync() // RTO5c1/2/7
9494
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
9595
bufferedObjectOperations.clear() // RTO5c5
96-
syncObjectsDataPool.clear() // RTO5c4
96+
syncObjectsPool.clear() // RTO5c4
9797
currentSyncId = null // RTO5c3
9898
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
9999
stateChange(ObjectsState.Synced) // RTO5c8
@@ -124,11 +124,11 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
124124
}
125125

126126
/**
127-
* Clears the sync objects data pool.
127+
* Clears the sync objects pool.
128128
* Used by DefaultRealtimeObjects.handleStateChange.
129129
*/
130-
internal fun clearSyncObjectsDataPool() {
131-
syncObjectsDataPool.clear()
130+
internal fun clearSyncObjectsPool() {
131+
syncObjectsPool.clear()
132132
}
133133

134134
/**
@@ -145,7 +145,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
145145
* @spec RTO5c - Processes sync data and updates objects pool
146146
*/
147147
private fun applySync() {
148-
if (syncObjectsDataPool.isEmpty()) {
148+
if (syncObjectsPool.isEmpty()) {
149149
return
150150
}
151151

@@ -154,8 +154,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
154154
val existingObjectUpdates = mutableListOf<Pair<BaseRealtimeObject, ObjectUpdate>>()
155155

156156
// RTO5c1
157-
for ((objectId, objectMessage) in syncObjectsDataPool) {
158-
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
157+
for ((objectId, objectMessage) in syncObjectsPool) {
158+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
159159
receivedObjectIds.add(objectId)
160160
val existingObject = realtimeObjects.objectsPool.get(objectId)
161161

@@ -166,7 +166,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
166166
existingObjectUpdates.add(Pair(existingObject, update))
167167
} else { // RTO5c1b
168168
// RTO5c1b1, RTO5c1b1a, RTO5c1b1b - Create new object and add it to the pool
169-
val newObject = createObjectFromState(objectState)
169+
val newObject = createObjectFromState(objectState) ?: continue // RTO5c1b1c - skip unsupported
170170
newObject.applyObjectSync(objectMessage)
171171
realtimeObjects.objectsPool.set(objectId, newObject)
172172
}
@@ -232,9 +232,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
232232
}
233233

234234
/**
235-
* Applies sync messages to sync data pool.
235+
* Applies sync messages to sync data pool, merging partial sync messages for the same objectId.
236236
*
237-
* @spec RTO5b - Collects object states during sync sequence
237+
* @spec RTO5f - Collects and merges object states during sync sequence
238238
*/
239239
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
240240
for (objectMessage in objectMessages) {
@@ -244,11 +244,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
244244
}
245245

246246
val objectState: ObjectState = objectMessage.objectState
247-
if (objectState.counter != null || objectState.map != null) {
248-
syncObjectsDataPool[objectState.objectId] = objectMessage
249-
} else {
250-
// RTO5c1b1c - object state must contain either counter or map data
251-
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
247+
val objectId = objectState.objectId
248+
val existingEntry = syncObjectsPool[objectId]
249+
250+
if (existingEntry == null) {
251+
// RTO5f1 - objectId not in pool, store directly
252+
if (objectState.counter != null || objectState.map != null) {
253+
syncObjectsPool[objectId] = objectMessage
254+
} else {
255+
// RTO5c1b1c - object state must contain either counter or map data
256+
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
257+
}
258+
continue
259+
}
260+
261+
// RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type
262+
when {
263+
objectState.map != null -> {
264+
// RTO5f2a - map object: merge entries
265+
if (objectState.tombstone) {
266+
// RTO5f2a1 - tombstone: replace pool entry entirely
267+
syncObjectsPool[objectId] = objectMessage
268+
} else {
269+
// RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials
270+
val existingState = existingEntry.objectState!! // non-null for existing entry
271+
val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
272+
val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
273+
val mergedState = existingState.copy(map = mergedMap)
274+
syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
275+
}
276+
}
277+
objectState.counter != null -> {
278+
// RTO5f2b - counter objects must never be split across messages
279+
Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}")
280+
}
281+
else -> {
282+
// RTO5f2c - unsupported type, log warning and skip
283+
Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}")
284+
}
252285
}
253286
}
254287
}
@@ -258,11 +291,15 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
258291
*
259292
* @spec RTO5c1b - Creates objects from object state based on type
260293
*/
261-
private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject {
294+
private fun createObjectFromState(objectState: ObjectState): BaseRealtimeObject? {
262295
return when {
263296
objectState.counter != null -> DefaultLiveCounter.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1a
264297
objectState.map != null -> DefaultLiveMap.zeroValue(objectState.objectId, realtimeObjects) // RTO5c1b1b
265-
else -> throw clientError("Object state must contain either counter or map data") // RTO5c1b1c
298+
else -> {
299+
// RTO5c1b1c - unsupported object type, skip gracefully
300+
Log.w(tag, "Received unsupported object state during OBJECT_SYNC (no counter or map), skipping objectId: ${objectState.objectId}")
301+
null
302+
}
266303
}
267304
}
268305

@@ -284,7 +321,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
284321

285322
internal fun dispose() {
286323
syncCompletionWaiter?.cancel()
287-
syncObjectsDataPool.clear()
324+
syncObjectsPool.clear()
288325
bufferedObjectOperations.clear()
289326
disposeObjectsStateListeners()
290327
}

liveobjects/src/main/kotlin/io/ably/lib/objects/type/BaseRealtimeObject.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ internal abstract class BaseRealtimeObject(
4848
* @spec RTLM6/RTLC6 - Overrides ObjectMessage with object data state from sync to LiveMap/LiveCounter
4949
*/
5050
internal fun applyObjectSync(objectMessage: ObjectMessage): ObjectUpdate {
51-
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
51+
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5f
5252
validate(objectState)
5353
// object's site serials are still updated even if it is tombstoned, so always use the site serials received from the operation.
5454
// should default to empty map if site serials do not exist on the object state, so that any future operation may be applied to this object.

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long?
6868
* START - DefaultRealtimeObjects dep mocks
6969
* ======================================
7070
*/
71-
internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
72-
get() = this.getPrivateField("syncObjectsDataPool")
71+
internal val ObjectsManager.SyncObjectsPool: Map<String, ObjectMessage>
72+
get() = this.getPrivateField("syncObjectsPool")
7373

7474
internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
7575
get() = this.getPrivateField("bufferedObjectOperations")

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap
1313
import io.ably.lib.objects.type.livemap.LiveMapEntry
1414
import io.ably.lib.objects.unit.BufferedObjectOperations
1515
import io.ably.lib.objects.unit.ObjectsManager
16-
import io.ably.lib.objects.unit.SyncObjectsDataPool
16+
import io.ably.lib.objects.unit.SyncObjectsPool
1717
import io.ably.lib.objects.unit.getMockObjectsAdapter
1818
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1919
import io.ably.lib.objects.unit.getMockRealtimeChannel
@@ -83,7 +83,7 @@ class DefaultRealtimeObjectsTest {
8383
defaultRealtimeObjects.ObjectsManager.endSync()
8484
}
8585

86-
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
86+
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3
8787
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
8888
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
8989
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
@@ -246,16 +246,16 @@ class DefaultRealtimeObjectsTest {
246246
failCalled.await()
247247

248248
verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
249-
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
249+
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
250250
}
251251

252252
@Test
253253
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
254254
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
255255

256-
// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
256+
// Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal
257257
val syncPoolCleared = CompletableDeferred<Unit>()
258-
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
258+
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers {
259259
callOriginal()
260260
syncPoolCleared.complete(Unit)
261261
}
@@ -265,7 +265,7 @@ class DefaultRealtimeObjectsTest {
265265
syncPoolCleared.await()
266266

267267
verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
268-
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
268+
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
269269
}
270270

271271
@Test
@@ -281,7 +281,7 @@ class DefaultRealtimeObjectsTest {
281281
operation = ObjectOperation(
282282
action = ObjectOperationAction.CounterInc,
283283
objectId = "counter:test@1",
284-
counterOp = ObjectsCounterOp(amount = 5.0)
284+
counterInc = CounterInc(number = 5.0)
285285
)
286286
)
287287
)
@@ -313,7 +313,7 @@ class DefaultRealtimeObjectsTest {
313313
operation = ObjectOperation(
314314
action = ObjectOperationAction.CounterInc,
315315
objectId = "counter:test@1",
316-
counterOp = ObjectsCounterOp(amount = 5.0)
316+
counterInc = CounterInc(number = 5.0)
317317
)
318318
)
319319
)
@@ -370,7 +370,7 @@ class DefaultRealtimeObjectsTest {
370370
operation = ObjectOperation(
371371
action = ObjectOperationAction.CounterInc,
372372
objectId = "counter:test@1",
373-
counterOp = ObjectsCounterOp(amount = 3.0)
373+
counterInc = CounterInc(number = 3.0)
374374
),
375375
serial = "serial-op-1",
376376
siteCode = "site1"

0 commit comments

Comments
 (0)