Apache Spark: implementación

La aplicación Spark, que usa spark-submit, es un comando de shell que se usa para implementar la aplicación Spark en un clúster. Utiliza todos los administradores de clústeres respectivos a través de una interfaz uniforme. Por tanto, no es necesario que configure su aplicación para cada uno.

Ejemplo

Tomemos el mismo ejemplo de recuento de palabras que usamos antes, usando comandos de shell. Aquí, consideramos el mismo ejemplo como una aplicación de chispa.

Entrada de muestra

El siguiente texto son los datos de entrada y el archivo nombrado es in.txt.

people are not as beautiful as they look, 
as they walk or as they talk. 
they are only as beautiful  as they love, 
as they care as they share.

Mira el siguiente programa:

SparkWordCount.scala

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark._  

object SparkWordCount { 
   def main(args: Array[String]) { 

      val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 
		
      /* local = master URL; Word Count = application name; */  
      /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ 
      /* Map = variables to work nodes */ 
      /*creating an inputRDD to read text file (in.txt) through Spark context*/ 
      val input = sc.textFile("in.txt") 
      /* Transform the inputRDD into countRDD */ 
		
      val count = input.flatMap(line ⇒ line.split(" ")) 
      .map(word ⇒ (word, 1)) 
      .reduceByKey(_ + _) 
       
      /* saveAsTextFile method is an action that effects on the RDD */  
      count.saveAsTextFile("outfile") 
      System.out.println("OK"); 
   } 
}

Guarde el programa anterior en un archivo llamado SparkWordCount.scala y colocarlo en un directorio definido por el usuario llamado spark-application.

Note - Mientras transformamos inputRDD en countRDD, usamos flatMap () para convertir las líneas (del archivo de texto) en palabras, el método map () para contar la frecuencia de palabras y el método reduceByKey () para contar cada repetición de palabras.

Siga los siguientes pasos para enviar esta solicitud. Ejecute todos los pasos delspark-application directorio a través de la terminal.

Paso 1: Descarga Spark Ja

Spark core jar es necesario para la compilación, por lo tanto, descargue spark-core_2.10-1.3.0.jar desde el siguiente enlace Spark core jar y mueva el archivo jar del directorio de descarga aspark-application directorio.

Paso 2: compilar el programa

Compile el programa anterior usando el comando que se proporciona a continuación. Este comando debe ejecutarse desde el directorio Spark-application. Aquí,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar es un jar de soporte de Hadoop tomado de la biblioteca Spark.

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala

Paso 3: crea un JAR

Cree un archivo jar de la aplicación Spark usando el siguiente comando. Aquí,wordcount es el nombre del archivo jar.

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

Paso 4: envíe la solicitud de Spark

Envíe la aplicación Spark usando el siguiente comando:

spark-submit --class SparkWordCount --master local wordcount.jar

Si se ejecuta correctamente, encontrará el resultado que se muestra a continuación. losOKdejar entrar la siguiente salida es para la identificación del usuario y esa es la última línea del programa. Si lee detenidamente el siguiente resultado, encontrará diferentes cosas, como:

  • inició con éxito el servicio 'SparkDriver' en el puerto 42954
  • MemoryStore se inició con una capacidad de 267,3 MB
  • Comenzó SparkUI en http://192.168.1.217:4040
  • Archivo JAR agregado: /home/hadoop/piapplication/count.jar
  • ResultStage 1 (saveAsTextFile en SparkPi.scala: 11) terminó en 0.566 s
  • Se detuvo la interfaz de usuario web de Spark en http://192.168.1.217:4040
  • MemoryStore borrado
15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started 
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954. 
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:42954] 
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB 
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server 
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707. 
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040 
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029 
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader 
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54 
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver 
 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable 
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11) 
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s 
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK 
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook 
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040 
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler 
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion. 
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared 
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped 
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext 
15/07/08 13:56:14 INFO Utils: Shutdown hook called 
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af 
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

Paso 5: Verificación de la salida

Después de la ejecución exitosa del programa, encontrará el directorio llamado outfile en el directorio Spark-application.

Los siguientes comandos se utilizan para abrir y verificar la lista de archivos en el directorio de archivos de salida.

$ cd outfile 
$ ls 
Part-00000 part-00001 _SUCCESS

Los comandos para verificar la salida en part-00000 archivo son -

$ cat part-00000 
(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1)

Los comandos para verificar la salida en el archivo part-00001 son:

$ cat part-00001 
(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1)

Consulte la siguiente sección para obtener más información sobre el comando 'spark-submit'.

Sintaxis de Spark-submit

spark-submit [options] <app jar | python file> [app arguments]

Opciones

S. No Opción Descripción
1 --Maestro spark: // host: puerto, mesos: // host: puerto, hilo o local.
2 --modo de implementación Ya sea para iniciar el programa controlador localmente ("cliente") o en una de las máquinas trabajadoras dentro del clúster ("clúster") (predeterminado: cliente).
3 --clase La clase principal de su aplicación (para aplicaciones Java / Scala).
4 --nombre Un nombre de su aplicación.
5 --frascos Lista separada por comas de archivos jar locales para incluir en las rutas de clase del controlador y del ejecutor.
6 --paquetes Lista separada por comas de coordenadas maven de jar para incluir en las rutas de clase del controlador y del ejecutor.
7 - repositorios Lista separada por comas de repositorios remotos adicionales para buscar las coordenadas de maven dadas con --packages.
8 --py-archivos Lista separada por comas de archivos .zip, .egg o .py para colocar en PYTHON PATH para aplicaciones Python.
9 --archivos Lista de archivos separados por comas que se colocarán en el directorio de trabajo de cada ejecutor.
10 --conf (prop = val) Propiedad de configuración de Spark arbitraria.
11 --archivo-de-propiedades Ruta a un archivo desde el que cargar propiedades adicionales. Si no se especifica, buscará conf / spark-defaults.
12 --controlador-memoria Memoria para el controlador (por ejemplo, 1000M, 2G) (predeterminado: 512M).
13 --driver-java-options Opciones adicionales de Java para pasar al controlador.
14 - ruta-biblioteca-controlador Entradas de ruta de biblioteca adicionales para pasar al controlador.
15 - ruta de clase de controlador

Entradas de ruta de clase adicionales para pasar al conductor.

Tenga en cuenta que los archivos jar agregados con --jars se incluyen automáticamente en la ruta de clases.

dieciséis --ejecutor-memoria Memoria por ejecutor (por ejemplo, 1000M, 2G) (predeterminado: 1G).
17 --usuario-proxy Usuario para suplantar al enviar la solicitud.
18 --ayuda, -h Muestre este mensaje de ayuda y salga.
19 --verbose, -v Imprima resultados de depuración adicionales.
20 --versión Imprime la versión de Spark actual.
21 --driver-cores NUM Núcleos para el controlador (predeterminado: 1).
22 --supervisar Si se proporciona, reinicia el controlador en caso de falla.
23 --matar Si se da, mata al conductor especificado.
24 --estado Si se proporciona, solicita el estado del controlador especificado.
25 --total-ejecutor-núcleos Núcleos totales para todos los ejecutores.
26 - núcleos de ejecutor Número de núcleos por ejecutor. (Predeterminado: 1 en el modo YARN o todos los núcleos disponibles en el trabajador en modo independiente).