Skip to content

clinops.temporal

clinops.temporal.windower.TemporalWindower

TemporalWindower(
    window_hours=24.0,
    step_hours=6.0,
    imputation=ImputationStrategy.FORWARD_FILL,
    min_observations=1,
    aggregations=None,
    label_col=None,
    label_fn=None,
)

Extract fixed-size feature windows from long-format clinical time-series.

Parameters:

Name Type Description Default
window_hours float

Duration of each window in hours.

24.0
step_hours float

Step between window starts in hours. Use the same value as window_hours for non-overlapping (tumbling) windows.

6.0
imputation ImputationStrategy

Imputation strategy for within-window missing values.

FORWARD_FILL
min_observations int

Drop windows with fewer non-null observations than this threshold.

1
aggregations dict[str, str | Callable[..., Any]] | None

Column → aggregation function mapping. If None or empty, mean is used for all numeric columns.

None
label_col str | None

Column name of binary/multi-class outcome labels (optional).

None
label_fn Callable[..., Any] | None

How to derive the label for a window. Default: last non-null value.

None

Examples:

>>> windower = TemporalWindower(window_hours=24, step_hours=6)
>>> windows = windower.fit_transform(
...     df=chartevents,
...     id_col="subject_id",
...     time_col="charttime",
...     feature_cols=["heart_rate", "spo2", "resp_rate", "map"],
... )
>>> windows.shape
(4820, 7)   # (n_windows, n_features + id + window_start + window_end)
Source code in clinops/temporal/windower.py
def __init__(
    self,
    window_hours: float = 24.0,
    step_hours: float = 6.0,
    imputation: ImputationStrategy = ImputationStrategy.FORWARD_FILL,
    min_observations: int = 1,
    aggregations: dict[str, str | Callable[..., Any]] | None = None,
    label_col: str | None = None,
    label_fn: Callable[..., Any] | None = None,
) -> None:
    """Collect the windowing options into ``self.config`` and build the imputer.

    All arguments are forwarded unchanged to ``WindowConfig`` except
    ``aggregations``, where ``None`` (or any other falsy value) is replaced
    by a fresh empty dict.  ``imputation`` is additionally used to construct
    the ``Imputer`` stored on ``self._imputer``.
    """
    # A falsy mapping (None or {}) is normalised to a new empty dict.
    resolved_aggregations = aggregations if aggregations else {}

    config_kwargs = {
        "window_hours": window_hours,
        "step_hours": step_hours,
        "imputation": imputation,
        "min_observations": min_observations,
        "aggregations": resolved_aggregations,
        "label_col": label_col,
        "label_fn": label_fn,
    }
    self.config = WindowConfig(**config_kwargs)
    self._imputer = Imputer(imputation)

fit_transform

fit_transform(
    df,
    id_col,
    time_col,
    feature_cols=None,
    value_col=None,
    item_col=None,
)

Extract windows from a long- or wide-format clinical DataFrame.

Supports two input formats:

Wide format (one column per feature): df has columns like heart_rate, spo2, map. Pass feature_cols to select which columns to use.

Long format (item × value pairs): df has an item_col (e.g. itemid) and a value_col (e.g. valuenum). Data is pivoted before windowing.

Parameters:

Name Type Description Default
df DataFrame

Input DataFrame in long or wide format.

required
id_col str

Column identifying each patient/subject.

required
time_col str

Datetime column for temporal ordering.

required
feature_cols list[str] | None

Columns to include as features (wide format).

None
value_col str | None

Numeric value column (long format with item_col).

None
item_col str | None

Item identifier column (long format, e.g. itemid).

None

Returns:

Type Description
DataFrame

One row per (patient, window_start). Columns: id_col, window_start, window_end, feature columns, and optionally label.

