Apache Spark - szybki przewodnik

Branże intensywnie używają Hadoop do analizowania swoich zestawów danych. Powodem jest to, że platforma Hadoop jest oparta na prostym modelu programowania (MapReduce) i umożliwia rozwiązanie obliczeniowe, które jest skalowalne, elastyczne, odporne na błędy i opłacalne. Tutaj głównym problemem jest utrzymanie szybkości przetwarzania dużych zbiorów danych pod względem czasu oczekiwania między zapytaniami i czasu oczekiwania na uruchomienie programu.

Spark został wprowadzony przez Apache Software Foundation w celu przyspieszenia procesu tworzenia oprogramowania obliczeniowego Hadoop.

Wbrew powszechnemu przekonaniu Spark is not a modified version of Hadoopi tak naprawdę nie jest zależny od Hadoop, ponieważ ma własne zarządzanie klastrem. Hadoop to tylko jeden ze sposobów implementacji Spark.

Spark używa Hadoop na dwa sposoby - jeden to storage a po drugie processing. Ponieważ Spark ma własne obliczenia zarządzania klastrem, używa Hadoop tylko do przechowywania.

Apache Spark

Apache Spark to błyskawiczna technologia przetwarzania klastrów, zaprojektowana do szybkich obliczeń. Opiera się na Hadoop MapReduce i rozszerza model MapReduce, aby efektywnie używać go do większej liczby typów obliczeń, w tym zapytań interaktywnych i przetwarzania strumieni. Główną cechą Sparka jest jegoin-memory cluster computing co zwiększa szybkość przetwarzania aplikacji.

Spark jest przeznaczony do obsługi szerokiego zakresu obciążeń, takich jak aplikacje wsadowe, algorytmy iteracyjne, zapytania interaktywne i przesyłanie strumieniowe. Oprócz obsługi wszystkich tych obciążeń w odpowiednim systemie, zmniejsza obciążenie zarządzania związane z utrzymywaniem oddzielnych narzędzi.

Ewolucja Apache Spark

Spark jest jednym z podprojektów Hadoop opracowanych w 2009 roku w AMPLab UC Berkeley przez Matei Zaharia. Był to Open Sourced w 2010 roku na licencji BSD. Został przekazany fundacji oprogramowania Apache w 2013 r., A teraz Apache Spark stał się projektem najwyższego poziomu Apache od lutego 2014 r.

Funkcje Apache Spark

Apache Spark ma następujące funkcje.

  • Speed- Spark pomaga uruchomić aplikację w klastrze Hadoop, do 100 razy szybciej w pamięci i 10 razy szybciej podczas pracy na dysku. Jest to możliwe dzięki zmniejszeniu liczby operacji odczytu / zapisu na dysku. Przechowuje w pamięci pośrednie dane przetwarzania.

  • Supports multiple languages- Spark udostępnia wbudowane interfejsy API w Javie, Scali lub Pythonie. Dlatego możesz pisać aplikacje w różnych językach. Spark zawiera 80 operatorów wysokiego poziomu do interaktywnego wykonywania zapytań.

  • Advanced Analytics- Spark nie tylko obsługuje „Mapowanie” i „Zmniejszanie”. Obsługuje również zapytania SQL, dane strumieniowe, uczenie maszynowe (ML) i algorytmy wykresów.

Spark zbudowany na Hadoop

Na poniższym diagramie przedstawiono trzy sposoby tworzenia platformy Spark przy użyciu składników Hadoop.

Istnieją trzy sposoby wdrażania platformy Spark, jak wyjaśniono poniżej.

  • Standalone- Wdrożenie autonomiczne Spark oznacza, że ​​Spark zajmuje miejsce na szczycie HDFS (rozproszony system plików Hadoop), a miejsce jest przydzielane jawnie na HDFS. Tutaj Spark i MapReduce będą działać obok siebie, aby pokryć wszystkie zadania iskry w klastrze.

  • Hadoop Yarn- Wdrożenie Hadoop Yarn oznacza po prostu, że iskra działa na Yarn bez konieczności wstępnej instalacji lub dostępu do roota. Pomaga zintegrować Spark z ekosystemem Hadoop lub stosem Hadoop. Pozwala to na działanie innych komponentów na szczycie stosu.

  • Spark in MapReduce (SIMR)- Spark w MapReduce służy do uruchamiania zadania iskrzenia oprócz samodzielnego wdrażania. Dzięki SIMR użytkownik może uruchomić Sparka i używać jego powłoki bez dostępu administratora.

Komponenty Spark

Poniższa ilustracja przedstawia różne składniki platformy Spark.

Apache Spark Core

Spark Core to podstawowy aparat wykonawczy dla platformy Spark, na którym są zbudowane wszystkie inne funkcje. Zapewnia przetwarzanie w pamięci i zestawy danych referencyjnych w zewnętrznych systemach pamięci masowej.

Spark SQL

Spark SQL to komponent znajdujący się na szczycie Spark Core, który wprowadza nową abstrakcję danych o nazwie SchemaRDD, która zapewnia obsługę danych ustrukturyzowanych i częściowo ustrukturyzowanych.

Spark Streaming

Spark Streaming wykorzystuje możliwości szybkiego planowania Spark Core do przeprowadzania analizy strumieniowej. Pozyskuje dane w mini-partiach i przeprowadza transformacje RDD (Resilient Distributed Datasets) na tych mini-partiach danych.

