Apache Spark - Guida rapida

Le industrie utilizzano ampiamente Hadoop per analizzare i propri set di dati. Il motivo è che il framework Hadoop si basa su un semplice modello di programmazione (MapReduce) e consente una soluzione di elaborazione scalabile, flessibile, a tolleranza di errore e conveniente. In questo caso, la preoccupazione principale è mantenere la velocità nell'elaborazione di grandi set di dati in termini di tempo di attesa tra le query e tempo di attesa per eseguire il programma.

Spark è stato introdotto da Apache Software Foundation per accelerare il processo del software di calcolo computazionale Hadoop.

Contro una credenza comune, Spark is not a modified version of Hadoope in realtà non dipende da Hadoop perché dispone di una propria gestione del cluster. Hadoop è solo uno dei modi per implementare Spark.

Spark utilizza Hadoop in due modi: uno è storage e il secondo è processing. Poiché Spark ha il proprio calcolo di gestione del cluster, utilizza Hadoop solo a scopo di archiviazione.

Apache Spark

Apache Spark è una tecnologia di elaborazione cluster velocissima, progettata per un calcolo veloce. Si basa su Hadoop MapReduce ed estende il modello MapReduce per utilizzarlo in modo efficiente per più tipi di calcoli, che includono query interattive e elaborazione del flusso. La caratteristica principale di Spark è la suain-memory cluster computing che aumenta la velocità di elaborazione di un'applicazione.

Spark è progettato per coprire un'ampia gamma di carichi di lavoro come applicazioni batch, algoritmi iterativi, query interattive e streaming. Oltre a supportare tutti questi carichi di lavoro in un rispettivo sistema, riduce l'onere di gestione del mantenimento di strumenti separati.

Evoluzione di Apache Spark

Spark è uno dei sottoprogetti di Hadoop sviluppato nel 2009 nell'AMPLab di UC Berkeley da Matei Zaharia. Era Open Sourced nel 2010 con una licenza BSD. È stato donato alla Fondazione software Apache nel 2013 e ora Apache Spark è diventato un progetto Apache di primo livello da febbraio 2014.

Caratteristiche di Apache Spark

Apache Spark ha le seguenti caratteristiche.

  • Speed- Spark aiuta a eseguire un'applicazione nel cluster Hadoop, fino a 100 volte più veloce in memoria e 10 volte più veloce durante l'esecuzione su disco. Ciò è possibile riducendo il numero di operazioni di lettura / scrittura su disco. Memorizza i dati dell'elaborazione intermedia in memoria.

  • Supports multiple languages- Spark fornisce API integrate in Java, Scala o Python. Pertanto, puoi scrivere applicazioni in diverse lingue. Spark presenta 80 operatori di alto livello per le query interattive.

  • Advanced Analytics- Spark non supporta solo "Mappa" e "Riduci". Supporta anche query SQL, dati in streaming, machine learning (ML) e algoritmi di grafici.

Spark costruito su Hadoop

Il diagramma seguente mostra tre modi per creare Spark con i componenti Hadoop.

Esistono tre modi per la distribuzione di Spark, come spiegato di seguito.

  • Standalone- La distribuzione autonoma di Spark significa che Spark occupa il posto sopra HDFS (Hadoop Distributed File System) e lo spazio viene allocato per HDFS, in modo esplicito. Qui, Spark e MapReduce verranno eseguiti fianco a fianco per coprire tutti i processi Spark sul cluster.

  • Hadoop Yarn- La distribuzione di Hadoop Yarn significa, semplicemente, spark funziona su Yarn senza alcuna preinstallazione o accesso root richiesto. Aiuta a integrare Spark nell'ecosistema Hadoop o nello stack Hadoop. Consente ad altri componenti di funzionare in cima allo stack.

  • Spark in MapReduce (SIMR)- Spark in MapReduce viene utilizzato per avviare il lavoro Spark oltre alla distribuzione autonoma. Con SIMR, l'utente può avviare Spark e utilizza la sua shell senza alcun accesso amministrativo.

Componenti di Spark

La figura seguente mostra i diversi componenti di Spark.

Apache Spark Core

Spark Core è il motore di esecuzione generale sottostante per la piattaforma Spark su cui si basano tutte le altre funzionalità. Fornisce elaborazione in memoria e set di dati di riferimento in sistemi di archiviazione esterni.

Spark SQL

Spark SQL è un componente in cima a Spark Core che introduce una nuova astrazione dei dati chiamata SchemaRDD, che fornisce supporto per dati strutturati e semi-strutturati.

Spark Streaming

Spark Streaming sfrutta la capacità di pianificazione rapida di Spark Core per eseguire analisi di streaming. Acquisisce i dati in mini-batch ed esegue trasformazioni RDD (Resilient Distributed Datasets) su questi mini-batch di dati.