Source code in clinops/temporal/windower.py
def fit_transform(
    self,
    df: pd.DataFrame,
    id_col: str,
    time_col: str,
    feature_cols: list[str] | None = None,
    value_col: str | None = None,
    item_col: str | None = None,
) -> pd.DataFrame:
    """
    Extract windows from a long-format clinical DataFrame.

    Supports two input formats:

    **Wide format** (one column per feature):
        ``df`` has columns like ``heart_rate``, ``spo2``, ``map``.
        Pass ``feature_cols`` to select which columns to use.

    **Long format** (item × value pairs):
        ``df`` has an ``item_col`` (e.g. ``itemid``) and a
        ``value_col`` (e.g. ``valuenum``).  Data is pivoted before
        windowing.

    Parameters
    ----------
    df:
        Input DataFrame in long or wide format.
    id_col:
        Column identifying each patient/subject.
    time_col:
        Datetime column for temporal ordering.
    feature_cols:
        Columns to include as features (wide format).
    value_col:
        Numeric value column (long format with item_col).
    item_col:
        Item identifier column (long format, e.g. itemid).

    Returns
    -------
    pd.DataFrame
        One row per (patient, window_start). Columns:
        ``id_col``, ``window_start``, ``window_end``, feature columns,
        and optionally ``label``.
    """
    df = df.copy()
    df[time_col] = pd.to_datetime(df[time_col])

    # Pivot long → wide if item/value columns given
    if item_col and value_col:
        df = self._pivot_long_to_wide(df, id_col, time_col, item_col, value_col)
        feature_cols = [c for c in df.columns if c not in [id_col, time_col]]

    if feature_cols is None:
        feature_cols = [
            c for c in df.select_dtypes(include=[np.number]).columns if c not in [id_col]
        ]

    logger.info(
        f"Windowing {len(df):,} rows for {df[id_col].nunique()} subjects "
        f"| window={self.config.window_hours}h step={self.config.step_hours}h "
        f"| {len(feature_cols)} features"
    )

    results = []
    window_td = pd.Timedelta(hours=self.config.window_hours)
    step_td = pd.Timedelta(hours=self.config.step_hours)

    for subject_id, subject_df in df.groupby(id_col):
        subject_df = subject_df.sort_values(time_col)
        t_start = subject_df[time_col].min()
        t_end = subject_df[time_col].max()

        window_start = t_start
        while window_start + window_td <= t_end + step_td:
            window_end = window_start + window_td
            mask = (subject_df[time_col] >= window_start) & (subject_df[time_col] < window_end)
            window_df = subject_df[mask]

            if len(window_df) < self.config.min_observations:
                window_start += step_td
                continue

            row = self._aggregate_window(
                window_df, feature_cols, subject_id, window_start, window_end, id_col
            )
            results.append(row)
            window_start += step_td

    if not results:
        logger.warning(
            "No windows extracted — check min_observations threshold or data coverage"
        )
        return pd.DataFrame()

    result_df = pd.DataFrame(results)
    logger.info(
        f"Extracted {len(result_df):,} windows across {result_df[id_col].nunique()} subjects"
    )
    return result_df

clinops.temporal.windower.WindowConfig dataclass

WindowConfig(
    window_hours=24.0,
    step_hours=6.0,
    min_observations=1,
    imputation=ImputationStrategy.FORWARD_FILL,
    aggregations=dict(),
    label_col=None,
    label_fn=None,
)

Configuration for temporal window extraction.

Parameters:

Name Type Description Default
window_hours float

Duration of each feature window in hours.

24.0
step_hours float

Step size between consecutive windows (< window_hours = overlapping). Set equal to window_hours for tumbling (non-overlapping) windows.

6.0
min_observations int

Minimum number of non-null observations required per window. Windows below this threshold are dropped.

1
imputation ImputationStrategy

Strategy used to fill gaps within each window.

FORWARD_FILL
aggregations dict[str, str | Callable[..., Any]]

Dict mapping feature column names to aggregation functions. Defaults to mean for all numeric columns.

dict()
label_col str | None

Optional column name containing outcome labels. If provided, labels are extracted per window using label_fn.

None
label_fn Callable[..., Any] | None

Function (window_df) → label applied to each window's rows within the label_col. Defaults to last observed value.

None

clinops.temporal.imputation.Imputer

Imputer(
    strategy=ImputationStrategy.FORWARD_FILL,
    max_gap_hours=None,
    time_col=None,
    per_patient=False,
    id_col=None,
)