MLlib (biblioteka uczenia maszynowego)

MLlib to rozproszona platforma uczenia maszynowego powyżej platformy Spark ze względu na rozproszoną architekturę Spark opartą na pamięci. Zgodnie z benchmarkami, jest to wykonywane przez programistów MLlib w porównaniu z implementacjami alternatywnych najmniejszych kwadratów (ALS). Spark MLlib jest dziewięć razy szybszy niż wersja dyskowa platformy HadoopApache Mahout (zanim Mahout zyskał interfejs Sparka).

GraphX

GraphX ​​to rozproszony framework do przetwarzania wykresów na szczycie Spark. Zapewnia interfejs API do wyrażania obliczeń wykresów, który może modelować wykresy zdefiniowane przez użytkownika za pomocą interfejsu API abstrakcji Pregel. Zapewnia również zoptymalizowane środowisko wykonawcze dla tej abstrakcji.

Odporne, rozproszone zestawy danych

Resilient Distributed Datasets (RDD) to podstawowa struktura danych platformy Spark. Jest to niezmienny, rozproszony zbiór obiektów. Każdy zestaw danych w RDD jest podzielony na partycje logiczne, które mogą być obliczane na różnych węzłach klastra. RDD mogą zawierać dowolne typy obiektów Python, Java lub Scala, w tym klasy zdefiniowane przez użytkownika.

Formalnie RDD to zbiór rekordów tylko do odczytu, podzielony na partycje. RDD można tworzyć poprzez deterministyczne operacje na danych w stabilnej pamięci lub na innych RDD. RDD to odporny na uszkodzenia zbiór elementów, na których można pracować równolegle.

Istnieją dwa sposoby tworzenia RDD - parallelizing istniejąca kolekcja w programie sterownika lub referencing a dataset w zewnętrznym systemie pamięci masowej, takim jak współużytkowany system plików, HDFS, HBase lub dowolne źródło danych oferujące format wejściowy Hadoop.

Spark wykorzystuje koncepcję RDD, aby osiągnąć szybsze i wydajniejsze operacje MapReduce. Omówmy najpierw, jak odbywają się operacje MapReduce i dlaczego nie są one tak wydajne.

Udostępnianie danych jest powolne w MapReduce

MapReduce jest szeroko stosowany do przetwarzania i generowania dużych zestawów danych za pomocą równoległego, rozproszonego algorytmu w klastrze. Umożliwia użytkownikom pisanie obliczeń równoległych przy użyciu zestawu operatorów wysokiego poziomu, bez martwienia się o dystrybucję pracy i odporność na błędy.

Niestety w większości obecnych frameworków jedynym sposobem ponownego wykorzystania danych między obliczeniami (Ex - między dwoma zadaniami MapReduce) jest zapisanie ich w zewnętrznym stabilnym systemie pamięci masowej (Ex - HDFS). Chociaż ta struktura zapewnia liczne abstrakcje dostępu do zasobów obliczeniowych klastra, użytkownicy nadal chcą więcej.

Obie Iterative i Interactiveaplikacje wymagają szybszego udostępniania danych w równoległych zadaniach. Udostępnianie danych w MapReduce jest powolne z powodureplication, serialization, i disk IO. Jeśli chodzi o system pamięci masowej, większość aplikacji Hadoop spędza ponad 90% czasu na wykonywaniu operacji odczytu i zapisu HDFS.

Operacje iteracyjne w MapReduce

Ponownie wykorzystuj wyniki pośrednie w wielu obliczeniach w aplikacjach wieloetapowych. Na poniższej ilustracji wyjaśniono, jak działa bieżąca struktura podczas wykonywania operacji iteracyjnych na MapReduce. Powoduje to znaczne obciążenie ze względu na replikację danych, operacje we / wy dysków i serializację, co powoduje spowolnienie systemu.

Interaktywne operacje na MapReduce

Użytkownik uruchamia zapytania ad hoc dotyczące tego samego podzbioru danych. Każde zapytanie wykona dyskowe operacje we / wy w stabilnej pamięci masowej, co może zdominować czas wykonywania aplikacji.

Na poniższej ilustracji wyjaśniono, jak działa bieżąca struktura podczas wykonywania zapytań interaktywnych w MapReduce.

Udostępnianie danych za pomocą Spark RDD

Udostępnianie danych w MapReduce jest powolne z powodu replication, serialization, i disk IO. Większość aplikacji Hadoop spędza ponad 90% czasu na wykonywaniu operacji odczytu i zapisu HDFS.

Rozpoznając ten problem, naukowcy opracowali wyspecjalizowaną strukturę o nazwie Apache Spark. Kluczową ideą iskry jestResilient Dprzypisane Datasety (RDD); obsługuje obliczenia przetwarzania w pamięci. Oznacza to, że przechowuje stan pamięci jako obiekt w zadaniach, a obiekt jest współdzielony między tymi zadaniami. Udostępnianie danych w pamięci jest od 10 do 100 razy szybsze niż w sieci i na dysku.

Spróbujmy teraz dowiedzieć się, jak iteracyjne i interaktywne operacje odbywają się w Spark RDD.

Operacje iteracyjne na Spark RDD

