Apache Spark - Pemrograman Inti

Spark Core adalah dasar dari keseluruhan proyek. Ini menyediakan pengiriman tugas terdistribusi, penjadwalan, dan fungsionalitas I / O dasar. Spark menggunakan struktur data fundamental khusus yang disebut RDD (Resilient Distributed Datasets) yang merupakan kumpulan logis dari data yang dipartisi di seluruh mesin. RDD dapat dibuat dengan dua cara; pertama dengan mereferensikan dataset dalam sistem penyimpanan eksternal dan kedua dengan menerapkan transformasi (misalnya peta, filter, reducer, join) pada RDD yang ada.

Abstraksi RDD diekspos melalui API yang terintegrasi dengan bahasa. Ini menyederhanakan kompleksitas pemrograman karena cara aplikasi memanipulasi RDD mirip dengan memanipulasi kumpulan data lokal.

Spark Shell

Spark menyediakan shell interaktif - alat yang ampuh untuk menganalisis data secara interaktif. Ini tersedia dalam bahasa Scala atau Python. Abstraksi utama Spark adalah kumpulan item terdistribusi yang disebut Set Data Terdistribusi Tangguh (RDD). RDD dapat dibuat dari Format Input Hadoop (seperti file HDFS) atau dengan mengubah RDD lainnya.

Buka Spark Shell

Perintah berikut digunakan untuk membuka shell Spark.

$ spark-shell

Buat RDD sederhana

Mari kita buat RDD sederhana dari file teks. Gunakan perintah berikut untuk membuat RDD sederhana.

scala> val inputfile = sc.textFile(“input.txt”)

Output untuk perintah di atas adalah

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API memperkenalkan beberapa Transformations dan sedikit Actions untuk memanipulasi RDD.

Transformasi RDD

Transformasi RDD mengembalikan penunjuk ke RDD baru dan memungkinkan Anda membuat dependensi antara RDD. Setiap RDD dalam rantai ketergantungan (String of Dependencies) memiliki fungsi untuk menghitung datanya dan memiliki pointer (ketergantungan) ke RDD induknya.

Spark malas, jadi tidak ada yang akan dieksekusi kecuali Anda memanggil beberapa transformasi atau tindakan yang akan memicu pembuatan dan eksekusi pekerjaan. Lihat cuplikan contoh jumlah kata berikut.

Oleh karena itu, transformasi RDD bukanlah sekumpulan data tetapi merupakan langkah dalam program (mungkin satu-satunya langkah) yang memberi tahu Spark cara mendapatkan data dan apa yang harus dilakukan dengannya.

Diberikan di bawah ini adalah daftar transformasi RDD.

S.No Transformasi & Makna
1

map(func)

Mengembalikan set data terdistribusi baru, yang dibentuk dengan meneruskan setiap elemen sumber melalui suatu fungsi func.

2

filter(func)

Mengembalikan set data baru yang dibentuk dengan memilih elemen-elemen sumbernya func mengembalikan true.

3

flatMap(func)

Mirip dengan map, tetapi setiap item input dapat dipetakan ke 0 atau lebih item output (jadi func harus mengembalikan Seq daripada satu item).

4

mapPartitions(func)

Mirip dengan map, tetapi berjalan secara terpisah di setiap partisi (blok) RDD, jadi func harus berjenis Iterator <T> ⇒ Iterator <U> saat menjalankan RDD tipe T.

5

mapPartitionsWithIndex(func)

Mirip dengan Partisi peta, tetapi juga menyediakan func dengan nilai integer yang mewakili indeks partisi, jadi func harus berjenis (Int, Iterator <T>) ⇒ Iterator <U> saat dijalankan pada RDD tipe T.

6

sample(withReplacement, fraction, seed)

Contoh a fraction data, dengan atau tanpa penggantian, menggunakan seed generator nomor acak.

7

union(otherDataset)

Mengembalikan set data baru yang berisi gabungan elemen dalam set data sumber dan argumen.

8

intersection(otherDataset)

Mengembalikan RDD baru yang berisi perpotongan elemen dalam kumpulan data sumber dan argumen.

9

distinct([numTasks])

Mengembalikan set data baru yang berisi elemen berbeda dari set data sumber.

10

groupByKey([numTasks])

Saat dipanggil pada kumpulan data pasangan (K, V), mengembalikan kumpulan data pasangan (K, Iterable <V>).

Note - Jika Anda mengelompokkan untuk melakukan agregasi (seperti penjumlahan atau rata-rata) pada setiap kunci, menggunakan reduceByKey atau aggregateByKey akan menghasilkan performa yang jauh lebih baik.

11

reduceByKey(func, [numTasks])

Ketika dipanggil pada dataset dari (K, V) pasangan, mengembalikan dataset dari (K, V) pasangan di mana nilai-nilai untuk setiap tombol dikumpulkan menggunakan fungsi yang diberikan mengurangi func , yang harus dari jenis (V, V) ⇒ V Seperti di groupByKey, jumlah tugas pengurangan dapat dikonfigurasi melalui argumen opsional kedua.

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

