Collection of Service Examples
Below you will find a series of service examples. For simplicity, we provide each metamodel in JSON format. For more information on defining a metamodel manually, see the platform documentation on service registration.
Ex. 1: Dataset Input and Output on MinIO
Core Program Code
import argparse
def str2bool(v):
    """Parse a command-line flag value into a bool.

    Accepts an actual bool (passed through unchanged) or the common textual
    spellings. The empty string counts as False, matching the original.

    FIX: the original fell off the end and returned ``None`` for any other
    value, which argparse would then silently store as the option value.
    Raising ``argparse.ArgumentTypeError`` makes argparse report a proper
    usage error instead.
    """
    if isinstance(v, bool):
        return v
    normalized = v.lower()
    if normalized in ('yes', 'true', 't', 'y', '1'):
        return True
    if normalized in ('no', 'false', 'f', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")
parser = argparse.ArgumentParser()


def _add_minio_dataset_args(flag_prefix, dest_prefix):
    """Register one dataset flag plus the MinIO sub-options the platform injects.

    Produces exactly the same flags and dest names as spelling each
    ``add_argument`` call out by hand.
    """
    parser.add_argument(f'--{flag_prefix}', dest=dest_prefix, type=str, required=True)
    for flag_suffix, dest_suffix in (
        ('minio_bucket', 'minio_bucket'),
        ('minIO_URL', 'minio_url'),
        ('minIO_ACCESS_KEY', 'access_key'),
        ('minIO_SECRET_KEY', 'secret_key'),
    ):
        parser.add_argument(
            f'--{flag_prefix}.{flag_suffix}',
            dest=f'{dest_prefix}_{dest_suffix}',
            type=str,
            required=True,
        )
    parser.add_argument(f'--{flag_prefix}.use_ssl', dest=f'{dest_prefix}_use_ssl', type=str2bool, required=True)


_add_minio_dataset_args('input-dataset', 'input_dataset')
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)
_add_minio_dataset_args('output-dataset', 'output_dataset')
# parse_known_args: tolerate extra platform-supplied flags we don't declare.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List objects under *folder* in *bucket_name* whose names end with *extention*.

    Returns a single ``"s3://bucket/key"`` string when exactly one object
    matches, a list of such strings when several match, and raises when none
    do (callers rely on this mixed return type, so it is preserved).

    Fixes over the original:
    - ``folder[-1]`` crashed with IndexError on an empty folder string;
      ``endswith`` is safe.
    - uses the public ``object_name`` attribute instead of the private
      ``_object_name``.
    - a real ``endswith`` suffix test instead of the equivalent but obscure
      ``extention in name[-len(extention):]``.
    """
    if not folder.endswith("/"):
        folder = folder + "/"
    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name, prefix=folder)
    paths = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if len(paths) > 1:
        return paths
    elif len(paths) == 1:
        return paths[0]
    else:
        raise Exception("Dataset is empty!")
def load_from_minio(args):
    """Read the service's input CSV from MinIO into a pandas DataFrame.

    Locates the single CSV under the input-dataset folder via ``minio_ls``,
    reads it through pandas' s3 support, and optionally restricts the frame
    to the comma-separated column list in ``--input-columns`` ('*' or an
    absent flag means all columns).
    """
    s3_options = {
        'key': args.input_dataset_access_key,
        'secret': args.input_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.input_dataset_minio_url}'
        }
    }
    csv_path = minio_ls(
        args.input_dataset_minio_url,
        args.input_dataset_access_key,
        args.input_dataset_secret_key,
        args.input_dataset_minio_bucket,
        args.input_dataset,
        ".csv",
    )
    # sep=None + the python engine lets pandas sniff the delimiter.
    frame = pd.read_csv(csv_path, storage_options=s3_options, sep=None, engine='python')
    columns_spec = args.input_columns
    if columns_spec is not None and columns_spec.strip() != '*':
        wanted = [name.strip() for name in columns_spec.split(",")]
        frame = frame[wanted]
    return frame
def save_dataset_to_minio(df, args):
    """Write *df* as ``output-dataset.csv`` into the configured MinIO folder.

    Uses pandas' s3 storage_options support; the destination path is
    ``s3://<bucket>/<output-dataset>/output-dataset.csv``.
    """
    s3_options = {
        'key': args.output_dataset_access_key,
        'secret': args.output_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.output_dataset_minio_url}'
        }
    }
    file_path = f"s3://{args.output_dataset_minio_bucket}/{args.output_dataset}/output-dataset.csv"
    # Trace line for debugging where the write actually went.
    print(f"[TO_CSV] path={file_path} endpoint={args.output_dataset_minio_url} bucket={args.output_dataset_minio_bucket}")
    df.to_csv(file_path, storage_options=s3_options, index=False)
# Service entry point: read the input dataset (with optional column
# selection) from MinIO and write it back unchanged to the output location.
dataset = load_from_minio(args)
save_dataset_to_minio(dataset, args)
Metamodel
{
"name": "example",
"description": "example of input and output data",
"mode": "BATCH",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/example:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"properties": [
{
"description": "Your input dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset",
"valueType": "STRING",
"invisible": true
}
]
}
Ex. 2: Input Stream and Output Stream on Kafka
Core Program Code
import argparse
from kafka import KafkaConsumer, KafkaProducer
from arguments import args
parser = argparse.ArgumentParser()
# Channel names: despite the MinIO-flavoured sub-option flags inherited from
# the batch example, --input-dataset / --output-dataset are used as Kafka
# topic names by the consumer/producer below.
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)
parser.add_argument('--output-dataset', dest='output_dataset_minio_bucket_unused', type=str, required=True) if False else None
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_dataset_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_dataset_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minIO_SECRET_KEY', dest='output_dataset_secret_key', type=str, required=True)
# FIX: the consumer/producer below read args.input_kafka_brokers and
# args.output_kafka_brokers, which the original parser never defined -- the
# script could not run without raising AttributeError. Flag names assumed to
# follow the platform's "<channel>.<sub_option>" convention -- TODO confirm.
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)
args, unknown = parser.parse_known_args()
def _transform_data(input_data):
# Here the service logic has to be implemented
return input_data
# Wire the input topic to the output topic through the (currently identity)
# transform. Iterating a KafkaConsumer blocks for new messages, so this loop
# runs until the process is stopped.
consumer = KafkaConsumer(
    args.input_topic,
    bootstrap_servers=args.input_kafka_brokers.split(","),
)
producer = KafkaProducer(
    bootstrap_servers=args.output_kafka_brokers.split(","),
)
for message in consumer:
    producer.send(args.output_topic, _transform_data(message.value))
Metamodel
{
"name": "consume-and-publish",
"description": "Consume and publish example",
"mode": "STREAMING",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/consume-and-publish:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"properties": [
{
"description": "The input channel from which to read the text.",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"description": "The output channel to publish to.",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-dataset",
"valueType": "STRING",
"invisible": true
}
]
}
Ex. 3: Input Dataset and Output Model on MinIO
Core Program Code
import argparse
import os
from minio import Minio
import pandas as pd
from sklearn.cluster import KMeans
parser = argparse.ArgumentParser()


def _add_channel_args(flag_prefix, dest_prefix):
    """Register one channel flag plus the MinIO sub-options the platform injects.

    Produces exactly the same flags and dest names as spelling each
    ``add_argument`` call out by hand.
    """
    parser.add_argument(f'--{flag_prefix}', dest=dest_prefix, type=str, required=True)
    for flag_suffix, dest_suffix in (
        ('minio_bucket', 'minio_bucket'),
        ('minIO_URL', 'minio_url'),
        ('minIO_ACCESS_KEY', 'access_key'),
        ('minIO_SECRET_KEY', 'secret_key'),
    ):
        parser.add_argument(
            f'--{flag_prefix}.{flag_suffix}',
            dest=f'{dest_prefix}_{dest_suffix}',
            type=str,
            required=True,
        )


_add_channel_args('input-dataset', 'input_dataset')
_add_channel_args('output-model', 'output_model')
parser.add_argument("--n_clusters", help="Number of clusters for KMeans.", type=int, default=5)
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)
# parse_known_args: tolerate extra platform-supplied flags we don't declare.
args, unknown = parser.parse_known_args()
def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List objects under *folder* in *bucket_name* whose names end with *extention*.

    Returns a single ``"s3://bucket/key"`` string when exactly one object
    matches, a list of such strings when several match, and raises when none
    do (callers rely on this mixed return type, so it is preserved).

    Fixes over the original:
    - ``folder[-1]`` crashed with IndexError on an empty folder string;
      ``endswith`` is safe.
    - uses the public ``object_name`` attribute instead of the private
      ``_object_name``.
    - a real ``endswith`` suffix test instead of the equivalent but obscure
      ``extention in name[-len(extention):]``.
    """
    if not folder.endswith("/"):
        folder = folder + "/"
    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name, prefix=folder)
    paths = [
        "s3://" + bucket_name + "/" + obj.object_name
        for obj in objects
        if obj.object_name.endswith(extention)
    ]
    if len(paths) > 1:
        return paths
    elif len(paths) == 1:
        return paths[0]
    else:
        raise Exception("Dataset is empty!")
