Escreva e execute trabalhos do Spark Scala no Dataproc

Este tutorial ilustra diferentes formas de criar e enviar uma tarefa do Spark Scala para um cluster do Dataproc, incluindo como:

  • Escrever e compilar uma app "Hello World" do Spark Scala num computador local a partir da linha de comandos usando o Scala REPL (Read-Evaluate-Print-Loop ou intérprete interativo) ou a ferramenta de compilação SBT
  • compactar classes Scala compiladas num ficheiro JAR com um manifesto
  • enviar o JAR do Scala para uma tarefa do Spark que é executada no seu cluster do Dataproc
  • examinar a saída de tarefas Scala a partir da Google Cloud consola

Este tutorial também mostra como:

  • Escrever e executar uma tarefa de mapreduce "WordCount" do Spark Scala diretamente num cluster do Dataproc usando o spark-shell REPL

  • executar exemplos pré-instalados do Apache Spark e Hadoop num cluster

Configure um projeto da Google Cloud Platform

Caso ainda não o tenha feito:

  1. Configure um projeto
  2. Crie um contentor do Cloud Storage
  3. Crie um cluster do Dataproc

Escreva e compile código Scala localmente

Como um exercício simples para este tutorial, escreva uma app "Hello World" em Scala usando o Scala REPL ou a interface de linha de comandos SBT localmente na sua máquina de desenvolvimento.

Use Scala

  1. Transfira os ficheiros binários do Scala a partir da página Instalação do Scala
  2. Descompacte o ficheiro, defina a variável de ambiente SCALA_HOME e adicione-a ao caminho, conforme mostrado nas instruções de instalação do Scala. Por exemplo:

    export SCALA_HOME=/usr/local/share/scala
    export PATH=$PATH:$SCALA_HOME/
    

  3. Inicie o REPL do Scala

    $ scala
    Welcome to Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    scala>
    

  4. Copie e cole o código HelloWorld no REPL do Scala

    object HelloWorld {
      def main(args: Array[String]): Unit = {
        println("Hello, world!")
      }
    }

  5. Guarde HelloWorld.scala e saia do REPL

    scala> :save HelloWorld.scala
    scala> :q
    

  6. Compilar com scalac

    $ scalac HelloWorld.scala
    

  7. Liste os ficheiros .class compilados

    $ ls HelloWorld*.class
    HelloWorld$.class   HelloWorld.class
    

Use SBT

  1. Transfira o SBT

  2. Crie um projeto "HelloWorld", conforme mostrado abaixo

    $ mkdir hello
    $ cd hello
    $ echo \
    'object HelloWorld {def main(args: Array[String]) = println("Hello, world!")}' > \
    HelloWorld.scala
    

  3. Crie um ficheiro de configuração sbt.build para definir o artifactName (o nome do ficheiro JAR que vai gerar abaixo) como "HelloWorld.jar" (consulte Modificar artefactos predefinidos)

    echo \
    'artifactName := { (sv: ScalaVersion, module: ModuleID, artifact: Artifact) =>
    "HelloWorld.jar" }' > \
    build.sbt
    

  4. Inicie o SBT e execute código

    $ sbt
    [info] Set current project to hello ...
    > run
    ... Compiling 1 Scala source to .../hello/target/scala-.../classes...
    ... Running HelloWorld
    Hello, world!
    [success] Total time: 3 s ...
    

  5. Empacote o código num ficheiro JAR com um manifesto que especifica o ponto de entrada da classe principal (HelloWorld) e, de seguida, saia

    > package
    ... Packaging .../hello/target/scala-.../HelloWorld.jar ...
    ... Done packaging.
    [success] Total time: ...
    > exit
    

Crie um frasco

Crie um ficheiro JAR com SBT ou usando o comando jar.

Crie um frasco com um SBT

O comando package do SBT cria um ficheiro JAR (consulte Usar o SBT).

Crie um frasco manualmente

  1. Mude o diretório (cd) para o diretório que contém os ficheiros HelloWorld*.class compilados e, em seguida, execute o seguinte comando para criar um pacote dos ficheiros de classe num JAR com um manifest que especifica o ponto de entrada da classe principal (HelloWorld).
    $ jar cvfe HelloWorld.jar HelloWorld HelloWorld*.class
    added manifest
    adding: HelloWorld$.class(in = 637) (out= 403)(deflated 36%)
    adding: HelloWorld.class(in = 586) (out= 482)(deflated 17%)
    

Copie o JAR para o Cloud Storage

  1. Use a CLI do Google Cloud para copiar o JAR para um contentor do Cloud Storage no seu projeto
$ gcloud storage cp HelloWorld.jar gs://<bucket-name>/
Copying file://HelloWorld.jar [Content-Type=application/java-archive]...
Uploading   gs://bucket-name/HelloWorld.jar:         1.46 KiB/1.46 KiB

