Skip to content

Specs

Looking for YAML spec syntax?

For the full YAML format, required fields, and examples for each spec type, see the Writing Specs user guide.

SpecCache

aitaem.specs.loader.SpecCache

Eagerly-loaded cache for metric, slice, and segment specs.

Use from_yaml() or from_string() as the primary entry points. The constructor creates an empty cache; specs can be added via add().

Source code in aitaem/specs/loader.py
class SpecCache:
    """Eagerly-loaded cache for metric, slice, and segment specs.

    Use from_yaml() or from_string() as the primary entry points.
    The constructor creates an empty cache; specs can be added via add().

    Each spec kind is stored in its own dict keyed by spec name, so name
    uniqueness is enforced per kind (a metric and a slice may share a name).
    """

    def __init__(self) -> None:
        """Empty cache. Use from_yaml() or from_string() to load specs."""
        self._metrics: dict[str, MetricSpec] = {}
        self._slices: dict[str, SliceSpec] = {}
        self._segments: dict[str, SegmentSpec] = {}

    @classmethod
    def from_yaml(
        cls,
        metric_paths: str | list[str] | None = None,
        slice_paths: str | list[str] | None = None,
        segment_paths: str | list[str] | None = None,
    ) -> "SpecCache":
        """Load and validate all specs from YAML files or directories.

        Loading is eager — all specs are loaded and validated before returning.
        Each argument may be a single path or a list of paths; a directory
        contributes every ``*.yaml`` / ``*.yml`` file directly inside it.

        Raises:
            FileNotFoundError: if a path does not exist
            SpecValidationError: if any spec is invalid, a name is duplicated
                within one spec kind, or a composite slice reference does not
                resolve
        """
        cache = cls()
        cache._metrics = cls._load_paths_strict(metric_paths, MetricSpec)  # type: ignore[arg-type, assignment]
        cache._slices = cls._load_paths_strict(slice_paths, SliceSpec)  # type: ignore[arg-type, assignment]
        cache._segments = cls._load_paths_strict(segment_paths, SegmentSpec)  # type: ignore[arg-type, assignment]
        cache._validate_slice_cross_references()
        return cache

    @classmethod
    def from_string(
        cls,
        metric_yaml: str | list[str] | None = None,
        slice_yaml: str | list[str] | None = None,
        segment_yaml: str | list[str] | None = None,
    ) -> "SpecCache":
        """Load specs from YAML strings. Validates eagerly.

        Each argument can be a single YAML string or a list of YAML strings.

        Raises:
            SpecValidationError: if any spec is invalid, a name is duplicated
                within one spec kind, or a composite slice reference does not
                resolve
        """
        cache = cls()
        for yaml_str in cls._normalize_strings(metric_yaml):
            spec = load_spec_from_string(yaml_str, MetricSpec)
            if spec.name in cache._metrics:
                raise cls._duplicate_name_error("MetricSpec", spec.name)
            cache._metrics[spec.name] = spec  # type: ignore[assignment]
        for yaml_str in cls._normalize_strings(slice_yaml):
            spec = load_spec_from_string(yaml_str, SliceSpec)
            if spec.name in cache._slices:
                raise cls._duplicate_name_error("SliceSpec", spec.name)
            cache._slices[spec.name] = spec  # type: ignore[assignment]
        for yaml_str in cls._normalize_strings(segment_yaml):
            spec = load_spec_from_string(yaml_str, SegmentSpec)
            if spec.name in cache._segments:
                raise cls._duplicate_name_error("SegmentSpec", spec.name)
            cache._segments[spec.name] = spec  # type: ignore[assignment]
        # Cross references can only be checked once every slice is present.
        cache._validate_slice_cross_references()
        return cache

    def add(self, spec: MetricSpec | SliceSpec | SegmentSpec) -> None:
        """Add a spec programmatically.

        The spec is stored in the dict matching its type. An object that is
        none of the three spec types is ignored. Composite slice references
        are not re-validated here.

        Raises:
            SpecValidationError: if a spec with the same name is already present.
        """
        if isinstance(spec, MetricSpec):
            if spec.name in self._metrics:
                raise self._duplicate_name_error("MetricSpec", spec.name)
            self._metrics[spec.name] = spec
        elif isinstance(spec, SliceSpec):
            if spec.name in self._slices:
                raise self._duplicate_name_error("SliceSpec", spec.name)
            self._slices[spec.name] = spec
        elif isinstance(spec, SegmentSpec):
            if spec.name in self._segments:
                raise self._duplicate_name_error("SegmentSpec", spec.name)
            self._segments[spec.name] = spec

    @property
    def metrics(self) -> Mapping[str, MetricSpec]:
        """Read-only view of all loaded metric specs, keyed by name."""
        return MappingProxyType(self._metrics)

    @property
    def slices(self) -> Mapping[str, SliceSpec]:
        """Read-only view of all loaded slice specs, keyed by name."""
        return MappingProxyType(self._slices)

    @property
    def segments(self) -> Mapping[str, SegmentSpec]:
        """Read-only view of all loaded segment specs, keyed by name."""
        return MappingProxyType(self._segments)

    def get_metric(self, name: str) -> MetricSpec:
        """Return MetricSpec for the given name.

        Raises:
            SpecNotFoundError: if name not found
        """
        if name not in self._metrics:
            raise SpecNotFoundError("metric", name, [])
        return self._metrics[name]

    def get_slice(self, name: str) -> SliceSpec:
        """Return SliceSpec for the given name.

        Raises:
            SpecNotFoundError: if name not found
        """
        if name not in self._slices:
            raise SpecNotFoundError("slice", name, [])
        return self._slices[name]

    def get_segment(self, name: str) -> SegmentSpec:
        """Return SegmentSpec for the given name.

        Raises:
            SpecNotFoundError: if name not found
        """
        if name not in self._segments:
            raise SpecNotFoundError("segment", name, [])
        return self._segments[name]

    def clear(self) -> None:
        """Clear all cached specs.

        The dicts are emptied in place rather than replaced, so mapping views
        previously obtained from ``metrics`` / ``slices`` / ``segments`` keep
        tracking the cache instead of holding on to the discarded contents.
        """
        self._metrics.clear()
        self._slices.clear()
        self._segments.clear()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _duplicate_name_error(
        spec_label: str, name: str, detail: str = ""
    ) -> SpecValidationError:
        """Build the SpecValidationError reported for a repeated spec name.

        ``detail`` is appended verbatim to the base message.
        """
        return SpecValidationError(
            spec_label,
            name,
            [ValidationError(field="name", message=f"Duplicate spec name '{name}'{detail}")],
        )

    @staticmethod
    def _normalize_paths(paths: str | list[str] | None) -> list[Path]:
        """Coerce None / a single path / a list of paths into a list of Path."""
        if paths is None:
            return []
        # A lone Path object is accepted as well as a lone string.
        if isinstance(paths, (str, Path)):
            return [Path(paths)]
        return [Path(p) for p in paths]

    @staticmethod
    def _normalize_strings(strings: str | list[str] | None) -> list[str]:
        """Coerce None / a single string / a list of strings into a list."""
        if strings is None:
            return []
        if isinstance(strings, str):
            return [strings]
        return list(strings)

    @classmethod
    def _load_paths_strict(
        cls, paths: str | list[str] | None, spec_type: SpecType
    ) -> dict[str, AnySpec]:
        """Load specs from paths/directories, raising on any error.

        Raises:
            FileNotFoundError: if a path does not exist
            SpecValidationError: if two loaded files declare the same spec name
        """
        result: dict[str, AnySpec] = {}
        for path in cls._normalize_paths(paths):
            if not path.exists():
                raise FileNotFoundError(f"Spec path not found: {path}")
            if path.is_dir():
                # Non-recursive; sorted so the load order is deterministic.
                yaml_files = sorted([*path.glob("*.yaml"), *path.glob("*.yml")])
            else:
                yaml_files = [path]
            for yaml_file in yaml_files:
                spec = spec_type.from_yaml(yaml_file)
                if spec.name in result:
                    raise cls._duplicate_name_error(
                        spec_type.__name__,
                        spec.name,
                        ": already loaded from a previous file",
                    )
                result[spec.name] = spec
        return result

    def _validate_slice_cross_references(self) -> None:
        """Validate that composite specs' cross-product references resolve.

        Raises:
            SpecValidationError: if a referenced name is missing or is itself composite.
        """
        for spec_name, spec in self._slices.items():
            if not spec.is_composite:
                continue
            for ref_name in spec.cross_product:
                if ref_name not in self._slices:
                    raise SpecValidationError(
                        "slice",
                        spec_name,
                        [
                            ValidationError(
                                field="cross_product",
                                message=f"Referenced slice '{ref_name}' not found in loaded slices",
                            )
                        ],
                    )
                ref_spec = self._slices[ref_name]
                if ref_spec.is_composite:
                    raise SpecValidationError(
                        "slice",
                        spec_name,
                        [
                            ValidationError(
                                field="cross_product",
                                message=f"Nested composite slices not supported (Phase 1): "
                                f"'{ref_name}' is also composite",
                            )
                        ],
                    )

