Apache Kafka - интеграция со Spark

В этой главе мы обсудим, как интегрировать Apache Kafka с Spark Streaming API.

О Spark

Spark Streaming API обеспечивает масштабируемую, высокопроизводительную и отказоустойчивую потоковую обработку потоков данных в реальном времени. Данные могут быть получены из многих источников, таких как Kafka, Flume, Twitter и т. Д., И могут обрабатываться с использованием сложных алгоритмов, таких как высокоуровневые функции, такие как map, reduce, join и window. Наконец, обработанные данные могут быть отправлены в файловые системы, базы данных и живые информационные панели. Устойчивые распределенные наборы данных (RDD) - это фундаментальная структура данных Spark. Это неизменяемая распределенная коллекция объектов. Каждый набор данных в RDD разделен на логические разделы, которые могут быть вычислены на разных узлах кластера.

Интеграция со Spark

Kafka - это потенциальная платформа для обмена сообщениями и интеграции для потоковой передачи Spark. Kafka действует как центральный узел для потоков данных в реальном времени и обрабатывается с использованием сложных алгоритмов в Spark Streaming. После обработки данных Spark Streaming может публиковать результаты в еще одной теме Kafka или хранить их в HDFS, базах данных или информационных панелях. На следующей диаграмме показан концептуальный поток.

Теперь давайте подробно рассмотрим API Kafka-Spark.

SparkConf API

Он представляет собой конфигурацию для приложения Spark. Используется для установки различных параметров Spark в виде пар ключ-значение.

Класс SparkConf имеет следующие методы -

  • set(string key, string value) - установить переменную конфигурации.

  • remove(string key) - удалить ключ из конфигурации.

  • setAppName(string name) - установить название приложения для вашего приложения.

  • get(string key) - получить ключ

StreamingContext API

Это основная точка входа для функциональности Spark. SparkContext представляет подключение к кластеру Spark и может использоваться для создания RDD, аккумуляторов и широковещательных переменных в кластере. Подпись определяется, как показано ниже.

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master - URL-адрес кластера для подключения (например, mesos: // host: port, spark: // host: port, local [4]).

  • appName - название вашей работы, которое будет отображаться в веб-интерфейсе кластера

  • batchDuration - временной интервал, через который потоковые данные будут разделены на пакеты

public StreamingContext(SparkConf conf, Duration batchDuration)

Создайте StreamingContext, предоставив конфигурацию, необходимую для нового SparkContext.

  • conf - Параметры искры

  • batchDuration - временной интервал, через который потоковые данные будут разделены на пакеты

KafkaUtils API

KafkaUtils API используется для подключения кластера Kafka к потоковой передаче Spark. Этот API имеет значительную подпись метода createStream, определенную ниже.

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

Показанный выше метод используется для создания входного потока, который извлекает сообщения от Kafka Brokers.

  • ssc - Объект StreamingContext.

  • zkQuorum - Кворум Zookeeper.

  • groupId - Идентификатор группы для этого потребителя.

  • topics - вернуть карту тем для использования.

  • storageLevel - Уровень хранения, используемый для хранения полученных объектов.

У KafkaUtils API есть другой метод createDirectStream, который используется для создания входного потока, который напрямую извлекает сообщения от Kafka Brokers без использования какого-либо получателя. Этот поток может гарантировать, что каждое сообщение от Kafka будет включено в преобразования ровно один раз.

Пример приложения написан на Scala. Чтобы скомпилировать приложение, загрузите и установите sbt , инструмент сборки scala (аналогично maven). Ниже представлен основной код приложения.

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

Сценарий сборки

Интеграция Spark-Kafka зависит от искры, потока искры и Jar интеграции Spark. Создайте новый файл build.sbt и укажите сведения о приложении и его зависимости. SBT загрузит необходимую банку во время компиляции и упаковки приложения.

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Составление / упаковка

Выполните следующую команду, чтобы скомпилировать и упаковать файл jar приложения. Нам нужно отправить файл jar в консоль Spark для запуска приложения.

sbt package

Отправка в Spark

Запустите интерфейс командной строки Kafka Producer (объяснено в предыдущей главе), создайте новую тему с именем my-first-topic и предоставьте несколько примеров сообщений, как показано ниже.

Another spark test message

Выполните следующую команду, чтобы отправить приложение в Spark console.

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

Пример вывода этого приложения показан ниже.

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..