Poniższa ilustracja przedstawia iteracyjne operacje na Spark RDD. Przechowuje pośrednie wyniki w pamięci rozproszonej zamiast w stabilnej pamięci masowej (dysk) i przyspiesza system.

Note - Jeśli pamięć rozproszona (RAM) jest wystarczająca do przechowywania wyników pośrednich (stan zadania), wówczas wyniki te zostaną zapisane na dysku.

Interaktywne operacje na Spark RDD

Ta ilustracja przedstawia interaktywne operacje na Spark RDD. Jeśli różne zapytania są uruchamiane wielokrotnie na tym samym zestawie danych, te konkretne dane można przechowywać w pamięci, aby zapewnić lepsze czasy wykonywania.

Domyślnie każdy przekształcony RDD może zostać przeliczony za każdym razem, gdy wykonujesz na nim akcję. Jednak możesz teżpersistRDD w pamięci, w którym to przypadku Spark zachowa elementy w klastrze, aby uzyskać znacznie szybszy dostęp, gdy następnym razem go zapytasz. Dostępna jest również obsługa utrwalania RDD na dysku lub replikacji w wielu węzłach.

Spark to podprojekt Hadoop. Dlatego lepiej jest zainstalować Sparka w systemie opartym na systemie Linux. Poniższe kroki pokazują, jak zainstalować Apache Spark.

Krok 1: weryfikacja instalacji Java

Instalacja Java jest jedną z obowiązkowych rzeczy podczas instalacji Sparka. Wypróbuj następujące polecenie, aby sprawdzić wersję JAVA.

$java -version

Jeśli Java jest już zainstalowana w twoim systemie, zobaczysz następującą odpowiedź -

java version "1.7.0_71" 
Java(TM) SE Runtime Environment (build 1.7.0_71-b13) 
Java HotSpot(TM) Client VM (build 25.0-b02, mixed mode)

Jeśli nie masz zainstalowanej Java w swoim systemie, zainstaluj Javę przed przejściem do następnego kroku.

Krok 2: Weryfikacja instalacji Scala

Aby zaimplementować Spark, powinieneś użyć języka Scala. Sprawdźmy więc instalację Scali za pomocą następującego polecenia.

$scala -version

Jeśli Scala jest już zainstalowana w twoim systemie, zobaczysz następującą odpowiedź -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Jeśli nie masz zainstalowanej Scali w swoim systemie, przejdź do następnego kroku instalacji Scala.

Krok 3: Pobieranie Scali

Pobierz najnowszą wersję Scala, odwiedzając poniższy link Pobierz Scala . W tym samouczku używamy wersji scala-2.11.6. Po pobraniu znajdziesz plik tar Scala w folderze pobierania.

Krok 4: Instalacja Scali

Wykonaj poniższe kroki, aby zainstalować Scala.

Rozpakuj plik tar Scala

Wpisz następujące polecenie, aby wyodrębnić plik tar Scala.

$ tar xvf scala-2.11.6.tgz

Przenieś pliki oprogramowania Scala

Użyj następujących poleceń, aby przenieść pliki oprogramowania Scala do odpowiedniego katalogu (/usr/local/scala).

$ su – 
Password: 
# cd /home/Hadoop/Downloads/ 
# mv scala-2.11.6 /usr/local/scala 
# exit

Ustaw PATH dla Scala

Użyj następującego polecenia, aby ustawić PATH dla Scala.

$ export PATH = $PATH:/usr/local/scala/bin

Weryfikacja instalacji Scali

Po instalacji lepiej to zweryfikować. Użyj następującego polecenia, aby zweryfikować instalację Scala.

$scala -version

Jeśli Scala jest już zainstalowana w twoim systemie, zobaczysz następującą odpowiedź -

Scala code runner version 2.11.6 -- Copyright 2002-2013, LAMP/EPFL

Krok 5: Pobieranie Apache Spark

Pobierz najnowszą wersję Spark, odwiedzając poniższe łącze Pobierz Spark . W tym samouczku używamyspark-1.3.1-bin-hadoop2.6wersja. Po pobraniu plik tar Spark znajdziesz w folderze pobierania.

Krok 6: Instalacja Sparka

Wykonaj poniższe czynności, aby zainstalować Sparka.

Wydobywanie smoły Spark

Następujące polecenie do wyodrębnienia pliku Spark tar.

$ tar xvf spark-1.3.1-bin-hadoop2.6.tgz

Przenoszenie plików oprogramowania Spark

Następujące polecenia do przenoszenia plików oprogramowania Spark do odpowiedniego katalogu (/usr/local/spark).

$ su – 
Password:  

# cd /home/Hadoop/Downloads/ 
# mv spark-1.3.1-bin-hadoop2.6 /usr/local/spark 
# exit

Konfigurowanie środowiska dla Spark

Dodaj następujący wiersz do ~/.bashrcplik. Oznacza to dodanie lokalizacji, w której znajduje się plik oprogramowania Spark, do zmiennej PATH.

export PATH=$PATH:/usr/local/spark/bin

Użyj następującego polecenia, aby pozyskać plik ~ / .bashrc.

$ source ~/.bashrc

Krok 7: Weryfikacja instalacji Spark

Napisz następujące polecenie, aby otworzyć powłokę Spark.

$spark-shell

Jeśli Spark zostanie pomyślnie zainstalowany, znajdziesz następujący wynik.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc  
scala>

