Skip to main content

Build pipelines with Scala and Spark

This tutorial is focused on using Dagster Pipes to launch & monitor Apache Spark jobs implemented in Scala. The Spark integration page provides more information on using Pipes with specific Spark providers, such as AWS EMR or Databricks.

Spark is often used with object stores such as Amazon S3. In our example, Dagster will use an S3 bucket to communicate with Apache Spark (sending data to the Spark application, as well as reading logs and results from Spark).

Prerequisites

  • An AWS S3 bucket to be used for communication between Spark and Dagster (and the corresponding AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY).
  • Docker to emulate a typical Spark setup locally. This setup includes Apache Spark and the Java SDK, as well as the required Dagster libraries to run a development orchestration server.

Production considerations

For demonstration purposes, this tutorial makes a few simplifications that you should consider when deploying in production:

  • We are going to be running Spark in local mode, sharing the same container with Dagster. In production, consider having two separate environments:

    • In the Dagster environment, you'll need to have the following Python packages:

      • dagster
      • dagster-webserver --- to run the Dagster UI
      • dagster-aws --- when using S3

      You will also need to make the orchestration code available to Dagster (typically via a code location).

    • In the Spark environment, you'll need a suitable Scala compiler (we are using Gradle) and typically also the Java AWS S3 SDK packages. For example:

      curl -fSL "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.5.1/hadoop-aws-3.5.1.jar" \
      -o /opt/bitnami/spark/jars/hadoop-aws-3.5.1.jar

      curl -fSL "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.780/aws-java-sdk-bundle-1.12.780.jar" \
      -o /opt/bitnami/spark/jars/aws-java-sdk-bundle-1.12.780.jar

      Make sure hadoop-aws JAR and AWS Java SDK versions are compatible with your Spark/Hadoop build.

  • In our example, the Scala JAR will be built by Docker before Spark starts running. In production, consider building and uploading the JAR to Spark via CI/CD.

Project outline

.
├── Dockerfile
├── docker-compose.yml
├── dagster_code.py
├── external_scala
│   ├── build.gradle
│   ├── gradle.properties
│   ├── settings.gradle
│   ├── gradlew
│   └── src/main/scala/org/examples
│   └── Example.scala
│   └── build/libs
│   └── external_scala-all.jar
  • dagster_code.py will contain the Dagster orchestration code.
  • Example.scala will contain the Spark code to calculate pi, as well as the code to send messages to Dagster.
  • gradlew is the Gradle wrapper (a script that downloads Gradle on demand, which is the recommended way to use Gradle).
  • build.gradle contains all the build instructions to build and package Example.scala into a suitable JAR for submission to Spark.
  • build/libs/external_scala-all.jar is the output of this build process (created automatically by docker build).
  • Dockerfile and docker-compose.yml allow for running this example in a reproducible environment using docker compose.

Step 1: Write the Dagster orchestration code (dagster_code.py)

We will set up a few non-default Pipes components to streamline the otherwise challenging problem of orchestrating Spark jobs.

import os
import subprocess
from collections.abc import Sequence

import boto3
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader

import dagster as dg


@dg.asset(check_specs=[dg.AssetCheckSpec(name="demo_check", asset="scala_spark_demo")])
def scala_spark_demo(context: dg.AssetExecutionContext) -> dg.MaterializeResult:
s3_client = boto3.client("s3")
s3_bucket_name = os.environ["DAGSTER_PIPES_BUCKET"]

jar_path = dg.file_relative_path(
__file__, "external_scala/build/libs/external_scala-all.jar"
)

with dg.open_pipes_session(
context=context,
message_reader=PipesS3MessageReader(bucket=s3_bucket_name, client=s3_client),
context_injector=PipesS3ContextInjector(
bucket=s3_bucket_name, client=s3_client
),
) as session:
args = []
for key, value in session.get_bootstrap_cli_arguments().items():
args.extend([key, str(value)])

subprocess.run(
["spark-submit", jar_path] + args,
shell=False,
check=True,
)

return session.get_results()


