Apache Spark - wdrożenie

Aplikacja Spark, korzystająca z funkcji przesyłania Spark, to polecenie powłoki używane do wdrażania aplikacji Spark w klastrze. Korzysta ze wszystkich odpowiednich menedżerów klastrów za pośrednictwem jednolitego interfejsu. Dlatego nie musisz konfigurować aplikacji dla każdego z nich.

Przykład

Weźmy ten sam przykład liczenia słów, którego używaliśmy wcześniej, używając poleceń powłoki. Tutaj rozważymy ten sam przykład, co w przypadku aplikacji iskrowej.

Przykładowe wejście

Poniższy tekst to dane wejściowe, a plik o nazwie to in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Spójrz na następujący program -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Zapisz powyższy program do pliku o nazwie SparkWordCount.scala i umieść go w katalogu zdefiniowanym przez użytkownika o nazwie spark-application.

Note - Podczas przekształcania inputRDD w countRDD używamy flatMap () do tokenizacji wierszy (z pliku tekstowego) na słowa, metody map () do liczenia częstotliwości słów i metody redukujByKey () do liczenia powtórzeń każdego słowa.

Wykonaj następujące kroki, aby przesłać tę aplikację. Wykonaj wszystkie kroki wspark-application katalog za pośrednictwem terminala.

Krok 1: Pobierz Spark Ja

Spark core jar jest wymagany do kompilacji, dlatego pobierz spark-core_2.10-1.3.0.jar z poniższego linku Spark core jar i przenieś plik jar z katalogu pobierania dospark-application informator.

Krok 2: Skompiluj program

Skompiluj powyższy program, używając polecenia podanego poniżej. To polecenie powinno zostać wykonane z katalogu spark-application. Tutaj,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar to jar obsługi Hadoop pobrany z biblioteki Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Krok 3: Utwórz plik JAR

Utwórz plik jar aplikacji Spark za pomocą następującego polecenia. Tutaj,wordcount to nazwa pliku jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Krok 4: Prześlij wniosek o iskrę

Prześlij aplikację Spark za pomocą następującego polecenia -

spark-submit --class SparkWordCount --master local wordcount.jar

Jeśli zakończy się pomyślnie, znajdziesz dane wyjściowe podane poniżej. PlikOKwpuszczenie następującego wyjścia służy do identyfikacji użytkownika i jest to ostatnia linia programu. Jeśli uważnie przeczytasz poniższe wyniki, znajdziesz różne rzeczy, takie jak -

  • pomyślnie uruchomiono usługę „sparkDriver” na porcie 42954
  • Początkowo MemoryStore miał pojemność 267,3 MB
  • Uruchomiono SparkUI pod adresem http://192.168.1.217:4040
  • Dodano plik JAR: /home/hadoop/piapplication/count.jar
  • Wynik Etap 1 (saveAsTextFile w SparkPi.scala: 11) zakończył się w 0,566 s
  • Zatrzymano interfejs sieciowy Spark pod adresem http://192.168.1.217:4040
  • Wyczyszczono MemoryStore
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Krok 5: Sprawdzanie wyników

Po pomyślnym wykonaniu programu znajdziesz katalog o nazwie outfile w katalogu spark-application.

Poniższe polecenia służą do otwierania i sprawdzania listy plików w katalogu outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Polecenia do sprawdzania danych wyjściowych part-00000 plik to -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Polecenia do sprawdzania wyników w pliku part-00001 to -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Przejdź przez następną sekcję, aby dowiedzieć się więcej o poleceniu „spark-submit”.

Składnia przesyłania Spark

spark-submit [options] <app jar | python file> [app arguments]

Opcje

S.Nr Opcja Opis
1 --mistrz spark: // host: port, mesos: // host: port, yarn lub local.
2 --deploy-mode Czy uruchomić program sterownika lokalnie („klient”), czy na jednej z maszyn roboczych wewnątrz klastra („klaster”) (domyślnie: klient).
3 --klasa Główna klasa aplikacji (dla aplikacji Java / Scala).
4 --Nazwa Nazwa Twojej aplikacji.
5 - słoiki Rozdzielana przecinkami lista lokalnych plików JAR do dołączenia do ścieżek klas sterownika i modułu wykonawczego.
6 - paczki Rozdzielana przecinkami lista współrzędnych maven słoików do uwzględnienia w ścieżkach klas sterownika i modułu wykonawczego.
7 - repozytoria Rozdzielana przecinkami lista dodatkowych zdalnych repozytoriów do wyszukania współrzędnych maven podanych z --packages.
8 --py-pliki Lista rozdzielonych przecinkami plików .zip, .egg lub .py do umieszczenia w PYTHON PATH dla aplikacji Python.
9 --akta Rozdzielana przecinkami lista plików do umieszczenia w katalogu roboczym każdego modułu wykonawczego.
10 --conf (prop = val) Dowolna właściwość konfiguracji Spark.
11 - plik-właściwości Ścieżka do pliku, z którego można załadować dodatkowe właściwości. Jeśli nie zostanie określony, będzie szukać wartości conf / spark-defaults.
12 - pamięć kierowcy Pamięć dla sterownika (np. 1000M, 2G) (Domyślnie: 512M).
13 --driver-java-options Dodatkowe opcje Java do przekazania do sterownika.
14 --driver-Library-path Dodatkowe wpisy ścieżek bibliotek do przekazania do sterownika.
15 --driver-class-path

Dodatkowe wpisy ścieżki klasy do przekazania do sterownika.

Zauważ, że słoiki dodane za pomocą --jars są automatycznie dołączane do ścieżki klas.

16 --executor-memory Pamięć na executor (np. 1000 MB, 2 GB) (Domyślnie: 1 GB).
17 --proxy-user Użytkownik podszywa się pod użytkownika podczas składania wniosku.
18 --help, -h Pokaż ten komunikat pomocy i zakończ.
19 --verbose, -v Wydrukuj dodatkowe wyjście debugowania.
20 --wersja Wydrukuj wersję aktualnej wersji Spark.
21 - rdzenie sterowników NUM Rdzenie sterownika (domyślnie: 1).
22 --dozorować Jeśli podano, uruchamia ponownie sterownik w przypadku awarii.
23 --zabić Jeśli podano, zabija określony sterownik.
24 --status Jeśli podano, żąda statusu określonego sterownika.
25 - rdzenie total-executor-core Całkowita liczba rdzeni dla wszystkich wykonawców.
26 - rdzenie wykonawcze Liczba rdzeni na executor. (Domyślnie: 1 w trybie YARN lub wszystkie dostępne rdzenie pracownika w trybie autonomicznym).