Apache Kafka - интеграция со Spark
В этой главе мы обсудим, как интегрировать Apache Kafka с Spark Streaming API.
О Spark
Spark Streaming API обеспечивает масштабируемую, высокопроизводительную и отказоустойчивую потоковую обработку потоков данных в реальном времени. Данные могут быть получены из многих источников, таких как Kafka, Flume, Twitter и т. Д., И могут обрабатываться с использованием сложных алгоритмов, таких как высокоуровневые функции, такие как map, reduce, join и window. Наконец, обработанные данные могут быть отправлены в файловые системы, базы данных и живые информационные панели. Устойчивые распределенные наборы данных (RDD) - это фундаментальная структура данных Spark. Это неизменяемая распределенная коллекция объектов. Каждый набор данных в RDD разделен на логические разделы, которые могут быть вычислены на разных узлах кластера.
Интеграция со Spark
Kafka - это потенциальная платформа для обмена сообщениями и интеграции для потоковой передачи Spark. Kafka действует как центральный узел для потоков данных в реальном времени и обрабатывается с использованием сложных алгоритмов в Spark Streaming. После обработки данных Spark Streaming может публиковать результаты в еще одной теме Kafka или хранить их в HDFS, базах данных или информационных панелях. На следующей диаграмме показан концептуальный поток.
Теперь давайте подробно рассмотрим API Kafka-Spark.
SparkConf API
Он представляет собой конфигурацию для приложения Spark. Используется для установки различных параметров Spark в виде пар ключ-значение.
Класс SparkConf
имеет следующие методы -
set(string key, string value) - установить переменную конфигурации.
remove(string key) - удалить ключ из конфигурации.
setAppName(string name) - установить название приложения для вашего приложения.
get(string key) - получить ключ
StreamingContext API
Это основная точка входа для функциональности Spark. SparkContext представляет подключение к кластеру Spark и может использоваться для создания RDD, аккумуляторов и широковещательных переменных в кластере. Подпись определяется, как показано ниже.
public StreamingContext(String master, String appName, Duration batchDuration,
String sparkHome, scala.collection.Seq<String> jars,
scala.collection.Map<String,String> environment)
master - URL-адрес кластера для подключения (например, mesos: // host: port, spark: // host: port, local [4]).
appName - название вашей работы, которое будет отображаться в веб-интерфейсе кластера
batchDuration - временной интервал, через который потоковые данные будут разделены на пакеты
public StreamingContext(SparkConf conf, Duration batchDuration)
Создайте StreamingContext, предоставив конфигурацию, необходимую для нового SparkContext.
conf - Параметры искры
batchDuration - временной интервал, через который потоковые данные будут разделены на пакеты
KafkaUtils API
KafkaUtils API используется для подключения кластера Kafka к потоковой передаче Spark. Этот API имеет значительную
подпись метода createStream,
определенную ниже.
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
StreamingContext ssc, String zkQuorum, String groupId,
scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)
Показанный выше метод используется для создания входного потока, который извлекает сообщения от Kafka Brokers.
ssc - Объект StreamingContext.
zkQuorum - Кворум Zookeeper.
groupId - Идентификатор группы для этого потребителя.
topics - вернуть карту тем для использования.
storageLevel - Уровень хранения, используемый для хранения полученных объектов.
У KafkaUtils API есть другой метод createDirectStream, который используется для создания входного потока, который напрямую извлекает сообщения от Kafka Brokers без использования какого-либо получателя. Этот поток может гарантировать, что каждое сообщение от Kafka будет включено в преобразования ровно один раз.
Пример приложения написан на Scala. Чтобы скомпилировать приложение, загрузите и установите sbt
, инструмент сборки scala (аналогично maven). Ниже представлен основной код приложения.
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
object KafkaWordCount {
def main(args: Array[String]) {
if (args.length < 4) {
System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
System.exit(1)
}
val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L))
.reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
Сценарий сборки
Интеграция Spark-Kafka зависит от искры, потока искры и Jar интеграции Spark. Создайте новый файл build.sbt
и укажите сведения о приложении и его зависимости. SBT
загрузит необходимую банку во время компиляции и упаковки приложения.
name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"
Составление / упаковка
Выполните следующую команду, чтобы скомпилировать и упаковать файл jar приложения. Нам нужно отправить файл jar в консоль Spark для запуска приложения.
sbt package
Отправка в Spark
Запустите интерфейс командной строки Kafka Producer (объяснено в предыдущей главе), создайте новую тему с именем my-first-topic
и предоставьте несколько примеров сообщений, как показано ниже.
Another spark test message
Выполните следующую команду, чтобы отправить приложение в Spark console.
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>
Пример вывода этого приложения показан ниже.
spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..