Application lorsque la condition est uniquement lorsque la colonne existe dans la trame de données

Aug 17 2020

J'utilise spark-sql-2.4.1v avec java8. J'ai un scénario dans lequel je dois effectuer certaines opérations si des colonnes se présentent dans la liste de colonnes de dataframe donnée

J'ai un exemple de cadre de données comme ci-dessous, les colonnes de dataframe différeraient en fonction d'une requête externe exécutée sur la table de base de données.

val data = List(
  ("20", "score", "school", "2018-03-31", 14 , 12 , 20),
  ("21", "score", "school", "2018-03-31", 13 , 13 , 21),
  ("22", "rate", "school", "2018-03-31", 11 , 14, 22),
  ("21", "rate", "school", "2018-03-31", 13 , 12, 23)
 )

val df = data.toDF("id", "code", "entity", "date", "column1", "column2" ,"column3"..."columnN")

comme indiqué ci-dessus, les colonnes "données" du dataframe ne sont pas fixes et varieraient et auraient "colonne1", "colonne2", "colonne3" ... "colonneN" ...

Dépend donc de la disponibilité de la colonne, je dois effectuer certaines opérations pour la même chose que j'essaie d'utiliser "clause quand", lorsqu'une colonne est présente, je dois effectuer certaines opérations sur la colonne spécifiée sinon passer à l'opération suivante.

J'essaye ci-dessous deux manières en utilisant "quand-cluase"

Première voie:

 Dataset<Row> resultDs =  df.withColumn("column1_avg", 
                     when( df.schema().fieldNames().contains(col("column1")) , avg(col("column1"))))
                     )
 

Seconde voie:

  Dataset<Row> resultDs =  df.withColumn("column2_sum", 
                     when( df.columns().contains(col("column2")) , sum(col("column1"))))
                     )

Erreur:

Impossible d'appeler contient (Colonne) sur le type de tableau String []

alors comment gérer ce scénario en utilisant le code java8?

Réponses

1 Som Aug 17 2020 at 19:26

Vous pouvez créer une colonne ayant tous les noms de colonne. alors vous pouvez vérifier si la colonne est présente ou non et traiter si elle est disponible-

 df.withColumn("columns_available", array(df.columns.map(lit): _*))
      .withColumn("column1_org",
      when( array_contains(col("columns_available"),"column1") , col("column1")))
      .withColumn("x",
        when( array_contains(col("columns_available"),"column4") , col("column1")))
      .withColumn("column2_new",
        when( array_contains(col("columns_available"),"column2") , sqrt("column2")))
      .show(false)