SparkUDFがCassandraコネクタを使用してキーを検索する

Aug 23 2020
  • 一部のSparkジョブでKey-Valueルックアップストアとしてcassandraを使用しようとしています。

  • 私たちは主にデータフレームを使用し、RDDAPIから離れました。

  • テーブルと結合する、sparkにロードする、または
    結合をcassandraにプッシュして、大きな
    テーブルスキャンを回避するための対策を講じる代わりに、cassandraに接続するSparkUDFを記述して1つのキーを検索できると思いました。

  • さらに、結果行をケースクラスオブジェクトに変換して、オブジェクトを返したいと思います。

以下のこの質問からの回答に基づいて、この情報の一部を入手しました。withSessionDoは、各ノードで使用可能な基盤となるJVMレベルのセッションを再利用します。SparkCassandraコネクタの適切な使用法

val connector = CassandraConnector(sparkConf) // I Know this is serializable.

def lookupKey(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val stmt = session.prepare(s"SELECT * FROM $keyspace.$table WHERE key = ?")
        val result = session.execute( stmt.bind(key) )
        MyCaseClass(
           fieldl1 = result.getString(0),
           fieldl2 = result.getInt(1)
           ...
        )
    }
})

セッションはシリアル化できないため、udfの外部でセッションを作成して渡すことはできないため、マッピングマネージャーを使用して行をケースクラスインスタンスに変換できます。Mapping Managerを使用した代替アプローチ、

def lookupKeyAlt(connector: CassandraConnector, keyspace: String, table: String): UserDefineFunction = udf((key: String) => {
    connector.withSessionDo(session => {
        val manager = new MappingManager(session)   // session isn't serializable, so creating one outside and passing to udf is not an option if wf we were willing to do the session management.
        val mapperClass = manager.mapper(classOf[MyCaseClass], keyspace)
        mapperClass.get(key)
    }
})

私はカサンドラに不慣れなので、いくつかの質問に我慢してください。

  1. これらのアプローチに私が気付いていない落とし穴はありますか?
  2. 2番目のアプローチでは、UDFを呼び出すたびに新しいMappingManager(セッション)を作成していることを理解しています。これは引き続きjvmレベルのセッションを使用し、それ以上のセッションを開きますか?呼び出しごとにMappingManagerをインスタンス化するのは正しいですか?セッションはシリアル化できないため、外部で作成してUDFに渡すことはできません。
  3. 結果の行をケースクラスのオブジェクトに変換する他の方法は何ですか?
  4. この種のルックアップを行うためのより良い代替手段はありますか?

回答

1 AlexOtt Aug 23 2020 at 03:11

Spark Cassandra Connector(SCC)が内部で実行していることをエミュレートしようとしていますが、SCCが非同期APIを使用している間、同期APIを使用し、すべてのデータを次々に取得するため、実装はSCCよりもはるかに遅くなります。 、および複数の行のデータを並列にプルします。

目的を達成するための最良の方法は、Cassandraに最適化された結合(「直接結合」と呼ばれることが多い)を使用することです。この種の結合は、RDD APIで常に利用可能でしたが、長い間、データフレームAPIでは、コネクタの商用バージョンでのみ利用可能でした。ただし、SCC 2.5.0(2020年5月にリリース)以降、この機能はオープンソースバージョンでも利用できるため、エミュレーションを構築する代わりに使用できます。直接的には、あなただけのときに実行される加入特殊な触媒の拡張機能を有効に渡すことによって、spark.sql.extensions=com.datastax.spark.connector.CassandraSparkExtensionsSparkSessionを構成するとき(たとえば、コマンドラインを介して)。その後、完全または部分的な主キーでCassandraテーブルとの結合を実行できます。SCCは、結合を、非常に効果的に実行されるCassandraへの個別のリクエストに自動的に変換します。explain結合されたデータフレームで実行することでこれが発生することを確認できるため、次のように表示されます(文字列Cassandra Direct Joinを探します)。

scala> joined.explain
== Physical Plan ==
Cassandra Direct Join [pk = id#30, c1 = cc1#32] test.jtest1 - Reading (pk, c1, c2, v) Pushed {}
+- *(1) Project [cast(id#28L as int) AS id#30, cast(id#28L as int) AS cc1#32]
   +- *(1) Range (1, 5, step=1, splits=8)

最近、DataframeAPIとRDDAPIの両方を使用してCassandraのデータと効果的な結合を実行する方法を説明する長いブログ投稿を書きました-ここでは繰り返したくありません:-)