I’ve been writing Kotlin code for nearly 20% of my professional programming life! I wrote my first scrap of Kotlin in roughly mid-2018, and transitioned to writing full-time production Kotlin in mid-2019. Ever since then, I’ve been on the Kotlin train 100%.
Despite all that Kotlin experience, the new Hatch Credit Android app (launching Q1-2021✨) is the first time I’ve used 100% coroutines in production code, so I still write some dumb code sometimes.

A great example: this week, I was adding some basic feature flagging and crash reporting to the app, using LaunchDarkly and Sentry. Given that I already had a User coming from my repository, I used a SharedFlow to multiplex so that Sentry and LaunchDarkly could each handle things in their own time:

class Analytics {
    init { // ⚠️ DO NOT USE THIS CODE ⚠️
        scope.launch() { 
            val userFlow = userRepo.observeUser(refresh = false)
                .map { /** filter+transform the User */ }
                .shareIn(this, SharingStarted.WhileSubscribed(), 1)

            userFlow
                .collect { /* consume User for Sentry setup */ }

            userFlow
                .collect { /** consume for LaunchDarkly setup */ }
        }
    }
}

Simple enough, right? I wrote this code, proceeded to write comprehensive tests eyeball it, and then committed the code to GitHub without a second thought. The next day, I start testing the app to cut a release, and I realize – hey, LaunchDarkly never gets set up!

I do all the standard debugging things – println, breakpoints (Android Studio + Coroutines is a nightmare, even on 7.0.0 Arctic Fox), to no avail. Sentry handles the Flow just fine, LaunchDarkly sees nothing.

I’ll spare y’all the hours of debugging I lost to my own stupid mistake. I will tell you that: coroutine scopes don’t like to be shared! Here’s how I fixed my issue:

class Analytics {
    init {
        val userFlow = userRepo.observeUser(refresh = false)
             .map { /** filter+transform the User */ }  
             .shareIn(scope, SharingStarted.Eagerly, 1)

        userFlow
            .onEach { /* setup sentry */ }
            .launchIn(scope)

        userFlow
            .onEach { /* setup LaunchDarkly */ }
            .launchIn(scope)
    }
}

(note: there’s another solution involving CoroutineScope.async, see below for that code)

What’s the difference?

Here’s what went wrong at first, and why this code works:

In case you’re not familiar, Flow<T>.launchIn(CoroutineScope) turns your launch into a kind of postfix operator instead of having to nesting a collect inside a launch block.

scope.launch { 
    flowOf(1, 2, 3).collect()
}
// ⬆️ these are equivalent ⬇️
flowOf(1, 2, 3).launchIn(scope)

In writing my original bit of code, I made a few mistakes:

  1. I assumed that launch+collect and launchIn were exactly equivalent – not true when your launch has multiple collect statements! This leads to (2):
  2. I forgot that Flow.collect is a suspending terminal operator, so for a never-ending stream (e.g. a Flow from your Repository), collect will never complete!

CoroutineScope.launch only starts one new coroutine, so in the following situation, only one Flow will start; the other Flow will patiently wait (forever!):

init {
    scope.launch { // ⚠️ BAD CODE DON'T DO THIS
        // ✅ doesn't emit, but runs fine
        val neverEnding = flow<Unit> { while(true) delay(100) }.collect { println("neverending!") } 

        // ❌ never runs because `neverEnding`
        val neverStarting = flowOf(1, 2, 3).collect { println("$it") }
    }
}

Instead, since coroutines are cheap, don’t worry and launch as many of them as you need:

init {
    flow { while(true) delay(100) }.launchIn(scope)  // ✅ runs fine
    flowOf(1, 2, 3).launchIn(scope) // ✅ also runs fine
}

This construction has the added benefit of making your code read nicer too – less indentation! 🎉

Bonus

Extensions!

In search of a less verbose Flow collection mechanism, I’ve built a small extension to help process Flow emissions in a given CoroutineScope:

fun <T> Flow<T>.launchIn(scope: CoroutineScope, collector: suspend (T) -> Unit): Job {
    return scope.launch { collect(collector) }
}

//Sample Usage:
val scope = ... //e.g. viewModelScope
flowOf(1, 2, 3)
    .launchIn(scope) { int -> println("$int") }

This combines the concision of launchIn with the standard collection handles you’d use in onEach or collect. Best of both worlds!

Async instead of Launch

There’s another way to solve the original problem, which Dan Lew pointed out – CoroutineScope.async!

init { 
    scope.launch { 
        val userFlow = userRepo.observeUser(refresh = false)
            .map { /** filter+transform the User */ }
            .shareIn(this, SharingStarted.WhileSubscribed(), 1)
        
        val sentry = async { userFlow.collect { /* consume User for Sentry setup */ } }         
        val launchDarkly = async { userFlow.collect { /* consume User for LaunchDarkly setup */ } }         
        sentry.await() + launchDarkly.await()
    }
}

References | Extra Reading | Sources

My deep appreciation goes out to:

Also: