Apache Spark - Panduan Cepat
Industri menggunakan Hadoop secara ekstensif untuk menganalisis kumpulan data mereka. Alasannya adalah bahwa kerangka kerja Hadoop didasarkan pada model pemrograman sederhana (MapReduce) dan memungkinkan solusi komputasi yang dapat diskalakan, fleksibel, toleran terhadap kesalahan, dan hemat biaya. Di sini yang menjadi perhatian utama adalah menjaga kecepatan dalam memproses dataset besar dalam hal waktu tunggu antara query dan waktu tunggu untuk menjalankan program.
Spark diperkenalkan oleh Apache Software Foundation untuk mempercepat proses perangkat lunak komputasi komputasi Hadoop.
Berbeda dengan kepercayaan umum, Spark is not a modified version of Hadoopdan sebenarnya tidak bergantung pada Hadoop karena memiliki pengelolaan kluster sendiri. Hadoop hanyalah salah satu cara untuk menerapkan Spark.
Spark menggunakan Hadoop dalam dua cara - salah satunya storage dan kedua processing. Karena Spark memiliki komputasi manajemen klasternya sendiri, Spark menggunakan Hadoop hanya untuk tujuan penyimpanan.
Apache Spark
Apache Spark adalah teknologi komputasi klaster secepat kilat, dirancang untuk komputasi cepat. Ini didasarkan pada Hadoop MapReduce dan memperluas model MapReduce untuk menggunakannya secara efisien untuk lebih banyak jenis komputasi, yang mencakup kueri interaktif dan pemrosesan aliran. Fitur utama Spark adalah miliknyain-memory cluster computing yang meningkatkan kecepatan pemrosesan aplikasi.
Spark dirancang untuk mencakup berbagai beban kerja seperti aplikasi batch, algoritme berulang, kueri interaktif, dan streaming. Selain mendukung semua beban kerja ini dalam sistem masing-masing, ini mengurangi beban manajemen dalam memelihara alat terpisah.
Evolusi Apache Spark
Spark adalah salah satu sub proyek Hadoop yang dikembangkan pada tahun 2009 di AMPLab UC Berkeley oleh Matei Zaharia. Itu Open Sourced pada tahun 2010 di bawah lisensi BSD. Itu disumbangkan ke yayasan perangkat lunak Apache pada tahun 2013, dan sekarang Apache Spark telah menjadi proyek Apache tingkat atas mulai Feb-2014.
Fitur Apache Spark
Apache Spark memiliki fitur berikut.
Speed- Spark membantu menjalankan aplikasi di cluster Hadoop, hingga 100 kali lebih cepat di memori, dan 10 kali lebih cepat saat dijalankan di disk. Hal ini dimungkinkan dengan mengurangi jumlah operasi baca / tulis ke disk. Ini menyimpan data pemrosesan menengah dalam memori.
Supports multiple languages- Spark menyediakan API bawaan di Java, Scala, atau Python. Karenanya, Anda dapat menulis aplikasi dalam berbagai bahasa. Spark hadir dengan 80 operator tingkat tinggi untuk kueri interaktif.
Advanced Analytics- Spark tidak hanya mendukung 'Map' dan 'reduce'. Ini juga mendukung kueri SQL, Data streaming, Pembelajaran mesin (ML), dan algoritma Grafik.
Spark Dibangun di Hadoop
Diagram berikut menunjukkan tiga cara bagaimana Spark dapat dibuat dengan komponen Hadoop.
Ada tiga cara penerapan Spark seperti yang dijelaskan di bawah ini.
Standalone- Penerapan Spark Standalone berarti Spark menempati tempat di atas HDFS (Hadoop Distributed File System) dan ruang dialokasikan untuk HDFS, secara eksplisit. Di sini, Spark dan MapReduce akan berjalan berdampingan untuk mencakup semua pekerjaan percikan di cluster.
Hadoop Yarn- Penerapan Hadoop Yarn berarti, sederhananya, percikan berjalan di Yarn tanpa perlu pra-instalasi atau akses root. Ini membantu mengintegrasikan Spark ke ekosistem Hadoop atau tumpukan Hadoop. Ini memungkinkan komponen lain untuk berjalan di atas tumpukan.
Spark in MapReduce (SIMR)- Spark di MapReduce digunakan untuk meluncurkan pekerjaan percikan selain penerapan mandiri. Dengan SIMR, pengguna dapat memulai Spark dan menggunakan cangkangnya tanpa akses administratif apa pun.
Komponen Spark
Ilustrasi berikut menggambarkan berbagai komponen Spark.
Apache Spark Core
Spark Core adalah mesin eksekusi umum yang mendasari untuk platform spark yang dibangun di atas semua fungsionalitas lainnya. Ini menyediakan komputasi In-Memory dan mereferensikan kumpulan data dalam sistem penyimpanan eksternal.
Spark SQL
Spark SQL adalah komponen di atas Spark Core yang memperkenalkan abstraksi data baru yang disebut SchemaRDD, yang memberikan dukungan untuk data terstruktur dan semi-terstruktur.
Spark Streaming
Spark Streaming memanfaatkan kemampuan penjadwalan cepat Spark Core untuk melakukan analisis streaming. Ini menyerap data dalam batch mini dan melakukan transformasi RDD (Set Data Terdistribusi Tangguh) pada batch mini data tersebut.
MLlib (Perpustakaan Pembelajaran Mesin)
MLlib adalah framework pembelajaran mesin terdistribusi di atas Spark karena arsitektur Spark berbasis memori terdistribusi. Ini, menurut tolok ukur, dilakukan oleh pengembang MLlib terhadap implementasi Alternating Least Squares (ALS). Spark MLlib sembilan kali lebih cepat dari versi berbasis disk HadoopApache Mahout (sebelum Mahout mendapatkan antarmuka Spark).
GraphX
GraphX adalah kerangka kerja pemrosesan grafik terdistribusi di atas Spark. Ini menyediakan API untuk mengekspresikan komputasi grafik yang dapat memodelkan grafik yang ditentukan pengguna dengan menggunakan API abstraksi Pregel. Ini juga menyediakan runtime yang dioptimalkan untuk abstraksi ini.
Set Data Terdistribusi Tangguh
Set Data Terdistribusi Tangguh (RDD) adalah struktur data fundamental dari Spark. Ini adalah kumpulan objek terdistribusi yang tidak dapat diubah. Setiap set data di RDD dibagi menjadi beberapa partisi logis, yang dapat dihitung pada node cluster yang berbeda. RDD dapat berisi semua jenis objek Python, Java, atau Scala, termasuk kelas yang ditentukan pengguna.
Secara formal, RDD adalah kumpulan catatan yang dipartisi hanya baca. RDD dapat dibuat melalui operasi deterministik pada data di penyimpanan stabil atau RDD lainnya. RDD adalah kumpulan elemen yang toleran terhadap kesalahan yang dapat dioperasikan secara paralel.
Ada dua cara untuk membuat RDD - parallelizing koleksi yang ada di program driver Anda, atau referencing a dataset dalam sistem penyimpanan eksternal, seperti sistem file bersama, HDFS, HBase, atau sumber data apa pun yang menawarkan Format Input Hadoop.
Spark menggunakan konsep RDD untuk mencapai operasi MapReduce yang lebih cepat dan efisien. Mari kita bahas dulu bagaimana operasi MapReduce terjadi dan mengapa tidak begitu efisien.
Berbagi Data Lambat di MapReduce
MapReduce diadopsi secara luas untuk memproses dan menghasilkan kumpulan data besar dengan algoritme terdistribusi paralel pada kluster. Ini memungkinkan pengguna untuk menulis komputasi paralel, menggunakan sekumpulan operator tingkat tinggi, tanpa harus khawatir tentang distribusi kerja dan toleransi kesalahan.
Sayangnya, di sebagian besar kerangka kerja saat ini, satu-satunya cara untuk menggunakan kembali data antar komputasi (Ex - antara dua tugas MapReduce) adalah menulisnya ke sistem penyimpanan stabil eksternal (Ex - HDFS). Meskipun kerangka kerja ini menyediakan banyak abstraksi untuk mengakses sumber daya komputasi cluster, pengguna masih menginginkan lebih.
Kedua Iterative dan Interactiveaplikasi membutuhkan berbagi data lebih cepat di seluruh pekerjaan paralel. Berbagi data lambat di MapReduce karenareplication, serialization, dan disk IO. Mengenai sistem penyimpanan, sebagian besar aplikasi Hadoop menghabiskan lebih dari 90% waktunya untuk melakukan operasi baca-tulis HDFS.
Operasi Iteratif di MapReduce
Gunakan kembali hasil antara di beberapa komputasi dalam aplikasi multi-tahap. Ilustrasi berikut menjelaskan cara kerja framework saat ini, saat melakukan operasi berulang di MapReduce. Hal ini menimbulkan overhead yang besar karena replikasi data, I / O disk, dan serialisasi, yang membuat sistem menjadi lambat.
Operasi Interaktif di MapReduce
Pengguna menjalankan kueri ad-hoc pada subkumpulan data yang sama. Setiap kueri akan melakukan I / O disk pada penyimpanan stabil, yang dapat mendominasi waktu eksekusi aplikasi.
Ilustrasi berikut menjelaskan bagaimana kerangka kerja saat ini bekerja saat melakukan kueri interaktif di MapReduce.
Berbagi Data menggunakan Spark RDD
Berbagi data lambat di MapReduce karena replication, serialization, dan disk IO. Sebagian besar aplikasi Hadoop menghabiskan lebih dari 90% waktu untuk melakukan operasi baca-tulis HDFS.
Menyadari masalah ini, peneliti mengembangkan kerangka kerja khusus yang disebut Apache Spark. Ide kunci dari percikan adalahResilient Ddidistribusikan Datasets (RDD); itu mendukung komputasi pemrosesan dalam memori. Artinya, ia menyimpan status memori sebagai objek di seluruh tugas dan objek tersebut dapat dibagikan di antara tugas tersebut. Berbagi data dalam memori 10 hingga 100 kali lebih cepat daripada jaringan dan Disk.
Sekarang mari kita coba mencari tahu bagaimana operasi iteratif dan interaktif berlangsung di Spark RDD.
Operasi Iteratif pada Spark RDD
Ilustrasi yang diberikan di bawah ini menunjukkan operasi iteratif pada Spark RDD. Ini akan menyimpan hasil perantara dalam memori terdistribusi alih-alih penyimpanan Stabil (Disk) dan membuat sistem lebih cepat.
Note - Jika memori Terdistribusi (RAM) cukup untuk menyimpan hasil antara (Status JOB), hasil tersebut akan disimpan di disk.
Operasi Interaktif di Spark RDD
Ilustrasi ini menunjukkan operasi interaktif pada Spark RDD. Jika kueri berbeda dijalankan pada kumpulan data yang sama berulang kali, data khusus ini dapat disimpan dalam memori untuk waktu eksekusi yang lebih baik.
Secara default, setiap RDD yang diubah dapat dihitung ulang setiap kali Anda menjalankan tindakan padanya. Namun, Anda juga bisapersistsebuah RDD dalam memori, dalam hal ini Spark akan menyimpan elemen-elemen di sekitar klaster untuk akses yang jauh lebih cepat, saat Anda menanyakannya lagi. Ada juga dukungan untuk mempertahankan RDD pada disk, atau direplikasi di beberapa node.
Spark adalah sub-proyek Hadoop. Oleh karena itu, lebih baik menginstal Spark ke dalam sistem berbasis Linux. Langkah-langkah berikut menunjukkan cara menginstal Apache Spark.
Langkah 1: Memverifikasi Instalasi Java
Instalasi Java merupakan salah satu hal wajib dalam menginstal Spark. Coba perintah berikut untuk memverifikasi versi JAVA.
$java -version
Jika Java sudah terinstal di sistem Anda, Anda akan melihat respons berikut -
java version "1.7.0_71"
Java(TM) SE Runtime Environment (build 1.7.0_71-b13)
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)
Jika Anda belum menginstal Java di sistem Anda, Instal Java sebelum melanjutkan ke langkah berikutnya.
Langkah 2: Memverifikasi instalasi Scala
Anda harus bahasa Scala untuk mengimplementasikan Spark. Jadi mari kita verifikasi instalasi Scala menggunakan perintah berikut.
$scala -version
Jika Scala sudah diinstal di sistem Anda, Anda akan melihat respons berikut -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
Jika Anda belum menginstal Scala di sistem Anda, lanjutkan ke langkah berikutnya untuk penginstalan Scala.
Langkah 3: Mendownload Scala
Unduh Scala versi terbaru dengan mengunjungi tautan berikut Unduh Scala . Untuk tutorial ini, kami menggunakan versi scala-2.11.6. Setelah mengunduh, Anda akan menemukan file tar Scala di folder unduhan.
Langkah 4: Menginstal Scala
Ikuti langkah-langkah yang diberikan di bawah ini untuk menginstal Scala.
Ekstrak file Scala tar
Ketik perintah berikut untuk mengekstrak file tar Scala.
$ tar xvf scala-2.11.6.tgz
Pindahkan file perangkat lunak Scala
Gunakan perintah berikut untuk memindahkan file perangkat lunak Scala, ke direktori masing-masing (/usr/local/scala).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv scala-2.11.6 /usr/local/scala
# exit
Atur PATH untuk Scala
Gunakan perintah berikut untuk mengatur PATH untuk Scala.
$ export PATH = $PATH:/usr/local/scala/bin
Memverifikasi Instalasi Scala
Setelah instalasi, lebih baik untuk memverifikasinya. Gunakan perintah berikut untuk memverifikasi instalasi Scala.
$scala -version
Jika Scala sudah diinstal di sistem Anda, Anda akan melihat respons berikut -
Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL
Langkah 5: Mengunduh Apache Spark
Unduh versi terbaru Spark dengan mengunjungi tautan berikut Unduh Spark . Untuk tutorial ini, kami menggunakanspark-1.3.1-bin-hadoop2.6Versi: kapan. Setelah mengunduhnya, Anda akan menemukan file tar Spark di folder unduhan.
Langkah 6: Menginstal Spark
Ikuti langkah-langkah yang diberikan di bawah ini untuk menginstal Spark.
Mengekstrak Spark tar
Perintah berikut untuk mengekstrak file spark tar.
$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz
Memindahkan file perangkat lunak Spark
Perintah berikut untuk memindahkan file perangkat lunak Spark ke direktori masing-masing (/usr/local/spark).
$ su –
Password:
# cd /home/Hadoop/Downloads/
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark
# exit
Menyiapkan lingkungan untuk Spark
Tambahkan baris berikut ke ~/.bashrcmengajukan. Ini berarti menambahkan lokasi, di mana file perangkat lunak percikan berada ke variabel PATH.
export PATH=$PATH:/usr/local/spark/bin
Gunakan perintah berikut untuk mencari file ~ / .bashrc.
$ source ~/.bashrc
Langkah 7: Memverifikasi Instalasi Spark
Tulis perintah berikut untuk membuka shell Spark.
$spark-shell
Jika percikan berhasil dipasang maka Anda akan menemukan output berikut.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Spark Core adalah dasar dari keseluruhan proyek. Ini menyediakan pengiriman tugas terdistribusi, penjadwalan, dan fungsionalitas I / O dasar. Spark menggunakan struktur data fundamental khusus yang dikenal sebagai RDD (Set Data Terdistribusi Tangguh) yang merupakan kumpulan logis dari data yang dipartisi di seluruh mesin. RDD dapat dibuat dengan dua cara; pertama dengan mereferensikan dataset dalam sistem penyimpanan eksternal dan kedua dengan menerapkan transformasi (misalnya peta, filter, reducer, join) pada RDD yang ada.
Abstraksi RDD diekspos melalui API yang terintegrasi dengan bahasa. Ini menyederhanakan kompleksitas pemrograman karena cara aplikasi memanipulasi RDD mirip dengan memanipulasi kumpulan data lokal.
Spark Shell
Spark menyediakan shell interaktif - alat yang ampuh untuk menganalisis data secara interaktif. Ini tersedia dalam bahasa Scala atau Python. Abstraksi utama Spark adalah kumpulan item terdistribusi yang disebut Set Data Terdistribusi Tangguh (RDD). RDD dapat dibuat dari Format Input Hadoop (seperti file HDFS) atau dengan mengubah RDD lainnya.
Buka Spark Shell
Perintah berikut digunakan untuk membuka shell Spark.
$ spark-shell
Buat RDD sederhana
Mari kita buat RDD sederhana dari file teks. Gunakan perintah berikut untuk membuat RDD sederhana.
scala> val inputfile = sc.textFile(“input.txt”)
Output untuk perintah di atas adalah
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API memperkenalkan beberapa Transformations dan sedikit Actions untuk memanipulasi RDD.
Transformasi RDD
Transformasi RDD mengembalikan penunjuk ke RDD baru dan memungkinkan Anda membuat dependensi antara RDD. Setiap RDD dalam rantai ketergantungan (String of Dependencies) memiliki fungsi untuk menghitung datanya dan memiliki pointer (ketergantungan) ke RDD induknya.
Spark malas, jadi tidak ada yang akan dieksekusi kecuali Anda memanggil beberapa transformasi atau tindakan yang akan memicu pembuatan dan eksekusi pekerjaan. Lihat cuplikan contoh jumlah kata berikut.
Oleh karena itu, transformasi RDD bukanlah sekumpulan data tetapi merupakan langkah dalam program (mungkin satu-satunya langkah) yang memberi tahu Spark cara mendapatkan data dan apa yang harus dilakukan dengannya.
S.No | Transformasi & Makna |
---|---|
1 | map(func) Mengembalikan set data terdistribusi baru, yang dibentuk dengan meneruskan setiap elemen sumber melalui suatu fungsi func. |
2 | filter(func) Mengembalikan set data baru yang dibentuk dengan memilih elemen-elemen sumbernya func mengembalikan true. |
3 | flatMap(func) Mirip dengan map, tetapi setiap item input dapat dipetakan ke 0 atau lebih item output (jadi func harus mengembalikan Seq daripada satu item). |
4 | mapPartitions(func) Mirip dengan map, tetapi berjalan secara terpisah di setiap partisi (blok) RDD, jadi func harus berjenis Iterator <T> ⇒ Iterator <U> saat menjalankan RDD tipe T. |
5 | mapPartitionsWithIndex(func) Mirip dengan Partisi peta, tetapi juga menyediakan func dengan nilai integer yang mewakili indeks partisi, jadi func harus berjenis (Int, Iterator <T>) ⇒ Iterator <U> saat dijalankan pada RDD tipe T. |
6 | sample(withReplacement, fraction, seed) Contoh a fraction data, dengan atau tanpa penggantian, menggunakan seed generator nomor acak. |
7 | union(otherDataset) Mengembalikan set data baru yang berisi gabungan elemen dalam set data sumber dan argumen. |
8 | intersection(otherDataset) Mengembalikan RDD baru yang berisi perpotongan elemen dalam kumpulan data sumber dan argumen. |
9 | distinct([numTasks]) Mengembalikan set data baru yang berisi elemen berbeda dari set data sumber. |
10 | groupByKey([numTasks]) Saat dipanggil pada kumpulan data pasangan (K, V), mengembalikan kumpulan data pasangan (K, Iterable <V>). Note - Jika Anda mengelompokkan untuk melakukan agregasi (seperti penjumlahan atau rata-rata) pada setiap kunci, menggunakan reduceByKey atau aggregateByKey akan menghasilkan performa yang jauh lebih baik. |
11 | reduceByKey(func, [numTasks]) Ketika dipanggil pada dataset dari (K, V) pasangan, mengembalikan dataset dari (K, V) pasangan di mana nilai-nilai untuk setiap tombol dikumpulkan menggunakan fungsi yang diberikan mengurangi func , yang harus dari jenis (V, V) ⇒ V Seperti di groupByKey, jumlah tugas pengurangan dapat dikonfigurasi melalui argumen opsional kedua. |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Saat dipanggil pada kumpulan data pasangan (K, V), mengembalikan kumpulan data pasangan (K, U) di mana nilai untuk setiap kunci digabungkan menggunakan fungsi gabungan yang diberikan dan nilai netral "nol". Mengizinkan jenis nilai gabungan yang berbeda dari jenis nilai masukan, sambil menghindari alokasi yang tidak perlu. Seperti di groupByKey, jumlah tugas pengurangan dapat dikonfigurasi melalui argumen kedua opsional. |
13 | sortByKey([ascending], [numTasks]) Saat dipanggil pada kumpulan data pasangan (K, V) di mana K mengimplementasikan Berurutan, mengembalikan kumpulan data pasangan (K, V) yang diurutkan berdasarkan kunci dalam urutan menaik atau menurun, seperti yang ditentukan dalam argumen naik Boolean. |
14 | join(otherDataset, [numTasks]) Ketika dipanggil pada set data tipe (K, V) dan (K, W), mengembalikan set data pasangan (K, (V, W)) dengan semua pasangan elemen untuk setiap kunci. Gabungan luar didukung melalui leftOuterJoin, rightOuterJoin, dan fullOuterJoin. |
15 | cogroup(otherDataset, [numTasks]) Saat dipanggil pada set data tipe (K, V) dan (K, W), mengembalikan set data tupel (K, (Iterable <V>, Iterable <W>)). Operasi ini juga disebut grup Dengan. |
16 | cartesian(otherDataset) Saat dipanggil pada set data tipe T dan U, mengembalikan set data pasangan (T, U) (semua pasangan elemen). |
17 | pipe(command, [envVars]) Pipa setiap partisi RDD melalui perintah shell, misalnya skrip Perl atau bash. Elemen RDD ditulis ke stdin proses dan baris keluaran ke stdout-nya dikembalikan sebagai RDD string. |
18 | coalesce(numPartitions) Kurangi jumlah partisi di RDD menjadi numPartitions. Berguna untuk menjalankan operasi dengan lebih efisien setelah memfilter kumpulan data yang besar. |
19 | repartition(numPartitions) Susun ulang data di RDD secara acak untuk membuat lebih banyak atau lebih sedikit partisi dan menyeimbangkannya. Ini selalu mengacak semua data melalui jaringan. |
20 | repartitionAndSortWithinPartitions(partitioner) Partisi ulang RDD sesuai dengan pemartisi yang diberikan dan, dalam setiap partisi yang dihasilkan, urutkan catatan berdasarkan kuncinya. Ini lebih efisien daripada memanggil partisi ulang dan kemudian menyortir di dalam setiap partisi karena dapat mendorong penyortiran ke dalam mesin shuffle. |
Tindakan
S.No | Aksi & Makna |
---|---|
1 | reduce(func) Gabungkan elemen set data menggunakan sebuah fungsi func(yang mengambil dua argumen dan mengembalikan satu). Fungsi tersebut harus bersifat komutatif dan asosiatif sehingga dapat dihitung dengan benar secara paralel. |
2 | collect() Mengembalikan semua elemen dari kumpulan data sebagai larik di program driver. Ini biasanya berguna setelah filter atau operasi lain yang mengembalikan subset data yang cukup kecil. |
3 | count() Mengembalikan jumlah elemen dalam kumpulan data. |
4 | first() Mengembalikan elemen pertama dari kumpulan data (mirip dengan take (1)). |
5 | take(n) Mengembalikan larik dengan yang pertama n elemen dari dataset. |
6 | takeSample (withReplacement,num, [seed]) Mengembalikan larik dengan sampel acak num elemen set data, dengan atau tanpa penggantian, secara opsional menentukan sebelumnya seed generator nomor acak. |
7 | takeOrdered(n, [ordering]) Mengembalikan yang pertama n elemen RDD menggunakan urutan aslinya atau pembanding kustom. |
8 | saveAsTextFile(path) Menulis elemen set data sebagai file teks (atau kumpulan file teks) dalam direktori tertentu di sistem file lokal, HDFS, atau sistem file lain yang didukung Hadoop. Spark memanggil toString pada setiap elemen untuk mengubahnya menjadi sebaris teks di file. |
9 | saveAsSequenceFile(path) (Java and Scala) Menulis elemen set data sebagai Hadoop SequenceFile di jalur tertentu di sistem file lokal, HDFS, atau sistem file lain yang didukung Hadoop. Ini tersedia di RDD dari key-value pair yang mengimplementasikan antarmuka Writable Hadoop. Di Scala, ini juga tersedia pada tipe yang secara implisit dapat dikonversi menjadi Writable (Spark menyertakan konversi untuk tipe dasar seperti Int, Double, String, dll). |
10 | saveAsObjectFile(path) (Java and Scala) Menulis elemen set data dalam format sederhana menggunakan serialisasi Java, yang kemudian dapat dimuat menggunakan SparkContext.objectFile (). |
11 | countByKey() Hanya tersedia pada tipe RDD (K, V). Mengembalikan hashmap pasangan (K, Int) dengan jumlah setiap kunci. |
12 | foreach(func) Menjalankan fungsi funcdi setiap elemen kumpulan data. Ini biasanya dilakukan untuk efek samping seperti memperbarui Akumulator atau berinteraksi dengan sistem penyimpanan eksternal. Note- memodifikasi variabel selain Akumulator di luar foreach () dapat mengakibatkan perilaku tidak terdefinisi. Lihat Memahami penutupan untuk lebih jelasnya. |
Pemrograman dengan RDD
Mari kita lihat implementasi beberapa transformasi dan tindakan RDD dalam pemrograman RDD dengan bantuan sebuah contoh.
Contoh
Pertimbangkan contoh jumlah kata - Ini menghitung setiap kata yang muncul dalam dokumen. Pertimbangkan teks berikut sebagai masukan dan disimpan sebagaiinput.txt file di direktori home.
input.txt - file masukan.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Ikuti prosedur yang diberikan di bawah ini untuk menjalankan contoh yang diberikan.
Buka Spark-Shell
Perintah berikut digunakan untuk membuka spark shell. Umumnya, percikan dibangun menggunakan Scala. Oleh karena itu, program Spark berjalan di lingkungan Scala.
$ spark-shell
Jika Spark shell berhasil dibuka maka Anda akan menemukan output berikut. Lihat baris terakhir dari output “Spark context available as sc” artinya penampung Spark otomatis dibuatkan objek konteks spark dengan namasc. Sebelum memulai langkah pertama program, objek SparkContext harus dibuat.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Buat RDD
Pertama, kita harus membaca file input menggunakan Spark-Scala API dan membuat RDD.
Perintah berikut digunakan untuk membaca file dari lokasi tertentu. Di sini, RDD baru dibuat dengan nama inputfile. String yang diberikan sebagai argumen dalam metode textFile ("") adalah jalur absolut untuk nama file masukan. Namun, jika hanya nama file yang diberikan, berarti file input berada di lokasi saat ini.
scala> val inputfile = sc.textFile("input.txt")
Jalankan Transformasi Jumlah Kata
Tujuan kami adalah menghitung kata-kata dalam sebuah file. Buat peta datar untuk memisahkan setiap baris menjadi kata-kata (flatMap(line ⇒ line.split(“ ”)).
Selanjutnya, baca setiap kata sebagai kunci dengan nilai ‘1’ (<key, value> = <word, 1>) menggunakan fungsi peta (map(word ⇒ (word, 1)).
Terakhir, kurangi kunci tersebut dengan menambahkan nilai dari kunci yang serupa (reduceByKey(_+_)).
Perintah berikut digunakan untuk menjalankan logika jumlah kata. Setelah menjalankan ini, Anda tidak akan menemukan output apa pun karena ini bukan tindakan, ini transformasi; menunjuk RDD baru atau memberi tahu percikan tentang apa yang harus dilakukan dengan data yang diberikan)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
RDD saat ini
Saat bekerja dengan RDD, jika Anda ingin mengetahui tentang RDD saat ini, gunakan perintah berikut. Ini akan menunjukkan kepada Anda deskripsi tentang RDD saat ini dan dependensinya untuk debugging.
scala> counts.toDebugString
Menyimpan Transformasi
Anda bisa menandai RDD untuk dipertahankan menggunakan metode persist () atau cache () di atasnya. Pertama kali dihitung dalam suatu tindakan, itu akan disimpan dalam memori di node. Gunakan perintah berikut untuk menyimpan transformasi perantara dalam memori.
scala> counts.cache()
Menerapkan Tindakan
Menerapkan tindakan, seperti menyimpan semua transformasi, menghasilkan file teks. Argumen String untuk metode saveAsTextFile ("") adalah jalur absolut folder keluaran. Coba perintah berikut untuk menyimpan output dalam file teks. Dalam contoh berikut, folder 'output' ada di lokasi saat ini.
scala> counts.saveAsTextFile("output")
Memeriksa Output
Buka terminal lain untuk masuk ke direktori home (di mana percikan dijalankan di terminal lain). Gunakan perintah berikut untuk memeriksa direktori keluaran.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
Perintah berikut digunakan untuk melihat keluaran dari Part-00000 file.
[hadoop@localhost output]$ cat part-00000
Keluaran
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Perintah berikut digunakan untuk melihat keluaran dari Part-00001 file.
[hadoop@localhost output]$ cat part-00001
Keluaran
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
PBB Mempertahankan Penyimpanan
Sebelum UN-persisting, jika Anda ingin melihat ruang penyimpanan yang digunakan untuk aplikasi ini, gunakan URL berikut di browser Anda.
http://localhost:4040
Anda akan melihat layar berikut, yang menunjukkan ruang penyimpanan yang digunakan untuk aplikasi, yang berjalan di shell Spark.
Jika Anda ingin membatalkan penyimpanan ruang penyimpanan RDD tertentu, gunakan perintah berikut.
Scala> counts.unpersist()
Anda akan melihat hasilnya sebagai berikut -
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Untuk memverifikasi ruang penyimpanan di browser, gunakan URL berikut.
http://localhost:4040/
Anda akan melihat layar berikut. Ini menunjukkan ruang penyimpanan yang digunakan untuk aplikasi, yang berjalan di shell Spark.
Aplikasi Spark, menggunakan spark-submit, adalah perintah shell yang digunakan untuk menerapkan aplikasi Spark pada sebuah cluster. Ia menggunakan semua manajer cluster masing-masing melalui antarmuka yang seragam. Oleh karena itu, Anda tidak perlu mengkonfigurasi aplikasi Anda untuk masing-masing aplikasi.
Contoh
Mari kita ambil contoh jumlah kata yang sama, yang kita gunakan sebelumnya, menggunakan perintah shell. Di sini, kami menganggap contoh yang sama sebagai aplikasi percikan.
Contoh Input
Teks berikut adalah data masukan dan nama file adalah in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Lihat program berikut -
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Simpan program di atas ke dalam file bernama SparkWordCount.scala dan letakkan di direktori yang ditentukan pengguna bernama spark-application.
Note - Saat mengubah inputRDD menjadi countRDD, kami menggunakan flatMap () untuk membuat token garis (dari file teks) menjadi kata-kata, metode map () untuk menghitung frekuensi kata dan metode reduceByKey () untuk menghitung setiap pengulangan kata.
Gunakan langkah-langkah berikut untuk mengirimkan aplikasi ini. Jalankan semua langkah dispark-application direktori melalui terminal.
Langkah 1: Unduh Spark Ja
Spark core jar diperlukan untuk kompilasi, oleh karena itu, unduh spark-core_2.10-1.3.0.jar dari tautan berikut Spark core jar dan pindahkan file jar dari direktori unduhan kespark-application direktori.
Langkah 2: Kompilasi program
Kompilasi program di atas menggunakan perintah yang diberikan di bawah ini. Perintah ini harus dijalankan dari direktori spark-application. Sini,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar adalah jar dukungan Hadoop yang diambil dari pustaka Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Langkah 3: Buat JAR
Buat file jar dari aplikasi spark menggunakan perintah berikut. Sini,wordcount adalah nama file untuk file jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Langkah 4: Kirimkan aplikasi percikan
Kirimkan aplikasi spark menggunakan perintah berikut -
spark-submit --class SparkWordCount --master local wordcount.jar
Jika berhasil dijalankan, maka Anda akan menemukan output yang diberikan di bawah ini. ItuOKmembiarkan output berikut untuk identifikasi pengguna dan itu adalah baris terakhir dari program. Jika Anda membaca output berikut dengan cermat, Anda akan menemukan hal-hal yang berbeda, seperti -
- berhasil memulai layanan 'sparkDriver' pada port 42954
- MemoryStore dimulai dengan kapasitas 267,3 MB
- Memulai SparkUI di http://192.168.1.217:4040
- Menambahkan file JAR: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile di SparkPi.scala: 11) selesai dalam 0,566 detik
- Menghentikan UI web Spark di http://192.168.1.217:4040
- MemoryStore dihapus
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Langkah 5: Memeriksa keluaran
Setelah program berhasil dijalankan, Anda akan menemukan direktori bernama outfile di direktori spark-application.
Perintah berikut digunakan untuk membuka dan memeriksa daftar file di direktori outfile.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Perintah untuk memeriksa keluaran part-00000 file adalah -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Perintah untuk memeriksa keluaran pada file bagian-00001 adalah -
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Pergi melalui bagian berikut untuk mengetahui lebih banyak tentang perintah 'percikan-kirim'.
Spark-submit Syntax
spark-submit [options] <app jar | python file> [app arguments]
Pilihan
S.No | Pilihan | Deskripsi |
---|---|---|
1 | --menguasai | spark: // host: port, mesos: // host: port, yarn, atau local. |
2 | --deploy-mode | Apakah akan meluncurkan program driver secara lokal ("klien") atau di salah satu mesin pekerja di dalam cluster ("cluster") (Default: client). |
3 | --kelas | Kelas utama aplikasi Anda (untuk aplikasi Java / Scala). |
4 | --nama | Nama aplikasi Anda. |
5 | - toples | Daftar local jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor. |
6 | --paket | Daftar koordinat maven dari jars yang dipisahkan koma untuk disertakan pada classpath driver dan eksekutor. |
7 | --repositories | Daftar repositori jarak jauh tambahan yang dipisahkan koma untuk mencari koordinat maven yang diberikan dengan --packages. |
8 | --py-files | Daftar file .zip, .egg, atau .py yang dipisahkan koma untuk ditempatkan di PYTHON PATH untuk aplikasi Python. |
9 | --files | Daftar file yang dipisahkan koma untuk ditempatkan di direktori kerja setiap pelaksana. |
10 | --conf (prop = val) | Properti konfigurasi Spark sewenang-wenang. |
11 | --properties-file | Jalur ke file tempat memuat properti tambahan. Jika tidak ditentukan, ini akan mencari conf / spark-defaults. |
12 | --driver-memory | Memori untuk driver (misalnya 1000M, 2G) (Default: 512M). |
13 | --driver-java-options | Opsi Java ekstra untuk diberikan kepada pengemudi. |
14 | --driver-library-path | Entri jalur perpustakaan tambahan untuk diteruskan ke pengemudi. |
15 | --driver-class-path | Entri jalur kelas tambahan untuk diteruskan ke pengemudi. Perhatikan bahwa jars yang ditambahkan dengan --jars secara otomatis disertakan dalam classpath. |
16 | --executor-memory | Memori per pelaksana (misalnya 1000M, 2G) (Default: 1G). |
17 | --proxy-user | Pengguna meniru identitas saat mengajukan aplikasi. |
18 | --help, -h | Tunjukkan pesan bantuan ini dan keluar. |
19 | --verbose, -v | Cetak keluaran debug tambahan. |
20 | --Versi: kapan | Cetak versi Spark saat ini. |
21 | --driver-core NUM | Core untuk driver (Default: 1). |
22 | --mengawasi | Jika diberikan, restart driver jika gagal. |
23 | --membunuh | Jika diberikan, membunuh pengemudi yang ditentukan. |
24 | --status | Jika diberikan, meminta status dari pengemudi yang ditentukan. |
25 | --total-executor-core | Total core untuk semua pelaksana. |
26 | --executor-core | Jumlah inti per pelaksana. (Default: 1 dalam mode YARN, atau semua inti yang tersedia pada pekerja dalam mode mandiri). |
Spark berisi dua jenis variabel bersama - satu adalah broadcast variables dan kedua accumulators.
Broadcast variables - digunakan untuk secara efisien, mendistribusikan nilai-nilai besar.
Accumulators - digunakan untuk mengumpulkan informasi dari koleksi tertentu.
Variabel Siaran
Variabel siaran memungkinkan pemrogram untuk menyimpan variabel hanya-baca dalam cache pada setiap mesin daripada mengirimkan salinannya dengan tugas. Mereka dapat digunakan, misalnya, untuk memberikan setiap node, salinan set data masukan yang besar, dengan cara yang efisien. Spark juga mencoba mendistribusikan variabel siaran menggunakan algoritma siaran yang efisien untuk mengurangi biaya komunikasi.
Tindakan percikan dijalankan melalui serangkaian tahapan, dipisahkan oleh operasi "acak" yang didistribusikan. Spark secara otomatis menyiarkan data umum yang dibutuhkan oleh tugas dalam setiap tahapan.
Data yang disiarkan dengan cara ini di-cache dalam bentuk serial dan diserialisasi sebelum menjalankan setiap tugas. Ini berarti bahwa membuat variabel broadcast secara eksplisit, hanya berguna ketika tugas di beberapa tahapan membutuhkan data yang sama atau ketika menyimpan data dalam bentuk deserialisasi penting.
Variabel siaran dibuat dari variabel v dengan menyebut SparkContext.broadcast(v). Variabel siaran adalah pembungkusv, dan nilainya dapat diakses dengan memanggil valuemetode. Kode yang diberikan di bawah ini menunjukkan ini -
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
Output -
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
Setelah variabel broadcast dibuat, itu harus digunakan sebagai pengganti nilainya v dalam fungsi apa pun berjalan di cluster, sehingga vtidak dikirim ke node lebih dari sekali. Selain itu, objekv tidak boleh diubah setelah siaran, untuk memastikan bahwa semua node mendapatkan nilai yang sama dari variabel siaran.
Akumulator
Akumulator adalah variabel yang hanya "ditambahkan" melalui operasi asosiatif dan oleh karena itu, dapat didukung secara efisien secara paralel. Mereka dapat digunakan untuk mengimplementasikan penghitung (seperti dalam MapReduce) atau penjumlahan. Spark secara native mendukung akumulator tipe numerik, dan pemrogram dapat menambahkan dukungan untuk tipe baru. Jika akumulator dibuat dengan nama, mereka akan ditampilkan diSpark’s UI. Ini dapat berguna untuk memahami kemajuan tahapan yang sedang berjalan (CATATAN - ini belum didukung dalam Python).
Akumulator dibuat dari nilai awal v dengan menyebut SparkContext.accumulator(v). Tugas yang berjalan di cluster kemudian dapat ditambahkan ke cluster tersebut menggunakanaddmetode atau operator + = (dalam Scala dan Python). Namun, mereka tidak bisa membaca nilainya. Hanya program driver yang dapat membaca nilai akumulator, menggunakanvalue metode.
Kode yang diberikan di bawah ini menunjukkan akumulator yang digunakan untuk menjumlahkan elemen array -
scala> val accum = sc.accumulator(0)
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
Jika Anda ingin melihat output dari kode di atas maka gunakan perintah berikut -
scala> accum.value
Keluaran
res2: Int = 10
Operasi RDD numerik
Spark memungkinkan Anda melakukan operasi yang berbeda pada data numerik, menggunakan salah satu metode API yang telah ditentukan sebelumnya. Operasi numerik Spark diimplementasikan dengan algoritme streaming yang memungkinkan pembuatan model, satu elemen dalam satu waktu.
Operasi ini dihitung dan dikembalikan sebagai StatusCounter keberatan dengan menelepon status() metode.
S.No | Metode & Arti |
---|---|
1 | count() Jumlah elemen dalam RDD. |
2 | Mean() Rata-rata elemen dalam RDD. |
3 | Sum() Nilai total elemen dalam RDD. |
4 | Max() Nilai maksimum di antara semua elemen di RDD. |
5 | Min() Nilai minimum di antara semua elemen di RDD. |
6 | Variance() Varians elemen. |
7 | Stdev() Simpangan baku. |
Jika Anda hanya ingin menggunakan salah satu metode ini, Anda dapat memanggil metode terkait langsung di RDD.