Skip to content

Collection of Service Examples

Below you will find a series of service examples. For simplicity, we will provide the metamodel in JSON format. For more information on how to define a metamodel manually, see the service metamodel reference in the platform documentation.

Ex. 1: Dataset Input and Output on MinIO

Core Program Code

import argparse

import pandas as pd
from minio import Minio

def str2bool(v):
    """Parse a command-line flag value into a bool.

    Accepts an actual bool (returned unchanged) or a case-insensitive string
    spelling: 'yes'/'true'/'t'/'y'/'1' -> True, 'no'/'false'/'f'/'n'/'0' and
    the empty string -> False.

    Raises:
        argparse.ArgumentTypeError: for any other value. (The original fell
            off the end and returned None, which argparse would silently
            store as a non-boolean value.)
    """
    if isinstance(v, bool):
        return v
    if v.lower() in ('yes', 'true', 't', 'y', '1'):
        return True
    if v.lower() in ('no', 'false', 'f', 'n', '0', ''):
        return False
    raise argparse.ArgumentTypeError(f"Boolean value expected, got {v!r}")

# Command-line interface: the platform injects these flags at run time.
parser = argparse.ArgumentParser()

# Input dataset: logical path plus the MinIO connection details to read it.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)
parser.add_argument('--input-dataset.use_ssl', dest='input_dataset_use_ssl', type=str2bool, required=True)

# Optional comma-separated column selection ('*' or absent means all columns).
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)

# Output dataset: logical path plus the MinIO connection details to write it.
parser.add_argument('--output-dataset', dest='output_dataset', type=str, required=True)
parser.add_argument('--output-dataset.minio_bucket', dest='output_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--output-dataset.minIO_URL', dest='output_dataset_minio_url', type=str, required=True)
parser.add_argument('--output-dataset.minIO_ACCESS_KEY', dest='output_dataset_access_key', type=str, required=True)
parser.add_argument('--output-dataset.minIO_SECRET_KEY', dest='output_dataset_secret_key', type=str, required=True)
parser.add_argument('--output-dataset.use_ssl', dest='output_dataset_use_ssl', type=str2bool, required=True)

# parse_known_args tolerates extra platform-injected flags instead of failing.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List objects under `folder` in a MinIO bucket whose names end with `extention`.

    Args:
        address: MinIO endpoint, passed straight to the Minio client.
        access_key / secret_key: MinIO credentials.
        bucket_name: bucket to search.
        folder: prefix inside the bucket; a trailing "/" is appended if missing.
        extention: required file suffix, e.g. ".csv" (misspelled name kept for
            backward compatibility with existing keyword callers).
        use_ssl: whether to connect over HTTPS.

    Returns:
        A single "s3://bucket/key" string when exactly one object matches, or
        a list of such strings when several match. (The mixed return type is
        preserved: callers in this file expect a plain string for the
        single-file case.)

    Raises:
        Exception: when no object matches.
    """
    # Normalize the prefix; `endswith` also survives an empty string, where
    # the original `folder[-1]` raised IndexError.
    if not folder.endswith("/"):
        folder = folder + "/"

    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name, prefix=folder)

    # Use the public `object_name` attribute (the original poked the private
    # `_object_name`) and an explicit suffix test instead of
    # `ext in name[-len(ext):]`.
    files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]

    if len(files_list) > 1:
        return ["s3://" + bucket_name + "/" + name for name in files_list]
    if len(files_list) == 1:
        return "s3://" + bucket_name + "/" + files_list[0]
    raise Exception("Dataset is empty!")

def load_from_minio(args):
    """Read the input CSV dataset from MinIO into a pandas DataFrame.

    Locates the single .csv object under args.input_dataset via minio_ls,
    reads it through pandas' s3 support, and optionally narrows the frame to
    the comma-separated column list in args.input_columns ('*' or None keeps
    every column).
    """
    opts = {
        'key': args.input_dataset_access_key,
        'secret': args.input_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.input_dataset_minio_url}'
        }
    }
    csv_path = minio_ls(
        args.input_dataset_minio_url,
        args.input_dataset_access_key,
        args.input_dataset_secret_key,
        args.input_dataset_minio_bucket,
        args.input_dataset,
        ".csv",
    )

    frame = pd.read_csv(csv_path, storage_options=opts, sep=None, engine='python')

    columns_spec = args.input_columns
    if columns_spec is not None and columns_spec.strip() != '*':
        frame = frame[[name.strip() for name in columns_spec.split(",")]]

    return frame

def save_dataset_to_minio(df, args):
    """Write df as a CSV to s3://<bucket>/<output-dataset>/output-dataset.csv on MinIO."""
    opts = {
        'key': args.output_dataset_access_key,
        'secret': args.output_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.output_dataset_minio_url}'
        }
    }
    target = f"s3://{args.output_dataset_minio_bucket}/{args.output_dataset}/output-dataset.csv"
    # Trace line so the run log shows exactly where the CSV went.
    print(f"[TO_CSV] path={target} endpoint={args.output_dataset_minio_url} bucket={args.output_dataset_minio_bucket}")

    df.to_csv(target, storage_options=opts, index=False)