Saat dipanggil pada kumpulan data pasangan (K, V), mengembalikan kumpulan data pasangan (K, U) di mana nilai untuk setiap kunci digabungkan menggunakan fungsi gabungan yang diberikan dan nilai netral "nol". Mengizinkan jenis nilai gabungan yang berbeda dari jenis nilai masukan, sambil menghindari alokasi yang tidak perlu. Seperti di groupByKey, jumlah tugas pengurangan dapat dikonfigurasi melalui argumen kedua opsional.

13

sortByKey([ascending], [numTasks])

Saat dipanggil pada kumpulan data pasangan (K, V) di mana K mengimplementasikan Berurutan, mengembalikan kumpulan data pasangan (K, V) yang diurutkan berdasarkan kunci dalam urutan menaik atau menurun, seperti yang ditentukan dalam argumen naik Boolean.

14

join(otherDataset, [numTasks])

Ketika dipanggil pada set data tipe (K, V) dan (K, W), mengembalikan set data pasangan (K, (V, W)) dengan semua pasangan elemen untuk setiap kunci. Gabungan luar didukung melalui leftOuterJoin, rightOuterJoin, dan fullOuterJoin.

15

cogroup(otherDataset, [numTasks])

Saat dipanggil pada set data tipe (K, V) dan (K, W), mengembalikan set data tupel (K, (Iterable <V>, Iterable <W>)). Operasi ini juga disebut grup Dengan.

16

cartesian(otherDataset)

Saat dipanggil pada set data tipe T dan U, mengembalikan set data pasangan (T, U) (semua pasangan elemen).

17

pipe(command, [envVars])

Gunakan pipa setiap partisi RDD melalui perintah shell, misalnya skrip Perl atau bash. Elemen RDD ditulis ke stdin proses dan baris keluaran ke stdout-nya dikembalikan sebagai RDD string.

18

coalesce(numPartitions)

Kurangi jumlah partisi di RDD menjadi numPartitions. Berguna untuk menjalankan operasi dengan lebih efisien setelah memfilter kumpulan data yang besar.

19

repartition(numPartitions)

Susun ulang data di RDD secara acak untuk membuat lebih banyak atau lebih sedikit partisi dan menyeimbangkannya. Ini selalu mengacak semua data melalui jaringan.

20

repartitionAndSortWithinPartitions(partitioner)

Partisi ulang RDD sesuai dengan pemartisi yang diberikan dan, dalam setiap partisi yang dihasilkan, urutkan catatan berdasarkan kuncinya. Ini lebih efisien daripada memanggil partisi ulang dan kemudian menyortir di dalam setiap partisi karena dapat mendorong penyortiran ke dalam mesin shuffle.

Tindakan

Tabel berikut memberikan daftar Tindakan, yang mengembalikan nilai.

S.No Aksi & Arti
1

reduce(func)

Gabungkan elemen set data menggunakan sebuah fungsi func(yang mengambil dua argumen dan mengembalikan satu). Fungsi tersebut harus bersifat komutatif dan asosiatif sehingga dapat dihitung dengan benar secara paralel.

2

collect()

Mengembalikan semua elemen dari kumpulan data sebagai larik di program driver. Ini biasanya berguna setelah filter atau operasi lain yang mengembalikan subset data yang cukup kecil.

3

count()

Mengembalikan jumlah elemen dalam kumpulan data.

4

first()

Mengembalikan elemen pertama dari kumpulan data (mirip dengan take (1)).

5

take(n)

Mengembalikan larik dengan yang pertama n elemen dari dataset.

6

takeSample (withReplacement,num, [seed])

Mengembalikan larik dengan sampel acak num elemen set data, dengan atau tanpa penggantian, secara opsional menentukan sebelumnya seed generator nomor acak.

7

takeOrdered(n, [ordering])

Mengembalikan yang pertama n elemen RDD menggunakan urutan aslinya atau pembanding kustom.

8

saveAsTextFile(path)

Menulis elemen set data sebagai file teks (atau kumpulan file teks) dalam direktori tertentu di sistem file lokal, HDFS, atau sistem file lain yang didukung Hadoop. Spark memanggil toString pada setiap elemen untuk mengubahnya menjadi sebaris teks di file.

9

saveAsSequenceFile(path) (Java and Scala)

Menulis elemen set data sebagai Hadoop SequenceFile di jalur tertentu di sistem file lokal, HDFS, atau sistem file lain yang didukung Hadoop. Ini tersedia di RDD pasangan nilai kunci yang menerapkan antarmuka Writable Hadoop. Di Scala, ini juga tersedia pada tipe yang secara implisit dapat dikonversi menjadi Writable (Spark menyertakan konversi untuk tipe dasar seperti Int, Double, String, dll).

10

saveAsObjectFile(path) (Java and Scala)

Menulis elemen set data dalam format sederhana menggunakan serialisasi Java, yang kemudian dapat dimuat menggunakan SparkContext.objectFile ().

11

countByKey()

Hanya tersedia pada tipe RDD (K, V). Mengembalikan hashmap pasangan (K, Int) dengan jumlah setiap kunci.

12

foreach(func)