Apply a chosen imputation strategy to a DataFrame.

Parameters:

Name Type Description Default
strategy ImputationStrategy

Imputation strategy to apply.

FORWARD_FILL
max_gap_hours float | None

For FORWARD_FILL and BACKWARD_FILL: maximum gap (in hours) to fill across. Gaps larger than this are left as NaN to avoid propagating stale values across long time periods. Requires a time_col to be set.

None
time_col str | None

Name of the datetime column (used with max_gap_hours).

None
per_patient bool

If True and strategy is MEAN/MEDIAN, compute statistics per patient group rather than globally.

False
id_col str | None

Patient identifier column (required when per_patient=True).

None
Source code in clinops/temporal/imputation.py
def __init__(
    self,
    strategy: ImputationStrategy = ImputationStrategy.FORWARD_FILL,
    max_gap_hours: float | None = None,
    time_col: str | None = None,
    per_patient: bool = False,
    id_col: str | None = None,
) -> None:
    """Record the imputation settings; the fill-value table starts out empty."""
    # Which strategy to apply and how far directional fills may reach.
    self.strategy, self.max_gap_hours = strategy, max_gap_hours
    self.time_col = time_col
    # Grouping options.
    self.per_patient, self.id_col = per_patient, id_col
    # Column name -> fill value; populated by ``fit`` for global MEAN/MEDIAN.
    self._fill_values: dict[str, float] = dict()

fit

fit(df)

Compute imputation statistics from a reference DataFrame (training set).

Source code in clinops/temporal/imputation.py
def fit(self, df: pd.DataFrame) -> Imputer:
    """Compute imputation statistics from a reference DataFrame (training set).

    Only the MEAN and MEDIAN strategies learn anything; for every other
    strategy this only clears previously stored global fill values.

    * With ``per_patient`` and ``id_col`` set, a per-patient table of
      column means/medians is stored on ``self._patient_means`` or
      ``self._patient_medians``.
    * Otherwise one global value per numeric column is stored in
      ``self._fill_values``.

    The identifier column is excluded from the statistics, matching the
    column selection used by ``transform``.

    Returns
    -------
    Imputer
        ``self``, to allow call chaining.
    """
    # Keep the ID out of the statistics: a numeric patient identifier is
    # not a feature and ``transform`` never imputes it.
    numeric_cols = [
        c for c in df.select_dtypes(include=[np.number]).columns if c != self.id_col
    ]

    # Drop values from any previous fit so a refit (e.g. switching to
    # per-patient statistics) cannot leave stale global values behind.
    self._fill_values = {}

    if self.strategy not in (ImputationStrategy.MEAN, ImputationStrategy.MEDIAN):
        return self
    use_mean = self.strategy == ImputationStrategy.MEAN

    if self.per_patient and self.id_col:
        # store per-patient statistics — used in transform
        grouped = df.groupby(self.id_col)[numeric_cols]
        if use_mean:
            self._patient_means = grouped.mean()
        else:
            self._patient_medians = grouped.median()
    else:
        stats = df[numeric_cols].mean() if use_mean else df[numeric_cols].median()
        self._fill_values = {str(k): float(v) for k, v in stats.items()}
    return self

transform

transform(df)

Apply imputation to df. Call fit() first for MEAN/MEDIAN strategies.

