у меня есть паблишер Flux элементов, которые постепенно попадают с помощью Sink Emittor'а (unicast эммитор), который постепенно запрашивает новые элементы.
Сам принцип реакивных стримов, как известно, опирается на идеи того, что конвеер работает до тех пор, пока не произойдёт ошибка.
Этой ошибкой может быть моргание сети, переполнение буффера и проч.служебные терминальные сигналы, но также и клиентские ошибки.
Чтобы их хэндлить, я добавил возможность переподписки с помощью хука Flux#onErrorResume. Выглядит это примерно так:
broadcast
.flatMap { publisher -> restartIfTerminated(publisher) } // Тут происходит основная логика переподписки - рекурсивный вызов в случаи ошибки
.flatMap { event -> processEvent(event) }
.subscribe()
fun restartIfTerminated(myPublisher: Mono<CustomPublisher>) =
myPublisher
.flatMapMany { it.prepareSubscription() }
.onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else {
Flux.error(e)
}
}
Важно отметить, что я привел достаточно абстрактный код, который скорее описывает проблему с точки зрения обработки ошибок.
Да, безусловно, подход с агрессивным ретраем не является до конца истиным, всегда нужно предусматривать delay, но сейчас не об этом.
Что делать, если я хочу НЕ реагировать на какие-то клиентские проблемы? Например, произошла бизнес ошибка - я игнорирую её и продолжаю процессить заэмиченные события.
Будет ли правильным вот такое решение?
onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else if (e is BusinessException) {
Flux.empty() // пустышка
} else {
Flux.error(e)
}
}
P.S:
Важно сказать, что обработчики ошибок очень чувствительные. Если у меня есть цепочка действий, то можно неявно перетереть первоначальную обработку, которая следует в downstream'е. Именно поэтому стараюсь обрабатывать ошибки в одном едином месте.
Обрабатываю Mono схожим образом. Встречный вопрос: onErroresume подписывается на subscribeOn(Schedulers.boundedElastic) вверху? Хочу блокирующий код вызывать в случае ошибки.
subscribeOn (если он один в цепочке) переводит всю цепочку на шедуллер. Блокирующий код в случае ошибки - это какой?
Обсуждают сегодня