MetricCompute

aitaem.insights.MetricCompute

Compute metrics from a SpecCache and ConnectionManager.

Primary user interface for aitaem. Resolves specs, builds SQL queries, executes them, and returns a standardized pandas DataFrame.

Source code in aitaem/insights.py
class MetricCompute:
    """Compute metrics from a SpecCache and ConnectionManager.

    Primary user interface for aitaem.  Resolves specs, builds SQL queries,
    executes them, and returns a standardized pandas DataFrame.
    """

    def __init__(self, spec_cache: SpecCache, connection_manager: ConnectionManager) -> None:
        """
        Args:
            spec_cache: Loaded and validated metric, slice, and segment specs.
            connection_manager: Backend connections for query execution.
        """
        self.spec_cache = spec_cache
        self.connection_manager = connection_manager

    def compute(
        self,
        metrics: str | list[str],
        slices: str | list[str] | None = None,
        segments: dict[str, str] | str | None = None,
        time_window: tuple[str, str] | None = None,
        period_type: PeriodType = "all_time",
        by_entity: str | None = None,
        output_format: str = "pandas",
    ) -> pd.DataFrame:
        """Compute one or more metrics with optional slicing and segmentation.

        Args:
            metrics: Metric name(s) to compute.
            slices: Slice name(s). Each slice is computed independently.
            segments: Segment to apply.  Two forms are accepted:

                - ``str`` — segment name; uses the spec's ``entity_id`` as the join key.
                - ``dict[str, str]`` — ``{"segment_name": "fact_fk_col"}``; the value
                  overrides the default join key. Exactly one entry is allowed.

                Only one segment per ``compute()`` call is supported.
            time_window: (start_date, end_date) ISO strings for period filter.
                         Requires ``timestamp_col`` to be set on each metric spec.
            period_type: Granularity for time grouping. One of 'all_time', 'daily',
                         'weekly', 'monthly', 'yearly'. Non-'all_time' requires
                         time_window and timestamp_col on every metric spec.
            by_entity: Column name to group by for entity-level metrics. When set,
                       every requested metric must list this column in its ``entities``
                       field. The output includes an ``entity_id`` column with the
                       entity column value; ``None`` when ``by_entity`` is not set.
            output_format: Output format — only 'pandas' is supported in Phase 1.

        Returns:
            DataFrame with columns: period_type, period_start_date, period_end_date,
            entity_id, metric_name, slice_type, slice_value, segment_name, segment_value,
            metric_value. ``entity_id`` is ``None`` when ``by_entity`` is not set.

        Raises:
            SpecNotFoundError: if any metric/slice/segment name is not in the cache.
            QueryBuildError: if the ``segments`` dict does not contain exactly one entry.
            QueryBuildError: if the join key in ``segments`` dict is not in the spec's
                             ``join_keys`` whitelist (when the whitelist is non-empty).
            QueryBuildError: if time_window is set but a metric has no timestamp_col.
            QueryBuildError: if period_type is invalid or missing required time_window.
            QueryBuildError: if by_entity is set but a metric does not list it in entities.
            QueryExecutionError: if all query groups fail to execute.
        """
        from aitaem.utils.exceptions import QueryBuildError

        # 1. Normalize inputs to lists
        metric_names = [metrics] if isinstance(metrics, str) else list(metrics)
        slice_names = ([slices] if isinstance(slices, str) else list(slices)) if slices else None

        # 2. Resolve segment spec and join key
        segment_spec = None
        segment_join_key: str | None = None
        if segments is not None:
            if isinstance(segments, str):
                segment_name = segments
                explicit_join_key: str | None = None
            else:
                if len(segments) != 1:
                    raise QueryBuildError(
                        f"Only one segment per compute() call is supported, "
                        f"but {len(segments)} were provided: {list(segments.keys())}"
                    )
                segment_name, explicit_join_key = next(iter(segments.items()))

            segment_spec = self.spec_cache.get_segment(segment_name)
            if explicit_join_key is not None:
                if segment_spec.join_keys and explicit_join_key not in segment_spec.join_keys:
                    raise QueryBuildError(
                        f"Join key '{explicit_join_key}' is not in the allowed join_keys for "
                        f"segment '{segment_name}': {list(segment_spec.join_keys)}"
                    )
                segment_join_key = explicit_join_key
            # When no explicit key: segment_join_key stays None, builder uses entity_id

        # 3. Resolve metric and slice specs from cache
        metric_specs = [self.spec_cache.get_metric(n) for n in metric_names]
        slice_specs = [self.spec_cache.get_slice(n) for n in slice_names] if slice_names else None

        # 4. Build SQL query groups
        query_groups = QueryBuilder.build_queries(
            metric_specs=metric_specs,
            slice_specs=slice_specs,
            segment_spec=segment_spec,
            segment_join_key=segment_join_key,
            time_window=time_window,
            spec_cache=self.spec_cache,
            period_type=period_type,
            by_entity=by_entity,
        )

        # 5. Execute and return in standard column order
        executor = QueryExecutor(self.connection_manager)
        df = executor.execute(query_groups, output_format=output_format)
        return ensure_standard_output(df)

    def scan(self) -> ScanResult:
        """Introspect source schemas and return a compatibility matrix for all loaded specs.

        For each loaded metric, checks every loaded slice and segment:

        - **Slice**: compatible when all referenced columns exist in the metric's source table.
        - **Segment**: compatible when at least one join key (from ``join_keys``, or
          ``entity_id`` when ``join_keys`` is empty) exists in the metric's source table.

        Schema introspection is batched by unique source URI — each table is queried once
        regardless of how many metrics share it. Metrics whose source connection is unavailable
        are skipped with a warning.

        Returns:
            ScanResult with one CompatibilityResult per metric × slice and per metric × segment.
        """
        return _run_scan(self.spec_cache, self.connection_manager)

__init__

__init__(spec_cache: SpecCache, connection_manager: ConnectionManager) -> None

Parameters:

Name Type Description Default
spec_cache SpecCache

Loaded and validated metric, slice, and segment specs.

required
connection_manager ConnectionManager

Backend connections for query execution.

required
Source code in aitaem/insights.py
def __init__(self, spec_cache: SpecCache, connection_manager: ConnectionManager) -> None:
    """
    Args:
        spec_cache: Loaded and validated metric, slice, and segment specs.
        connection_manager: Backend connections for query execution.
    """
    self.spec_cache = spec_cache
    self.connection_manager = connection_manager

compute

compute(metrics: str | list[str], slices: str | list[str] | None = None, segments: dict[str, str] | str | None = None, time_window: tuple[str, str] | None = None, period_type: PeriodType = 'all_time', by_entity: str | None = None, output_format: str = 'pandas') -> pd.DataFrame

Compute one or more metrics with optional slicing and segmentation.

Parameters:

Name Type Description Default
metrics str | list[str]

Metric name(s) to compute.

required
slices str | list[str] | None

Slice name(s). Each slice is computed independently.

None
segments dict[str, str] | str | None

Segment to apply. Two forms are accepted:

  • str: segment name; uses the spec's entity_id as the join key.
  • dict[str, str]: {"segment_name": "fact_fk_col"}; the value overrides the default join key. Exactly one entry is allowed.

Only one segment per compute() call is supported.

None
time_window tuple[str, str] | None

(start_date, end_date) ISO strings for period filter. Requires timestamp_col to be set on each metric spec.

None
period_type PeriodType

Granularity for time grouping. One of 'all_time', 'daily', 'weekly', 'monthly', 'yearly'. Non-'all_time' requires time_window and timestamp_col on every metric spec.

'all_time'
by_entity str | None

Column name to group by for entity-level metrics. When set, every requested metric must list this column in its entities field. The output includes an entity_id column with the entity column value; None when by_entity is not set.

None
output_format str

Output format — only 'pandas' is supported in Phase 1.

'pandas'
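
The two accepted forms of segments can be illustrated with a small, self-contained sketch. It mirrors the normalization described above but is not aitaem's code: normalize_segments and check_join_key are hypothetical helpers, ValueError stands in for QueryBuildError, and the names plan_tier and account_id are invented for illustration.

```python
from __future__ import annotations


def normalize_segments(
    segments: dict[str, str] | str | None,
) -> tuple[str | None, str | None]:
    """Return (segment_name, explicit_join_key) for either accepted form."""
    if segments is None:
        return None, None
    if isinstance(segments, str):
        # Bare name: no explicit key, so the spec's entity_id would be used.
        return segments, None
    if len(segments) != 1:
        raise ValueError(
            f"Only one segment per call is supported, got {len(segments)}: {list(segments)}"
        )
    # Single-entry dict: the value overrides the default join key.
    return next(iter(segments.items()))


def check_join_key(join_key: str | None, allowed: tuple[str, ...]) -> None:
    """Reject an explicit key that is missing from a non-empty whitelist."""
    if join_key is not None and allowed and join_key not in allowed:
        raise ValueError(f"Join key {join_key!r} is not in {list(allowed)}")


# Hypothetical segment and column names.
by_name = normalize_segments("plan_tier")
by_key = normalize_segments({"plan_tier": "account_id"})
```

An empty whitelist accepts any explicit key, which matches the "when the whitelist is non-empty" condition in the Raises section.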

Returns:

Type Description
DataFrame

DataFrame with columns: period_type, period_start_date, period_end_date, entity_id, metric_name, slice_type, slice_value, segment_name, segment_value, metric_value. entity_id is None when by_entity is not set.
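
To make the column order concrete, here is a minimal sketch of a row in that layout using plain dicts. STANDARD_COLUMNS and to_standard_row are hypothetical names, and filling absent columns with None is an assumption; this does not reproduce aitaem's ensure_standard_output.

```python
# Column order as listed in the Returns description.
STANDARD_COLUMNS = (
    "period_type",
    "period_start_date",
    "period_end_date",
    "entity_id",
    "metric_name",
    "slice_type",
    "slice_value",
    "segment_name",
    "segment_value",
    "metric_value",
)


def to_standard_row(row: dict) -> dict:
    """Return the row with every standard column, in order; absent ones become None."""
    return {col: row.get(col) for col in STANDARD_COLUMNS}


# Hypothetical metric name and value, computed without by_entity.
row = to_standard_row(
    {"metric_name": "revenue", "metric_value": 1250.0, "period_type": "all_time"}
)
```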

Raises:

Type Description
SpecNotFoundError

if any metric/slice/segment name is not in the cache.

QueryBuildError

if the segments dict does not contain exactly one entry.

QueryBuildError

if the join key in segments dict is not in the spec's join_keys whitelist (when the whitelist is non-empty).

QueryBuildError

if time_window is set but a metric has no timestamp_col.

QueryBuildError

if period_type is invalid or missing required time_window.

QueryBuildError

if by_entity is set but a metric does not list it in entities.

QueryExecutionError

if all query groups fail to execute.
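
The period_type and time_window rules listed above can be checked before calling compute(). The following is a sketch under stated assumptions: validate_period is a hypothetical helper, ValueError stands in for QueryBuildError, and the per-metric timestamp_col check is left out because it needs the loaded specs.

```python
from __future__ import annotations

# Values accepted for period_type, as documented for compute().
PERIOD_TYPES = ("all_time", "daily", "weekly", "monthly", "yearly")


def validate_period(period_type: str, time_window: tuple[str, str] | None) -> None:
    """Raise ValueError when the period arguments break the documented rules."""
    if period_type not in PERIOD_TYPES:
        raise ValueError(f"Invalid period_type {period_type!r}; expected one of {PERIOD_TYPES}")
    if period_type != "all_time" and time_window is None:
        # Any time-grouped granularity needs an explicit window.
        raise ValueError(f"period_type {period_type!r} requires a time_window")


validate_period("all_time", None)
validate_period("monthly", ("2024-01-01", "2024-03-31"))
```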

Source code in aitaem/insights.py
def compute(
    self,
    metrics: str | list[str],
    slices: str | list[str] | None = None,
    segments: dict[str, str] | str | None = None,
    time_window: tuple[str, str] | None = None,
    period_type: PeriodType = "all_time",
    by_entity: str | None = None,
    output_format: str = "pandas",
) -> pd.DataFrame:
    """Compute one or more metrics with optional slicing and segmentation.

    Args:
        metrics: Metric name(s) to compute.
        slices: Slice name(s). Each slice is computed independently.
        segments: Segment to apply.  Two forms are accepted:

            - ``str`` — segment name; uses the spec's ``entity_id`` as the join key.
            - ``dict[str, str]`` — ``{"segment_name": "fact_fk_col"}``; the value
              overrides the default join key. Exactly one entry is allowed.

            Only one segment per ``compute()`` call is supported.
        time_window: (start_date, end_date) ISO strings for period filter.
                     Requires ``timestamp_col`` to be set on each metric spec.
        period_type: Granularity for time grouping. One of 'all_time', 'daily',
                     'weekly', 'monthly', 'yearly'. Non-'all_time' requires
                     time_window and timestamp_col on every metric spec.
        by_entity: Column name to group by for entity-level metrics. When set,
                   every requested metric must list this column in its ``entities``
                   field. The output includes an ``entity_id`` column with the
                   entity column value; ``None`` when ``by_entity`` is not set.
        output_format: Output format — only 'pandas' is supported in Phase 1.

    Returns:
        DataFrame with columns: period_type, period_start_date, period_end_date,
        entity_id, metric_name, slice_type, slice_value, segment_name, segment_value,
        metric_value. ``entity_id`` is ``None`` when ``by_entity`` is not set.

    Raises:
        SpecNotFoundError: if any metric/slice/segment name is not in the cache.
        QueryBuildError: if the ``segments`` dict does not contain exactly one entry.
        QueryBuildError: if the join key in ``segments`` dict is not in the spec's
                         ``join_keys`` whitelist (when the whitelist is non-empty).
        QueryBuildError: if time_window is set but a metric has no timestamp_col.
        QueryBuildError: if period_type is invalid or missing required time_window.
        QueryBuildError: if by_entity is set but a metric does not list it in entities.
        QueryExecutionError: if all query groups fail to execute.
    """
    from aitaem.utils.exceptions import QueryBuildError

    # 1. Normalize inputs to lists
    metric_names = [metrics] if isinstance(metrics, str) else list(metrics)
    slice_names = ([slices] if isinstance(slices, str) else list(slices)) if slices else None

    # 2. Resolve segment spec and join key
    segment_spec = None
    segment_join_key: str | None = None
    if segments is not None:
        if isinstance(segments, str):
            segment_name = segments
            explicit_join_key: str | None = None
        else:
            if len(segments) != 1:
                raise QueryBuildError(
                    f"Only one segment per compute() call is supported, "
                    f"but {len(segments)} were provided: {list(segments.keys())}"
                )
            segment_name, explicit_join_key = next(iter(segments.items()))

        segment_spec = self.spec_cache.get_segment(segment_name)
        if explicit_join_key is not None:
            if segment_spec.join_keys and explicit_join_key not in segment_spec.join_keys:
                raise QueryBuildError(
                    f"Join key '{explicit_join_key}' is not in the allowed join_keys for "
                    f"segment '{segment_name}': {list(segment_spec.join_keys)}"
                )
            segment_join_key = explicit_join_key
        # When no explicit key: segment_join_key stays None, builder uses entity_id

    # 3. Resolve metric and slice specs from cache
    metric_specs = [self.spec_cache.get_metric(n) for n in metric_names]
    slice_specs = [self.spec_cache.get_slice(n) for n in slice_names] if slice_names else None

    # 4. Build SQL query groups
    query_groups = QueryBuilder.build_queries(
        metric_specs=metric_specs,
        slice_specs=slice_specs,
        segment_spec=segment_spec,
        segment_join_key=segment_join_key,
        time_window=time_window,
        spec_cache=self.spec_cache,
        period_type=period_type,
        by_entity=by_entity,
    )

    # 5. Execute and return in standard column order
    executor = QueryExecutor(self.connection_manager)
    df = executor.execute(query_groups, output_format=output_format)
    return ensure_standard_output(df)

scan

scan() -> ScanResult

Introspect source schemas and return a compatibility matrix for all loaded specs.

For each loaded metric, checks every loaded slice and segment:

  • Slice: compatible when all referenced columns exist in the metric's source table.
  • Segment: compatible when at least one join key (from join_keys, or entity_id when join_keys is empty) exists in the metric's source table.

Schema introspection is batched by unique source URI — each table is queried once regardless of how many metrics share it. Metrics whose source connection is unavailable are skipped with a warning.
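
The batching idea can be sketched as follows. This is an illustration of the described behavior, not the code behind _run_scan: introspect_once and fetch_columns are hypothetical names, and treating ConnectionError as the sign of an unavailable source is an assumption.

```python
from __future__ import annotations

import logging
from typing import Callable

logger = logging.getLogger(__name__)


def introspect_once(
    metric_sources: dict[str, str],
    fetch_columns: Callable[[str], set[str]],
) -> dict[str, set[str]]:
    """Map each metric to its source columns, querying every unique URI once."""
    columns_by_uri: dict[str, set[str]] = {}
    for uri in set(metric_sources.values()):
        try:
            columns_by_uri[uri] = fetch_columns(uri)
        except ConnectionError:
            # Source unavailable: warn and skip the metrics that depend on it.
            logger.warning("Skipping metrics on unavailable source %s", uri)
    return {
        metric: columns_by_uri[uri]
        for metric, uri in metric_sources.items()
        if uri in columns_by_uri
    }
```

Keying the cache on the source URI means the number of schema queries grows with the number of tables, not with the number of metrics.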

Returns:

Type Description
ScanResult

ScanResult with one CompatibilityResult per metric × slice and per metric × segment.

Source code in aitaem/insights.py
def scan(self) -> ScanResult:
    """Introspect source schemas and return a compatibility matrix for all loaded specs.

    For each loaded metric, checks every loaded slice and segment:

    - **Slice**: compatible when all referenced columns exist in the metric's source table.
    - **Segment**: compatible when at least one join key (from ``join_keys``, or
      ``entity_id`` when ``join_keys`` is empty) exists in the metric's source table.

    Schema introspection is batched by unique source URI — each table is queried once
    regardless of how many metrics share it. Metrics whose source connection is unavailable
    are skipped with a warning.

    Returns:
        ScanResult with one CompatibilityResult per metric × slice and per metric × segment.
    """
    return _run_scan(self.spec_cache, self.connection_manager)

scan()

mc.scan() -> ScanResult

Introspects source table schemas and returns a compatibility matrix for all specs loaded in the SpecCache. For each metric, every loaded slice and segment is checked:

  • Slice — compatible when all columns referenced in values[].where (or the bare column field for wildcard slices) exist in the metric's source table. Composite slices are resolved transitively via their component leaf/wildcard slices.
  • Segment — compatible when at least one join-key candidate exists in the metric's source table. Candidates are taken from join_keys (if non-empty) or entity_id. Only the fact-table side is checked; DIM-table columns are not.

Schema introspection is batched by unique source URI — each table is queried once regardless of how many metrics share it. Metrics whose source connection is unavailable are skipped with a warning; all other metrics are still processed.

Returns a ScanResult with one CompatibilityResult per metric × slice and per metric × segment. See the Specs API reference for field descriptions.
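
The two compatibility rules reduce to set checks against the columns of the metric's source table. Here is a minimal sketch of that logic, assuming the referenced columns have already been extracted from the slice spec; slice_compatible and segment_compatible are hypothetical helpers, and the column names are invented.

```python
from __future__ import annotations


def slice_compatible(referenced_columns: set[str], source_columns: set[str]) -> bool:
    """A slice is compatible when all of its referenced columns exist in the source."""
    return referenced_columns <= source_columns


def segment_compatible(
    join_keys: tuple[str, ...], entity_id: str, source_columns: set[str]
) -> bool:
    """A segment is compatible when at least one join-key candidate exists in the source."""
    # Candidates come from join_keys when non-empty, otherwise from entity_id.
    candidates = join_keys or (entity_id,)
    return any(key in source_columns for key in candidates)


# Hypothetical fact-table columns.
fact_columns = {"user_id", "country", "amount", "created_at"}

slice_ok = slice_compatible({"country"}, fact_columns)
slice_missing = slice_compatible({"country", "device"}, fact_columns)
segment_ok = segment_compatible((), "user_id", fact_columns)
segment_missing = segment_compatible(("account_id",), "user_id", fact_columns)
```

In the last call the non-empty join_keys takes precedence, so entity_id is not consulted even though user_id exists in the source.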