Source code in clinops/temporal/imputation.py
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Apply imputation to df. Call fit() first for MEAN/MEDIAN strategies.

    The input is copied; the caller's frame is not modified.  Only numeric
    columns other than ``id_col`` are imputed.

    Strategy notes
    --------------
    FORWARD_FILL / BACKWARD_FILL
        With ``max_gap_hours`` and a ``time_col`` present in ``df`` the fill
        is delegated to ``_fill_with_gap_mask``; the original row order is
        restored afterwards and the index is reset.  Otherwise a plain
        ffill/bfill is used, grouped by ``id_col`` when that column exists.
    MEAN / MEDIAN
        With ``per_patient`` and per-patient statistics from ``fit``, each
        row is filled with its own patient's statistic; patients without a
        fitted statistic keep their NaN values.  Otherwise the global
        values from ``fit`` are used, and if none were fitted the
        statistics of ``df`` itself are used with a warning.
    INDICATOR
        Adds a ``{col}_missing`` 0/1 column per numeric column, then fills
        with zero.
    NONE
        Returns the copy unchanged.
    """
    df = df.copy()
    if self.strategy == ImputationStrategy.NONE:
        return df

    numeric_cols = [
        c for c in df.select_dtypes(include=[np.number]).columns if c != self.id_col
    ]
    has_id_col = bool(self.id_col) and self.id_col in df.columns

    if self.strategy == ImputationStrategy.ZERO:
        df[numeric_cols] = df[numeric_cols].fillna(0.0)

    elif self.strategy in (
        ImputationStrategy.FORWARD_FILL,
        ImputationStrategy.BACKWARD_FILL,
    ):
        forward = self.strategy == ImputationStrategy.FORWARD_FILL
        if self.max_gap_hours is not None and self.time_col and self.time_col in df.columns:
            # Use a UUID-based sentinel to avoid clobbering any user column
            # and to guarantee uniqueness. try/finally ensures the column is
            # always removed, even if an exception is raised mid-transform.
            _sentinel = f"__clinops_pos_{uuid.uuid4().hex}__"
            try:
                df[_sentinel] = np.arange(len(df))
                df = self._fill_with_gap_mask(
                    df, numeric_cols, self.time_col, forward=forward
                )
                # Put the rows back into their original order.
                df = df.sort_values(_sentinel).reset_index(drop=True)
            finally:
                df = df.drop(columns=[_sentinel], errors="ignore")
        else:
            # Fill within each patient when an ID column is available so
            # values never carry over from one patient to the next.
            target = df.groupby(self.id_col)[numeric_cols] if has_id_col else df[numeric_cols]
            df[numeric_cols] = target.ffill() if forward else target.bfill()

    elif self.strategy == ImputationStrategy.LINEAR:
        # NOTE(review): unlike the directional fills above, interpolation is
        # not grouped by id_col, so it can run across patient boundaries —
        # confirm whether that is intended.
        df[numeric_cols] = df[numeric_cols].interpolate(method="linear", limit_direction="both")

    elif self.strategy in (ImputationStrategy.MEAN, ImputationStrategy.MEDIAN):
        use_mean = self.strategy == ImputationStrategy.MEAN
        stat_name = "means" if use_mean else "medians"
        # Per-patient tables are created by fit(); they do not exist on an
        # unfitted instance, hence getattr with a default.
        patient_stats = getattr(
            self, "_patient_means" if use_mean else "_patient_medians", None
        )

        if self.per_patient and has_id_col and patient_stats is not None:
            patient_ids = df[self.id_col]
            for col in numeric_cols:
                if col in patient_stats.columns:
                    # Look up each row's own patient statistic; unknown
                    # patients map to NaN and are left unfilled.
                    df[col] = df[col].fillna(patient_ids.map(patient_stats[col]))
        elif self._fill_values:
            df[numeric_cols] = df[numeric_cols].fillna(self._fill_values)
        else:
            logger.warning(f"Imputer not fitted — using column {stat_name} from current df")
            fallback = df[numeric_cols].mean() if use_mean else df[numeric_cols].median()
            df[numeric_cols] = df[numeric_cols].fillna(fallback)

    elif self.strategy == ImputationStrategy.INDICATOR:
        # Record missingness before it is overwritten by the zero fill.
        for col in numeric_cols:
            df[f"{col}_missing"] = df[col].isna().astype(int)
        df[numeric_cols] = df[numeric_cols].fillna(0.0)

    return df

fit_transform

fit_transform(df)

Convenience method: fit then transform.

Source code in clinops/temporal/imputation.py
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Fit on ``df`` and return the imputed version of that same frame."""
    fitted = self.fit(df)
    return fitted.transform(df)

clinops.temporal.imputation.ImputationStrategy

Bases: StrEnum

