Apache Spark: implementación
La aplicación Spark, que usa spark-submit, es un comando de shell que se usa para implementar la aplicación Spark en un clúster. Utiliza todos los administradores de clústeres respectivos a través de una interfaz uniforme. Por tanto, no es necesario que configure su aplicación para cada uno.
Ejemplo
Tomemos el mismo ejemplo de recuento de palabras que usamos antes, usando comandos de shell. Aquí, consideramos el mismo ejemplo como una aplicación de chispa.
Entrada de muestra
El siguiente texto son los datos de entrada y el archivo nombrado es in.txt.
people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share.
Mira el siguiente programa:
SparkWordCount.scala
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._
object SparkWordCount {
def main(args: Array[String]) {
val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map())
/* local = master URL; Word Count = application name; */
/* /usr/local/spark = Spark Home; Nil = jars; Map = environment */
/* Map = variables to work nodes */
/*creating an inputRDD to read text file (in.txt) through Spark context*/
val input = sc.textFile("in.txt")
/* Transform the inputRDD into countRDD */
val count = input.flatMap(line ⇒ line.split(" "))
.map(word ⇒ (word, 1))
.reduceByKey(_ + _)
/* saveAsTextFile method is an action that effects on the RDD */
count.saveAsTextFile("outfile")
System.out.println("OK");
}
}
Guarde el programa anterior en un archivo llamado SparkWordCount.scala y colocarlo en un directorio definido por el usuario llamado spark-application.
Note - Mientras transformamos inputRDD en countRDD, usamos flatMap () para convertir las líneas (del archivo de texto) en palabras, el método map () para contar la frecuencia de palabras y el método reduceByKey () para contar cada repetición de palabras.
Siga los siguientes pasos para enviar esta solicitud. Ejecute todos los pasos delspark-application directorio a través de la terminal.
Paso 1: Descarga Spark Ja
Spark core jar es necesario para la compilación, por lo tanto, descargue spark-core_2.10-1.3.0.jar desde el siguiente enlace Spark core jar y mueva el archivo jar del directorio de descarga aspark-application directorio.
Paso 2: compilar el programa
Compile el programa anterior usando el comando que se proporciona a continuación. Este comando debe ejecutarse desde el directorio Spark-application. Aquí,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar es un jar de soporte de Hadoop tomado de la biblioteca Spark.
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
Paso 3: crea un JAR
Cree un archivo jar de la aplicación Spark usando el siguiente comando. Aquí,wordcount es el nombre del archivo jar.
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
Paso 4: envíe la solicitud de Spark
Envíe la aplicación Spark usando el siguiente comando:
spark-submit --class SparkWordCount --master local wordcount.jar
Si se ejecuta correctamente, encontrará el resultado que se muestra a continuación. losOKdejar entrar la siguiente salida es para la identificación del usuario y esa es la última línea del programa. Si lee detenidamente el siguiente resultado, encontrará diferentes cosas, como:
- inició con éxito el servicio 'SparkDriver' en el puerto 42954
- MemoryStore se inició con una capacidad de 267,3 MB
- Comenzó SparkUI en http://192.168.1.217:4040
- Archivo JAR agregado: /home/hadoop/piapplication/count.jar
- ResultStage 1 (saveAsTextFile en SparkPi.scala: 11) terminó en 0.566 s
- Se detuvo la interfaz de usuario web de Spark en http://192.168.1.217:4040
- MemoryStore borrado
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
Paso 5: Verificación de la salida
Después de la ejecución exitosa del programa, encontrará el directorio llamado outfile en el directorio Spark-application.
Los siguientes comandos se utilizan para abrir y verificar la lista de archivos en el directorio de archivos de salida.
$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS
Los comandos para verificar la salida en part-00000 archivo son -
$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1)
Los comandos para verificar la salida en el archivo part-00001 son:
$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)
Consulte la siguiente sección para obtener más información sobre el comando 'spark-submit'.
Sintaxis de Spark-submit
spark-submit [options] <app jar | python file> [app arguments]
Opciones
S. No | Opción | Descripción |
---|---|---|
1 | --Maestro | spark: // host: puerto, mesos: // host: puerto, hilo o local. |
2 | --modo de implementación | Ya sea para iniciar el programa controlador localmente ("cliente") o en una de las máquinas trabajadoras dentro del clúster ("clúster") (predeterminado: cliente). |
3 | --clase | La clase principal de su aplicación (para aplicaciones Java / Scala). |
4 | --nombre | Un nombre de su aplicación. |
5 | --frascos | Lista separada por comas de archivos jar locales para incluir en las rutas de clase del controlador y del ejecutor. |
6 | --paquetes | Lista separada por comas de coordenadas maven de jar para incluir en las rutas de clase del controlador y del ejecutor. |
7 | - repositorios | Lista separada por comas de repositorios remotos adicionales para buscar las coordenadas de maven dadas con --packages. |
8 | --py-archivos | Lista separada por comas de archivos .zip, .egg o .py para colocar en PYTHON PATH para aplicaciones Python. |
9 | --archivos | Lista de archivos separados por comas que se colocarán en el directorio de trabajo de cada ejecutor. |
10 | --conf (prop = val) | Propiedad de configuración de Spark arbitraria. |
11 | --archivo-de-propiedades | Ruta a un archivo desde el que cargar propiedades adicionales. Si no se especifica, buscará conf / spark-defaults. |
12 | --controlador-memoria | Memoria para el controlador (por ejemplo, 1000M, 2G) (predeterminado: 512M). |
13 | --driver-java-options | Opciones adicionales de Java para pasar al controlador. |
14 | - ruta-biblioteca-controlador | Entradas de ruta de biblioteca adicionales para pasar al controlador. |
15 | - ruta de clase de controlador | Entradas de ruta de clase adicionales para pasar al conductor. Tenga en cuenta que los archivos jar agregados con --jars se incluyen automáticamente en la ruta de clases. |
dieciséis | --ejecutor-memoria | Memoria por ejecutor (por ejemplo, 1000M, 2G) (predeterminado: 1G). |
17 | --usuario-proxy | Usuario para suplantar al enviar la solicitud. |
18 | --ayuda, -h | Muestre este mensaje de ayuda y salga. |
19 | --verbose, -v | Imprima resultados de depuración adicionales. |
20 | --versión | Imprime la versión de Spark actual. |
21 | --driver-cores NUM | Núcleos para el controlador (predeterminado: 1). |
22 | --supervisar | Si se proporciona, reinicia el controlador en caso de falla. |
23 | --matar | Si se da, mata al conductor especificado. |
24 | --estado | Si se proporciona, solicita el estado del controlador especificado. |
25 | --total-ejecutor-núcleos | Núcleos totales para todos los ejecutores. |
26 | - núcleos de ejecutor | Número de núcleos por ejecutor. (Predeterminado: 1 en el modo YARN o todos los núcleos disponibles en el trabajador en modo independiente). |