Podstawą całego projektu jest Spark Core. Zapewnia rozproszone przydzielanie zadań, planowanie i podstawowe funkcje we / wy. Spark używa wyspecjalizowanej podstawowej struktury danych znanej jako RDD (Resilient Distributed Datasets), która jest logiczną kolekcją danych podzielonych na partycje między maszynami. RDD można tworzyć na dwa sposoby; jednym z nich jest tworzenie odniesień do zbiorów danych w zewnętrznych systemach pamięci masowej, a drugim jest stosowanie przekształceń (np. mapa, filtr, reduktor, łączenie) na istniejących RDD.

Abstrakcja RDD jest udostępniana za pośrednictwem interfejsu API zintegrowanego z językiem. Upraszcza to złożoność programowania, ponieważ sposób, w jaki aplikacje manipulują RDD, jest podobny do manipulowania lokalnymi zbiorami danych.

Spark Shell

Spark zapewnia interaktywną powłokę - potężne narzędzie do interaktywnej analizy danych. Jest dostępny w języku Scala lub Python. Podstawową abstrakcją Sparka jest rozproszona kolekcja elementów o nazwie Resilient Distributed Dataset (RDD). Pliki RDD można tworzyć z formatów wejściowych Hadoop (takich jak pliki HDFS) lub przekształcając inne pliki RDD.

Otwórz Spark Shell

Następujące polecenie służy do otwierania powłoki Spark.

$ spark-shell

Utwórz prosty RDD

Stwórzmy prosty RDD z pliku tekstowego. Użyj następującego polecenia, aby utworzyć prosty RDD.

scala> val inputfile = sc.textFile(“input.txt”)

Dane wyjściowe dla powyższego polecenia to

inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12

Spark RDD API wprowadza kilka Transformations i kilka Actions manipulować RDD.

Transformacje RDD

Transformacje RDD zwracają wskaźnik do nowego RDD i pozwalają tworzyć zależności między RDD. Każdy RDD w łańcuchu zależności (String of Dependencies) ma funkcję obliczania swoich danych i ma wskaźnik (zależność) do swojego nadrzędnego RDD.

Spark jest leniwy, więc nic nie zostanie wykonane, chyba że wywołasz jakąś transformację lub akcję, która wyzwoli tworzenie i wykonywanie zadań. Spójrz na poniższy fragment przykładu z liczbą słów.

Dlatego transformacja RDD nie jest zbiorem danych, ale jest krokiem w programie (może to być jedyny krok), który mówi Sparkowi, jak uzyskać dane i co z nimi zrobić.

S.Nr Transformacje i znaczenie
1

map(func)

Zwraca nowy rozproszony zestaw danych utworzony przez przekazanie każdego elementu źródła przez funkcję func.

2

filter(func)

Zwraca nowy zestaw danych utworzony przez wybranie tych elementów źródła, w którym func zwraca prawdę.

3

flatMap(func)

Podobny do map, ale każdy element wejściowy może być odwzorowany na 0 lub więcej elementów wyjściowych (więc func powinien zwracać Seq, a nie pojedynczy element).

4

mapPartitions(func)

Podobny do map, ale działa osobno na każdej partycji (bloku) RDD, więc func musi być typu Iterator <T> ⇒ Iterator <U>, gdy działa na RDD typu T.

5

mapPartitionsWithIndex(func)

Podobny do mapowania partycji, ale zapewnia również func z wartością całkowitą reprezentującą indeks partycji, więc func musi być typu (Int, Iterator <T>) ⇒ Iterator <U>, gdy działa na RDD typu T.

6

sample(withReplacement, fraction, seed)

Próbka a fraction danych, z zastąpieniem lub bez, przy użyciu danego ziarna generatora liczb losowych.

7

union(otherDataset)

Zwraca nowy zestaw danych, który zawiera sumę elementów w źródłowym zestawie danych i argument.

8

intersection(otherDataset)

Zwraca nowy RDD, który zawiera przecięcie elementów w źródłowym zbiorze danych i argument.

9

distinct([numTasks])

Zwraca nowy zestaw danych zawierający różne elementy źródłowego zestawu danych.

10

groupByKey([numTasks])

W przypadku wywołania zestawu danych składającego się z (K, V) par, zwraca zestaw danych składający się z (K, Iterable <V>) par.

Note - W przypadku grupowania w celu wykonania agregacji (takiej jak suma lub średnia) dla każdego klucza, użycie funkcji redukujByKey lub agregatuByKey zapewni znacznie lepszą wydajność.

11

reduceByKey(func, [numTasks])

Wywołany na zbiorze danych z (K, V) par zwraca zestaw danych z (K, V) par gdzie wartości dla każdego klucza są agregowane przy użyciu danego zmniejszenia funkcji func , który musi być typu (V, V) ⇒ V Podobnie jak w przypadku groupByKey, liczbę zadań redukcji można konfigurować za pomocą opcjonalnego drugiego argumentu.

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

W przypadku wywołania zestawu danych składającego się z (K, V) par, zwraca zestaw danych składający się z (K, U) par, w których wartości dla każdego klucza są agregowane przy użyciu podanych funkcji łączenia i neutralnej wartości „zero”. Zezwala na zagregowany typ wartości inny niż typ wartości wejściowej, unikając niepotrzebnych alokacji. Podobnie jak w przypadku groupByKey, liczbę zadań redukcji można konfigurować za pomocą opcjonalnego drugiego argumentu.