Envie um JAR para uma tarefa do Spark do Dataproc

  1. Use a Google Cloud consola para enviar o ficheiro JAR para a tarefa do Spark do Dataproc. Preencha os campos na página Enviar uma tarefa da seguinte forma:

    • Cluster: selecione o nome do cluster na lista de clusters
    • Tipo de tarefa: Spark
    • Classe principal ou JAR: especifique o caminho do URI do Cloud Storage para o seu JAR HelloWorld (gs://your-bucket-name/HelloWorld.jar).

      Se o seu JAR não incluir um manifesto que especifique o ponto de entrada para o seu código ("Main-Class: HelloWorld"), o campo "Classe principal ou JAR" deve indicar o nome da sua classe principal ("HelloWorld") e deve preencher o campo "Ficheiros JAR" com o caminho URI para o seu ficheiro JAR (gs://your-bucket-name/HelloWorld.jar).

  2. Clique em Enviar para iniciar a tarefa. Depois de iniciar a tarefa, esta é adicionada à lista de tarefas.

  3. Clique no ID do trabalho para abrir a página Trabalhos, onde pode ver o resultado do controlador do trabalho.

Escrever e executar código Spark Scala através do REPL spark-shell do cluster

Pode querer desenvolver apps Scala diretamente no seu cluster do Dataproc. O Hadoop e o Spark estão pré-instalados nos clusters do Dataproc e configurados com o conetor do Cloud Storage, o que permite que o seu código leia e escreva dados diretamente a partir do Cloud Storage.

Este exemplo mostra como usar o SSH para aceder ao nó principal do cluster do Dataproc do seu projeto e, em seguida, usar o REPL spark-shell para criar e executar uma aplicação de contagem de palavras mapreduce em Scala.

  1. SSH para o nó principal do cluster do Dataproc

    1. Aceda à página Clusters do Dataproc do seu projeto na Google Cloud consola e, de seguida, clique no nome do cluster.

    2. Na página de detalhes do cluster, selecione o separador Instâncias de VMs e, de seguida, clique na seleção de SSH apresentada à direita da linha do nome do cluster.

      É aberta uma janela do navegador no diretório base do nó principal

  2. Inicie a app spark-shell

    $ spark-shell
    ...
    Using Scala version ...
    Type in expressions to have them evaluated.
    Type :help for more information.
    ...
    Spark context available as sc.
    ...
    SQL context available as sqlContext.
    scala>
    

  3. Crie um RDD (conjunto de dados distribuído resiliente) a partir de um fragmento de texto de Shakespeare localizado no Cloud Storage público

    scala> val text_file = sc.textFile("gs://pub/shakespeare/rose.txt")
    

  4. Executar um wordcount mapreduce no texto e, em seguida, apresentar o resultado wordcounts

    scala> val wordCounts = text_file.flatMap(line => line.split(" ")).map(word =>
    (word, 1)).reduceByKey((a, b) => a + b)
    scala> wordCounts.collect
    ... Array((call,1), (What's,1), (sweet.,1), (we,1), (as,1), (name?,1), (any,1), (other,1),
    (rose,1), (smell,1), (name,1), (a,2), (would,1), (in,1), (which,1), (That,1), (By,1))
    

  5. Guarde as contagens em <bucket-name>/wordcounts-out no Cloud Storage e, em seguida, saia do scala-shell

    scala> wordCounts.saveAsTextFile("gs://<bucket-name>/wordcounts-out/")
    scala> exit
    

  6. Use a CLI gcloud para listar os ficheiros de saída e apresentar o conteúdo dos ficheiros

    $ gcloud storage ls gs://bucket-name/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/
    gs://spark-scala-demo-bucket/wordcounts-out/_SUCCESS
    gs://spark-scala-demo-bucket/wordcounts-out/part-00000
    gs://spark-scala-demo-bucket/wordcounts-out/part-00001
    

  7. Verifique os conteúdos de gs://<bucket-name>/wordcounts-out/part-00000

    $ gcloud storage cat gs://bucket-name/wordcounts-out/part-00000
    (call,1)
    (What's,1)
    (sweet.,1)
    (we,1)
    (as,1)
    (name?,1)
    (any,1)
    (other,1)
    

Executar código de exemplo pré-instalado

O nó principal do Dataproc contém ficheiros JAR executáveis com exemplos padrão do Apache Hadoop e Spark.

Tipo de frasco Master node /usr/lib/ location Origem do GitHub Documentação do Apache
Hadoop hadoop-mapreduce/hadoop-mapreduce-examples.jar link de origem Tutorial do MapReduce
Spark spark/lib/spark-examples.jar link de origem Exemplos do Spark

Enviar exemplos para o cluster a partir da linha de comandos

Pode enviar exemplos a partir da sua máquina de desenvolvimento local através da ferramenta de linha de comandos gcloud da Google Cloud CLI (consulte a secção Usar a Google Cloud consola para enviar tarefas a partir da Google Cloud consola).

Exemplo de WordCount do Hadoop

gcloud dataproc jobs submit hadoop --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar \
    --class=org.apache.hadoop.examples.WordCount \
    -- URI of input file URI of output file

Exemplo de contagem de palavras do Spark

gcloud dataproc jobs submit spark --cluster=cluster-name \
    --region=region \
    --jars=file:///usr/lib/spark/examples/jars/spark-examples.jar \
    --class=org.apache.spark.examples.JavaWordCount \
    -- URI of input file

Encerre o cluster

Para evitar encargos contínuos, encerre o cluster e elimine os recursos do Cloud Storage (contentor e ficheiros do Cloud Storage) usados para este tutorial.

Para encerrar um cluster:

gcloud dataproc clusters delete cluster-name \
    --region=region

Para eliminar o ficheiro JAR do Cloud Storage:

gcloud storage rm gs://bucket-name/HelloWorld.jar

Pode eliminar um contentor e todas as respetivas pastas e ficheiros com o seguinte comando:

gcloud storage rm gs://bucket-name/ --recursive

O que se segue?