metrics property

metrics: Mapping[str, MetricSpec]

Read-only view of all loaded metric specs, keyed by name.

slices property

slices: Mapping[str, SliceSpec]

Read-only view of all loaded slice specs, keyed by name.

segments property

segments: Mapping[str, SegmentSpec]

Read-only view of all loaded segment specs, keyed by name.

__init__

__init__() -> None

Empty cache. Use from_yaml() or from_string() to load specs.

Source code in aitaem/specs/loader.py
def __init__(self) -> None:
    """Create a cache that holds no specs yet.

    Populate it with from_yaml() or from_string().
    """
    # One independent name -> spec dict per spec kind.
    self._metrics: dict[str, MetricSpec] = dict()
    self._slices: dict[str, SliceSpec] = dict()
    self._segments: dict[str, SegmentSpec] = dict()

from_yaml classmethod

from_yaml(metric_paths: str | list[str] | None = None, slice_paths: str | list[str] | None = None, segment_paths: str | list[str] | None = None) -> 'SpecCache'

Load and validate all specs from YAML files or directories.

Loading is eager — all specs are loaded and validated before returning.

Raises:

Type Description
FileNotFoundError

if a path does not exist

SpecValidationError

if any spec is invalid

Source code in aitaem/specs/loader.py
@classmethod
def from_yaml(
    cls,
    metric_paths: str | list[str] | None = None,
    slice_paths: str | list[str] | None = None,
    segment_paths: str | list[str] | None = None,
) -> "SpecCache":
    """Build a cache from YAML files and/or directories on disk.

    Everything is loaded and validated up front; the cache is only returned
    once all three spec kinds have been read and the slice cross references
    have been checked.

    Raises:
        FileNotFoundError: if a path does not exist
        SpecValidationError: if any spec is invalid
    """
    loaded = cls()
    # Order matters only for error reporting: metrics, then slices, then segments.
    loaded._metrics = cls._load_paths_strict(metric_paths, MetricSpec)  # type: ignore[arg-type, assignment]
    loaded._slices = cls._load_paths_strict(slice_paths, SliceSpec)  # type: ignore[arg-type, assignment]
    loaded._segments = cls._load_paths_strict(segment_paths, SegmentSpec)  # type: ignore[arg-type, assignment]
    loaded._validate_slice_cross_references()
    return loaded