13

sortByKey([ascending], [numTasks])

W przypadku wywołania zestawu danych zawierającego (K, V) par, w którym K implementuje uporządkowane, zwraca zestaw danych składający się z (K, V) par posortowanych według kluczy w porządku rosnącym lub malejącym, zgodnie z argumentem rosnącym logicznym.

14

join(otherDataset, [numTasks])

W przypadku wywołania zestawów danych typu (K, V) i (K, W) zwraca zestaw danych (K, (V, W)) par ze wszystkimi parami elementów dla każdego klucza. Łączenia zewnętrzne są obsługiwane przez leftOuterJoin, rightOuterJoin i fullOuterJoin.

15

cogroup(otherDataset, [numTasks])

W przypadku wywołania na zestawach danych typu (K, V) i (K, W) zwraca zestaw danych obejmujący (K, (Iterable <V>, Iterable <W>)) krotek. Ta operacja jest również nazywana grupą z.

16

cartesian(otherDataset)

W przypadku wywołania w zestawach danych typu T i U zwraca zestaw danych składający się z (T, U) par (wszystkie pary elementów).

17

pipe(command, [envVars])

Prześlij każdą partycję RDD poleceniem powłoki, np. Skryptem Perla lub basha. Elementy RDD są zapisywane na stdin procesu, a linie wyprowadzane na jego standardowe wyjście są zwracane jako RDD łańcuchów.

18

coalesce(numPartitions)

Zmniejsz liczbę partycji w RDD do numPartitions. Przydatne do wydajniejszego wykonywania operacji po odfiltrowaniu dużego zbioru danych.

19

repartition(numPartitions)

Przetasuj losowo dane w RDD, aby utworzyć więcej lub mniej partycji i zrównoważyć je między nimi. To zawsze tasuje wszystkie dane w sieci.

20

repartitionAndSortWithinPartitions(partitioner)

Podziel RDD na partycje zgodnie z podanym partycjonerem i, w ramach każdej wynikowej partycji, posortuj rekordy według ich kluczy. Jest to bardziej wydajne niż wywołanie podziału na partycje, a następnie sortowanie w ramach każdej partycji, ponieważ może zepchnąć sortowanie w dół do mechanizmu tasowania.

działania

S.Nr Działanie i znaczenie
1

reduce(func)

Agreguj elementy zestawu danych za pomocą funkcji func(który przyjmuje dwa argumenty i zwraca jeden). Funkcja powinna być przemienna i asocjacyjna, aby można ją było poprawnie obliczyć równolegle.

2

collect()

Zwraca wszystkie elementy zestawu danych jako tablicę w programie sterownika. Zwykle jest to przydatne po filtrze lub innej operacji, która zwraca wystarczająco mały podzbiór danych.

3

count()

Zwraca liczbę elementów w zbiorze danych.

4

first()

Zwraca pierwszy element zbioru danych (podobnie do take (1)).

5

take(n)

Zwraca tablicę z pierwszą n elementy zbioru danych.

6

takeSample (withReplacement,num, [seed])

Zwraca tablicę z losową próbką num elementy zbioru danych, z wymianą lub bez, opcjonalnie z wstępnym określeniem ziarna generatora liczb losowych.

7

takeOrdered(n, [ordering])

Zwraca pierwszy n elementy RDD przy użyciu ich naturalnej kolejności lub niestandardowego komparatora.

8

saveAsTextFile(path)

Zapisuje elementy zbioru danych jako plik tekstowy (lub zestaw plików tekstowych) w podanym katalogu w lokalnym systemie plików, HDFS lub dowolnym innym systemie plików obsługiwanym przez Hadoop. Spark wywołuje toString dla każdego elementu, aby przekonwertować go na wiersz tekstu w pliku.

9

saveAsSequenceFile(path) (Java and Scala)

Zapisuje elementy zestawu danych jako Hadoop SequenceFile w podanej ścieżce w lokalnym systemie plików, HDFS lub dowolnym innym systemie plików obsługiwanym przez Hadoop. Jest to dostępne na RDD par klucz-wartość, które implementują interfejs Hadoop Writable. W Scali jest również dostępny dla typów, które są niejawnie konwertowane na Writable (Spark obejmuje konwersje dla typów podstawowych, takich jak Int, Double, String itp.).

10

saveAsObjectFile(path) (Java and Scala)

Zapisuje elementy zestawu danych w prostym formacie przy użyciu serializacji Java, który można następnie załadować za pomocą SparkContext.objectFile ().

11

countByKey()

Dostępne tylko w RDD typu (K, V). Zwraca wartość mieszania (K, Int) par z liczbą każdego klucza.

12

foreach(func)

Uruchamia funkcję funcna każdym elemencie zbioru danych. Zwykle dzieje się tak w przypadku skutków ubocznych, takich jak aktualizacja akumulatora lub interakcja z zewnętrznymi systemami pamięci masowej.

Note- modyfikowanie zmiennych innych niż akumulatory poza foreach () może spowodować nieokreślone zachowanie. Aby uzyskać więcej informacji, zobacz Omówienie zamknięć.

Programowanie z RDD

Zobaczmy implementacje kilku transformacji RDD i akcji w programowaniu RDD na przykładzie.

Przykład

