PySpark - Краткое руководство
В этой главе мы познакомимся с тем, что такое Apache Spark и как был разработан PySpark.
Spark - Обзор
Apache Spark - это молниеносная среда обработки в реальном времени. Он выполняет вычисления в памяти для анализа данных в реальном времени. Это вошло в картину какApache Hadoop MapReduceвыполняла только пакетную обработку и не имела функции обработки в реальном времени. Следовательно, был представлен Apache Spark, поскольку он может выполнять потоковую обработку в режиме реального времени, а также может заботиться о пакетной обработке.
Помимо обработки в реальном времени и пакетной обработки, Apache Spark также поддерживает интерактивные запросы и итерационные алгоритмы. Apache Spark имеет собственный диспетчер кластеров, в котором может размещаться свое приложение. Он использует Apache Hadoop как для хранения, так и для обработки. Оно используетHDFS (Распределенная файловая система Hadoop) для хранения и может запускать приложения Spark на YARN также.
PySpark - Обзор
Apache Spark написан на Scala programming language. Для поддержки Python с помощью Spark сообщество Apache Spark выпустило инструмент PySpark. Используя PySpark, вы можете работать сRDDsтакже на языке программирования Python. Это из-за библиотеки под названиемPy4j что они могут этого добиться.
PySpark предлагает PySpark Shellкоторый связывает Python API с ядром Spark и инициализирует контекст Spark. Большинство специалистов по данным и аналитике сегодня используют Python из-за его богатого набора библиотек. Интеграция Python со Spark - благо для них.
В этой главе мы разберемся с настройкой среды PySpark.
Note - Это с учетом того, что на вашем компьютере установлены Java и Scala.
Давайте теперь загрузим и настроим PySpark, выполнив следующие действия.
Step 1- Перейти на официальный Apache Спарк загрузки страницы и загрузить последнюю версию Apache там Спарк доступны. В этом руководстве мы используемspark-2.1.0-bin-hadoop2.7.
Step 2- Теперь извлеките загруженный tar-файл Spark. По умолчанию он загружается в каталог загрузок.
# tar -xvf Downloads/spark-2.1.0-bin-hadoop2.7.tgz
Будет создан каталог spark-2.1.0-bin-hadoop2.7. Перед запуском PySpark вам необходимо настроить следующие среды, чтобы задать путь Spark иPy4j path.
export SPARK_HOME = /home/hadoop/spark-2.1.0-bin-hadoop2.7
export PATH = $PATH:/home/hadoop/spark-2.1.0-bin-hadoop2.7/bin
export PYTHONPATH = $SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.4-src.zip:$PYTHONPATH
export PATH = $SPARK_HOME/python:$PATH
Или, чтобы настроить указанные выше среды глобально, поместите их в .bashrc file. Затем выполните следующую команду, чтобы среды работали.
# source .bashrc
Теперь, когда у нас настроены все среды, давайте перейдем в каталог Spark и вызовем оболочку PySpark, выполнив следующую команду -
# ./bin/pyspark
Это запустит вашу оболочку PySpark.
Python 2.7.12 (default, Nov 19 2016, 06:48:10)
[GCC 5.4.0 20160609] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Python version 2.7.12 (default, Nov 19 2016 06:48:10)
SparkSession available as 'spark'.
<<<
SparkContext - это точка входа в любую функциональность Spark. Когда мы запускаем любое приложение Spark, запускается программа драйвера, которая выполняет функцию main, и здесь запускается ваш SparkContext. Затем программа драйвера выполняет операции внутри исполнителей на рабочих узлах.
SparkContext использует Py4J для запуска JVM и создает JavaSparkContext. По умолчанию в PySpark SparkContext доступен как‘sc’, поэтому создание нового SparkContext не сработает.
Следующий блок кода содержит подробную информацию о классе PySpark и параметрах, которые может принимать SparkContext.
class pyspark.SparkContext (
master = None,
appName = None,
sparkHome = None,
pyFiles = None,
environment = None,
batchSize = 0,
serializer = PickleSerializer(),
conf = None,
gateway = None,
jsc = None,
profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)
Параметры
Ниже приведены параметры SparkContext.
Master - Это URL-адрес кластера, к которому он подключается.
appName - Название вашей работы.
sparkHome - Каталог установки Spark.
pyFiles - Файлы .zip или .py для отправки в кластер и добавления в PYTHONPATH.
Environment - Переменные среды рабочих узлов.
batchSize- Количество объектов Python, представленных как один объект Java. Установите 1 для отключения пакетной обработки, 0 для автоматического выбора размера пакета на основе размеров объекта или -1 для использования неограниченного размера пакета.
Serializer - Сериализатор RDD.
Conf - Объект L {SparkConf} для установки всех свойств Spark.
Gateway - Используйте существующий шлюз и JVM, в противном случае инициализируйте новую JVM.
JSC - Экземпляр JavaSparkContext.
profiler_cls - Класс настраиваемого профилировщика, используемый для профилирования (по умолчанию - pyspark.profiler.BasicProfiler).
Среди вышеперечисленных параметров master и appnameв основном используются. Первые две строки любой программы PySpark выглядят так, как показано ниже -
from pyspark import SparkContext
sc = SparkContext("local", "First App")
Пример SparkContext - оболочка PySpark
Теперь, когда вы достаточно знаете о SparkContext, давайте запустим простой пример в оболочке PySpark. В этом примере мы будем подсчитывать количество строк с символом 'a' или 'b' вREADME.mdфайл. Итак, допустим, если в файле 5 строк и 3 строки имеют символ 'a', то вывод будет →Line with a: 3. То же самое будет сделано для символа «b».
Note- Мы не создаем никаких объектов SparkContext в следующем примере, потому что по умолчанию Spark автоматически создает объект SparkContext с именем sc при запуске оболочки PySpark. Если вы попытаетесь создать другой объект SparkContext, вы получите следующую ошибку:"ValueError: Cannot run multiple SparkContexts at once".
<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30
Пример SparkContext - программа Python
Давайте запустим тот же пример, используя программу Python. Создайте файл Python с именемfirstapp.py и введите в этот файл следующий код.
----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------
Затем мы выполним следующую команду в терминале, чтобы запустить этот файл Python. Мы получим тот же результат, что и выше.
$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30
Теперь, когда мы установили и настроили PySpark в нашей системе, мы можем программировать на Python на Apache Spark. Однако прежде чем это сделать, давайте разберемся с фундаментальной концепцией Spark - RDD.
RDD означает Resilient Distributed Dataset, это элементы, которые запускаются и работают на нескольких узлах для параллельной обработки в кластере. RDD - это неизменяемые элементы, а это значит, что после создания RDD вы не можете его изменить. RDD также являются отказоустойчивыми, поэтому в случае сбоя они восстанавливаются автоматически. Вы можете применить несколько операций к этим RDD для достижения определенной задачи.
Чтобы применить операции к этим RDD, есть два способа:
- Преобразование и
- Action
Давайте разберемся в этих двух способах подробно.
Transformation- Это операции, которые применяются к RDD для создания нового RDD. Filter, groupBy и map являются примерами преобразований.
Action - Это операции, которые применяются к RDD, которые предписывают Spark выполнить вычисление и отправить результат обратно драйверу.
Чтобы применить любую операцию в PySpark, нам нужно создать PySpark RDDпервый. Следующий блок кода содержит подробную информацию о классе PySpark RDD:
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
Давайте посмотрим, как выполнить несколько основных операций с помощью PySpark. Следующий код в файле Python создает слова RDD, в которых хранится набор упомянутых слов.
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
Теперь мы выполним несколько операций над словами.
счет ()
Возвращается количество элементов в RDD.
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
Command - Команда для count () -
$SPARK_HOME/bin/spark-submit count.py
Output - Вывод для вышеуказанной команды -
Number of elements in RDD → 8
собирать ()
Возвращаются все элементы RDD.
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
Command - Команда для collect () -
$SPARK_HOME/bin/spark-submit collect.py
Output - Вывод для вышеуказанной команды -
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach (f)
Возвращает только те элементы, которые удовлетворяют условию функции внутри foreach. В следующем примере мы вызываем функцию печати в foreach, которая печатает все элементы в RDD.
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command - Команда для foreach (f) -
$SPARK_HOME/bin/spark-submit foreach.py
Output - Вывод для вышеуказанной команды -
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
фильтр (f)
Возвращается новый RDD, содержащий элементы, которые удовлетворяют функции внутри фильтра. В следующем примере мы отфильтровываем строки, содержащие «искру».
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
Command - Команда для фильтра (f) -
$SPARK_HOME/bin/spark-submit filter.py
Output - Вывод для вышеуказанной команды -
Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map (f, preservePartitioning = False)
Новый RDD возвращается путем применения функции к каждому элементу в RDD. В следующем примере мы формируем пару «ключ-значение» и сопоставляем каждую строку со значением 1.
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
Command - Команда для map (f, preservePartitioning = False) -
$SPARK_HOME/bin/spark-submit map.py
Output - Результат выполнения вышеуказанной команды -
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
уменьшить (е)
После выполнения указанной коммутативной и ассоциативной двоичной операции возвращается элемент в СДР. В следующем примере мы импортируем пакет add из оператора и применяем его к 'num', чтобы выполнить простую операцию добавления.
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
Command - Команда для уменьшения (f) -
$SPARK_HOME/bin/spark-submit reduce.py
Output - Результат выполнения вышеуказанной команды -
Adding all the elements -> 15
присоединиться (другое, numPartitions = None)
Он возвращает RDD с парой элементов с совпадающими ключами и всеми значениями для этого конкретного ключа. В следующем примере есть две пары элементов в двух разных СДР. После объединения этих двух RDD мы получаем RDD с элементами, имеющими совпадающие ключи и их значения.
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
Command - Команда для соединения (other, numPartitions = None) -
$SPARK_HOME/bin/spark-submit join.py
Output - Вывод для вышеуказанной команды -
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
кеш ()
Сохраните этот RDD с уровнем хранения по умолчанию (MEMORY_ONLY). Вы также можете проверить, кэширован ли RDD или нет.
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
Command - Команда для cache () -
$SPARK_HOME/bin/spark-submit cache.py
Output - Вывод для вышеуказанной программы -
Words got cached -> True
Это были одни из самых важных операций, которые выполняются в PySpark RDD.
Для параллельной обработки Apache Spark использует общие переменные. Копия общей переменной отправляется на каждый узел кластера, когда драйвер отправляет задачу исполнителю в кластере, чтобы ее можно было использовать для выполнения задач.
Apache Spark поддерживает два типа общих переменных:
- Broadcast
- Accumulator
Давайте разберемся с ними подробнее.
Трансляция
Переменные широковещательной передачи используются для сохранения копии данных на всех узлах. Эта переменная кэшируется на всех машинах и не отправляется на машины с задачами. В следующем блоке кода содержится подробная информация о классе Broadcast для PySpark.
class pyspark.Broadcast (
sc = None,
value = None,
pickle_registry = None,
path = None
)
В следующем примере показано, как использовать переменную Broadcast. Переменная Broadcast имеет атрибут с именем value, который хранит данные и используется для возврата транслируемого значения.
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["scala", "java", "hadoop", "spark", "akka"])
data = words_new.value
print "Stored data -> %s" % (data)
elem = words_new.value[2]
print "Printing a particular element in RDD -> %s" % (elem)
----------------------------------------broadcast.py--------------------------------------
Command - Команда для широковещательной переменной выглядит следующим образом -
$SPARK_HOME/bin/spark-submit broadcast.py
Output - Ниже приведены выходные данные для следующей команды.
Stored data -> [
'scala',
'java',
'hadoop',
'spark',
'akka'
]
Printing a particular element in RDD -> hadoop
Аккумулятор
Накопительные переменные используются для агрегирования информации посредством ассоциативных и коммутативных операций. Например, вы можете использовать аккумулятор для операции суммирования или счетчиков (в MapReduce). В следующем блоке кода содержится подробная информация о классе Accumulator для PySpark.
class pyspark.Accumulator(aid, value, accum_param)
В следующем примере показано, как использовать переменную Accumulator. У переменной Accumulator есть атрибут, называемый значением, который аналогичен тому, что имеет широковещательная переменная. Он хранит данные и используется для возврата значения аккумулятора, но может использоваться только в программе драйвера.
В этом примере аккумуляторная переменная используется несколькими рабочими процессами и возвращает накопленное значение.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
global num
num+=x
rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print "Accumulated value is -> %i" % (final)
----------------------------------------accumulator.py------------------------------------
Command - Команда для переменной аккумулятора выглядит следующим образом -
$SPARK_HOME/bin/spark-submit accumulator.py
Output - Результат выполнения вышеуказанной команды приведен ниже.
Accumulated value is -> 150
Чтобы запустить приложение Spark на локальном компьютере / кластере, вам необходимо установить несколько конфигураций и параметров, в этом и помогает SparkConf. Он предоставляет конфигурации для запуска приложения Spark. В следующем блоке кода содержится подробная информация о классе SparkConf для PySpark.
class pyspark.SparkConf (
loadDefaults = True,
_jvm = None,
_jconf = None
)
Первоначально мы создадим объект SparkConf с помощью SparkConf (), который загрузит значения из spark.*Системные свойства Java. Теперь вы можете задавать различные параметры с помощью объекта SparkConf, и их параметры будут иметь приоритет над свойствами системы.
В классе SparkConf есть методы установки, которые поддерживают цепочку. Например, вы можете написатьconf.setAppName(“PySpark App”).setMaster(“local”). Когда мы передаем объект SparkConf в Apache Spark, он не может быть изменен ни одним пользователем.
Ниже приведены некоторые из наиболее часто используемых атрибутов SparkConf.
set(key, value) - Чтобы установить свойство конфигурации.
setMaster(value) - Чтобы установить главный URL.
setAppName(value) - Установить имя приложения.
get(key, defaultValue=None) - Получить значение конфигурации ключа.
setSparkHome(value) - Установить путь установки Spark на рабочих узлах.
Давайте рассмотрим следующий пример использования SparkConf в программе PySpark. В этом примере мы устанавливаем имя приложения Spark какPySpark App и установив главный URL-адрес для приложения Spark на → spark://master:7077.
В следующем блоке кода есть строки, когда они добавляются в файл Python, он устанавливает базовые конфигурации для запуска приложения PySpark.
---------------------------------------------------------------------------------------
from pyspark import SparkConf, SparkContext
conf = SparkConf().setAppName("PySpark App").setMaster("spark://master:7077")
sc = SparkContext(conf=conf)
---------------------------------------------------------------------------------------
В Apache Spark вы можете загружать файлы, используя sc.addFile (sc - ваш SparkContext по умолчанию) и получите путь к рабочему, используя SparkFiles.get. Таким образом, SparkFiles разрешает пути к файлам, добавленным черезSparkContext.addFile().
SparkFiles содержат следующие методы классов -
- get(filename)
- getrootdirectory()
Давайте разберемся с ними подробнее.
получить (имя файла)
Он указывает путь к файлу, который добавляется через SparkContext.addFile ().
getrootdirectory ()
Он указывает путь к корневому каталогу, который содержит файл, добавленный через SparkContext.addFile ().
----------------------------------------sparkfile.py------------------------------------
from pyspark import SparkContext
from pyspark import SparkFiles
finddistance = "/home/hadoop/examples_pyspark/finddistance.R"
finddistancename = "finddistance.R"
sc = SparkContext("local", "SparkFile App")
sc.addFile(finddistance)
print "Absolute Path -> %s" % SparkFiles.get(finddistancename)
----------------------------------------sparkfile.py------------------------------------
Command - Команда следующая -
$SPARK_HOME/bin/spark-submit sparkfiles.py
Output - Вывод для вышеуказанной команды -
Absolute Path ->
/tmp/spark-f1170149-af01-4620-9805-f61c85fecee4/userFiles-641dfd0f-240b-4264-a650-4e06e7a57839/finddistance.R
StorageLevel решает, как следует хранить RDD. В Apache Spark StorageLevel решает, следует ли хранить RDD в памяти, или на диске, или и то, и другое. Он также решает, следует ли сериализовать RDD и реплицировать ли разделы RDD.
В следующем блоке кода есть определение класса StorageLevel -
class pyspark.StorageLevel(useDisk, useMemory, useOffHeap, deserialized, replication = 1)
Теперь, чтобы определить хранилище RDD, существуют разные уровни хранения, которые приведены ниже -
DISK_ONLY = StorageLevel (Истина, Ложь, Ложь, Ложь, 1)
DISK_ONLY_2 = StorageLevel (Истина, Ложь, Ложь, Ложь, 2)
MEMORY_AND_DISK = StorageLevel (True, True, False, False, 1)
MEMORY_AND_DISK_2 = StorageLevel (Истина, Истина, Ложь, Ложь, 2)
MEMORY_AND_DISK_SER = StorageLevel (True, True, False, False, 1)
MEMORY_AND_DISK_SER_2 = StorageLevel (Истина, Истина, Ложь, Ложь, 2)
MEMORY_ONLY = StorageLevel (False, True, False, False, 1)
MEMORY_ONLY_2 = StorageLevel (False, True, False, False, 2)
MEMORY_ONLY_SER = StorageLevel (False, True, False, False, 1)
MEMORY_ONLY_SER_2 = StorageLevel (False, True, False, False, 2)
OFF_HEAP = StorageLevel (True, True, True, False, 1)
Давайте рассмотрим следующий пример StorageLevel, где мы используем уровень хранения MEMORY_AND_DISK_2, это означает, что разделы RDD будут иметь репликацию 2.
------------------------------------storagelevel.py-------------------------------------
from pyspark import SparkContext
import pyspark
sc = SparkContext (
"local",
"storagelevel app"
)
rdd1 = sc.parallelize([1,2])
rdd1.persist( pyspark.StorageLevel.MEMORY_AND_DISK_2 )
rdd1.getStorageLevel()
print(rdd1.getStorageLevel())
------------------------------------storagelevel.py-------------------------------------
Command - Команда следующая -
$SPARK_HOME/bin/spark-submit storagelevel.py
Output - Результат выполнения вышеуказанной команды приведен ниже -
Disk Memory Serialized 2x Replicated
Apache Spark предлагает API машинного обучения под названием MLlib. PySpark также имеет этот API машинного обучения на Python. Он поддерживает различные типы алгоритмов, которые упомянуты ниже -
mllib.classification - The spark.mllibпакет поддерживает различные методы бинарной классификации, многоклассовой классификации и регрессионного анализа. Некоторые из самых популярных алгоритмов классификации:Random Forest, Naive Bayes, Decision Tree, и т.д.
mllib.clustering - Кластеризация - это проблема обучения без учителя, при которой вы стремитесь сгруппировать подмножества сущностей друг с другом на основе некоторого понятия сходства.
mllib.fpm- Частое сопоставление с образцом - это поиск часто встречающихся элементов, наборов элементов, подпоследовательностей или других подструктур, которые обычно являются одними из первых шагов для анализа крупномасштабного набора данных. Это была активная тема исследования данных в течение многих лет.
mllib.linalg - Утилиты MLlib для линейной алгебры.
mllib.recommendation- Совместная фильтрация обычно используется для рекомендательных систем. Эти методы направлены на заполнение недостающих записей в матрице ассоциации пользовательских элементов.
spark.mllib- В настоящее время он поддерживает совместную фильтрацию на основе моделей, при которой пользователи и продукты описываются небольшим набором скрытых факторов, которые можно использовать для прогнозирования отсутствующих записей. spark.mllib использует алгоритм альтернативных наименьших квадратов (ALS) для изучения этих скрытых факторов.
mllib.regression- Линейная регрессия относится к семейству алгоритмов регрессии. Цель регрессии - найти взаимосвязи и зависимости между переменными. Интерфейс для работы с моделями линейной регрессии и сводками моделей аналогичен случаю логистической регрессии.
В составе пакета mllib есть и другие алгоритмы, классы и функции. На данный момент давайте разберемся с демонстрацией наpyspark.mllib.
В следующем примере показана совместная фильтрация с использованием алгоритма ALS для построения модели рекомендаций и ее оценки на обучающих данных.
Dataset used - test.data
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
--------------------------------------recommend.py----------------------------------------
from __future__ import print_function
from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
if __name__ == "__main__":
sc = SparkContext(appName="Pspark mllib Example")
data = sc.textFile("test.data")
ratings = data.map(lambda l: l.split(','))\
.map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))
# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
--------------------------------------recommend.py----------------------------------------
Command - Команда будет следующей -
$SPARK_HOME/bin/spark-submit recommend.py
Output - Результатом вышеуказанной команды будет -
Mean Squared Error = 1.20536041839e-05
Сериализация используется для настройки производительности в Apache Spark. Все данные, которые отправляются по сети, записываются на диск или сохраняются в памяти, должны быть сериализованы. Сериализация играет важную роль в дорогостоящих операциях.
PySpark поддерживает настраиваемые сериализаторы для настройки производительности. PySpark поддерживает следующие два сериализатора -
MarshalSerializer
Сериализует объекты с помощью Python Marshal Serializer. Этот сериализатор быстрее, чем PickleSerializer, но поддерживает меньше типов данных.
class pyspark.MarshalSerializer
PickleSerializer
Сериализует объекты с помощью Python Pickle Serializer. Этот сериализатор поддерживает практически любой объект Python, но может быть не таким быстрым, как более специализированные сериализаторы.
class pyspark.PickleSerializer
Давайте посмотрим на пример сериализации PySpark. Здесь мы сериализуем данные с помощью MarshalSerializer.
--------------------------------------serializing.py-------------------------------------
from pyspark.context import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "serialization app", serializer = MarshalSerializer())
print(sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10))
sc.stop()
--------------------------------------serializing.py-------------------------------------
Command - Команда следующая -
$SPARK_HOME/bin/spark-submit serializing.py
Output - Результат выполнения вышеуказанной команды -
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]