Skip to content

Connectors API

ConnectionManager

aitaem.connectors.connection.ConnectionManager

Manages multiple backend connections and routes queries.

Responsibilities
  • Load connections from YAML configuration
  • Environment variable substitution
  • Store and retrieve connectors by backend type
  • Parse source URIs and route to appropriate connector
Source code in aitaem/connectors/connection.py
class ConnectionManager:
    """Manages multiple backend connections and routes queries.

    Responsibilities:
        - Load connections from YAML configuration
        - Environment variable substitution
        - Store and retrieve connectors by backend type
        - Parse source URIs and route to appropriate connector
    """

    def __init__(self) -> None:
        """Initialize empty connection manager."""
        # Keyed by backend type, so at most one connector per backend.
        self._connections: dict[str, IbisConnector] = {}

    @classmethod
    def from_yaml(cls, yaml_path: str) -> "ConnectionManager":
        """Load all connections from YAML file.

        The file must contain a top-level mapping of backend type to that
        backend's configuration mapping. An empty file yields a manager
        with no connections.

        Args:
            yaml_path: Path to YAML configuration file

        Returns:
            ConnectionManager instance with loaded connections

        Raises:
            FileNotFoundError: If YAML file doesn't exist
            ConfigurationError: If YAML is invalid or missing required fields
            UnsupportedBackendError: If a backend type is not supported
        """
        yaml_file = Path(yaml_path)
        if not yaml_file.exists():
            raise FileNotFoundError(
                f"Connection configuration file not found: {yaml_path}\n\n"
                "Check that the file path is correct and the file exists."
            )

        try:
            # Read as UTF-8 explicitly so parsing does not depend on the
            # platform's default encoding.
            with open(yaml_file, "r", encoding="utf-8") as f:
                config = yaml.safe_load(f)
        except yaml.YAMLError as e:
            raise ConfigurationError(
                f"Invalid YAML syntax in {yaml_path}:\n{str(e)}\n\n"
                "Fix the YAML syntax and try again."
            ) from e

        # An empty document parses to None; treat it as "no backends".
        if config is None:
            config = {}

        # A top-level list or scalar is valid YAML but not a valid config;
        # report it clearly instead of failing on `.items()` below.
        if not isinstance(config, dict):
            raise ConfigurationError(
                f"Invalid configuration in {yaml_path}\n"
                "Expected a top-level mapping of backend type to its configuration."
            )

        manager = cls()

        # Process each backend configuration
        for backend_type, backend_config in config.items():
            if not isinstance(backend_config, dict):
                raise ConfigurationError(
                    f"Invalid configuration for backend '{backend_type}' in {yaml_path}\n"
                    "Expected a mapping (key-value pairs)."
                )

            # Substitute environment variables in all config values
            substituted_config = manager._substitute_env_vars_in_dict(backend_config, yaml_path)

            try:
                manager.add_connection(backend_type, **substituted_config)
            except (ConfigurationError, UnsupportedBackendError):
                # Already user-facing errors; let them through unchanged.
                raise
            except Exception as e:
                raise ConfigurationError(
                    f"Failed to create connection for backend '{backend_type}': {str(e)}"
                ) from e

        return manager

    def _substitute_env_vars_in_dict(
        self, config: dict[str, Any], yaml_path: str
    ) -> dict[str, Any]:
        """Recursively substitute environment variables in config dictionary.

        Strings are substituted wherever they appear as values, including
        inside nested mappings and lists. Other values are kept as-is.

        Args:
            config: Configuration dictionary
            yaml_path: Path to YAML file (for error messages)

        Returns:
            New dictionary with environment variables substituted

        Raises:
            ConfigurationError: If referenced environment variable is not set
        """
        return {
            key: self._substitute_env_vars_in_value(value, yaml_path)
            for key, value in config.items()
        }

    def _substitute_env_vars_in_value(self, value: Any, yaml_path: str) -> Any:
        """Substitute environment variables in one config value of any shape.

        Args:
            value: A string, mapping, list, or any other YAML value
            yaml_path: Path to YAML file (for error messages)

        Returns:
            The value with every contained string substituted

        Raises:
            ConfigurationError: If referenced environment variable is not set
        """
        if isinstance(value, str):
            return self._substitute_env_vars(value, yaml_path)
        if isinstance(value, dict):
            return self._substitute_env_vars_in_dict(value, yaml_path)
        if isinstance(value, list):
            return [self._substitute_env_vars_in_value(item, yaml_path) for item in value]
        return value

    def _substitute_env_vars(self, value: str, yaml_path: str) -> str:
        """Replace ${VAR_NAME} with environment variable values.

        Args:
            value: String potentially containing ${VAR_NAME} patterns
            yaml_path: Path to YAML file (for error messages)

        Returns:
            String with environment variables substituted

        Raises:
            ConfigurationError: If environment variable is not set
        """
        pattern = r"\$\{([^}]+)\}"

        def replace_var(match: re.Match) -> str:
            var_name = match.group(1)
            var_value = os.environ.get(var_name)
            if var_value is None:
                raise ConfigurationError(
                    f"Environment variable '{var_name}' referenced in {yaml_path} but not set\n\n"
                    f"Set the environment variable:\n"
                    f"  export {var_name}=your-value"
                )
            return var_value

        return re.sub(pattern, replace_var, value)

    def add_connection(
        self, backend_type: str, connector: "IbisConnector | None" = None, **config: Any
    ) -> None:
        """Add single connection by creating and storing IbisConnector.

        Args:
            backend_type: Backend type ('duckdb', 'bigquery', etc.)
            connector: Optional pre-existing IbisConnector instance. If provided,
                it is stored directly without creating a new connection.
            **config: Backend-specific configuration (used only when connector is None)
                - DuckDB: path (str), read_only (bool, optional)
                - BigQuery: project_id (str), dataset_id (str, optional)

        Raises:
            ConfigurationError: If a connection for backend_type already exists,
                or if required config fields are missing.
            UnsupportedBackendError: If backend_type is not supported
            ValueError: If configuration is invalid
        """
        if backend_type in self._connections:
            raise ConfigurationError(
                f"A connection for backend '{backend_type}' already exists.\n\n"
                "Close the existing connection before adding a new one:\n"
                f"  manager.get_connection('{backend_type}').close()"
            )

        if connector is not None:
            self._connections[backend_type] = connector
            return

        # Validate required fields using backend spec (raises ConfigurationError if invalid)
        validate_backend_config(backend_type, config)

        # Create connector; UnsupportedBackendError propagates to the caller.
        new_connector = IbisConnector(backend_type)

        # Connect using appropriate method
        if backend_type == "duckdb":
            # DuckDB takes the database path positionally; the rest are kwargs.
            path = config.pop("path")
            new_connector.connect(path, **config)
        elif backend_type in ("bigquery", "postgres"):
            new_connector.connect(**config)

        # Store connector
        self._connections[backend_type] = new_connector

    def get_connection(self, backend_type: str) -> IbisConnector:
        """Get connector by backend type.

        Args:
            backend_type: Backend type ('duckdb', 'bigquery', etc.)

        Returns:
            IbisConnector for the specified backend

        Raises:
            ConnectionNotFoundError: If backend not configured
        """
        if backend_type not in self._connections:
            raise ConnectionNotFoundError(
                f"No connection configured for backend '{backend_type}'\n\n"
                "Add the connection to your connections.yaml:\n"
                f"  {backend_type}:\n"
                "    # backend-specific config\n\n"
                "Or call add_connection():\n"
                f"  manager.add_connection('{backend_type}', ...)"
            )
        return self._connections[backend_type]

    def get_connection_for_source(self, source_uri: str) -> IbisConnector:
        """Parse URI and return appropriate connector.

        Args:
            source_uri: Source URI (e.g., 'duckdb://analytics.db/events')

        Returns:
            IbisConnector for the backend specified in URI

        Raises:
            InvalidURIError: If URI is malformed
            ConnectionNotFoundError: If backend not configured
        """
        backend_type, _, _ = self.parse_source_uri(source_uri)
        return self.get_connection(backend_type)

    @staticmethod
    def parse_source_uri(uri: str) -> tuple[str, str, str]:
        """Parse source URI into (backend, database, table).

        Format: backend://database_identifier/table_name

        DuckDB Examples:
            - 'duckdb://analytics.db/events' → ('duckdb', 'analytics.db', 'events')
            - 'duckdb://:memory:/events' → ('duckdb', ':memory:', 'events')
            - 'duckdb:///abs/path/db/events' → ('duckdb', '/abs/path/db', 'events')

        BigQuery Examples:
            - 'bigquery://my-project.dataset.table' → ('bigquery', 'my-project', 'dataset.table')
            - 'bigquery://project/dataset.table' → ('bigquery', 'project', 'dataset.table')

        Postgres Examples:
            - 'postgres://public/events' → ('postgres', 'public', 'events')
            - 'postgres:///events' → ('postgres', '', 'events')

        Args:
            uri: Source URI

        Returns:
            Tuple of (backend_type, database_identifier, table_name)

        Raises:
            InvalidURIError: If URI is malformed
        """
        # Parse URI
        parsed = urlparse(uri)

        # Extract backend type (scheme)
        backend_type = parsed.scheme
        if not backend_type:
            raise InvalidURIError(
                f"Missing backend type in URI: '{uri}'\n\n"
                "URI must start with backend type:\n"
                "  duckdb://analytics.db/events\n"
                "  bigquery://project.dataset.table"
            )

        # urlparse puts the first segment after '//' in netloc and the rest
        # in path; rejoin them so each backend can split as it needs.
        full_path = parsed.netloc + parsed.path

        if not full_path:
            raise InvalidURIError(
                f"Empty path in URI: '{uri}'\n\nURI must include database and table."
            )

        # Backend-specific parsing
        if backend_type == "duckdb":
            return ConnectionManager._parse_duckdb_uri(uri, full_path)
        if backend_type == "bigquery":
            return ConnectionManager._parse_bigquery_uri(uri, full_path)
        if backend_type == "postgres":
            return ConnectionManager._parse_postgres_uri(uri, full_path)

        # Unknown backend: split generically on the last '/'. Whether the
        # backend is usable at all is decided later, when a connection is
        # looked up or created.
        if "/" not in full_path:
            raise InvalidURIError(f"Missing table separator '/' in URI: '{uri}'")
        last_slash = full_path.rfind("/")
        database = full_path[:last_slash]
        table = full_path[last_slash + 1 :]
        if not table:
            raise InvalidURIError(
                f"Empty table name in URI: '{uri}'\n\nURI must include table name."
            )
        return (backend_type, database, table)

    @staticmethod
    def _parse_duckdb_uri(uri: str, full_path: str) -> tuple[str, str, str]:
        """Parse DuckDB URI.

        Args:
            uri: Original URI (for error messages)
            full_path: Combined netloc + path

        Returns:
            Tuple of ('duckdb', database_path, table_name)

        Raises:
            InvalidURIError: If URI is malformed
        """
        if "/" not in full_path:
            raise InvalidURIError(
                f"Missing table separator '/' in URI: '{uri}'\n\n"
                "DuckDB URI format:\n"
                "  duckdb://database.db/table_name"
            )

        # Find LAST '/' to separate database from table, because the
        # database part may itself be a path containing slashes.
        last_slash = full_path.rfind("/")
        database = full_path[:last_slash]
        table = full_path[last_slash + 1 :]

        if not table:
            raise InvalidURIError(
                f"Empty table name in URI: '{uri}'\n\n"
                "URI must include table name:\n"
                "  duckdb://analytics.db/events"
            )

        return ("duckdb", database, table)

    @staticmethod
    def _parse_bigquery_uri(uri: str, full_path: str) -> tuple[str, str, str]:
        """Parse BigQuery URI.

        Args:
            uri: Original URI (for error messages)
            full_path: Combined netloc + path

        Returns:
            Tuple of ('bigquery', project_id, 'dataset.table')

        Raises:
            InvalidURIError: If URI is malformed or has <3 parts
        """
        # Normalize: replace all '/' with '.' so both documented spellings
        # (project.dataset.table and project/dataset.table) parse the same.
        normalized = full_path.replace("/", ".")

        # Split on '.'
        parts = normalized.split(".")

        if len(parts) < 3:
            raise InvalidURIError(
                f"BigQuery URI must have at least 3 parts (project.dataset.table): '{uri}'\n\n"
                "Valid formats:\n"
                "  bigquery://project.dataset.table\n"
                "  bigquery://project/dataset.table"
            )

        # Extract project (first part) and table (everything else as dataset.table)
        project = parts[0]
        table = ".".join(parts[1:])

        return ("bigquery", project, table)

    @staticmethod
    def _parse_postgres_uri(uri: str, full_path: str) -> tuple[str, str, str]:
        """Parse Postgres URI.

        Format: postgres://schema/table  or  postgres:///table (no schema)

        Args:
            uri: Original URI (for error messages)
            full_path: Combined netloc + path

        Returns:
            Tuple of ('postgres', schema, table_name); schema is '' when the
            URI does not name one

        Raises:
            InvalidURIError: If URI is malformed
        """
        if "/" not in full_path:
            raise InvalidURIError(
                f"Missing table separator '/' in URI: '{uri}'\n\n"
                "Postgres URI format:\n"
                "  postgres://public/events\n"
                "  postgres:///events  (default schema)"
            )
        last_slash = full_path.rfind("/")
        schema = full_path[:last_slash]
        table = full_path[last_slash + 1 :]
        if not table:
            raise InvalidURIError(
                f"Empty table name in URI: '{uri}'\n\n"
                "URI must include table name:\n"
                "  postgres://public/events"
            )
        return ("postgres", schema, table)

    def close_all(self) -> None:
        """Close all connections and forget them.

        Every connector is given a chance to close even if an earlier one
        fails, and the registry is always emptied.

        Raises:
            Exception: The first error raised by a connector's close(),
                re-raised after all connectors have been attempted.
        """
        first_error: Exception | None = None
        for connector in list(self._connections.values()):
            try:
                connector.close()
            except Exception as e:
                # Keep going so one bad connector cannot leak the others.
                if first_error is None:
                    first_error = e
        self._connections.clear()
        if first_error is not None:
            raise first_error

    def __repr__(self) -> str:
        """Return string representation of manager."""
        backends = list(self._connections.keys())
        return f"ConnectionManager(backends={backends})"

__init__

__init__() -> None

Initialize empty connection manager.

Source code in aitaem/connectors/connection.py
def __init__(self) -> None:
    """Create a manager that does not hold any connections yet."""
    # Registry of connectors, keyed by backend type.
    self._connections: dict[str, IbisConnector] = dict()

from_yaml classmethod

from_yaml(yaml_path: str) -> ConnectionManager

Load all connections from YAML file.

Parameters:

Name Type Description Default
yaml_path str

Path to YAML configuration file

required

Returns:

Type Description
ConnectionManager

ConnectionManager instance with loaded connections

Raises:

Type Description
FileNotFoundError

If YAML file doesn't exist

ConfigurationError

If YAML is invalid or missing required fields

Source code in aitaem/connectors/connection.py
@classmethod
def from_yaml(cls, yaml_path: str) -> "ConnectionManager":
    """Load all connections from YAML file.

    The file must contain a top-level mapping of backend type to that
    backend's configuration mapping. An empty file yields a manager with
    no connections.

    Args:
        yaml_path: Path to YAML configuration file

    Returns:
        ConnectionManager instance with loaded connections

    Raises:
        FileNotFoundError: If YAML file doesn't exist
        ConfigurationError: If YAML is invalid or missing required fields
        UnsupportedBackendError: If a backend type is not supported
    """
    yaml_file = Path(yaml_path)
    if not yaml_file.exists():
        raise FileNotFoundError(
            f"Connection configuration file not found: {yaml_path}\n\n"
            "Check that the file path is correct and the file exists."
        )

    try:
        # Read as UTF-8 explicitly so parsing does not depend on the
        # platform's default encoding.
        with open(yaml_file, "r", encoding="utf-8") as f:
            config = yaml.safe_load(f)
    except yaml.YAMLError as e:
        raise ConfigurationError(
            f"Invalid YAML syntax in {yaml_path}:\n{str(e)}\n\n"
            "Fix the YAML syntax and try again."
        ) from e

    # An empty document parses to None; treat it as "no backends".
    if config is None:
        config = {}

    # A top-level list or scalar is valid YAML but not a valid config;
    # report it clearly instead of failing on `.items()` below.
    if not isinstance(config, dict):
        raise ConfigurationError(
            f"Invalid configuration in {yaml_path}\n"
            "Expected a top-level mapping of backend type to its configuration."
        )

    manager = cls()

    # Process each backend configuration
    for backend_type, backend_config in config.items():
        if not isinstance(backend_config, dict):
            raise ConfigurationError(
                f"Invalid configuration for backend '{backend_type}' in {yaml_path}\n"
                "Expected a mapping (key-value pairs)."
            )

        # Substitute environment variables in all config values
        substituted_config = manager._substitute_env_vars_in_dict(backend_config, yaml_path)

        try:
            manager.add_connection(backend_type, **substituted_config)
        except (ConfigurationError, UnsupportedBackendError):
            # Already user-facing errors; let them through unchanged.
            raise
        except Exception as e:
            raise ConfigurationError(
                f"Failed to create connection for backend '{backend_type}': {str(e)}"
            ) from e

    return manager

add_connection

add_connection(backend_type: str, connector: IbisConnector | None = None, **config: Any) -> None

Add single connection by creating and storing IbisConnector.

Parameters:

Name Type Description Default
backend_type str

Backend type ('duckdb', 'bigquery', etc.)

required
connector IbisConnector | None

Optional pre-existing IbisConnector instance. If provided, it is stored directly without creating a new connection.

None
**config Any

Backend-specific configuration (used only when connector is None). DuckDB: path (str), read_only (bool, optional). BigQuery: project_id (str), dataset_id (str, optional).

{}

Raises:

Type Description
ConfigurationError

If a connection for backend_type already exists, or if required config fields are missing.

UnsupportedBackendError

If backend_type is not supported

ValueError

If configuration is invalid

Source code in aitaem/connectors/connection.py
def add_connection(
    self, backend_type: str, connector: "IbisConnector | None" = None, **config: Any
) -> None:
    """Add single connection by creating and storing IbisConnector.

    Args:
        backend_type: Backend type ('duckdb', 'bigquery', etc.)
        connector: Optional pre-existing IbisConnector instance. If provided,
            it is stored directly without creating a new connection.
        **config: Backend-specific configuration (used only when connector is None)
            - DuckDB: path (str), read_only (bool, optional)
            - BigQuery: project_id (str), dataset_id (str, optional)

    Raises:
        ConfigurationError: If a connection for backend_type already exists,
            or if required config fields are missing.
        UnsupportedBackendError: If backend_type is not supported
        ValueError: If configuration is invalid
    """
    if backend_type in self._connections:
        raise ConfigurationError(
            f"A connection for backend '{backend_type}' already exists.\n\n"
            "Close the existing connection before adding a new one:\n"
            f"  manager.get_connection('{backend_type}').close()"
        )

    if connector is not None:
        self._connections[backend_type] = connector
        return

    # Validate required fields using backend spec (raises ConfigurationError if invalid)
    validate_backend_config(backend_type, config)

    # Create connector; UnsupportedBackendError propagates to the caller.
    new_connector = IbisConnector(backend_type)

    # Connect using appropriate method
    if backend_type == "duckdb":
        # DuckDB takes the database path positionally; the rest are kwargs.
        path = config.pop("path")
        new_connector.connect(path, **config)
    elif backend_type in ("bigquery", "postgres"):
        new_connector.connect(**config)

    # Store connector
    self._connections[backend_type] = new_connector

get_connection

get_connection(backend_type: str) -> IbisConnector

Get connector by backend type.

Parameters:

Name Type Description Default
backend_type str

Backend type ('duckdb', 'bigquery', etc.)

required

Returns:

Type Description
IbisConnector

IbisConnector for the specified backend

Raises:

Type Description
ConnectionNotFoundError

If backend not configured

Source code in aitaem/connectors/connection.py
def get_connection(self, backend_type: str) -> IbisConnector:
    """Look up the connector registered for a backend type.

    Args:
        backend_type: Backend type ('duckdb', 'bigquery', etc.)

    Returns:
        The IbisConnector stored under backend_type

    Raises:
        ConnectionNotFoundError: If no connector is stored for backend_type
    """
    # Happy path first; everything after it only builds the error.
    if backend_type in self._connections:
        return self._connections[backend_type]

    raise ConnectionNotFoundError(
        f"No connection configured for backend '{backend_type}'\n\n"
        "Add the connection to your connections.yaml:\n"
        f"  {backend_type}:\n"
        "    # backend-specific config\n\n"
        "Or call add_connection():\n"
        f"  manager.add_connection('{backend_type}', ...)"
    )

get_connection_for_source

get_connection_for_source(source_uri: str) -> IbisConnector

Parse URI and return appropriate connector.

Parameters:

Name Type Description Default
source_uri str

Source URI (e.g., 'duckdb://analytics.db/events')

required

Returns:

Type Description
IbisConnector

IbisConnector for the backend specified in URI

Raises:

Type Description
InvalidURIError

If URI is malformed

ConnectionNotFoundError

If backend not configured

Source code in aitaem/connectors/connection.py
def get_connection_for_source(self, source_uri: str) -> IbisConnector:
    """Resolve a source URI to the connector of its backend.

    Only the backend part of the URI is used for the lookup; the database
    and table parts are parsed but ignored here.

    Args:
        source_uri: Source URI (e.g., 'duckdb://analytics.db/events')

    Returns:
        IbisConnector for the backend named in the URI

    Raises:
        InvalidURIError: If URI is malformed
        ConnectionNotFoundError: If backend not configured
    """
    backend_type, _database, _table = self.parse_source_uri(source_uri)
    connector = self.get_connection(backend_type)
    return connector

parse_source_uri staticmethod

parse_source_uri(uri: str) -> tuple[str, str, str]

Parse source URI into (backend, database, table).

Format: backend://database_identifier/table_name

DuckDB Examples
  • 'duckdb://analytics.db/events' → ('duckdb', 'analytics.db', 'events')
  • 'duckdb://:memory:/events' → ('duckdb', ':memory:', 'events')
  • 'duckdb:///abs/path/db/events' → ('duckdb', '/abs/path/db', 'events')
BigQuery Examples
  • 'bigquery://my-project.dataset.table' → ('bigquery', 'my-project', 'dataset.table')
  • 'bigquery://project/dataset.table' → ('bigquery', 'project', 'dataset.table')

Parameters:

Name Type Description Default
uri str

Source URI

required

Returns:

Type Description
tuple[str, str, str]

Tuple of (backend_type, database_identifier, table_name)

Raises:

Type Description
InvalidURIError

If URI is malformed

Source code in aitaem/connectors/connection.py
@staticmethod
def parse_source_uri(uri: str) -> tuple[str, str, str]:
    """Parse source URI into (backend, database, table).

    Format: backend://database_identifier/table_name

    DuckDB Examples:
        - 'duckdb://analytics.db/events' → ('duckdb', 'analytics.db', 'events')
        - 'duckdb://:memory:/events' → ('duckdb', ':memory:', 'events')
        - 'duckdb:///abs/path/db/events' → ('duckdb', '/abs/path/db', 'events')

    BigQuery Examples:
        - 'bigquery://my-project.dataset.table' → ('bigquery', 'my-project', 'dataset.table')
        - 'bigquery://project/dataset.table' → ('bigquery', 'project', 'dataset.table')

    Args:
        uri: Source URI

    Returns:
        Tuple of (backend_type, database_identifier, table_name)

    Raises:
        InvalidURIError: If URI is malformed
    """
    # Parse URI
    parsed = urlparse(uri)

    # Extract backend type (scheme)
    backend_type = parsed.scheme
    if not backend_type:
        raise InvalidURIError(
            f"Missing backend type in URI: '{uri}'\n\n"
            "URI must start with backend type:\n"
            "  duckdb://analytics.db/events\n"
            "  bigquery://project.dataset.table"
        )

    # Combine netloc and path
    full_path = parsed.netloc + parsed.path

    if not full_path:
        raise InvalidURIError(
            f"Empty path in URI: '{uri}'\n\nURI must include database and table."
        )

    # Backend-specific parsing
    if backend_type == "duckdb":
        return ConnectionManager._parse_duckdb_uri(uri, full_path)
    elif backend_type == "bigquery":
        return ConnectionManager._parse_bigquery_uri(uri, full_path)
    elif backend_type == "postgres":
        return ConnectionManager._parse_postgres_uri(uri, full_path)
    else:
        # For unknown backends, try generic parsing
        # This will fail later in get_connection with UnsupportedBackendError
        if "/" not in full_path:
            raise InvalidURIError(f"Missing table separator '/' in URI: '{uri}'")
        last_slash = full_path.rfind("/")
        database = full_path[:last_slash]
        table = full_path[last_slash + 1 :]
        if not table:
            raise InvalidURIError(
                f"Empty table name in URI: '{uri}'\n\nURI must include table name."
            )
        return (backend_type, database, table)

close_all

close_all() -> None

Close all connections.

Source code in aitaem/connectors/connection.py
def close_all(self) -> None:
    """Close all connections and forget them.

    Every connector is given a chance to close even if an earlier one
    fails, and the registry is always emptied.

    Raises:
        Exception: The first error raised by a connector's close(),
            re-raised after all connectors have been attempted.
    """
    first_error: Exception | None = None
    for connector in list(self._connections.values()):
        try:
            connector.close()
        except Exception as e:
            # Keep going so one bad connector cannot leak the others.
            if first_error is None:
                first_error = e
    self._connections.clear()
    if first_error is not None:
        raise first_error

__repr__

__repr__() -> str

Return string representation of manager.

Source code in aitaem/connectors/connection.py
def __repr__(self) -> str:
    """Describe the manager by the backend types it currently holds."""
    return f"ConnectionManager(backends={list(self._connections)})"

Backend Configuration

Each backend type has a corresponding configuration dataclass. These are the authoritative definitions of what fields each backend accepts — required fields have no default, optional fields do.

ConnectionManager.add_connection() and ConnectionManager.from_yaml() both validate the supplied config against the relevant dataclass and raise ConfigurationError with a clear message and YAML snippet if a required field is missing.

DuckDB

aitaem.connectors.backend_specs.DuckDBConfig dataclass

Connection configuration for a DuckDB backend.

Attributes:

Name Type Description
path str

File path to the DuckDB database, or ':memory:' for an in-process, ephemeral database. Required.

read_only bool

Open the database in read-only mode. Defaults to False.

Source code in aitaem/connectors/backend_specs.py
@dataclass
class DuckDBConfig:
    """Connection configuration for a DuckDB backend.

    Attributes:
        path: File path to the DuckDB database, or `':memory:'` for an
            in-process, ephemeral database. **Required.**
        read_only: Open the database in read-only mode. Defaults to `False`.
    """

    # Required: declared without a default, so it must always be supplied.
    path: str
    # Optional: False unless the caller says otherwise.
    read_only: bool = False

BigQuery

aitaem.connectors.backend_specs.BigQueryConfig dataclass

Connection configuration for a BigQuery backend.

Authentication uses Application Default Credentials (ADC). Run gcloud auth application-default login or set the GOOGLE_APPLICATION_CREDENTIALS environment variable before connecting.

Attributes:

Name Type Description
project_id str

GCP project ID that owns the BigQuery datasets. Required.

dataset_id str | None

Default dataset used when a table name is not fully-qualified. Optional.

Source code in aitaem/connectors/backend_specs.py
@dataclass
class BigQueryConfig:
    """Connection configuration for a BigQuery backend.

    Authentication uses Application Default Credentials (ADC). Run
    `gcloud auth application-default login` or set the
    `GOOGLE_APPLICATION_CREDENTIALS` environment variable before connecting.

    Attributes:
        project_id: GCP project ID that owns the BigQuery datasets. **Required.**
        dataset_id: Default dataset used when a table name is not
            fully-qualified. Optional.
    """

    # Required: declared without a default, so it must always be supplied.
    project_id: str
    # Optional: None means no default dataset is configured.
    dataset_id: str | None = None

PostgreSQL

aitaem.connectors.backend_specs.PostgresConfig dataclass

Connection configuration for a PostgreSQL backend.

Attributes:

Name Type Description
database str

Name of the PostgreSQL database to connect to. Required.

user str

PostgreSQL username. Required.

password str

Password for the given user. Required. Use ${ENV_VAR} syntax in connections.yaml to avoid storing plaintext credentials.

host str

Hostname or IP address of the PostgreSQL server. Defaults to 'localhost'.

port int

TCP port the PostgreSQL server listens on. Defaults to 5432.

Source code in aitaem/connectors/backend_specs.py
@dataclass
class PostgresConfig:
    """Connection configuration for a PostgreSQL backend.

    Attributes:
        database: Name of the PostgreSQL database to connect to. **Required.**
        user: PostgreSQL username. **Required.**
        password: Password for the given user. **Required.**
            Use `${ENV_VAR}` syntax in `connections.yaml` to avoid storing
            plaintext credentials.
        host: Hostname or IP address of the PostgreSQL server.
            Defaults to `'localhost'`.
        port: TCP port the PostgreSQL server listens on. Defaults to `5432`.
    """

    # Required fields: declared without defaults, so all three must be supplied.
    database: str
    user: str
    password: str
    # Optional fields: fall back to the defaults below when omitted.
    host: str = "localhost"
    port: int = 5432

validate_backend_config

aitaem.connectors.backend_specs.validate_backend_config

validate_backend_config(backend_type: str, config: dict[str, Any]) -> Any

Validate a config dict against the backend's dataclass spec.

Instantiates the backend's config dataclass from the provided dict, ignoring any extra keys that are not part of the dataclass (they may be pass-through kwargs consumed directly by the Ibis backend).

Parameters:

Name Type Description Default
backend_type str

Backend identifier (e.g. 'duckdb', 'bigquery', 'postgres')

required
config dict[str, Any]

Configuration dictionary from connections.yaml or add_connection()

required

Returns:

Type Description
Any

Instantiated config dataclass (e.g. PostgresConfig)

Raises:

Type Description
UnsupportedBackendError

If backend_type is not in BACKEND_SPECS

ConfigurationError

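If a required field is missing, i.e. the backend's config dataclass cannot be instantiated from the supplied values. Field types are not checked at runtime by the dataclasses shown on this page.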

Source code in aitaem/connectors/backend_specs.py
def validate_backend_config(backend_type: str, config: dict[str, Any]) -> Any:
    """Check a config dict by building the backend's config dataclass from it.

    Only keys that are fields of the backend's dataclass are passed to its
    constructor. Any other keys are left out of validation (they may be
    pass-through kwargs consumed directly by the Ibis backend).

    Args:
        backend_type: Backend identifier (e.g. 'duckdb', 'bigquery', 'postgres')
        config: Configuration dictionary from connections.yaml or add_connection()

    Returns:
        Instantiated config dataclass (e.g. PostgresConfig)

    Raises:
        UnsupportedBackendError: If backend_type is not in BACKEND_SPECS
        ConfigurationError: If the dataclass constructor raises TypeError,
            which is what happens when a required field is missing
    """
    import dataclasses

    if backend_type not in BACKEND_SPECS:
        supported = ", ".join(sorted(BACKEND_SPECS.keys()))
        raise UnsupportedBackendError(
            f"Backend type '{backend_type}' not supported\n\n"
            f"Supported backends: {supported}"
        )

    spec_cls = BACKEND_SPECS[backend_type]

    # Keep only the keys the dataclass declares; anything else is not ours
    # to validate.
    accepted = {field.name for field in dataclasses.fields(spec_cls)}
    spec_kwargs = {key: config[key] for key in config if key in accepted}

    try:
        return spec_cls(**spec_kwargs)
    except TypeError as e:
        # The constructor's own message names the missing argument(s), so it
        # is surfaced verbatim alongside an example of the expected config.
        expected = _YAML_SNIPPETS.get(backend_type, "")
        raise ConfigurationError(
            f"Invalid configuration for '{backend_type}' backend: {e}\n\n"
            f"Expected configuration:\n{expected}"
        ) from e