Supported imputation strategies.

FORWARD_FILL Carry the last observed value forward in time. Appropriate for slowly-changing vitals (heart rate, SpO2) where repeated measurements are assumed stable until updated.

BACKWARD_FILL Fill from the next observed value backward. Useful when a measurement is known to have been taken but not yet recorded.

LINEAR Linear interpolation between surrounding observations. Use for continuous physiological signals with regular sampling.

MEAN Replace missing values with the column mean (global, per patient, or per cohort depending on fit scope).

MEDIAN Replace with column median. More robust than mean for skewed lab values.

ZERO Fill with zero. Use only for count-based features where absence genuinely means zero (e.g. number of interventions).

INDICATOR Add a binary missingness indicator column ({col}_missing) and fill values with zero. Lets the model learn from missingness patterns directly.

NONE Do not impute — leave NaN values in place.

clinops.temporal.features.LagFeatureBuilder

LagFeatureBuilder(
    lags=None,
    rolling_windows=None,
    feature_cols=None,
    id_col="subject_id",
)

Add lag and rolling-statistics features to windowed clinical data.

Parameters:

Name Type Description Default
lags list[int] | None

List of lag steps (in window units) to include. e.g. [1, 2, 4] adds features at t-1, t-2, t-4 windows back.

None
rolling_windows list[int] | None

List of rolling window sizes to compute mean/std over.

None
feature_cols list[str] | None

Columns to create lags for. If None, all numeric columns are used.

None
id_col str

Patient identifier column — lags are computed within each patient.

'subject_id'

Examples:

>>> builder = LagFeatureBuilder(lags=[1, 2], rolling_windows=[4], id_col="subject_id")
>>> enriched = builder.fit_transform(windows_df)
Source code in clinops/temporal/features.py
def __init__(
    self,
    lags: list[int] | None = None,
    rolling_windows: list[int] | None = None,
    feature_cols: list[str] | None = None,
    id_col: str = "subject_id",
) -> None:
    self.lags = lags or [1, 2]
    self.rolling_windows = rolling_windows or []
    self.feature_cols = feature_cols
    self.id_col = id_col

fit_transform

fit_transform(df)

Add lag and rolling features. Returns enriched DataFrame.

Source code in clinops/temporal/features.py
def fit_transform(self, df: pd.DataFrame) -> pd.DataFrame:
    """Add lag and rolling features. Returns enriched DataFrame.

    The input is copied and sorted by ``id_col`` and ``window_start`` so
    that shifts and rolling windows follow time order within each patient.
    When ``feature_cols`` was not given, all numeric columns except
    ``id_col`` and ``label`` are used.

    For every selected column ``c`` this adds:

    * ``c_lag{k}`` for each ``k`` in ``lags`` — the value ``k`` rows
      earlier within the same patient (NaN where no such row exists);
    * ``c_roll{w}_mean`` and ``c_roll{w}_std`` for each ``w`` in
      ``rolling_windows``, computed per patient with ``min_periods=1``.
    """
    df = df.copy().sort_values([self.id_col, "window_start"])
    # Remember the starting width so the number of added columns can be
    # reported correctly at the end.
    n_cols_before = len(df.columns)

    numeric_cols = self.feature_cols or [
        c
        for c in df.select_dtypes(include=[np.number]).columns
        if c not in [self.id_col, "label"]
    ]

    logger.info(
        f"Building lag features: lags={self.lags}, "
        f"rolling={self.rolling_windows}, cols={len(numeric_cols)}"
    )

    for col in numeric_cols:
        # Lag features
        for lag in self.lags:
            lag_col = f"{col}_lag{lag}"
            df[lag_col] = df.groupby(self.id_col)[col].shift(lag)

        # Rolling statistics
        for window in self.rolling_windows:
            grouped = df.groupby(self.id_col)[col]
            # ``w=window`` binds the current size at definition time.
            df[f"{col}_roll{window}_mean"] = grouped.transform(
                lambda s, w=window: s.rolling(w, min_periods=1).mean()
            )
            df[f"{col}_roll{window}_std"] = grouped.transform(
                lambda s, w=window: s.rolling(w, min_periods=1).std()
            )

    logger.info(f"Added {len(df.columns) - n_cols_before:,} lag/rolling features")
    return df

