Spark Services

Service with Spark

On the Alida platform, Spark applications can be embedded in Services. However, to fully exploit the framework's distributed computing capabilities, these Services must be adjusted and structured differently.

The execution of a Spark Service is divided into three phases:

  1. Launching the Spark application via a spark-submit command
  2. Creating a pod containing the spark driver and executing the main Spark program within it
  3. Creating pods for the executors to execute tasks in a distributed manner

This execution model requires two images: one for the spark-client, which submits the Spark application to the Kubernetes cluster, and one for the actual Spark program. Spark automatically manages the driver and executor pods.

The following will examine in detail the process of developing the two images, referring to Spark version 4.x.y.

Spark Application

The Spark application image contains the actual Spark code, along with the business logic. For example, if using Java, the image will contain the application .jar file, while if using Python, it will contain the various modules.

It is therefore necessary to create a Dockerfile that includes the application's executable code. An official Apache Spark base image, such as apache/spark:4.0.1, is recommended.

Configurations related to cluster resource management, as well as configurations specific to the use case, will be provided by the spark-client when executing the spark-submit. These include the usual parameters inserted by Alida, such as arguments or global variables.

It will not be necessary to create a metamodel, as this image will not correspond to a Service block in the designer, but will be indicated as a parameter in the submit command of the spark-client. The image in question will be executed as a spark-driver.

Note that, for Spark to use this image in the cluster, the image must not define a custom entrypoint, because spark-submit determines how the image is executed. A custom entrypoint could prevent the Spark job from running in a distributed manner, reducing it to a single process on the driver alone.
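As a quick pre-build check, the Dockerfile of the application image can be scanned for an ENTRYPOINT instruction. The following is only a sketch: the helper name has_custom_entrypoint and the COPY layout are assumptions made for illustration (the target path simply mirrors the local:///app/src/main/main.py location used by the client).

```shell
#!/bin/bash
# Succeed if the given Dockerfile declares its own ENTRYPOINT instruction.
# (Hypothetical helper, not part of Alida or Spark.)
has_custom_entrypoint() {
    grep -Eiq '^[[:space:]]*ENTRYPOINT([[:space:]]|$)' "$1"
}

# Write a minimal application Dockerfile to a temporary directory.
workdir="$(mktemp -d)"
cat > "$workdir/Dockerfile" <<'EOF'
FROM apache/spark:4.0.1
# Assumed layout: application sources copied under /app/src
COPY src/ /app/src/
EOF

if has_custom_entrypoint "$workdir/Dockerfile"; then
    echo "custom ENTRYPOINT found"
else
    echo "no custom ENTRYPOINT"
fi
```

The check only inspects the Dockerfile text; it does not look at what the base image itself defines.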

Only the client needs to be registered in Alida: it instantiates the application directly from the indicated Docker image, adding any parameters and configurations.

Spark Client

The client image submits the Spark application request to the Kubernetes cluster. This will first involve creating a container corresponding to the Spark driver and then any other containers for the executors. Spark itself will take care of creating/destroying the pods in question.

It also acts as a bridge between the Spark application and Alida. At this level, information and data specific to Alida, such as global variables or user-configured parameters, will be passed to the actual Spark application, so that they can be accessed as a regular Alida Service, through a suitable metamodel.

In particular, the client image:

  1. Retrieves the TARGET global variable, containing affinity and tolerations specifications, and dynamically constructs a YAML manifest based on these specifications, according to which the application pods will be instantiated.
  2. Executes the spark-submit command, forwarding any parameters/variables of interest present in the Alida execution context.
  3. Retrieves the exit code of the driver program to update the execution status.

The following will examine in detail the key points of the two images described. The reference Spark version is 4.0.1.

Development Details - Client

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, the global variables are read, falling back to the defaults shown here. In particular, the TARGET variable holds, in JSON format, the affinity and tolerations that will go into the pod YAML.

# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')


echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

The pod-template.yaml file is then generated dynamically and passed to the spark-submit command through the podTemplateFile options.
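To see what the generated file looks like, the templating step can be isolated in a function and fed with sample YAML. This is a sketch under assumptions: render_pod_template is a hypothetical helper, and the two YAML fragments are hand-written stand-ins for what the yq conversion would produce from TARGET.

```shell
#!/bin/bash
# Print a pod template with the given tolerations and affinity YAML fragments,
# indenting each fragment by four spaces so it nests under its key.
render_pod_template() {
    local tolerations_yaml="$1" affinity_yaml="$2"
    cat <<EOF
apiVersion: v1
kind: Pod
spec:
  tolerations:
$(printf '%s\n' "$tolerations_yaml" | sed 's/^/    /')
  affinity:
$(printf '%s\n' "$affinity_yaml" | sed 's/^/    /')
EOF
}

# Hand-written sample fragments (stand-ins for the converted TARGET content).
tolerations='- effect: NoSchedule
  key: opt-worker
  operator: Equal
  value: "true"'

affinity='nodeAffinity:
  requiredDuringSchedulingIgnoredDuringExecution:
    nodeSelectorTerms:
      - matchExpressions:
          - key: kubernetes.io/role
            operator: In
            values:
              - opt-worker'

render_pod_template "$tolerations" "$affinity"
```

Printing to standard output instead of writing the file directly makes it easy to inspect the indentation before the template is handed to spark-submit.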

echo "SUBMITTING SPARK APPLICATION..."

OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image="${SPARK_IMAGE}" \
    --conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
    local:///app/src/main/main.py "$@" 2>&1)"

The output of the spark-submit command, which reads or forwards the global variables of interest, is captured in the OUTPUT variable.
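The captured output can then be used to recover the driver's exit code, as mentioned in the list of client responsibilities. The sketch below assumes that the spark-submit output contains a line of the form "exit code: N" for the driver container; both that format and the helper name driver_exit_code are assumptions to verify against the actual logs.

```shell
#!/bin/bash
# Print the last "exit code: N" value found in the given text.
# If no such line exists, print 1 so that an unknown outcome counts as a failure.
driver_exit_code() {
    local output="$1" code
    code="$(printf '%s\n' "$output" \
        | sed -n 's/.*[Ee]xit code: *\([0-9][0-9]*\).*/\1/p' \
        | tail -n 1)"
    echo "${code:-1}"
}

# Possible usage inside entrypoint.sh, after OUTPUT has been captured:
#   echo "$OUTPUT"
#   exit "$(driver_exit_code "$OUTPUT")"
```

Treating a missing exit code as a failure is a deliberate choice of this sketch: it avoids reporting success when the submission itself did not reach the driver.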

The parameter spark.kubernetes.container.image refers to the tag of the application image.

The configurations spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow and spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" are required, respectively, to operate on the pods and to pull the images in the Alida cluster.

The spark.hadoop.fs.s3a... configurations are required for integration with an S3-type filesystem, in our case MinIO. Where a time value is assigned, it is simply the default value converted to milliseconds: the Hadoop libraries required for the integration express these defaults in the 60s, 24h… format, so they must be overwritten with millisecond values, as in the command above (see Limitations and Known Issues).
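The conversion itself is plain arithmetic. As an illustration, a small helper (to_millis is a hypothetical name, and the set of suffixes handled is an assumption of this sketch) can turn a suffixed duration into the millisecond value used in the command:

```shell
#!/bin/bash
# Convert a suffixed duration (ms, s, m, h, d) to milliseconds.
# A bare number is passed through unchanged.
to_millis() {
    local value="$1" n
    if [[ "$value" =~ ^([0-9]+)(ms|s|m|h|d)?$ ]]; then
        n="${BASH_REMATCH[1]}"
        case "${BASH_REMATCH[2]}" in
            ""|ms) echo $(( 10#$n )) ;;
            s)     echo $(( 10#$n * 1000 )) ;;
            m)     echo $(( 10#$n * 60 * 1000 )) ;;
            h)     echo $(( 10#$n * 60 * 60 * 1000 )) ;;
            d)     echo $(( 10#$n * 24 * 60 * 60 * 1000 )) ;;
        esac
    else
        echo "unsupported duration: $value" >&2
        return 1
    fi
}

to_millis 60s   # prints 60000
to_millis 24h   # prints 86400000
```

These results match the values 60000 and 86400000 assigned to fs.s3a.connection.timeout and fs.s3a.multipart.purge.age in the command.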

The userClassPathFirst parameters ensure that, in case of library conflicts, the libraries supplied by the programmer take priority.

Regarding the retrieval of driver logs, the settings spark.driver.log.localDir and spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties ensure that the Spark driver writes its logs to the indicated directory inside the driver container (application image).

$@ at the end adds Alida arguments, making them visible in the Spark job.
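How the arguments are forwarded matters when a value contains spaces. The following sketch, which uses hypothetical helper functions in place of the real Spark program, compares the unquoted and quoted forms:

```shell
#!/bin/bash
# count_args stands in for the Spark program: it prints how many arguments it received.
count_args() { echo "$#"; }

# Unquoted: each argument is split again on whitespace.
forward_unquoted() { count_args $@; }

# Quoted: each argument is forwarded exactly as received.
forward_quoted() { count_args "$@"; }

forward_unquoted --input "my dataset.csv"   # prints 3
forward_quoted   --input "my dataset.csv"   # prints 2
```

With the quoted form, an argument such as a dataset name containing a space reaches the Spark job as a single value.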

Limitations and Known Issues

As already mentioned, the Hadoop libraries and Spark 4 are misaligned on the format of time values. Spark 4 no longer supports parsing time values in the 60s, 24h… format, which the Hadoop libraries use for the S3 integration, so they must be overwritten with millisecond values, as shown in the spark-submit command above.

Another limitation concerns the management of MinIO configurations: these are set globally in the Spark configuration at the beginning of the execution, cached, and then propagated to the executors. Changes made during job execution are visible only in the driver.

This is particularly relevant for credential management, as it does not allow connecting to MinIO paths with different credentials. It is therefore recommended to prepare the input and output appropriately, placing them all in an area with the same credentials.

In Alida, in particular, it is necessary to ensure that the input datasets belong to the same bucket/path as the default datasource created together with the workflow.
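A simple guard can catch mixed locations before the job is submitted. The sketch below assumes that datasets are addressed with s3a://bucket/key URIs and uses the bucket name as a proxy for "same credentials"; the helper names bucket_of and same_bucket are hypothetical.

```shell
#!/bin/bash
# Print the bucket part of an s3a://bucket/key URI.
bucket_of() {
    local uri="${1#s3a://}"
    echo "${uri%%/*}"
}

# Succeed only if every given URI points to the same bucket as the first one.
same_bucket() {
    local first uri
    first="$(bucket_of "$1")"
    for uri in "$@"; do
        [ "$(bucket_of "$uri")" = "$first" ] || return 1
    done
}

# Example with made-up paths:
if same_bucket "s3a://alida/input/a.csv" "s3a://alida/output/"; then
    echo "all paths share one bucket"
fi
```

A check of this kind only compares names; whether two paths actually accept the same credentials still depends on the MinIO configuration.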

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, global variables are read, here initialized with defaults. In particular, the TARGET variable contains data in JSON format for the pod YAML.

# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')


echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

A dynamic file pod-template.yaml is then constructed, which will be indicated to the spark-submit command.

echo "SUBMITTING SPARK APPLICATION..."

OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image=dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.0 \
    --conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
    local:///app/src/main/main.py $@ 2>&1)"
The spark-submit command, noting the global variables of interest read or forwarded to execution, is wrapped in a variable.

The parameter spark.kubernetes.container.image refers to the tag of the application image.

The configurations spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow and --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" are necessary to correctly access the images in the Alida cluster and to operate on the pods.

The configurations of the type spark.hadoop.fs.s3a... are necessary for integration with an S3-type filesystem, in our case MinIO. In cases where a timestamp is assigned, it simply returns the default values converted to milliseconds. The Hadoop libraries necessary for integration, in fact, use the 60s, 24h… time format, which makes it necessary to overwrite them in milliseconds as seen in the previous paragraphs.

The userClassPathFirst parameters ensure that in case of library conflicts, the programmer-indicated libraries are prioritized.

Regarding the recovery of driver logs, the spark.driver.log.localDir variables and spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties ensure that the Spark driver writes the logs to a directory indicated inside the driver container (application image).

$@ at the end adds Alida arguments, making them visible in the Spark job.

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, global variables are read, here initialized with defaults. In particular, the TARGET variable contains data in JSON format for the pod YAML.

# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')


echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

A dynamic file pod-template.yaml is then constructed, which will be indicated to the spark-submit command.

echo "SUBMITTING SPARK APPLICATION..."

OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image=dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.0 \
    --conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
    local:///app/src/main/main.py $@ 2>&1)"
The spark-submit command, noting the global variables of interest read or forwarded to execution, is wrapped in a variable.

The parameter spark.kubernetes.container.image refers to the tag of the application image.

The configurations spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow and --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" are necessary to correctly access the images in the Alida cluster and to operate on the pods.

The configurations of the type spark.hadoop.fs.s3a... are necessary for integration with an S3-type filesystem, in our case MinIO. In cases where a timestamp is assigned, it simply returns the default values converted to milliseconds. The Hadoop libraries necessary for integration, in fact, use the 60s, 24h… time format, which makes it necessary to overwrite them in milliseconds as seen in the previous paragraphs.

The userClassPathFirst parameters ensure that in case of library conflicts, the programmer-indicated libraries are prioritized.

Regarding the recovery of driver logs, the spark.driver.log.localDir variables and spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties ensure that the Spark driver writes the logs to a directory indicated inside the driver container (application image).

$@ at the end adds Alida arguments, making them visible in the Spark job.

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, global variables are read, here initialized with defaults. In particular, the TARGET variable contains data in JSON format for the pod YAML.

# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')


echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

A dynamic file pod-template.yaml is then constructed, which will be indicated to the spark-submit command.

echo "SUBMITTING SPARK APPLICATION..."

OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image=dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.0 \
    --conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
    local:///app/src/main/main.py $@ 2>&1)"
The spark-submit command, noting the global variables of interest read or forwarded to execution, is wrapped in a variable.

The parameter spark.kubernetes.container.image refers to the tag of the application image.

The configurations spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow and --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" are necessary to correctly access the images in the Alida cluster and to operate on the pods.

The configurations of the type spark.hadoop.fs.s3a... are necessary for integration with an S3-type filesystem, in our case MinIO. In cases where a timestamp is assigned, it simply returns the default values converted to milliseconds. The Hadoop libraries necessary for integration, in fact, use the 60s, 24h… time format, which makes it necessary to overwrite them in milliseconds as seen in the previous paragraphs.

The userClassPathFirst parameters ensure that in case of library conflicts, the programmer-indicated libraries are prioritized.

Regarding the recovery of driver logs, the spark.driver.log.localDir variables and spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties ensure that the Spark driver writes the logs to a directory indicated inside the driver container (application image).

$@ at the end adds Alida arguments, making them visible in the Spark job.

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, the global variables are read, each falling back to a default when it is not set. In particular, the TARGET variable holds the affinity and tolerations, in JSON format, that will be written into the pod template YAML.
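The fallback behaviour of the "${VAR:-default}" expansion used above can be checked in isolation. The following is a minimal sketch; the DEMO_* names are hypothetical and exist only for this illustration.

```shell
#!/bin/bash
# Sketch of how "${VAR:-default}" resolves. The DEMO_* names are hypothetical.

unset DEMO_DRIVER_MEMORY        # not provided at all
DEMO_EXECUTOR_MEMORY="8g"       # simulates a value exported before the script runs
DEMO_APP_NAME=""                # set but empty: ":-" still applies the default

DRIVER_MEMORY="${DEMO_DRIVER_MEMORY:-4g}"
EXECUTOR_MEMORY="${DEMO_EXECUTOR_MEMORY:-4g}"
APP_NAME="${DEMO_APP_NAME:-sparkapp}"

echo "driver=${DRIVER_MEMORY} executor=${EXECUTOR_MEMORY} name=${APP_NAME}"
```

Only the exported value survives; unset and empty variables both receive the default, so an empty global variable does not end up as an empty configuration value.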

# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')


echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

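The pod-template.yaml file is thus generated dynamically and is then passed to the spark-submit command through the spark.kubernetes.driver.podTemplateFile and spark.kubernetes.executor.podTemplateFile settings.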
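The indentation step is what makes the generated fragments nest correctly under their keys. The sketch below reproduces it without yq, using a hard-coded YAML fragment as a stand-in for the converted tolerations; that fragment is an assumption made for the example.

```shell
#!/bin/bash
# Hypothetical YAML fragment standing in for the output of the yq conversion.
TOLERATIONS_YAML='- effect: NoSchedule
  key: opt-worker
  operator: Equal
  value: "true"'

# Prefix every line with four spaces so the list nests under "tolerations:".
INDENTED="$(printf '%s\n' "$TOLERATIONS_YAML" | sed 's/^/    /')"

# Assemble the template in a variable instead of a file, to keep the sketch side-effect free.
TEMPLATE="$(cat <<EOF
apiVersion: v1
kind: Pod
spec:
  tolerations:
${INDENTED}
EOF
)"

printf '%s\n' "$TEMPLATE"
```

Because sed prefixes every line, multi-line fragments keep their relative indentation and remain valid YAML once embedded.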

echo "SUBMITTING SPARK APPLICATION..."

# Memory is set through spark.driver.memory and spark.executor.memory:
# Spark derives the pod memory request and limit from these values plus the overhead.
OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image="${SPARK_IMAGE}" \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.executor.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
    local:///app/src/main/main.py "$@" 2>&1)"

The output of the spark-submit command is captured in the OUTPUT variable. Note which global variables are read by the script and which are forwarded to the driver through the spark.kubernetes.driverEnv settings.
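Capturing the output in a variable makes it possible to inspect both the text and the exit status once the command has returned. The following sketch shows the pattern with fake_submit, a hypothetical function that stands in for spark-submit so that the example runs anywhere.

```shell
#!/bin/bash
# fake_submit is a hypothetical stand-in for spark-submit: it writes to both
# stdout and stderr and returns a non-zero status.
fake_submit() {
    echo "submitting application"
    echo "driver pod failed" >&2
    return 3
}

# 2>&1 merges stderr into the captured text; the if/else keeps the exit status
# without aborting scripts that run under "set -e".
if OUTPUT="$(fake_submit 2>&1)"; then
    STATUS=0
else
    STATUS=$?
fi

echo "$OUTPUT"
if [ "$STATUS" -ne 0 ]; then
    echo "submission exited with status ${STATUS}"
fi
```

Whether the exit status of a real spark-submit reflects the outcome of the driver depends on the deployment, so inspecting the captured text may also be needed.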

The spark.kubernetes.container.image parameter specifies the application image, including its tag.

The configurations spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow and spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" are necessary to operate on the pods and to pull the images in the Alida cluster, respectively.

The spark.hadoop.fs.s3a... configurations are necessary for integration with an S3-type filesystem, in our case MinIO. Where a duration is assigned, the value simply restates the default converted to milliseconds. The Hadoop libraries used for the integration express these defaults in the 60s, 24h… time format, which makes it necessary to override them with plain millisecond values, as done in the command above.
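The conversion itself is simple arithmetic. As a minimal sketch, the helper below turns a suffixed duration into milliseconds; to_millis is a hypothetical function written for this example and is not part of Hadoop or Spark.

```shell
#!/bin/bash
# to_millis is a hypothetical helper, not part of Hadoop or Spark.
# It converts durations such as 60s, 5m or 24h into plain milliseconds.
to_millis() {
    local value="$1"
    local number="${value%%[a-z]*}"    # numeric part: everything before the first letter
    local unit="${value#"$number"}"    # remaining suffix, possibly empty
    case "$unit" in
        ms) echo "$number" ;;
        s)  echo $(( number * 1000 )) ;;
        m)  echo $(( number * 60 * 1000 )) ;;
        h)  echo $(( number * 60 * 60 * 1000 )) ;;
        "") echo "$number" ;;          # assumption: bare numbers are already milliseconds
        *)  echo "unsupported unit: ${unit}" >&2; return 1 ;;
    esac
}

to_millis 60s    # 60000
to_millis 5m     # 300000
to_millis 30m    # 1800000
to_millis 24h    # 86400000
```

These results match the millisecond values assigned to the duration settings in the spark-submit command.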

The userClassPathFirst parameters ensure that, in case of library conflicts, the libraries supplied by the developer take precedence.

To make the driver logs retrievable, the spark.driver.log.localDir setting and spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties ensure that the Spark driver writes its logs to the given directory inside the driver container (the application image).

The $@ at the end forwards the arguments supplied by Alida to the Spark application, making them visible in the Spark job.
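When forwarding arguments, quoting makes a difference whenever a value contains spaces. The sketch below compares the two forms; count_args is a hypothetical stand-in for the program that receives the arguments.

```shell
#!/bin/bash
# count_args is a hypothetical stand-in for the program receiving the arguments.
count_args() { echo "$#"; }

# "$@" keeps each original argument as a single word.
forward_quoted() { count_args "$@"; }

# An unquoted $@ is split again on whitespace.
forward_unquoted() { count_args $@; }

forward_quoted --input "my file.csv"      # prints 2
forward_unquoted --input "my file.csv"    # prints 3
```

For arguments without whitespace the two forms behave the same; the quoted form is the safer choice in general.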

The heart of the spark client is the spark-submit command, placed in a .sh file that serves as an entrypoint for the image. This entrypoint.sh will contain portions of code similar to the following.

#!/bin/bash

# Variables with defaults (overwritable with export before the script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"

TARGET="${TARGET:-$(cat <<'EOF'
{
   "affinity":{
      "nodeAffinity":{
         "requiredDuringSchedulingIgnoredDuringExecution":{
            "nodeSelectorTerms":[
               {
                  "matchExpressions":[
                     {
                        "key":"kubernetes.io/role",
                        "operator":"In",
                        "values":[
                           "opt-worker"
                        ]
                     }
                  ]
               }
            ]
         }
      }
   },
   "tolerations":[
      {
         "effect":"NoSchedule",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      },
      {
         "effect":"NoExecute",
         "key":"opt-worker",
         "operator":"Equal",
         "value":"true"
      }
   ]
}
EOF
)}"

First, the script reads its environment variables, falling back to the defaults shown when a variable is not set. In particular, the TARGET variable holds, in JSON format, the node affinity and tolerations that will be written into the pod YAML.
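The default mechanism used above can be tried in isolation. The following is a minimal sketch, assuming bash; DEMO_MEMORY is a hypothetical variable name used only for illustration and is not read by the entrypoint.

```shell
#!/bin/bash
# Sketch of the "${VAR:-default}" pattern; DEMO_MEMORY is a hypothetical name.

# Unset: the default is used.
unset DEMO_MEMORY
FROM_UNSET="${DEMO_MEMORY:-4g}"

# Empty: ":-" treats an empty value like an unset one, so the default is used.
DEMO_MEMORY=""
FROM_EMPTY="${DEMO_MEMORY:-4g}"

# Set: the value provided by the caller wins.
DEMO_MEMORY="8g"
FROM_SET="${DEMO_MEMORY:-4g}"

echo "unset=${FROM_UNSET} empty=${FROM_EMPTY} set=${FROM_SET}"
```

Because an empty value also triggers the default, exporting a variable as an empty string does not clear the corresponding setting.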

# Extract affinity and tolerations from TARGET as compact JSON
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')

echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."

# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)

# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
  tolerations: 
$(echo "$TOLERATIONS_YAML" | sed 's/^/    /')
  affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/    /')
EOF

The script then generates the pod-template.yaml file dynamically; it is passed to the spark-submit command as the pod template for both the driver and the executors.
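The indentation step is what keeps the generated file valid YAML. The sketch below reproduces it without jq or yq, assuming bash and sed; indent_block and render_template are hypothetical helper names, and the hard-coded fragment is only a stand-in for what the conversion of TOLERATIONS would produce.

```shell
#!/bin/bash
# Hypothetical helper: indent every line of a YAML fragment by four spaces.
indent_block() {
    sed 's/^/    /'
}

# Stand-in for the converted tolerations; a real run would use the yq output.
TOLERATIONS_YAML='- effect: NoSchedule
  key: opt-worker
  operator: Equal
  value: "true"'

# Hypothetical helper: print a pod template with the fragment nested under spec.tolerations.
render_template() {
    cat <<EOF
apiVersion: v1
kind: Pod
spec:
  tolerations:
$(printf '%s\n' "$TOLERATIONS_YAML" | indent_block)
EOF
}

render_template
```

Every line of the fragment ends up deeper than the tolerations key, so the list is parsed as its value.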

echo "SUBMITTING SPARK APPLICATION..."

OUTPUT="$(/opt/spark/bin/spark-submit \
    --master k8s://kubernetes.default.svc \
    --deploy-mode cluster \
    --name "${SPARK_APP_NAME}" \
    --conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
    --conf spark.kubernetes.container.image="${SPARK_IMAGE}" \
    --conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
    --conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
    --conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
    --conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
    --conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
    --conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
    --conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
    --conf spark.kubernetes.container.image.pullPolicy=Always \
    --conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
    --conf spark.executor.memory="${SPARK_EXECUTOR_MEMORY}" \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
    --conf spark.hadoop.fs.s3a.connection.timeout=60000 \
    --conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
    --conf spark.hadoop.fs.s3a.attempts.maximum=10 \
    --conf spark.hadoop.fs.s3a.paging.maximum=1000 \
    --conf spark.hadoop.fs.s3a.connection.maximum=200 \
    --conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
    --conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
    --conf spark.hadoop.fs.s3a.connection.ttl=300000 \
    --conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
    --conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
    --conf spark.driver.userClassPathFirst=true \
    --conf spark.executor.userClassPathFirst=true \
    --conf spark.driver.log.localDir=/tmp \
    --conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
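    local:///app/src/main/main.py "$@" 2>&1)"

The whole spark-submit invocation is wrapped in a command substitution, so its output (stdout and stderr, merged by 2>&1) is captured in the OUTPUT variable. The command reads the variables defined at the top of the script and forwards EXECUTION_ID, HASH_TOKEN and SERVICE_ID to the driver through spark.kubernetes.driverEnv.*.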
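The capture pattern can be exercised without a cluster. The following is a minimal sketch, assuming bash; fake_submit is a hypothetical stand-in for spark-submit, and the messages it prints are invented for the example and are not Spark log lines.

```shell
#!/bin/bash
# fake_submit is a hypothetical stand-in for spark-submit: it writes to both
# output streams and returns a non-zero exit status.
fake_submit() {
    echo "submitting application"
    echo "driver pod failed" >&2
    return 3
}

# 2>&1 inside the substitution merges stderr into the captured text.
# Testing the assignment with "if" keeps the exit status of the command
# available and also works when the script runs under "set -e".
if OUTPUT="$(fake_submit 2>&1)"; then
    STATUS=0
else
    STATUS=$?
fi

echo "${OUTPUT}"
echo "exit status: ${STATUS}"
```

Keeping the exit status next to the captured text lets the rest of the entrypoint decide how to report the outcome of the submission.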

The spark.kubernetes.container.image parameter is the full reference (repository and tag) of the application image, which Spark uses for the driver and executor pods.

Two settings are needed to work inside the Alida cluster: spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" allows the images to be pulled from the Alida registry, and spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow runs the driver under a service account that is allowed to operate on pods, which the driver needs in order to create its executors.

The spark.hadoop.fs.s3a.* settings are needed for integration with an S3-compatible filesystem, MinIO in our case. Where a setting is a duration, its value is simply the default converted to milliseconds: the Hadoop libraries used for the integration express these defaults with unit suffixes (60s, 24h, …), which makes it necessary to override them with plain millisecond values, as seen in the previous paragraphs.
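The conversion itself is plain arithmetic. The sketch below, assuming bash, shows one way to compute it; to_millis is a hypothetical helper that is not part of Spark or Hadoop, and the sample inputs were chosen to reproduce values that appear in the command above. Whether they match the defaults of a given Hadoop release should be checked against its documentation.

```shell
#!/bin/bash
# to_millis is a hypothetical helper, not part of Spark or Hadoop.
# It converts a duration such as 60s, 5m or 24h into milliseconds.
to_millis() {
    local value="$1" number unit
    case "$value" in
        *ms) number="${value%ms}"; unit=1 ;;
        *s)  number="${value%s}";  unit=1000 ;;
        *m)  number="${value%m}";  unit=60000 ;;
        *h)  number="${value%h}";  unit=3600000 ;;
        *d)  number="${value%d}";  unit=86400000 ;;
        *)   number="$value";      unit=1 ;;  # no suffix: assumed to be milliseconds already
    esac
    # Reject anything that is not a plain non-negative integer.
    case "$number" in
        ''|*[!0-9]*) echo "invalid duration: $value" >&2; return 1 ;;
    esac
    echo $(( number * unit ))
}

for d in 60s 5m 24h 30m; do
    echo "${d} -> $(to_millis "${d}")"
done
```

The four sample inputs yield 60000, 300000, 86400000 and 1800000.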

The userClassPathFirst parameters ensure that, when libraries conflict, the versions supplied by the developer take precedence over those shipped with Spark.

To make the driver logs retrievable, spark.driver.log.localDir and spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" make the Spark driver write its logs to a given directory (/tmp here) inside the driver container, that is, the application image.

$@ at the end forwards the arguments that Alida passes to the entrypoint, so that they reach the Spark job as application arguments.
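When arguments are forwarded this way, quoting decides whether an argument containing spaces arrives intact. The following is a minimal sketch, assuming bash; count_args, forward_unquoted and forward_quoted are hypothetical names, and the sample arguments are invented for the example.

```shell
#!/bin/bash
# count_args is a hypothetical stand-in for the application: it reports
# how many arguments it received.
count_args() {
    echo "$#"
}

# Unquoted $@: each argument is split again on whitespace.
forward_unquoted() {
    count_args $@
}

# Quoted "$@": each argument is passed on exactly as received.
forward_quoted() {
    count_args "$@"
}

UNQUOTED="$(forward_unquoted --input "my file.csv")"
QUOTED="$(forward_quoted --input "my file.csv")"
echo "unquoted: ${UNQUOTED} arguments, quoted: ${QUOTED} arguments"
```

With the unquoted form the file name is split into two words, so the application sees three arguments instead of two; using "$@" avoids this.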
