PySpark - RDD
Nachdem wir PySpark auf unserem System installiert und konfiguriert haben, können wir in Python auf Apache Spark programmieren. Lassen Sie uns jedoch vorher ein grundlegendes Konzept in Spark - RDD verstehen.
RDD steht für Resilient Distributed DatasetDies sind die Elemente, die auf mehreren Knoten ausgeführt werden und für die parallele Verarbeitung in einem Cluster ausgeführt werden. RDDs sind unveränderliche Elemente. Wenn Sie also eine RDD erstellt haben, können Sie diese nicht mehr ändern. RDDs sind ebenfalls fehlertolerant und werden daher im Fehlerfall automatisch wiederhergestellt. Sie können mehrere Operationen auf diese RDDs anwenden, um eine bestimmte Aufgabe zu erfüllen.
Es gibt zwei Möglichkeiten, um Operationen auf diese RDDs anzuwenden:
- Transformation und
- Action
Lassen Sie uns diese beiden Möglichkeiten im Detail verstehen.
Transformation- Dies sind die Operationen, die auf eine RDD angewendet werden, um eine neue RDD zu erstellen. Filter, groupBy und map sind Beispiele für Transformationen.
Action - Dies sind die Operationen, die auf RDD angewendet werden. Dadurch wird Spark angewiesen, die Berechnung durchzuführen und das Ergebnis an den Treiber zurückzusenden.
Um eine Operation in PySpark anzuwenden, müssen wir eine erstellen PySpark RDDzuerst. Der folgende Codeblock enthält die Details einer PySpark RDD-Klasse -
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
Lassen Sie uns sehen, wie einige grundlegende Operationen mit PySpark ausgeführt werden. Der folgende Code in einer Python-Datei erstellt RDD-Wörter, in denen eine Reihe der genannten Wörter gespeichert sind.
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
Wir werden jetzt einige Operationen mit Wörtern ausführen.
Anzahl()
Die Anzahl der Elemente in der RDD wird zurückgegeben.
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
Command - Der Befehl für count () lautet -
$SPARK_HOME/bin/spark-submit count.py
Output - Die Ausgabe für den obigen Befehl ist -
Number of elements in RDD → 8
sammeln()
Alle Elemente in der RDD werden zurückgegeben.
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
Command - Der Befehl für collect () lautet -
$SPARK_HOME/bin/spark-submit collect.py
Output - Die Ausgabe für den obigen Befehl ist -
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach (f)
Gibt nur die Elemente zurück, die die Bedingung der Funktion in foreach erfüllen. Im folgenden Beispiel rufen wir in foreach eine Druckfunktion auf, die alle Elemente in der RDD druckt.
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command - Der Befehl für foreach (f) lautet -
$SPARK_HOME/bin/spark-submit foreach.py
Output - Die Ausgabe für den obigen Befehl ist -
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
Filter (f)
Es wird eine neue RDD zurückgegeben, die die Elemente enthält und die Funktion innerhalb des Filters erfüllt. Im folgenden Beispiel filtern wir die Zeichenfolgen heraus, die '' spark "enthalten.
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
Command - Der Befehl für Filter (f) lautet -
$SPARK_HOME/bin/spark-submit filter.py
Output - Die Ausgabe für den obigen Befehl ist -
Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map (f, konserviertPartitionierung = Falsch)
Eine neue RDD wird zurückgegeben, indem auf jedes Element in der RDD eine Funktion angewendet wird. Im folgenden Beispiel bilden wir ein Schlüsselwertpaar und ordnen jede Zeichenfolge dem Wert 1 zu.
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
Command - Der Befehl für map (f, keepesPartitioning = False) lautet -
$SPARK_HOME/bin/spark-submit map.py
Output - Die Ausgabe des obigen Befehls ist -
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
reduzieren (f)
Nach dem Ausführen der angegebenen kommutativen und assoziativen Binäroperation wird das Element in der RDD zurückgegeben. Im folgenden Beispiel importieren wir das Add-Paket vom Operator und wenden es auf 'num' an, um eine einfache Additionsoperation auszuführen.
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
Command - Der Befehl zum Reduzieren (f) lautet -
$SPARK_HOME/bin/spark-submit reduce.py
Output - Die Ausgabe des obigen Befehls ist -
Adding all the elements -> 15
join (other, numPartitions = None)
Es gibt RDD mit einem Elementpaar mit den übereinstimmenden Schlüsseln und allen Werten für diesen bestimmten Schlüssel zurück. Im folgenden Beispiel befinden sich zwei Elementpaare in zwei verschiedenen RDDs. Nach dem Verbinden dieser beiden RDDs erhalten wir eine RDD mit Elementen mit übereinstimmenden Schlüsseln und ihren Werten.
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
Command - Der Befehl für join (other, numPartitions = None) lautet -
$SPARK_HOME/bin/spark-submit join.py
Output - Die Ausgabe für den obigen Befehl ist -
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
Zwischenspeicher()
Behalten Sie diese RDD mit der Standardspeicherebene (MEMORY_ONLY) bei. Sie können auch überprüfen, ob das RDD zwischengespeichert ist oder nicht.
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
Command - Der Befehl für cache () lautet -
$SPARK_HOME/bin/spark-submit cache.py
Output - Die Ausgabe für das obige Programm ist -
Words got cached -> True
Dies waren einige der wichtigsten Vorgänge, die mit PySpark RDD ausgeführt werden.