Apache Spark - Lập trình cốt lõi
Spark Core là cơ sở của toàn bộ dự án. Nó cung cấp chức năng điều phối tác vụ, lập lịch và I / O cơ bản. Spark sử dụng một cấu trúc dữ liệu cơ bản chuyên biệt được gọi là RDD (Tập dữ liệu phân tán có khả năng phục hồi) là một tập hợp dữ liệu hợp lý được phân vùng trên các máy. RDD có thể được tạo theo hai cách; một là bằng cách tham chiếu các tập dữ liệu trong hệ thống lưu trữ bên ngoài và thứ hai là bằng cách áp dụng các phép biến đổi (ví dụ: bản đồ, bộ lọc, bộ giảm, nối) trên các RDD hiện có.
Phần trừu tượng RDD được hiển thị thông qua một API tích hợp ngôn ngữ. Điều này đơn giản hóa độ phức tạp của lập trình vì cách ứng dụng thao tác với RDD tương tự như thao tác với các bộ sưu tập dữ liệu cục bộ.
Vỏ tia lửa
Spark cung cấp một trình bao tương tác - một công cụ mạnh mẽ để phân tích dữ liệu một cách tương tác. Nó có sẵn bằng ngôn ngữ Scala hoặc Python. Tính trừu tượng chính của Spark là một tập hợp phân tán của các mục được gọi là Tập dữ liệu phân tán có khả năng phục hồi (RDD). Các RDD có thể được tạo từ các Định dạng Đầu vào Hadoop (chẳng hạn như các tệp HDFS) hoặc bằng cách chuyển đổi các RDD khác.
Mở Spark Shell
Lệnh sau được sử dụng để mở Spark shell.
$ spark-shell
Tạo RDD đơn giản
Hãy để chúng tôi tạo một RDD đơn giản từ tệp văn bản. Sử dụng lệnh sau để tạo một RDD đơn giản.
scala> val inputfile = sc.textFile(“input.txt”)
Đầu ra cho lệnh trên là
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
API Spark RDD giới thiệu một số Transformations và ít Actions để thao tác RDD.
Biến đổi RDD
Các phép biến đổi RDD trả về con trỏ tới RDD mới và cho phép bạn tạo các phụ thuộc giữa các RDD. Mỗi RDD trong chuỗi phụ thuộc (String of Dependencies) có một hàm để tính toán dữ liệu của nó và có một con trỏ (phụ thuộc) tới RDD mẹ của nó.
Spark là lười biếng, vì vậy sẽ không có gì được thực thi trừ khi bạn gọi một số chuyển đổi hoặc hành động sẽ kích hoạt tạo và thực thi công việc. Hãy xem đoạn mã sau của ví dụ đếm từ.
Do đó, phép biến đổi RDD không phải là một tập hợp dữ liệu mà là một bước trong chương trình (có thể là bước duy nhất) cho Spark biết cách lấy dữ liệu và phải làm gì với nó.
Dưới đây là danh sách các phép biến đổi RDD.
S. không | Sự biến đổi và ý nghĩa |
---|---|
1 | map(func) Trả về tập dữ liệu được phân phối mới, được hình thành bằng cách chuyển từng phần tử của nguồn thông qua một hàm func. |
2 | filter(func) Trả về tập dữ liệu mới được tạo bằng cách chọn các phần tử của nguồn mà trên đó func trả về true. |
3 | flatMap(func) Tương tự như bản đồ, nhưng mỗi mục đầu vào có thể được ánh xạ tới 0 hoặc nhiều mục đầu ra (vì vậy func nên trả về một Seq thay vì một mục duy nhất). |
4 | mapPartitions(func) Tương tự như bản đồ, nhưng chạy riêng biệt trên từng phân vùng (khối) của RDD, vì vậy func phải thuộc loại Trình lặp <T> ⇒ Trình lặp <U> khi chạy trên RDD kiểu T. |
5 | mapPartitionsWithIndex(func) Tương tự như Phân vùng bản đồ, nhưng cũng cung cấp func với một giá trị số nguyên đại diện cho chỉ mục của phân vùng, vì vậy func phải có kiểu (Int, Iterator <T>) ⇒ Iterator <U> khi chạy trên RDD kiểu T. |
6 | sample(withReplacement, fraction, seed) Mẫu a fraction của dữ liệu, có hoặc không thay thế, bằng cách sử dụng hạt giống tạo số ngẫu nhiên nhất định. |
7 | union(otherDataset) Trả về một tập dữ liệu mới có chứa sự kết hợp của các phần tử trong tập dữ liệu nguồn và đối số. |
số 8 | intersection(otherDataset) Trả về một RDD mới có chứa giao điểm của các phần tử trong tập dữ liệu nguồn và đối số. |
9 | distinct([numTasks]) Trả về một tập dữ liệu mới có chứa các phần tử riêng biệt của tập dữ liệu nguồn. |
10 | groupByKey([numTasks]) Khi được gọi trên một tập dữ liệu gồm các cặp (K, V), trả về một tập dữ liệu gồm các cặp (K, Iterable <V>). Note - Nếu bạn đang nhóm để thực hiện tổng hợp (chẳng hạn như tổng hoặc trung bình) trên mỗi khóa, thì việc sử dụng ReduceByKey hoặc tổng hợpByKey sẽ mang lại hiệu suất tốt hơn nhiều. |
11 | reduceByKey(func, [numTasks]) Khi được gọi trên tập dữ liệu của các cặp (K, V), trả về tập dữ liệu của các cặp (K, V) trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng hàm rút gọn đã cho , hàm này phải thuộc loại (V, V) ⇒ V Giống như trong groupByKey, số lượng tác vụ giảm có thể định cấu hình thông qua đối số thứ hai tùy chọn. |
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) Khi được gọi trên tập dữ liệu của các cặp (K, V), trả về tập dữ liệu của các cặp (K, U) trong đó các giá trị cho mỗi khóa được tổng hợp bằng cách sử dụng các hàm kết hợp đã cho và giá trị "0" trung tính. Cho phép loại giá trị tổng hợp khác với loại giá trị đầu vào, đồng thời tránh phân bổ không cần thiết. Giống như trong groupByKey, số lượng tác vụ giảm có thể định cấu hình thông qua đối số thứ hai tùy chọn. |
13 | sortByKey([ascending], [numTasks]) Khi được gọi trên tập dữ liệu của các cặp (K, V) trong đó K triển khai Có thứ tự, trả về tập dữ liệu của các cặp (K, V) được sắp xếp theo các khóa theo thứ tự tăng dần hoặc giảm dần, như được chỉ định trong đối số tăng dần Boolean. |
14 | join(otherDataset, [numTasks]) Khi được gọi trên tập dữ liệu kiểu (K, V) và (K, W), trả về tập dữ liệu gồm các cặp (K, (V, W)) với tất cả các cặp phần tử cho mỗi khóa. Tham gia bên ngoài được hỗ trợ thông qua leftOuterJoin, rightOuterJoin và fullOuterJoin. |
15 | cogroup(otherDataset, [numTasks]) Khi được gọi trên các tập dữ liệu kiểu (K, V) và (K, W), trả về một tập dữ liệu gồm các bộ dữ liệu (K, (Iterable <V>, Iterable <W>)). Thao tác này còn được gọi là nhóm Với. |
16 | cartesian(otherDataset) Khi được gọi trên các tập dữ liệu kiểu T và U, trả về một tập dữ liệu gồm (T, U) các cặp (tất cả các cặp phần tử). |
17 | pipe(command, [envVars]) Đưa từng phân vùng của RDD thông qua một lệnh shell, ví dụ như tập lệnh Perl hoặc bash. Các phần tử RDD được ghi vào stdin của quy trình và các dòng xuất ra stdout của nó được trả về dưới dạng RDD của các chuỗi. |
18 | coalesce(numPartitions) Giảm số lượng phân vùng trong RDD xuống numPartitions. Hữu ích để chạy các hoạt động hiệu quả hơn sau khi lọc bớt một tập dữ liệu lớn. |
19 | repartition(numPartitions) Sắp xếp lại dữ liệu trong RDD một cách ngẫu nhiên để tạo nhiều hoặc ít phân vùng hơn và cân bằng giữa chúng. Điều này luôn luôn xáo trộn tất cả dữ liệu trên mạng. |
20 | repartitionAndSortWithinPartitions(partitioner) Phân vùng lại RDD theo trình phân vùng đã cho và trong mỗi phân vùng kết quả, sắp xếp các bản ghi theo các khóa của chúng. Điều này hiệu quả hơn việc gọi phân vùng lại và sau đó sắp xếp trong mỗi phân vùng vì nó có thể đẩy việc phân loại xuống bộ máy xáo trộn. |
Hành động
Bảng sau đây cung cấp danh sách các Hành động trả về giá trị.
S. không | Hành động & Ý nghĩa |
---|---|
1 | reduce(func) Tổng hợp các phần tử của tập dữ liệu bằng một hàm func(nhận hai đối số và trả về một). Hàm phải có tính chất giao hoán và liên kết để nó có thể được tính toán song song một cách chính xác. |
2 | collect() Trả về tất cả các phần tử của tập dữ liệu dưới dạng một mảng tại chương trình điều khiển. Điều này thường hữu ích sau khi bộ lọc hoặc hoạt động khác trả về một tập hợp con đủ nhỏ của dữ liệu. |
3 | count() Trả về số phần tử trong tập dữ liệu. |
4 | first() Trả về phần tử đầu tiên của tập dữ liệu (tương tự như lấy (1)). |
5 | take(n) Trả về một mảng có giá trị đầu tiên n các phần tử của tập dữ liệu. |
6 | takeSample (withReplacement,num, [seed]) Trả về một mảng với một mẫu ngẫu nhiên là num các phần tử của tập dữ liệu, có hoặc không có thay thế, tùy chọn chỉ định trước hạt tạo số ngẫu nhiên. |
7 | takeOrdered(n, [ordering]) Trả lại cái đầu tiên n các phần tử của RDD sử dụng thứ tự tự nhiên của chúng hoặc bộ so sánh tùy chỉnh. |
số 8 | saveAsTextFile(path) Ghi các phần tử của tập dữ liệu dưới dạng tệp văn bản (hoặc tập hợp tệp văn bản) trong một thư mục nhất định trong hệ thống tệp cục bộ, HDFS hoặc bất kỳ hệ thống tệp nào khác được Hadoop hỗ trợ. Spark gọi toString trên mỗi phần tử để chuyển nó thành một dòng văn bản trong tệp. |
9 | saveAsSequenceFile(path) (Java and Scala) Ghi các phần tử của tập dữ liệu dưới dạng Hadoop SequenceFile trong một đường dẫn nhất định trong hệ thống tệp cục bộ, HDFS hoặc bất kỳ hệ thống tệp nào khác được Hadoop hỗ trợ. Điều này có sẵn trên RDD của các cặp khóa-giá trị triển khai giao diện Ghi của Hadoop. Trong Scala, nó cũng có sẵn trên các loại có thể chuyển đổi hoàn toàn thành W ghi được (Spark bao gồm các chuyển đổi cho các loại cơ bản như Int, Double, String, v.v.). |
10 | saveAsObjectFile(path) (Java and Scala) Viết các phần tử của tập dữ liệu ở một định dạng đơn giản bằng cách sử dụng tuần tự hóa Java, sau đó có thể được tải bằng SparkContext.objectFile (). |
11 | countByKey() Chỉ có sẵn trên RDD của loại (K, V). Trả về một bản đồ băm của các cặp (K, Int) với số lượng của mỗi khóa. |
12 | foreach(func) Chạy một chức năng functrên mỗi phần tử của tập dữ liệu. Điều này thường được thực hiện đối với các tác dụng phụ như cập nhật Bộ tích lũy hoặc tương tác với hệ thống lưu trữ bên ngoài. Note- việc sửa đổi các biến khác với Accumulators bên ngoài foreach () có thể dẫn đến hành vi không xác định. Xem phần Tìm hiểu về việc đóng cửa để biết thêm chi tiết |
Lập trình với RDD
Chúng ta hãy xem cách triển khai của một số hành động và biến đổi RDD trong lập trình RDD với sự trợ giúp của một ví dụ.
Thí dụ
Hãy xem xét một ví dụ đếm từ - Nó đếm từng từ xuất hiện trong một tài liệu. Hãy coi văn bản sau như một đầu vào và được lưu dưới dạnginput.txt tệp trong thư mục chính.
input.txt - tập tin đầu vào.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Làm theo quy trình dưới đây để thực hiện ví dụ đã cho.
Mở Spark-Shell
Lệnh sau được sử dụng để mở spark shell. Nói chung, spark được xây dựng bằng Scala. Do đó, một chương trình Spark chạy trên môi trường Scala.
$ spark-shell
Nếu Spark shell mở thành công thì bạn sẽ tìm thấy kết quả sau. Nhìn vào dòng cuối cùng của đầu ra “Có sẵn ngữ cảnh tia lửa dưới dạng sc” có nghĩa là vùng chứa Spark được tạo tự động đối tượng ngữ cảnh tia lửa với tênsc. Trước khi bắt đầu bước đầu tiên của chương trình, đối tượng SparkContext phải được tạo.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 1.4.0
/_/
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Spark context available as sc
scala>
Tạo RDD
Đầu tiên, chúng ta phải đọc tệp đầu vào bằng API Spark-Scala và tạo một RDD.
Lệnh sau được sử dụng để đọc tệp từ vị trí nhất định. Tại đây, RDD mới được tạo với tên của inputfile. Chuỗi được đưa ra dưới dạng đối số trong phương thức textFile (“”) là đường dẫn tuyệt đối cho tên tệp đầu vào. Tuy nhiên, nếu chỉ có tên tệp thì có nghĩa là tệp đầu vào đang ở vị trí hiện tại.
scala> val inputfile = sc.textFile("input.txt")
Thực hiện chuyển đổi số lượng từ
Mục đích của chúng tôi là đếm các từ trong một tệp. Tạo một bản đồ phẳng để tách từng dòng thành các từ (flatMap(line ⇒ line.split(“ ”)).
Tiếp theo, đọc từng từ dưới dạng khóa có giá trị ‘1’ (<key, value> = <word, 1>) sử dụng hàm bản đồ (map(word ⇒ (word, 1)).
Cuối cùng, giảm các khóa đó bằng cách thêm giá trị của các khóa tương tự (reduceByKey(_+_)).
Lệnh sau được sử dụng để thực hiện logic đếm từ. Sau khi thực hiện điều này, bạn sẽ không tìm thấy bất kỳ đầu ra nào vì đây không phải là một hành động, đây là một sự chuyển đổi; trỏ một RDD mới hoặc cho biết phải làm gì với dữ liệu đã cho)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
RDD hiện tại
Trong khi làm việc với RDD, nếu bạn muốn biết về RDD hiện tại, hãy sử dụng lệnh sau. Nó sẽ hiển thị cho bạn mô tả về RDD hiện tại và các phụ thuộc của nó để gỡ lỗi.
scala> counts.toDebugString
Lưu trữ các biến đổi
Bạn có thể đánh dấu một RDD sẽ được duy trì bằng cách sử dụng các phương thức Kiên trì () hoặc cache () trên đó. Lần đầu tiên nó được tính toán trong một hành động, nó sẽ được lưu trong bộ nhớ trên các nút. Sử dụng lệnh sau để lưu các phép biến đổi trung gian trong bộ nhớ.
scala> counts.cache()
Áp dụng hành động
Áp dụng một hành động, chẳng hạn như lưu trữ tất cả các biến đổi, kết quả vào một tệp văn bản. Đối số String cho phương thức saveAsTextFile (“”) là đường dẫn tuyệt đối của thư mục đầu ra. Hãy thử lệnh sau để lưu kết quả đầu ra trong tệp văn bản. Trong ví dụ sau, thư mục 'đầu ra' ở vị trí hiện tại.
scala> counts.saveAsTextFile("output")
Kiểm tra đầu ra
Mở một thiết bị đầu cuối khác để đi đến thư mục chính (nơi tia lửa được thực thi trong thiết bị đầu cuối khác). Sử dụng các lệnh sau để kiểm tra thư mục đầu ra.
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
Lệnh sau được sử dụng để xem kết quả từ Part-00000 các tập tin.
[hadoop@localhost output]$ cat part-00000
Đầu ra
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Lệnh sau được sử dụng để xem kết quả từ Part-00001 các tập tin.
[hadoop@localhost output]$ cat part-00001
Đầu ra
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
UN Persist the Storage
Trước khi UN-dai dẳng, nếu bạn muốn xem dung lượng lưu trữ được sử dụng cho ứng dụng này, hãy sử dụng URL sau trong trình duyệt của bạn.
http://localhost:4040
Bạn sẽ thấy màn hình sau, hiển thị không gian lưu trữ được sử dụng cho ứng dụng đang chạy trên Spark shell.
Nếu bạn muốn duy trì không gian lưu trữ của RDD cụ thể, hãy sử dụng lệnh sau.
Scala> counts.unpersist()
Bạn sẽ thấy kết quả như sau:
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810)
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106)
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
Để xác minh không gian lưu trữ trong trình duyệt, hãy sử dụng URL sau.
http://localhost:4040/
Bạn sẽ thấy màn hình sau. Nó hiển thị không gian lưu trữ được sử dụng cho ứng dụng đang chạy trên Spark shell.