Apache Spark - การปรับใช้

แอปพลิเคชัน Spark โดยใช้ spark-submit เป็นคำสั่งเชลล์ที่ใช้ในการปรับใช้แอปพลิเคชัน Spark บนคลัสเตอร์ ใช้ตัวจัดการคลัสเตอร์ตามลำดับทั้งหมดผ่านอินเทอร์เฟซที่เหมือนกัน ดังนั้นคุณไม่จำเป็นต้องกำหนดค่าแอปพลิเคชันของคุณสำหรับแต่ละแอปพลิเคชัน

ตัวอย่าง

ให้เรานำตัวอย่างการนับจำนวนคำที่เราเคยใช้มาก่อนโดยใช้คำสั่งเชลล์ ที่นี่เราพิจารณาตัวอย่างเดียวกับแอปพลิเคชันจุดประกาย

อินพุตตัวอย่าง

ข้อความต่อไปนี้คือข้อมูลอินพุตและไฟล์ชื่อคือ in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

ดูโปรแกรมต่อไปนี้ -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

บันทึกโปรแกรมข้างต้นลงในไฟล์ชื่อ SparkWordCount.scala และวางไว้ในไดเร็กทอรีที่ผู้ใช้กำหนดชื่อ spark-application.

Note - ในขณะที่เปลี่ยน inputRDD เป็น countRDD เรากำลังใช้ flatMap () สำหรับการโทเค็นบรรทัด (จากไฟล์ข้อความ) เป็นคำวิธี map () สำหรับการนับความถี่ของคำและวิธีการ ReduceByKey () สำหรับการนับการซ้ำแต่ละคำ

ใช้ขั้นตอนต่อไปนี้เพื่อส่งใบสมัครนี้ ดำเนินการตามขั้นตอนทั้งหมดในไฟล์spark-application ไดเร็กทอรีผ่านเทอร์มินัล

ขั้นตอนที่ 1: ดาวน์โหลด Spark Ja

ต้องใช้ Spark core jar สำหรับการคอมไพล์ดังนั้นดาวน์โหลด spark-core_2.10-1.3.0.jar จากลิงค์ต่อไปนี้Spark core jarและย้ายไฟล์ jar จากไดเร็กทอรีดาวน์โหลดไปที่spark-application ไดเรกทอรี

ขั้นตอนที่ 2: รวบรวมโปรแกรม

รวบรวมโปรแกรมด้านบนโดยใช้คำสั่งที่ระบุด้านล่าง คำสั่งนี้ควรดำเนินการจากไดเร็กทอรี spark-application ที่นี่/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar เป็นโถสนับสนุน Hadoop ที่นำมาจากห้องสมุด Spark

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

ขั้นตอนที่ 3: สร้าง JAR

สร้างไฟล์ jar ของแอปพลิเคชัน spark โดยใช้คำสั่งต่อไปนี้ ที่นี่wordcount คือชื่อไฟล์สำหรับไฟล์ jar

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

ขั้นตอนที่ 4: ส่งใบสมัคร Spark

ส่งแอปพลิเคชัน spark โดยใช้คำสั่งต่อไปนี้ -

spark-submit --class SparkWordCount --master local wordcount.jar

หากดำเนินการสำเร็จคุณจะพบผลลัพธ์ที่ระบุด้านล่าง OKการปล่อยให้เอาต์พุตต่อไปนี้มีไว้สำหรับการระบุผู้ใช้และนั่นคือบรรทัดสุดท้ายของโปรแกรม หากคุณอ่านผลลัพธ์ต่อไปนี้อย่างละเอียดคุณจะพบสิ่งต่างๆเช่น -

  • เริ่มบริการ 'sparkDriver' บนพอร์ต 42954 เรียบร้อยแล้ว
  • MemoryStore เริ่มต้นด้วยความจุ 267.3 MB
  • เริ่มต้น SparkUI ที่ http://192.168.1.217:4040
  • เพิ่มไฟล์ JAR: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile ที่ SparkPi.scala: 11) เสร็จใน 0.566 วินาที
  • หยุด Spark web UI ที่ http://192.168.1.217:4040
  • เคลียร์ MemoryStore แล้ว
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

ขั้นตอนที่ 5: การตรวจสอบผลลัพธ์

หลังจากดำเนินการโปรแกรมสำเร็จคุณจะพบไดเร็กทอรีชื่อ outfile ในไดเร็กทอรี spark-application

คำสั่งต่อไปนี้ใช้สำหรับเปิดและตรวจสอบรายการไฟล์ในไดเร็กทอรี outfile

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

คำสั่งสำหรับตรวจสอบเอาต์พุตใน part-00000 ไฟล์คือ -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

