PySpark-RDD
システムにPySparkをインストールして構成したので、ApacheSpark上のPythonでプログラミングできます。ただし、その前に、Sparkの基本的な概念であるRDDについて理解しましょう。
RDDはの略です Resilient Distributed Dataset、これらは、クラスター上で並列処理を実行するために複数のノードで実行および操作される要素です。RDDは不変の要素です。つまり、RDDを作成すると、それを変更することはできません。RDDはフォールトトレラントでもあるため、障害が発生した場合は自動的に回復します。これらのRDDに複数の操作を適用して、特定のタスクを実行できます。
これらのRDDに操作を適用するには、2つの方法があります-
- 変革と
- Action
これら2つの方法を詳しく理解しましょう。
Transformation−これらは、新しいRDDを作成するためにRDDに適用される操作です。Filter、groupBy、mapは変換の例です。
Action −これらは、RDDに適用される操作であり、Sparkに計算を実行し、結果をドライバーに送り返すように指示します。
PySparkで操作を適用するには、を作成する必要があります PySpark RDD最初。次のコードブロックには、PySparkRDDクラスの詳細が含まれています-
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
PySparkを使用していくつかの基本的な操作を実行する方法を見てみましょう。Pythonファイルの次のコードは、言及された単語のセットを格納するRDD単語を作成します。
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
次に、単語に対していくつかの操作を実行します。
カウント()
RDDの要素数が返されます。
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
Command − count()のコマンドは−です。
$SPARK_HOME/bin/spark-submit count.py
Output −上記のコマンドの出力は−です。
Number of elements in RDD → 8
collect()
RDD内のすべての要素が返されます。
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
Command − collect()のコマンドは−です。
$SPARK_HOME/bin/spark-submit collect.py
Output −上記のコマンドの出力は−です。
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach(f)
foreach内の関数の条件を満たす要素のみを返します。次の例では、foreachでprint関数を呼び出します。この関数は、RDD内のすべての要素を出力します。
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
Command − foreach(f)のコマンドは−です。
$SPARK_HOME/bin/spark-submit foreach.py
Output −上記のコマンドの出力は−です。
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
filter(f)
フィルタ内の関数を満たす要素を含む新しいRDDが返されます。次の例では、「spark」を含む文字列を除外します。
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
Command − filter(f)のコマンドは−です。
$SPARK_HOME/bin/spark-submit filter.py
Output −上記のコマンドの出力は−です。
Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map(f、preservesPartitioning = False)
RDDの各要素に関数を適用すると、新しいRDDが返されます。次の例では、キーと値のペアを形成し、すべての文字列を値1でマップします。
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
Command − map(f、preservesPartitioning = False)のコマンドは−です。
$SPARK_HOME/bin/spark-submit map.py
Output −上記のコマンドの出力は−です。
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
reduce(f)
指定された可換および連想二項演算を実行した後、RDDの要素が返されます。次の例では、演算子からaddパッケージをインポートし、それを「num」に適用して、単純な加算操作を実行しています。
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
Command − reduce(f)のコマンドは−です。
$SPARK_HOME/bin/spark-submit reduce.py
Output −上記のコマンドの出力は−です。
Adding all the elements -> 15
join(other、numPartitions = None)
一致するキーとその特定のキーのすべての値を持つ要素のペアを含むRDDを返します。次の例では、2つの異なるRDDに2組の要素があります。これらの2つのRDDを結合した後、一致するキーとその値を持つ要素を持つRDDを取得します。
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
Command − join(other、numPartitions = None)のコマンドは−です。
$SPARK_HOME/bin/spark-submit join.py
Output −上記のコマンドの出力は−です。
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
キャッシュ()
このRDDをデフォルトのストレージレベル(MEMORY_ONLY)で永続化します。RDDがキャッシュされているかどうかを確認することもできます。
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
Command − cache()のコマンドは−です。
$SPARK_HOME/bin/spark-submit cache.py
Output −上記のプログラムの出力は−です。
Words got cached -> True
これらは、PySparkRDDで実行される最も重要な操作の一部でした。