Service con Spark
Sulla piattaforma Alida è consentito inserire applicazioni Spark nei Service. Tuttavia, per sfruttare pienamente le potenzialità del framework nel calcolo distribuito, è necessario approntare alcuni accorgimenti e strutturare i Service in modo diverso.
L'esecuzione di un Service Spark si articola in tre fasi:
- Lancio dell'applicazione Spark tramite un comando
spark-submit - La creazione di un pod contenente il driver spark e al suo interno l'esecuzione del programma Spark principale
- La creazione di nodi per gli executor in modo da eseguire i task in maniera distribuita
Questo modello di esecuzione implica lo sviluppo di due immagini, una relativa allo spark-client - che gestisce la richiesta dell'applicazione Spark nel cluster kubernetes - e una relativa al programma Spark vero e proprio. La gestione dei nodi per driver/executor verrà gestista automaticamente da Spark.
Di seguito verrà esaminato nel dettaglio il processo di sviluppo delle due immagini. Facendo riferimento alla versione di Spark 4.x.y.
Spark Application
L'immagine dell'applicazione Spark contiene il codice Spark vero e proprio, assieme alla logica di business.
Ad esempio, in caso si utilizzi Java, l'immagine conterrà il file .jar dell'applicazione, mentre in caso di Python conterrà i vari moduli.
Sarà quindi necessario realizzare un Dockerfile che includa il codice eseguibile dell'applicazione.
E' consigliabile adoperare un'immagine base ufficiale di Apache Spark, come apache/spark:4.0.1
Le configurazioni relative alla gestione delle risorse del cluster, come anche le configurazioni specifiche relative al caso d'uso, verranno fornite dallo spark-client all'esecuzione dello spark-submit. Tra queste sono inclusi i consueti parametri inseriti da Alida, come argomenti o variabili di ambiente.
Non sarà necessario approntare un metamodello, in quanto questa immagine non corrisponderà a un blocco Service nel designer, ma sarà indicata come parametro nel comando submit dello spark-client. L'immagine in questione sarà eseguita come spark-driver.
Va sottolineato il fatto che, per l'utilizzo in cluster di questa immagine da parte di Spark,
essa non dovrà contenere un entrypoint.
Sarà infatti lo spark-submit a stabilire la modalità di esecuzione dell'immagine.
Inserire un entrypoint personalizzato potrebbe impedire la corretta esecuzione del
job spark in maniera distribuita, riducendola a una dimensione atomica sul singolo nodo del driver.
Non è inoltre necessario definire un metamodello. Infatti, sarà sufficiente registrare il client in Alida, il quale istanzierà l'applicazione direttamente dall'immagine Docker indicata, aggiungendo eventuali parametri e configurazioni.
Spark Client
L'immagine client sottopone al cluster Kubernetes la richiesta di creazione ed esecuzione dell'applicazione Spark. Ciò si articolerà prima nella creazione di un container corrispondente allo Spark driver e poi eventuali altri container per gli executor. Spark stesso si occuperà di creare/distruggere i pod in questione.
Funge inoltre da raccordo tra l'applicazione Spark e Alida. A questo livello le
informazioni e i dati propri di Alida, come variabili d'ambiente o parametri
configurati dall'utente, verranno trasmesse all'applicazione Spark vera e propria,
così che possano essere accedute come un qualsiasi Service Alida, previa registrazione tramite un metamodello opportuno.
In particolare, l'immagine client:
- Recupera la variabile d'ambiente TARGET, contenente le specifiche di affinity e tolerations, e in base a queste costruisce dinamicamente un manifest yaml secondo le cui specifiche verranno istanziati i pod applicativi.
- Esegue il comando spark-submit, inoltrando eventuali parametri/variabili di interesse presenti nel contesto di esecuzione di Alida.
- Recupera l'exit code del programma driver per aggiornare lo stato di esecuzione.
Di seguito saranno easminati in dettaglio i punti salienti delle due immagini descritte. La versione di Spark di riferimento è la 4.0.1.
Dettagli di sviluppo - client
Il cuore dello spark client è il comando spark-submit, inserito in un file .sh che
funge da entrypoint per l'immagine. Questo entrypoint.sh conterrà porzioni di
codice analoghe alle seguenti.
#!/bin/bash
# Variabili con default (sovrascrivibili con export prima dello script)
SPARK_MASTER="${SPARK_MASTER:-local[*]}"
SPARK_NAMESPACE="${NAMESPACE:-alida}"
SPARK_DEPLOY_MODE="${SPARK_DEPLOY_MODE:-client}"
SPARK_APP_NAME="${SPARK_APP_NAME:-sparkapp}"
SPARK_IMAGE="${SPARK_IMAGE:-dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.1}"
SPARK_DRIVER_MEMORY="${SPARK_DRIVER_MEMORY:-4g}"
SPARK_DRIVER_CORES="${SPARK_DRIVER_CORES:-2}"
SPARK_EXECUTOR_MEMORY="${SPARK_EXECUTOR_MEMORY:-4g}"
SPARK_EXECUTOR_CORES="${SPARK_EXECUTOR_CORES:-2}"
SPARK_PULLSECRETS="${SPARK_PULLSECRETS:-alida-regcred}"
TARGET="${TARGET:-$(cat <<'EOF'
{
"affinity":{
"nodeAffinity":{
"requiredDuringSchedulingIgnoredDuringExecution":{
"nodeSelectorTerms":[
{
"matchExpressions":[
{
"key":"kubernetes.io/role",
"operator":"In",
"values":[
"opt-worker"
]
}
]
}
]
}
}
},
"tolerations":[
{
"effect":"NoSchedule",
"key":"opt-worker",
"operator":"Equal",
"value":"true"
},
{
"effect":"NoExecute",
"key":"opt-worker",
"operator":"Equal",
"value":"true"
}
]
}
EOF
)}"
Per prima cosa, vengono lette eventuali variabili di ambiente, qui inizializzate con default. In particolare la variabile target contiene dati in formato json relativi ad manifest yaml per il pod.
# Extract affinity and tolerations into env vars
AFFINITY=$(echo "$TARGET" | jq -c '.affinity')
TOLERATIONS=$(echo "$TARGET" | jq -c '.tolerations')
echo "MAKING POD YAML DESCRIPTOR FROM TARGET ENV..."
# Convert JSON to YAML using yq
TOLERATIONS_YAML=$(echo "$TOLERATIONS" | yq eval -P -)
AFFINITY_YAML=$(echo "$AFFINITY" | yq eval -P -)
# Output to pod-template.yaml
cat <<EOF > pod-template.yaml
apiVersion: v1
kind: Pod
spec:
tolerations:
$(echo "$TOLERATIONS_YAML" | sed 's/^/ /')
affinity:
$(echo "$AFFINITY_YAML" | sed 's/^/ /')
EOF
Viene poi costruito dinamicamente un file pod-template.yaml che verrà indicato al comando spark-submit.
echo "SUBMITTING SPARK APPLICATION..."
OUTPUT="$(/opt/spark/bin/spark-submit \
--master k8s://kubernetes.default.svc \
--deploy-mode cluster \
--name "${SPARK_APP_NAME}" \
--conf spark.kubernetes.namespace="${SPARK_NAMESPACE}" \
--conf spark.kubernetes.container.image=dockerhub.alidalab.it/alida/restricted/services/spark-kmeans-example:1.0.0 \
--conf spark.kubernetes.driver.limit.memory="${SPARK_DRIVER_MEMORY}" \
--conf spark.kubernetes.driver.limits.memory="${SPARK_DRIVER_MEMORY}" \
--conf spark.kubernetes.driver.request.cores="${SPARK_DRIVER_CORES}" \
--conf spark.kubernetes.executor.limit.memory="${SPARK_EXECUTOR_MEMORY}" \
--conf spark.kubernetes.executor.limits.memory="${SPARK_EXECUTOR_MEMORY}" \
--conf spark.kubernetes.executor.request.cores="${SPARK_EXECUTOR_CORES}" \
--conf spark.kubernetes.driver.podTemplateFile=pod-template.yaml \
--conf spark.kubernetes.executor.podTemplateFile=pod-template.yaml \
--conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" \
--conf spark.kubernetes.driverEnv.EXECUTION_ID="${EXECUTION_ID}" \
--conf spark.kubernetes.driverEnv.HASH_TOKEN="${HASH_TOKEN}" \
--conf spark.kubernetes.driverEnv.SERVICE_ID="${SERVICE_ID}" \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.driver.memory="${SPARK_DRIVER_MEMORY}" \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.serializer=org.apache.spark.serializer.JavaSerializer \
--conf spark.hadoop.fs.s3a.connection.timeout=60000 \
--conf spark.hadoop.fs.s3a.connection.establish.timeout=5000 \
--conf spark.hadoop.fs.s3a.attempts.maximum=10 \
--conf spark.hadoop.fs.s3a.paging.maximum=1000 \
--conf spark.hadoop.fs.s3a.connection.maximum=200 \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.threads.keepalivetime=60000 \
--conf spark.hadoop.fs.s3a.connection.ttl=300000 \
--conf spark.hadoop.fs.s3a.multipart.purge.age=86400000 \
--conf spark.hadoop.fs.s3a.assumed.role.session.duration=1800000 \
--conf spark.driver.userClassPathFirst=true \
--conf spark.executor.userClassPathFirst=true \
--conf spark.driver.log.localDir=/tmp \
--conf spark.driver.extraJavaOptions="-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties" \
local:///app/src/main/main.py $@ 2>&1)"
Il parametro spark.kubernetes.container.image fa riferimento al tag dell'immagine applicativa.
Le configurazioni spark.kubernetes.authenticate.driver.serviceAccountName=argo-workflow e
--conf spark.kubernetes.container.image.pullSecrets="${SPARK_PULLSECRETS}" sono necessarie
per accedere correttamente alle immagini nel cluster Alida e per operare sui pod.
Le configurazioni del tipo spark.hadoop.fs.s3a... sono necessarie per l'integrazione
con un filesystem di tipo S3, nel nostro caso MinIO. Nei casi in cui viene assegnato
un timestamp, si va semplicemente a riportare i valori di default convertiti in millisecondo.
Le librerie di Hadoop necessarie per l'integrazione, infatti, usano il formato temporale 60s, 24h... etc.
Ciò causa degli errori nel funzionamento con la versione 4 di Spark, che supporta i millisecondi.
I parametri userClassPathFirst fanno in modo che in caso di conflitti tra librerie,
vengano privilegiate quelle indicate dal programmatore.
Per il recupero dei log del driver, le variabili spark.driver.log.localDir e
spark.driver.extraJavaOptions=-Ddriver.log.dir=/tmp -Dlog4j2.configurationFile=/opt/spark/conf/log4j2.properties
fanno si che lo spark driver scriva i log in una cartella indicata
all'interno del container del driver (immagine applicativa).
$@ in coda aggiunge gli arguments di Alida, rendendoli visibili nel job Spark.
Infine il comando è wrappato in una variabile che cattura il log, utile per
riesaminarlo terminata l'esecuzione. 2>&1 riunisce stdout e stderr.
Di seguito il Dockerfile corrispondente.
FROM apache/spark:4.0.1
USER root
# Install yq
RUN wget https://github.com/mikefarah/yq/releases/download/v4.45.1/yq_linux_amd64 -O /usr/local/bin/yq && \
chmod +x /usr/local/bin/yq
# Install jq
RUN apt-get update && apt-get install -y jq
USER spark
COPY entrypoint.sh .
ENTRYPOINT ["/bin/sh", "entrypoint.sh"]
Notare l'installazione dei tool jq e yq per la manipolazione di .json e .yaml.
Dettagli di sviluppo - app
L'immagine applicativa contiene il codice Spark da eseguire. Pertanto necessita di una immagine contentente Spark e eventuali dipendenze necessarie. Di seguito vedremo un esempio di servizio Spark con PySpark. Iniziamo visionando il Dockerfile:
FROM apache/spark:4.0.1
USER root
# Install Python 3 and pip
RUN apt-get update && \
apt-get install -y python3 python3-pip python3-dev && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
COPY ./requirements.txt .
RUN pip install -r requirements.txt
# install dependencies to read/write on S3-compatible
ADD https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar /opt/spark/jars/hadoop-aws-3.3.6.jar
ADD https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.406/aws-java-sdk-bundle-1.12.406.jar /opt/spark/jars/aws-java-sdk-bundle-1.12.406.jar
RUN chown -R spark:spark /opt/spark/jars
USER spark
COPY src/ /app/src/
# Copy custom log4j2 config
COPY log4j2.properties /opt/spark/conf/log4j2.properties
Partendo dall'immagine base di Spark, si installa Python e relative dipendenze. Vengono installate inoltre delle librerie di Hadoop per l'interfacciamento con AWS, che nel nostro caso servono per accedere a minio come se fosse un filesystem locale. Infine viene incluso un file di configurazione per i log, in modo che Spark logghi secondo le nostre specifiche.
status = error
name = PropertiesConfig
# File appender (all logs go here)
appender.file.type = File
appender.file.name = FILE
appender.file.fileName = /path/to/your/logfile.log
appender.file.layout.type = PatternLayout
appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level [%t] %c{1} - %msg%n
appender.file.immediateFlush = true
appender.file.append = true # ensure append mode
# Root logger
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = FILE
Per quanto riguarda il programma vero e proprio, questo si occuperà di:
- Leggere parametri di interesse.
- Eseguire il job spark.
- Inviare il file di log come Workflow Media, per renderlo visionabile nel Workflow.
Per il primo punto, se il client è stato impostato come visto precedentemente, è possibile recuperare gli arguments nella maniera classica.
import argparse
def str2bool(v):
if isinstance(v, bool):
return v
if v.lower() in ('yes', 'true', 't', 'y', '1'):
return True
elif v.lower() in ('no', 'false', 'f', 'n', '0', ''):
return False
parser = argparse.ArgumentParser()
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_dataset_use_ssl', type=str2bool, required=True)
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_model_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_model_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_model_access_key', type=str, required=True)
parser.add_argument('--output-model.minIO_SECRET_KEY', dest='output_model_secret_key', type=str, required=True)
parser.add_argument('--output-model.use_ssl', dest='output_model_use_ssl', type=str2bool, required=True)
parser.add_argument("--n_clusters", help="Number of clusters for KMeans." ,type=int, default=5)
parser.add_argument("--max_iterations", help="Number of clusters for KMeans." ,type=int, default=5)
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)
parser.add_argument('--go_manager.brokers', dest='go_manager_brokers', type=str, required=True)
parser.add_argument('--go_manager.topic', dest='go_manager_topic', type=str, required=True)
parser.add_argument('--go_manager.base_path', dest='go_minio_path', type=str, required=True)
parser.add_argument('--go_manager.minio_bucket', dest='go_minio_bucket', type=str, required=True)
parser.add_argument('--go_manager.minIO_URL', dest='go_minio_url', type=str, required=True)
parser.add_argument('--go_manager.minIO_ACCESS_KEY', dest='go_access_key', type=str, required=True)
parser.add_argument('--go_manager.minIO_SECRET_KEY', dest='go_secret_key', type=str, required=True)
parser.add_argument('--go_manager.use_ssl', dest='go_use_ssl', type=str2bool, required=True)
args, unknown = parser.parse_known_args()
Di seguito è mostrato il codice applicativo di un job PySpark che esegue l'algoritmo Kmeans.
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from utils import send_application_media
from arguments import args
import os
import sys
import atexit
import traceback
log_local_dir = os.environ.get("DRIVER_LOG_DIR", "/tmp")
log_file_path = f"{log_local_dir}/driver.log"
log_file = open(log_file_path, "a") # append mode
sys.stdout = log_file
sys.stderr = log_file
def run_kmeans_example(k: int = 3,
max_iter: int = 20,
init_mode: str = "k-means||"):
"""
Runs KMeans clustering on a dataset.
Args:
input_path: path to data (could be CSV, Parquet, libsvm etc.)
k: number of clusters
max_iter: maximum number of iterations
tol: convergence tolerance
init_mode: initialization mode ("random" or "k-means||")
seed: random seed
"""
# Create Spark session (Spark 4.0.0)
spark = SparkSession.builder \
.appName("KMeansExample_Spark_4_0_0") \
.config("spark.hadoop.fs.s3a.endpoint", args.input_dataset_minio_url) \
.config("spark.hadoop.fs.s3a.access.key", args.input_dataset_access_key) \
.config("spark.hadoop.fs.s3a.secret.key", args.input_dataset_secret_key) \
.config("spark.hadoop.fs.s3a.connection.ssl.enabled", args.input_dataset_use_ssl) \
.getOrCreate()
sc = spark.sparkContext
df = spark.read.option("header", "true").option("inferSchema", "true").csv(f"s3a://{args.input_dataset_minio_bucket}/{args.input_dataset}")
#Suppose you know which columns are numeric features
feature_cols = [c for c, t in df.dtypes if t in ("double", "float", "int", "long")]
#Remove columns you don't want, e.g. label or id
feature_cols = [c for c in feature_cols if c not in ("CustomerID")]
# Assemble into feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
df_features = assembler.transform(df).select("features")
# Instantiate KMeans
kmeans = KMeans(
k=k,
initMode=init_mode,
maxIter=max_iter
)
# Fit the model
model = kmeans.fit(df_features)
# Get cluster centers
centers = model.clusterCenters()
print(f"Cluster Centers: {centers}")
# Make predictions
predictions = model.transform(df_features)
# Evaluate clustering (silhouette score)
evaluator = ClusteringEvaluator(metricName="silhouette", distanceMeasure="squaredEuclidean", featuresCol="features", predictionCol="prediction")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette with squared euclidean distance = {silhouette}")
# Summary: get sizes etc
summary = model.summary
print(f"Cluster sizes -> {summary.clusterSizes}")
print(f"Training cost (sum of squared distances): {summary.trainingCost}")
print(f"Number of iterations until convergence: {summary.numIter}")
print(f"Saving model to Minio")
model.write().overwrite().save(f"s3a://{args.output_model_minio_bucket}/{args.output_model}/kmeans-model-pyspark")
spark.stop()
def save_log_to_minio():
try:
send_application_media("/tmp/driver.log", "driver.log", "file", args)
except Exception:
print ("Failed to upload logs to application media")
traceback.print_exc()
if __name__ == "__main__":
atexit.register(save_log_to_minio)
print(f"START SPARK JOB - SERVICE {os.environ.get('SERVICE_ID', 'SERVICE_ID')}")
run_kmeans_example(k=args.n_clusters, max_iter=args.max_iterations, init_mode="k-means||")
print(f"END SPARK JOB - SERVICE {os.environ.get('SERVICE_ID', 'SERVICE_ID')}")
Notare come il salvataggio dei log sia stato inserito in un metodo atexit.
Questo per far si che i log vengano inviati come Workflow Media al termine
dell'esecuzione, anche in caso di errori.
Le righe di codice in testa:
log_local_dir = os.environ.get("DRIVER_LOG_DIR", "/tmp")
log_file_path = f"{log_local_dir}/driver.log"
log_file = open(log_file_path, "a") # append mode
sys.stdout = log_file
sys.stderr = log_file
Inviano i log applicativi sullo stesso file di log prodotto da Spark.
Per quanto riguarda interfacciamento con MinIO e uso dei Workflow Media, le
modalità sono quelle consuete nello sviluppo di servizi Alida.
Di seguito comunque il modulo utils.py di riferimento.
import json
import os
import traceback
import uuid
from datetime import datetime
from kafka import KafkaProducer
from minio import Minio
def _init_producer_kafka(args):
if not hasattr(_init_producer_kafka, "_producer"):
kafka_brokers = args.go_manager_brokers.split(",")
_init_producer_kafka._producer = KafkaProducer(
bootstrap_servers=kafka_brokers
)
return _init_producer_kafka._producer
def upload_file_to_minio(minio_url, access_key, secret_key, bucket_name, object_name, local_file_path, secure=False):
# Initialize the MinIO client
address = minio_url.replace("http://", "").replace("https://", "")
client = Minio(address, access_key=access_key, secret_key=secret_key, secure=secure)
# Upload the file
client.fput_object(bucket_name, object_name, local_file_path)
def prepare_metadata_json(name, messageType):
result = {
"name": name,
"key": name.lower().replace(" ", "-"),
"uuid": str(uuid.uuid4()),
"messageType": messageType,
"created": str(datetime.now()),
"modified": str(datetime.now())
}
if "SERVICE_ID" in os.environ:
result['serviceId'] = os.environ.get('SERVICE_ID')
if "EXECUTION_ID" in os.environ:
result['executionId'] = os.environ.get('EXECUTION_ID')
if "HASH_TOKEN" in os.environ:
result['hashToken'] = os.environ.get('HASH_TOKEN')
return result
def send_message(data, args):
producer = _init_producer_kafka(args)
producer.send(args.go_manager_topic, json.dumps(data).encode('utf-8'))
producer.flush()
def prepare_file_metadata(name, messageType, path, extension, filename, **kwargs):
metadata = prepare_metadata_json(name=name, messageType=messageType, **kwargs)
metadata['path'] = path
metadata['extension'] = extension
if messageType == "picture":
metadata['filename'] = filename
return metadata
def send_application_media(file_to_send, file_name, file_type, args, **kwargs):
folder = args.go_minio_path
if folder[-1] != "/":
folder = folder + "/"
metadata = prepare_file_metadata(name=os.path.basename(file_to_send).split('.')[0], messageType=file_type, path=folder + file_name,
extension=os.path.basename(file_to_send).split('.')[-1], filename=file_name, **kwargs)
upload_file_to_minio(args.go_minio_url, args.go_access_key, args.go_secret_key, args.go_minio_bucket, folder + file_name, file_to_send, secure=args.go_use_ssl)
print(f"[MEDIA] type={file_type} local={file_to_send} remote_object={args.go_minio_url} uploadto={args.go_minio_path + file_name}")
send_message(metadata, args)
def send_log(name, value, value_type, args, **kwargs):
message_to_sent = prepare_metadata_json(name, "log", **kwargs)
message_to_sent['value'] = value
message_to_sent['valueType'] = value_type
send_message(message_to_sent, args)
Limitazioni e problemi noti
Come già accennato, è presente un disallineamento tra le librerie di hadoop e spark 4 sul formato dei valori temporali. Spark 4 non supporta più il parsing dei valori temporali di tipo 60s, 24h… che invece vengono usati dalle librerie di Hadoop per l’integrazione con s3, il che rende necessaria una sovrascrittura in millisecondi come visto nei paragrafi precedenti.
Un’altra limitazione riguarda la gestione delle configurazioni di minio: queste vengono impostate globalmente nella configurazione di spark, all’inizio dell’esecuzione, cachate e poi propagate agli executor. Eventuali modifiche durante l’esecuzione del job non verranno rilevate se non nel driver.
Questo è particolarmente rilevante per la gestione delle credenziali, in quanto non rende possibile connettersi a path minio con credenziali diverse. È quindi consigliabile preparare opportunamente eventuali input e output ponendoli tutti un un’area con medesime credenziali.
In Alida in particolare, è opportuno assicurarsi che i dataset di input possano essere ricondotti allo stesso bucket/path degli output, che viene generato automaticamente e corrisponde al datasource di default impostato alla creazione del workflow.