Rozważ przykład liczenia słów - liczy każde słowo występujące w dokumencie. Rozważ poniższy tekst jako dane wejściowe i zostanie zapisany jako plikinput.txt plik w katalogu domowym.

input.txt - plik wejściowy.

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.

Postępuj zgodnie z procedurą podaną poniżej, aby wykonać podany przykład.

Otwórz Spark-Shell

Następujące polecenie służy do otwierania powłoki iskrowej. Generalnie iskrę buduje się przy użyciu Scali. Dlatego program Spark działa w środowisku Scala.

$ spark-shell

Jeśli powłoka Spark otworzy się pomyślnie, znajdziesz następujące dane wyjściowe. Spójrz na ostatni wiersz danych wyjściowych „Kontekst Spark dostępny jako sc” oznacza, że ​​kontener Spark jest automatycznie tworzony jako obiekt kontekstu Spark o nazwiesc. Przed rozpoczęciem pierwszego kroku programu należy utworzyć obiekt SparkContext.

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

Utwórz RDD

Najpierw musimy odczytać plik wejściowy za pomocą Spark-Scala API i utworzyć RDD.

Poniższe polecenie służy do odczytu pliku z podanej lokalizacji. Tutaj tworzony jest nowy RDD z nazwą pliku wejściowego. Ciąg podany jako argument w metodzie textFile („”) jest ścieżką bezwzględną dla nazwy pliku wejściowego. Jeśli jednak podana jest tylko nazwa pliku, oznacza to, że plik wejściowy znajduje się w bieżącej lokalizacji.

scala> val inputfile = sc.textFile("input.txt")

Wykonaj transformację liczby słów