def load_dataset_from_minio(args):
    """Read the service's input CSV from MinIO into a pandas DataFrame.

    Locates the single CSV under the input-dataset folder via ``minio_ls``,
    reads it through pandas' s3 support, and optionally restricts the frame
    to the comma-separated column list in ``--input-columns`` ('*' or an
    absent flag means all columns).
    """
    s3_options = {
        'key': args.input_dataset_access_key,
        'secret': args.input_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.input_dataset_minio_url}'
        }
    }
    csv_path = minio_ls(
        args.input_dataset_minio_url,
        args.input_dataset_access_key,
        args.input_dataset_secret_key,
        args.input_dataset_minio_bucket,
        args.input_dataset,
        ".csv",
    )
    # sep=None + the python engine lets pandas sniff the delimiter.
    frame = pd.read_csv(csv_path, storage_options=s3_options, sep=None, engine='python')
    columns_spec = args.input_columns
    if columns_spec is not None and columns_spec.strip() != '*':
        wanted = [name.strip() for name in columns_spec.split(",")]
        frame = frame[wanted]
    return frame
def save_model_to_minio(model, args):
    """Serialize the fitted model and upload it to MinIO.

    FIX: the original called ``model.save(file_path, format="joblib")``, but
    scikit-learn estimators have no ``save`` method, so the service crashed
    with AttributeError before writing anything (the ``storage_options`` dict
    it built was never used either). We pickle the model and upload the
    bytes with the Minio client instead; ``joblib.load`` can read plain
    pickle files, so the ``.joblib`` object name is kept.
    """
    import io
    import pickle

    payload = io.BytesIO(pickle.dumps(model))
    size = payload.getbuffer().nbytes
    client = Minio(
        args.output_model_minio_url,
        access_key=args.output_model_access_key,
        secret_key=args.output_model_secret_key,
        secure=False,  # no use_ssl flag is parsed for the model channel -- TODO confirm
    )
    object_name = f"{args.output_model}/kmeans_model.joblib"
    client.put_object(args.output_model_minio_bucket, object_name, payload, length=size)
    file_path = f"s3://{args.output_model_minio_bucket}/{args.output_model}/kmeans_model.joblib"
    print(f"Model saved to MinIO: {file_path}")
# Service entry point: load the (optionally column-filtered) dataset from
# MinIO, fit a KMeans model on it, and persist the fitted model back to MinIO.
dataset = load_dataset_from_minio(args)
kmeans = KMeans(n_clusters=args.n_clusters, random_state=0)
kmeans.fit(dataset)
save_model_to_minio(kmeans, args)
Metamodel
{
"name": "kmeans-example",
"description": "Kmeans example",
"mode": "BATCH",
"url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/kmeans-example:1.0.0",
"version": "1.0.0",
"accessLevel": "PUBLIC",
"properties": [
{
"description": "Your input dataset",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "input-dataset",
"valueType": "STRING",
"invisible": true
},
{
"defaultValue": "ANY",
"description": "Selected columns from table",
"key": "input-columns",
"type": "application",
"mandatory": true,
"valueType": "STRING",
"invisible": true
},
{
"description": "Your output model",
"mandatory": true,
"type": "application",
"defaultValue": null,
"key": "output-model",
"valueType": "STRING",
"invisible": true
}
]
}