defs = dg.Definitions(
assets=[scala_spark_demo],
)
  • Notice that we are using S3 to pass Pipes messages from the Spark job to Dagster, so we create PipesS3MessageReader and PipesS3ContextInjector objects. (Technically, it's not strictly required to use S3 for passing the Dagster context, but storing it there will decrease the CLI arguments size.)

  • Notice we are using CLI arguments to pass the bootstrap information from Dagster to the Spark job. We will fetch them from the session.get_bootstrap_cli_arguments method. We pass these arguments to spark-submit.

note

In other Pipes workflows, passing the bootstrap information from Dagster to the remote Pipes session is typically done via environment variables, but setting environment variables for Spark jobs can be complicated (the exact way of doing this depends on the Spark deployment) or not possible at all. CLI arguments are a convenient alternative.

Step 2: Use Pipes in the Spark job (Example.scala)

Our example Spark workload (based on the official example) will use Spark to calculate pi, and report the result back to Dagster.

First, create a new file named Example.scala in the src/main/scala/org/examples directory, then add the following code to create a context that can be used to send messages to Dagster:

package org.examples;

import io.dagster.pipes.{
DagsterPipesException,
PipesContext,
PipesSession
}
import io.dagster.pipes.loaders._
import io.dagster.pipes.writers._
import scala.math.random
import org.apache.spark.sql.SparkSession
import software.amazon.awssdk.services.s3.S3Client
import scala.collection.mutable
import scala.collection.JavaConverters._
import java.util.HashMap

object Example {

def main(args: Array[String]): Unit = {
val s3Client = S3Client.create()

val paramsLoader = new PipesCliArgsParamsLoader(args)
val contextLoader = new PipesS3ContextLoader(s3Client)
val messageWriter = new PipesS3MessageWriter(s3Client)

val session = new PipesSession(paramsLoader, contextLoader, messageWriter)

session.runDagsterPipes(distributedCalculatePi)
}

// Based on the example at https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@throws[DagsterPipesException]
private def distributedCalculatePi(context: PipesContext): Unit = {
context.getLogger().info("This is a log message from scala to dagster")

val spark = SparkSession
.builder()
.appName("Spark Pi")
.getOrCreate()

val slices = 2
val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
val x = random() * 2 - 1
val y = random() * 2 - 1
if (x*x + y*y <= 1) 1 else 0
}.reduce(_ + _)

val pi = (4.0 * count / (n-1))
spark.stop()

context.getLogger().info(s"Successfully used Spark to calculate pi! pi is approximately ${pi}")

// Report whether the value of pi is between 3 and 4:
context.reportAssetCheck(
"demo_check",
pi < 4 && pi > 3,
mutable.Map.empty[String, Any].asJava,
null
)

val metaMap = mutable.Map("pi" -> pi.toFloat)
context.reportAssetMaterialization(
metaMap.asJava,
null,
null
)
}
}
  • Note how PipesCliArgsParamsLoader is used to load the CLI arguments passed by Dagster. This information will be used to automatically configure PipesS3MessageWriter and PipesS3ContextLoader.

  • We are using the Java Pipes Framework in our code, since Scala is fully compatible with Java libraries.

  • distributedCalculatePi is the method actually doing the calculation. It also demonstrates sending messages and reporting results back to Dagster with the methods context.getLogger().info, context.reportAssetMaterialization, and reportAssetCheck.

Step 3: Create a Gradle build file

We will also need external_scala/settings.gradle and external_scala/build.gradle files:

/* settings.gradle */

plugins {
id 'org.gradle.toolchains.foojay-resolver-convention' version '0.9.0'
}

rootProject.name = 'external_scala'

plugins {
id 'scala'
id 'java'
id 'application'
id 'com.gradleup.shadow' version '8.3.6'

}

repositories {
mavenCentral()
}

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

tasks.withType(ScalaCompile).configureEach {
scalaCompileOptions.additionalParameters = ['-target:jvm-17']
}

dependencies {
// compileOnly - because Spark already provides the Scala and Spark libraries.
// everything else should be included in the "FatJar" created by shadow.
compileOnly 'org.scala-lang:scala-library:2.12.18'
compileOnly 'org.apache.spark:spark-sql_2.12:3.5.5'

implementation 'io.dagster:pipes:1.1.0'
}

application {
mainClass = 'org.examples.Example'
}

shadowJar {
mergeServiceFiles()
archiveClassifier.set('all')
zip64 true
}
  • We are using the shadow plugin to create the JAR, since our dependencies must be bundled in the JAR.

  • Note that the scala-library and spark-sql dependencies are marked compileOnly. Since Spark deployments already contain these libraries, they must be excluded from the JAR package.

Step 4: Run the pipeline with Docker

  1. Place the PySpark code for script.py and the Dagster orchestration code for dagster_code.py in the same directory.

  2. Create a Dockerfile. Notice that ./gradlew build will run as part of docker build:

ARG SPARK_VERSION=3.5.5

FROM bitnami/spark:${SPARK_VERSION}

USER root

COPY --from=ghcr.io/astral-sh/uv:0.5.11 /uv /uvx /bin/

RUN install_packages curl

RUN uv pip install --system dagster dagster-cloud dagster-webserver dagster-aws

RUN mkdir -p /dagster_home
ENV DAGSTER_HOME=/dagster_home

COPY dagster_code.py /src/
COPY external_scala/ /src/external_scala/

# Build the Scala JAR for Spark
WORKDIR /src/external_scala/
RUN ./gradlew build

WORKDIR /src/
  1. Create a docker-compose.yml. Don't forget to update the environment variables AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION:
services:
dagster-dev:
build:
context: .
dockerfile: Dockerfile
command:
- "dagster"
- "dev"
- "-f"
- "/src/dagster_code.py"
- "--host"
- "0.0.0.0"
- "--port"
- "3000"
ports:
- "3000:3000"
environment:
AWS_ACCESS_KEY_ID: <AWS_ACCESS_KEY_ID>
AWS_SECRET_ACCESS_KEY: <AWS_SECRET_ACCESS_KEY>
AWS_REGION: <AWS_REGION>
DAGSTER_PIPES_BUCKET: dagster-pipes
  1. Start the Dagster dev instance inside Docker:
docker compose up --build
  1. Navigate to http://localhost:3000 to open the Dagster UI and materialize the asset. Metadata and logs from Spark will be available in Dagster!