From 8505557723e606e3332026b0f5e30d8db460707d Mon Sep 17 00:00:00 2001 From: Ethan Berg Date: Thu, 26 Mar 2026 13:21:51 -0700 Subject: [PATCH 1/3] feat: added flow wrapper for cache --- cache/build.gradle.kts | 2 + .../kroger/cache/internal/CacheFlowWrapper.kt | 97 +++++++++++++++ .../cache/internal/CacheFlowWrapperTest.kt | 111 ++++++++++++++++++ gradle/libs.versions.toml | 2 + 4 files changed, 212 insertions(+) create mode 100644 cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt create mode 100644 cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt diff --git a/cache/build.gradle.kts b/cache/build.gradle.kts index 48da38d..8e3976a 100644 --- a/cache/build.gradle.kts +++ b/cache/build.gradle.kts @@ -33,5 +33,7 @@ dependencies { junit5() testImplementation(libs.kotlinx.coroutines.test) + testImplementation(libs.mockk) testImplementation(libs.truth) + testImplementation(libs.turbine) } diff --git a/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt b/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt new file mode 100644 index 0000000..a3ab76d --- /dev/null +++ b/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt @@ -0,0 +1,97 @@ +package com.kroger.cache.internal + +import com.kroger.cache.SnapshotPersistentCache +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch + +/** + * MIT License + * + * Copyright (c) 2023 The Kroger Co. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +/** + * A Wrapper class for a SnapshotPersistentCache that exposes changes to the cache via a flow. + * + * **Note this works best when used as a singleton + * + * @param cache the [com.kroger.cache.SnapshotPersistentCache] holding the value(s) on disk + * @param scope the [kotlinx.coroutines.CoroutineScope] to run the flow on + * + */ +public class CacheFlowWrapper( + private val cache: SnapshotPersistentCache, + private val scope: CoroutineScope, +) { + /** + * A reference to the coroutine job used for reading the first value from the [cache] and emitting it on [_cacheValueState] + */ + private val initializerJob: Job + + /** + * The private mutable state flow for the current value + */ + private val _cacheValueState = MutableStateFlow(null) + + /** + * publicly exposed read-only flow on which to read and observe changes to the current value + */ + public val cacheValueFlow: StateFlow = _cacheValueState.asStateFlow() + + /** + * Initialization block reads the value the [cache] and emits it on [_cacheValueState] + * + * This job also updates the value in [cache] for each new value emitted on the flow + * except for the first, which is read from [cache] + */ + init { + initializerJob = scope.launch { + _cacheValueState.value = cache.read() + + cacheValueFlow + .drop(1) + .onEach { + cache.save(it) + }.launchIn(scope) + } + } + + /** + * Updates the value of the StateFlow + * Any update to the state flow will be persisted to the [cache] + * Waits for initialization to finish reading the first value from the [cache] + * before emitting a new value on the flow + * + * @param newValue The new value to be both emitted on the flow, and saved in the [cache] + */ + public suspend fun setValue(newValue: T) { + if (initializerJob.isActive) { + initializerJob.join() + } + _cacheValueState.emit(newValue) + } +} diff --git a/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt b/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt new file mode 100644 index 0000000..a8503ed --- /dev/null +++ b/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt @@ -0,0 +1,111 @@ +package com.kroger.cache.internal + +import app.cash.turbine.test +import com.google.common.truth.Truth.assertThat +import com.kroger.cache.SnapshotPersistentCache +import io.mockk.coEvery +import io.mockk.coVerifySequence +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.runTest +import org.junit.jupiter.api.Test + +@OptIn(ExperimentalCoroutinesApi::class) +class CacheFlowWrapperTest { + private val testDispatcher = UnconfinedTestDispatcher() + val testScope = CoroutineScope(CoroutineName("CacheFlowWrapperTest") + testDispatcher) + val fileCache: SnapshotPersistentCache = mockk() + + lateinit var cacheWrapper: CacheFlowWrapper + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is still reading WHEN new value is set THEN set value will wait for read to finish`() = runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + delay(1000) + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is done reading WHEN new value is set THEN set value will happen immediately`() = runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is writing values slowly WHEN new values are set in quick succession THEN all values are emitted on flow, and last value is saved to disk`() = runTest { + val firstValue = "first new value" + val secondValue = "second new value" + val thirdValue = "third new value" + val fourthValue = "Fourth new value" + coEvery { fileCache.read() } returns null + coEvery { fileCache.save(any()) } coAnswers { + delay(1000) + } + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(firstValue) + cacheWrapper.setValue(secondValue) + cacheWrapper.setValue(thirdValue) + cacheWrapper.setValue(fourthValue) + advanceTimeBy(2000) + assertThat(awaitItem()).isEqualTo(firstValue) + assertThat(awaitItem()).isEqualTo(secondValue) + assertThat(awaitItem()).isEqualTo(thirdValue) + assertThat(awaitItem()).isEqualTo(fourthValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(firstValue)) // start writing the first value + // second and third should be skipped since first isn't done writing yet + fileCache.save(eq(fourthValue)) // fourth and final value is written + } + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 7fc2eaa..f6f5f0b 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -39,6 +39,7 @@ ksp = "1.9.23-1.0.20" mockk = "1.12.5" telemetry = "1.0.0" truth = "1.1.3" +turbine = "1.0.0" [libraries] androidx-activity-compose = { module = "androidx.activity:activity-compose", version.ref = "androidxActivity" } @@ -73,6 +74,7 @@ moshi-ksp = { module = "com.squareup.moshi:moshi-kotlin-codegen", version.ref = truth = { module = "com.google.truth:truth", version.ref = "truth" } telemetry = { module = "com.kroger.telemetry:telemetry", version.ref = "telemetry" } telemetry-android = { module = "com.kroger.telemetry:android", version.ref = "telemetry" } +turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" } [plugins] android-application = { id = "com.android.application", version.ref = "androidGradlePlugin" } From d6559c1d76eecc5644584ba3e5cdd161c7ffaa7f Mon Sep 17 00:00:00 2001 From: Ethan Berg Date: Thu, 9 Apr 2026 15:25:46 -0700 Subject: [PATCH 2/3] chore: update documentation, fix unit tests, move files --- .../cache/{internal => }/CacheFlowWrapper.kt | 35 ++-- .../com/kroger/cache/internal/MemoryCache.kt | 2 +- .../com/kroger/cache/CacheFlowWrapperTest.kt | 156 ++++++++++++++++++ .../cache/internal/CacheFlowWrapperTest.kt | 111 ------------- 4 files changed, 176 insertions(+), 128 deletions(-) rename cache/src/main/java/com/kroger/cache/{internal => }/CacheFlowWrapper.kt (85%) create mode 100644 cache/src/test/java/com/kroger/cache/CacheFlowWrapperTest.kt delete mode 100644 cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt diff --git a/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt b/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt similarity index 85% rename from cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt rename to cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt index a3ab76d..ed5e25d 100644 --- a/cache/src/main/java/com/kroger/cache/internal/CacheFlowWrapper.kt +++ b/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt @@ -1,16 +1,3 @@ -package com.kroger.cache.internal - -import com.kroger.cache.SnapshotPersistentCache -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.MutableStateFlow -import kotlinx.coroutines.flow.StateFlow -import kotlinx.coroutines.flow.asStateFlow -import kotlinx.coroutines.flow.drop -import kotlinx.coroutines.flow.launchIn -import kotlinx.coroutines.flow.onEach -import kotlinx.coroutines.launch - /** * MIT License * @@ -34,12 +21,28 @@ import kotlinx.coroutines.launch * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. */ +package com.kroger.cache + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.drop +import kotlinx.coroutines.flow.launchIn +import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.launch + /** - * A Wrapper class for a SnapshotPersistentCache that exposes changes to the cache via a flow. + * A Wrapper class for a SnapshotPersistentCache that persists changes made to the value of the state flow * - * **Note this works best when used as a singleton + * Only emits values on the flow that are set via the same instance. + * If two flow wrappers exist for the same SnapshotPersistentCache, and one gets updated, the second will be out of sync. * - * @param cache the [com.kroger.cache.SnapshotPersistentCache] holding the value(s) on disk + * @param cache the [SnapshotPersistentCache] holding the value(s) on disk * @param scope the [kotlinx.coroutines.CoroutineScope] to run the flow on * */ diff --git a/cache/src/main/java/com/kroger/cache/internal/MemoryCache.kt b/cache/src/main/java/com/kroger/cache/internal/MemoryCache.kt index b4cc6e8..8da3646 100644 --- a/cache/src/main/java/com/kroger/cache/internal/MemoryCache.kt +++ b/cache/src/main/java/com/kroger/cache/internal/MemoryCache.kt @@ -29,7 +29,7 @@ import com.kroger.cache.Cache * A wrapper around a [Map] to work as an in-memory cache. * This class is not thread-safe and callers must ensure access is synchronized. * - * @param initialCapacity the initial capacity to use when creatig the [Map]. + * @param initialCapacity the initial capacity to use when creating the [Map]. */ internal class MemoryCache( initialCapacity: Int, diff --git a/cache/src/test/java/com/kroger/cache/CacheFlowWrapperTest.kt b/cache/src/test/java/com/kroger/cache/CacheFlowWrapperTest.kt new file mode 100644 index 0000000..2285157 --- /dev/null +++ b/cache/src/test/java/com/kroger/cache/CacheFlowWrapperTest.kt @@ -0,0 +1,156 @@ +/** + * MIT License + * + * Copyright (c) 2023 The Kroger Co. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ +package com.kroger.cache + +import app.cash.turbine.test +import com.google.common.truth.Truth.assertThat +import io.mockk.coEvery +import io.mockk.coVerifySequence +import io.mockk.just +import io.mockk.mockk +import io.mockk.runs +import kotlinx.coroutines.CoroutineName +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.delay +import kotlinx.coroutines.test.UnconfinedTestDispatcher +import kotlinx.coroutines.test.advanceTimeBy +import kotlinx.coroutines.test.resetMain +import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.test.setMain +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test + +@OptIn(ExperimentalCoroutinesApi::class) +class CacheFlowWrapperTest { + private val testDispatcher = UnconfinedTestDispatcher() + val testScope = CoroutineScope(CoroutineName("CacheFlowWrapperTest") + testDispatcher) + val fileCache: SnapshotPersistentCache = mockk() + + lateinit var cacheWrapper: CacheFlowWrapper + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler? = null + + @BeforeEach + fun setup() { + defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + Thread.setDefaultUncaughtExceptionHandler { _, e -> throw e } + Dispatchers.setMain(testDispatcher) + } + + @AfterEach + fun afterEach() { + Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + Dispatchers.resetMain() + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is still reading WHEN new value is set THEN set value will wait for read to finish`() = + runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + delay(1000) + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is done reading WHEN new value is set THEN set value will happen immediately`() = + runTest { + val fileCacheValue = "File cache value" + val newValue = "new value" + coEvery { fileCache.read() } coAnswers { + fileCacheValue + } + coEvery { fileCache.save(any()) } just runs + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(fileCacheValue) + advanceTimeBy(1000) + cacheWrapper.setValue(newValue) + assertThat(awaitItem()).isEqualTo(newValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(newValue)) + } + } + + @OptIn(ExperimentalCoroutinesApi::class) + @Test + fun `GIVEN cache is writing values slowly WHEN new values are set in quick succession THEN all values are emitted on flow, and last value is saved to disk`() = + runTest { + val firstValue = "first new value" + val secondValue = "second new value" + val thirdValue = "third new value" + val fourthValue = "Fourth new value" + coEvery { fileCache.read() } returns null + coEvery { fileCache.save(any()) } coAnswers { + delay(1000) + } + cacheWrapper = CacheFlowWrapper(fileCache, testScope) + cacheWrapper.cacheValueFlow.test { + assertThat(awaitItem()).isEqualTo(null) + cacheWrapper.setValue(firstValue) + cacheWrapper.setValue(secondValue) + cacheWrapper.setValue(thirdValue) + cacheWrapper.setValue(fourthValue) + advanceTimeBy(2000) + assertThat(awaitItem()).isEqualTo(firstValue) + assertThat(awaitItem()).isEqualTo(secondValue) + assertThat(awaitItem()).isEqualTo(thirdValue) + assertThat(awaitItem()).isEqualTo(fourthValue) + cancelAndIgnoreRemainingEvents() + } + + coVerifySequence { + fileCache.read() + fileCache.save(eq(firstValue)) // start writing the first value + // second and third should be skipped since first isn't done writing yet + fileCache.save(eq(fourthValue)) // fourth and final value is written + } + } +} diff --git a/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt b/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt deleted file mode 100644 index a8503ed..0000000 --- a/cache/src/test/java/com/kroger/cache/internal/CacheFlowWrapperTest.kt +++ /dev/null @@ -1,111 +0,0 @@ -package com.kroger.cache.internal - -import app.cash.turbine.test -import com.google.common.truth.Truth.assertThat -import com.kroger.cache.SnapshotPersistentCache -import io.mockk.coEvery -import io.mockk.coVerifySequence -import io.mockk.just -import io.mockk.mockk -import io.mockk.runs -import kotlinx.coroutines.CoroutineName -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.delay -import kotlinx.coroutines.test.UnconfinedTestDispatcher -import kotlinx.coroutines.test.advanceTimeBy -import kotlinx.coroutines.test.runTest -import org.junit.jupiter.api.Test - -@OptIn(ExperimentalCoroutinesApi::class) -class CacheFlowWrapperTest { - private val testDispatcher = UnconfinedTestDispatcher() - val testScope = CoroutineScope(CoroutineName("CacheFlowWrapperTest") + testDispatcher) - val fileCache: SnapshotPersistentCache = mockk() - - lateinit var cacheWrapper: CacheFlowWrapper - - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun `GIVEN cache is still reading WHEN new value is set THEN set value will wait for read to finish`() = runTest { - val fileCacheValue = "File cache value" - val newValue = "new value" - coEvery { fileCache.read() } coAnswers { - delay(1000) - fileCacheValue - } - coEvery { fileCache.save(any()) } just runs - cacheWrapper = CacheFlowWrapper(fileCache, testScope) - cacheWrapper.cacheValueFlow.test { - assertThat(awaitItem()).isEqualTo(null) - cacheWrapper.setValue(newValue) - assertThat(awaitItem()).isEqualTo(fileCacheValue) - advanceTimeBy(1000) - assertThat(awaitItem()).isEqualTo(newValue) - cancelAndIgnoreRemainingEvents() - } - - coVerifySequence { - fileCache.read() - fileCache.save(eq(newValue)) - } - } - - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun `GIVEN cache is done reading WHEN new value is set THEN set value will happen immediately`() = runTest { - val fileCacheValue = "File cache value" - val newValue = "new value" - coEvery { fileCache.read() } coAnswers { - fileCacheValue - } - coEvery { fileCache.save(any()) } just runs - cacheWrapper = CacheFlowWrapper(fileCache, testScope) - cacheWrapper.cacheValueFlow.test { - assertThat(awaitItem()).isEqualTo(fileCacheValue) - advanceTimeBy(1000) - cacheWrapper.setValue(newValue) - assertThat(awaitItem()).isEqualTo(newValue) - cancelAndIgnoreRemainingEvents() - } - - coVerifySequence { - fileCache.read() - fileCache.save(eq(newValue)) - } - } - - @OptIn(ExperimentalCoroutinesApi::class) - @Test - fun `GIVEN cache is writing values slowly WHEN new values are set in quick succession THEN all values are emitted on flow, and last value is saved to disk`() = runTest { - val firstValue = "first new value" - val secondValue = "second new value" - val thirdValue = "third new value" - val fourthValue = "Fourth new value" - coEvery { fileCache.read() } returns null - coEvery { fileCache.save(any()) } coAnswers { - delay(1000) - } - cacheWrapper = CacheFlowWrapper(fileCache, testScope) - cacheWrapper.cacheValueFlow.test { - assertThat(awaitItem()).isEqualTo(null) - cacheWrapper.setValue(firstValue) - cacheWrapper.setValue(secondValue) - cacheWrapper.setValue(thirdValue) - cacheWrapper.setValue(fourthValue) - advanceTimeBy(2000) - assertThat(awaitItem()).isEqualTo(firstValue) - assertThat(awaitItem()).isEqualTo(secondValue) - assertThat(awaitItem()).isEqualTo(thirdValue) - assertThat(awaitItem()).isEqualTo(fourthValue) - cancelAndIgnoreRemainingEvents() - } - - coVerifySequence { - fileCache.read() - fileCache.save(eq(firstValue)) // start writing the first value - // second and third should be skipped since first isn't done writing yet - fileCache.save(eq(fourthValue)) // fourth and final value is written - } - } -} From 02194f0101e65495aae518831b731147890909cd Mon Sep 17 00:00:00 2001 From: Ethan Berg Date: Thu, 9 Apr 2026 15:30:42 -0700 Subject: [PATCH 3/3] chore: fix lint errors --- .../java/com/kroger/cache/CacheFlowWrapper.kt | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt b/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt index ed5e25d..c2bb9c6 100644 --- a/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt +++ b/cache/src/main/java/com/kroger/cache/CacheFlowWrapper.kt @@ -24,10 +24,7 @@ package com.kroger.cache import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job -import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow @@ -51,29 +48,29 @@ public class CacheFlowWrapper( private val scope: CoroutineScope, ) { /** - * A reference to the coroutine job used for reading the first value from the [cache] and emitting it on [_cacheValueState] + * A reference to the coroutine job used for reading the first value from the [cache] and emitting it on [_cacheValueFlow] */ private val initializerJob: Job /** * The private mutable state flow for the current value */ - private val _cacheValueState = MutableStateFlow(null) + private val _cacheValueFlow = MutableStateFlow(null) /** * publicly exposed read-only flow on which to read and observe changes to the current value */ - public val cacheValueFlow: StateFlow = _cacheValueState.asStateFlow() + public val cacheValueFlow: StateFlow = _cacheValueFlow.asStateFlow() /** - * Initialization block reads the value the [cache] and emits it on [_cacheValueState] + * Initialization block reads the value the [cache] and emits it on [_cacheValueFlow] * * This job also updates the value in [cache] for each new value emitted on the flow * except for the first, which is read from [cache] */ init { initializerJob = scope.launch { - _cacheValueState.value = cache.read() + _cacheValueFlow.value = cache.read() cacheValueFlow .drop(1) @@ -95,6 +92,6 @@ public class CacheFlowWrapper( if (initializerJob.isActive) { initializerJob.join() } - _cacheValueState.emit(newValue) + _cacheValueFlow.emit(newValue) } }