# Entry point: pull the dataset from MinIO, then write it back out unchanged.
dataset = load_from_minio(args)
save_dataset_to_minio(dataset, args)

Metamodel

{
    "name": "example",
    "description": "example of input and output data",
    "mode": "BATCH",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/example:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "properties": [
        {
            "description": "Your input dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset",
            "valueType": "STRING",
            "invisible": true
        }
    ]
}

Ex. 2: Input Stream and Output Stream on Kafka

Core Program Code

import argparse
from kafka import KafkaConsumer, KafkaProducer
from arguments import args

# Command-line interface. The consumer/producer below read args.input_topic,
# args.input_kafka_brokers, args.output_topic and args.output_kafka_brokers,
# so the parser must define exactly those destinations. (The previous version
# parsed MinIO credentials copied from the dataset example and never defined
# the *_kafka_brokers attributes, so the script failed with AttributeError at
# startup.)
parser = argparse.ArgumentParser()

# Input stream: topic name and comma-separated Kafka broker list.
parser.add_argument('--input-dataset', dest='input_topic', type=str, required=True)
parser.add_argument('--input-dataset.kafka_brokers', dest='input_kafka_brokers', type=str, required=True)

# Output stream: topic name and comma-separated Kafka broker list.
parser.add_argument('--output-dataset', dest='output_topic', type=str, required=True)
parser.add_argument('--output-dataset.kafka_brokers', dest='output_kafka_brokers', type=str, required=True)

# parse_known_args tolerates extra platform-injected flags instead of failing.
args, unknown = parser.parse_known_args()

def _transform_data(input_data):
    # Here the service logic has to be implemented
    return input_data

# Subscribe to the input topic.
# NOTE(review): relies on args.input_kafka_brokers / args.output_kafka_brokers
# being defined by the CLI parser as comma-separated broker lists — verify
# the parser actually declares those destinations.
consumer = KafkaConsumer(
  args.input_topic,
  bootstrap_servers=args.input_kafka_brokers.split(",")
)

producer = KafkaProducer(
  bootstrap_servers=args.output_kafka_brokers.split(",")
)

# Blocking consume loop: transform each message and republish it.
for message in consumer:
    data_to_send = _transform_data(message.value)
    producer.send(args.output_topic, data_to_send)

Metamodel

{
    "name": "consume-and-publish",
    "description": "Consume and publish example",
    "mode": "STREAMING",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/consume-and-publish:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "properties": [
        {
            "description": "The input channel where to read the text.",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "The output channel to publish to.",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-dataset",
            "valueType": "STRING",
            "invisible": true
        }
    ]
}

Ex. 3: Input Dataset and Output Model on MinIO

Core Program Code

import argparse
import os
from minio import Minio
import pandas as pd
from sklearn.cluster import KMeans

# Command-line interface: the platform injects these flags at run time.
parser = argparse.ArgumentParser()

# Input dataset: logical path plus the MinIO connection details to read it.
parser.add_argument('--input-dataset', dest='input_dataset', type=str, required=True)
parser.add_argument('--input-dataset.minio_bucket', dest='input_dataset_minio_bucket', type=str, required=True)
parser.add_argument('--input-dataset.minIO_URL', dest='input_dataset_minio_url', type=str, required=True)
parser.add_argument('--input-dataset.minIO_ACCESS_KEY', dest='input_dataset_access_key', type=str, required=True)
parser.add_argument('--input-dataset.minIO_SECRET_KEY', dest='input_dataset_secret_key', type=str, required=True)

# Output model: logical path plus the MinIO connection details to write it.
parser.add_argument('--output-model', dest='output_model', type=str, required=True)
parser.add_argument('--output-model.minio_bucket', dest='output_model_minio_bucket', type=str, required=True)
parser.add_argument('--output-model.minIO_URL', dest='output_model_minio_url', type=str, required=True)
parser.add_argument('--output-model.minIO_ACCESS_KEY', dest='output_model_access_key', type=str, required=True)
parser.add_argument('--output-model.minIO_SECRET_KEY', dest='output_model_secret_key', type=str, required=True)

# Algorithm hyper-parameter and optional column selection.
parser.add_argument("--n_clusters", help="Number of clusters for KMeans.", type=int, default=5)
parser.add_argument('--input-columns', dest='input_columns', type=str, required=False)

# parse_known_args tolerates extra platform-injected flags instead of failing.
args, unknown = parser.parse_known_args()

