flatMapLatest и семейство *Latest: отменяй устаревшее, показывай актуальное

В предыдущей статье мы научились фильтровать входной поток: убрали дребезг, дубликаты, пустые запросы. Но мы не затронули другую проблему: что происходит, если обработка одного значения занимает больше времени, чем интервал между двумя значениями?

Представьте: пользователь ищет рецепты. Набрал «пирог» приложение отправило запрос. Пока сервер думает, пользователь передумал и исправил на «пирожки». Приложение отправило второй запрос. Сервер отвечает на первый запрос и на экране появляются рецепты пирогов, хотя пользователь уже ищет пирожки.

Оператор map честно обрабатывает каждое значение до конца, даже если оно давно устарело. Чтобы отменять устаревшую работу, нужно семейство операторов с суффиксом Latest.

mapLatest: не доделывай, если пришёл новый заказ

Заменяем map на mapLatest и проблема с пирогами решена:

val results: StateFlow<List<Recipe>> = query
    .debounce(300)
    .distinctUntilChanged()
    .mapLatest { term ->
        searchRecipes(term) // долгая операция
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

Поведение mapLatest укладывается в армейскую мудрость: «Не спеши выполнять приказ его могут отменить». Когда в поток приходит новое значение, mapLatest отменяет корутину, обрабатывающую старое, и запускает новую. Пользователь видит только результат последнего запроса.

Под капотом mapLatest использует тот же механизм, что и обычная отмена корутин. Функция searchRecipes() это suspend-функция. В момент отмены она прерывается на ближайшей точке приостановки (вызов delay(), сетевой запрос, обращение к базе данных).

transformLatest: несколько сигналов за один запрос

У mapLatest есть ограничение: он возвращает ровно одно значение на каждый входной элемент. Но что, если нужно сначала показать индикатор загрузки, а потом результат? Это две эмиссии на один входной элемент.

Для таких случаев существует transformLatest. Внутри его лямбды доступна функция emit(), и вызывать её можно сколько угодно раз:

query
    .debounce(300)
    .distinctUntilChanged()
    .transformLatest { term ->
        emit(SearchState.Loading)           // первая эмиссия: загрузка
        val results = searchRecipes(term)
        emit(SearchState.Success(results))  // вторая эмиссия: результат
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), SearchState.Loading)

Как только пользователь набирает новый символ, transformLatest отменяет предыдущую цепочку эмиссий и начинает новую. Если запрос ещё не успел завершиться не беда: корутина отменена, SearchState.Success от старого запроса никогда не будет выпущен.

Закодируем состояния с помощью sealed-класса:

sealed class SearchState {
    data object Loading : SearchState()
    data class Success(val items: List<Recipe>) : SearchState()
    data class Error(val message: String) : SearchState()
}

В UI обрабатываем каждое состояние через when:

@Composable
fun SearchResults(state: SearchState) {
    when (state) {
        is SearchState.Loading -> CircularProgressIndicator()
        is SearchState.Success -> RecipeList(state.items)
        is SearchState.Error -> Text("Ошибка: ${state.message}")
    }
}

flatMapLatest: когда результат тоже поток

Два предыдущих оператора превращают входное значение в результат (mapLatest) или в последовательность результатов (transformLatest). Но иногда источник данных сам возвращает Flow.

Типичный пример Room. Запрос к базе данных возвращает Flow, который автоматически обновляется при изменении таблицы. Если пользователь меняет категорию блюд, нам нужно отписаться от старого потока и подписаться на новый:

// Репозиторий
class RecipeRepository(private val dao: RecipeDao) {
    fun getByCategory(category: String): Flow<List<Recipe>> =
        dao.findByCategory(category) // Room возвращает Flow
}

Во ViewModel:

val recipes: StateFlow<List<Recipe>> = selectedCategory
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { category ->
        repository.getByCategory(category)
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

Оператор flatMapLatest делает три вещи за один вызов: превращает каждое значение category в поток (map), автоматически «сплющивает» вложенный поток то есть проталкивает значения из внутреннего Flow наружу (flatten), и при появлении нового category отменяет подписку на предыдущий поток (latest).

Без flatMapLatest пришлось бы писать map { repository.getByCategory(it) }, получить Flow<Flow<List<Recipe>>> поток потоков и вручную разбираться с подпиской и отпиской. Выглядело бы это примерно так:

.transformLatest { category ->
    repository.getByCategory(category).collect { recipes ->
        emit(recipes)
    }
}

По сути flatMapLatest это сокращённая запись этого паттерна.

Шпаргалка

Все три оператора решают одну задачу отменять устаревшую работу. Разница в типе результата:

ОператорВход → ВыходДля чего
mapLatestT → RОдно вычисление на элемент: сетевой запрос, форматирование
transformLatestT → emit(R...)Несколько эмиссий: загрузка → результат, пошаговая обработка
flatMapLatestT → Flow<R>Источник данных возвращает поток: Room, WebSocket, поток из репозитория

Если не уверены, какой оператор выбрать, начните с flatMapLatest. Он самый гибкий: работает и с обычными значениями (оберните в flowOf()), и с потоками. А mapLatest и transformLatest используйте, когда точно знаете, что источник данных не поток.


Комбинация debounce + distinctUntilChanged + flatMapLatest стандартный рецепт для поисковой строки в Android. Три оператора, покрывающие 90% случаев реактивного поиска. Остальные 10% это уже нюансы вроде backpressure и буферизации, которые выходят за рамки типичного UI-сценария.