spark-датафрейм в kafka-очередь – и завершаться.
Запись у меня вполне себе происходит (по крайней мере, kafka-console-consumer эти данные видит), но скрипт после этого не завершается.
Как мне всё это забороть?
И держите простейший пример, чтобы это воспроизвести.
Скрипт:
package part4integrations
import org.apache.spark.sql.SparkSession
import common._
object IntegratingKafkaDemo {
val spark = SparkSession.builder()
.appName("Integrating Kafka")
.master("local[2]")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
def writeToKafka() = {
val carsDF = spark.readStream
.schema(carsSchema)
.json("src/main/resources/data/cars")
val carsKafkaDF = carsDF.selectExpr("upper(Name) as key", "Name as value")
// В кафку пишет -- но после этого не выходит, как быть?!
carsKafkaDF.writeStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "rockthejvm")
.option("checkpointLocation", "checkpoints_demo")
.start().awaitTermination()
}
def main(args: Array[String]): Unit = {
writeToKafka()
}
}
И cars.json:
{"Name":"chevrolet chevelle malibu", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":307, "Horsepower":130, "Weight_in_lbs":3504, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"buick skylark 320", "Miles_per_Gallon":15, "Cylinders":8, "Displacement":350, "Horsepower":165, "Weight_in_lbs":3693, "Acceleration":11.5, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"plymouth satellite", "Miles_per_Gallon":18, "Cylinders":8, "Displacement":318, "Horsepower":150, "Weight_in_lbs":3436, "Acceleration":11, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"amc rebel sst", "Miles_per_Gallon":16, "Cylinders":8, "Displacement":304, "Horsepower":150, "Weight_in_lbs":3433, "Acceleration":12, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"ford torino", "Miles_per_Gallon":17, "Cylinders":8, "Displacement":302, "Horsepower":140, "Weight_in_lbs":3449, "Acceleration":10.5, "Year":"1970-01-01", "Origin":"USA"}
Мне кажется тебе в Moscow spark лучше https://t.me/moscowspark
Обсуждают сегодня