from_string classmethod

from_string(metric_yaml: str | list[str] | None = None, slice_yaml: str | list[str] | None = None, segment_yaml: str | list[str] | None = None) -> 'SpecCache'

Load specs from YAML strings. Validates eagerly.

Each argument can be a single YAML string or a list of YAML strings.

Raises:

Type Description
SpecValidationError

if any spec is invalid

Source code in aitaem/specs/loader.py
@classmethod
def from_string(
    cls,
    metric_yaml: str | list[str] | None = None,
    slice_yaml: str | list[str] | None = None,
    segment_yaml: str | list[str] | None = None,
) -> "SpecCache":
    """Load specs from YAML strings. Validates eagerly.

    Each argument can be a single YAML string or a list of YAML strings.

    Raises:
        SpecValidationError: if any spec is invalid
    """
    cache = cls()
    for yaml_str in cls._normalize_strings(metric_yaml):
        spec = load_spec_from_string(yaml_str, MetricSpec)
        if spec.name in cache._metrics:
            raise SpecValidationError(
                "MetricSpec",
                spec.name,
                [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
            )
        cache._metrics[spec.name] = spec  # type: ignore[assignment]
    for yaml_str in cls._normalize_strings(slice_yaml):
        spec = load_spec_from_string(yaml_str, SliceSpec)
        if spec.name in cache._slices:
            raise SpecValidationError(
                "SliceSpec",
                spec.name,
                [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
            )
        cache._slices[spec.name] = spec  # type: ignore[assignment]
    for yaml_str in cls._normalize_strings(segment_yaml):
        spec = load_spec_from_string(yaml_str, SegmentSpec)
        if spec.name in cache._segments:
            raise SpecValidationError(
                "SegmentSpec",
                spec.name,
                [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
            )
        cache._segments[spec.name] = spec  # type: ignore[assignment]
    cache._validate_slice_cross_references()
    return cache

add

add(spec: MetricSpec | SliceSpec | SegmentSpec) -> None

Add a spec programmatically.

Raises:

Type Description
SpecValidationError

if a spec with the same name is already present.

Source code in aitaem/specs/loader.py
def add(self, spec: MetricSpec | SliceSpec | SegmentSpec) -> None:
    """Register a single spec object in the cache.

    The spec is filed under its name in the dict matching its type; an object
    of any other type falls through without effect.

    Raises:
        SpecValidationError: if a spec with the same name is already present.
    """
    if isinstance(spec, MetricSpec):
        if spec.name not in self._metrics:
            self._metrics[spec.name] = spec
            return
        raise SpecValidationError(
            "MetricSpec",
            spec.name,
            [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
        )

    if isinstance(spec, SliceSpec):
        if spec.name not in self._slices:
            self._slices[spec.name] = spec
            return
        raise SpecValidationError(
            "SliceSpec",
            spec.name,
            [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
        )

    if isinstance(spec, SegmentSpec):
        if spec.name not in self._segments:
            self._segments[spec.name] = spec
            return
        raise SpecValidationError(
            "SegmentSpec",
            spec.name,
            [ValidationError(field="name", message=f"Duplicate spec name '{spec.name}'")],
        )

get_metric

get_metric(name: str) -> MetricSpec

Return MetricSpec for the given name.

Raises:

Type Description
SpecNotFoundError

if name not found

Source code in aitaem/specs/loader.py
def get_metric(self, name: str) -> MetricSpec:
    """Look up a loaded metric spec by name.

    Raises:
        SpecNotFoundError: if name not found
    """
    registry = self._metrics
    if name in registry:
        return registry[name]
    raise SpecNotFoundError("metric", name, [])

get_slice

get_slice(name: str) -> SliceSpec

Return SliceSpec for the given name.

Raises:

Type Description
SpecNotFoundError

if name not found

Source code in aitaem/specs/loader.py
def get_slice(self, name: str) -> SliceSpec:
    """Look up a loaded slice spec by name.

    Raises:
        SpecNotFoundError: if name not found
    """
    registry = self._slices
    if name in registry:
        return registry[name]
    raise SpecNotFoundError("slice", name, [])

get_segment

get_segment(name: str) -> SegmentSpec

Return SegmentSpec for the given name.

Raises:

Type Description
SpecNotFoundError

if name not found

Source code in aitaem/specs/loader.py
def get_segment(self, name: str) -> SegmentSpec:
    """Look up a loaded segment spec by name.

    Raises:
        SpecNotFoundError: if name not found
    """
    registry = self._segments
    if name in registry:
        return registry[name]
    raise SpecNotFoundError("segment", name, [])

clear

clear() -> None

Clear all cached specs.

Source code in aitaem/specs/loader.py
def clear(self) -> None:
    """Clear all cached specs.

    The three dicts are emptied in place instead of being replaced with new
    ones, so any other reference to them (for example a read-only mapping
    view wrapping the same dict) observes the cleared state rather than
    keeping the discarded specs alive.
    """
    self._metrics.clear()
    self._slices.clear()
    self._segments.clear()

Introspection

SpecCache exposes three read-only properties for iterating over loaded specs without needing get_metric() / get_slice() / get_segment() lookups:

# Load every metric and slice spec found under the two paths.
cache = SpecCache.from_yaml(metric_paths="metrics/", slice_paths="slices/")

# Iterate names
# (iterating the mapping yields the spec names, i.e. its keys)
for name in cache.metrics:
    print(name, cache.metrics[name].description)

# Access a spec directly
spec = cache.slices["geography"]

The returned Mapping is a live read-only view of the internal dict (MappingProxyType). Mutation attempts raise TypeError.

Spec name constraints

All spec names (MetricSpec, SliceSpec, SegmentSpec) must be valid SQL identifiers:

  • Match ^[A-Za-z_][A-Za-z0-9_]*$
  • Letters, digits, and underscores only
  • Must start with a letter or underscore (not a digit)

Names are validated at load time. Invalid names raise SpecValidationError with the offending name and a suggested replacement. This constraint exists because SliceSpec names are used as bare SQL column aliases (_slice_{name}), and all names are validated consistently for simplicity.

Duplicate name enforcement

All loading paths (from_yaml, from_string, add) raise SpecValidationError if a spec with the same name has already been loaded into the same cache. Uniqueness is enforced per spec type — a metric and a slice may share a name without conflict.


ValidationResult

ValidationResult is returned by MetricSpec.validate(), SliceSpec.validate(), SegmentSpec.validate(), and the lower-level validate_metric_spec() / validate_slice_spec() / validate_segment_spec() functions.

Field Type Description
valid bool True if the spec passed all validation checks
errors list[ValidationError] List of validation errors (empty when valid)
referenced_columns dict[str, list[str]] | None Column map — see below

referenced_columns

Maps each spec field to the unqualified column names it references. Populated only when valid is True; None when the spec is invalid.

Warning

Always check result.valid before using result.referenced_columns. When the spec is invalid, the field is None — not an empty dict.

Keys for metric specs:

Key Source
"numerator" SQL expression (AST-parsed)
"denominator" SQL expression (AST-parsed), present only if the field is set
"timestamp_col" Plain string field
"entities" Plain list field, present only if the field is set

Keys for slice leaf specs:

Key Source
"values[i].where" SQL WHERE expression (AST-parsed), one key per value

Keys for wildcard slice specs:

Key Source
"where" The bare column name

Keys for composite slice specs: none — the result is an empty dict {}, because a composite spec contains no SQL expressions from which columns could be extracted.

Keys for segment specs:

Key Source
"entity_id" Plain string field
"join_keys" Plain list field, present only when join_keys is non-empty
"values[i].where" SQL WHERE expression (AST-parsed), one key per value

Example — metric spec:

# validate() returns a ValidationResult instead of raising.
result = metric_spec.validate()
# referenced_columns is None for an invalid spec, so guard on .valid first.
if result.valid:
    for field, columns in result.referenced_columns.items():
        print(f"{field}: {columns}")
    # numerator:     ['revenue']
    # denominator:   ['impressions']
    # timestamp_col: ['created_at']
    # entities:      ['user_id']

Example — slice spec:

result = slice_spec.validate()
# referenced_columns is None for an invalid spec, so guard on .valid first.
if result.valid:
    # Leaf slice specs produce one "values[i].where" key per value.
    print(result.referenced_columns)
    # {'values[0].where': ['region'], 'values[1].where': ['region', 'country']}

Column names are unqualified — for SUM(t.revenue) the extracted name is "revenue", not "t.revenue". This is sufficient for single-source specs where each MetricSpec.source points to one table.


Compatibility

CompatibilityResult and ScanResult are returned by MetricCompute.scan(). They carry the pre-flight compatibility verdict for every metric × slice and metric × segment pair loaded into a SpecCache.

CompatibilityResult

One result per metric × spec pair.

Field Type Description
metric_name str Name of the metric
spec_name str Name of the slice or segment
spec_type Literal["slice", "segment"] Which kind of spec this row covers
compatible bool True when the spec is usable with this metric
valid_join_keys list[str] Segment only — join-key candidates present in the metric's source table
missing_columns list[str] Columns (slices) or join-key candidates (segments) absent from the source table
reason str | None Human-readable explanation when compatible is False; None when compatible is True

ScanResult

Container for the full compatibility matrix. The results tuple holds every CompatibilityResult in metric-declaration order.

Method Returns Description
compatible_slices(metric_name) list[str] Names of slices compatible with the given metric
compatible_segments(metric_name) list[str] Names of segments compatible with the given metric
compatible_metrics(spec_name) list[str] Metric names compatible with the given slice or segment
for_metric(metric_name) list[CompatibilityResult] All rows for the given metric
for_spec(spec_name) list[CompatibilityResult] All rows for the given slice or segment across all metrics

MetricSpec

aitaem.specs.metric.MetricSpec dataclass

Source code in aitaem/specs/metric.py
@dataclass(frozen=True)
class MetricSpec:
    """Immutable description of a metric, loadable from YAML via from_yaml()."""

    name: str
    source: str
    numerator: str
    timestamp_col: str
    description: str = ""
    # Optional fields: None means "not set" and the key is omitted on validate().
    denominator: str | None = None
    entities: list[str] | None = None
    format: str | None = None

    @classmethod
    def from_yaml(cls, yaml_input: str | Path) -> "MetricSpec":
        """Load and validate a MetricSpec from a YAML file path or YAML string.

        If yaml_input is a Path or a string pointing to an existing file, it is
        read as a file. Otherwise, it is treated as a YAML string.

        Empty/null optional fields are normalised: ``denominator``, ``entities``
        and ``format`` become None, and ``description`` becomes "". Keys that
        are not dataclass fields are ignored (logged at debug level).

        Raises:
            SpecValidationError: if validation fails or YAML is malformed
            FileNotFoundError: if a Path is provided but the file does not exist
        """
        spec_dict = load_yaml_spec_dict(yaml_input, "metric")

        result = validate_metric_spec(spec_dict)
        # Only a string name is usable in error messages and log lines.
        raw_name = spec_dict.get("name")
        name = raw_name if isinstance(raw_name, str) else None

        if not result.valid:
            raise SpecValidationError("metric", name, result.errors)

        denominator = spec_dict.get("denominator") or None
        entities_raw = spec_dict.get("entities")
        entities = list(entities_raw) if entities_raw else None

        unknown_fields = set(spec_dict) - {f.name for f in fields(cls)}
        if unknown_fields:
            logger.debug("MetricSpec '%s': ignoring unknown fields: %s", name, unknown_fields)

        return cls(
            name=spec_dict["name"],
            source=spec_dict["source"],
            numerator=spec_dict["numerator"],
            timestamp_col=spec_dict["timestamp_col"],
            # A key that is present but null must not leak None into a str field.
            description=spec_dict.get("description") or "",
            denominator=denominator,
            entities=entities,
            format=spec_dict.get("format") or None,
        )

    def validate(self) -> ValidationResult:
        """Validate spec fields and return a ValidationResult (does not raise)."""
        spec_dict: dict[str, object] = {
            "name": self.name,
            "source": self.source,
            "numerator": self.numerator,
            "timestamp_col": self.timestamp_col,
            "description": self.description,
        }
        # Unset optional fields are left out entirely rather than passed as None.
        if self.denominator is not None:
            spec_dict["denominator"] = self.denominator
        if self.entities is not None:
            spec_dict["entities"] = self.entities
        if self.format is not None:
            spec_dict["format"] = self.format
        return validate_metric_spec(spec_dict)

from_yaml classmethod

from_yaml(yaml_input: str | Path) -> 'MetricSpec'

Load and validate a MetricSpec from a YAML file path or YAML string.

If yaml_input is a Path or a string pointing to an existing file, it is read as a file. Otherwise, it is treated as a YAML string.

Raises:

Type Description
SpecValidationError

if validation fails or YAML is malformed

FileNotFoundError

if a Path is provided but the file does not exist

Source code in aitaem/specs/metric.py
@classmethod
def from_yaml(cls, yaml_input: str | Path) -> "MetricSpec":
    """Load and validate a MetricSpec from a YAML file path or YAML string.

    If yaml_input is a Path or a string pointing to an existing file, it is
    read as a file. Otherwise, it is treated as a YAML string.

    Empty/null optional fields are normalised: ``denominator``, ``entities``
    and ``format`` become None, and ``description`` becomes "". Keys that are
    not dataclass fields are ignored (logged at debug level).

    Raises:
        SpecValidationError: if validation fails or YAML is malformed
        FileNotFoundError: if a Path is provided but the file does not exist
    """
    spec_dict = load_yaml_spec_dict(yaml_input, "metric")

    result = validate_metric_spec(spec_dict)
    # Only a string name is usable in error messages and log lines.
    raw_name = spec_dict.get("name")
    name = raw_name if isinstance(raw_name, str) else None

    if not result.valid:
        raise SpecValidationError("metric", name, result.errors)

    denominator = spec_dict.get("denominator") or None
    entities_raw = spec_dict.get("entities")
    entities = list(entities_raw) if entities_raw else None

    unknown_fields = set(spec_dict) - {f.name for f in fields(cls)}
    if unknown_fields:
        logger.debug("MetricSpec '%s': ignoring unknown fields: %s", name, unknown_fields)

    return cls(
        name=spec_dict["name"],
        source=spec_dict["source"],
        numerator=spec_dict["numerator"],
        timestamp_col=spec_dict["timestamp_col"],
        # A key that is present but null must not leak None into a str field.
        description=spec_dict.get("description") or "",
        denominator=denominator,
        entities=entities,
        format=spec_dict.get("format") or None,
    )

validate

validate() -> ValidationResult

Validate spec fields and return a ValidationResult (does not raise).

Source code in aitaem/specs/metric.py
def validate(self) -> ValidationResult:
    """Re-run metric validation on this instance's fields without raising.

    Returns the ValidationResult produced by validate_metric_spec().
    """
    payload: dict[str, object] = {
        "name": self.name,
        "source": self.source,
        "numerator": self.numerator,
        "timestamp_col": self.timestamp_col,
        "description": self.description,
    }
    # Optional fields are forwarded only when set, in this fixed order.
    optional = (
        ("denominator", self.denominator),
        ("entities", self.entities),
        ("format", self.format),
    )
    payload.update({key: value for key, value in optional if value is not None})
    return validate_metric_spec(payload)

SliceSpec

aitaem.specs.slice.SliceSpec dataclass

Source code in aitaem/specs/slice.py
@dataclass(frozen=True)
class SliceSpec:
    """Immutable description of a slice, loadable from YAML via from_yaml().

    A spec takes one of three shapes, distinguished by which field is set:
    leaf (``values``), composite (``cross_product``) or wildcard (``column``).
    """

    name: str
    values: tuple[SliceValue, ...] = ()  # Leaf spec — direct WHERE-based values
    cross_product: tuple[str, ...] = ()  # Composite spec — names of other SliceSpecs
    column: str = ""  # Wildcard spec — bare column name
    description: str = ""

    @property
    def is_composite(self) -> bool:
        """True if this spec references other SliceSpecs via cross_product."""
        return bool(self.cross_product)

    @property
    def is_wildcard(self) -> bool:
        """True if this spec auto-discovers values from a column at query time."""
        return bool(self.column)

    @classmethod
    def from_yaml(cls, yaml_input: str | Path) -> "SliceSpec":
        """Load and validate a SliceSpec from a YAML file path or YAML string.

        Expects top-level key 'slice:'.

        The shape is chosen by key presence: ``cross_product`` wins over
        ``where``, which wins over ``values``. A null ``description`` is
        normalised to "". Unrecognised keys are ignored (logged at debug level).

        Raises:
            SpecValidationError: if validation fails or YAML is malformed
            FileNotFoundError: if a Path is provided but the file does not exist
        """
        spec_dict = load_yaml_spec_dict(yaml_input, "slice")

        result = validate_slice_spec(spec_dict)
        # Only a string name is usable in error messages and log lines.
        raw_name = spec_dict.get("name")
        name = raw_name if isinstance(raw_name, str) else None

        if not result.valid:
            raise SpecValidationError("slice", name, result.errors)

        raw_cross_product = spec_dict.get("cross_product")
        raw_where = spec_dict.get("where")
        values: tuple[SliceValue, ...] = ()
        cross_product: tuple[str, ...] = ()
        column: str = ""
        if raw_cross_product is not None:
            # Composite spec
            cross_product = tuple(raw_cross_product)
        elif raw_where is not None:
            # Wildcard spec
            column = str(raw_where)
        else:
            # Leaf spec
            values = tuple(
                SliceValue(name=v["name"], where=v["where"]) for v in spec_dict["values"]
            )

        unknown_fields = set(spec_dict) - {
            "name",
            "values",
            "cross_product",
            "where",
            "description",
        }
        if unknown_fields:
            logger.debug("SliceSpec '%s': ignoring unknown fields: %s", name, unknown_fields)

        return cls(
            name=spec_dict["name"],
            values=values,
            cross_product=cross_product,
            column=column,
            # A key that is present but null must not leak None into a str field.
            description=spec_dict.get("description") or "",
        )

    def validate(self) -> ValidationResult:
        """Validate spec fields and return a ValidationResult (does not raise)."""
        spec_dict: dict = {"name": self.name, "description": self.description}
        # Emit only the key that matches this spec's shape.
        if self.is_composite:
            spec_dict["cross_product"] = list(self.cross_product)
        elif self.is_wildcard:
            spec_dict["where"] = self.column
        else:
            spec_dict["values"] = [{"name": v.name, "where": v.where} for v in self.values]
        return validate_slice_spec(spec_dict)

is_composite property

is_composite: bool

True if this spec references other SliceSpecs via cross_product.

is_wildcard property

is_wildcard: bool

True if this spec auto-discovers values from a column at query time.

from_yaml classmethod

from_yaml(yaml_input: str | Path) -> 'SliceSpec'

Load and validate a SliceSpec from a YAML file path or YAML string.

Expects top-level key 'slice:'.

Raises:

Type Description
SpecValidationError

if validation fails or YAML is malformed

FileNotFoundError

if a Path is provided but the file does not exist

Source code in aitaem/specs/slice.py
@classmethod
def from_yaml(cls, yaml_input: str | Path) -> "SliceSpec":
    """Load and validate a SliceSpec from a YAML file path or YAML string.

    Expects top-level key 'slice:'.

    The shape is chosen by key presence: ``cross_product`` wins over
    ``where``, which wins over ``values``. A null ``description`` is
    normalised to "". Unrecognised keys are ignored (logged at debug level).

    Raises:
        SpecValidationError: if validation fails or YAML is malformed
        FileNotFoundError: if a Path is provided but the file does not exist
    """
    spec_dict = load_yaml_spec_dict(yaml_input, "slice")

    result = validate_slice_spec(spec_dict)
    # Only a string name is usable in error messages and log lines.
    raw_name = spec_dict.get("name")
    name = raw_name if isinstance(raw_name, str) else None

    if not result.valid:
        raise SpecValidationError("slice", name, result.errors)

    raw_cross_product = spec_dict.get("cross_product")
    raw_where = spec_dict.get("where")
    values: tuple[SliceValue, ...] = ()
    cross_product: tuple[str, ...] = ()
    column: str = ""
    if raw_cross_product is not None:
        # Composite spec
        cross_product = tuple(raw_cross_product)
    elif raw_where is not None:
        # Wildcard spec
        column = str(raw_where)
    else:
        # Leaf spec
        values = tuple(
            SliceValue(name=v["name"], where=v["where"]) for v in spec_dict["values"]
        )

    unknown_fields = set(spec_dict) - {
        "name",
        "values",
        "cross_product",
        "where",
        "description",
    }
    if unknown_fields:
        logger.debug("SliceSpec '%s': ignoring unknown fields: %s", name, unknown_fields)

    return cls(
        name=spec_dict["name"],
        values=values,
        cross_product=cross_product,
        column=column,
        # A key that is present but null must not leak None into a str field.
        description=spec_dict.get("description") or "",
    )

validate

validate() -> ValidationResult

Validate spec fields and return a ValidationResult (does not raise).

Source code in aitaem/specs/slice.py
def validate(self) -> ValidationResult:
    """Check this spec's current fields and report the outcome (does not raise).

    The instance is turned back into the dict layout handed to
    ``validate_slice_spec``. Besides ``name`` and ``description``, exactly
    one shape key is included: ``cross_product`` when ``is_composite`` is
    true, ``where`` when ``is_wildcard`` is true, otherwise ``values``.

    Returns:
        The ValidationResult produced by ``validate_slice_spec``.
    """
    payload: dict = {"name": self.name, "description": self.description}

    if self.is_composite:
        # Stored as a tuple on the instance; passed on as a list.
        payload.update(cross_product=list(self.cross_product))
    elif self.is_wildcard:
        payload.update(where=self.column)
    else:
        payload.update(
            values=[{"name": entry.name, "where": entry.where} for entry in self.values]
        )

    return validate_slice_spec(payload)

SliceValue

aitaem.specs.slice.SliceValue dataclass

Source code in aitaem/specs/slice.py
@dataclass(frozen=True)
class SliceValue:
    """A single named entry of a leaf slice: a label plus its ``where`` text.

    Frozen, so instances cannot be modified after construction.
    """

    # Label of this entry.
    name: str
    # Condition text for this entry (presumably a filter expression; confirm
    # against the code that consumes it).
    where: str

SegmentSpec

aitaem.specs.segment.SegmentSpec dataclass

Source code in aitaem/specs/segment.py
@dataclass(frozen=True)
class SegmentSpec:
    """Immutable segment definition.

    Holds the segment's name, source, entity id column and its named values,
    plus an optional description and optional join keys. Use ``from_yaml`` to
    build a validated instance and ``validate`` to re-check an existing one.
    """

    name: str
    source: str
    entity_id: str
    values: tuple[SegmentValue, ...]
    description: str = ""
    join_keys: tuple[str, ...] = ()

    @classmethod
    def from_yaml(cls, yaml_input: str | Path) -> "SegmentSpec":
        """Build a validated SegmentSpec from a YAML file path or a YAML string.

        The YAML is expected to carry the top-level key 'segment:'.

        Raises:
            SpecValidationError: if validation fails or YAML is malformed
            FileNotFoundError: if a Path is provided but the file does not exist
        """
        raw = load_yaml_spec_dict(yaml_input, "segment")
        outcome = validate_segment_spec(raw)

        # Only a string name is forwarded to error reports and log messages.
        candidate = raw.get("name")
        name = candidate if isinstance(candidate, str) else None

        if not outcome.valid:
            raise SpecValidationError("segment", name, outcome.errors)

        values = tuple(
            SegmentValue(name=item["name"], where=item["where"]) for item in raw["values"]
        )

        # Missing, empty and non-list join_keys all collapse to an empty tuple.
        declared_keys = raw.get("join_keys") or []
        if isinstance(declared_keys, list):
            join_keys = tuple(declared_keys)
        else:
            join_keys = ()

        # Unrecognised keys are tolerated; they are only mentioned at debug level.
        recognised = {"name", "source", "entity_id", "values", "description", "join_keys"}
        extras = set(raw.keys()) - recognised
        if extras:
            logger.debug("SegmentSpec '%s': ignoring unknown fields: %s", name, extras)

        return cls(
            name=raw["name"],
            source=raw["source"],
            entity_id=raw["entity_id"],
            values=values,
            description=raw.get("description", ""),
            join_keys=join_keys,
        )

    def validate(self) -> ValidationResult:
        """Check this spec's current fields and report the outcome (does not raise).

        The instance is turned back into the dict layout handed to
        ``validate_segment_spec``; ``join_keys`` is included only when the
        tuple is non-empty.

        Returns:
            The ValidationResult produced by ``validate_segment_spec``.
        """
        payload: dict = dict(
            name=self.name,
            source=self.source,
            entity_id=self.entity_id,
            values=[{"name": entry.name, "where": entry.where} for entry in self.values],
            description=self.description,
        )
        if self.join_keys:
            payload["join_keys"] = list(self.join_keys)
        return validate_segment_spec(payload)

from_yaml classmethod

from_yaml(yaml_input: str | Path) -> 'SegmentSpec'

Load and validate a SegmentSpec from a YAML file path or YAML string.

Expects top-level key 'segment:'.

Raises:

| Type | Description |
| --- | --- |
| SpecValidationError | if validation fails or YAML is malformed |
| FileNotFoundError | if a Path is provided but the file does not exist |

Source code in aitaem/specs/segment.py
@classmethod
def from_yaml(cls, yaml_input: str | Path) -> "SegmentSpec":
    """Build a validated SegmentSpec from a YAML file path or a YAML string.

    The YAML is expected to carry the top-level key 'segment:'.

    Raises:
        SpecValidationError: if validation fails or YAML is malformed
        FileNotFoundError: if a Path is provided but the file does not exist
    """
    raw = load_yaml_spec_dict(yaml_input, "segment")
    outcome = validate_segment_spec(raw)

    # Only a string name is forwarded to error reports and log messages.
    candidate = raw.get("name")
    name = candidate if isinstance(candidate, str) else None

    if not outcome.valid:
        raise SpecValidationError("segment", name, outcome.errors)

    values = tuple(
        SegmentValue(name=item["name"], where=item["where"]) for item in raw["values"]
    )

    # Missing, empty and non-list join_keys all collapse to an empty tuple.
    declared_keys = raw.get("join_keys") or []
    if isinstance(declared_keys, list):
        join_keys = tuple(declared_keys)
    else:
        join_keys = ()

    # Unrecognised keys are tolerated; they are only mentioned at debug level.
    recognised = {"name", "source", "entity_id", "values", "description", "join_keys"}
    extras = set(raw.keys()) - recognised
    if extras:
        logger.debug("SegmentSpec '%s': ignoring unknown fields: %s", name, extras)

    return cls(
        name=raw["name"],
        source=raw["source"],
        entity_id=raw["entity_id"],
        values=values,
        description=raw.get("description", ""),
        join_keys=join_keys,
    )

validate

validate() -> ValidationResult

Validate spec fields and return a ValidationResult (does not raise).

Source code in aitaem/specs/segment.py
def validate(self) -> ValidationResult:
    """Check this spec's current fields and report the outcome (does not raise).

    The instance is turned back into the dict layout handed to
    ``validate_segment_spec``; ``join_keys`` is included only when the
    tuple is non-empty.

    Returns:
        The ValidationResult produced by ``validate_segment_spec``.
    """
    payload: dict = dict(
        name=self.name,
        source=self.source,
        entity_id=self.entity_id,
        values=[{"name": entry.name, "where": entry.where} for entry in self.values],
        description=self.description,
    )
    if self.join_keys:
        payload["join_keys"] = list(self.join_keys)
    return validate_segment_spec(payload)

SegmentValue

aitaem.specs.segment.SegmentValue dataclass

Source code in aitaem/specs/segment.py
@dataclass(frozen=True)
class SegmentValue:
    """A single named entry of a segment: a label plus its ``where`` text.

    Frozen, so instances cannot be modified after construction.
    """

    # Label of this entry.
    name: str
    # Condition text for this entry (presumably a filter expression; confirm
    # against the code that consumes it).
    where: str