Menjalankan fungsi funcdi setiap elemen kumpulan data. Ini biasanya dilakukan untuk efek samping seperti memperbarui Akumulator atau berinteraksi dengan sistem penyimpanan eksternal.

Note- memodifikasi variabel selain Akumulator di luar foreach () dapat mengakibatkan perilaku tidak terdefinisi. Lihat Memahami penutupan untuk lebih jelasnya.

Pemrograman dengan RDD

Mari kita lihat implementasi beberapa transformasi dan tindakan RDD dalam pemrograman RDD dengan bantuan sebuah contoh.

Contoh

Pertimbangkan contoh jumlah kata - Ini menghitung setiap kata yang muncul dalam dokumen. Pertimbangkan teks berikut sebagai masukan dan disimpan sebagaiinput.txt file di direktori home.

input.txt - file masukan.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Ikuti prosedur yang diberikan di bawah ini untuk menjalankan contoh yang diberikan.

Buka Spark-Shell

Perintah berikut digunakan untuk membuka spark shell. Umumnya, percikan dibangun menggunakan Scala. Oleh karena itu, program Spark berjalan di lingkungan Scala.

$ spark-shell

Jika Spark shell berhasil dibuka maka Anda akan menemukan output berikut. Lihat baris terakhir dari output “Spark context available as sc” artinya penampung Spark otomatis dibuatkan objek konteks spark dengan namasc. Sebelum memulai langkah pertama program, objek SparkContext harus dibuat.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Buat RDD

Pertama, kita harus membaca file input menggunakan Spark-Scala API dan membuat RDD.

Perintah berikut digunakan untuk membaca file dari lokasi tertentu. Di sini, RDD baru dibuat dengan nama inputfile. String yang diberikan sebagai argumen dalam metode textFile ("") adalah jalur absolut untuk nama file masukan. Namun, jika hanya nama file yang diberikan, berarti file input berada di lokasi saat ini.

scala> val inputfile = sc.textFile("input.txt")

Jalankan Transformasi Jumlah Kata

Tujuan kami adalah menghitung kata-kata dalam sebuah file. Buat peta datar untuk memisahkan setiap baris menjadi kata-kata (flatMap(line ⇒ line.split(“ ”)).

Selanjutnya, baca setiap kata sebagai kunci dengan nilai ‘1’ (<key, value> = <word, 1>) menggunakan fungsi peta (map(word ⇒ (word, 1)).

Terakhir, kurangi kunci tersebut dengan menambahkan nilai dari kunci yang serupa (reduceByKey(_+_)).

Perintah berikut digunakan untuk menjalankan logika jumlah kata. Setelah menjalankan ini, Anda tidak akan menemukan output apa pun karena ini bukan tindakan, ini transformasi; menunjuk RDD baru atau memberi tahu percikan tentang apa yang harus dilakukan dengan data yang diberikan)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

RDD saat ini

Saat bekerja dengan RDD, jika Anda ingin mengetahui tentang RDD saat ini, gunakan perintah berikut. Ini akan menunjukkan kepada Anda deskripsi tentang RDD saat ini dan dependensinya untuk debugging.

scala> counts.toDebugString

Menyimpan Transformasi

Anda bisa menandai RDD untuk dipertahankan menggunakan metode persist () atau cache () di atasnya. Pertama kali dihitung dalam suatu tindakan, itu akan disimpan dalam memori di node. Gunakan perintah berikut untuk menyimpan transformasi perantara dalam memori.

scala> counts.cache()

Menerapkan Tindakan

Menerapkan tindakan, seperti menyimpan semua transformasi, menghasilkan file teks. Argumen String untuk metode saveAsTextFile ("") adalah jalur absolut folder keluaran. Coba perintah berikut untuk menyimpan output dalam file teks. Dalam contoh berikut, folder 'output' ada di lokasi saat ini.

scala> counts.saveAsTextFile("output")

Memeriksa Output

Buka terminal lain untuk masuk ke direktori home (di mana percikan dijalankan di terminal lain). Gunakan perintah berikut untuk memeriksa direktori keluaran.

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

Perintah berikut digunakan untuk melihat keluaran dari Part-00000 file.

[hadoop@localhost output]$ cat part-00000

Keluaran

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Perintah berikut digunakan untuk melihat keluaran dari Part-00001 file.

[hadoop@localhost output]$ cat part-00001

Keluaran

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

PBB Mempertahankan Penyimpanan

Sebelum UN-persisting, jika Anda ingin melihat ruang penyimpanan yang digunakan untuk aplikasi ini, gunakan URL berikut di browser Anda.

http://localhost:4040

Anda akan melihat layar berikut, yang menunjukkan ruang penyimpanan yang digunakan untuk aplikasi, yang berjalan di shell Spark.

Jika Anda ingin membatalkan penyimpanan ruang penyimpanan RDD tertentu, gunakan perintah berikut.

Scala> counts.unpersist()

Anda akan melihat hasilnya sebagai berikut -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

Untuk memverifikasi ruang penyimpanan di browser, gunakan URL berikut.

http://localhost:4040/

Anda akan melihat layar berikut. Ini menunjukkan ruang penyimpanan yang digunakan untuk aplikasi, yang berjalan di shell Spark.