Apache Spark - Penerapan

Aplikasi Spark, menggunakan spark-submit, adalah perintah shell yang digunakan untuk menerapkan aplikasi Spark pada sebuah cluster. Ia menggunakan semua manajer cluster masing-masing melalui antarmuka yang seragam. Oleh karena itu, Anda tidak perlu mengkonfigurasi aplikasi Anda untuk masing-masing aplikasi.

Contoh

Mari kita ambil contoh jumlah kata yang sama, yang kita gunakan sebelumnya, menggunakan perintah shell. Di sini, kami menganggap contoh yang sama sebagai aplikasi percikan.

Contoh Input

Teks berikut adalah data masukan dan nama file adalah in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Lihat program berikut -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Simpan program di atas ke dalam file bernama SparkWordCount.scala dan letakkan di direktori yang ditentukan pengguna bernama spark-application.

Note - Saat mengubah inputRDD menjadi countRDD, kami menggunakan flatMap () untuk membuat token garis (dari file teks) menjadi kata-kata, metode map () untuk menghitung frekuensi kata dan metode reduceByKey () untuk menghitung setiap pengulangan kata.

Gunakan langkah-langkah berikut untuk mengirimkan aplikasi ini. Jalankan semua langkah dispark-application direktori melalui terminal.

Langkah 1: Unduh Spark Ja

Spark core jar diperlukan untuk kompilasi, oleh karena itu, unduh spark-core_2.10-1.3.0.jar dari tautan berikut Spark core jar dan pindahkan file jar dari direktori unduhan kespark-application direktori.

Langkah 2: Kompilasi program

Kompilasi program di atas menggunakan perintah yang diberikan di bawah ini. Perintah ini harus dijalankan dari direktori spark-application. Sini,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar adalah jar dukungan Hadoop yang diambil dari pustaka Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Langkah 3: Buat JAR

Buat file jar dari aplikasi spark menggunakan perintah berikut. Sini,wordcount adalah nama file untuk file jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Langkah 4: Kirimkan aplikasi percikan

Kirimkan aplikasi spark menggunakan perintah berikut -

spark-submit --class SparkWordCount --master local wordcount.jar

Jika berhasil dijalankan, maka Anda akan menemukan output yang diberikan di bawah ini. ItuOKmembiarkan output berikut untuk identifikasi pengguna dan itu adalah baris terakhir dari program. Jika Anda membaca output berikut dengan cermat, Anda akan menemukan hal-hal yang berbeda, seperti -

  • berhasil memulai layanan 'sparkDriver' pada port 42954
  • MemoryStore dimulai dengan kapasitas 267,3 MB
  • Memulai SparkUI di http://192.168.1.217:4040
  • Menambahkan file JAR: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile di SparkPi.scala: 11) selesai dalam 0,566 detik
  • Menghentikan UI web Spark di http://192.168.1.217:4040
  • MemoryStore dihapus
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Langkah 5: Memeriksa keluaran

Setelah berhasil menjalankan program, Anda akan menemukan direktori bernama outfile di direktori spark-application.

Perintah berikut digunakan untuk membuka dan memeriksa daftar file di direktori outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Perintah untuk memeriksa keluaran part-00000 file adalah -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Perintah untuk memeriksa keluaran pada file bagian-00001 adalah -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Pergi melalui bagian berikut untuk mengetahui lebih banyak tentang perintah 'percikan-kirim'.

Spark-submit Syntax

spark-submit [options] <app jar | python file> [app arguments]

Pilihan

S.No Pilihan Deskripsi
1 --menguasai spark: // host: port, mesos: // host: port, yarn, atau local.
2 --deploy-mode Apakah akan meluncurkan program driver secara lokal ("klien") atau di salah satu mesin pekerja di dalam cluster ("cluster") (Default: client).
3 --kelas Kelas utama aplikasi Anda (untuk aplikasi Java / Scala).
4 --nama Nama aplikasi Anda.
5 - toples Daftar local jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor.
6 --paket Daftar koordinat maven dari jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor.
7 --repositories Daftar repositori jarak jauh tambahan yang dipisahkan koma untuk mencari koordinat maven yang diberikan dengan --packages.
8 --py-files Daftar file .zip, .egg, atau .py yang dipisahkan koma untuk ditempatkan di PYTHON PATH untuk aplikasi Python.
9 --files Daftar file yang dipisahkan koma untuk ditempatkan di direktori kerja setiap pelaksana.
10 --conf (prop = val) Properti konfigurasi Spark sewenang-wenang.
11 --properties-file Jalur ke file tempat memuat properti tambahan. Jika tidak ditentukan, ini akan mencari conf / spark-defaults.
12 --driver-memory Memori untuk driver (misalnya 1000M, 2G) (Default: 512M).
13 --driver-java-options Opsi Java ekstra untuk diberikan kepada pengemudi.
14 --driver-library-path Entri jalur perpustakaan tambahan untuk diteruskan ke pengemudi.
15 --driver-class-path

Entri jalur kelas tambahan untuk diteruskan ke pengemudi.

Perhatikan bahwa jars yang ditambahkan dengan --jars secara otomatis disertakan dalam classpath.

16 --executor-memory Memori per pelaksana (misalnya 1000M, 2G) (Default: 1G).
17 --proxy-user Pengguna meniru identitas saat mengajukan aplikasi.
18 --help, -h Tunjukkan pesan bantuan ini dan keluar.
19 --verbose, -v Cetak keluaran debug tambahan.
20 --Versi: kapan Cetak versi Spark saat ini.
21 --driver-core NUM Core untuk driver (Default: 1).
22 --mengawasi Jika diberikan, restart driver jika gagal.
23 --membunuh Jika diberikan, membunuh pengemudi yang ditentukan.
24 --status Jika diberikan, meminta status dari pengemudi yang ditentukan.
25 --total-executor-core Total core untuk semua pelaksana.
26 --executor-core Jumlah inti per pelaksana. (Default: 1 dalam mode YARN, atau semua inti yang tersedia pada pekerja dalam mode mandiri).