MLlib (libreria di machine learning)

MLlib è un framework di machine learning distribuito sopra Spark a causa dell'architettura Spark basata sulla memoria distribuita. Secondo i benchmark, è fatto dagli sviluppatori MLlib contro le implementazioni ALS (Alternating Least Squares). Spark MLlib è nove volte più veloce della versione basata su disco Hadoop diApache Mahout (prima che Mahout acquisisse un'interfaccia Spark).

GraphX

GraphX ​​è un framework di elaborazione di grafici distribuito su Spark. Fornisce un'API per esprimere il calcolo del grafico in grado di modellare i grafici definiti dall'utente utilizzando l'API di astrazione Pregel. Fornisce inoltre un runtime ottimizzato per questa astrazione.

Set di dati distribuiti resilienti

Resilient Distributed Datasets (RDD) è una struttura dati fondamentale di Spark. È una raccolta distribuita immutabile di oggetti. Ogni set di dati in RDD è suddiviso in partizioni logiche, che possono essere calcolate su diversi nodi del cluster. Gli RDD possono contenere qualsiasi tipo di oggetti Python, Java o Scala, comprese le classi definite dall'utente.

Formalmente, un RDD è una raccolta di record partizionata di sola lettura. Gli RDD possono essere creati tramite operazioni deterministiche sui dati su una memoria stabile o su altri RDD. RDD è una raccolta di elementi a tolleranza d'errore su cui è possibile operare in parallelo.

Esistono due modi per creare RDD: parallelizing una raccolta esistente nel programma del driver o referencing a dataset in un sistema di archiviazione esterno, come un file system condiviso, HDFS, HBase o qualsiasi origine dati che offra un formato di input Hadoop.

Spark fa uso del concetto di RDD per ottenere operazioni MapReduce più veloci ed efficienti. Discutiamo prima di come avvengono le operazioni di MapReduce e perché non sono così efficienti.

La condivisione dei dati è lenta in MapReduce

MapReduce è ampiamente adottato per l'elaborazione e la generazione di set di dati di grandi dimensioni con un algoritmo distribuito parallelo su un cluster. Consente agli utenti di scrivere calcoli paralleli, utilizzando una serie di operatori di alto livello, senza doversi preoccupare della distribuzione del lavoro e della tolleranza ai guasti.

Sfortunatamente, nella maggior parte dei framework attuali, l'unico modo per riutilizzare i dati tra i calcoli (Ex - tra due lavori MapReduce) è scriverli su un sistema di archiviazione stabile esterno (Ex - HDFS). Sebbene questo framework fornisca numerose astrazioni per accedere alle risorse di calcolo di un cluster, gli utenti vogliono ancora di più.

Tutti e due Iterative e Interactivele applicazioni richiedono una condivisione dei dati più rapida tra lavori paralleli. La condivisione dei dati è lenta in MapReduce a causa direplication, serialization, e disk IO. Per quanto riguarda il sistema di archiviazione, la maggior parte delle applicazioni Hadoop trascorrono più del 90% del tempo in operazioni di lettura-scrittura HDFS.

Operazioni iterative su MapReduce

Riutilizza i risultati intermedi in più calcoli in applicazioni in più fasi. La figura seguente spiega come funziona il framework corrente, mentre si eseguono le operazioni iterative su MapReduce. Ciò comporta notevoli sovraccarichi a causa della replica dei dati, dell'I / O del disco e della serializzazione, che rallentano il sistema.

Operazioni interattive su MapReduce

L'utente esegue query ad-hoc sullo stesso sottoinsieme di dati. Ogni query eseguirà l'I / O del disco sulla memoria stabile, che può dominare il tempo di esecuzione dell'applicazione.

La figura seguente spiega come funziona il framework corrente durante l'esecuzione delle query interattive su MapReduce.

Condivisione dei dati utilizzando Spark RDD

La condivisione dei dati è lenta in MapReduce a causa di replication, serialization, e disk IO. Nella maggior parte delle applicazioni Hadoop, trascorrono più del 90% del tempo in operazioni di lettura-scrittura HDFS.

Riconoscendo questo problema, i ricercatori hanno sviluppato un framework specializzato chiamato Apache Spark. L'idea chiave della scintilla èResiliente Ddistribuito Datasets (RDD); supporta il calcolo dell'elaborazione in memoria. Ciò significa che memorizza lo stato della memoria come un oggetto tra i lavori e l'oggetto è condivisibile tra questi lavori. La condivisione dei dati in memoria è da 10 a 100 volte più veloce della rete e del disco.

Proviamo ora a scoprire come avvengono le operazioni iterative e interattive in Spark RDD.

Operazioni iterative su Spark RDD

