то админ клиент зависнет при клоузе навеки даже не смотря на возвращенный колбак
def timed[F[_]: Concurrent: Timer](timeout: FiniteDuration): KafkaFuture ~> F =
new (KafkaFuture[?] ~> F[?]) {
override def apply[A](fa: KafkaFuture[A]): F[A] =
Concurrent.timeout(
Concurrent[F].async[A](k => {
fa.whenComplete((result: A, error: Throwable) =>
(result, error) match {
case (x, null) =>
println("SUCCESS CB")
k(Right(x))
case (null, err: ExecutionException) => k(Left(err.getCause))
case (null, err) => k(Left(err))
case (x, _) => k(Right(x)) //impossible
})
})
, timeout
)
}
Блин, скала
Обсуждают сегодня