Naszym celem jest policzenie słów w pliku. Utwórz płaską mapę, aby podzielić każdy wiersz na słowa (flatMap(line ⇒ line.split(“ ”)).

Następnie przeczytaj każde słowo jako klucz z wartością ‘1’ (<klucz, wartość> = <słowo, 1>) przy użyciu funkcji mapy (map(word ⇒ (word, 1)).

Na koniec zredukuj te klucze, dodając wartości podobnych kluczy (reduceByKey(_+_)).

Następujące polecenie służy do wykonywania logiki liczenia słów. Po wykonaniu tego nie znajdziesz żadnych wyników, ponieważ to nie jest akcja, to jest transformacja; wskazanie nowego RDD lub wskazanie Sparkowi, co zrobić z podanymi danymi)

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

Aktualny RDD

Podczas pracy z RDD, jeśli chcesz wiedzieć o aktualnym RDD, użyj następującego polecenia. Pokaże ci opis aktualnego RDD i jego zależności do debugowania.

scala> counts.toDebugString

Buforowanie transformacji

Możesz oznaczyć RDD do utrwalenia używając na nim metod persist () lub cache (). Gdy zostanie obliczony po raz pierwszy w akcji, zostanie zachowany w pamięci węzłów. Użyj następującego polecenia, aby zapisać transformacje pośrednie w pamięci.

scala> counts.cache()

Wykonanie akcji

Zastosowanie akcji, podobnie jak zapisanie wszystkich przekształceń, powoduje powstanie pliku tekstowego. Argument String metody saveAsTextFile („”) to bezwzględna ścieżka do folderu wyjściowego. Wypróbuj następujące polecenie, aby zapisać dane wyjściowe w pliku tekstowym. W poniższym przykładzie folder „output” znajduje się w bieżącej lokalizacji.

scala> counts.saveAsTextFile("output")

Sprawdzanie wyników

Otwórz inny terminal, aby przejść do katalogu domowego (gdzie iskra jest wykonywana w drugim terminalu). Użyj następujących poleceń, aby sprawdzić katalog wyjściowy.

[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

Poniższe polecenie służy do wyświetlania danych wyjściowych z Part-00000 akta.

[hadoop@localhost output]$ cat part-00000

Wynik

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Poniższe polecenie służy do wyświetlania danych wyjściowych z Part-00001 akta.

[hadoop@localhost output]$ cat part-00001

Wynik

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

ONZ nie ustaje w przechowywaniu

Jeśli chcesz zobaczyć przestrzeń dyskową używaną przez tę aplikację przed cofnięciem się, użyj następującego adresu URL w przeglądarce.

http://localhost:4040

Zobaczysz następujący ekran, który pokazuje przestrzeń dyskową używaną przez aplikację, która działa w powłoce Spark.

Jeśli chcesz cofnąć utrwalenie przestrzeni dyskowej określonego RDD, użyj następującego polecenia.

Scala> counts.unpersist()

Zobaczysz dane wyjściowe w następujący sposób -

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14

Aby zweryfikować przestrzeń dyskową w przeglądarce, użyj następującego adresu URL.

http://localhost:4040/

Pojawi się następujący ekran. Pokazuje przestrzeń dyskową używaną przez aplikację, która działa w powłoce Spark.

Aplikacja Spark, korzystająca z funkcji przesyłania Spark, to polecenie powłoki używane do wdrażania aplikacji Spark w klastrze. Korzysta ze wszystkich odpowiednich menedżerów klastrów za pośrednictwem jednolitego interfejsu. Dlatego nie musisz konfigurować aplikacji dla każdego z nich.

Przykład

Weźmy ten sam przykład liczenia słów, którego używaliśmy wcześniej, używając poleceń powłoki. Tutaj rozważymy ten sam przykład, co w przypadku aplikacji iskrowej.

Przykładowe wejście

Poniższy tekst to dane wejściowe, a plik o nazwie to in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Spójrz na następujący program -

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Zapisz powyższy program do pliku o nazwie SparkWordCount.scala i umieść go w katalogu zdefiniowanym przez użytkownika o nazwie spark-application.

Note - Podczas przekształcania inputRDD w countRDD używamy flatMap () do tokenizacji wierszy (z pliku tekstowego) na słowa, metody map () do zliczania częstotliwości słów i metody redukujByKey () do liczenia każdego powtórzenia słowa.

Wykonaj następujące kroki, aby przesłać tę aplikację. Wykonaj wszystkie kroki wspark-application katalog za pośrednictwem terminala.

Krok 1: Pobierz Spark Ja

Spark core jar jest wymagany do kompilacji, dlatego pobierz spark-core_2.10-1.3.0.jar z poniższego linku Spark core jar i przenieś plik jar z katalogu pobierania dospark-application informator.

Krok 2: Skompiluj program

Skompiluj powyższy program, używając polecenia podanego poniżej. To polecenie powinno zostać wykonane z katalogu spark-application. Tutaj,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar to jar obsługi Hadoop pobrany z biblioteki Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Krok 3: Utwórz plik JAR

Utwórz plik jar aplikacji Spark za pomocą następującego polecenia. Tutaj,wordcount to nazwa pliku jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Krok 4: Prześlij wniosek o iskrę

Prześlij aplikację Spark za pomocą następującego polecenia -

spark-submit --class SparkWordCount --master local wordcount.jar

Jeśli zakończy się pomyślnie, znajdziesz dane wyjściowe podane poniżej. PlikOKwpuszczenie następującego wyjścia służy do identyfikacji użytkownika i jest to ostatnia linia programu. Jeśli uważnie przeczytasz poniższe wyniki, znajdziesz różne rzeczy, takie jak -

  • pomyślnie uruchomiono usługę „sparkDriver” na porcie 42954
  • Początkowo MemoryStore miał pojemność 267,3 MB
  • Uruchomiono SparkUI pod adresem http://192.168.1.217:4040
  • Dodano plik JAR: /home/hadoop/piapplication/count.jar
  • Wynik Etap 1 (saveAsTextFile w SparkPi.scala: 11) zakończył się w 0,566 s
  • Zatrzymano interfejs sieciowy Spark pod adresem http://192.168.1.217:4040
  • Wyczyszczono MemoryStore
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Krok 5: Sprawdzanie wyników

Po pomyślnym wykonaniu programu znajdziesz katalog o nazwie outfile w katalogu spark-application.

Poniższe polecenia służą do otwierania i sprawdzania listy plików w katalogu outfile.

$ cd outfile $ ls 
Part-00000 part-00001 _SUCCESS

Polecenia do sprawdzania danych wyjściowych part-00000 plik to -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Polecenia do sprawdzania wyników w pliku part-00001 to -

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Przejdź przez następną sekcję, aby dowiedzieć się więcej o poleceniu „spark-submit”.

Składnia przesyłania Spark

spark-submit [options] <app jar | python file> [app arguments]

Opcje

S.Nr Opcja Opis
1 --mistrz spark: // host: port, mesos: // host: port, yarn lub local.
2 --deploy-mode Czy uruchomić program sterownika lokalnie („klient”), czy na jednej z maszyn roboczych wewnątrz klastra („klaster”) (domyślnie: klient).
3 --klasa Główna klasa aplikacji (dla aplikacji Java / Scala).
4 --Nazwa Nazwa Twojej aplikacji.
5 - słoiki Rozdzielana przecinkami lista lokalnych plików JAR do dołączenia do ścieżek klas sterownika i modułu wykonawczego.
6 - paczki Rozdzielana przecinkami lista współrzędnych maven słoików do uwzględnienia w ścieżkach klas sterownika i modułu wykonawczego.
7 - repozytoria Rozdzielana przecinkami lista dodatkowych zdalnych repozytoriów do wyszukania współrzędnych maven podanych z --packages.
8 --py-pliki Lista rozdzielonych przecinkami plików .zip, .egg lub .py do umieszczenia w PYTHON PATH dla aplikacji Python.
9 --akta Rozdzielana przecinkami lista plików do umieszczenia w katalogu roboczym każdego modułu wykonawczego.
10 --conf (prop = val) Dowolna właściwość konfiguracji Spark.
11 - plik-właściwości Ścieżka do pliku, z którego można załadować dodatkowe właściwości. Jeśli nie zostanie określony, będzie szukać wartości conf / spark-defaults.
12 - pamięć kierowcy Pamięć dla sterownika (np. 1000M, 2G) (Domyślnie: 512M).
13 --driver-java-options Dodatkowe opcje Java do przekazania do sterownika.
14 --driver-Library-path Dodatkowe wpisy ścieżek bibliotek do przekazania do sterownika.
15 --driver-class-path

Dodatkowe wpisy ścieżki klasy do przekazania do sterownika.

Zauważ, że słoiki dodane za pomocą --jars są automatycznie dołączane do ścieżki klas.

16 --executor-memory Pamięć na executor (np. 1000 MB, 2 GB) (Domyślnie: 1 GB).
17 --proxy-user Użytkownik podszywa się pod użytkownika podczas składania wniosku.
18 --help, -h Pokaż ten komunikat pomocy i zakończ.
19 --verbose, -v Wydrukuj dodatkowe wyjście debugowania.
20 --wersja Wydrukuj wersję aktualnej wersji Spark.
21 - rdzenie sterowników NUM Rdzenie sterownika (domyślnie: 1).
22 --dozorować Jeśli podano, uruchamia ponownie sterownik w przypadku awarii.
23 --zabić Jeśli podano, zabija określony sterownik.
24 --status Jeśli podano, żąda statusu określonego sterownika.
25 - rdzenie-total-executor-rdzenie Całkowita liczba rdzeni dla wszystkich wykonawców.
26 - rdzenie wykonawcze Liczba rdzeni na executor. (Domyślnie: 1 w trybie YARN lub wszystkie dostępne rdzenie pracownika w trybie autonomicznym).

Spark zawiera dwa różne typy wspólnych zmiennych - jeden to broadcast variables a po drugie accumulators.

  • Broadcast variables - służy do wydajnej dystrybucji dużych wartości.

  • Accumulators - służy do agregowania informacji o konkretnym zbiorze.

Zmienne transmisji

Zmienne rozgłaszania pozwalają programiście przechowywać zmienną tylko do odczytu w pamięci podręcznej na każdym komputerze, zamiast wysyłać jej kopię z zadaniami. Można ich użyć na przykład do wydajnego nadawania każdemu węzłowi kopii dużego zestawu danych wejściowych. Spark próbuje również dystrybuować zmienne emisji przy użyciu wydajnych algorytmów emisji, aby zmniejszyć koszty komunikacji.

Akcje Spark są wykonywane przez zestaw etapów, oddzielonych rozproszonymi operacjami „shuffle”. Spark automatycznie rozgłasza wspólne dane potrzebne do zadań na każdym etapie.

Rozgłaszane w ten sposób dane są buforowane w postaci serializowanej i deserializowane przed uruchomieniem każdego zadania. Oznacza to, że jawne tworzenie zmiennych rozgłoszeniowych jest przydatne tylko wtedy, gdy zadania na wielu etapach wymagają tych samych danych lub gdy ważne jest buforowanie danych w formie zdeserializowanej.

Zmienne transmisji są tworzone ze zmiennej v poprzez dzwonienie SparkContext.broadcast(v). Zmienna rozgłoszeniowa jest otoką wokółv, a jego wartość można uzyskać, wywołując metodę valuemetoda. Poniższy kod pokazuje to -

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))