L'illustrazione riportata di seguito mostra le operazioni iterative su Spark RDD. Memorizzerà i risultati intermedi in una memoria distribuita invece che nell'archiviazione stabile (disco) e renderà il sistema più veloce.

Note - Se la memoria distribuita (RAM) è sufficiente per memorizzare risultati intermedi (stato del lavoro), memorizzerà tali risultati sul disco.

Operazioni interattive su Spark RDD

Questa illustrazione mostra le operazioni interattive su Spark RDD. Se diverse query vengono eseguite ripetutamente sullo stesso set di dati, questi particolari dati possono essere conservati in memoria per tempi di esecuzione migliori.

Per impostazione predefinita, ogni RDD trasformato può essere ricalcolato ogni volta che si esegue un'azione su di esso. Tuttavia, potresti anchepersistun RDD in memoria, nel qual caso Spark manterrà gli elementi nel cluster per un accesso molto più rapido, la prossima volta che lo interrogherai. È inoltre disponibile il supporto per la persistenza di RDD su disco o per la replica su più nodi.

Spark è il sottoprogetto di Hadoop. Pertanto, è meglio installare Spark in un sistema basato su Linux. I seguenti passaggi mostrano come installare Apache Spark.

Passaggio 1: verifica dell'installazione di Java

L'installazione di Java è una delle cose obbligatorie nell'installazione di Spark. Prova il seguente comando per verificare la versione JAVA.

$java -version

Se Java è già installato sul tuo sistema, puoi vedere la seguente risposta:

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Se non hai Java installato sul tuo sistema, installa Java prima di procedere al passaggio successivo.

Passaggio 2: verifica dell'installazione di Scala

Dovresti usare il linguaggio Scala per implementare Spark. Quindi verifichiamo l'installazione di Scala usando il seguente comando.

$scala -version

Se Scala è già installato sul tuo sistema, vedrai la seguente risposta:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Nel caso in cui non hai Scala installato sul tuo sistema, procedi al passaggio successivo per l'installazione di Scala.

Passaggio 3: download di Scala

Scarica l'ultima versione di Scala visitando il seguente link Download Scala . Per questo tutorial, stiamo usando la versione scala-2.11.6. Dopo il download, troverai il file tar di Scala nella cartella di download.

Passaggio 4: installazione di Scala

Seguire i passaggi indicati di seguito per l'installazione di Scala.

Estrai il file tar di Scala

Digita il seguente comando per estrarre il file tar Scala.

$ tar xvf scala-2.11.6.tgz

Spostare i file del software Scala

Utilizzare i seguenti comandi per spostare i file del software Scala nella rispettiva directory (/usr/local/scala).

$ su – 
Password: 
# cd /home/Hadoop/Downloads/ 
# mv scala-2.11.6 /usr/local/scala 
# exit

Imposta PATH per Scala

Utilizzare il seguente comando per impostare PATH per Scala.

$ export PATH = $PATH:/usr/local/scala/bin

Verifica dell'installazione di Scala

Dopo l'installazione, è meglio verificarlo. Utilizzare il seguente comando per verificare l'installazione di Scala.

$scala -version

Se Scala è già installato sul tuo sistema, vedrai la seguente risposta:

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Passaggio 5: download di Apache Spark

Scarica l'ultima versione di Spark visitando il seguente link Download Spark . Per questo tutorial, stiamo usandospark-1.3.1-bin-hadoop2.6versione. Dopo averlo scaricato, troverai il file tar Spark nella cartella di download.

Passaggio 6: installazione di Spark

Segui i passaggi indicati di seguito per l'installazione di Spark.

Estrazione del catrame di scintilla

Il seguente comando per estrarre il file spark tar.

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Spostamento dei file del software Spark

I seguenti comandi per spostare i file del software Spark nella rispettiva directory (/usr/local/spark).

$ su – 
Password:  

# cd /home/Hadoop/Downloads/ 
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark 
# exit

Configurazione dell'ambiente per Spark

Aggiungi la riga seguente a ~/.bashrcfile. Significa aggiungere la posizione, in cui si trova il file del software spark alla variabile PATH.

export PATH=$PATH:/usr/local/spark/bin

Utilizzare il seguente comando per reperire il file ~ / .bashrc.

$ source ~/.bashrc

Passaggio 7: verifica dell'installazione di Spark

Scrivi il seguente comando per aprire la shell Spark.

$spark-shell

Se spark è installato correttamente, troverai il seguente output.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc  
scala>

Spark Core è la base dell'intero progetto. Fornisce l'invio distribuito delle attività, la pianificazione e le funzionalità di I / O di base. Spark utilizza una struttura di dati fondamentale specializzata nota come RDD (Resilient Distributed Datasets) che è una raccolta logica di dati partizionati tra le macchine. Gli RDD possono essere creati in due modi; uno è facendo riferimento a set di dati in sistemi di archiviazione esterni e il secondo è applicando trasformazioni (ad esempio mappa, filtro, riduttore, join) su RDD esistenti.

