170 похожих чатов

Возник вопрос касаемо обработки ошибок в бесконечном реактивном стриме. Допустим,

у меня есть паблишер Flux элементов, которые постепенно попадают с помощью Sink Emittor'а (unicast эммитор), который постепенно запрашивает новые элементы.

Сам принцип реакивных стримов, как известно, опирается на идеи того, что конвеер работает до тех пор, пока не произойдёт ошибка.
Этой ошибкой может быть моргание сети, переполнение буффера и проч.служебные терминальные сигналы, но также и клиентские ошибки.

Чтобы их хэндлить, я добавил возможность переподписки с помощью хука Flux#onErrorResume. Выглядит это примерно так:


broadcast
.flatMap { publisher -> restartIfTerminated(publisher) } // Тут происходит основная логика переподписки - рекурсивный вызов в случаи ошибки
.flatMap { event -> processEvent(event) }
.subscribe()


fun restartIfTerminated(myPublisher: Mono<CustomPublisher>) =
myPublisher
.flatMapMany { it.prepareSubscription() }
.onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else {
Flux.error(e)
}
}


Важно отметить, что я привел достаточно абстрактный код, который скорее описывает проблему с точки зрения обработки ошибок.
Да, безусловно, подход с агрессивным ретраем не является до конца истиным, всегда нужно предусматривать delay, но сейчас не об этом.

Что делать, если я хочу НЕ реагировать на какие-то клиентские проблемы? Например, произошла бизнес ошибка - я игнорирую её и продолжаю процессить заэмиченные события.
Будет ли правильным вот такое решение?


onErrorResume { e ->
if (e is NetworkError) {
restartIfTerminated(myPublisher)
} else if (e is BusinessException) {
Flux.empty() // пустышка
} else {
Flux.error(e)
}
}


P.S:
Важно сказать, что обработчики ошибок очень чувствительные. Если у меня есть цепочка действий, то можно неявно перетереть первоначальную обработку, которая следует в downstream'е. Именно поэтому стараюсь обрабатывать ошибки в одном едином месте.

2 ответов

20 просмотров

Обрабатываю Mono схожим образом. Встречный вопрос: onErroresume подписывается на subscribeOn(Schedulers.boundedElastic) вверху? Хочу блокирующий код вызывать в случае ошибки.

Aleksandr- Автор вопроса
Alexei
Обрабатываю Mono схожим образом. Встречный вопрос:...

subscribeOn (если он один в цепочке) переводит всю цепочку на шедуллер. Блокирующий код в случае ошибки - это какой?

Похожие вопросы

Обсуждают сегодня

Всем привет! Имеется функция: function IsValidChar(ch: UTF8Char): Boolean; var i: Integer; ValidChars: AnsiString; begin ValidChars := 'abcdefghijklmnopqrstuvwxyzABCDE...
Евгений
44
#include <stdio.h> #include <stdlib.h> #include <time.h> void mass_first_generate(int mass[5][7]) {     for (int N = 0; N < 5; N++) {         for (int A = 0; A < 7; A++) {   ...
Чувак
6
Всем привет! Решаю 99 OCaml Problems и столкнулся со следующей проблемой (прошу палками не забивать, я OCaml практически не трогал до этого момента): open OUnit2 let create_...
К|/|pи/\/\ 6е3yглbIи
2
https://www.linkedin.com/posts/ugama-benedicta-kelechi-codergirl-103041300_mobiledevelopment-fluttertraining-handsonlearning-activity-7263445699227254784-IdHB?utm_source=share...
CoderGirl
16
возможно ли как-то передать в электрон или таури медиа поток с рендера 2д движка? двиг запускается как dll, а дальше надо как-то отправлять рендер кодировать не подходит, зр...
Kyle Nekto
7
Ну вот просто даже давайте вот как. Какой нибудь конкретный кейс, можете в пример привести, где бч работает и приносит прикладную пользу, а не просто что бы было? Не крипту.
Alexander Andreev
22
Помогите пожалуйста. Делаю систему плагинов. Проблема сейчас в такая: плагины загружаются в основном потоке. FLibHandle := SafeLoadLibrary(FFileName) Но нужно еще выполнить фу...
Илья 🤣
10
Точно, оно. У тебя там имена потоков выставляются?
Александр (Rouse_) Багель
9
объясните пожалуйста, почему функция не работает должным образом? вроде должно брать активное окно сравнивать его размер с размером экрана, и если есть совпадение = true прове...
JF
12
лучше скажите, причём тут паскаль?
Alexey Kulakov
36
Карта сайта