Output -

broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)

Po utworzeniu zmiennej rozgłoszeniowej należy jej używać zamiast wartości v we wszystkich funkcjach działających w klastrze, więc vnie jest wysyłany do węzłów więcej niż raz. Dodatkowo obiektv nie powinny być modyfikowane po rozgłoszeniu, aby zapewnić, że wszystkie węzły otrzymają tę samą wartość zmiennej rozgłoszeniowej.

Akumulatory

Akumulatory to zmienne, które są „dodawane” tylko poprzez operację asocjacyjną i dlatego mogą być efektywnie obsługiwane równolegle. Mogą służyć do implementacji liczników (jak w MapReduce) lub sum. Spark natywnie obsługuje akumulatory typów liczbowych, a programiści mogą dodawać obsługę nowych typów. Jeśli akumulatory zostaną utworzone z nazwą, zostaną one wyświetlone w formacieSpark’s UI. Może to być przydatne do zrozumienia postępu wykonywanych etapów (UWAGA - nie jest to jeszcze obsługiwane w Pythonie).

Akumulator jest tworzony z wartości początkowej v poprzez dzwonienie SparkContext.accumulator(v). Zadania uruchomione w klastrze można następnie dodać do niego przy użyciu rozszerzeniaaddmetoda lub operator + = (w Scali i Pythonie). Jednak nie mogą odczytać jego wartości. Tylko program sterownika może odczytać wartość akumulatora, używając jegovalue metoda.

Poniższy kod pokazuje akumulator używany do dodawania elementów tablicy -

scala> val accum = sc.accumulator(0) 
 
scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

Jeśli chcesz zobaczyć wynik powyższego kodu, użyj następującego polecenia -

scala> accum.value

Wynik

res2: Int = 10

Operacje numeryczne RDD

Spark umożliwia wykonywanie różnych operacji na danych liczbowych przy użyciu jednej z predefiniowanych metod interfejsu API. Operacje numeryczne Spark są implementowane za pomocą algorytmu przesyłania strumieniowego, który umożliwia budowanie modelu po jednym elemencie na raz.

Te operacje są obliczane i zwracane jako plik StatusCounter obiektu przez wywołanie status() metoda.

S.Nr Metody i znaczenie
1

count()

Liczba elementów w RDD.

2

Mean()

Średnia elementów w RDD.

3

Sum()

Łączna wartość elementów w RDD.

4

Max()

Maksymalna wartość spośród wszystkich elementów w RDD.

5

Min()

Minimalna wartość spośród wszystkich elementów w RDD.

6

Variance()

Wariancja elementów.

7

Stdev()

Odchylenie standardowe.

Jeśli chcesz użyć tylko jednej z tych metod, możesz wywołać odpowiednią metodę bezpośrednio na RDD.