ApacheKafka-Sparkとの統合

この章では、ApacheKafkaをSparkStreamingAPIと統合する方法について説明します。

Sparkについて

Spark Streaming APIは、ライブデータストリームのスケーラブルで高スループットのフォールトトレラントなストリーム処理を可能にします。データは、Kafka、Flume、Twitterなどの多くのソースから取り込むことができ、map、reduce、join、windowなどの高レベル関数などの複雑なアルゴリズムを使用して処理できます。最後に、処理されたデータをファイルシステム、データベース、およびライブダッシュボードにプッシュできます。復元力のある分散データセット(RDD)は、Sparkの基本的なデータ構造です。これは、オブジェクトの不変の分散コレクションです。RDDの各データセットは論理パーティションに分割され、クラスターのさまざまなノードで計算できます。

Sparkとの統合

Kafkaは、Sparkストリーミングの潜在的なメッセージングおよび統合プラットフォームです。Kafkaは、データのリアルタイムストリームの中央ハブとして機能し、SparkStreamingの複雑なアルゴリズムを使用して処理されます。データが処理されると、Spark Streamingは結果をさらに別のKafkaトピックに公開したり、HDFS、データベース、またはダッシュボードに保存したりできます。次の図は、概念的なフローを示しています。

それでは、Kafka-SparkAPIについて詳しく見ていきましょう。

SparkConf API

これは、Sparkアプリケーションの構成を表します。さまざまなSparkパラメーターをキーと値のペアとして設定するために使用されます。

SparkConfクラスには次のメソッドがあります-

  • set(string key, string value) −構成変数を設定します。

  • remove(string key) −構成からキーを削除します。

  • setAppName(string name) −アプリケーションのアプリケーション名を設定します。

  • get(string key) −キーを取得

StreamingContext API

これは、Spark機能の主要なエントリポイントです。SparkContextは、Sparkクラスターへの接続を表し、クラスター上にRDD、アキュムレーター、およびブロードキャスト変数を作成するために使用できます。署名は次のように定義されます。

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)
  • master −接続するクラスターURL(例:mesos:// host:port、spark:// host:port、local [4])。

  • appName −クラスターWebUIに表示するジョブの名前

  • batchDuration −ストリーミングデータがバッチに分割される時間間隔

public StreamingContext(SparkConf conf, Duration batchDuration)

新しいSparkContextに必要な構成を提供して、StreamingContextを作成します。

  • conf −スパークパラメータ

  • batchDuration −ストリーミングデータがバッチに分割される時間間隔

KafkaUtils API

KafkaUtils APIは、KafkaクラスターをSparkストリーミングに接続するために使用されます。このAPIには、以下のように定義された重要なメソッドcreateStreamシグネチャがあります。

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
   StreamingContext ssc, String zkQuorum, String groupId,
   scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel)

上記のメソッドは、KafkaBrokersからメッセージをプルする入力ストリームを作成するために使用されます。

  • ssc −StreamingContextオブジェクト。

  • zkQuorum −Zookeeperクォーラム。

  • groupId −このコンシューマーのグループID。

  • topics −消費するトピックのマップを返します。

  • storageLevel −受信したオブジェクトを保存するために使用するストレージレベル。

KafkaUtils APIには別のメソッドcreateDirectStreamがあります。これは、レシーバーを使用せずにKafkaBrokersからメッセージを直接プルする入力ストリームを作成するために使用されます。このストリームは、Kafkaからの各メッセージが変換に1回だけ含まれることを保証できます。

サンプルアプリケーションはScalaで行われます。アプリケーションをコンパイルするには、sbt、scalaビルドツール(mavenと同様)をダウンロードしてインストールしてください。主なアプリケーションコードを以下に示します。

import java.util.HashMap

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
   def main(args: Array[String]) {
      if (args.length < 4) {
         System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>")
         System.exit(1)
      }

      val Array(zkQuorum, group, topics, numThreads) = args
      val sparkConf = new SparkConf().setAppName("KafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(2))
      ssc.checkpoint("checkpoint")

      val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
      val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2)
      wordCounts.print()

      ssc.start()
      ssc.awaitTermination()
   }
}

ビルドスクリプト

spark-kafka統合は、spark、sparkストリーミング、およびsparkKafka統合jarに依存します。新しいファイルbuild.sbtを作成し、アプリケーションの詳細とその依存関係を指定します。SBTは、アプリケーションをコンパイルし、梱包しながら、必要なjarファイルをダウンロードします。

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

コンパイル/パッケージング

次のコマンドを実行して、アプリケーションのjarファイルをコンパイルおよびパッケージ化します。アプリケーションを実行するには、jarファイルをsparkコンソールに送信する必要があります。

sbt package

Sparkへの送信

Kafka Producer CLI(前の章で説明)を起動し、my-first-topicという新しいトピックを作成し、以下に示すようにいくつかのサンプルメッセージを提供します。

Another spark test message

次のコマンドを実行して、アプリケーションをSparkコンソールに送信します。

/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming
-kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark
-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

このアプリケーションの出力例を以下に示します。

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..