การใช้ pyspark จะเพิ่มคอลัมน์ลงใน DataFrame เป็นแผนที่คีย์ - ค่าของคอลัมน์ที่รู้จักหลายคอลัมน์ใน DataFrame เดียวกันได้อย่างไรโดยไม่รวมค่า null

Jul 20 2020

ให้ตัวอย่างต่อไปนี้:

d = [{'asset': '2', 'ts': 6,  'B':'123','C':'234'}, 
     {'asset': '1', 'ts': 5, 'C.1':'999', 'B':'888','F':'999'}]
df = spark.createDataFrame(d)
df.show(truncate=False)

+---+----+-----+---+----+----+
|B  |C   |asset|ts |C.1 |F   |
+---+----+-----+---+----+----+
|123|234 |2    |6  |null|null|
|888|null|1    |5  |999 |999 |
+---+----+-----+---+----+----+

ฉันต้องการสร้างผลลัพธ์ต่อไปนี้:

+-----+---+--------------------------------+
|asset|ts |signals                         |
+-----+---+--------------------------------+
|2    |6  |[B -> 123, C -> 234]            |
|1    |5  |[B -> 888, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------+

ฉันลองทำสิ่งต่อไปนี้:

from itertools import chain
from pyspark.sql.functions import *
all_signals=['B','C','C.1','F']
key_values = create_map(*(chain(*[(lit(name), col("`"+name+"`"))
                                  for name in all_signals])))

new_df = df.withColumn('signals',key_values).drop(*all_signals).show(truncate=False)

+-----+---+--------------------------------------+
|asset|ts |signals                               |
+-----+---+--------------------------------------+
|2    |6  |[B -> 123, C -> 234, C.1 ->, F ->]    |
|1    |5  |[B -> 888, C ->, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------------+

แต่ฉันไม่ต้องการคีย์ที่มีค่าว่าง ดังนั้นฉันจึงลองหลายวิธีในการยกเว้น null หรือ None ฉันลองใช้เงื่อนไข "if" เมื่อ / อย่างอื่น แต่ดูเหมือนจะไม่ได้ผล นี่คือหนึ่งความพยายาม:

key_values = create_map(*(chain(*[(lit(name), col("`"+name+"`")) 
                                  for name in all_signals 
                                  if col("`"+name+"`").isNotNull()])))
new_df = df.withColumn('signals',key_values).drop(*all_signals).show(truncate=False)


ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.

ฉันทำให้มันทำงานโดยใช้วิธีแบบวงกลมซึ่งฉันไม่พอใจ:

new_df= df.withColumn("signals", from_json(
                       to_json(struct(["`"+x+"`" for x in all_signals])),"MAP<STRING,STRING>"))
                      
new_df = new_df.drop(*all_signals)
new_df.show(truncate=False)

+-----+---+--------------------------------+
|asset|ts |signals                         |
+-----+---+--------------------------------+
|2    |6  |[B -> 123, C -> 234]            |
|1    |5  |[B -> 888, C.1 -> 999, F -> 999]|
+-----+---+--------------------------------+

แต่ต้องมีวิธียกเว้น null โดยไม่ต้องไปที่ json แล้วย้อนกลับ!

คำตอบ

2 murtihash Jul 20 2020 at 10:19

ไม่จำเป็นต้องUDFใช้ฟังก์ชันลำดับที่สูงขึ้นfilterด้วยarrays_zipและmap_from_entriesเพื่อให้ได้ผลลัพธ์ที่คุณต้องการ(spark2.4+)

from pyspark.sql import functions as F

all_singals=['B','C','C.1','F']

df.withColumn("all_one", F.array(*[F.lit(x) for x in all_signals]))\
  .withColumn("all_two", F.array(*["`"+x+"`" for x in all_signals]))\
  .withColumn("signals", F.expr("""map_from_entries(filter(arrays_zip(all_one,all_two),x-> x.all_two is not null))"""))\
  .drop("all_one","all_two").show(truncate=False)

#+---+----+-----+---+----+----+--------------------------------+
#|B  |C   |asset|ts |C.1 |F   |signals                         |
#+---+----+-----+---+----+----+--------------------------------+
#|123|234 |2    |6  |null|null|[B -> 123, C -> 234]            |
#|888|null|1    |5  |999 |999 |[B -> 888, C.1 -> 999, F -> 999]|
#+---+----+-----+---+----+----+--------------------------------+
1 Ankur Jul 20 2020 at 10:04

ฉันมีทางเลือกอื่นในการแก้ปัญหา ขั้นแรกสร้างแผนที่ด้วยค่า null จากนั้นจึงปล่อยค่า null

from pyspark.sql.types import MapType, StringType
from pyspark.sql import functions as F

# Original dataframe
data = [{'asset': '2', 'ts': 6, 'B': '123', 'C': '234'},
        {'asset': '1', 'ts': 5, 'C.1': '999', 'B': '888', 'F': '999'}]
df = spark.createDataFrame(data)
df.show(truncate=False)

# Create a map that includes null values
# Backticks are needed because spark is weird
# https://stackoverflow.com/questions/44367019/column-name-with-dot-spark
names = ['B', 'C', 'C.1', 'F']
key_value_list = []
for name in names:
    key_value_list += [F.lit(name)]
    key_value_list += [df["`{}`".format(name)]]
map_column = F.create_map(*key_value_list)

# UDF that drops null values
remove_null_values_udf = F.udf(
    lambda d: {k: v for k, v in d.items() if v is not None},
    MapType(StringType(), StringType())
)

# Apply both of the above
df = df.withColumn('map', remove_null_values_udf(map_column)).drop(*names)
df.show()
# +-----+---+--------------------+
# |asset| ts|                 map|
# +-----+---+--------------------+
# |    2|  6|[B -> 123, C -> 234]|
# |    1|  5|[B -> 888, F -> 9...|
# +-----+---+--------------------+