то админ клиент зависнет при клоузе навеки даже не смотря на возвращенный колбак
                  
                  
                  def timed[F[_]: Concurrent: Timer](timeout: FiniteDuration): KafkaFuture ~> F =
                  
                  
                      new (KafkaFuture[?] ~> F[?]) {
                  
                  
                        override def apply[A](fa: KafkaFuture[A]): F[A] =
                  
                  
                          Concurrent.timeout(
                  
                  
                              Concurrent[F].async[A](k => {
                  
                  
                              fa.whenComplete((result: A, error: Throwable) =>
                  
                  
                                (result, error) match {
                  
                  
                                  case (x, null)                       =>
                  
                  
                                    println("SUCCESS CB")
                  
                  
                                    k(Right(x))
                  
                  
                                  case (null, err: ExecutionException) => k(Left(err.getCause))
                  
                  
                                  case (null, err)                     => k(Left(err))
                  
                  
                                  case (x, _)                          => k(Right(x)) //impossible
                  
                  
                              })
                  
                  
                            })
                  
                  
                            , timeout
                  
                  
                          )
                  
                  
                      }
                  
                  
                
Блин, скала
Обсуждают сегодня