4 minutes
Don’t Worry, Just Launch
I’ve been writing Kotlin code for nearly 20% of my professional programming life! I wrote my first scrap of Kotlin in roughly mid-2018, and transitioned to writing full-time production Kotlin in mid-2019. Ever since then, I’ve been on the Kotlin train 100%.
Despite all that Kotlin experience, the new Hatch Credit Android app (launching Q1-2021✨) is the first time I’ve used 100% coroutines in production code, so I still write some dumb code sometimes.
A great example: this week, I was adding some basic feature flagging and crash reporting to the app, using LaunchDarkly and Sentry. Given that I already had a User coming from my repository, I used a SharedFlow
to multiplex so that Sentry and LaunchDarkly could each handle things in their own time:
class Analytics {
init { // ⚠️ DO NOT USE THIS CODE ⚠️
scope.launch() {
val userFlow = userRepo.observeUser(refresh = false)
.map { /** filter+transform the User */ }
.shareIn(this, SharingStarted.WhileSubscribed(), 1)
userFlow
.collect { /* consume User for Sentry setup */ }
userFlow
.collect { /** consume for LaunchDarkly setup */ }
}
}
}
Simple enough, right? I wrote this code, proceeded to write comprehensive tests eyeball it, and then committed the code to GitHub without a second thought. The next day, I start testing the app to cut a release, and I realize – hey, LaunchDarkly never gets set up!
I do all the standard debugging things – println
, breakpoints (Android Studio + Coroutines is a nightmare, even on 7.0.0 Arctic Fox
), to no avail. Sentry handles the Flow
just fine, LaunchDarkly sees nothing.
I’ll spare y’all the hours of debugging I lost to my own stupid mistake. I will tell you that: coroutine scopes don’t like to be shared! Here’s how I fixed my issue:
class Analytics {
init {
val userFlow = userRepo.observeUser(refresh = false)
.map { /** filter+transform the User */ }
.shareIn(scope, SharingStarted.Eagerly, 1)
userFlow
.onEach { /* setup sentry */ }
.launchIn(scope)
userFlow
.onEach { /* setup LaunchDarkly */ }
.launchIn(scope)
}
}
(note: there’s another solution involving CoroutineScope.async
, see below for that code)
What’s the difference?
Here’s what went wrong at first, and why this code works:
In case you’re not familiar, Flow<T>.launchIn(CoroutineScope)
turns your launch
into a kind of postfix operator instead of having to nesting a collect
inside a launch
block.
scope.launch {
flowOf(1, 2, 3).collect()
}
// ⬆️ these are equivalent ⬇️
flowOf(1, 2, 3).launchIn(scope)
In writing my original bit of code, I made a few mistakes:
- I assumed that
launch+collect
andlaunchIn
were exactly equivalent – not true when yourlaunch
has multiplecollect
statements! This leads to (2): - I forgot that
Flow.collect
is a suspending terminal operator, so for a never-ending stream (e.g. a Flow from your Repository),collect
will never complete!
CoroutineScope.launch
only starts one new coroutine, so in the following situation, only one Flow will start; the other Flow will patiently wait (forever!):
init {
scope.launch { // ⚠️ BAD CODE DON'T DO THIS
// ✅ doesn't emit, but runs fine
val neverEnding = flow<Unit> { while(true) delay(100) }.collect { println("neverending!") }
// ❌ never runs because `neverEnding`
val neverStarting = flowOf(1, 2, 3).collect { println("$it") }
}
}
Instead, since coroutines are cheap, don’t worry and launch
as many of them as you need:
init {
flow { while(true) delay(100) }.launchIn(scope) // ✅ runs fine
flowOf(1, 2, 3).launchIn(scope) // ✅ also runs fine
}
This construction has the added benefit of making your code read nicer too – less indentation! 🎉
Bonus
Extensions!
In search of a less verbose Flow collection mechanism, I’ve built a small extension to help process Flow emissions in a given CoroutineScope
:
fun <T> Flow<T>.launchIn(scope: CoroutineScope, collector: suspend (T) -> Unit): Job {
return scope.launch { collect(collector) }
}
//Sample Usage:
val scope = ... //e.g. viewModelScope
flowOf(1, 2, 3)
.launchIn(scope) { int -> println("$int") }
This combines the concision of launchIn
with the standard collection handles you’d use in onEach
or collect
. Best of both worlds!
Async instead of Launch
There’s another way to solve the original problem, which Dan Lew pointed out – CoroutineScope.async
!
init {
scope.launch {
val userFlow = userRepo.observeUser(refresh = false)
.map { /** filter+transform the User */ }
.shareIn(this, SharingStarted.WhileSubscribed(), 1)
val sentry = async { userFlow.collect { /* consume User for Sentry setup */ } }
val launchDarkly = async { userFlow.collect { /* consume User for LaunchDarkly setup */ } }
sentry.await() + launchDarkly.await()
}
}
References | Extra Reading | Sources
My deep appreciation goes out to:
- Ian Lake and Mike Nakhimovich for very patiently helping me debug issues that didn’t pertain to them or their libraries in the slightest 😁
- Dariusz Seweryn, Dan Lew, and Geoffrey Metais for pointing out that
collect
is a suspending operator and providing feedback on the code snippets 🙏🏾 - Kiran Rao and Akshay Chordiya for some copy-editing ✍🏽
Also:
- Kotlin Docs - Launching Flow: https://kotlinlang.org/docs/reference/coroutines/flow.html#launching-flow
- Kotlin Docs - Async: https://kotlinlang.org/docs/reference/coroutines/composing-suspending-functions.html#structured-concurrency-with-async