PySpark - RDD

ตอนนี้เราได้ติดตั้งและกำหนดค่า PySpark บนระบบของเราแล้วเราสามารถตั้งโปรแกรมใน Python บน Apache Spark ได้ อย่างไรก็ตามก่อนที่จะทำเช่นนั้นให้เราเข้าใจแนวคิดพื้นฐานใน Spark - RDD

RDD ย่อมาจาก Resilient Distributed Datasetนี่คือองค์ประกอบที่รันและดำเนินการบนหลายโหนดเพื่อทำการประมวลผลแบบขนานบนคลัสเตอร์ RDD เป็นองค์ประกอบที่ไม่เปลี่ยนรูปซึ่งหมายความว่าเมื่อคุณสร้าง RDD แล้วคุณจะไม่สามารถเปลี่ยนแปลงได้ RDD สามารถทนต่อความผิดพลาดได้เช่นกันดังนั้นในกรณีที่เกิดความล้มเหลวใด ๆ พวกเขาจะกู้คืนโดยอัตโนมัติ คุณสามารถใช้การดำเนินการหลายอย่างกับ RDD เหล่านี้เพื่อให้บรรลุภารกิจบางอย่าง

ในการใช้การดำเนินการกับ RDD เหล่านี้มีสองวิธี -

  • การเปลี่ยนแปลงและ
  • Action

ให้เราเข้าใจสองวิธีนี้โดยละเอียด

Transformation- นี่คือการดำเนินการซึ่งใช้กับ RDD เพื่อสร้าง RDD ใหม่ ตัวกรอง groupBy และแผนที่เป็นตัวอย่างของการเปลี่ยนแปลง

Action - นี่คือการดำเนินการที่ใช้กับ RDD ซึ่งสั่งให้ Spark ทำการคำนวณและส่งผลลัพธ์กลับไปยังไดรเวอร์

ในการใช้การดำเนินการใด ๆ ใน PySpark เราจำเป็นต้องสร้างไฟล์ PySpark RDDอันดับแรก. บล็อกโค้ดต่อไปนี้มีรายละเอียดของคลาส PySpark RDD -

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

ให้เราดูวิธีเรียกใช้การดำเนินการพื้นฐานบางอย่างโดยใช้ PySpark รหัสต่อไปนี้ในไฟล์ Python จะสร้างคำ RDD ซึ่งเก็บชุดคำที่กล่าวถึง

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

ตอนนี้เราจะดำเนินการสองสามคำ

นับ()

จำนวนองค์ประกอบใน RDD จะถูกส่งกลับ

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

Command - คำสั่งสำหรับ count () คือ -

$SPARK_HOME/bin/spark-submit count.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวคือ -

Number of elements in RDD → 8

เก็บ()

องค์ประกอบทั้งหมดใน RDD จะถูกส่งกลับ

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

Command - คำสั่งสำหรับการรวบรวม () คือ -

$SPARK_HOME/bin/spark-submit collect.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวคือ -

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach (ฉ)

ส่งคืนเฉพาะองค์ประกอบที่ตรงตามเงื่อนไขของฟังก์ชันภายใน foreach ในตัวอย่างต่อไปนี้เราเรียกฟังก์ชันการพิมพ์ใน foreach ซึ่งจะพิมพ์องค์ประกอบทั้งหมดใน RDD

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

Command - คำสั่งสำหรับ foreach (f) คือ -

$SPARK_HOME/bin/spark-submit foreach.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวคือ -

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

ตัวกรอง (f)

RDD ใหม่จะถูกส่งคืนซึ่งมีองค์ประกอบซึ่งตรงตามฟังก์ชันภายในตัวกรอง ในตัวอย่างต่อไปนี้เรากรองสตริงที่มี "" spark "ออก

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

Command - คำสั่งสำหรับตัวกรอง (f) คือ -

$SPARK_HOME/bin/spark-submit filter.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวคือ -

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

แผนที่ (f, preservesPartitioning = False)

RDD ใหม่จะถูกส่งคืนโดยใช้ฟังก์ชันกับแต่ละองค์ประกอบใน RDD ในตัวอย่างต่อไปนี้เราสร้างคู่ค่าคีย์และแมปทุกสตริงด้วยค่า 1

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

Command - คำสั่งสำหรับแผนที่ (f, preservesPartitioning = False) คือ -

$SPARK_HOME/bin/spark-submit map.py

Output - ผลลัพธ์ของคำสั่งดังกล่าวคือ -

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

ลด (f)

หลังจากดำเนินการดำเนินการไบนารีการสับเปลี่ยนและเชื่อมโยงที่ระบุองค์ประกอบใน RDD จะถูกส่งกลับ ในตัวอย่างต่อไปนี้เรากำลังนำเข้าแพ็กเกจเพิ่มจากตัวดำเนินการและใช้กับ 'num' เพื่อดำเนินการเพิ่มอย่างง่าย

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

Command - คำสั่งสำหรับลด (f) คือ -

$SPARK_HOME/bin/spark-submit reduce.py

Output - ผลลัพธ์ของคำสั่งดังกล่าวคือ -

Adding all the elements -> 15

เข้าร่วม (อื่น ๆ numPartitions = ไม่มี)

ส่งคืน RDD พร้อมกับคู่ขององค์ประกอบที่มีคีย์ที่ตรงกันและค่าทั้งหมดสำหรับคีย์นั้น ๆ ในตัวอย่างต่อไปนี้มีสองคู่ขององค์ประกอบในสอง RDD ที่แตกต่างกัน หลังจากเข้าร่วม RDD ทั้งสองนี้เราได้รับ RDD ที่มีองค์ประกอบที่มีคีย์และค่าที่ตรงกัน

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

Command - คำสั่งสำหรับการเข้าร่วม (อื่น ๆ , numPartitions = ไม่มี) คือ -

$SPARK_HOME/bin/spark-submit join.py

Output - ผลลัพธ์สำหรับคำสั่งดังกล่าวคือ -

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

แคช ()

คง RDD นี้ด้วยระดับการจัดเก็บเริ่มต้น (MEMORY_ONLY) คุณยังสามารถตรวจสอบว่า RDD ถูกแคชไว้หรือไม่

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

Command - คำสั่งสำหรับ cache () คือ -

$SPARK_HOME/bin/spark-submit cache.py

Output - ผลลัพธ์ของโปรแกรมข้างต้นคือ -

Words got cached -> True

นี่คือการดำเนินการที่สำคัญที่สุดบางส่วนที่ทำบน PySpark RDD