Apache Spark - Kernprogrammierung
Spark Core ist die Basis des gesamten Projekts. Es bietet verteilte Aufgabenverteilung, Zeitplanung und grundlegende E / A-Funktionen. Spark verwendet eine spezielle grundlegende Datenstruktur, die als RDD (Resilient Distributed Datasets) bezeichnet wird und eine logische Sammlung von Daten darstellt, die auf mehrere Maschinen verteilt sind. RDDs können auf zwei Arten erstellt werden. Zum einen wird auf Datensätze in externen Speichersystemen verwiesen, zum anderen werden Transformationen (z. B. Map, Filter, Reducer, Join) auf vorhandene RDDs angewendet.
Die RDD-Abstraktion wird über eine sprachintegrierte API verfügbar gemacht. Dies vereinfacht die Programmierkomplexität, da die Art und Weise, wie Anwendungen RDDs manipulieren, der Manipulation lokaler Datensammlungen ähnelt.
Spark Shell
Spark bietet eine interaktive Shell - ein leistungsstarkes Tool zur interaktiven Analyse von Daten. Es ist entweder in Scala oder Python verfügbar. Die primäre Abstraktion von Spark ist eine verteilte Sammlung von Elementen, die als Resilient Distributed Dataset (RDD) bezeichnet wird. RDDs können aus Hadoop-Eingabeformaten (z. B. HDFS-Dateien) oder durch Transformieren anderer RDDs erstellt werden.
Öffnen Sie die Spark Shell
Der folgende Befehl wird zum Öffnen der Spark-Shell verwendet.
$ spark-shell
Erstellen Sie eine einfache RDD
Lassen Sie uns eine einfache RDD aus der Textdatei erstellen. Verwenden Sie den folgenden Befehl, um eine einfache RDD zu erstellen.
scala> val inputfile = sc.textFile(“input.txt”)
Die Ausgabe für den obigen Befehl ist
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Die Spark RDD API führt nur wenige ein Transformations und wenige Actions RDD zu manipulieren.
RDD-Transformationen
RDD-Transformationen geben den Zeiger auf neues RDD zurück und ermöglichen das Erstellen von Abhängigkeiten zwischen RDDs. Jedes RDD in der Abhängigkeitskette (String of Dependencies) hat eine Funktion zur Berechnung seiner Daten und einen Zeiger (Abhängigkeit) auf sein übergeordnetes RDD.
Spark ist faul, daher wird nichts ausgeführt, es sei denn, Sie rufen eine Transformation oder Aktion auf, die die Erstellung und Ausführung von Jobs auslöst. Schauen Sie sich den folgenden Ausschnitt des Beispiels für die Wortanzahl an.
Daher ist die RDD-Transformation kein Datensatz, sondern ein Schritt in einem Programm (möglicherweise der einzige Schritt), der Spark sagt, wie Daten abgerufen werden sollen und was damit zu tun ist.
Im Folgenden finden Sie eine Liste der RDD-Transformationen.
S.No. | Transformationen & Bedeutung |
---|---|
1 | map(func) Gibt ein neues verteiltes Dataset zurück, das durch Übergeben jedes Elements der Quelle durch eine Funktion gebildet wird func. |
2 | filter(func) Gibt ein neues Dataset zurück, das durch Auswahl der Elemente der Quelle erstellt wurde, auf denen func gibt true zurück. |
3 | flatMap(func) Ähnlich wie bei der Zuordnung, jedoch kann jedes Eingabeelement 0 oder mehr Ausgabeelementen zugeordnet werden (daher sollte func eine Seq anstelle eines einzelnen Elements zurückgeben). |
4 | mapPartitions(func) Ähnlich wie Map, wird jedoch auf jeder Partition (Block) des RDD separat ausgeführt func muss vom Typ Iterator <T> sein ⇒ Iterator <U>, wenn auf einem RDD vom Typ T ausgeführt wird. |
5 | mapPartitionsWithIndex(func) Ähnlich wie Map Partitions, bietet aber auch func mit einem ganzzahligen Wert, der den Index der Partition darstellt, also func muss vom Typ (Int, Iterator <T>) sein ⇒ Iterator <U>, wenn auf einem RDD vom Typ T ausgeführt wird. |
6 | sample(withReplacement, fraction, seed) Probe a fraction der Daten mit oder ohne Ersatz unter Verwendung eines gegebenen Zufallszahlengenerator-Startwerts. |
7 | union(otherDataset) Gibt ein neues Dataset zurück, das die Vereinigung der Elemente im Quelldatensatz und des Arguments enthält. |
8 | intersection(otherDataset) Gibt eine neue RDD zurück, die den Schnittpunkt von Elementen im Quelldatensatz und dem Argument enthält. |
9 | distinct([numTasks]) Gibt ein neues Dataset zurück, das die verschiedenen Elemente des Quelldatensatzes enthält. |
10 | groupByKey([numTasks]) Wenn ein Datensatz von (K, V) Paaren aufgerufen wird, wird ein Datensatz von (K, Iterable <V>) Paaren zurückgegeben. Note - Wenn Sie eine Gruppierung durchführen, um eine Aggregation (z. B. eine Summe oder einen Durchschnitt) für jeden Schlüssel durchzuführen, führt die Verwendung von reductByKey oder aggregateByKey zu einer wesentlich besseren Leistung. |
11 | reduceByKey(func, [numTasks]) Wenn ein Datensatz von (K, V) Paaren aufgerufen wird, wird ein Datensatz von (K, V) Paaren zurückgegeben, in dem die Werte für jeden Schlüssel unter Verwendung der angegebenen Reduktionsfunktion func aggregiert werden , die vom Typ (V, V) ⇒ V sein muss Wie in groupByKey kann die Anzahl der Reduzierungsaufgaben über ein optionales zweites Argument konfiguriert werden. |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Wenn ein Datensatz mit (K, V) Paaren aufgerufen wird, wird ein Datensatz mit (K, U) Paaren zurückgegeben, in dem die Werte für jeden Schlüssel unter Verwendung der angegebenen Kombinationsfunktionen und eines neutralen "Null" -Werts aggregiert werden. Ermöglicht einen aggregierten Werttyp, der sich vom Eingabewerttyp unterscheidet, wobei unnötige Zuordnungen vermieden werden. Wie in groupByKey kann die Anzahl der Reduzierungsaufgaben über ein optionales zweites Argument konfiguriert werden. |
13 | sortByKey([ascending], [numTasks]) Wenn ein Datensatz von (K, V) Paaren aufgerufen wird, in dem K Ordered implementiert, wird ein Datensatz von (K, V) Paaren zurückgegeben, der nach Schlüsseln in aufsteigender oder absteigender Reihenfolge sortiert ist, wie im Booleschen aufsteigenden Argument angegeben. |
14 | join(otherDataset, [numTasks]) Beim Aufrufen von Datensätzen vom Typ (K, V) und (K, W) wird ein Datensatz von (K, (V, W)) Paaren mit allen Elementpaaren für jeden Schlüssel zurückgegeben. Äußere Verknüpfungen werden durch leftOuterJoin, rightOuterJoin und fullOuterJoin unterstützt. |
15 | cogroup(otherDataset, [numTasks]) Beim Aufrufen von Datensätzen vom Typ (K, V) und (K, W) wird ein Datensatz von (K, (Iterable <V>, Iterable <W>)) Tupeln zurückgegeben. Diese Operation wird auch als Gruppe mit bezeichnet. |
16 | cartesian(otherDataset) Gibt beim Aufrufen von Datensätzen der Typen T und U einen Datensatz von (T, U) Paaren (alle Elementpaare) zurück. |
17 | pipe(command, [envVars]) Leiten Sie jede Partition des RDD über einen Shell-Befehl, z. B. ein Perl- oder Bash-Skript. RDD-Elemente werden in das stdin des Prozesses geschrieben, und in das stdout ausgegebene Zeilen werden als RDD von Zeichenfolgen zurückgegeben. |
18 | coalesce(numPartitions) Verringern Sie die Anzahl der Partitionen in der RDD auf numPartitions. Nützlich, um Vorgänge nach dem Filtern eines großen Datensatzes effizienter auszuführen. |
19 | repartition(numPartitions) Mischen Sie die Daten in der RDD nach dem Zufallsprinzip neu, um mehr oder weniger Partitionen zu erstellen und über diese zu verteilen. Dadurch werden immer alle Daten über das Netzwerk gemischt. |
20 | repartitionAndSortWithinPartitions(partitioner) Partitionieren Sie die RDD gemäß dem angegebenen Partitionierer neu und sortieren Sie die Datensätze innerhalb jeder resultierenden Partition nach ihren Schlüsseln. Dies ist effizienter als das Aufrufen der Neupartition und das anschließende Sortieren innerhalb jeder Partition, da dadurch die Sortierung in die Shuffle-Maschinerie verschoben werden kann. |
Aktionen
Die folgende Tabelle enthält eine Liste von Aktionen, die Werte zurückgeben.
S.No. | Aktion & Bedeutung |
---|---|
1 | reduce(func) Aggregieren Sie die Elemente des Datasets mithilfe einer Funktion func(das zwei Argumente akzeptiert und eines zurückgibt). Die Funktion sollte kommutativ und assoziativ sein, damit sie parallel korrekt berechnet werden kann. |
2 | collect() Gibt alle Elemente des Datasets als Array im Treiberprogramm zurück. Dies ist normalerweise nach einem Filter oder einer anderen Operation nützlich, die eine ausreichend kleine Teilmenge der Daten zurückgibt. |
3 | count() Gibt die Anzahl der Elemente im Dataset zurück. |
4 | first() Gibt das erste Element des Datasets zurück (ähnlich wie bei take (1)). |
5 | take(n) Gibt ein Array mit dem ersten zurück n Elemente des Datensatzes. |
6 | takeSample (withReplacement,num, [seed]) Gibt ein Array mit einer zufälligen Stichprobe von zurück num Elemente des Datensatzes, mit oder ohne Ersatz, geben optional einen Zufallszahlengenerator-Startwert vor. |
7 | takeOrdered(n, [ordering]) Gibt den ersten zurück n Elemente der RDD, die entweder ihre natürliche Reihenfolge oder einen benutzerdefinierten Komparator verwenden. |
8 | saveAsTextFile(path) Schreibt die Elemente des Datasets als Textdatei (oder als Satz von Textdateien) in ein bestimmtes Verzeichnis im lokalen Dateisystem, HDFS oder einem anderen von Hadoop unterstützten Dateisystem. Spark ruft für jedes Element toString auf, um es in eine Textzeile in der Datei zu konvertieren. |
9 | saveAsSequenceFile(path) (Java and Scala) Schreibt die Elemente des Datasets als Hadoop-Sequenzdatei in einen bestimmten Pfad im lokalen Dateisystem, HDFS oder einem anderen von Hadoop unterstützten Dateisystem. Dies ist auf RDDs von Schlüssel-Wert-Paaren verfügbar, die die beschreibbare Schnittstelle von Hadoop implementieren. In Scala ist es auch für Typen verfügbar, die implizit in Writable konvertierbar sind (Spark enthält Konvertierungen für Basistypen wie Int, Double, String usw.). |
10 | saveAsObjectFile(path) (Java and Scala) Schreibt die Elemente des Datasets mithilfe der Java-Serialisierung in einem einfachen Format, das dann mit SparkContext.objectFile () geladen werden kann. |
11 | countByKey() Nur für RDDs vom Typ (K, V) verfügbar. Gibt eine Hashmap von (K, Int) Paaren mit der Anzahl der einzelnen Schlüssel zurück. |
12 | foreach(func) Führt eine Funktion aus funcauf jedem Element des Datensatzes. Dies geschieht normalerweise bei Nebenwirkungen wie dem Aktualisieren eines Akkus oder der Interaktion mit externen Speichersystemen. Note- Das Ändern anderer Variablen als Akkumulatoren außerhalb von foreach () kann zu undefiniertem Verhalten führen. Weitere Informationen finden Sie unter Grundlegendes zu Schließungen. |
Programmieren mit RDD
Lassen Sie uns anhand eines Beispiels die Implementierungen weniger RDD-Transformationen und -Aktionen in der RDD-Programmierung sehen.
Beispiel
Betrachten Sie ein Beispiel für die Wortanzahl - Es zählt jedes Wort, das in einem Dokument erscheint. Betrachten Sie den folgenden Text als Eingabe und wird als gespeichertinput.txt Datei in einem Home-Verzeichnis.
input.txt - Eingabedatei.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Befolgen Sie die unten angegebenen Schritte, um das angegebene Beispiel auszuführen.
Öffnen Sie die Spark-Shell
Der folgende Befehl wird zum Öffnen der Funkenschale verwendet. Im Allgemeinen wird der Funke mit Scala aufgebaut. Daher wird ein Spark-Programm in einer Scala-Umgebung ausgeführt.
$ spark-shell
Wenn die Spark-Shell erfolgreich geöffnet wird, finden Sie die folgende Ausgabe. Wenn Sie sich die letzte Zeile der Ausgabe ansehen, bedeutet "Spark-Kontext als sc verfügbar", dass der Spark-Container automatisch ein Spark-Kontextobjekt mit dem Namen erstelltsc. Vor dem Start des ersten Programmschritts sollte das SparkContext-Objekt erstellt werden.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Erstellen Sie eine RDD
Zuerst müssen wir die Eingabedatei mit der Spark-Scala-API lesen und eine RDD erstellen.
Der folgende Befehl wird zum Lesen einer Datei von einem bestimmten Speicherort verwendet. Hier wird eine neue RDD mit dem Namen der Eingabedatei erstellt. Der String, der in der textFile-Methode ("") als Argument angegeben wird, ist der absolute Pfad für den Namen der Eingabedatei. Wenn jedoch nur der Dateiname angegeben wird, bedeutet dies, dass sich die Eingabedatei am aktuellen Speicherort befindet.
scala> val inputfile = sc.textFile("input.txt")
Führen Sie die Wortzählungstransformation aus
Unser Ziel ist es, die Wörter in einer Datei zu zählen. Erstellen Sie eine flache Karte, um jede Zeile in Wörter aufzuteilen (flatMap(line ⇒ line.split(“ ”)).
Lesen Sie als Nächstes jedes Wort als Schlüssel mit einem Wert ‘1’ (<Schlüssel, Wert> = <Wort, 1>) mit Kartenfunktion (map(word ⇒ (word, 1)).
Reduzieren Sie diese Schlüssel schließlich, indem Sie Werte ähnlicher Schlüssel hinzufügen (reduceByKey(_+_)).
Der folgende Befehl wird zum Ausführen der Wortzähllogik verwendet. Nachdem Sie dies ausgeführt haben, werden Sie keine Ausgabe finden, da dies keine Aktion, sondern eine Transformation ist. auf ein neues RDD zeigen oder Funken mitteilen, was mit den angegebenen Daten zu tun ist)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
Aktuelle RDD
Wenn Sie während der Arbeit mit dem RDD Informationen zum aktuellen RDD erhalten möchten, verwenden Sie den folgenden Befehl. Es zeigt Ihnen die Beschreibung der aktuellen RDD und ihrer Abhängigkeiten für das Debuggen.
scala> counts.toDebugString
Zwischenspeichern der Transformationen
Sie können eine RDD, die beibehalten werden soll, mit den Methoden persist () oder cache () markieren. Wenn es zum ersten Mal in einer Aktion berechnet wird, wird es auf den Knoten gespeichert. Verwenden Sie den folgenden Befehl, um die Zwischentransformationen im Speicher zu speichern.
scala> counts.cache()
Aktion anwenden
Das Anwenden einer Aktion, wie das Speichern aller Transformationen, führt zu einer Textdatei. Das String-Argument für die Methode saveAsTextFile ("") ist der absolute Pfad des Ausgabeordners. Versuchen Sie den folgenden Befehl, um die Ausgabe in einer Textdatei zu speichern. Im folgenden Beispiel befindet sich der Ordner "Ausgabe" am aktuellen Speicherort.
scala> counts.saveAsTextFile("output")
Überprüfen der Ausgabe
Öffnen Sie ein anderes Terminal, um zum Ausgangsverzeichnis zu gelangen (wo der Funke im anderen Terminal ausgeführt wird). Verwenden Sie die folgenden Befehle, um das Ausgabeverzeichnis zu überprüfen.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
Der folgende Befehl wird verwendet, um die Ausgabe von anzuzeigen Part-00000 Dateien.
[hadoop@localhost output]$ cat part-00000
Ausgabe
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Der folgende Befehl wird verwendet, um die Ausgabe von anzuzeigen Part-00001 Dateien.
[hadoop@localhost output]$ cat part-00001
Ausgabe
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
UN Behalten Sie die Lagerung bei
Wenn Sie vor dem UN-Persistent den für diese Anwendung verwendeten Speicherplatz anzeigen möchten, verwenden Sie die folgende URL in Ihrem Browser.
http://localhost:4040
Der folgende Bildschirm zeigt den für die Anwendung verwendeten Speicherplatz, der auf der Spark-Shell ausgeführt wird.
Wenn Sie den Speicherplatz eines bestimmten RDD UN-persistieren möchten, verwenden Sie den folgenden Befehl.
Scala> counts.unpersist()
Sie sehen die Ausgabe wie folgt:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Verwenden Sie die folgende URL, um den Speicherplatz im Browser zu überprüfen.
http://localhost:4040/
Sie sehen den folgenden Bildschirm. Es zeigt den für die Anwendung verwendeten Speicherplatz an, der auf der Spark-Shell ausgeführt wird.