列がデータフレームに存在する場合にのみ条件を適用する

Aug 17 2020

java8でspark-sql-2.4.1vを使用しています。指定されたデータフレーム列リストに列が存在する場合、特定の操作を実行する必要があるシナリオがあります

以下のようなサンプルデータフレームがあります。データフレームの列は、データベーステーブルで実行される外部クエリに基づいて異なります。

val data = List(
  ("20", "score", "school", "2018-03-31", 14 , 12 , 20),
  ("21", "score", "school", "2018-03-31", 13 , 13 , 21),
  ("22", "rate", "school", "2018-03-31", 11 , 14, 22),
  ("21", "rate", "school", "2018-03-31", 13 , 12, 23)
 )

val df = data.toDF("id", "code", "entity", "date", "column1", "column2" ,"column3"..."columnN")

上記のように、データフレームの「データ」列は固定されておらず、変化し、「column1」、「column2」、「column3」...「columnN」...

したがって、列の可用性に応じて、「when-clause」を使用しようとしているのと同じ操作を実行する必要があります。列が存在する場合は、指定された列に対して特定の操作を実行する必要があります。それ以外の場合は、次の操作に進みます。

「when-cluase」を使用して、以下の2つの方法を試しています。

ファーストウェイ:

 Dataset<Row> resultDs =  df.withColumn("column1_avg", 
                     when( df.schema().fieldNames().contains(col("column1")) , avg(col("column1"))))
                     )
 

二次:

  Dataset<Row> resultDs =  df.withColumn("column2_sum", 
                     when( df.columns().contains(col("column2")) , sum(col("column1"))))
                     )

エラー:

配列型String []でcontains(Column)を呼び出すことはできません

では、java8コードを使用してこのシナリオを処理する方法は?

回答

1 Som Aug 17 2020 at 19:26

すべての列名を持つ列を作成できます。次に、列が存在するかどうかを確認し、使用可能かどうかを処理できます-

 df.withColumn("columns_available", array(df.columns.map(lit): _*))
      .withColumn("column1_org",
      when( array_contains(col("columns_available"),"column1") , col("column1")))
      .withColumn("x",
        when( array_contains(col("columns_available"),"column4") , col("column1")))
      .withColumn("column2_new",
        when( array_contains(col("columns_available"),"column2") , sqrt("column2")))
      .show(false)