คำสั่งในการตรวจสอบเอาต์พุตในไฟล์ part-00001 คือ -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

อ่านหัวข้อต่อไปนี้เพื่อเรียนรู้เพิ่มเติมเกี่ยวกับคำสั่ง 'spark-submit'

Spark-submit Syntax

spark-submit [options] <app jar | python file> [app arguments]

ตัวเลือก

ส. เลขที่ ตัวเลือก คำอธิบาย
1 - อาจารย์ spark: // host: port, mesos: // host: port, yarn หรือ local
2 - โหมดใช้งานได้ ไม่ว่าจะเปิดโปรแกรมไดรเวอร์ในเครื่อง ("ไคลเอนต์") หรือบนเครื่องของผู้ปฏิบัติงานเครื่องใดเครื่องหนึ่งภายในคลัสเตอร์ ("คลัสเตอร์") (ค่าเริ่มต้น: ไคลเอนต์)
3 - คลาส คลาสหลักของแอปพลิเคชันของคุณ (สำหรับแอป Java / Scala)
4 --ชื่อ ชื่อแอปพลิเคชันของคุณ
5 - ขวด รายการ jar ในเครื่องที่คั่นด้วยจุลภาคที่จะรวมไว้ในคลาสพา ธ ของไดรเวอร์และตัวดำเนินการ
6 - แพคเกจ รายการพิกัด maven ที่คั่นด้วยจุลภาคที่จะรวมไว้ในคลาสพา ธ ของไดรเวอร์และตัวดำเนินการ
7 - องค์ประกอบ รายการที่เก็บรีโมตเพิ่มเติมที่คั่นด้วยจุลภาคเพื่อค้นหาพิกัด maven ที่ให้มากับ --packages
8 --py ไฟล์ รายการไฟล์. zip, .egg หรือ. py ที่คั่นด้วยจุลภาคเพื่อวางบน PYTHON PATH สำหรับแอป Python
9 - ไฟล์ รายการไฟล์ที่คั่นด้วยจุลภาคที่จะวางในไดเร็กทอรีการทำงานของแต่ละตัวดำเนินการ
10 --conf (เสา = วาล) คุณสมบัติการกำหนดค่า Spark โดยพลการ
11 - คุณสมบัติไฟล์ พา ธ ไปยังไฟล์ที่จะโหลดคุณสมบัติพิเศษ หากไม่ได้ระบุจะเป็นการค้นหา conf / spark-defaults
12 - ไขควงหน่วยความจำ หน่วยความจำสำหรับไดรเวอร์ (เช่น 1000M, 2G) (ค่าเริ่มต้น: 512M)
13 - ตัวเลือก -driver-java ตัวเลือก Java เพิ่มเติมเพื่อส่งผ่านไปยังไดรเวอร์
14 --driver- ไลบรารีเส้นทาง รายการพา ธ ไลบรารีเพิ่มเติมเพื่อส่งผ่านไปยังไดรเวอร์
15 - ไดร์ - คลาส - พา ธ

รายการพา ธ คลาสพิเศษเพื่อส่งผ่านไปยังไดรเวอร์

โปรดทราบว่า jars ที่เพิ่มด้วย --jars จะรวมอยู่ใน classpath โดยอัตโนมัติ

16 - ผู้ดำเนินการหน่วยความจำ หน่วยความจำต่อตัวดำเนินการ (เช่น 1000M, 2G) (ค่าเริ่มต้น: 1G)
17 - ผู้ใช้พร็อกซี ผู้ใช้ที่จะแอบอ้างเมื่อส่งใบสมัคร
18 - ช่วยเหลือ, -h แสดงข้อความช่วยเหลือนี้และออก
19 --verbose, -v พิมพ์เอาต์พุตการดีบักเพิ่มเติม
20 - รุ่น พิมพ์ Spark เวอร์ชันปัจจุบัน
21 - แกน NUM แกนสำหรับไดรเวอร์ (ค่าเริ่มต้น: 1)
22 - ดูแล หากได้รับให้รีสตาร์ทไดรเวอร์เมื่อเกิดความล้มเหลว
23 --ฆ่า หากกำหนดให้ฆ่าไดรเวอร์ที่ระบุ
24 --สถานะ หากได้รับให้ร้องขอสถานะของไดรเวอร์ที่ระบุ
25 - รวม - ตัวดำเนินการ - คอร์ คอร์ทั้งหมดสำหรับตัวดำเนินการทั้งหมด
26 - ผู้ดำเนินการ - คอร์ จำนวนคอร์ต่อตัวดำเนินการ (ค่าเริ่มต้น: 1 ในโหมด YARN หรือแกนที่มีอยู่ทั้งหมดของผู้ปฏิบัติงานในโหมดสแตนด์อโลน)