clinops.orchestrate
clinops.orchestrate.store.GCSPipelineStore
Upload and download clinops pipeline artifacts to Google Cloud Storage.
Requires `pip install clinops[gcp]`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `bucket` | `str` | GCS bucket name (without the `gs://` prefix). | required |
| `prefix` | `str` | Optional path prefix applied to all artifact names. Use it to namespace artifacts by pipeline or environment. | `''` |
| `format` | `StorageFormat` | Serialization format. | `StorageFormat.PARQUET` |
| `credentials_path` | `str \| None` | Path to a service account JSON key file. If `None`, falls back to Application Default Credentials. | `None` |
Examples:
>>> from clinops.orchestrate.store import GCSPipelineStore
>>> store = GCSPipelineStore("my-clinical-bucket", prefix="clinops/prod")
>>> store.upload(windows_df, "features/windows_2024_01")
'gs://my-clinical-bucket/clinops/prod/features/windows_2024_01.parquet'
Source code in clinops/orchestrate/store.py
upload
Serialize df and upload it to GCS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to upload. | required |
| `name` | `str` | Artifact name (without extension). Forward slashes create virtual directories. | required |
Returns:
| Type | Description |
|---|---|
| `str` | Full GCS URI of the uploaded object. |
Source code in clinops/orchestrate/store.py
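Judging from the class example above, the URI returned by `upload` is composed of the bucket, the base `prefix`, the artifact `name`, and a file extension for the storage format. A minimal sketch of that mapping, assuming the extension is the lowercase format name and that an empty prefix is simply omitted; `artifact_uri` is a hypothetical helper, not part of clinops:

```python
def artifact_uri(scheme: str, bucket: str, prefix: str, name: str, extension: str) -> str:
    """Hypothetical helper: build an object URI shaped like the documented examples.

    Assumes the object key is "<prefix>/<name>.<extension>" and that an empty
    prefix is skipped. This is an illustration, not the clinops implementation.
    """
    # Strip stray slashes so "clinops/prod/" and "clinops/prod" behave the same.
    parts = [p.strip("/") for p in (prefix, name) if p.strip("/")]
    # Slashes inside `name` are kept; object stores treat them as virtual directories.
    key = "/".join(parts) + f".{extension}"
    return f"{scheme}://{bucket}/{key}"


uri = artifact_uri("gs", "my-clinical-bucket", "clinops/prod", "features/windows_2024_01", "parquet")
```

With these arguments the sketch reproduces the URI shown in the `GCSPipelineStore` example; passing `"s3"` as the scheme gives the shape shown in the `S3PipelineStore` example.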
download
Download and deserialize a DataFrame artifact from GCS.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Artifact name (without extension), matching the name used when the artifact was uploaded. | required |
Returns:
| Type | Description |
|---|---|
| `DataFrame` | The deserialized DataFrame. |
Source code in clinops/orchestrate/store.py
list_artifacts
List artifact names stored under an optional sub-prefix.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `prefix` | `str` | Sub-prefix to filter by (relative to the store's base prefix). | `''` |
Returns:
| Type | Description |
|---|---|
| `list[str]` | Artifact names (without file extension), relative to the store's base prefix. |
Source code in clinops/orchestrate/store.py
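`list_artifacts` returns names relative to the store's base prefix and without the file extension. A minimal sketch of that post-processing over a plain list of object keys, assuming keys are laid out as `<base_prefix>/<name>.<extension>`; `names_from_keys` is hypothetical and does not call any GCS API:

```python
def names_from_keys(
    keys: list[str], base_prefix: str, sub_prefix: str = "", extension: str = "parquet"
) -> list[str]:
    """Hypothetical helper: turn raw object keys into artifact names.

    Keeps keys under "<base_prefix>/<sub_prefix>" that end in ".<extension>",
    then strips the base prefix and the extension.
    """
    base = base_prefix.strip("/")
    base = f"{base}/" if base else ""
    # Plain string-prefix matching, like an object-store "list with prefix" call.
    wanted = base + sub_prefix.lstrip("/")
    suffix = f".{extension}"
    names = [
        key[len(base):-len(suffix)]
        for key in keys
        if key.startswith(wanted) and key.endswith(suffix)
    ]
    # Sorted for deterministic output; the real method's ordering is not documented.
    return sorted(names)


keys = [
    "clinops/prod/features/windows_2024_01.parquet",
    "clinops/prod/features/windows_2024_02.parquet",
    "clinops/prod/labels/y.parquet",
    "clinops/dev/features/windows_2024_01.parquet",
    "clinops/prod/notes.txt",
]
```

Because the filter is a string prefix, a sub-prefix such as `"feat"` would also match `features/...`, which mirrors how object-store listings usually behave.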
clinops.orchestrate.store.S3PipelineStore
S3PipelineStore(
bucket,
prefix="",
format=StorageFormat.PARQUET,
region="us-east-1",
profile=None,
)
Upload and download clinops pipeline artifacts to Amazon S3.
Requires `pip install clinops[aws]`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `bucket` | `str` | S3 bucket name (without the `s3://` prefix). | required |
| `prefix` | `str` | Optional key prefix applied to all artifact names. | `''` |
| `format` | `StorageFormat` | Serialization format. | `StorageFormat.PARQUET` |
| `region` | `str` | AWS region name. | `'us-east-1'` |
| `profile` | `str \| None` | AWS credentials profile name. If `None`, uses the default credential chain (environment variables, instance profile, etc.). | `None` |
Examples:
>>> from clinops.orchestrate.store import S3PipelineStore
>>> store = S3PipelineStore("my-clinical-bucket", prefix="clinops/prod")
>>> store.upload(windows_df, "features/windows_2024_01")
's3://my-clinical-bucket/clinops/prod/features/windows_2024_01.parquet'
Source code in clinops/orchestrate/store.py
upload
Serialize df and upload it to S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `DataFrame` | DataFrame to upload. | required |
| `name` | `str` | Artifact name (without extension). | required |
Returns:
| Type | Description |
|---|---|
| `str` | Full S3 URI of the uploaded object. |
Source code in clinops/orchestrate/store.py
download
Download and deserialize a DataFrame artifact from S3.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Artifact name (without extension). | required |
Returns:
| Type | Description |
|---|---|
| `DataFrame` | The deserialized DataFrame. |
Source code in clinops/orchestrate/store.py
list_artifacts
List artifact names stored under an optional sub-prefix.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `prefix` | `str` | Sub-prefix to filter by (relative to the store's base prefix). | `''` |
Returns:
| Type | Description |
|---|---|
| `list[str]` | Artifact names (without file extension). |
Source code in clinops/orchestrate/store.py
clinops.orchestrate.store.StorageFormat
Bases: StrEnum
Serialization format for pipeline artifacts.
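Because `StorageFormat` subclasses `StrEnum`, its members compare equal to and format as plain strings, which is convenient when a format doubles as a file extension. A minimal stand-in, assuming a `PARQUET` member whose value is `"parquet"` (inferred from the `.parquet` suffix in the examples above); any other members of the real enum are not shown, and the fallback branch only exists for Python versions before 3.11:

```python
try:
    from enum import StrEnum  # Python 3.11+
except ImportError:  # older Pythons: a minimal equivalent
    from enum import Enum

    class StrEnum(str, Enum):
        def __str__(self) -> str:
            return str(self.value)


class StorageFormat(StrEnum):
    """Stand-in for clinops' StorageFormat; only the documented PARQUET member."""

    PARQUET = "parquet"  # assumed value, inferred from the ".parquet" URIs


# Members format as their value, so they can be dropped straight into file names.
filename = f"windows_2024_01.{StorageFormat.PARQUET}"
# Lookup by value also works, e.g. when a format is read from a config file.
fmt = StorageFormat("parquet")
```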
clinops.orchestrate.stepfunctions.StepFunctionsPipeline
Build, deploy, and execute an AWS Step Functions state machine.
Constructs a sequential pipeline from `PipelineStep` objects.
The generated state machine runs steps in the order they were added,
passing each step's output as the input to the next.
Requires `pip install clinops[aws]`.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | State machine name. Used as the AWS resource name. | required |
| `role_arn` | `str` | ARN of the IAM role that Step Functions assumes to invoke each step's resource (Lambda, ECS, etc.). | required |
| `region` | `str` | AWS region. | `'us-east-1'` |
| `comment` | `str` | Human-readable description embedded in the state machine definition. | `''` |
| `profile` | `str \| None` | AWS credentials profile name. If `None`, uses the default credential chain. | `None` |
Examples:
>>> from clinops.orchestrate.stepfunctions import PipelineStep, StepFunctionsPipeline
>>> pipeline = StepFunctionsPipeline(
...     name="clinops-daily-ingest",
...     role_arn="arn:aws:iam::123456789012:role/StepFunctionsRole",
... )
>>> pipeline.add_step(PipelineStep(
...     name="Ingest",
...     resource="arn:aws:lambda:us-east-1:123456789012:function:clinops-ingest",
... ))
>>> pipeline.add_step(PipelineStep(
...     name="Preprocess",
...     resource="arn:aws:lambda:us-east-1:123456789012:function:clinops-preprocess",
... ))
>>> print(pipeline.definition_json())  # inspect before deploying
>>> arn = pipeline.deploy()
>>> execution_arn = pipeline.execute({"date": "2024-01-01"})
Source code in clinops/orchestrate/stepfunctions.py
add_step
Append a step to the pipeline.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `step` | `PipelineStep` | Step to add. Step names must be unique within the pipeline. | required |
Returns:
| Type | Description |
|---|---|
| `StepFunctionsPipeline` | Self, for method chaining. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If a step with the same name already exists. |
Source code in clinops/orchestrate/stepfunctions.py
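The `add_step` contract (unique step names, `ValueError` on a duplicate, `self` returned for chaining) can be illustrated with a small stand-in. `MiniPipeline` and its `(name, resource)` tuples are hypothetical simplifications, not the clinops classes, and the resource strings are placeholders:

```python
from __future__ import annotations


class MiniPipeline:
    """Hypothetical stand-in mirroring the documented add_step behaviour."""

    def __init__(self) -> None:
        # Ordered (name, resource) pairs; insertion order is execution order.
        self.steps: list[tuple[str, str]] = []

    def add_step(self, name: str, resource: str) -> MiniPipeline:
        if any(existing == name for existing, _ in self.steps):
            raise ValueError(f"Step {name!r} already exists in the pipeline")
        self.steps.append((name, resource))
        return self  # returning self is what enables method chaining


pipeline = (
    MiniPipeline()
    .add_step("Ingest", "ingest-arn")  # placeholder resource strings
    .add_step("Preprocess", "preprocess-arn")
)
```

Uniqueness matters because step names become state names in the ASL `States` map, where duplicate keys cannot coexist.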
definition
Build the Amazon States Language (ASL) state machine definition.
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | ASL definition ready to pass to the Step Functions API or serialize to JSON. |
Raises:
| Type | Description |
|---|---|
| `ValueError` | If no steps have been added. |
Source code in clinops/orchestrate/stepfunctions.py
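In Amazon States Language, a sequential machine is a `StartAt` state plus a `States` map in which every state names its successor in `Next` and the last one sets `End: true`. A minimal sketch of that chaining over `(name, resource)` pairs; it emits only the core ASL fields and is not necessarily the exact shape `definition()` produces (it omits retry and timeout settings, for example). `sequential_asl` is hypothetical:

```python
from typing import Any


def sequential_asl(steps: list[tuple[str, str]], comment: str = "") -> dict[str, Any]:
    """Hypothetical sketch: chain Task states in the order given."""
    if not steps:
        # Mirrors the documented behaviour of definition() when no steps exist.
        raise ValueError("Pipeline has no steps")
    states: dict[str, Any] = {}
    for i, (name, resource) in enumerate(steps):
        state: dict[str, Any] = {"Type": "Task", "Resource": resource}
        if i + 1 < len(steps):
            # By default a state's output becomes the next state's input.
            state["Next"] = steps[i + 1][0]
        else:
            state["End"] = True
        states[name] = state
    definition: dict[str, Any] = {"StartAt": steps[0][0], "States": states}
    if comment:
        definition["Comment"] = comment
    return definition


asl = sequential_asl([("Ingest", "ingest-arn"), ("Preprocess", "preprocess-arn")])
```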
definition_json
Return the ASL definition as a JSON string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `indent` | `int` | Number of spaces used for JSON indentation. | `2` |
Returns:
| Type | Description |
|---|---|
| `str` | The ASL definition as a JSON string. |
Source code in clinops/orchestrate/stepfunctions.py
deploy
Create or update the state machine in AWS.
If a state machine with `self.name` already exists in the account,
its definition is updated in place. Otherwise a new state machine is
created.
Returns:
| Type | Description |
|---|---|
| `str` | ARN of the deployed state machine. |
Raises:
| Type | Description |
|---|---|
| `ImportError` | If the optional AWS dependencies (`pip install clinops[aws]`) are not installed. |
Source code in clinops/orchestrate/stepfunctions.py
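The create-or-update behaviour of `deploy` maps onto three real methods of the boto3 Step Functions client: `list_state_machines`, `create_state_machine`, and `update_state_machine`. A minimal sketch, assuming `client` is a `boto3.client("stepfunctions")` or any object with the same methods (which also lets it run without AWS); `find_state_machine_arn` and `deploy_state_machine` are hypothetical, not the clinops code:

```python
from __future__ import annotations

import json
from typing import Any


def find_state_machine_arn(client: Any, name: str) -> str | None:
    """Page through list_state_machines and return the ARN whose name matches."""
    kwargs: dict[str, Any] = {}
    while True:
        page = client.list_state_machines(**kwargs)
        for machine in page.get("stateMachines", []):
            if machine["name"] == name:
                return machine["stateMachineArn"]
        token = page.get("nextToken")
        if not token:
            return None
        kwargs["nextToken"] = token  # only pass nextToken once AWS returns one


def deploy_state_machine(client: Any, name: str, definition: dict[str, Any], role_arn: str) -> str:
    """Create the state machine, or update its definition in place if the name exists."""
    body = json.dumps(definition)  # the API expects the ASL definition as a JSON string
    arn = find_state_machine_arn(client, name)
    if arn is None:
        response = client.create_state_machine(name=name, definition=body, roleArn=role_arn)
        return response["stateMachineArn"]
    client.update_state_machine(stateMachineArn=arn, definition=body, roleArn=role_arn)
    return arn
```

With real AWS credentials, a client honouring the documented `region` and `profile` parameters could come from `boto3.Session(profile_name=profile, region_name=region).client("stepfunctions")`.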
execute
Start an execution of the deployed state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `input_data` | `dict[str, Any] \| None` | JSON-serializable dict passed as the execution's input. | `None` |
| `execution_name` | `str \| None` | Optional execution name. Defaults to a timestamp-based name. | `None` |
| `state_machine_arn` | `str \| None` | ARN of the state machine to execute. If `None`, the ARN is looked up automatically. | `None` |
Returns:
| Type | Description |
|---|---|
| `str` | ARN of the started execution. |
Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the state machine cannot be found and no `state_machine_arn` was provided. |
Source code in clinops/orchestrate/stepfunctions.py
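`execute` starts a run with a JSON-serialized input and, when no `execution_name` is given, a timestamp-based name. A minimal sketch against the real boto3 `start_execution` call; the name format `<machine>-YYYYmmddTHHMMSSZ` is an assumption, chosen because Step Functions execution names are limited to 80 characters and may not contain characters such as `:`. `default_execution_name` and `start_pipeline_execution` are hypothetical:

```python
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Any


def default_execution_name(machine_name: str, now: datetime | None = None) -> str:
    """Assumed naming scheme: machine name plus a UTC timestamp, at most 80 chars."""
    now = now or datetime.now(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")  # no colons, which names may not contain
    # Keep the timestamp intact and trim the machine name if the total would exceed 80.
    return f"{machine_name[: 80 - len(stamp) - 1]}-{stamp}"


def start_pipeline_execution(
    client: Any,
    state_machine_arn: str,
    machine_name: str,
    input_data: dict[str, Any] | None = None,
    execution_name: str | None = None,
) -> str:
    response = client.start_execution(
        stateMachineArn=state_machine_arn,
        name=execution_name or default_execution_name(machine_name),
        input=json.dumps(input_data or {}),  # Step Functions expects a JSON string
    )
    return response["executionArn"]
```

A second-resolution timestamp may collide if two runs of the same machine start within the same second, so callers that launch runs in parallel would want to pass an explicit `execution_name`.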
clinops.orchestrate.stepfunctions.PipelineStep
dataclass
PipelineStep(
name,
resource,
parameters=dict(),
retry_attempts=3,
timeout_seconds=3600,
comment="",
)
A single task in a Step Functions state machine.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Human-readable step name. Used as the state name in the ASL definition, so it must be unique within a pipeline. | required |
| `resource` | `str` | ARN of the Lambda function, ECS task, or Step Functions activity that executes this step. See the AWS documentation for supported resource ARN formats. | required |
| `parameters` | `dict[str, Any]` | Static input parameters merged into the rendered ASL Task state. | `dict()` |
| `retry_attempts` | `int` | Number of times to retry the step on failure. | `3` |
| `timeout_seconds` | `int` | Maximum execution time for this step, in seconds (3600 is one hour). | `3600` |
| `comment` | `str` | Optional human-readable description embedded in the ASL state. | `''` |
Examples:
>>> from clinops.orchestrate.stepfunctions import PipelineStep
>>> step = PipelineStep(
...     name="Preprocess",
...     resource="arn:aws:lambda:us-east-1:123456789012:function:clinops-preprocess",
...     parameters={"max_null_rate": 0.5},
...     retry_attempts=2,
... )
to_asl_state
Render this step as an Amazon States Language Task state dict.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `next_state` | `str \| None` | Name of the following state, or `None` if this is the terminal state. | required |
Returns:
| Type | Description |
|---|---|
| `dict[str, Any]` | ASL Task state definition. |
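`to_asl_state` turns one step into an ASL Task state. A minimal sketch of how the documented fields could map onto real ASL keys (`Resource`, `Parameters`, `TimeoutSeconds`, `Retry`, `Comment`, `Next`/`End`); the exact mapping, and in particular retrying on `States.ALL`, is an assumption rather than the clinops implementation. `StepSketch` is a hypothetical mirror of the `PipelineStep` dataclass:

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepSketch:
    """Hypothetical mirror of PipelineStep's documented fields and defaults."""

    name: str
    resource: str
    parameters: dict[str, Any] = field(default_factory=dict)
    retry_attempts: int = 3
    timeout_seconds: int = 3600
    comment: str = ""

    def to_asl_state(self, next_state: str | None) -> dict[str, Any]:
        state: dict[str, Any] = {
            "Type": "Task",
            "Resource": self.resource,
            "TimeoutSeconds": self.timeout_seconds,
        }
        if self.parameters:
            state["Parameters"] = dict(self.parameters)
        if self.retry_attempts > 0:
            # Assumed policy: retry any error up to retry_attempts times.
            state["Retry"] = [{"ErrorEquals": ["States.ALL"], "MaxAttempts": self.retry_attempts}]
        if self.comment:
            state["Comment"] = self.comment
        # A Task state carries exactly one of Next or End.
        if next_state is None:
            state["End"] = True
        else:
            state["Next"] = next_state
        return state


step = StepSketch("Preprocess", "preprocess-arn", parameters={"max_null_rate": 0.5}, retry_attempts=2)
```

In ASL, a `Parameters` block replaces the state's effective input, so an implementation that also wants to forward the previous step's output has to merge it in explicitly, for example with a JSONPath entry such as `"input.$": "$"`.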