Apache Spark - Dağıtım

Spark-submit kullanan Spark uygulaması, Spark uygulamasını bir kümeye dağıtmak için kullanılan bir kabuk komutudur. Tek tip bir arayüz aracılığıyla tüm ilgili küme yöneticilerini kullanır. Bu nedenle, uygulamanızı her biri için yapılandırmanız gerekmez.

Misal

Kabuk komutlarını kullanarak daha önce kullandığımız aynı kelime sayımı örneğini ele alalım. Burada aynı örneği kıvılcım uygulaması olarak ele alıyoruz.

Örnek Giriş

Aşağıdaki metin giriş verileridir ve adlı dosya in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Aşağıdaki programa bakın -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Yukarıdaki programı adlı bir dosyaya kaydedin. SparkWordCount.scala ve bunu adında kullanıcı tanımlı bir dizine yerleştirin spark-application.

Note - inputRDD'yi countRDD'ye dönüştürürken, satırları (metin dosyasından) kelimelere belirtmek için flatMap (), kelime sıklığını saymak için map () yöntemi ve her kelime tekrarını saymak için lessByKey () yöntemini kullanıyoruz.

Bu başvuruyu göndermek için aşağıdaki adımları kullanın. Tüm adımları yürütünspark-application terminal üzerinden rehber.

1. Adım: Spark Ja'yı indirin

Derleme için Spark core jar gereklidir, bu nedenle aşağıdaki Spark core jar bağlantısından spark-core_2.10-1.3.0.jar dosyasını indirin ve jar dosyasını indirme dizinindenspark-application dizin.

Adım 2: Programı derleyin

Aşağıda verilen komutu kullanarak yukarıdaki programı derleyin. Bu komut, spark-application dizininden çalıştırılmalıdır. Buraya,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar Spark kitaplığından alınan bir Hadoop destek kavanozudur.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

3. Adım: Bir JAR oluşturun

Aşağıdaki komutu kullanarak spark uygulamasının bir jar dosyasını oluşturun. Buraya,wordcount jar dosyasının dosya adıdır.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

4. Adım: Kıvılcım uygulamasını gönderin

Kıvılcım uygulamasını aşağıdaki komutu kullanarak gönderin -

spark-submit --class SparkWordCount --master local wordcount.jar

Başarıyla yürütülürse, aşağıda verilen çıktıyı bulacaksınız. OKAşağıdaki çıktıya izin verilmesi kullanıcı kimliği içindir ve bu programın son satırıdır. Aşağıdaki çıktıyı dikkatlice okursanız, farklı şeyler bulacaksınız:

  • 42954 numaralı bağlantı noktasında "sparkDriver" hizmeti başarıyla başlatıldı
  • MemoryStore 267,3 MB kapasite ile başladı
  • SparkUI'yi http://192.168.1.217:4040 adresinde başlattı
  • JAR dosyası eklendi: /home/hadoop/piapplication/count.jar
  • Sonuç Aşaması 1 (SparkPi.scala'da saveAsTextFile: 11) 0,566 saniyede tamamlandı
  • Spark web kullanıcı arayüzü http://192.168.1.217:4040 adresinde durduruldu
  • MemoryStore temizlendi
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Adım 5: Çıktı kontrol ediliyor

Programın başarılı bir şekilde yürütülmesinden sonra, adlı dizini bulacaksınız. outfile spark-application dizininde.

Aşağıdaki komutlar, outfile dizinindeki dosyaların listesini açmak ve kontrol etmek için kullanılır.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Çıkış kontrolü için komutlar part-00000 dosya -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Part-00001 dosyasındaki çıktıyı kontrol etme komutları şunlardır:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

'Spark-submit' komutu hakkında daha fazla bilgi edinmek için aşağıdaki bölümden geçin.

Spark-submit Sözdizimi

spark-submit [options] <app jar | python file> [app arguments]

Seçenekler

S.No Seçenek Açıklama
1 --usta spark: // host: port, mesos: // host: port, iplik veya yerel.
2 --deploy modu Sürücü programının yerel olarak mı ("istemci") yoksa kümedeki çalışan makinelerden birinde mi ("küme") (Varsayılan: istemci) başlatılacağı.
3 --sınıf Uygulamanızın ana sınıfı (Java / Scala uygulamaları için).
4 --name Başvurunuzun adı.
5 - kavanoz Sürücü ve uygulayıcı sınıf yollarına eklenecek yerel kavanozların virgülle ayrılmış listesi.
6 - paketler Sürücü ve uygulayıcı sınıf yollarına dahil edilecek kavanozların maven koordinatlarının virgülle ayrılmış listesi.
7 --repositories --Packages ile verilen maven koordinatlarını aramak için ek uzak depoların virgülle ayrılmış listesi.
8 --py-dosyaları Python uygulamaları için PYTHON PATH'ına yerleştirilecek .zip, .egg veya .py dosyalarının virgülle ayrılmış listesi.
9 --Dosyalar Her yürütücünün çalışma dizinine yerleştirilecek dosyaların virgülle ayrılmış listesi.
10 --conf (prop = val) Keyfi Spark yapılandırma özelliği.
11 --özellikler-dosya Ekstra özelliklerin yükleneceği dosyanın yolu. Belirtilmezse, bu conf / spark-default'ları arayacaktır.
12 - sürücü belleği Sürücü için bellek (örneğin 1000M, 2G) (Varsayılan: 512M).
13 --driver-java-seçenekleri Sürücüye geçmek için ekstra Java seçenekleri.
14 --driver-library-path Sürücüye iletilecek ekstra kitaplık yolu girişleri.
15 --driver-sınıf-yolu

Sürücüye iletilecek ekstra sınıf yolu girişleri.

--Jars ile eklenen kavanozların otomatik olarak sınıf yoluna dahil edildiğini unutmayın.

16 - yürütücü-hafıza Yürütücü başına bellek (örn. 1000M, 2G) (Varsayılan: 1G).
17 --proxy-user Başvuruyu gönderirken kullanıcının kimliğine bürünmesi.
18 --yardım, -h Bu yardım mesajını göster ve çık.
19 --verbose, -v Ek hata ayıklama çıktısı yazdırın.
20 --version Mevcut Spark sürümünü yazdırın.
21 --driver çekirdekler NUM Sürücü için çekirdekler (Varsayılan: 1).
22 - denetim Verilirse, hata durumunda sürücüyü yeniden başlatır.
23 --öldürmek Verilirse, belirtilen sürücüyü öldürür.
24 --durum Verilirse, belirtilen sürücünün durumunu ister.
25 --total-executor-cores Tüm yöneticiler için toplam çekirdek.
26 - yürütücü-çekirdekler Yürütücü başına çekirdek sayısı. (Varsayılan: YARN modunda 1 veya bağımsız modda çalışan üzerinde mevcut tüm çekirdekler).