нужно данные, которые он прочитал и собирается отдать сохранить в другой таблице. Для этого создаю Channel, в которого делаю send() в onEach { } того flow, который из бд. Дальше пишу в бд реактивно channel.receiveAsFlow().asFlux(). Как сделать так, что бы в случае если запись в бд свалилась по любым причинам flow, который читает так же развалился бы и grpc-streaming закончился?
Во-первых, если не надо явно делать промежуточный канал, не делайте, просто мапьте один flow в другой. Тогда ваша проблема будет решена автоматически. Если нет, то вам надо повесить реакцию на закрытие вашего приемника и закрыть скоуп, в котором получаются события в это реакции.
Обсуждают сегодня