Pysparkデータフレームのキーグループに対してSCAN操作を実行する方法

Aug 22 2020

専門家、私はpysparkデータフレームでスキャンのような操作を実行しようとしています。ここでは、キーグループの次のレコードに基づいてレコードの終了日をマークしています。これは私のデータフレームがどのように見えるかです-

+---+----+----+-------------------+-------------------+
|Key|col1|col2|     effective_date|           end_date|
+---+----+----+-------------------+-------------------+
|  X| ABC| DEF|2020-08-01 00:00:00|2999-12-31 00:00:00|
|  X|ABC1|DEF1|2020-08-03 00:00:00|2999-12-31 00:00:00|
|  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
|  Y| PQR| STU|2020-08-07 00:00:00|2999-12-31 00:00:00|
|  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
+---+----+----+-------------------+-------------------+

希望のアウトアウト-

+---+----+----+-------------------+-------------------+
|Key|col1|col2|     effective_date|           end_date|
+---+----+----+-------------------+-------------------+
|  X| ABC| DEF|2020-08-01 00:00:00|2020-08-02 23:59:59|
|  X|ABC1|DEF1|2020-08-03 00:00:00|2020-08-04 23:59:59|
|  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
|  Y| PQR| STU|2020-08-07 00:00:00|2020-08-08 23:59:59|
|  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
+---+----+----+-------------------+-------------------+

ここでレコードをグループ化するフィールドは「キー」であり、キーグループにend_date「2999-12-3100:00:00」のレコードを1つだけ保持したいと思います。他のすべてのレコードは、期限切れと終了をマークします。日付は、次のレコードの発効日-1に基づいて、レコードを発効日の順に並べたときに決定されます。以下を試してみました-

>>> from pyspark.sql import functions as F
>>> from pyspark.sql import Window
>>> w = Window.partitionBy("Key").orderBy("effective_date")
>>> df1=df.withColumn("end_date",F.date_sub(F.lead("effective_date").over(w), 1))

これでは出力が正しく表示されません。Python2.7とSpark2.2を使用しています

回答

1 murtihash Aug 22 2020 at 00:54

このleadようにこれを試してください:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w=Window().partitionBy("Key").orderBy("effective_date")

df.withColumn("lead", F.lead("effective_date").over(w))\
  .withColumn("end_date", F.when(F.col("lead").isNotNull(), F.expr("""lead - interval 1 second"""))\
                           .otherwise(F.col("end_date"))).drop("lead")\
  .orderBy("effective_date").show()

#+---+----+----+-------------------+-------------------+
#|Key|col1|col2|     effective_date|           end_date|
#+---+----+----+-------------------+-------------------+
#|  X| ABC| DEF|2020-08-01 00:00:00|2020-08-02 23:59:59|
#|  X|ABC1|DEF1|2020-08-03 00:00:00|2020-08-04 23:59:59|
#|  X|ABC2|DEF2|2020-08-05 00:00:00|2999-12-31 00:00:00|
#|  Y| PQR| STU|2020-08-07 00:00:00|2020-08-08 23:59:59|
#|  Y|PQR1|STU1|2020-08-09 00:00:00|2999-12-31 00:00:00|
#+---+----+----+-------------------+-------------------+