L'astrazione RDD viene esposta tramite un'API integrata nel linguaggio. Ciò semplifica la complessità della programmazione perché il modo in cui le applicazioni manipolano gli RDD è simile alla manipolazione delle raccolte locali di dati.

Spark Shell

Spark fornisce una shell interattiva, un potente strumento per analizzare i dati in modo interattivo. È disponibile in linguaggio Scala o Python. L'astrazione primaria di Spark è una raccolta distribuita di elementi denominata Resilient Distributed Dataset (RDD). Gli RDD possono essere creati da Hadoop Input Formats (come i file HDFS) o trasformando altri RDD.

Apri Spark Shell

Il comando seguente viene utilizzato per aprire la shell Spark.

$ spark-shell

Crea un semplice RDD

Creiamo un semplice RDD dal file di testo. Usa il seguente comando per creare un semplice RDD.

scala> val inputfile = sc.textFile(“input.txt”)

L'output per il comando precedente è

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

L'API Spark RDD ne introduce pochi Transformations e pochi Actions per manipolare RDD.

Trasformazioni RDD

Le trasformazioni RDD restituiscono il puntatore a un nuovo RDD e consentono di creare dipendenze tra RDD. Ogni RDD nella catena delle dipendenze (String of Dependencies) ha una funzione per il calcolo dei suoi dati e ha un puntatore (dipendenza) al suo RDD genitore.

Spark è pigro, quindi non verrà eseguito nulla a meno che non chiami una trasformazione o un'azione che attiverà la creazione e l'esecuzione del lavoro. Guarda il seguente frammento dell'esempio di conteggio delle parole.

