
Processing Reference Implementation

The following practical specification describes the requirements that every thematic service must comply with in order to have its generated output indexed inside the HUB. The table of contents of this document follows:

[[TOC]]

Glossary

| Term | Definition |
|------|------------|
| CWL – Common Workflow Language | A standard for describing analysis workflows in a portable, reproducible way. |
| STAC – SpatioTemporal Asset Catalog | A specification to organize geospatial assets (e.g., satellite images) into standardized metadata collections and items. |
| JSON – JavaScript Object Notation | A lightweight data format used for structuring data in key-value pairs, often used for metadata. |
| Containerized Environment | An isolated, portable environment where applications run, typically using Docker. |
| AOI – Area of Interest | The geographical region selected by the user for analysis. |
| STAC Item | A single geospatial asset (e.g., one map file and its metadata) in STAC format. |
| STAC Collection | A grouping of related STAC Items with shared metadata. |
| API – Application Programming Interface | A set of rules that allows different software applications to communicate. |
| RAM – Random Access Memory | The memory allocated to a computing task, defined in resource requirements. |
| Cron Scheduler | A tool that schedules recurring jobs (like automatic data processing) at fixed times or intervals. |
| User | The end-user accessing the Governmental Hub through its interface to request or interact with services. |
| Governmental Hub UI | The graphical interface that users interact with to submit requests, visualize results, and download products. |
| Governmental Hub Module | The internal orchestrator or tasking component that manages workflows, triggers processing, and handles logic based on user input or scheduled tasks. |
| Governmental Hub Catalogue (STAC API) | The backend metadata catalog used to store, search, and retrieve geospatial datasets and their STAC metadata. |
| Containerized Environment (Dedicated Storage) | Where data, intermediate files, and results are stored during processing. |
| Thematic Service – Data Availability | The first step in the workflow pipeline that verifies if all required data is accessible. |
| Thematic Service – Processing Module | The main computational block responsible for executing algorithms and producing output data and STAC metadata. |
| External Data Acquisition Module | Optional service component that fetches external data from third-party APIs when needed. |

Main scope

This document provides a reference implementation and detailed integration guidelines for developing Thematic Services within the AXIS 3 Governmental Hub ecosystem. Thematic Services are modular processing pipelines responsible for transforming raw or derived geospatial data into structured, value-added products aligned with specific environmental, agricultural, disaster, or infrastructure monitoring use cases. Each Thematic Service —namely Land, Forest, Water, Agriculture, and Safety & Security — comprises a set of processes, with each process implementing a specific analytical capability.

A Thematic Service can be set up with two kinds of processes:

  • Indexing process: A process that will index the produced STAC items into the AXIS 3 Hub Catalogue.

  • Generic process: A process that can be set up to perform any kind of processing.

Indexing process

Each indexing process must be implemented as a two-step CWL workflow that supports the following functional stages:

  • Data Availability Check – A lightweight, fail-fast algorithm that verifies whether the required input datasets are present (either in the Hub Catalogue or accessible via external sources). Based on this assessment, the workflow proceeds, triggers tasking, or terminates with a controlled failure.
  • Computation Phase – The core processing logic that generates STAC-compliant outputs, including:
    • an assets/ directory containing all output files (raster/vector),
    • an items/ directory with item-level STAC metadata and a collection.json file,
    • log files useful to analyze the process execution.

These outputs are automatically post-processed by the AXIS 3 Hub to complete the indexing, publication, and delivery phases.

Generic process

Each generic process must have at least one CWL workflow step that supports the following functional stage:

  • Computation Phase – The core processing logic that executes the job and generates log files useful to analyze the process execution.

The produced log files are automatically post-processed by AXIS 3 Hub and made available for subsequent download and analysis by a Thematic Service.

All Thematic Services must be containerized (Docker-based), self-contained, and designed to operate without external input at trigger time. The reference workflow must conform to the Common Workflow Language (CWL v1.2) standard, and services are expected to provide detailed logs via a step's log file for integration into Hub monitoring systems.

This specification shall be considered as work in progress. Nevertheless, while minor adjustments may occur, the core logic and architectural principles will remain stable.


Thematic Service High level Operational Process

The Thematic Service High-level Operational Process outlines the end-to-end workflow involved in executing a user-created process.

Thematic Service processes can be indexing or generic processes.

  • Generic processes are processes that can be set up to perform any kind of processing written by the Thematic Service.
  • Indexing processes are processes dedicated to:
    • discover and check data availability;
    • execute an automated data processing using domain-specific computation algorithms;
    • standardize the process results;
    • index the processed data into the AXIS 3 Hub Catalogue;
    • deliver the processed data to end users via the AXIS 3 Hub UI.

This modular and interoperable framework ensures that each thematic service—whether systematic or ad-hoc—can be orchestrated efficiently while remaining flexible to support a variety of applications.

The Systematic Thematic Service and Ad-hoc Service represent two fundamental operational models within the Governmental Hub for delivering thematic geospatial analysis or executing custom generic processes. While the Systematic Service ensures routine, automated data processing on a scheduled basis, the Ad-hoc Service enables user-driven, on-demand analysis tailored to specific events and areas of interest.
These two models serve as foundational examples of how data-driven workflows can be implemented; however, the structure and execution of each thematic service may vary depending on the nature of the application, the availability of data, and operational requirements.

Indexing process

Systematic Thematic Service – Description

The Systematic Thematic Service is a scheduled, automated data processing pipeline that operates on a predefined temporal basis (e.g., monthly, weekly, or yearly). It supports thematic applications (for example flood risk mapping) by systematically querying, acquiring, and ingesting both internal and external geospatial datasets.

An indexing process begins with the Governmental Hub Module, which initiates a query to the Thematic Service (Data Availability) to assess the presence of required data from the Governmental Hub Catalogue (STAC API) and Dedicated Storage. If some required data is not available internally, and external datasets are needed (e.g., EO data not stored in the HUB, sensor feeds, or meteorological inputs), the system dynamically activates the External Data Acquisition Module to retrieve them from third-party APIs.

Once all necessary data is available, the system triggers the Thematic Service (Processing Module – computation algorithms) to perform data ingestion, fusion, and algorithmic processing. This includes generating outputs such as flood risk maps and metadata. The resulting products are indexed in the Hub Catalogue and made accessible through the Governmental Hub UI, where users can visualize and download them interactively. Throughout the workflow, the system provides status updates (e.g., process started, tasking needed, data missing, process fail, process completed) to keep users informed in real time.

::: mermaid
sequenceDiagram
title Systematic Thematic Service

%% Participants
participant User as 👤 User

%% 🟩 Governmental Hub (Green)
participant UI as 🟩🖥️ Governmental Hub UI
participant Mod as 🟩☁️ Governmental Hub Module\n(service orchestrator or tasking module)
participant Cat as 🟩🗄️ Governmental Hub Catalogue\n(STAC API)
participant Store as 🟩🗄️ Containerized Environment\n(Dedicated Storage)

%% 🔷 Thematic Services (Blue)
participant DA as 🔷📘 Thematic Service\n(Data Availability)
participant Proc as 🔷📘 Thematic Service\n(Processing Module – computation algorithms)
participant ExtTool as 🔷🛠️ External Data\nAcquisition Module

%% 🟥 External API
participant ExtAPI as 🟥 External API\n(EO/sensors/meteorological data)

%% External data acquisition (optional)
opt IF Thematic service needs external data
    Mod->>ExtTool: Activate the tool whenever external data is needed
    ExtAPI->>ExtTool: Download of data from APIs
    ExtTool->>Store: Store data to dedicated storage
end

%% Query for data availability
Mod->>DA: Query all data needed for starting the process (Monthly/Weekly/Yearly)

%% Data availability block
par Data Availability
    DA->>Store: Data need
    Store-->>DA: Data available
    DA->>Cat: Data need
    Cat-->>DA: Data available
end

%% Response with data list
DA-->>Mod: List of data needed for starting the process

%% Failure or Tasking
Mod-->>UI: Notify if data is missing (fail status)
Mod->>Mod: If needs tasking (post image)
Mod-->>UI: Notify the user that Tasking is needed (and Process is pending) OR that the process failed due to tasking

%% Start processing
Mod->>Proc: Trigger Start
Mod-->>UI: Notify that the process starts (success status)

%% Data ingestion
Proc->>Store: Data need
Store-->>Proc: Data ingestion
Proc->>Cat: Data need
Cat-->>Proc: Data ingestion

%% Computation
Proc->>Proc: Main computation algorithms
Proc->>Proc: Processing outputs (physical files, collection.json, items/ folder)

%% Completion
Proc-->>Mod: Process finish
Mod->>Cat: Index products in catalog
Mod-->>UI: Notify that the process is finished
Mod->>UI: Provide data to UI

%% User interaction
User->>UI: Log in to Governmental HUB UI
UI-->>User: Authenticate and authorize
User->>UI: Navigate to Service and select the Thematic Service
UI-->>User: Provide interactive visualization with layered results
User->>UI: Download Flood Risk Map and metadata

:::

| Legend |
|---------------------|
| 🟩 Hub Component |
| 🔷 Thematic Service |
| 🟥 External |

Ad-hoc Service – Description

The Ad-hoc Service is an on-demand, event-driven workflow initiated manually by the user via the Governmental Hub UI.

Typically used for rapid response or emergency mapping (e.g., flood delineation), the process starts when the user logs in, selects a service, and defines the Area of Interest (AOI) and event date. Once confirmed, the Governmental Hub Module queries the Thematic Service (Data Availability) to check for relevant datasets in the Hub Catalogue.

If key data (e.g., satellite imagery) is missing, the module can initiate tasking for post-event acquisition, notifying the user of pending processing. Upon data confirmation, the Thematic Service (Processing Module – computation algorithms) is triggered to execute custom processing steps like water detection, optical/SAR fusion, and depth estimation. Once complete, the outputs are indexed and visualized for the user, who can interact with or download them via the UI.

::: mermaid
sequenceDiagram
title Ad-hoc Service (indexing process)

%% Participants
participant User as 👤 User

%% 🟩 Governmental Hub (Green)
participant UI as 🟩🖥️ Governmental Hub UI
participant Mod as 🟩☁️ Governmental Hub Module\n(service orchestrator or tasking module)
participant Cat as 🟩🗄️ Governmental Hub Catalogue\n(STAC API)

%% 🔷 Thematic Services (Blue)
participant DA as 🔷📘 Thematic Service\n(Data Availability)
participant Proc as 🔷📘 Thematic Service\n(Processing Module – computation algorithms)

%% User interaction
User->>UI: Log in to Governmental HUB UI 🟩
UI-->>User: Authenticate and authorize user 🟩
User->>UI: Navigate to Service and select Flood Delineation Module 🟩
User->>UI: Specify AOI and event date 🟩
UI-->>User: Validate AOI and event date 🟩
User->>UI: Confirm AOI and event date,\ninitiate processing 🟩

%% Trigger data availability check
UI->>Mod: Trigger Thematic Service data availability 🟩
Mod->>DA: Query all the data needed\nfor starting the process 🟩

%% Data Availability block
par  Data Availability 🔷
    DA->>Cat: Data need
    Cat-->>DA: Data available
end

DA-->>Mod: List of data needed\nfor starting the process 🔷

%% Tasking condition 🟩
opt Tasking 🟩
Mod->>Mod: Tasking (if post data not found)
Mod-->>UI: Notify the user that Tasking is needed\n(and Process is pending)
end

%% Trigger processing
Mod->>Proc: Trigger Start 🟩
Mod-->>UI: Notify that the process starts


%% Data ingestion 🔷
Proc->>Cat: Data need 🔷
Cat-->>Proc: Data ingestion 🔷

%% Computation
Proc->>Proc: Main computation algorithms 🔷
Proc->>Proc: Processing outputs\n(collection.json, items folder) 🔷

%% Completion
Proc-->>Mod: Process finish 🔷
Mod->>Cat: Products indexing to catalog 🟩
Mod-->>UI: Notify that the process is finished 🟩
Mod->>UI: Provide data to UI 🟩
UI-->>User: Provide interactive visualization with layered results 🟩

:::

| Legend |
|---------------------|
| 🟩 Hub Component |
| 🔷 Thematic Service |
| 🟥 External |

Generic process

Systematic Thematic Service – Description

A generic process, after the trigger, begins directly with the computation phase. (The computation phase could have more than one step defined in CWL).
If some required data is not available internally, and external datasets are needed (e.g., EO data not stored in the HUB, sensor feeds, or meteorological inputs), the system dynamically activates the External Data Acquisition Module to retrieve them from third-party APIs.

Once the process results are available, they will be made accessible through the Governmental Hub UI, where users can visualize and download process logs interactively. Throughout the workflow, the system provides status updates (e.g., process started, process fail, process completed) to keep users informed in real time.

::: mermaid
sequenceDiagram
title Systematic Thematic Service

%% Participants
participant User as 👤 User

%% 🟩 Governmental Hub (Green)
participant UI as 🟩🖥️ Governmental Hub UI
participant Mod as 🟩☁️ Governmental Hub Module\n(service orchestrator or tasking module)
participant Store as 🟩🗄️ Containerized Environment\n(Dedicated Storage)

%% 🔷 Thematic Services (Blue)
participant Proc as 🔷📘 Thematic Service\n(Processing Module – computation algorithms)
participant ExtTool as 🔷🛠️ External Data\nAcquisition Module

%% 🟥 External API
participant ExtAPI as 🟥 External API\n(EO/sensors/meteorological data)

%% External data acquisition (optional)
opt IF Thematic service needs external data
    Mod->>ExtTool: Activate the tool whenever external data is needed
    ExtAPI->>ExtTool: Download of data from APIs
    ExtTool->>Store: Store data to dedicated storage
end

%% Start processing
Mod->>Proc: Trigger Start
Mod-->>UI: Notify that the process starts (success status)

%% Computation
Proc->>Proc: Main computation algorithms

%% Completion
Proc-->>Mod: Process finish
Mod-->>UI: Notify that the process is finished
Mod->>UI: Provide data to UI

%% User interaction
User->>UI: Log in to Governmental HUB UI
UI-->>User: Authenticate and authorize
User->>UI: Navigate to Service and select the Thematic Service
UI-->>User: Provide interactive visualization with layered results
User->>UI: Download execution logs

:::

| Legend |
|---------------------|
| 🟩 Hub Component |
| 🔷 Thematic Service |
| 🟥 External |

Ad-hoc Service – Description

The Ad-hoc Service is used to run rapid custom executions inside the AXIS 3 Hub environment.

When the Thematic Service (Processing Module – computation algorithms) is triggered, the process execution starts.
Once complete, the outputs are visualized for the user, who can download the execution logs via the UI.

::: mermaid
sequenceDiagram
title Ad-hoc Service (generic process)

%% Participants
participant User as 👤 User

%% 🟩 Governmental Hub (Green)
participant UI as 🟩🖥️ Governmental Hub UI
participant Mod as 🟩☁️ Governmental Hub Module\n(service orchestrator)

%% 🔷 Thematic Services (Blue)
participant Proc as 🔷📘 Thematic Service\n(Processing Module – computation algorithms)

%% User interaction
User->>UI: Log in to Governmental HUB UI 🟩
UI-->>User: Authenticate and authorize user 🟩
User->>UI: Navigate to Service and select Custom Module 🟩
User->>UI: Initiate processing 🟩
UI->>Mod: Initiate processing 🟩

%% Trigger processing
Mod->>Proc: Trigger Start 🟩
Mod-->>UI: Notify that the process starts

%% Computation
Proc->>Proc: Main computation algorithms 🔷

%% Completion
Proc-->>Mod: Process finish 🔷
Mod-->>UI: Notify that the process is finished 🟩
Mod->>UI: Provide data to UI 🟩
UI-->>User: Provide interactive visualization with layered results 🟩

:::

| Legend |
|---------------------|
| 🟩 Hub Component |
| 🔷 Thematic Service |
| 🟥 External |

From operations to the implementation

In general the thematic service workflow would be the following:

  • design the algorithm in terms of logical steps, with a special focus on separating the data analysis from the data computation;
  • code the algorithm, testing and documenting it as per software engineering best practices;
  • deploy it to the HUB, registering it through a GUI (not yet available to all users) and instructing the HUB about how often to run it (weekly, daily, yearly, ...);
  • operate it, checking and ensuring that it works as expected.

A few notes about the design phase

Beyond general software design best practices, the design phase should end up with the following:

  • (only for indexing processes) a clear separation between the data analysis and the data computation;
  • wrapping each logical step inside a HUB-retrievable Docker image; this approach gives thematic services full liberty to adopt the most appropriate tool for the specific job;
  • merging all these dockerized steps into a single CWL workflow (see sample below).

The algorithm, in practice

Before digging into all the details, and as reported above, the thematic service shall be able to provide:

  • (only for indexing processes) a data availability analysis algorithm, capable of stating whether the real computation can take place right after;
  • a computation algorithm, capable of producing all the output ready to be indexed later by the HUB.

In fact, after the initial check, the thematic service is only required to assess whether the computation can be performed. No direct indexing is foreseen, nor will it be required, since the HUB handles it on behalf of the Thematic Services.

Requirements

Aside from considerations stated above, HUB requires the following:

  • for systematic running (time-based algorithm execution), the algorithm shall expect to run using input parameters defined at registration time. Those parameters can be overridden by manual triggering, but not for systematic runs;
  • (only for indexing processes) a mandatory bounding box is expected. This shall be a list of four floating-point numbers, expressed in the EPSG:4326 projection.
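As an illustration, a minimal validation of such a bounding box might look like the following sketch. The function name and error messages are hypothetical, not part of the HUB API; note that the sample CWL below declares `spatial_extent` as `string[]`, so values may arrive as strings:

```python
def validate_spatial_extent(spatial_extent):
    """Check a [minLon, minLat, maxLon, maxLat] bounding box in EPSG:4326."""
    if len(spatial_extent) != 4:
        raise ValueError("spatial_extent must contain exactly four values")
    # Values may be strings when passed through the CWL string[] input.
    min_lon, min_lat, max_lon, max_lat = (float(v) for v in spatial_extent)
    if not -180.0 <= min_lon <= max_lon <= 180.0:
        raise ValueError("longitudes must satisfy -180 <= minLon <= maxLon <= 180")
    if not -90.0 <= min_lat <= max_lat <= 90.0:
        raise ValueError("latitudes must satisfy -90 <= minLat <= maxLat <= 90")
    return [min_lon, min_lat, max_lon, max_lat]
```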

A sample implementation

Indexing process

Being composed of two phases, thematic services shall submit a CWL workflow made up of at least the two aforementioned steps. The requirement of wrapping everything inside a CWL workflow should already be known and taken for granted, since a couple of tutorials have been delivered to the services.

Considering all the hypotheses, a sample implementation will resemble something like this:

cwlVersion: v1.2
$namespaces:
  s: https://schema.org/
s:softwareVersion: 0.1.2
schemas:
  - http://schema.org/version/9.0/schemaorg-current-http.rdf
$graph:

  - class: Workflow
    id: <WORKFLOW_ID>
    label: <WORKFLOW_LABEL>
    doc: <WORKFLOW_DOC>
    inputs:
      spatial_extent:
        type: string[]
        label: Spatial extent bounding box [minLon, minLat, maxLon, maxLat]
    outputs:
      execution_results:
        type: Directory
        outputSource: [process/process_results]
    steps:
      analyse:
        run: "#analyse"
        in:
          spatial_extent: spatial_extent
        out: [data_analysis_results]
      process:
        run: "#process"
        in:
          spatial_extent: spatial_extent
          data_analysis_results: analyse/data_analysis_results
        out: [process_results]

  - class: CommandLineTool
    id: analyse
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 512
    hints:
      DockerRequirement:
        dockerPull: <DATA_AVAILABILITY_DOCKER_IMAGE>
    baseCommand: python
    arguments:
      - <DATA_AVAILABILITY_COMMAND_TO_EXECUTE>
      - --spatial_extent
      - $(inputs.spatial_extent[0])
      - $(inputs.spatial_extent[1])
      - $(inputs.spatial_extent[2])
      - $(inputs.spatial_extent[3])
    inputs:
      spatial_extent:
        type: string[]
    outputs:
      data_analysis_results:
        type: Directory
        outputBinding:
          glob: .

  - class: CommandLineTool
    id: process
    baseCommand: python
    arguments:
      - <PROCESS_COMMAND_TO_EXECUTE>
      - --spatial_extent
      - $(inputs.spatial_extent[0])
      - $(inputs.spatial_extent[1])
      - $(inputs.spatial_extent[2])
      - $(inputs.spatial_extent[3])
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 1024
    hints:
      DockerRequirement:
        dockerPull: <PROCESS_DOCKER_IMAGE>
    inputs:
      spatial_extent:
        type: string[]
      data_analysis_results:
        type: Directory
    outputs:
      process_results:
        type: Directory
        outputBinding:
          glob: .
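To try such a workflow locally before registering it, the input array can be supplied through a CWL job file. The file name, coordinate values, and workflow file name below are illustrative; `cwltool`, the CWL reference runner, is assumed to be installed:

```shell
# Create a job file with the mandatory bounding box (EPSG:4326, as strings).
cat > params.yml <<'EOF'
spatial_extent: ["11.0", "46.0", "12.0", "47.0"]
EOF

# Validate the workflow definition, then run both steps locally:
# cwltool --validate workflow.cwl
# cwltool workflow.cwl params.yml
```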

Generic process

Thematic services shall submit a CWL workflow composed of at least one process step. The requirement of wrapping everything inside a CWL workflow should already be known and taken for granted, since a couple of tutorials have been delivered to the services.

Considering all the hypotheses, a sample implementation will resemble something like this:

cwlVersion: v1.2
$namespaces:
  s: https://schema.org/
s:softwareVersion: 0.1.2
schemas:
  - http://schema.org/version/9.0/schemaorg-current-http.rdf
$graph:

  - class: Workflow
    id: <WORKFLOW_ID>
    label: <WORKFLOW_LABEL>
    doc: <WORKFLOW_DOC>
    inputs: {}
    outputs:
      execution_results:
        type: Directory
        outputSource: [process/process_results]
    steps:
      process:
        run: "#process"
        in: {}
        out: [process_results]

  - class: CommandLineTool
    id: process
    baseCommand: python
    arguments:
      - <PROCESS_COMMAND_TO_EXECUTE>
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 1024
    hints:
      DockerRequirement:
        dockerPull: <PROCESS_DOCKER_IMAGE>
    inputs: {}
    outputs:
      process_results:
        type: Directory
        outputBinding:
          glob: .

Getting deeper into the implementation

Prepare the cwl

The above CWL has some placeholders that need to be replaced by real values:

  • <WORKFLOW_ID>: ID for the workflow
  • <WORKFLOW_LABEL>: Label for the workflow
  • <WORKFLOW_DOC>: Description for the workflow
  • (only for indexing processes) <DATA_AVAILABILITY_DOCKER_IMAGE>: Docker image for data availability analysis
  • (only for indexing processes) <DATA_AVAILABILITY_COMMAND_TO_EXECUTE>: Command to execute for data availability analysis
  • <PROCESS_DOCKER_IMAGE>: Docker image for data processing
  • <PROCESS_COMMAND_TO_EXECUTE>: Command to execute for data processing

For example:

  • <WORKFLOW_ID>: workflow_id
  • <WORKFLOW_LABEL>: Workflow label
  • <WORKFLOW_DOC>: Workflow description
  • (only for indexing processes) <DATA_AVAILABILITY_DOCKER_IMAGE>: pminel/fs-ha-ht-b-a2-s2
  • (only for indexing processes) <DATA_AVAILABILITY_COMMAND_TO_EXECUTE>: /app/stageout_data_analysis.py
  • <PROCESS_DOCKER_IMAGE>: pminel/fs-ha-ht-b-a2-s2
  • <PROCESS_COMMAND_TO_EXECUTE>: /app/esaforest/msavi22.py
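With those example values substituted into the sample above, the analyse tool definition would read as follows (shown purely to illustrate the substitution; the image name and script path are the sample values listed above):

```yaml
  - class: CommandLineTool
    id: analyse
    hints:
      DockerRequirement:
        dockerPull: pminel/fs-ha-ht-b-a2-s2
    baseCommand: python
    arguments:
      - /app/stageout_data_analysis.py
      - --spatial_extent
      - $(inputs.spatial_extent[0])
      - $(inputs.spatial_extent[1])
      - $(inputs.spatial_extent[2])
      - $(inputs.spatial_extent[3])
```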

Ensure that the Docker image used in every step is available to pull via an HTTPS connection.

Analyse step (only for indexing processes)

id: analyse

Analyze the data availability within the bounding box.
Implement in this step your logic to check for data availability within the given bounding box.

Inputs and outputs required by the analyse step are listed below.

Inputs:

| Input | Detail | Required |
|-------|--------|----------|
| spatial_extent | Bounding box defined by [minLon, minLat, maxLon, maxLat] | True |

Outputs:

| Output | Detail | Always generated |
|--------|--------|------------------|
| execution_data_analysis.log | Logged analysis results | True |
| fail_completely.fail | Fail completely | False |
| tasking.json | Tasking instructions | False |

Considering the provided sample Docker image:

  • fail_completely.fail is generated if the number of items found for running the process is less than THRESHOLD_FOR_UNRECOVERABLE_ERROR (default 0).
  • tasking.json is generated if the number of items found for running the process is less than THRESHOLD_FOR_TASKING (default 2).
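A minimal sketch of that threshold logic follows. The function, the tasking payload, and the log message are illustrative assumptions, not the sample image's actual code; the threshold defaults mirror the values stated above:

```python
import json
from pathlib import Path

# Illustrative defaults mirroring the thresholds described above.
THRESHOLD_FOR_UNRECOVERABLE_ERROR = 0
THRESHOLD_FOR_TASKING = 2

def analyse(found_items: int, out_dir: str = ".") -> str:
    """Fail-fast data availability check: emit marker files based on item count."""
    out = Path(out_dir)
    (out / "execution_data_analysis.log").write_text(f"items found: {found_items}\n")
    if found_items < THRESHOLD_FOR_UNRECOVERABLE_ERROR:
        (out / "fail_completely.fail").write_text("unrecoverable: no usable items\n")
        return "fail"
    if found_items < THRESHOLD_FOR_TASKING:
        # Hypothetical tasking payload; the real schema is defined by the Hub.
        (out / "tasking.json").write_text(json.dumps({"reason": "missing post-event data"}))
        return "tasking"
    return "proceed"
```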

Process step

Process step for indexing processes

id: process

Runs the main geospatial processing task.
Implement in this step your logic to process the data within the given bounding box.

Inputs and outputs required by the process step are listed below.

Inputs:

| Input | Detail | Required |
|-------|--------|----------|
| spatial_extent | Bounding box defined by [minLon, minLat, maxLon, maxLat] | True |

Outputs:

| Output | Detail |
|--------|--------|
| collection.json | Metadata about the processed collection |
| assets | Directory of generated assets |
| items | STAC-compliant item records |
| execution_process.log | Log of the execution |
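The output skeleton the Hub post-processes for indexing could be produced as in the sketch below. This assumes the step writes into its working directory (consistent with `glob: .` in the sample CWL); the collection stub is reduced to a bare minimum, whereas real outputs must carry full STAC-compliant metadata:

```python
import json
from pathlib import Path

def write_outputs(collection_id: str, out_dir: str = ".") -> None:
    """Create the output layout expected from an indexing process step."""
    out = Path(out_dir)
    (out / "assets").mkdir(exist_ok=True)   # generated raster/vector files
    (out / "items").mkdir(exist_ok=True)    # one STAC item JSON per asset
    # Minimal stub; real outputs must be fully STAC-compliant.
    (out / "collection.json").write_text(
        json.dumps({"type": "Collection", "id": collection_id})
    )
    (out / "execution_process.log").write_text("processing completed\n")
```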

Process step for generic processes

id: process

Runs the main processing task.
Implement in this step your logic to process the data.

Outputs:

| Output | Detail |
|--------|--------|
| execution_process.log | Log of the execution |
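Since the log file is the only mandatory output of a generic process, it can be produced with the standard library alone. The logger name, format, and messages below are illustrative assumptions:

```python
import logging

def run_generic_process(log_path: str = "execution_process.log") -> None:
    """Execute the job and write the mandatory execution log."""
    logger = logging.getLogger("generic_process")
    handler = logging.FileHandler(log_path, mode="w")
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("process started")
    # ... main computation goes here ...
    logger.info("process completed")
    handler.close()
    logger.removeHandler(handler)
```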

Implementation workflow schema

Indexing process

:::mermaid
flowchart TD
    A[Start] --> B[Analyse<br>Check data availability]
    B --> C[Data analysis result interceptor<br>Upload to S3 all *.log files]
    C --> D{Data analysis output folder contains<br>fail_completely.fail or tasking.json file?}
    D -- yes --> E[Upload to S3<br>fail_completely.fail or tasking.json file]
    D -- no --> F[Process<br>Main computation]
    F --> H[Process result interceptor<br>Upload to S3 all *.log files]
    H --> I{Process result output folder contains<br>fail_completely.fail?}
    I -- yes --> J[Upload to S3<br>fail_completely.fail file]
    I -- no --> K[Stageout<br>Indexing the STAC items]
    E -- process failed --> Z
    J -- process failed --> Z
    K -- process successful --> Z[End]
:::

Generic process

:::mermaid
flowchart TD
    A[Start] --> F[Process<br>Main computation]
    F --> H[Process result interceptor<br>Upload to S3 all *.log files]
    H --> I{Process result output folder contains<br>fail_completely.fail?}
    I -- yes --> J[Upload to S3<br>fail_completely.fail file]
    I -- no, process successful --> Z
    J -- process failed --> Z[End]
:::

CWL Validation

When a user registers or updates a process, the Services Orchestrator performs a validation on the given CWL content.
The CWL validation is enforced by a JSON Schema that defines how the CWL content must be structured.
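As an illustration only, a few of the schema's `const` constraints could be checked by hand with a simplified stdlib-only function. This checker is an assumption for demonstration purposes, not the Orchestrator's actual validator, which applies the full schema:

```python
def check_cwl_header(cwl: dict) -> list:
    """Check a few of the schema's const constraints on a parsed CWL document."""
    errors = []
    if cwl.get("cwlVersion") != "v1.2":
        errors.append("cwlVersion must be exactly 'v1.2'")
    if cwl.get("$namespaces", {}).get("s") != "https://schema.org/":
        errors.append("$namespaces.s must be 'https://schema.org/'")
    if cwl.get("s:softwareVersion") != "0.1.2":
        errors.append("s:softwareVersion must be '0.1.2'")
    if not any(e.get("class") == "Workflow" for e in cwl.get("$graph", [])):
        errors.append("$graph must contain a Workflow entry")
    return errors
```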

CWL validation for an indexing process

{
    "type": "object",
    "properties": {
        "cwlVersion": {
            "type": "string",
            "const": "v1.2",
        },
        "$namespaces": {
            "type": "object",
            "properties": {
                "s": {
                    "type": "string",
                    "const": "https://schema.org/",
                },
            },
        },
        "s:softwareVersion": {
            "type": "string",
            "const": "0.1.2",
        },
        "schemas": {
            "type": "array",
            "items": {
                "type": "string",
                "format": "uri",
                "const": "http://schema.org/version/9.0/schemaorg-current-http.rdf",
            },
        },
        "$graph": {
            "type": "array",
            "items": {
                "anyOf": [
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "Workflow",
                            },
                            "id": {
                                "type": "string",
                                "description":
                                    "The unique identifier for the workflow.",
                            },
                            "inputs": {
                                "type": "object",
                                "properties": {
                                    "spatial_extent": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "string[]",
                                            }
                                        }
                                    },
                                },
                                "required": ["spatial_extent"],
                                "additionalProperties": True,
                            },
                            "outputs": {
                                "type": "object",
                                "properties": {
                                    "execution_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            },
                                            "outputSource": {
                                                "type": "array",
                                                "items": {
                                                    "type": "string",
                                                    "const": "process/process_results",
                                                },
                                            },
                                            },
                                            "required": ["type", "outputSource"],
                                    },
                                },
                                "required": ["execution_results"],
                            },
                            "steps": {
                                "type": "object",
                                "additionalProperties": True,
                                "properties": {
                                    "analyse": {
                                        "type": "object",
                                        "properties": {
                                            "run": {
                                                "type": "string",
                                                "const": "#analyse",
                                            },
                                            "in": {
                                                "type": "object",
                                                "properties": {
                                                    "spatial_extent": {
                                                        "type": "string",
                                                        "const": "spatial_extent",
                                                    },
                                                },
                                                "required": ["spatial_extent"],
                                            },
                                            "out": {
                                                "type": "array",
                                                "items": {
                                                    "type": "string",
                                                    "const": "data_analysis_results",
                                                },
                                            },
                                        },
                                        "required": ["run", "in", "out"],
                                        "additionalProperties": True,
                                    },
                                    "process": {
                                        "type": "object",
                                        "properties": {
                                            "run": {
                                                "type": "string",
                                                "const": "#process",
                                            },
                                            "in": {
                                                "type": "object",
                                                "properties": {
                                                    "spatial_extent": {
                                                        "type": "string",
                                                        "const": "spatial_extent",
                                                    },
                                                    "data_analysis_results": {
                                                        "type": "string",
                                                        "const":
                                                            "analyse/data_analysis_results",
                                                    },
                                                },
                                                "required": [
                                                    "spatial_extent",
                                                    "data_analysis_results"
                                                ],
                                            },
                                            "out": {
                                                "type": "array",
                                                "items": {
                                                    "type": "string",
                                                    "const": "process_results",
                                                },
                                            },
                                        },
                                        "required": ["run", "in", "out"],
                                        "additionalProperties": True,
                                    },
                                },
                                "required": ["analyse", "process"],
                                "minProperties": 2,
                            }
                        },
                        "required": ["class", "id", "inputs", "outputs", "steps"],
                        "additionalProperties": True,
                    },
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "CommandLineTool",
                            },
                            "id": {
                                "type": "string",
                                "const": "analyse",
                            },
                            "inputs": {
                                "type": "object",
                                "properties": {
                                    "spatial_extent": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "string[]",
                                            }
                                        }
                                    },
                                },
                                "required": ["spatial_extent"],
                                "additionalProperties": True,
                            },
                            "outputs": {
                                "type": "object",
                                "properties": {
                                    "data_analysis_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            },
                                            "outputBinding": {
                                                "type": "object",
                                                "properties": {
                                                    "glob": {
                                                        "type": "string",
                                                        "const": ".",
                                                    }
                                                },
                                                "required": ["glob"],
                                            }
                                        },
                                        "required": ["type", "outputBinding"],
                                    }
                                },
                                "required": ["data_analysis_results"],
                                "additionalProperties": False,
                            },
                            "requirements": {
                                "type": "object",
                                "properties": {
                                    "ResourceRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "coresMax": {
                                                "type": "number",
                                            },
                                            "ramMax": {
                                                "type": "number",
                                            }
                                        },
                                        "required": ["coresMax", "ramMax"],
                                    }
                                },
                                "required": ["ResourceRequirement"],
                                "additionalProperties": True,
                            },
                            "baseCommand": {
                                "type": "string",
                                "const": "python",
                            },
                            "hints": {
                                "type": "object",
                                "properties": {
                                    "DockerRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "dockerPull": {
                                                "type": "string",
                                                "description":
                                                    "Docker image to use "
                                                    "for analyse execution",
                                            }
                                        },
                                        "required": ["dockerPull"],
                                    }
                                },
                                "required": ["DockerRequirement"],
                                "additionalProperties": True
                            },
                            "arguments": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "uniqueItems": True,
                                    "contains": [
                                        {"const": "--spatial_extent"},
                                        {"const": "$(inputs.spatial_extent[0])"},
                                        {"const": "$(inputs.spatial_extent[1])"},
                                        {"const": "$(inputs.spatial_extent[2])"},
                                        {"const": "$(inputs.spatial_extent[3])"},
                                    ],
                                    "minItems": 5,
                                }
                            }
                        },
                        "required": [
                            "class",
                            "id",
                            "inputs",
                            "outputs",
                            "requirements",
                            "baseCommand",
                            "hints",
                            "arguments"
                        ],
                        "additionalProperties": True,
                    },
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "CommandLineTool",
                            },
                            "id": {
                                "type": "string",
                                "const": "process",
                            },
                            "inputs": {
                                "type": "object",
                                "properties": {
                                    "spatial_extent": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "string[]",
                                            }
                                        }
                                    },
                                    "data_analysis_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            }
                                        }
                                    }
                                },
                                "required": [
                                    "spatial_extent",
                                    "data_analysis_results"
                                ],
                            },
                            "outputs": {
                                "type": "object",
                                "properties": {
                                    "process_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            },
                                            "outputBinding": {
                                                "type": "object",
                                                "properties": {
                                                    "glob": {
                                                        "type": "string",
                                                        "const": ".",
                                                    }
                                                }
                                            },
                                        }
                                    }
                                }
                            },
                            "requirements": {
                                "type": "object",
                                "properties": {
                                    "ResourceRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "coresMax": {
                                                "type": "number",
                                            },
                                            "ramMax": {
                                                "type": "number",
                                            }
                                        },
                                        "required": ["coresMax", "ramMax"],
                                    }
                                },
                                "required": ["ResourceRequirement"],
                                "additionalProperties": True,
                            },
                            "baseCommand": {
                                "type": "string",
                                "const": "python",
                            },
                            "hints": {
                                "type": "object",
                                "properties": {
                                    "DockerRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "dockerPull": {
                                                "type": "string",
                                                "description":
                                                    "Docker image to use "
                                                    "for process execution",
                                            }
                                        },
                                        "required": ["dockerPull"],
                                    }
                                },
                                "required": ["DockerRequirement"],
                                "additionalProperties": True
                            },
                            "arguments": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "uniqueItems": True,
                                    "contains": [
                                        {"const": "--spatial_extent"},
                                        {"const": "$(inputs.spatial_extent[0])"},
                                        {"const": "$(inputs.spatial_extent[1])"},
                                        {"const": "$(inputs.spatial_extent[2])"},
                                        {"const": "$(inputs.spatial_extent[3])"},
                                    ],
                                    "minItems": 5,
                                }
                            }
                        },
                        "required": [
                            "class",
                            "id",
                            "inputs",
                            "outputs",
                            "baseCommand",
                            "requirements",
                            "hints",
                            "arguments"
                        ],
                        "additionalProperties": True,
                    },
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "CommandLineTool",
                            },
                            "id": {
                                "type": "string",
                            },
                            "inputs": {
                                "type": "object",
                            },
                            "outputs": {
                                "type": "object",
                            },
                            "requirements": {
                                "type": "object",
                                "properties": {
                                    "ResourceRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "coresMax": {
                                                "type": "number",
                                            },
                                            "ramMax": {
                                                "type": "number",
                                            }
                                        },
                                        "required": ["coresMax", "ramMax"],
                                    }
                                },
                                "required": ["ResourceRequirement"],
                                "additionalProperties": True,
                            },
                            "baseCommand": {
                                "type": "string",
                                "const": "python",
                            },
                            "hints": {
                                "type": "object",
                                "properties": {
                                    "DockerRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "dockerPull": {
                                                "type": "string",
                                                "description":
                                                    "Docker image to use "
                                                    "for process execution",
                                            }
                                        },
                                        "required": ["dockerPull"],
                                    }
                                },
                                "required": ["DockerRequirement"],
                                "additionalProperties": True
                            },
                        },
                        "required": [
                            "class",
                            "id",
                            "inputs",
                            "outputs",
                            "baseCommand",
                            "requirements",
                            "hints"
                        ],
                        "additionalProperties": True,
                    }
                ]
            },
            "minItems": 3
        }
    }
}

The validation will enforce the following rules:

Top-level attributes
  • the cwl must have the cwlVersion top-level attribute
  • the cwl must have the $namespaces top-level attribute
  • the cwl must have the s:softwareVersion top-level attribute
  • the cwl must have the $graph top-level attribute
Workflow graph-level attributes

The $graph must contain exactly 1 element with class: Workflow, with the following required properties:
  • an id attribute
  • an inputs section with at least the spatial_extent input parameter
  • an outputs section with the execution_results folder as its only output element
  • a steps section with at least the analyse and process steps

CommandLineTool graph-level attributes

The $graph must contain at least 2 elements with class: CommandLineTool (analyse and process), each with the following required properties:
  • an id attribute
  • an inputs section with at least the required input parameters mentioned in the CWL above
  • an outputs section with at least the output parameters mentioned in the CWL above
  • a baseCommand to execute
  • an arguments section with the inputs for the step execution
  • a requirements section defining the step execution limits
  • a hints section with the Docker image specification
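As an illustration of how a thematic service might pre-check these rules before submission, the sketch below implements the top-level and class-count checks in plain Python. The function names are illustrative only; the HUB is assumed to apply the full JSON Schema shown above.

```python
# Illustrative pre-check of the CWL validation rules described above.
# Function names are examples, not part of the specification.

REQUIRED_TOP_LEVEL = ("cwlVersion", "$namespaces", "s:softwareVersion", "$graph")


def missing_top_level(cwl: dict) -> list:
    """Return the required top-level attributes missing from the CWL document."""
    return [key for key in REQUIRED_TOP_LEVEL if key not in cwl]


def count_class(cwl: dict, cls: str) -> int:
    """Count $graph elements declaring the given class."""
    return sum(1 for node in cwl.get("$graph", []) if node.get("class") == cls)


def graph_shape_ok(cwl: dict) -> bool:
    """Exactly one Workflow and at least two CommandLineTool elements."""
    return (
        count_class(cwl, "Workflow") == 1
        and count_class(cwl, "CommandLineTool") >= 2
    )
```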

CWL validation for generic process

{
    "type": "object",
    "properties": {
        "cwlVersion": {
            "type": "string",
            "const": "v1.2",
        },
        "$namespaces": {
            "type": "object",
            "properties": {
                "s": {
                    "type": "string",
                    "const": "https://schema.org/",
                },
            },
        },
        "s:softwareVersion": {
            "type": "string",
            "const": "0.1.2",
        },
        "schemas": {
            "type": "array",
            "items": {
                "type": "string",
                "format": "uri",
                "const": "http://schema.org/version/9.0/schemaorg-current-http.rdf",
            },
        },
        "$graph": {
            "type": "array",
            "items": {
                "anyOf": [
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "Workflow",
                            },
                            "id": {
                                "type": "string",
                                "description":
                                    "The unique identifier for the workflow.",
                                "pattern": r"^[a-z0-9]([-a-z0-9]*[a-z0-9])?$",
                            },
                            "inputs": {
                                "type": "object",
                                "additionalProperties": True,
                            },
                            "outputs": {
                                "type": "object",
                                "properties": {
                                    "execution_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            },
                                            "outputSource": {
                                                "type": "array",
                                                "items": {
                                                    "type": "string",
                                                    "const": "process/process_results",
                                                },
                                                "minItems": 1,
                                                },
                                            },
                                            "required": ["type", "outputSource"],
                                            "additionalProperties": False,
                                            "minProperties": 2,
                                    },
                                },
                                "required": ["execution_results"],
                                "additionalProperties": False,
                                "minProperties": 1,
                            },
                            "steps": {
                                "type": "object",
                                "additionalProperties": True,
                                "properties": {
                                    "process": {
                                        "type": "object",
                                        "properties": {
                                            "run": {
                                                "type": "string",
                                                "const": "#process",
                                            },
                                            "in": {
                                                "type": "object",
                                                "additionalProperties": True,
                                            },
                                            "out": {
                                                "type": "array",
                                                "items": {
                                                    "type": "string",
                                                    "const": "process_results",
                                                },
                                                "minItems": 1,
                                            },
                                        },
                                        "required": ["run", "out"],
                                        "additionalProperties": True,
                                    },
                                },
                                "required": ["process"],
                                "minProperties": 1,
                            }
                        },
                        "required": ["class", "id", "inputs", "outputs", "steps"],
                        "additionalProperties": True,
                    },
                    {
                        "type": "object",
                        "properties": {
                            "class": {
                                "type": "string",
                                "const": "CommandLineTool",
                            },
                            "id": {
                                "type": "string",
                                "const": "process",
                            },
                            "inputs": {
                                "type": "object",
                                "additionalProperties": True,
                            },
                            "outputs": {
                                "type": "object",
                                "properties": {
                                    "process_results": {
                                        "type": "object",
                                        "properties": {
                                            "type": {
                                                "type": "string",
                                                "const": "Directory",
                                            },
                                            "outputBinding": {
                                                "type": "object",
                                                "properties": {
                                                    "glob": {
                                                        "type": "string",
                                                        "const": ".",
                                                    }
                                                },
                                                "required": ["glob"],
                                            },
                                        },
                                        "required": [
                                            "type",
                                            "outputBinding"
                                        ],
                                    }
                                },
                                "minProperties": 1,
                                "additionalProperties": False,
                                "required": [
                                    "process_results"
                                ],
                            },
                            "requirements": {
                                "type": "object",
                                "properties": {
                                    "ResourceRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "coresMax": {
                                                "type": "number",
                                            },
                                            "ramMax": {
                                                "type": "number",
                                            }
                                        },
                                        "required": ["coresMax", "ramMax"],
                                    }
                                },
                                "required": ["ResourceRequirement"],
                                "additionalProperties": True,
                            },
                            "baseCommand": {
                                "type": "string",
                            },
                            "hints": {
                                "type": "object",
                                "properties": {
                                    "DockerRequirement": {
                                        "type": "object",
                                        "properties": {
                                            "dockerPull": {
                                                "type": "string",
                                                "description":
                                                    "Docker image to use "
                                                    "for process execution",
                                            }
                                        },
                                        "required": ["dockerPull"],
                                    }
                                },
                                "required": ["DockerRequirement"],
                                "additionalProperties": True
                            },
                            "arguments": {
                                "type": "array",
                                "items": {
                                    "type": "string",
                                    "uniqueItems": True,
                                }
                            }
                        },
                        "required": [
                            "class",
                            "id",
                            "outputs",
                            "baseCommand",
                            "requirements",
                            "hints",
                            "arguments"
                        ],
                        "additionalProperties": True,
                    }
                ]
            }
        }
    }
}

The validation will enforce the following rules:

Top-level attributes
  • the cwl must have the cwlVersion top-level attribute
  • the cwl must have the $namespaces top-level attribute
  • the cwl must have the s:softwareVersion top-level attribute
  • the cwl must have the $graph top-level attribute
Workflow graph-level attributes

The $graph must contain exactly 1 element with class: Workflow, with the following required properties:
  • an id attribute
  • an outputs section with the execution_results folder as its only output element
  • a steps section with at least the process step

CommandLineTool graph-level attributes

The $graph must contain at least 1 element with class: CommandLineTool named process, with the following required properties:
  • an id attribute
  • an outputs section with at least the output parameters mentioned in the CWL above
  • a baseCommand to execute
  • an arguments section with the inputs for the step execution
  • a requirements section defining the step execution limits
  • a hints section with the Docker image specification
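For reference, a minimal document satisfying the generic-process rules might look as follows, expressed as a Python dict in the same notation as the schemas above. The id, dockerPull image, and resource figures are placeholders, not mandated values.

```python
# Minimal generic-process CWL sketch; placeholder values are marked below.
minimal_cwl = {
    "cwlVersion": "v1.2",
    "$namespaces": {"s": "https://schema.org/"},
    "s:softwareVersion": "0.1.2",
    "$graph": [
        {
            "class": "Workflow",
            "id": "my-service",  # placeholder; must match ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$
            "inputs": {},
            "outputs": {
                "execution_results": {
                    "type": "Directory",
                    "outputSource": ["process/process_results"],
                },
            },
            "steps": {
                "process": {"run": "#process", "in": {}, "out": ["process_results"]},
            },
        },
        {
            "class": "CommandLineTool",
            "id": "process",
            "inputs": {},
            "outputs": {
                "process_results": {
                    "type": "Directory",
                    "outputBinding": {"glob": "."},
                },
            },
            "requirements": {
                "ResourceRequirement": {"coresMax": 2, "ramMax": 4096},  # placeholders
            },
            "baseCommand": "python",
            "hints": {
                # placeholder image reference
                "DockerRequirement": {"dockerPull": "registry.example.com/process:0.1.0"},
            },
            "arguments": [],
        },
    ],
}
```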


About data availability analysis

The idea behind this step is to check whether there are data to be processed later in the computational part of the workflow. In general, thematic services shall implement this analysis on their own, being fully responsible for assessing whether data processing can be performed. At present, the data analysis phase can exit with:

  • a success status. This condition is mandatory for the flow to continue and run the real computation; after this step, a validation is performed to assert that all the required files have been produced. If validation fails, the process is marked as failed and is not recoverable.
  • a fail status. This condition is mandatory to stop the flow, which is then subject to retry/failure handling as per the process registration indications.
  • a fail-for-tasking status. This condition is mandatory to trigger the tasking phase on the HUB side and to resume the whole process once the data has been retrieved into the HUB.

In every case, the data analysis phase shall finish gracefully.

Data availability analysis needs to be lightweight and designed to fail fast: it should fail as soon as possible, so the workflow can move to the next step (triggering the tasking, or retrying/handling the failure). At the same time, data availability analysis shall be as verbose and self-explanatory as possible. Both in the tasking scenario and in the failing one, it is mandatory that the step includes all the relevant information for the next step:

  • for tasking, all the parameters/inputs/values to be sent along with the tasking request to HSC are required to be saved to a file named tasking.json, whose content shall be JSON-serialized.
  • for failure, the HUB requires meaningful data to be added to the fail_completely.fail file, to be reported later and presented to the end user.
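
These two hand-off files can be produced by a small helper. A minimal sketch: the file names come from this spec, while the payload fields are purely illustrative (the exact payload structure is still to be defined, as noted below):

```python
import json
from pathlib import Path

def write_tasking_file(output_dir, details, tasking_parameters):
    """Save the parameters the HUB needs to send along with the tasking request."""
    path = Path(output_dir) / "tasking.json"
    path.write_text(json.dumps({"details": details,
                                "tasking_parameters": tasking_parameters}))
    return path

def write_failure_file(output_dir, details):
    """Save a meaningful failure report for later presentation to the end user."""
    path = Path(output_dir) / "fail_completely.fail"
    path.write_text(json.dumps({"details": details}))
    return path
```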

In terms of diagrams:

::: mermaid
graph LR;
    Start(Start) --> DataAnalysis[Trigger data availability analysis];
    DataAnalysis --> StatusCheck{Is analysis failed?}
    StatusCheck -->|Yes| TaskingRequired{Tasking required?}
    StatusCheck -->|No| Exit
    TaskingRequired -->|Yes| TriggerTasking[Trigger Tasking]
    TriggerTasking --> RunComputation
    TaskingRequired -->|No| Exit(Exit)
    RunComputation --> Exit
:::

Here below are some scenarios any thematic service can encounter while running the data analysis check, in terms of:

  • the cause of the error;
  • the kind of error (recoverable or non-recoverable);
  • the next logical step to be performed;
  • notes (nature of the problem);
  • the status code that the process will return in order to let the HUB handle the case;
  • the mandatory file placed in the output folder in order to let the HUB handle the case.
| Operation | Status | Next Step | Note | Status code of operation | Mandatory output file | Log file sample content |
| --------- | ------ | --------- | ---- | ------------------------ | --------------------- | ----------------------- |
| Data analysis failed for a bug in the code | Non recoverable | End | Since there's a bug, it's not possible to run the analysis | 0 | fail_completely.fail | {"details": "....."} |
| Data analysis failed for a not working containerized service | Non recoverable | End | Since the service is not working, it's not possible to run the analysis | 0 | fail_completely.fail | {"details": "....."} |
| Data analysis failed for no data availability in the catalog (tasking needed) | Recoverable | Triggering | Since there's no data available, it is still possible to trigger the tasking operation | 0 | tasking.json | {"details": ".....", "tasking_parameters": "..."} |
| Data analysis failed for no data availability in the catalog (no data available) | Non recoverable | End | Since there's no data available, it is not requested to task for them | 0 | fail_completely.fail | {"details": "....."} |
| Data analysis failed for not usable data in the catalog | Recoverable | End | Since there's no usable data for the current processor, it is still possible to warn the catalog owner about the issue | 0 | fail_completely.fail | {"details": "....."} |

Note: the exact structure of the JSON-serialized payload shall be defined to reflect all the to-be-designed interfaces for interacting with external services.


About processing outputs

Each running algorithm contributed by the services shall end with the final folder structure below:

├── execution_process.log
├── assets/
│   ├── <collection_id>/
│   │   ├── <item_id>/
│   │   │   ├── <asset_1>.tif
│   │   │   ├── <asset_2>.jpg
├── items/
│   ├── <collection_id>/
│   │   ├── collection.json
│   │   ├── <item_id>/
│   │   │   ├── <item>.json

In other words, the process step is responsible for:

  • creating a log file;
  • creating an assets/<collection_id>/<item_id>/ structure that contains all the physical files (related to the item_id) to be referenced later (raster data, vector data, other resources, ...);
  • creating an items/<collection_id>/<item_id>/ structure that contains the JSON-serialized STAC item;
  • creating a file named collection.json inside items/<collection_id>/.

If the process generates more than one item, the process step must create the same structure as above, with a folder for each generated item:

├── execution_process.log
├── assets/
│   ├── <collection_id>/
│   │   ├── <item_id_a>/
│   │   │   ├── <asset_a_1>.tif
│   │   │   ├── <asset_a_2>.jpg
│   │   ├── <item_id_b>/
│   │   │   ├── <asset_b_1>.tif
│   │   │   ├── <asset_b_2>.jpg
├── items/
│   ├── <collection_id>/
│   │   ├── collection.json
│   │   ├── <item_id_a>/
│   │   │   ├── <item_a>.json
│   │   ├── <item_id_b>/
│   │   │   ├── <item_b>.json
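
A minimal Python sketch that lays out this skeleton (identifiers are illustrative; the real asset files and STAC documents are produced by the algorithm itself):

```python
from pathlib import Path

def make_output_layout(root, collection_id, item_ids):
    """Create the mandatory output folder skeleton for a processing run."""
    root = Path(root)
    (root / "execution_process.log").touch()
    items_dir = root / "items" / collection_id
    items_dir.mkdir(parents=True, exist_ok=True)
    # Placeholder; the real run writes the STAC collection metadata here.
    (items_dir / "collection.json").write_text("{}")
    for item_id in item_ids:
        (root / "assets" / collection_id / item_id).mkdir(parents=True, exist_ok=True)
        item_dir = items_dir / item_id
        item_dir.mkdir(parents=True, exist_ok=True)
        # Placeholder; the real run writes the JSON-serialized STAC item here.
        (item_dir / f"{item_id}.json").write_text("{}")
    return root
```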

As a diagram:

::: mermaid

graph LR;
    subgraph Processing by Thematic services
        A[Algorithm] --> |creates and fills| B[Assets Folder]
        A --> |creates and fills| I[Items Folder]
        A --> |creates and fills| C[Log file]
    end

:::

References to physical assets in STAC items shall be local (e.g.: assets/XCF/myCoolAsset.tif).
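
For illustration, a heavily trimmed, hypothetical STAC item fragment with such a local asset reference could look like the one below (field values are assumptions, not HUB-mandated):

```python
import json

# Hypothetical, trimmed STAC item: only the parts relevant to local asset hrefs.
item = {
    "type": "Feature",
    "stac_version": "1.0.0",
    "id": "myCoolItem",
    "assets": {
        "data": {
            "href": "assets/XCF/myCoolAsset.tif",  # local, relative reference
            "type": "image/tiff; application=geotiff",
        }
    },
}
print(json.dumps(item, indent=2))
```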

Staging out data and indexing inside HUB catalog

Once products have been generated and saved to the filesystem, STAC items are mangled to fix references to the HUB catalog, assets are moved to the final storage, and data is indexed. If everything succeeds, the entire flow succeeds. Before this staging-out phase, a validation process runs to ensure the produced data complies with all the requirements the HUB expects. If validation or any staging-out step fails, the process is marked as non-recoverable and is not retried.
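
The href rewriting performed during staging out could be pictured as follows. This is an illustrative sketch only, since the real mangling logic and the final storage layout are HUB-internal:

```python
def mangle_hrefs(item: dict, storage_base_url: str) -> dict:
    """Rewrite local asset hrefs into absolute URLs on the final storage."""
    for asset in item.get("assets", {}).values():
        href = asset.get("href", "")
        if href and not href.startswith(("http://", "https://")):
            asset["href"] = f"{storage_base_url.rstrip('/')}/{href}"
    return item
```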

CWL Step Resource Requests

Each CWL step can request computing resources—such as RAM, CPU, and disk space—from the cluster. These values are specified in the step’s ResourceRequirement section:

requirements:
  ResourceRequirement:
    ramMax: 64000      # Maximum RAM (in MB)
    coresMax: 10       # Maximum number of CPU cores
    outdirMax: 250000  # Maximum disk space for the output directory (in MB)

Adjust the values according to the expected resource usage of the step.

GPU access with CUDA

To allow a Docker image to use a GPU in a CUDA-enabled environment, it must be based on an NVIDIA CUDA image that includes the required CUDA libraries and drivers.

Start your Dockerfile with:

# At the beginning of the file, add the base image
FROM nvidia/cuda:12.2.0-runtime-ubuntu22.04

This ensures that the container has access to the NVIDIA drivers and CUDA runtime needed for GPU-based computations.

When running a CWL step on an EKS cluster (or any Kubernetes environment) with NVIDIA GPUs, you must define a cwltool:CUDARequirement in the hints section. This informs the CWL runner about the GPU and CUDA constraints.

  - class: CommandLineTool
    id: step_gpu
    baseCommand: python
    arguments:
      - /app/run.py
      - --spatial_extent
      - $(inputs.spatial_extent[0])
      - $(inputs.spatial_extent[1])
      - $(inputs.spatial_extent[2])
      - $(inputs.spatial_extent[3])
    requirements:
      ResourceRequirement:
        coresMax: 1
        ramMax: 512
    hints:
      DockerRequirement:
        dockerPull: dockeruser/reference_implementation_gpu
      ResourceRequirement:
        ramMax: 128
        coresMax: 1
      cwltool:CUDARequirement:
        cudaVersionMin: "12.2"
        cudaComputeCapability: "7.5"
        cudaDeviceCountMin: 1
        cudaDeviceCountMax: 1
    inputs:
      spatial_extent:
        type:
          type: array
          items: string
    outputs:
      execution_results:
        type: Directory
        outputBinding:
          glob: .

GPU compatibility table:

| GPU Model | Architecture | Compute Capability | Min CUDA Version | Tested CUDA Version |
| --------- | ------------ | ------------------ | ---------------- | ------------------- |
| NVIDIA T4 | Turing       | 7.5                | 10.0             | 12.2                |

CWL CUDARequirement example for the NVIDIA T4:

- cwltool:CUDARequirement:
    cudaVersionMin: "12.2"
    cudaComputeCapability: "7.5"
    cudaDeviceCountMin: 1
    cudaDeviceCountMax: 1

Access to vault secrets

Each thematic service will have access to their dedicated kv2 secret path, and can add more secrets using the Vault UI. All user defined and dedicated secrets will be injected as environment variables in the pods generated by the CWL steps.

The dedicated secrets for each thematic service are:

  • AWS access/secret keys for Kubernetes containerized services (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY): keys to access the dedicated S3 bucket for the user's historical data
  • AWS_REGION
  • S3_BUCKET_NAME: dedicated S3 bucket for the user's historical data
  • CATALOG_URL: catalog URL for data requests
  • Catalog credentials to authenticate and retrieve data:
      • CATALOG_CLIENT_ID
      • CATALOG_CLIENT_SECRET
  • S3_ENDPOINT_URL
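
Since the secrets arrive as environment variables, a step can collect them at startup. A minimal sketch: the variable names come from this spec, but which ones are mandatory versus optional is an assumption made for illustration:

```python
import os

MANDATORY = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "S3_BUCKET_NAME", "CATALOG_URL"]
OPTIONAL = ["AWS_REGION", "CATALOG_CLIENT_ID", "CATALOG_CLIENT_SECRET", "S3_ENDPOINT_URL"]

def load_hub_config(env=None):
    """Collect the injected secrets, failing fast if a mandatory one is absent."""
    env = os.environ if env is None else env
    missing = [name for name in MANDATORY if name not in env]
    if missing:
        raise RuntimeError("Missing mandatory secrets: " + ", ".join(missing))
    config = {name: env[name] for name in MANDATORY}
    config.update({name: env.get(name) for name in OPTIONAL})
    return config
```

Failing fast here keeps misconfiguration visible in the step log instead of surfacing later as an opaque S3 or catalog error.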

The complete time diagram

The diagram below depicts the whole flow and includes the thematic services' contributions as well as the HUB key components:

::: mermaid
sequenceDiagram
autonumber
participant Cron as "Cron Scheduler"
participant Workflow as "Workflow"
participant Step1 as "Step 1: Initial Task"
participant LogReader as "Log Reader"
participant TaskTrigger as "Tasking Trigger"
participant Step2 as "Step 2: Next Task"
participant Validator as "Validation Step"
participant Mangler as "Mangle Files"
participant Indexer as "Indexing Step"
participant Done as "Mark as Done"

Cron->>Workflow: Trigger Workflow (on cron)
Workflow->>Step1: Run Step 1
alt Step 1 fails (general)
    loop Retry Step 1 (3 times)
        Workflow->>Step1: Retry
        alt Step 1 fails due to no data
            Step1-->>Workflow: No Data Error
            Workflow->>LogReader: Read Logs
            LogReader->>TaskTrigger: Trigger Tasking
            TaskTrigger-->>Workflow: Tasking Triggered
            Workflow->>Step1: Retry Step 1
        end
    end
end
alt Step 1 succeeds
    Step1-->>Workflow: Success
    Workflow->>Step2: Run Step 2
    alt Step 2 fails
        Step2-->>Workflow: Failure
        Workflow->>Workflow: Restart Whole Chain
        Workflow->>Step1: Rerun Step 1
    else Step 2 succeeds
        Step2-->>Validator: Run Validation
        alt Validation fails
            Validator-->>Workflow: Exit Workflow
        else Validation succeeds
            Validator-->>Mangler: Mangle Files
            Mangler-->>Indexer: Run Indexing
            alt Indexing fails
                Indexer-->>Workflow: Exit Workflow
            else Indexing succeeds
                Indexer-->>Done: Mark as Done
            end
        end
    end
end

:::

The diagram above provides a general overview of the entire process: from running the processor via the cron component (aka: systematic process) to marking the process as done. In general, multiple components are involved in this process, and not all of them are the thematic services' responsibility: for example, validating the output produced by processors is the HUB's responsibility; it is performed on the thematic service's behalf, and the service is notified about the result in the end. At the same time, internal operations are foreseen to work around temporary issues with some components: retries are a simple yet effective strategy to overcome temporary connectivity issues and shall be kept in mind while designing processes in a distributed architecture like the HUB.
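
The retry idea mentioned above can be sketched generically. This is not the HUB's actual retry mechanism, just an illustration of the pattern:

```python
import time

def retry(operation, attempts=3, delay_seconds=0.0):
    """Run `operation` up to `attempts` times, re-raising the last error on exhaustion."""
    last_error = None
    for attempt in range(attempts):
        try:
            return operation()
        except Exception as error:  # in production, catch only transient errors
            last_error = error
            if attempt < attempts - 1:
                time.sleep(delay_seconds)
    raise last_error
```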


Other practical implications

  • Logs can be produced, but in general, logging to a dedicated log file per step is preferred. This approach simplifies log collection by avoiding the need to read from multiple sources.
  • Thematic services are responsible for building and running their own Docker images, which must be hosted in a Docker registry accessible by the HUB.
  • Thematic services may utilize ancillary components deployed on the HUB, but these components must be fully managed by the services themselves. More on this specific topic will be provided.

Getting help

The HUB team is aware that grasping all the aforementioned concepts requires significant human effort; at the same time, it is eager to collect feedback and general comments, as well as questions and requests for clarification, to get all the contributions up and running inside the HUB.