Apache Spark - Penerapan
Aplikasi Spark, menggunakan spark-submit, adalah perintah shell yang digunakan untuk menerapkan aplikasi Spark pada sebuah cluster. Ia menggunakan semua manajer cluster masing-masing melalui antarmuka yang seragam. Oleh karena itu, Anda tidak perlu mengkonfigurasi aplikasi Anda untuk masing-masing aplikasi.
Contoh
Mari kita ambil contoh jumlah kata yang sama, yang kita gunakan sebelumnya, menggunakan perintah shell. Di sini, kami menganggap contoh yang sama sebagai aplikasi percikan.
Contoh Input
Teks berikut adalah data masukan dan nama file adalah in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Lihat program berikut -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Simpan program di atas ke dalam file bernama SparkWordCount.scala dan letakkan di direktori yang ditentukan pengguna bernama spark-application.
Note - Saat mengubah inputRDD menjadi countRDD, kami menggunakan flatMap () untuk membuat token garis (dari file teks) menjadi kata-kata, metode map () untuk menghitung frekuensi kata dan metode reduceByKey () untuk menghitung setiap pengulangan kata.
Gunakan langkah-langkah berikut untuk mengirimkan aplikasi ini. Jalankan semua langkah dispark-application direktori melalui terminal.
Langkah 1: Unduh Spark Ja
Spark core jar diperlukan untuk kompilasi, oleh karena itu, unduh spark-core_2.10-1.3.0.jar dari tautan berikut Spark core jar dan pindahkan file jar dari direktori unduhan kespark-application direktori.
Langkah 2: Kompilasi program
Kompilasi program di atas menggunakan perintah yang diberikan di bawah ini. Perintah ini harus dijalankan dari direktori spark-application. Sini,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar adalah jar dukungan Hadoop yang diambil dari pustaka Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Langkah 3: Buat JAR
Buat file jar dari aplikasi spark menggunakan perintah berikut. Sini,wordcount adalah nama file untuk file jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Langkah 4: Kirimkan aplikasi percikan
Kirimkan aplikasi spark menggunakan perintah berikut -
spark-submit --class SparkWordCount --master local wordcount.jar
Jika berhasil dijalankan, maka Anda akan menemukan output yang diberikan di bawah ini. ItuOKmembiarkan output berikut untuk identifikasi pengguna dan itu adalah baris terakhir dari program. Jika Anda membaca output berikut dengan cermat, Anda akan menemukan hal-hal yang berbeda, seperti -
- berhasil memulai layanan 'sparkDriver' pada port 42954
- MemoryStore dimulai dengan kapasitas 267,3 MB
- Memulai SparkUI di http://192.168.1.217:4040
- Menambahkan file JAR: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile di SparkPi.scala: 11) selesai dalam 0,566 detik
- Menghentikan UI web Spark di http://192.168.1.217:4040
- MemoryStore dihapus
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Langkah 5: Memeriksa keluaran
Setelah berhasil menjalankan program, Anda akan menemukan direktori bernama outfile di direktori spark-application.
Perintah berikut digunakan untuk membuka dan memeriksa daftar file di direktori outfile.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Perintah untuk memeriksa keluaran part-00000 file adalah -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Perintah untuk memeriksa keluaran pada file bagian-00001 adalah -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Pergi melalui bagian berikut untuk mengetahui lebih banyak tentang perintah 'percikan-kirim'.
Spark-submit Syntax
spark-submit [options] <app jar | python file> [app arguments]
Pilihan
S.No | Pilihan | Deskripsi |
---|---|---|
1 | --menguasai | spark: // host: port, mesos: // host: port, yarn, atau local. |
2 | --deploy-mode | Apakah akan meluncurkan program driver secara lokal ("klien") atau di salah satu mesin pekerja di dalam cluster ("cluster") (Default: client). |
3 | --kelas | Kelas utama aplikasi Anda (untuk aplikasi Java / Scala). |
4 | --nama | Nama aplikasi Anda. |
5 | - toples | Daftar local jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor. |
6 | --paket | Daftar koordinat maven dari jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor. |
7 | --repositories | Daftar repositori jarak jauh tambahan yang dipisahkan koma untuk mencari koordinat maven yang diberikan dengan --packages. |
8 | --py-files | Daftar file .zip, .egg, atau .py yang dipisahkan koma untuk ditempatkan di PYTHON PATH untuk aplikasi Python. |
9 | --files | Daftar file yang dipisahkan koma untuk ditempatkan di direktori kerja setiap pelaksana. |
10 | --conf (prop = val) | Properti konfigurasi Spark sewenang-wenang. |
11 | --properties-file | Jalur ke file tempat memuat properti tambahan. Jika tidak ditentukan, ini akan mencari conf / spark-defaults. |
12 | --driver-memory | Memori untuk driver (misalnya 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Opsi Java ekstra untuk diberikan kepada pengemudi. |
14 | --driver-library-path | Entri jalur perpustakaan tambahan untuk diteruskan ke pengemudi. |
15 | --driver-class-path | Entri jalur kelas tambahan untuk diteruskan ke pengemudi. Perhatikan bahwa jars yang ditambahkan dengan --jars secara otomatis disertakan dalam classpath. |
16 | --executor-memory | Memori per pelaksana (misalnya 1000M, 2G) (Default: 1G). |
17 | --proxy-user | Pengguna meniru identitas saat mengajukan aplikasi. |
18 | --help, -h | Tunjukkan pesan bantuan ini dan keluar. |
19 | --verbose, -v | Cetak keluaran debug tambahan. |
20 | --Versi: kapan | Cetak versi Spark saat ini. |
21 | --driver-core NUM | Core untuk driver (Default: 1). |
22 | --mengawasi | Jika diberikan, restart driver jika gagal. |
23 | --membunuh | Jika diberikan, membunuh pengemudi yang ditentukan. |
24 | --status | Jika diberikan, meminta status dari pengemudi yang ditentukan. |
25 | --total-executor-core | Total core untuk semua pelaksana. |
26 | --executor-core | Jumlah inti per pelaksana. (Default: 1 dalam mode YARN, atau semua inti yang tersedia pada pekerja dalam mode mandiri). |