Pertanto, la trasformazione RDD non è un insieme di dati ma è un passaggio in un programma (potrebbe essere l'unico passaggio) che dice a Spark come ottenere i dati e cosa farne.

S.No Trasformazioni e significato
1

map(func)

Restituisce un nuovo set di dati distribuito, formato passando ogni elemento della sorgente attraverso una funzione func.

2

filter(func)

Restituisce un nuovo dataset formato selezionando quegli elementi della sorgente su cui func restituisce true.

3

flatMap(func)

Simile a map, ma ogni elemento di input può essere mappato a 0 o più elementi di output (quindi func dovrebbe restituire un Seq piuttosto che un singolo elemento).

4

mapPartitions(func)

Simile a map, ma viene eseguito separatamente su ogni partizione (blocco) dell'RDD, quindi func deve essere di tipo Iterator <T> ⇒ Iterator <U> quando viene eseguito su un RDD di tipo T.

5

mapPartitionsWithIndex(func)

Simile alla mappa delle partizioni, ma fornisce anche func con un valore intero che rappresenta l'indice della partizione, quindi func deve essere di tipo (Int, Iterator <T>) ⇒ Iterator <U> quando si esegue su un RDD di tipo T.

6

sample(withReplacement, fraction, seed)

Campione a fraction dei dati, con o senza sostituzione, utilizzando un determinato seme del generatore di numeri casuali.

7

union(otherDataset)

Restituisce un nuovo set di dati che contiene l'unione degli elementi nel set di dati di origine e l'argomento.

8

intersection(otherDataset)

Restituisce un nuovo RDD che contiene l'intersezione di elementi nel set di dati di origine e l'argomento.

9

distinct([numTasks])

Restituisce un nuovo set di dati che contiene gli elementi distinti del set di dati di origine.

10

groupByKey([numTasks])

Quando viene chiamato su un set di dati di coppie (K, V), restituisce un set di dati di coppie (K, Iterable <V>).

Note - Se stai raggruppando per eseguire un'aggregazione (come una somma o una media) su ciascuna chiave, l'utilizzo di reduceByKey o aggregateByKey produrrà prestazioni molto migliori.

11

reduceByKey(func, [numTasks])

Quando viene chiamato su un set di dati di (K, V) coppie, restituisce un set di dati di (K, V) coppie in cui i valori per ogni chiave vengono aggregati utilizzando la data ridurre la funzione func , che deve essere di tipo (V, V) ⇒ V Come in groupByKey, il numero di attività di riduzione è configurabile tramite un secondo argomento opzionale.

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Quando viene richiamato su un set di dati di coppie (K, V), restituisce un set di dati di coppie (K, U) in cui i valori di ciascuna chiave vengono aggregati utilizzando le funzioni di combinazione fornite e un valore "zero" neutro. Consente un tipo di valore aggregato diverso dal tipo di valore di input, evitando allocazioni non necessarie. Come in groupByKey, il numero di attività di riduzione è configurabile tramite un secondo argomento opzionale.

13

sortByKey([ascending], [numTasks])

Quando viene richiamato su un set di dati di coppie (K, V) in cui K implementa Ordered, restituisce un set di dati di coppie (K, V) ordinate per chiavi in ​​ordine crescente o decrescente, come specificato nell'argomento booleano ascendente.

14

join(otherDataset, [numTasks])

Quando viene richiamato su set di dati di tipo (K, V) e (K, W), restituisce un set di dati di coppie (K, (V, W)) con tutte le coppie di elementi per ciascuna chiave. I join esterni sono supportati tramite leftOuterJoin, rightOuterJoin e fullOuterJoin.

15

cogroup(otherDataset, [numTasks])

Quando viene chiamato su set di dati di tipo (K, V) e (K, W), restituisce un set di dati di tuple (K, (Iterable <V>, Iterable <W>)). Questa operazione è anche chiamata gruppo con.

16

cartesian(otherDataset)

Quando viene richiamato su set di dati di tipo T e U, restituisce un set di dati di coppie (T, U) (tutte le coppie di elementi).

17

pipe(command, [envVars])

Pipe ogni partizione dell'RDD attraverso un comando shell, ad esempio uno script Perl o bash. Gli elementi RDD vengono scritti nello stdin del processo e le righe in uscita nel suo stdout vengono restituite come un RDD di stringhe.

18

coalesce(numPartitions)

Riduci il numero di partizioni nell'RDD a numPartitions. Utile per eseguire le operazioni in modo più efficiente dopo aver filtrato un set di dati di grandi dimensioni.

19

repartition(numPartitions)

Rimescola i dati nell'RDD in modo casuale per creare più o meno partizioni e bilanciarle tra loro. Questo rimescola sempre tutti i dati sulla rete.

20

repartitionAndSortWithinPartitions(partitioner)

Ripartiziona l'RDD in base al partizionatore specificato e, all'interno di ciascuna partizione risultante, ordina i record in base alle loro chiavi. Questo è più efficiente della chiamata alla ripartizione e quindi dell'ordinamento all'interno di ogni partizione perché può spingere l'ordinamento verso il basso nel meccanismo di riproduzione casuale.

Azioni

S.No Azione e significato
1

reduce(func)

Aggrega gli elementi del set di dati utilizzando una funzione func(che accetta due argomenti e ne restituisce uno). La funzione dovrebbe essere commutativa e associativa in modo che possa essere calcolata correttamente in parallelo.

2

collect()

Restituisce tutti gli elementi del set di dati come array nel programma del driver. Ciò è solitamente utile dopo un filtro o un'altra operazione che restituisce un sottoinsieme di dati sufficientemente piccolo.

3

count()

Restituisce il numero di elementi nel set di dati.

4

first()

Restituisce il primo elemento del set di dati (simile a take (1)).

5

take(n)

Restituisce un array con il primo n elementi del set di dati.

6

takeSample (withReplacement,num, [seed])

Restituisce un array con un campione casuale di num elementi del set di dati, con o senza sostituzione, eventualmente pre-specificando un seme del generatore di numeri casuali.

7

takeOrdered(n, [ordering])

Restituisce il primo n elementi dell'RDD utilizzando il loro ordine naturale o un comparatore personalizzato.

8

saveAsTextFile(path)

Scrive gli elementi del set di dati come file di testo (o insieme di file di testo) in una determinata directory nel file system locale, HDFS o qualsiasi altro file system supportato da Hadoop. Spark chiama toString su ogni elemento per convertirlo in una riga di testo nel file.

9

saveAsSequenceFile(path) (Java and Scala)

Scrive gli elementi del set di dati come Hadoop SequenceFile in un determinato percorso nel file system locale, HDFS o qualsiasi altro file system supportato da Hadoop. Questo è disponibile su RDD di coppie chiave-valore che implementano l'interfaccia scrivibile di Hadoop. In Scala, è anche disponibile sui tipi convertibili in modo implicito in Writable (Spark include conversioni per tipi di base come Int, Double, String, ecc.).

10

saveAsObjectFile(path) (Java and Scala)

Scrive gli elementi del set di dati in un formato semplice utilizzando la serializzazione Java, che può quindi essere caricato utilizzando SparkContext.objectFile ().

11

countByKey()

Disponibile solo su RDD di tipo (K, V). Restituisce una hashmap di (K, Int) coppie con il conteggio di ogni chiave.

12

foreach(func)

Esegue una funzione funcsu ogni elemento del set di dati. Questo di solito viene fatto per effetti collaterali come l'aggiornamento di un accumulatore o l'interazione con sistemi di archiviazione esterni.

Note- la modifica di variabili diverse dagli accumulatori al di fuori di foreach () può comportare un comportamento indefinito. Vedere Comprensione delle chiusure per maggiori dettagli.

Programmazione con RDD

Vediamo le implementazioni di poche trasformazioni RDD e azioni nella programmazione RDD con l'aiuto di un esempio.

Esempio

Considera un esempio di conteggio delle parole: conta ogni parola che appare in un documento. Considera il testo seguente come input e viene salvato come fileinput.txt file in una directory home.

input.txt - file di input.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Seguire la procedura indicata di seguito per eseguire l'esempio fornito.

Apri Spark-Shell

Il comando seguente viene utilizzato per aprire Spark Shell. Generalmente, spark viene creato utilizzando Scala. Pertanto, un programma Spark viene eseguito in ambiente Scala.

$ spark-shell

Se la shell di Spark si apre correttamente, troverai il seguente output. Guarda l'ultima riga dell'output "Contesto Spark disponibile come sc" significa che il contenitore Spark viene creato automaticamente oggetto contesto Spark con il nomesc. Prima di iniziare il primo passaggio di un programma, è necessario creare l'oggetto SparkContext.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Crea un RDD

Innanzitutto, dobbiamo leggere il file di input utilizzando l'API Spark-Scala e creare un RDD.

Il seguente comando viene utilizzato per leggere un file da una determinata posizione. Qui, viene creato un nuovo RDD con il nome di inputfile. La stringa fornita come argomento nel metodo textFile ("") è il percorso assoluto per il nome del file di input. Tuttavia, se viene fornito solo il nome del file, significa che il file di input si trova nella posizione corrente.

scala> val inputfile = sc.textFile("input.txt")

Eseguire la trasformazione del conteggio delle parole

Il nostro scopo è contare le parole in un file. Crea una mappa piatta per dividere ogni riga in parole (flatMap(line ⇒ line.split(“ ”)).

Quindi, leggi ogni parola come una chiave con un valore ‘1’ (<chiave, valore> = <parola, 1>) utilizzando la funzione mappa (map(word ⇒ (word, 1)).

Infine, riduci quelle chiavi aggiungendo valori di chiavi simili (reduceByKey(_+_)).

Il comando seguente viene utilizzato per eseguire la logica del conteggio delle parole. Dopo aver eseguito ciò, non troverai alcun output perché questa non è un'azione, questa è una trasformazione; indicare un nuovo RDD o dire a Spark cosa fare con i dati forniti)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

RDD corrente

Mentre si lavora con l'RDD, se si desidera conoscere l'RDD corrente, utilizzare il seguente comando. Ti mostrerà la descrizione dell'RDD corrente e delle sue dipendenze per il debug.

scala> counts.toDebugString

Memorizzazione nella cache delle trasformazioni

È possibile contrassegnare un RDD come persistente utilizzando i metodi persist () o cache () su di esso. La prima volta che viene calcolato in un'azione, verrà mantenuto in memoria sui nodi. Utilizzare il seguente comando per archiviare le trasformazioni intermedie in memoria.

scala> counts.cache()

Applicare l'azione

L'applicazione di un'azione, come memorizzare tutte le trasformazioni, risulta in un file di testo. L'argomento String per il metodo saveAsTextFile ("") è il percorso assoluto della cartella di output. Prova il seguente comando per salvare l'output in un file di testo. Nell'esempio seguente, la cartella "output" si trova nella posizione corrente.

scala> counts.saveAsTextFile("output")

Controllo dell'output

Apri un altro terminale per andare alla directory home (dove spark viene eseguito nell'altro terminale). Utilizzare i seguenti comandi per controllare la directory di output.

[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

Il comando seguente viene utilizzato per visualizzare l'output da Part-00000 File.

[hadoop@localhost output]$ cat part-00000

Produzione

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Il comando seguente viene utilizzato per visualizzare l'output da Part-00001 File.

[hadoop@localhost output]$ cat part-00001

Produzione

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Le Nazioni Unite persistono nell'archiviazione

Prima della persistenza UN, se desideri visualizzare lo spazio di archiviazione utilizzato per questa applicazione, utilizza il seguente URL nel tuo browser.

http://localhost:4040

Verrà visualizzata la seguente schermata, che mostra lo spazio di archiviazione utilizzato per l'applicazione, in esecuzione sulla shell Spark.

Se si desidera annullare la persistenza dello spazio di archiviazione di un determinato RDD, utilizzare il seguente comando.

Scala> counts.unpersist()

Vedrai l'output come segue:

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

Per verificare lo spazio di archiviazione nel browser, utilizzare il seguente URL.

http://localhost:4040/

Vedrai la seguente schermata. Mostra lo spazio di archiviazione utilizzato per l'applicazione, in esecuzione sulla shell Spark.

L'applicazione Spark, utilizzando spark-submit, è un comando della shell usato per distribuire l'applicazione Spark in un cluster. Utilizza tutti i rispettivi gestori di cluster tramite un'interfaccia uniforme. Pertanto, non è necessario configurare l'applicazione per ciascuna di esse.

Esempio

Prendiamo lo stesso esempio di conteggio delle parole, che abbiamo usato prima, usando i comandi della shell. Qui, consideriamo lo stesso esempio di un'applicazione Spark.

Input di esempio

Il testo seguente è i dati di input e il file denominato è in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Guarda il seguente programma -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Salva il programma sopra in un file denominato SparkWordCount.scala e posizionarlo in una directory definita dall'utente denominata spark-application.

Note - Durante la trasformazione di inputRDD in countRDD, stiamo usando flatMap () per tokenizzare le righe (da file di testo) in parole, il metodo map () per contare la frequenza delle parole e il metodo reduceByKey () per contare ogni ripetizione di parole.

Utilizzare i seguenti passaggi per inviare questa domanda. Esegui tutti i passaggi inspark-application directory tramite il terminale.

Passaggio 1: scarica Spark Ja

Spark core jar è richiesto per la compilazione, quindi scarica spark-core_2.10-1.3.0.jar dal seguente link Spark core jar e sposta il file jar dalla directory di download aspark-application directory.

Passaggio 2: compilare il programma

Compilare il programma precedente utilizzando il comando fornito di seguito. Questo comando dovrebbe essere eseguito dalla directory spark-application. Qui,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar è un jar di supporto Hadoop tratto dalla libreria Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Passaggio 3: creare un JAR

Crea un file jar dell'applicazione Spark utilizzando il seguente comando. Qui,wordcount è il nome del file per il file jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Passaggio 4: inviare l'applicazione Spark

Invia l'applicazione Spark utilizzando il seguente comando:

spark-submit --class SparkWordCount --master local wordcount.jar

Se viene eseguito correttamente, troverai l'output fornito di seguito. IlOKlasciare che il seguente output sia per l'identificazione dell'utente e questa è l'ultima riga del programma. Se leggi attentamente il seguente output, troverai cose diverse, come:

  • avviato con successo il servizio "sparkDriver" sulla porta 42954
  • MemoryStore è stato avviato con una capacità di 267,3 MB
  • SparkUI avviato su http://192.168.1.217:4040
  • File JAR aggiunto: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile at SparkPi.scala: 11) terminato in 0,566 s
  • Interfaccia utente Web di Spark interrotta su http://192.168.1.217:4040
  • MemoryStore cancellato
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Passaggio 5: controllo dell'output

Dopo aver eseguito con successo il programma, troverai la directory denominata outfile nella directory spark-application.

I seguenti comandi vengono utilizzati per aprire e controllare l'elenco dei file nella directory outfile.

$ cd outfile $ ls 
Part-00000 part-00001 _SUCCESS

I comandi per il check-in dell'output part-00000 file sono -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

I comandi per il controllo dell'output nel file part-00001 sono:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Passare alla sezione seguente per saperne di più sul comando "spark-submit".

Sintassi Spark-submit

spark-submit [options] <app jar | python file> [app arguments]

Opzioni

S.No Opzione Descrizione
1 --maestro spark: // host: port, mesos: // host: port, filato o local.
2 - modalità di distribuzione Indica se avviare il programma driver localmente ("client") o su una delle macchine worker all'interno del cluster ("cluster") (impostazione predefinita: client).
3 --classe La classe principale della tua applicazione (per app Java / Scala).
4 --nome Un nome della tua applicazione.
5 --barattoli Elenco separato da virgole di file jar locali da includere nei percorsi classe del driver e dell'esecutore.
6 --pacchi Elenco separato da virgole di coordinate Maven di jar da includere nei percorsi classi del driver e dell'esecutore.
7 - repository Elenco separato da virgole di repository remoti aggiuntivi per cercare le coordinate Maven fornite con --packages.
8 --py-files Elenco separato da virgole di file .zip, .egg o .py da inserire nel PERCORSO PYTHON per le app Python.
9 --File Elenco di file separato da virgole da inserire nella directory di lavoro di ciascun esecutore.
10 --conf (prop = val) Proprietà di configurazione Spark arbitraria.
11 --properties-file Percorso di un file da cui caricare proprietà aggiuntive. Se non specificato, cercherà conf / spark-defaults.
12 --driver-memory Memoria per il driver (es. 1000M, 2G) (impostazione predefinita: 512M).
13 --driver-java-opzioni Opzioni Java aggiuntive da passare al driver.
14 --driver-library-path Voci del percorso della libreria extra da passare al driver.
15 --driver-class-path

Voci del percorso di classe extra da passare al driver.

Nota che i jar aggiunti con --jars vengono automaticamente inclusi nel classpath.

16 --executor-memory Memoria per esecutore (es. 1000M, 2G) (impostazione predefinita: 1G).
17 --proxy-user Utente da impersonare durante l'invio della domanda.
18 --help, -h Mostra questo messaggio di aiuto ed esci.
19 --verbose, -v Stampa l'output di debug aggiuntivo.
20 --versione Stampa la versione dell'attuale Spark.
21 --driver-core NUM Core per il driver (predefinito: 1).
22 --sorvegliare Se fornito, riavvia il driver in caso di errore.
23 --uccidere Se dato, uccide il conducente specificato.
24 --stato Se fornito, richiede lo stato del driver specificato.
25 --total-executor-core Core totali per tutti gli esecutori.
26 --executor-core Numero di core per esecutore. (Predefinito: 1 in modalità YARN o tutti i core disponibili sul worker in modalità standalone).

Spark contiene due diversi tipi di variabili condivise: una è broadcast variables e il secondo è accumulators.

  • Broadcast variables - utilizzato per distribuire in modo efficiente grandi valori.

  • Accumulators - utilizzato per aggregare le informazioni di una particolare raccolta.

Variabili di trasmissione

Le variabili di trasmissione consentono al programmatore di mantenere una variabile di sola lettura memorizzata nella cache su ogni macchina piuttosto che spedirne una copia con le attività. Possono essere utilizzati, ad esempio, per fornire a ogni nodo una copia di un ampio set di dati di input, in modo efficiente. Spark tenta inoltre di distribuire le variabili di trasmissione utilizzando algoritmi di trasmissione efficienti per ridurre i costi di comunicazione.

Le azioni Spark vengono eseguite attraverso una serie di fasi, separate da operazioni di "shuffle" distribuite. Spark trasmette automaticamente i dati comuni necessari alle attività all'interno di ciascuna fase.

I dati trasmessi in questo modo vengono memorizzati nella cache in formato serializzato e vengono deserializzati prima di eseguire ciascuna attività. Ciò significa che la creazione esplicita di variabili di trasmissione è utile solo quando le attività in più fasi richiedono gli stessi dati o quando è importante memorizzare nella cache i dati in forma deserializzata.

Le variabili di trasmissione vengono create da una variabile v a chiamata SparkContext.broadcast(v). La variabile broadcast è un wrapperv, ed è possibile accedere al suo valore chiamando il valuemetodo. Il codice riportato di seguito mostra questo:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

Dopo che la variabile di trasmissione è stata creata, dovrebbe essere utilizzata al posto del valore v in tutte le funzioni eseguite sul cluster, quindi vnon viene spedito ai nodi più di una volta. Inoltre, l'oggettov non deve essere modificato dopo la sua trasmissione, in modo da garantire che tutti i nodi ottengano lo stesso valore della variabile di trasmissione.

Accumulatori

Gli accumulatori sono variabili che vengono "aggiunte" solo tramite un'operazione associativa e possono quindi essere efficacemente supportate in parallelo. Possono essere usati per implementare contatori (come in MapReduce) o somme. Spark supporta nativamente gli accumulatori di tipi numerici ei programmatori possono aggiungere il supporto per nuovi tipi. Se gli accumulatori vengono creati con un nome, verranno visualizzati in formatoSpark’s UI. Questo può essere utile per comprendere lo stato di avanzamento delle fasi in esecuzione (NOTA: non è ancora supportato in Python).

Un accumulatore viene creato da un valore iniziale v a chiamata SparkContext.accumulator(v). Le attività in esecuzione sul cluster possono quindi aggiungersi ad esso utilizzando iladdo l'operatore + = (in Scala e Python). Tuttavia, non possono leggere il suo valore. Solo il programma del driver può leggere il valore dell'accumulatore, utilizzando il suovalue metodo.

Il codice riportato di seguito mostra un accumulatore utilizzato per sommare gli elementi di un array -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

Se vuoi vedere l'output del codice sopra, usa il seguente comando:

scala> accum.value

Produzione

res2: Int = 10

Operazioni RDD numeriche

Spark ti consente di eseguire diverse operazioni sui dati numerici, utilizzando uno dei metodi API predefiniti. Le operazioni numeriche di Spark vengono implementate con un algoritmo di streaming che consente di costruire il modello, un elemento alla volta.

Queste operazioni vengono calcolate e restituite come file StatusCounter oggetto chiamando status() metodo.

S.No Metodi e significato
1

count()

Numero di elementi nell'RDD.

2

Mean()

Media degli elementi nella RDD.

3

Sum()

Valore totale degli elementi nella RDD.

4

Max()

Valore massimo tra tutti gli elementi nell'RDD.

5

Min()

Valore minimo tra tutti gli elementi nell'RDD.

6

Variance()

Varianza degli elementi.

7

Stdev()

Deviazione standard.

Se vuoi usare solo uno di questi metodi, puoi chiamare il metodo corrispondente direttamente su RDD.