Skip to content

Commit aa8f205

Browse files
committed
[AIT-208] feat: partial sync implementation
- rename `syncObjectsDataPool` to `syncObjectsPool` - Standardized naming across `ObjectsManager` to improve code clarity. - Updated references in implementation and tests.
1 parent c686b8f commit aa8f205

5 files changed

Lines changed: 197 additions & 29 deletions

File tree

liveobjects/src/main/kotlin/io/ably/lib/objects/DefaultRealtimeObjects.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
315315
// if no HAS_OBJECTS flag received on attach, we can end sync sequence immediately and treat it as no objects on a channel.
316316
// reset the objects pool to its initial state, and emit update events so subscribers to root object get notified about changes.
317317
objectsPool.resetToInitialPool(true) // RTO4b1, RTO4b2
318-
objectsManager.clearSyncObjectsDataPool() // RTO4b3
318+
objectsManager.clearSyncObjectsPool() // RTO4b3
319319
// RTO4b5 removed — buffer already cleared by RTO4d above
320320
// defer the state change event until the next tick if we started a new sequence just now due to being in initialized state.
321321
// this allows any event listeners to process the start of the new sequence event that was emitted earlier during this event loop.
@@ -340,7 +340,7 @@ internal class DefaultRealtimeObjects(internal val channelName: String, internal
340340
if (state != ChannelState.suspended) {
341341
// do not emit data update events as the actual current state of Objects data is unknown when we're in these channel states
342342
objectsPool.clearObjectsData(false)
343-
objectsManager.clearSyncObjectsDataPool()
343+
objectsManager.clearSyncObjectsPool()
344344
}
345345
}
346346
else -> {

liveobjects/src/main/kotlin/io/ably/lib/objects/ObjectsManager.kt

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
1717
/**
1818
* @spec RTO5 - Sync objects data pool for collecting sync messages
1919
*/
20-
private val syncObjectsDataPool = mutableMapOf<String, ObjectMessage>()
20+
private val syncObjectsPool = mutableMapOf<String, ObjectMessage>()
2121
private var currentSyncId: String? = null
2222
/**
2323
* @spec RTO7 - Buffered object operations during sync
@@ -59,7 +59,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
5959
}
6060

6161
// RTO5a3 - continue current sync sequence
62-
applyObjectSyncMessages(objectMessages) // RTO5b
62+
applyObjectSyncMessages(objectMessages) // RTO5f
6363

6464
// RTO5a4 - if this is the last (or only) message in a sequence of sync updates, end the sync
6565
if (syncTracker.hasSyncEnded()) {
@@ -77,7 +77,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
7777
internal fun startNewSync(syncId: String?) {
7878
Log.v(tag, "Starting new sync sequence: syncId=$syncId")
7979

80-
syncObjectsDataPool.clear() // RTO5a2a
80+
syncObjectsPool.clear() // RTO5a2a
8181
currentSyncId = syncId
8282
syncCompletionWaiter = CompletableDeferred()
8383
stateChange(ObjectsState.Syncing)
@@ -93,7 +93,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
9393
applySync() // RTO5c1/2/7
9494
applyObjectMessages(bufferedObjectOperations, ObjectsOperationSource.CHANNEL) // RTO5c6
9595
bufferedObjectOperations.clear() // RTO5c5
96-
syncObjectsDataPool.clear() // RTO5c4
96+
syncObjectsPool.clear() // RTO5c4
9797
currentSyncId = null // RTO5c3
9898
realtimeObjects.appliedOnAckSerials.clear() // RTO5c9
9999
stateChange(ObjectsState.Synced) // RTO5c8
@@ -127,8 +127,8 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
127127
* Clears the sync objects data pool.
128128
* Used by DefaultRealtimeObjects.handleStateChange.
129129
*/
130-
internal fun clearSyncObjectsDataPool() {
131-
syncObjectsDataPool.clear()
130+
internal fun clearSyncObjectsPool() {
131+
syncObjectsPool.clear()
132132
}
133133

134134
/**
@@ -145,7 +145,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
145145
* @spec RTO5c - Processes sync data and updates objects pool
146146
*/
147147
private fun applySync() {
148-
if (syncObjectsDataPool.isEmpty()) {
148+
if (syncObjectsPool.isEmpty()) {
149149
return
150150
}
151151

@@ -154,7 +154,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
154154
val existingObjectUpdates = mutableListOf<Pair<BaseRealtimeObject, ObjectUpdate>>()
155155

156156
// RTO5c1
157-
for ((objectId, objectMessage) in syncObjectsDataPool) {
157+
for ((objectId, objectMessage) in syncObjectsPool) {
158158
val objectState = objectMessage.objectState as ObjectState // we have non-null objectState here due to RTO5b
159159
receivedObjectIds.add(objectId)
160160
val existingObject = realtimeObjects.objectsPool.get(objectId)
@@ -232,9 +232,9 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
232232
}
233233

234234
/**
235-
* Applies sync messages to sync data pool.
235+
* Applies sync messages to sync data pool, merging partial sync messages for the same objectId.
236236
*
237-
* @spec RTO5b - Collects object states during sync sequence
237+
* @spec RTO5f - Collects and merges object states during sync sequence
238238
*/
239239
private fun applyObjectSyncMessages(objectMessages: List<ObjectMessage>) {
240240
for (objectMessage in objectMessages) {
@@ -244,11 +244,44 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
244244
}
245245

246246
val objectState: ObjectState = objectMessage.objectState
247-
if (objectState.counter != null || objectState.map != null) {
248-
syncObjectsDataPool[objectState.objectId] = objectMessage
249-
} else {
250-
// RTO5c1b1c - object state must contain either counter or map data
251-
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
247+
val objectId = objectState.objectId
248+
val existingEntry = syncObjectsPool[objectId]
249+
250+
if (existingEntry == null) {
251+
// RTO5f1 - objectId not in pool, store directly
252+
if (objectState.counter != null || objectState.map != null) {
253+
syncObjectsPool[objectId] = objectMessage
254+
} else {
255+
// RTO5c1b1c - object state must contain either counter or map data
256+
Log.w(tag, "Object state received without counter or map data, skipping message: ${objectMessage.id}")
257+
}
258+
continue
259+
}
260+
261+
// RTO5f2 - objectId already in pool; this is a partial sync message, merge based on type
262+
when {
263+
objectState.map != null -> {
264+
// RTO5f2a - map object: merge entries
265+
if (objectState.tombstone) {
266+
// RTO5f2a1 - tombstone: replace pool entry entirely
267+
syncObjectsPool[objectId] = objectMessage
268+
} else {
269+
// RTO5f2a2 - merge map entries; server guarantees no duplicate keys across partials
270+
val existingState = existingEntry.objectState!! // non-null for existing entry
271+
val mergedEntries = existingState.map?.entries.orEmpty() + objectState.map.entries.orEmpty()
272+
val mergedMap = (existingState.map ?: ObjectsMap()).copy(entries = mergedEntries)
273+
val mergedState = existingState.copy(map = mergedMap)
274+
syncObjectsPool[objectId] = existingEntry.copy(objectState = mergedState)
275+
}
276+
}
277+
objectState.counter != null -> {
278+
// RTO5f2b - counter objects must never be split across messages
279+
Log.e(tag, "Received partial sync message for a counter object, skipping: ${objectMessage.id}")
280+
}
281+
else -> {
282+
// RTO5f2c - unsupported type, log warning and skip
283+
Log.w(tag, "Received partial sync message for an unsupported object type, skipping: ${objectMessage.id}")
284+
}
252285
}
253286
}
254287
}
@@ -284,7 +317,7 @@ internal class ObjectsManager(private val realtimeObjects: DefaultRealtimeObject
284317

285318
internal fun dispose() {
286319
syncCompletionWaiter?.cancel()
287-
syncObjectsDataPool.clear()
320+
syncObjectsPool.clear()
288321
bufferedObjectOperations.clear()
289322
disposeObjectsStateListeners()
290323
}

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/TestHelpers.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ internal val BaseRealtimeObject.TombstonedAt: Long?
6868
* START - DefaultRealtimeObjects dep mocks
6969
* ======================================
7070
*/
71-
internal val ObjectsManager.SyncObjectsDataPool: Map<String, ObjectState>
72-
get() = this.getPrivateField("syncObjectsDataPool")
71+
internal val ObjectsManager.SyncObjectsPool: Map<String, ObjectMessage>
72+
get() = this.getPrivateField("syncObjectsPool")
7373

7474
internal val ObjectsManager.BufferedObjectOperations: List<ObjectMessage>
7575
get() = this.getPrivateField("bufferedObjectOperations")

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/DefaultRealtimeObjectsTest.kt

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import io.ably.lib.objects.type.livemap.DefaultLiveMap
1313
import io.ably.lib.objects.type.livemap.LiveMapEntry
1414
import io.ably.lib.objects.unit.BufferedObjectOperations
1515
import io.ably.lib.objects.unit.ObjectsManager
16-
import io.ably.lib.objects.unit.SyncObjectsDataPool
16+
import io.ably.lib.objects.unit.SyncObjectsPool
1717
import io.ably.lib.objects.unit.getMockObjectsAdapter
1818
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1919
import io.ably.lib.objects.unit.getMockRealtimeChannel
@@ -83,7 +83,7 @@ class DefaultRealtimeObjectsTest {
8383
defaultRealtimeObjects.ObjectsManager.endSync()
8484
}
8585

86-
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsDataPool.size) // RTO4b3
86+
assertEquals(0, defaultRealtimeObjects.ObjectsManager.SyncObjectsPool.size) // RTO4b3
8787
assertEquals(0, defaultRealtimeObjects.ObjectsManager.BufferedObjectOperations.size) // RTO4d
8888
assertEquals(1, defaultRealtimeObjects.objectsPool.size()) // RTO4b1 - Only root remains
8989
assertEquals(rootObject, defaultRealtimeObjects.objectsPool.get(ROOT_OBJECT_ID)) // points to previously created root object
@@ -246,16 +246,16 @@ class DefaultRealtimeObjectsTest {
246246
failCalled.await()
247247

248248
verify(exactly = 0) { defaultRealtimeObjects.objectsPool.clearObjectsData(any()) }
249-
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
249+
verify(exactly = 0) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
250250
}
251251

252252
@Test
253253
fun `(RTO4) handleStateChange(DETACHED) clears objects data and sync pool`() = runTest {
254254
val defaultRealtimeObjects = getDefaultRealtimeObjectsWithMockedDeps()
255255

256-
// Use clearSyncObjectsDataPool (the last operation in the coroutine) as the completion signal
256+
// Use clearSyncObjectsPool (the last operation in the coroutine) as the completion signal
257257
val syncPoolCleared = CompletableDeferred<Unit>()
258-
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() } answers {
258+
every { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() } answers {
259259
callOriginal()
260260
syncPoolCleared.complete(Unit)
261261
}
@@ -265,7 +265,7 @@ class DefaultRealtimeObjectsTest {
265265
syncPoolCleared.await()
266266

267267
verify(exactly = 1) { defaultRealtimeObjects.objectsPool.clearObjectsData(false) }
268-
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsDataPool() }
268+
verify(exactly = 1) { defaultRealtimeObjects.ObjectsManager.clearSyncObjectsPool() }
269269
}
270270

271271
@Test
@@ -281,7 +281,7 @@ class DefaultRealtimeObjectsTest {
281281
operation = ObjectOperation(
282282
action = ObjectOperationAction.CounterInc,
283283
objectId = "counter:test@1",
284-
counterOp = ObjectsCounterOp(amount = 5.0)
284+
counterInc = CounterInc(number = 5.0)
285285
)
286286
)
287287
)
@@ -313,7 +313,7 @@ class DefaultRealtimeObjectsTest {
313313
operation = ObjectOperation(
314314
action = ObjectOperationAction.CounterInc,
315315
objectId = "counter:test@1",
316-
counterOp = ObjectsCounterOp(amount = 5.0)
316+
counterInc = CounterInc(number = 5.0)
317317
)
318318
)
319319
)
@@ -370,7 +370,7 @@ class DefaultRealtimeObjectsTest {
370370
operation = ObjectOperation(
371371
action = ObjectOperationAction.CounterInc,
372372
objectId = "counter:test@1",
373-
counterOp = ObjectsCounterOp(amount = 3.0)
373+
counterInc = CounterInc(number = 0.0)
374374
),
375375
serial = "serial-op-1",
376376
siteCode = "site1"

liveobjects/src/test/kotlin/io/ably/lib/objects/unit/objects/ObjectsManagerTest.kt

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import io.ably.lib.objects.unit.*
1212
import io.ably.lib.objects.unit.getDefaultRealtimeObjectsWithMockedDeps
1313
import io.ably.lib.types.AblyException
1414
import io.ably.lib.types.ErrorInfo
15+
import io.ably.lib.util.Log
1516
import io.mockk.*
1617
import kotlinx.coroutines.launch
1718
import kotlinx.coroutines.test.runTest
@@ -655,6 +656,140 @@ class ObjectsManagerTest {
655656
"appliedOnAckSerials should be empty when serials length mismatches")
656657
}
657658

659+
@Test
660+
fun `(RTO5f2a2) partial sync map entries are merged across two messages with the same objectId`() {
661+
val defaultRealtimeObjects = makeRealtimeObjects()
662+
val objectsManager = defaultRealtimeObjects.ObjectsManager
663+
664+
val msg1 = ObjectMessage(
665+
id = "msg1",
666+
objectState = ObjectState(
667+
objectId = "map:test@1",
668+
tombstone = false,
669+
siteTimeserials = mapOf("site1" to "serial1"),
670+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
671+
)
672+
)
673+
val msg2 = ObjectMessage(
674+
id = "msg2",
675+
objectState = ObjectState(
676+
objectId = "map:test@1",
677+
tombstone = false,
678+
siteTimeserials = mapOf("site1" to "serial2"),
679+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key2" to ObjectsMapEntry(data = ObjectData(string = "value2"))))
680+
)
681+
)
682+
683+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
684+
685+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
686+
assertNotNull(liveMap.data["key1"], "key1 should be present after merge")
687+
assertNotNull(liveMap.data["key2"], "key2 should be present after merge")
688+
assertEquals("value1", liveMap.data["key1"]?.data?.string)
689+
assertEquals("value2", liveMap.data["key2"]?.data?.string)
690+
}
691+
692+
@Test
693+
fun `(RTO5f2a1) tombstone on second partial message replaces pool entry entirely`() {
694+
val defaultRealtimeObjects = makeRealtimeObjects()
695+
val objectsManager = defaultRealtimeObjects.ObjectsManager
696+
697+
val msg1 = ObjectMessage(
698+
id = "msg1",
699+
objectState = ObjectState(
700+
objectId = "map:test@1",
701+
tombstone = false,
702+
siteTimeserials = mapOf("site1" to "serial1"),
703+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
704+
)
705+
)
706+
val msg2 = ObjectMessage(
707+
id = "msg2",
708+
objectState = ObjectState(
709+
objectId = "map:test@1",
710+
tombstone = true,
711+
siteTimeserials = mapOf("site1" to "serial2"),
712+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = emptyMap())
713+
)
714+
)
715+
716+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
717+
718+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
719+
// After tombstone replaces the entry, the map should have no key1
720+
assertNull(liveMap.data["key1"], "key1 should not be present after tombstone replaced the pool entry")
721+
}
722+
723+
@Test
724+
fun `(RTO5f2b) partial sync counter message logs error and is skipped`() {
725+
val defaultRealtimeObjects = makeRealtimeObjects()
726+
val objectsManager = defaultRealtimeObjects.ObjectsManager
727+
728+
mockkStatic(Log::class)
729+
every { Log.e(any(), any<String>()) } returns 0
730+
731+
val msg1 = ObjectMessage(
732+
id = "msg1",
733+
objectState = ObjectState(
734+
objectId = "counter:test@1",
735+
tombstone = false,
736+
siteTimeserials = mapOf("site1" to "serial1"),
737+
counter = ObjectsCounter(count = 10.0)
738+
)
739+
)
740+
val msg2 = ObjectMessage(
741+
id = "msg2",
742+
objectState = ObjectState(
743+
objectId = "counter:test@1",
744+
tombstone = false,
745+
siteTimeserials = mapOf("site1" to "serial2"),
746+
counter = ObjectsCounter(count = 5.0)
747+
)
748+
)
749+
750+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
751+
752+
// Pool should contain only msg1 (msg2 skipped)
753+
val counter = defaultRealtimeObjects.objectsPool.get("counter:test@1") as DefaultLiveCounter
754+
assertEquals(10.0, counter.data.get(), "counter value should be from msg1 only (msg2 skipped)")
755+
verify { Log.e(any(), match<String> { it.contains("partial sync message for a counter") }) }
756+
}
757+
758+
@Test
759+
fun `(RTO5f2c) partial sync message with unsupported type logs warning and is skipped`() {
760+
val defaultRealtimeObjects = makeRealtimeObjects()
761+
val objectsManager = defaultRealtimeObjects.ObjectsManager
762+
763+
mockkStatic(Log::class)
764+
every { Log.w(any(), any<String>()) } returns 0
765+
766+
val msg1 = ObjectMessage(
767+
id = "msg1",
768+
objectState = ObjectState(
769+
objectId = "map:test@1",
770+
tombstone = false,
771+
siteTimeserials = mapOf("site1" to "serial1"),
772+
map = ObjectsMap(semantics = ObjectsMapSemantics.LWW, entries = mapOf("key1" to ObjectsMapEntry(data = ObjectData(string = "value1"))))
773+
)
774+
)
775+
// msg2 has neither map nor counter — hits the else branch (RTO5f2c)
776+
val msg2 = ObjectMessage(
777+
id = "msg2",
778+
objectState = ObjectState(
779+
objectId = "map:test@1",
780+
tombstone = false,
781+
siteTimeserials = mapOf("site1" to "serial2"),
782+
)
783+
)
784+
785+
objectsManager.handleObjectSyncMessages(listOf(msg1, msg2), "sync-1:")
786+
787+
// Pool entry should still be msg1 (msg2 was skipped)
788+
val liveMap = defaultRealtimeObjects.objectsPool.get("map:test@1") as DefaultLiveMap
789+
assertNotNull(liveMap.data["key1"], "key1 should still be present (msg2 skipped)")
790+
verify { Log.w(any(), match<String> { it.contains("unsupported object type") }) }
791+
}
792+
658793
private fun mockZeroValuedObjects() {
659794
mockkObject(DefaultLiveMap.Companion)
660795
every {

0 commit comments

Comments
 (0)