def minio_ls(address, access_key, secret_key, bucket_name, folder, extention, use_ssl=False):
    """List objects under `folder` in a MinIO bucket whose names end with `extention`.

    Args:
        address: MinIO endpoint, passed straight to the Minio client.
        access_key / secret_key: MinIO credentials.
        bucket_name: bucket to search.
        folder: prefix inside the bucket; a trailing "/" is appended if missing.
        extention: required file suffix, e.g. ".csv" (misspelled name kept for
            backward compatibility with existing keyword callers).
        use_ssl: whether to connect over HTTPS.

    Returns:
        A single "s3://bucket/key" string when exactly one object matches, or
        a list of such strings when several match. (The mixed return type is
        preserved: callers in this file expect a plain string for the
        single-file case.)

    Raises:
        Exception: when no object matches.
    """
    # Normalize the prefix; `endswith` also survives an empty string, where
    # the original `folder[-1]` raised IndexError.
    if not folder.endswith("/"):
        folder = folder + "/"

    client = Minio(
        address,
        access_key=access_key,
        secret_key=secret_key,
        secure=use_ssl
    )
    objects = client.list_objects(bucket_name, prefix=folder)

    # Use the public `object_name` attribute (the original poked the private
    # `_object_name`) and an explicit suffix test instead of
    # `ext in name[-len(ext):]`.
    files_list = [obj.object_name for obj in objects if obj.object_name.endswith(extention)]

    if len(files_list) > 1:
        return ["s3://" + bucket_name + "/" + name for name in files_list]
    if len(files_list) == 1:
        return "s3://" + bucket_name + "/" + files_list[0]
    raise Exception("Dataset is empty!")

def load_dataset_from_minio(args):
    """Read the input CSV dataset from MinIO into a pandas DataFrame.

    Locates the single .csv object under args.input_dataset via minio_ls,
    reads it through pandas' s3 support, and optionally narrows the frame to
    the comma-separated column list in args.input_columns ('*' or None keeps
    every column).
    """
    opts = {
        'key': args.input_dataset_access_key,
        'secret': args.input_dataset_secret_key,
        'client_kwargs': {
            'endpoint_url': f'{args.input_dataset_minio_url}'
        }
    }
    csv_path = minio_ls(
        args.input_dataset_minio_url,
        args.input_dataset_access_key,
        args.input_dataset_secret_key,
        args.input_dataset_minio_bucket,
        args.input_dataset,
        ".csv",
    )

    frame = pd.read_csv(csv_path, storage_options=opts, sep=None, engine='python')

    columns_spec = args.input_columns
    if columns_spec is not None and columns_spec.strip() != '*':
        frame = frame[[name.strip() for name in columns_spec.split(",")]]

    return frame

def save_model_to_minio(model, args):
    """Serialize a fitted model with joblib and upload it to MinIO.

    The original called `model.save(file_path, format="joblib")`, but
    scikit-learn estimators have no `save` method, so it always raised
    AttributeError. Instead, dump the model to an in-memory buffer with
    joblib (scikit-learn's recommended persistence tool) and upload the
    bytes through the MinIO client that this file already imports.
    """
    # Local imports keep the fix self-contained; joblib ships as a
    # scikit-learn dependency, which this file already uses.
    import io
    import joblib

    buffer = io.BytesIO()
    joblib.dump(model, buffer)
    buffer.seek(0)
    payload_size = buffer.getbuffer().nbytes

    client = Minio(
        args.output_model_minio_url,
        access_key=args.output_model_access_key,
        secret_key=args.output_model_secret_key,
        # NOTE(review): no use_ssl flag is parsed for the model output —
        # assuming plain HTTP like the rest of this example; confirm.
        secure=False,
    )
    object_name = f"{args.output_model}/kmeans_model.joblib"
    client.put_object(args.output_model_minio_bucket, object_name, buffer, length=payload_size)

    file_path = f"s3://{args.output_model_minio_bucket}/{args.output_model}/kmeans_model.joblib"
    print(f"Model saved to MinIO: {file_path}")

# Entry point: load the dataset from MinIO, fit KMeans on it, and persist
# the fitted model back to MinIO.
dataset = load_dataset_from_minio(args)
kmeans = KMeans(n_clusters=args.n_clusters, random_state=0)
kmeans.fit(dataset)
save_model_to_minio(kmeans, args)

Metamodel

{
    "name": "kmeans-example",
    "description": "Kmeans example",
    "mode": "BATCH",
    "url": "docker://gitlab.alidalab.it:5000/alida/analytics/python-applications/kmeans-example:1.0.0",
    "version": "1.0.0",
    "accessLevel": "PUBLIC",
    "properties": [
        {
            "description": "Your input dataset",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "input-dataset",
            "valueType": "STRING",
            "invisible": true
        },
        {
            "defaultValue": "ANY",
            "description": "Selected columns from table",
            "key": "input-columns",
            "type": "application",
            "mandatory": true,
            "valueType": "STRING",
            "invisible": true
        },
        {
            "description": "Your output model",
            "mandatory": true,
            "type": "application",
            "defaultValue": null,
            "key": "output-model",
            "valueType": "STRING",
            "invisible": true
        }
    ]
}