170 похожих чатов

Возник вопрос касаемо обработки ошибок в бесконечном реактивном стриме. Допустим,

у меня есть паблишер Flux элементов, которые постепенно попадают с помощью Sink Emittor'а (unicast эммитор), который постепенно запрашивает новые элементы.

Сам принцип реакивных стримов, как известно, опирается на идеи того, что конвеер работает до тех пор, пока не произойдёт ошибка.
Этой ошибкой может быть моргание сети, переполнение буффера и проч.служебные терминальные сигналы, но также и клиентские ошибки.

Чтобы их хэндлить, я добавил возможность переподписки с помощью хука Flux#onErrorResume. Выглядит это примерно так:


broadcast
.flatMap { publisher -> restartIfTerminated(publisher) } // Тут происходит основная логика переподписки - рекурсивный вызов в случаи ошибки
.flatMap { event -> processEvent(event) }
.subscribe()


fun restartIfTerminated(myPublisher: Mono<CustomPublisher>) =
myPublisher
.flatMapMany { it.prepareSubscription() }
.onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else {
Flux.error(e)
}
}


Важно отметить, что я привел достаточно абстрактный код, который скорее описывает проблему с точки зрения обработки ошибок.
Да, безусловно, подход с агрессивным ретраем не является до конца истиным, всегда нужно предусматривать delay, но сейчас не об этом.

Что делать, если я хочу НЕ реагировать на какие-то клиентские проблемы? Например, произошла бизнес ошибка - я игнорирую её и продолжаю процессить заэмиченные события.
Будет ли правильным вот такое решение?


onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else if (e is BusinessException) {
Flux.empty() // пустышка
} else {
Flux.error(e)
}
}


P.S:
Важно сказать, что обработчики ошибок очень чувствительные. Если у меня есть цепочка действий, то можно неявно перетереть первоначальную обработку, которая следует в downstream'е. Именно поэтому стараюсь обрабатывать ошибки в одном едином месте.

2 ответов

23 просмотра

Обрабатываю Mono схожим образом. Встречный вопрос: onErroresume подписывается на subscribeOn(Schedulers.boundedElastic) вверху? Хочу блокирующий код вызывать в случае ошибки.

Aleksandr- Автор вопроса
Alexei
Обрабатываю Mono схожим образом. Встречный вопрос:...

subscribeOn (если он один в цепочке) переводит всю цепочку на шедуллер. Блокирующий код в случае ошибки - это какой?

Похожие вопросы

Обсуждают сегодня

Господа, а что сейчас вообще с рынком труда на делфи происходит? Какова ситуация?
Rꙮman Yankꙮvsky
29
А вообще, что может смущать в самой Julia - бы сказал, что нет единого стандартного подхода по многим моментам, поэтому многое выглядит как "хаки" и произвол. Короче говоря, с...
Viktor G.
2
30500 за редактор? )
Владимир
47
а через ESC-код ?
Alexey Kulakov
29
Чёт не понял, я ж правильной функцией воспользовался чтобы вывести отладочную информацию? но что-то она не ловится
notme
18
У меня есть функция где происходит это: write_bit(buffer, 1); write_bit(buffer, 0); write_bit(buffer, 1); write_bit(buffer, 1); write_bit(buffer, 1); w...
~
14
Добрый день! Скажите пожалуйста, а какие программы вы бы рекомендовали написать для того, чтобы научиться управлять памятью? Можно написать динамический массив, можно связный ...
Филипп
7
Недавно Google Project Zero нашёл багу в SQLite с помощью LLM, о чём достаточно было шумно в определённых интернетах, которые сопровождались рассказами, что скоро всех "ибешни...
Alex Sherbakov
5
Ребят в СИ можно реализовать ООП?
Николай
33
https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_h_common.erl#L174 https://github.com/erlang/otp/blob/OTP-27.1/lib/kernel/src/logger_olp.erl#L76 15 лет назад...
Maksim Lapshin
20
Карта сайта