clinops.temporal.cohort.CohortAligner

CohortAligner(
    anchor_col="icu_intime",
    id_col="subject_id",
    max_hours_before=0.0,
    max_hours_after=48.0,
    time_col="charttime",
)

Align multiple patients' time-series to a common reference event.

In clinical research it's common to align patients relative to an anchor event (e.g. ICU admission, ventilation start, first sepsis flag) rather than using wall-clock time. This class handles the realignment so downstream models see time-relative-to-event rather than absolute timestamps.

Parameters:

Name Type Description Default
anchor_col str

Column containing the anchor event timestamp for each patient.

'icu_intime'
id_col str

Patient identifier column.

'subject_id'
max_hours_before float

Include data up to this many hours before the anchor event.

0.0
max_hours_after float

Include data up to this many hours after the anchor event.

48.0
time_col str

Timestamp column in the events DataFrame.

'charttime'

Examples:

>>> aligner = CohortAligner(anchor_col="icu_intime", max_hours_after=48)
>>> aligned = aligner.align(chartevents, admissions)
Source code in clinops/temporal/features.py
def __init__(
    self,
    anchor_col: str = "icu_intime",
    id_col: str = "subject_id",
    max_hours_before: float = 0.0,
    max_hours_after: float = 48.0,
    time_col: str = "charttime",
) -> None:
    """Store the column names and the alignment window bounds (in hours)."""
    # Columns used to look up each patient's anchor timestamp.
    self.anchor_col, self.id_col = anchor_col, id_col
    # Retained range relative to the anchor: [-before, +after] hours.
    self.max_hours_before, self.max_hours_after = max_hours_before, max_hours_after
    # Timestamp column of the events being aligned.
    self.time_col = time_col

align

align(events_df, anchor_df)

Align events to anchor timestamps from a reference DataFrame.

Parameters:

Name Type Description Default
events_df DataFrame

Long-format events with id_col and time_col.

required
anchor_df DataFrame

One row per patient with id_col and anchor_col.

required

Returns:

Type Description
DataFrame

events_df filtered to the alignment window with a new hours_from_anchor column (negative = before anchor).

Source code in clinops/temporal/features.py
def align(
    self,
    events_df: pd.DataFrame,
    anchor_df: pd.DataFrame,
) -> pd.DataFrame:
    """
    Re-express event times relative to each patient's anchor timestamp.

    Parameters
    ----------
    events_df:
        Events containing ``id_col`` and ``time_col``.  The frame is
        copied; the caller's object is left untouched.
    anchor_df:
        Frame with ``id_col`` and ``anchor_col``; it is indexed by
        ``id_col`` to look up the anchor of every event row.

    Returns
    -------
    pd.DataFrame
        The events whose offset lies in
        ``[-max_hours_before, max_hours_after]`` (both ends inclusive),
        with an added ``hours_from_anchor`` column (negative = before
        anchor) and a fresh 0..n-1 index.  Rows whose patient has no
        anchor get a missing offset and are therefore filtered out.
    """
    # Per-patient anchor timestamps, keyed by patient id.
    anchor_times = pd.to_datetime(anchor_df.set_index(self.id_col)[self.anchor_col])

    events = events_df.copy()
    events[self.time_col] = pd.to_datetime(events[self.time_col])

    # Offset of every event from its own patient's anchor, in hours.
    offset = events[self.time_col] - events[self.id_col].map(anchor_times)
    events["hours_from_anchor"] = offset.dt.total_seconds() / 3600

    # Inclusive range check; a missing offset never satisfies it.
    inside = events["hours_from_anchor"].between(
        -self.max_hours_before, self.max_hours_after
    )
    aligned = events[inside].reset_index(drop=True)

    logger.info(
        f"CohortAligner: retained {len(aligned):,} rows "
        f"(window: -{self.max_hours_before}h to +{self.max_hours_after}h from anchor)"
    )
    return aligned