Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@

package mozilla.telemetry.glean

import android.util.Log
import androidx.annotation.VisibleForTesting
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

internal object Dispatchers {
class WaitableCoroutineScope(
Expand Down Expand Up @@ -50,33 +51,70 @@ internal object Dispatchers {
}
}

class DelayedTaskQueue {
// When true, jobs will be queued and not ran until triggered by calling
// flushQueuedInitialTasks()
private var queueInitialTasks = AtomicBoolean(true)
class DelayedTaskQueue(
private val coroutineScope: CoroutineScope,
) {
internal val channel = Channel<Int>()

// When true, jobs will be run synchronously
internal var testingMode = false

// Use a [ConcurrentLinkedQueue] to take advantage of its thread safety and no locking
internal val taskQueue: ConcurrentLinkedQueue<() -> Unit> = ConcurrentLinkedQueue()
init {
// We put a first task on it that waits to receive something.
// We close that channel in `flushQueuedInitialTasks` which will unblock this task.
//
// Receiving/Closing the channel is the signal the task is unblocked
@Suppress("SwallowedException")
this.executeTask {
try {
runBlocking { channel.receive() }
} catch (e: ClosedSendChannelException) {
// intentionally left empty.
// The channel is closed by `flushQueuedInitialTasks`
}
}
}

/**
* Enable testing mode, which makes all of the Glean SDK public API
* synchronous.
*
* @param enabled whether or not to enable the testing mode
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun setTestingMode(enabled: Boolean) {
testingMode = enabled
}

/**
* Launch a block of work synchronously or queue it if not yet unblocked.
* Launch a block of work asynchronously.
*
* If [queueInitialTasks] true
* then the work will be queued and executed when [flushQueuedInitialTasks] is called.
* If [queueInitialTasks] is false the block is executed synchronously.
* If the queue is still blocked, this will run at a later point.
*/
fun launch(block: () -> Unit) {
val queueTasks = synchronized(this) {
queueInitialTasks.get()
coroutineScope.launch {
block()
}

if (queueTasks) {
addTaskToQueue(block)
} else {
block()
if (this.testingMode) {
this.launchBlocking { }
}
}

/**
* Launch a block of work, wait and return its result
*/
fun <T> launchBlocking(block: () -> T): T {
val channel = Channel<T>()
coroutineScope.launch {
runBlocking {
channel.send(block())
}
}

return runBlocking { channel.receive() }
}

/**
* Stops queueing tasks and processes any tasks in the queue.
*
Expand All @@ -85,29 +123,17 @@ internal object Dispatchers {
* Since [queueInitialTasks] is set to false prior to processing the queue,
* newly launched tasks should be executed immediately rather than added to the queue.
*/
internal fun flushQueuedInitialTasks() {
val dispatcherObject = this

val queueCopy: ConcurrentLinkedQueue<() -> Unit> = ConcurrentLinkedQueue()
synchronized(dispatcherObject) {
queueCopy.addAll(taskQueue)
taskQueue.clear()

queueCopy.forEach { task ->
task()
@OptIn(kotlinx.coroutines.DelicateCoroutinesApi::class)
fun flushQueuedInitialTasks() {
if (!this.channel.isClosedForSend) {
runBlocking {
this@DelayedTaskQueue.channel.send(1)
}

queueInitialTasks.set(false)
this.channel.close()
}
}

/**
* Helper function to add task to queue.
*/
@Synchronized
private fun addTaskToQueue(block: () -> Unit) {
taskQueue.add(block)
}
internal fun executeTask(block: suspend CoroutineScope.() -> Unit): Job? = coroutineScope.launch(block = block)
}

// This job is used to make sure the API `CoroutineContext` does not cancel
Expand All @@ -128,5 +154,9 @@ internal object Dispatchers {

@OptIn(kotlinx.coroutines.DelicateCoroutinesApi::class, kotlinx.coroutines.ExperimentalCoroutinesApi::class)
@Suppress("ktlint:standard:property-naming")
var Delayed = DelayedTaskQueue()
var Delayed = DelayedTaskQueue(
CoroutineScope(
newSingleThreadContext("GleanMetricPool") + supervisorJob,
),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import android.util.Log
import androidx.annotation.VisibleForTesting
import androidx.lifecycle.ProcessLifecycleOwner
import kotlinx.coroutines.MainScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import mozilla.telemetry.glean.Dispatchers
import mozilla.telemetry.glean.GleanMetrics.GleanValidation
import mozilla.telemetry.glean.config.Configuration
import mozilla.telemetry.glean.internal.*
Expand Down Expand Up @@ -375,7 +378,8 @@ open class GleanInternalAPI internal constructor() {
* @return true if the experiment is active and reported in pings, otherwise false
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testIsExperimentActive(experimentId: String): Boolean = gleanTestGetExperimentData(experimentId) != null
fun testIsExperimentActive(experimentId: String): Boolean =
Dispatchers.Delayed.launchBlocking { gleanTestGetExperimentData(experimentId) } != null

/**
* Returns the stored data for the requested active experiment, for testing purposes only.
Expand Down Expand Up @@ -467,7 +471,9 @@ open class GleanInternalAPI internal constructor() {
// Note that this is sending the length of the last foreground session
// because it belongs to the baseline ping and that ping is sent every
// time the app goes to background.
gleanHandleClientActive()
Dispatchers.Delayed.launchBlocking {
gleanHandleClientActive()
}

GleanValidation.foregroundCount.add(1)
}
Expand All @@ -477,9 +483,11 @@ open class GleanInternalAPI internal constructor() {
*/
internal fun handleBackgroundEvent() {
// Persist data on backgrounding the app
persistPingLifetimeData()
Dispatchers.Delayed.launchBlocking {
persistPingLifetimeData()

gleanHandleClientInactive()
gleanHandleClientInactive()
}
}

/**
Expand All @@ -503,7 +511,9 @@ open class GleanInternalAPI internal constructor() {
pingName: String,
reason: String? = null,
) {
gleanSubmitPingByName(pingName, reason)
Dispatchers.Delayed.launchBlocking {
gleanSubmitPingByName(pingName, reason)
}
}

/** Gets a `Set` of the currently registered ping names.
Expand Down Expand Up @@ -606,6 +616,7 @@ open class GleanInternalAPI internal constructor() {
this.testingMode = enabled
gleanSetTestMode(enabled)
Dispatchers.API.setTestingMode(enabled)
Dispatchers.Delayed.setTestingMode(enabled)
}

@VisibleForTesting(otherwise = VisibleForTesting.NONE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class BooleanMetricType {
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -62,5 +63,6 @@ class BooleanMetricType {
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,16 @@ class CounterMetricType {
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
*
* @param errorType The type of error to get the error count of
* @param errorType The type of the error recorded.
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class CustomDistributionMetricType(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -78,5 +79,6 @@ class CustomDistributionMetricType(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(error: ErrorType) = inner.testGetNumRecordedErrors(error)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ class DenominatorMetricType(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -56,5 +57,6 @@ class DenominatorMetricType(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class EventMetricType<ExtraObject> constructor(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null): List<RecordedEvent>? = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -83,5 +84,6 @@ class EventMetricType<ExtraObject> constructor(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class MemoryDistributionMetricType(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -63,5 +64,6 @@ class MemoryDistributionMetricType(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(error: ErrorType) = inner.testGetNumRecordedErrors(error)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ class NumeratorMetricType(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null) = inner.testGetValue(pingName)
fun testGetValue(pingName: String? = null) =
Dispatchers.Delayed.launchBlocking { this.inner.testGetValue(pingName) }

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -54,5 +55,6 @@ class NumeratorMetricType(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ class ObjectMetricType<K> constructor(
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
@JvmOverloads
fun testGetValue(pingName: String? = null): JsonElement? {
return inner.testGetValue(pingName)?.let {
return Json.decodeFromString(it)
fun testGetValue(pingName: String? = null): JsonElement? =
Dispatchers.Delayed.launchBlocking {
inner.testGetValue(pingName)?.let {
Json.decodeFromString(it)
}
}
}

/**
* Returns the number of errors recorded for the given metric.
Expand All @@ -74,5 +75,6 @@ class ObjectMetricType<K> constructor(
* @return the number of errors recorded for the metric.
*/
@VisibleForTesting(otherwise = VisibleForTesting.NONE)
fun testGetNumRecordedErrors(errorType: ErrorType) = inner.testGetNumRecordedErrors(errorType)
fun testGetNumRecordedErrors(errorType: ErrorType) =
Dispatchers.Delayed.launchBlocking { inner.testGetNumRecordedErrors(errorType) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,14 @@ class PingType<ReasonCodesEnum>(
*/
@JvmOverloads
fun submit(reason: ReasonCodesEnum? = null) {
Dispatchers.Delayed.launch {
this.testCallback?.let {
it(reason)
}
this.testCallback = null
this.testCallback?.let {
// If there's a callback, we're in tests and can block to wait for other recordings.
// The callback needs to be called on _this_ thread, as it may put things on the dispatcher.
it(reason)
}
this.testCallback = null

Dispatchers.Delayed.launch {
val reasonString = reason?.let { this.reasonCodes[it.code()] }
this.innerPing.submit(reasonString)
}
Expand Down
Loading