Skip to content

clinops.orchestrate

clinops.orchestrate.store.GCSPipelineStore

GCSPipelineStore(
    bucket,
    prefix="",
    format=StorageFormat.PARQUET,
    credentials_path=None,
)

Upload and download clinops pipeline artifacts to Google Cloud Storage.

Requires pip install clinops[gcp].

Parameters:

Name Type Description Default
bucket str

GCS bucket name (without gs:// prefix).

required
prefix str

Optional path prefix applied to all artifact names. Use to namespace artifacts by pipeline or environment, e.g. "clinops/prod/v2".

''
format StorageFormat

Serialization format. Default StorageFormat.PARQUET.

PARQUET
credentials_path str | None

Path to a service account JSON key file. If None, falls back to Application Default Credentials (GOOGLE_APPLICATION_CREDENTIALS env var or gcloud CLI credentials).

None

Examples:

>>> store = GCSPipelineStore("my-clinical-bucket", prefix="clinops/prod")
>>> store.upload(windows_df, "features/windows_2024_01")
'gs://my-clinical-bucket/clinops/prod/features/windows_2024_01.parquet'
>>> df = store.download("features/windows_2024_01")
Source code in clinops/orchestrate/store.py
def __init__(
    self,
    bucket: str,
    prefix: str = "",
    format: StorageFormat = StorageFormat.PARQUET,
    credentials_path: str | None = None,
) -> None:
    """
    Record the GCS store configuration without contacting GCS.

    Trailing slashes are removed from ``prefix``. ``self._client`` starts
    as ``None``; presumably it is populated on first use by
    ``_get_client()`` (not visible here — confirm).
    """
    # Lazily-created client handle; nothing is connected at construction.
    self._client: Any = None
    self.credentials_path = credentials_path
    self.format = format
    self.prefix = prefix.rstrip("/")
    self.bucket = bucket

upload

upload(df, name)

Serialize df and upload it to GCS.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to upload.

required
name str

Artifact name (without extension). Forward slashes create virtual directories, e.g. "features/windows_2024_01".

required

Returns:

Type Description
str

Full GCS URI of the uploaded object (gs://bucket/path).

Source code in clinops/orchestrate/store.py
def upload(self, df: pd.DataFrame, name: str) -> str:
    """
    Write ``df`` to GCS under ``name`` and return the object's URI.

    The DataFrame is serialized via ``self._serialize`` and uploaded to the
    object path produced by ``self._blob_path(name)``, tagged with the
    content type returned by ``self._content_type()``.

    Parameters
    ----------
    df:
        DataFrame to serialize and store.
    name:
        Artifact name, without extension. Slashes act as virtual directory
        separators, e.g. ``"features/windows_2024_01"``.

    Returns
    -------
    str
        ``gs://<bucket>/<object path>`` of the object just written.
    """
    gcs = self._get_client()
    object_path = self._blob_path(name)
    payload = self._serialize(df)

    target = gcs.bucket(self.bucket).blob(object_path)
    target.upload_from_file(payload, content_type=self._content_type())

    destination = f"gs://{self.bucket}/{object_path}"
    logger.info(f"GCSPipelineStore: uploaded {len(df):,} rows → {destination}")
    return destination

download

download(name)

Download and deserialize a DataFrame artifact from GCS.

Parameters:

Name Type Description Default
name str

Artifact name (without extension), matching what was passed to upload().

required

Returns:

Type Description
DataFrame
Source code in clinops/orchestrate/store.py
def download(self, name: str) -> pd.DataFrame:
    """
    Fetch the artifact ``name`` from GCS and rebuild the DataFrame.

    Parameters
    ----------
    name:
        Artifact name without extension — the same value given to
        ``upload()``.

    Returns
    -------
    pd.DataFrame
        Result of ``self._deserialize`` applied to the object's bytes.
    """
    gcs = self._get_client()
    object_path = self._blob_path(name)

    raw = gcs.bucket(self.bucket).blob(object_path).download_as_bytes()
    frame = self._deserialize(raw)

    source_uri = f"gs://{self.bucket}/{object_path}"
    logger.info(
        f"GCSPipelineStore: downloaded {len(frame):,} rows ← {source_uri}"
    )
    return frame

list_artifacts

list_artifacts(prefix='')

List artifact names stored under an optional sub-prefix.

Parameters:

Name Type Description Default
prefix str

Sub-prefix to filter by (relative to the store's base prefix).

''

Returns:

Type Description
list[str]

Artifact names (without file extension), relative to the store's base prefix.

Source code in clinops/orchestrate/store.py
def list_artifacts(self, prefix: str = "") -> list[str]:
    """
    List artifact names stored under an optional sub-prefix.

    Only objects that live *inside* the store's base prefix (treated as a
    virtual directory) are returned. For example, with a base prefix of
    ``"clinops/prod"`` an object such as ``"clinops/prod2/x.parquet"`` is
    excluded. A plain string-prefix match would include it and mangle its
    name to ``"2/x"``.

    Parameters
    ----------
    prefix:
        Sub-prefix to filter by (relative to the store's base prefix).
        It is matched as a plain string prefix, so ``"features"`` also
        matches ``"features_old/..."``. Pass ``"features/"`` to restrict
        the listing to that virtual directory.

    Returns
    -------
    list[str]
        Sorted artifact names (without file extension), relative to the
        store's base prefix. Only objects whose name ends with the store's
        format extension are included.
    """
    client = self._get_client()
    # Anchor the search on "<base>/" so sibling prefixes such as "<base>2/"
    # are not picked up by GCS's plain string-prefix matching.
    base = self.prefix.strip("/")
    root = f"{base}/" if base else ""
    search_prefix = root + prefix.lstrip("/")
    ext = f".{self.format}"
    names = [
        blob.name[len(root) :].lstrip("/").removesuffix(ext)
        for blob in client.list_blobs(self.bucket, prefix=search_prefix)
        if blob.name.endswith(ext)
    ]
    return sorted(names)

clinops.orchestrate.store.S3PipelineStore

S3PipelineStore(
    bucket,
    prefix="",
    format=StorageFormat.PARQUET,
    region="us-east-1",
    profile=None,
)

Upload and download clinops pipeline artifacts to Amazon S3.

Requires pip install clinops[aws].

Parameters:

Name Type Description Default
bucket str

S3 bucket name (without s3:// prefix).

required
prefix str

Optional key prefix applied to all artifact names.

''
format StorageFormat

Serialization format. Default StorageFormat.PARQUET.

PARQUET
region str

AWS region name. Default "us-east-1".

'us-east-1'
profile str | None

AWS credentials profile name. If None, uses the default credential chain (env vars, instance profile, ~/.aws/credentials).

None

Examples:

>>> store = S3PipelineStore("my-clinical-bucket", prefix="clinops/prod")
>>> store.upload(windows_df, "features/windows_2024_01")
's3://my-clinical-bucket/clinops/prod/features/windows_2024_01.parquet'
>>> df = store.download("features/windows_2024_01")
Source code in clinops/orchestrate/store.py
def __init__(
    self,
    bucket: str,
    prefix: str = "",
    format: StorageFormat = StorageFormat.PARQUET,
    region: str = "us-east-1",
    profile: str | None = None,
) -> None:
    """
    Record the S3 store configuration without contacting AWS.

    Trailing slashes are removed from ``prefix``. ``self._client`` starts
    as ``None``; presumably it is created on first use by
    ``_get_client()`` (not visible here — confirm).
    """
    # Lazily-created client handle; nothing is connected at construction.
    self._client: Any = None
    self.profile = profile
    self.region = region
    self.format = format
    self.prefix = prefix.rstrip("/")
    self.bucket = bucket

upload

upload(df, name)

Serialize df and upload it to S3.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to upload.

required
name str

Artifact name (without extension).

required

Returns:

Type Description
str

Full S3 URI of the uploaded object (s3://bucket/key).

Source code in clinops/orchestrate/store.py
def upload(self, df: pd.DataFrame, name: str) -> str:
    """
    Write ``df`` to S3 under ``name`` and return the object's URI.

    The DataFrame is serialized via ``self._serialize`` and streamed with
    ``upload_fileobj`` to the key produced by ``self._key(name)``.

    Parameters
    ----------
    df:
        DataFrame to serialize and store.
    name:
        Artifact name, without extension.

    Returns
    -------
    str
        ``s3://<bucket>/<key>`` of the object just written.
    """
    s3 = self._get_client()
    object_key = self._key(name)
    payload = self._serialize(df)

    s3.upload_fileobj(payload, self.bucket, object_key)

    destination = f"s3://{self.bucket}/{object_key}"
    logger.info(f"S3PipelineStore: uploaded {len(df):,} rows → {destination}")
    return destination

download

download(name)

Download and deserialize a DataFrame artifact from S3.

Parameters:

Name Type Description Default
name str

Artifact name (without extension).

required

Returns:

Type Description
DataFrame
Source code in clinops/orchestrate/store.py
def download(self, name: str) -> pd.DataFrame:
    """
    Fetch the artifact ``name`` from S3 and rebuild the DataFrame.

    The object is streamed into an in-memory buffer with
    ``download_fileobj``; the buffer's full contents are then passed to
    ``self._deserialize``.

    Parameters
    ----------
    name:
        Artifact name, without extension.

    Returns
    -------
    pd.DataFrame
    """
    s3 = self._get_client()
    object_key = self._key(name)

    with io.BytesIO() as sink:
        s3.download_fileobj(self.bucket, object_key, sink)
        # getvalue() returns the whole buffer regardless of stream position.
        raw = sink.getvalue()

    frame = self._deserialize(raw)
    logger.info(f"S3PipelineStore: downloaded {len(frame):,} rows ← s3://{self.bucket}/{object_key}")
    return frame

list_artifacts

list_artifacts(prefix='')

List artifact names stored under an optional sub-prefix.

Parameters:

Name Type Description Default
prefix str

Sub-prefix to filter by (relative to the store's base prefix).

''

Returns:

Type Description
list[str]

Artifact names (without file extension), relative to the store's base prefix.

Source code in clinops/orchestrate/store.py
def list_artifacts(self, prefix: str = "") -> list[str]:
    """
    List artifact names stored under an optional sub-prefix.

    Only keys that live *inside* the store's base prefix (treated as a
    virtual directory) are returned. For example, with a base prefix of
    ``"clinops/prod"`` a key such as ``"clinops/prod2/x.parquet"`` is
    excluded. A plain string-prefix match would include it and mangle its
    name to ``"2/x"``.

    Parameters
    ----------
    prefix:
        Sub-prefix to filter by (relative to the store's base prefix).
        It is matched as a plain string prefix, so ``"features"`` also
        matches ``"features_old/..."``. Pass ``"features/"`` to restrict
        the listing to that virtual directory.

    Returns
    -------
    list[str]
        Sorted artifact names (without file extension), relative to the
        store's base prefix. Only keys ending with the store's format
        extension are included.
    """
    client = self._get_client()
    # Anchor the search on "<base>/" so sibling prefixes such as "<base>2/"
    # are not picked up by S3's plain string-prefix matching.
    base = self.prefix.strip("/")
    root = f"{base}/" if base else ""
    search_prefix = root + prefix.lstrip("/")
    ext = f".{self.format}"
    paginator = client.get_paginator("list_objects_v2")
    names = [
        obj["Key"][len(root) :].lstrip("/").removesuffix(ext)
        for page in paginator.paginate(Bucket=self.bucket, Prefix=search_prefix)
        # Pages with no matches have no "Contents" entry at all.
        for obj in page.get("Contents", [])
        if obj["Key"].endswith(ext)
    ]
    return sorted(names)

clinops.orchestrate.store.StorageFormat

Bases: StrEnum

Serialization format for pipeline artifacts.

clinops.orchestrate.stepfunctions.StepFunctionsPipeline

StepFunctionsPipeline(
    name,
    role_arn,
    region="us-east-1",
    comment="",
    profile=None,
)

Build, deploy, and execute an AWS Step Functions state machine.

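Constructs a sequential pipeline from PipelineStep objects. The generated state machine runs steps in the order they were added, passing each step's output as the input to the next. The exception is a step with non-empty static parameters: it receives its Parameters block as input instead.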

Requires pip install clinops[aws].

Parameters:

Name Type Description Default
name str

State machine name. Used as the AWS resource name.

required
role_arn str

ARN of the IAM role that Step Functions assumes to invoke each step's resource (Lambda, ECS, etc.).

required
region str

AWS region. Default "us-east-1".

'us-east-1'
comment str

Human-readable description embedded in the state machine definition.

''
profile str | None

AWS credentials profile name. If None, uses the default credential chain.

None

Examples:

>>> pipeline = StepFunctionsPipeline(
...     name="clinops-daily-ingest",
...     role_arn="arn:aws:iam::123456789:role/StepFunctionsRole",
... )
>>> pipeline.add_step(PipelineStep(
...     name="Ingest",
...     resource="arn:aws:lambda:us-east-1:123456789:function:clinops-ingest",
... ))
>>> pipeline.add_step(PipelineStep(
...     name="Preprocess",
...     resource="arn:aws:lambda:us-east-1:123456789:function:clinops-preprocess",
... ))
>>> print(pipeline.definition_json())   # inspect before deploying
>>> arn = pipeline.deploy()
>>> execution_arn = pipeline.execute({"date": "2024-01-01"})
Source code in clinops/orchestrate/stepfunctions.py
def __init__(
    self,
    name: str,
    role_arn: str,
    region: str = "us-east-1",
    comment: str = "",
    profile: str | None = None,
) -> None:
    """
    Store the pipeline configuration; no AWS call is made here.

    The step list starts empty (fill it with ``add_step``), and
    ``self._client`` starts as ``None``.
    """
    self.name, self.role_arn = name, role_arn
    self.region, self.comment, self.profile = region, comment, profile
    # Ordered list of steps; execution order equals insertion order.
    self._steps: list[PipelineStep] = []
    self._client: Any = None

add_step

add_step(step)

Append a step to the pipeline.

Parameters:

Name Type Description Default
step PipelineStep

Step to add. Step names must be unique within the pipeline.

required

Returns:

Type Description
StepFunctionsPipeline

Self, for method chaining.

Raises:

Type Description
ValueError

If a step with the same name already exists.

Source code in clinops/orchestrate/stepfunctions.py
def add_step(self, step: PipelineStep) -> StepFunctionsPipeline:
    """
    Add ``step`` to the end of the pipeline.

    Parameters
    ----------
    step:
        Step to append. Its name must not match any step already added,
        because step names become ASL state names.

    Returns
    -------
    StepFunctionsPipeline
        This pipeline, so calls can be chained.

    Raises
    ------
    ValueError
        If a step with the same name is already present.
    """
    if any(current.name == step.name for current in self._steps):
        raise ValueError(f"A step named '{step.name}' already exists in this pipeline")
    self._steps.append(step)
    return self

definition

definition()

Build the Amazon States Language (ASL) state machine definition.

Returns:

Type Description
dict[str, Any]

ASL definition ready to pass to the Step Functions API or serialize to JSON.

Raises:

Type Description
ValueError

If no steps have been added.

Source code in clinops/orchestrate/stepfunctions.py
def definition(self) -> dict[str, Any]:
    """
    Assemble the Amazon States Language (ASL) definition for this pipeline.

    Each step is rendered with ``to_asl_state``, pointing at the step added
    after it; the last step receives ``None`` and so becomes the terminal
    state.

    Returns
    -------
    dict[str, Any]
        Mapping with ``Comment``, ``StartAt`` (the first step's name) and
        ``States``, suitable for JSON serialization.

    Raises
    ------
    ValueError
        If the pipeline has no steps.
    """
    steps = self._steps
    if not steps:
        raise ValueError("Pipeline has no steps — call add_step() first")

    # Successor of each step: the next step's name, or None for the last.
    successors = [s.name for s in steps[1:]] + [None]
    states: dict[str, Any] = {
        step.name: step.to_asl_state(successor)
        for step, successor in zip(steps, successors)
    }
    return {
        "Comment": self.comment or f"{self.name} — built by clinops.orchestrate",
        "StartAt": steps[0].name,
        "States": states,
    }

definition_json

definition_json(indent=2)

Return the ASL definition as a JSON string.

Parameters:

Name Type Description Default
indent int

JSON indentation. Default 2.

2

Returns:

Type Description
str
Source code in clinops/orchestrate/stepfunctions.py
def definition_json(self, indent: int = 2) -> str:
    """
    Serialize ``self.definition()`` to a JSON string.

    Parameters
    ----------
    indent:
        Indentation passed to ``json.dumps``. Default 2.

    Returns
    -------
    str
    """
    asl = self.definition()
    return json.dumps(asl, indent=indent)

deploy

deploy()

Create or update the state machine in AWS.

If a state machine with self.name already exists in the account, its definition is updated in-place. Otherwise a new state machine is created.

Returns:

Type Description
str

ARN of the deployed state machine.

Raises:

Type Description
ImportError

If boto3 is not installed.

Source code in clinops/orchestrate/stepfunctions.py
def deploy(self) -> str:
    """
    Create the state machine in AWS, or update it if it already exists.

    ``self._find_existing_arn`` is consulted first. If it yields an ARN,
    that machine's definition and role are replaced via
    ``update_state_machine``. Otherwise a new ``STANDARD`` machine named
    ``self.name`` is created.

    Returns
    -------
    str
        ARN of the created or updated state machine.

    Raises
    ------
    ImportError
        If ``boto3`` is not installed (presumably raised by
        ``_get_client()`` — not visible here).
    """
    client = self._get_client()
    defn_json = self.definition_json()

    existing_arn: str | None = self._find_existing_arn(client)
    if not existing_arn:
        response = client.create_state_machine(
            name=self.name,
            definition=defn_json,
            roleArn=self.role_arn,
            type="STANDARD",
        )
        created_arn: str = response["stateMachineArn"]
        logger.info(f"StepFunctionsPipeline: created '{self.name}' → {created_arn}")
        return created_arn

    client.update_state_machine(
        stateMachineArn=existing_arn,
        definition=defn_json,
        roleArn=self.role_arn,
    )
    logger.info(f"StepFunctionsPipeline: updated '{self.name}' → {existing_arn}")
    return existing_arn

execute

execute(
    input_data=None,
    execution_name=None,
    state_machine_arn=None,
)

Start an execution of the deployed state machine.

Parameters:

Name Type Description Default
input_data dict[str, Any] | None

JSON-serializable dict passed as the execution's input.

None
execution_name str | None

Optional execution name. Defaults to a timestamp-based name.

None
state_machine_arn str | None

ARN of the state machine to execute. If None, looks up the ARN by self.name.

None

Returns:

Type Description
str

ARN of the started execution.

Raises:

Type Description
RuntimeError

If the state machine cannot be found and state_machine_arn is not provided.

Source code in clinops/orchestrate/stepfunctions.py
def execute(
    self,
    input_data: dict[str, Any] | None = None,
    execution_name: str | None = None,
    state_machine_arn: str | None = None,
) -> str:
    """
    Start an execution of the deployed state machine.

    Parameters
    ----------
    input_data:
        JSON-serializable dict passed as the execution's input.
    execution_name:
        Optional execution name. Defaults to a timestamp-based name.
    state_machine_arn:
        ARN of the state machine to execute. If None, looks up the
        ARN by ``self.name``.

    Returns
    -------
    str
        ARN of the started execution.

    Raises
    ------
    RuntimeError
        If the state machine cannot be found and ``state_machine_arn``
        is not provided.
    """
    client = self._get_client()

    arn = state_machine_arn or self._find_existing_arn(client)
    if not arn:
        raise RuntimeError(
            f"State machine '{self.name}' not found — call deploy() first "
            "or pass state_machine_arn explicitly"
        )

    exec_name = execution_name or f"{self.name}-{int(time.time())}"
    response = client.start_execution(
        stateMachineArn=arn,
        name=exec_name,
        input=json.dumps(input_data or {}),
    )
    execution_arn: str = response["executionArn"]
    logger.info(f"StepFunctionsPipeline: started execution '{exec_name}' → {execution_arn}")
    return execution_arn

clinops.orchestrate.stepfunctions.PipelineStep dataclass

PipelineStep(
    name,
    resource,
    parameters=dict(),
    retry_attempts=3,
    timeout_seconds=3600,
    comment="",
)

A single task in a Step Functions state machine.

Parameters:

Name Type Description Default
name str

Human-readable step name. Used as the state name in the ASL definition — must be unique within a pipeline.

required
resource str

ARN of the Lambda function, ECS task, or Step Functions activity that executes this step. See AWS docs for supported resource ARN formats.

required
parameters dict[str, Any]

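Static input parameters emitted as the state's Parameters block. In Amazon States Language a Parameters block replaces the state's input rather than merging with it, so the previous step's output is not forwarded automatically. To pass it through, reference it explicitly with a JSONPath key (e.g. {"input.$": "$"}). The original execution input is available via $$.Execution.Input.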

dict()
retry_attempts int

Number of times to retry on Lambda.ServiceException, Lambda.TooManyRequestsException, or States.TaskFailed. Default 3.

3
timeout_seconds int

Maximum execution time for this step in seconds. Default 3600 (1h).

3600
comment str

Optional human-readable description embedded in the ASL state.

''

Examples:

>>> step = PipelineStep(
...     name="Preprocess",
...     resource="arn:aws:lambda:us-east-1:123456789:function:clinops-preprocess",
...     parameters={"max_null_rate": 0.5},
...     retry_attempts=2,
... )

to_asl_state

to_asl_state(next_state)

Render this step as an Amazon States Language Task state dict.

Parameters:

Name Type Description Default
next_state str | None

Name of the following state, or None if this is the terminal state.

required

Returns:

Type Description
dict[str, Any]

ASL Task state definition.

Source code in clinops/orchestrate/stepfunctions.py
def to_asl_state(self, next_state: str | None) -> dict[str, Any]:
    """
    Render this step as an Amazon States Language Task state dict.

    Parameters
    ----------
    next_state:
        Name of the following state, or None if this is the terminal state.

    Returns
    -------
    dict[str, Any]
        ASL Task state definition.
    """
    state: dict[str, Any] = {
        "Type": "Task",
        "Resource": self.resource,
        "TimeoutSeconds": self.timeout_seconds,
        "Retry": [
            {
                "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.TooManyRequestsException",
                    "States.TaskFailed",
                ],
                "IntervalSeconds": 2,
                "MaxAttempts": self.retry_attempts,
                "BackoffRate": 2.0,
            }
        ],
    }
    if self.comment:
        state["Comment"] = self.comment
    if self.parameters:
        state["Parameters"] = self.parameters
    if next_state is not None:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state