Apache Spark - Triển khai

Ứng dụng Spark, sử dụng spark-submit, là một lệnh shell được sử dụng để triển khai ứng dụng Spark trên một cụm. Nó sử dụng tất cả các trình quản lý cụm tương ứng thông qua một giao diện thống nhất. Do đó, bạn không phải định cấu hình ứng dụng của mình cho từng ứng dụng.

Thí dụ

Chúng ta hãy lấy ví dụ tương tự về số từ, chúng ta đã sử dụng trước đây, bằng cách sử dụng các lệnh shell. Ở đây, chúng tôi coi ví dụ tương tự như một ứng dụng tia lửa.

Đầu vào mẫu

Văn bản sau là dữ liệu đầu vào và tệp có tên là in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Xem chương trình sau -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Lưu chương trình trên vào một tệp có tên SparkWordCount.scala và đặt nó trong một thư mục do người dùng xác định có tên spark-application.

Note - Trong khi chuyển inputRDD thành countRDD, chúng tôi đang sử dụng flatMap () để mã hóa các dòng (từ tệp văn bản) thành các từ, phương thức map () để đếm tần suất từ ​​và phương thức ReduceByKey () để đếm từng từ lặp lại.

Sử dụng các bước sau để gửi đơn đăng ký này. Thực hiện tất cả các bước trongspark-application thư mục thông qua thiết bị đầu cuối.

Bước 1: Tải xuống Spark Ja

Spark core jar là bắt buộc để biên dịch, do đó, hãy tải xuống spark-core_2.10-1.3.0.jar từ liên kết sau Spark core jar và di chuyển tệp jar từ thư mục tải xuống sangspark-application danh mục.

Bước 2: Biên dịch chương trình

Biên dịch chương trình trên bằng lệnh dưới đây. Lệnh này sẽ được thực thi từ thư mục ứng dụng tia lửa. Đây,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar là một jar hỗ trợ Hadoop được lấy từ thư viện Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Bước 3: Tạo JAR

Tạo một tệp jar của ứng dụng tia lửa bằng lệnh sau. Đây,wordcount là tên tệp cho tệp jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Bước 4: Gửi ứng dụng spark

Gửi ứng dụng tia lửa bằng lệnh sau:

spark-submit --class SparkWordCount --master local wordcount.jar

Nếu nó được thực thi thành công, thì bạn sẽ tìm thấy kết quả được đưa ra bên dưới. CácOKcho phép đầu ra sau là để nhận dạng người dùng và đó là dòng cuối cùng của chương trình. Nếu bạn đọc kỹ kết quả sau, bạn sẽ thấy những điều khác nhau, chẳng hạn như -

  • đã bắt đầu thành công dịch vụ 'sparkDriver' trên cổng 42954
  • MemoryStore bắt đầu với dung lượng 267,3 MB
  • Bắt đầu SparkUI tại http://192.168.1.217:4040
  • Đã thêm tệp JAR: /home/hadoop/piapplication/count.jar
  • Kết quả Giai đoạn 1 (saveAsTextFile tại SparkPi.scala: 11) hoàn thành trong 0,566 giây
  • Đã dừng giao diện người dùng web Spark tại http://192.168.1.217:4040
  • MemoryStore đã bị xóa
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Bước 5: Kiểm tra đầu ra

Sau khi thực hiện thành công chương trình, bạn sẽ tìm thấy thư mục có tên outfile trong thư mục ứng dụng spark.

Các lệnh sau được sử dụng để mở và kiểm tra danh sách các tệp trong thư mục outfile.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Các lệnh để kiểm tra đầu ra trong part-00000 tập tin là -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Các lệnh để kiểm tra kết quả đầu ra trong tệp part-00001 là:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Xem qua phần sau để biết thêm về lệnh 'spark-submit'.

Cú pháp gửi Spark

spark-submit [options] <app jar | python file> [app arguments]

Tùy chọn

S. không Lựa chọn Sự miêu tả
1 --bậc thầy spark: // host: port, mesos: // host: port, fiber hoặc local.
2 - chế độ triển khai Khởi chạy chương trình trình điều khiển cục bộ ("máy khách") hoặc trên một trong các máy công nhân bên trong cụm ("cụm") (Mặc định: máy khách).
3 --lớp học Lớp chính của ứng dụng của bạn (dành cho ứng dụng Java / Scala).
4 --Tên Tên ứng dụng của bạn.
5 --jars Danh sách các lọ cục bộ được phân tách bằng dấu phẩy để đưa vào các đường dẫn trình điều khiển và trình thực thi.
6 - gói hàng Danh sách tọa độ maven của các chum được phân tách bằng dấu phẩy để đưa vào đường dẫn classpath của trình điều khiển và trình thực thi.
7 - kho lưu trữ Danh sách các kho lưu trữ từ xa bổ sung được phân tách bằng dấu phẩy để tìm kiếm tọa độ maven được cung cấp với --packages.
số 8 --py-files Danh sách các tệp .zip, .egg hoặc .py được phân tách bằng dấu phẩy để đặt trên PYTHON PATH cho ứng dụng Python.
9 --các tập tin Danh sách tệp được phân tách bằng dấu phẩy sẽ được đặt trong thư mục làm việc của mỗi trình thực thi.
10 --conf (prop = val) Thuộc tính cấu hình Spark tùy ý.
11 --properties-file Đường dẫn đến tệp để tải các thuộc tính bổ sung. Nếu không được chỉ định, điều này sẽ tìm kiếm mặc định conf / spark.
12 --driver-memory Bộ nhớ cho trình điều khiển (ví dụ: 1000M, 2G) (Mặc định: 512M).
13 --driver-java-options Các tùy chọn Java bổ sung để chuyển cho trình điều khiển.
14 --driver-library-path Các mục nhập đường dẫn thư viện bổ sung để chuyển cho trình điều khiển.
15 --driver-class-path

Các mục nhập đường dẫn lớp bổ sung để chuyển cho người lái xe.

Lưu ý rằng các bình được thêm vào với --jars sẽ tự động được đưa vào classpath.

16 --executor-memory Bộ nhớ cho mỗi trình thực thi (ví dụ: 1000M, 2G) (Mặc định: 1G).
17 - người dùng proxy Người dùng mạo danh khi nộp hồ sơ.
18 - trợ giúp, -h Hiển thị thông báo trợ giúp này và thoát.
19 --verbose, -v In đầu ra gỡ lỗi bổ sung.
20 --phiên bản In phiên bản của Spark hiện tại.
21 --driver-core NUM Lõi cho trình điều khiển (Mặc định: 1).
22 --supervise Nếu được đưa ra, hãy khởi động lại trình điều khiển khi bị lỗi.
23 --giết chết Nếu được đưa ra, giết người lái xe được chỉ định.
24 --trạng thái Nếu được đưa ra, hãy yêu cầu trạng thái của trình điều khiển được chỉ định.
25 --total-executive-core Tổng số lõi cho tất cả người thực thi.
26 --executor-core Số lõi trên mỗi trình thực thi. (Mặc định: 1 ở chế độ YARN hoặc tất cả các lõi có sẵn trên worker ở chế độ độc lập).