API Reference

timedb provides three complementary interfaces for time series data management:

  • SDK: High-level Python API for programmatic access

  • REST API: HTTP endpoints for remote access and integration

  • CLI: Command-line tools for administrative operations

SDK Reference

The SDK provides a fluent, Pythonic interface for working with time series data.

Main Client

class timedb.TimeDataClient(conninfo: str | None = None, min_size: int = 2, max_size: int = 10)[source]

Bases: object

High-level client for TimeDB with fluent API for series selection.

The TimeDataClient provides the main entry point for working with timedb. It supports:

  • Creating and deleting database schema

  • Creating new time series with labels and metadata

  • Building fluent queries to filter, read, and update series data

Example

>>> from timedb import TimeDataClient
>>> import pandas as pd
>>> from datetime import datetime, timezone
>>> # Create client and schema
>>> td = TimeDataClient()
>>> td.create()
>>> # Create a series
>>> td.create_series('wind_power', unit='MW', labels={'site': 'offshore_1'})
>>> # Insert and read data using fluent API
>>> df = pd.DataFrame({
...     'valid_time': [datetime.now(timezone.utc)],
...     'value': [100.0]
... })
>>> td.series('wind_power').where(site='offshore_1').insert(df)
>>> result = td.series('wind_power').where(site='offshore_1').read()

Environment:

Requires TIMEDB_DSN or DATABASE_URL environment variable to connect to the PostgreSQL database.
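
For example (an illustrative DSN; substitute your own connection string):

>>> import os
>>> os.environ['TIMEDB_DSN'] = 'postgresql://user:password@localhost:5432/timedb'  # placeholder credentials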

__init__(conninfo: str | None = None, min_size: int = 2, max_size: int = 10)[source]
close()[source]

Close the connection pool.

create(retention=None, *, retention_short: str = '6 months', retention_medium: str = '3 years', retention_long: str = '5 years') None[source]

Create database schema (TimescaleDB version).

Parameters:
  • retention – Shorthand to set the default (medium) retention period. An int is interpreted as years (e.g., 5 → “5 years”). A string is used as-is (e.g., “18 months”).

  • retention_short – Retention for overlapping_short (default: “6 months”)

  • retention_medium – Retention for overlapping_medium (default: “3 years”)

  • retention_long – Retention for overlapping_long (default: “5 years”)
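
Example

A minimal sketch (assumes TIMEDB_DSN or DATABASE_URL is set as described above):

>>> from timedb import TimeDataClient
>>> td = TimeDataClient()
>>> # An int retention is interpreted as years for the default (medium) tier
>>> td.create(retention=5)
>>> # Tiers can also be set individually, e.g.:
>>> # td.create(retention_short='3 months', retention_long='10 years')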

create_series(name: str, unit: str = 'dimensionless', labels: Dict[str, str] | None = None, description: str | None = None, overlapping: bool = False, retention: str = 'medium') int[source]

Create a new time series with metadata and labels.

Creates a new series in the database with the specified configuration. Each series has a unique name+labels combination.

Parameters:
  • name (str) – Series name/identifier (e.g., 'wind_power', 'solar_irradiance'). Human-readable identifier for the measurement.

  • unit (str, default="dimensionless") –

    Canonical unit for the series. Examples:

    • 'MW' - megawatts (power)

    • 'kWh' - kilowatt-hours (energy)

    • 'C' - celsius (temperature)

    • 'dimensionless' - unitless values

  • labels (dict, optional) – Dictionary of key-value labels to differentiate series with the same name. Example: {"site": "Gotland", "turbine": "T01"}. Enables filtering and organization of related series.

  • description (str, optional) – Human-readable description of the series and its contents.

  • overlapping (bool, default=False) –

    Whether this series stores versioned/revised data:

    • False: Flat/immutable facts (e.g., meter readings, historical data)

    • True: Versioned/revised data (e.g., forecasts, estimates) with known_time tracking for changes over time

  • retention (str, default="medium") –

    Data retention policy (overlapping series only):

    • 'short': 6 months (fast queries on recent data)

    • 'medium': 3 years (balanced for forecasts)

    • 'long': 5 years (historical archival)

Returns:

The unique series_id for this series, used in read/write operations

Return type:

int

Raises:

ValueError – If series with same name+labels already exists

Example

>>> client = TimeDataClient()
>>> # Create a flat series for meter readings
>>> series_id = client.create_series(
...     name='power_consumption',
...     unit='kWh',
...     labels={'building': 'main', 'floor': '3'},
...     description='Power consumption for floor 3 of main building',
... )
>>> # Create an overlapping series for weather forecasts
>>> series_id = client.create_series(
...     name='wind_speed',
...     unit='m/s',
...     labels={'site': 'offshore_1'},
...     overlapping=True,
...     retention='medium'
... )

delete() None[source]

Delete database schema.

series(name: str | None = None, unit: str | None = None, series_id: int | None = None) SeriesCollection[source]

Start building a series collection by name, unit, and/or series_id.

Creates a lazy SeriesCollection that can be further filtered using .where() to add label-based filters. The collection resolves to the actual series only when an operation like .read() or .insert() is called.

Parameters:
  • name – Optional series name to filter by (e.g., ‘wind_power’)

  • unit – Optional unit to filter by (e.g., ‘MW’)

  • series_id – Optional series_id for direct lookup (e.g., 123)

Returns:

A lazy collection that can be further filtered

with .where() and then used for read/insert/update operations

Return type:

SeriesCollection

Example

>>> client = TimeDataClient()
>>> # Get a specific series by name and labels
>>> client.series('wind_power').where(site='offshore_1').read()
>>> # Get all series with unit 'MW'
>>> client.series(unit='MW').read()
>>> # Get series by ID (if you know it)
>>> client.series(series_id=123).read()

Series Operations

class timedb.SeriesCollection(conninfo: str, name: str | None = None, unit: str | None = None, label_filters: Dict[str, str] | None = None, series_id: int | None = None, _registry: SeriesRegistry | None = None, _pool: ConnectionPool | None = None)[source]

Bases: object

A lazy collection of time series that matches a set of filters.

SeriesCollection provides a fluent, chainable API for filtering and operating on one or more time series without manually managing series IDs.

The collection resolves which series match the filters only when an operation like .read(), .insert(), or .update_records() is called. This allows building complex queries progressively.

Filtering:

Series are filtered by name, unit, series_id, and labels. You can chain multiple .where() calls to add additional label filters.

Operations:

Once filtered, the collection supports:

  • read(): Retrieve time series data

  • insert(): Add new data points

  • update_records(): Update existing records

  • count(): Count matching series

  • list_labels(): List unique label values

Examples

>>> from timedb import TimeDataClient
>>> client = TimeDataClient()
>>> # Single series with label filter
>>> client.series('wind_power').where(site='offshore_1').read()
>>> # Multiple filters (chained)
>>> client.series(unit='MW').where(site='offshore_1', turbine='T01').read()
>>> # Direct lookup by series_id
>>> client.series(series_id=123).read()
>>> # Count matching series
>>> count = client.series('wind_power').count()

count() int[source]

Count how many series match the current filters.

insert(df: DataFrame, workflow_id: str | None = None, batch_start_time: datetime | None = None, batch_finish_time: datetime | None = None, known_time: datetime | None = None, batch_params: dict | None = None) InsertResult[source]

Insert time series data for this collection.

Single-series inserts only. The DataFrame must have fixed columns:

  • Point-in-time: [valid_time, value]

  • Intervals: [valid_time, valid_time_end, value]

Automatically routes data to the correct table based on the series' overlapping flag:

  • flat (overlapping=False): inserts into the 'flat' table (immutable facts, upsert on conflict)

  • overlapping (overlapping=True): inserts into the 'overlapping_{tier}' table with batch and known_time

Parameters:
  • df – DataFrame with columns [valid_time, value] or [valid_time, valid_time_end, value]

  • workflow_id – Workflow identifier (optional)

  • batch_start_time – Start time (optional)

  • batch_finish_time – Finish time (optional)

  • known_time – Time of knowledge (optional, used for overlapping)

  • batch_params – Batch parameters (optional)

Returns:

InsertResult with batch_id, workflow_id, series_id

Raises:
  • ValueError – If collection matches multiple series (use more specific filters)

  • ValueError – If DataFrame doesn’t have the required columns
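
Example

A minimal sketch (assumes the client td and the 'wind_power' series from the TimeDataClient example exist; columns follow the fixed-column requirement above):

>>> import pandas as pd
>>> from datetime import datetime, timezone
>>> df = pd.DataFrame({
...     'valid_time': [datetime.now(timezone.utc)],
...     'value': [100.0],
... })
>>> result = td.series('wind_power').where(site='offshore_1').insert(df)
>>> result.series_id, result.batch_id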

list_labels(label_key: str) List[str][source]

List all unique values for a specific label key in this collection.
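
Example

Illustrative; the returned values depend on the series you have created:

>>> client.series('wind_power').list_labels('site')  # e.g. ['Gotland', 'offshore_1']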

list_series() List[Dict[str, Any]][source]

List all series matching the current filters with their metadata.

Returns:

List of dicts, each containing:

  • series_id: int

  • name: str

  • unit: str

  • labels: dict

  • description: str (optional)

  • overlapping: bool

  • retention: str

Return type:

List[Dict[str, Any]]

Example

>>> client.series('wind_power').where(site='Gotland').list_series()
[
    {'series_id': 1, 'name': 'wind_power', 'unit': 'MW',
     'labels': {'turbine': 'T01', 'site': 'Gotland', 'type': 'onshore'},
     'description': 'Onshore wind turbine power output',
     'overlapping': False, 'retention': 'medium'},
    ...
]

read(start_valid: datetime | None = None, end_valid: datetime | None = None, start_known: datetime | None = None, end_known: datetime | None = None, versions: bool = False, as_pint: bool = False) DataFrame[source]

Read time series data for this collection.

Single-series reads only. Collection must resolve to exactly one series.

Automatically reads from the correct table based on the series' overlapping flag:

  • flat (overlapping=False): reads from the 'flat' table (no versioning)

  • overlapping (overlapping=True): reads the latest values, or all versions if versions=True

Parameters:
  • start_valid – Start of valid time range (optional)

  • end_valid – End of valid time range (optional)

  • start_known – Start of known_time range (optional, overlapping only)

  • end_known – End of known_time range (optional, overlapping only)

  • versions – If True, return all overlapping revisions (default: False)

  • as_pint – If True, return value column as pint dtype with series unit (default: False). Requires pint and pint-pandas: pip install pint pint-pandas

Returns:

DataFrame with index (valid_time,) or (known_time, valid_time) for versions, and column (value). If as_pint=True, value column has pint dtype.

Raises:
  • ValueError – If collection matches multiple series or no series

  • ImportError – If as_pint=True but pint/pint-pandas not installed

Example

>>> # Single series read
>>> df = td.series("wind_power").where(site="Gotland", turbine="T01").read()
>>>
>>> # Read with pint units
>>> df = td.series("wind_power").where(turbine="T01").read(as_pint=True)
>>> print(df["value"].dtype)  # pint[MW]

update_records(updates: List[Dict[str, Any]]) List[Dict[str, Any]][source]

Update records for this collection.

Single-series only. Collection must resolve to exactly one series.

Supports both flat and overlapping series:

  • Flat: In-place update (no versioning) by (series_id, valid_time)

  • Overlapping: Creates a new version with known_time=now(). Three lookup methods:

  1. known_time + valid_time: Exact version lookup

  2. batch_id + valid_time: Latest version in that batch

  3. Just valid_time: Latest version overall

Parameters:

updates – List of update dicts. Each must include:

  • valid_time (datetime): Required

  • value (float, optional): New value (omit to keep current)

  • annotation (str, optional): Text annotation (None to clear)

  • tags (list[str], optional): Tags ([] to clear)

  • changed_by (str, optional): User identifier

  For overlapping only:

  • batch_id (int, optional): Target specific batch

  • known_time (datetime, optional): Target specific version

Returns:

List of dicts with update info for each updated record

Raises:

ValueError – If collection matches multiple series or no series

Example

>>> td.series("temperature").where(site="A").update_records([
...     {"valid_time": dt, "value": 25.0, "annotation": "Corrected"}
... ])

where(**labels) SeriesCollection[source]

Add additional label filters to narrow down the collection.

Creates a new SeriesCollection with combined filters. This method is chainable and does not modify the original collection (immutable).

Parameters:

**labels – Key-value pairs for label filtering. Example: where(site='offshore_1', turbine='T01')

Returns:

New collection with combined filters applied

Return type:

SeriesCollection

Example

>>> coll = client.series('wind_power')
>>> # Add filters progressively
>>> coll = coll.where(site='offshore_1')
>>> coll = coll.where(turbine='T01')
>>> df = coll.read()  # Only applies both filters at read time

Data Classes

class timedb.InsertResult(batch_id: int | None, workflow_id: str | None, series_id: int)[source]

Result from insert containing the IDs that were used.

batch_id: int | None

Alias for field number 0

series_id: int

Alias for field number 2

workflow_id: str | None

Alias for field number 1

Exceptions

exception timedb.IncompatibleUnitError[source]

Raised when units cannot be converted to each other.

REST API Reference

The REST API provides HTTP endpoints for reading and writing time series data. It mirrors the SDK’s workflow: create series first, then insert/read/update data using name+labels or series_id to identify series.

Interactive Documentation

When the API server is running, comprehensive interactive documentation is available:

  • Swagger UI: http://<host>:<port>/docs - Full interactive API explorer

  • ReDoc: http://<host>:<port>/redoc - Alternative documentation format

These provide live request/response examples, parameter descriptions, and test capabilities.

Key Endpoints

POST /series

Create a new time series.

Request Body:

  • name (str): Human-readable identifier

  • description (str, optional): Series description

  • unit (str, default=”dimensionless”): Canonical unit

  • labels (dict, default={}): Key-value labels for differentiation

  • overlapping (bool, default=false): true for versioned forecasts

  • retention (str, default=”medium”): “short”, “medium”, or “long”

Returns: CreateSeriesResponse with series_id
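
Example (a sketch using Python's requests library; host, port, and field values are placeholders):

>>> import requests
>>> resp = requests.post('http://localhost:8000/series', json={
...     'name': 'wind_power',
...     'unit': 'MW',
...     'labels': {'site': 'offshore_1'},
...     'overlapping': True,
...     'retention': 'medium',
... })
>>> resp.json()['series_id']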

POST /values

Insert time series data. Specify the target series by name+labels or series_id. The series must already exist (use POST /series first).

Routing is automatic: flat series are inserted directly (no batch), overlapping series create a batch with known_time tracking.

Request Body:

  • name (str, optional): Series name (used with labels to resolve series)

  • labels (dict, optional): Labels for resolution

  • series_id (int, optional): Direct series_id (alternative to name+labels)

  • workflow_id (str, default=”api-workflow”): Workflow identifier

  • known_time (datetime, optional): Time of knowledge (defaults to now)

  • batch_params (dict, optional): Custom batch parameters

  • data (list): Array of data points [{valid_time, value, valid_time_end?}]

Returns: InsertResponse with batch_id (null for flat), series_id, rows_inserted
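
Example (a sketch with requests; the timestamp and value are placeholders):

>>> import requests
>>> resp = requests.post('http://localhost:8000/values', json={
...     'name': 'wind_power',
...     'labels': {'site': 'offshore_1'},
...     'data': [{'valid_time': '2024-01-01T00:00:00Z', 'value': 100.0}],
... })
>>> resp.json()  # batch_id (null for flat series), series_id, rows_inserted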

GET /values

Read time series values with filtering.

Query Parameters:

  • name (str, optional): Filter by series name

  • labels (str, optional): Filter by labels (JSON string, e.g. {"site":"Gotland"})

  • series_id (int, optional): Filter by series_id

  • start_valid (datetime, optional): Start of valid time range (ISO format)

  • end_valid (datetime, optional): End of valid time range (ISO format)

  • start_known (datetime, optional): Start of known_time range (ISO format)

  • end_known (datetime, optional): End of known_time range (ISO format)

  • versions (bool, default=false): Return all forecast revisions

Returns: JSON object with count and data array
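
Example (a sketch with requests; note that labels is passed as a JSON string):

>>> import json
>>> import requests
>>> resp = requests.get('http://localhost:8000/values', params={
...     'name': 'wind_power',
...     'labels': json.dumps({'site': 'offshore_1'}),
...     'start_valid': '2024-01-01T00:00:00Z',
... })
>>> payload = resp.json()
>>> payload['count'], payload['data'][:1]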

PUT /values

Update existing time series records. Creates new versions for overlapping series.

Identify the series by series_id OR by name + labels.

For overlapping series, three lookup methods (all optional):

  • batch_id + valid_time: target specific batch

  • known_time + valid_time: target specific version

  • Just valid_time: target latest version overall

Request Body (updates is a list of objects with these fields):

  • valid_time (datetime, required): The timestamp to update

  • series_id (int, optional): Series ID (alternative to name+labels)

  • name (str, optional): Series name (alternative to series_id)

  • labels (dict, default={}): Labels for series resolution

  • batch_id (int, optional): Target specific batch (overlapping only)

  • known_time (datetime, optional): Target specific version (overlapping only)

  • value (float, optional): New value (omit to leave unchanged, null to clear)

  • annotation (str, optional): Annotation text (omit/null/value tri-state)

  • tags (list[str], optional): Tags (omit/null/value tri-state)

  • changed_by (str, optional): Who made the change (audit trail)

Returns: UpdateRecordsResponse with updated and skipped records
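
Example (a sketch with requests; the update objects are sent in an updates array as described above, and the values are placeholders):

>>> import requests
>>> resp = requests.put('http://localhost:8000/values', json={
...     'updates': [{
...         'name': 'temperature',
...         'labels': {'site': 'A'},
...         'valid_time': '2024-01-01T00:00:00Z',
...         'value': 25.0,
...         'annotation': 'Corrected',
...         'changed_by': 'analyst',
...     }],
... })
>>> resp.json()['updated']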

GET /series

List time series, optionally filtered by name, labels, unit, or series_id.

Query Parameters:

  • name (str, optional): Filter by series name

  • labels (str, optional): Filter by labels (JSON string)

  • unit (str, optional): Filter by unit

  • series_id (int, optional): Filter by series_id

Returns: List of SeriesInfo objects
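
Example (a sketch with requests):

>>> import json
>>> import requests
>>> resp = requests.get('http://localhost:8000/series', params={
...     'name': 'wind_power',
...     'labels': json.dumps({'site': 'Gotland'}),
... })
>>> [s['series_id'] for s in resp.json()]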

GET /series/labels

List unique values for a specific label key.

Query Parameters:

  • label_key (str, required): The label key to get unique values for

  • name (str, optional): Filter by series name

  • labels (str, optional): Filter by labels (JSON string)

Returns: {"label_key": "...", "values": [...]}

GET /series/count

Count time series matching filters.

Query Parameters:

  • name (str, optional): Filter by series name

  • labels (str, optional): Filter by labels (JSON string)

  • unit (str, optional): Filter by unit

Returns: {"count": N}
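
Example (a sketch with requests covering both endpoints above):

>>> import requests
>>> requests.get('http://localhost:8000/series/labels',
...              params={'label_key': 'site', 'name': 'wind_power'}).json()
>>> requests.get('http://localhost:8000/series/count',
...              params={'name': 'wind_power'}).json()  # {'count': ...}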

Pydantic Models

Request/Response models used by the REST API:

class timedb.api.DataPoint(*, valid_time: datetime, value: float | None = None, valid_time_end: datetime | None = None)[source]

A single data point for insertion.

class timedb.api.InsertRequest(*, name: str | None = None, labels: Dict[str, str] = <factory>, series_id: int | None = None, workflow_id: str = 'api-workflow', known_time: datetime | None = None, batch_params: Dict[str, Any] | None = None, data: List[DataPoint] = <factory>)[source]

Request to insert time series data.

Specify the target series by name+labels OR by series_id. The series must already exist (use POST /series to create it first).

name

Series name (used with labels to resolve series_id)

Type:

str | None

labels

Labels for series resolution (e.g., {“site”: “Gotland”})

Type:

Dict[str, str]

series_id

Direct series_id (alternative to name+labels)

Type:

int | None

workflow_id

Workflow identifier (defaults to ‘api-workflow’)

Type:

str

known_time

Time of knowledge (defaults to now(), important for overlapping series)

Type:

datetime.datetime | None

batch_params

Custom parameters to store with the batch

Type:

Dict[str, Any] | None

data

Array of data points to insert

Type:

List[timedb.api.DataPoint]
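
Example

An illustrative sketch of building a request payload (assumes pydantic v2 model methods such as model_dump):

>>> from datetime import datetime, timezone
>>> from timedb.api import DataPoint, InsertRequest
>>> req = InsertRequest(
...     name='wind_power',
...     labels={'site': 'offshore_1'},
...     data=[DataPoint(valid_time=datetime.now(timezone.utc), value=100.0)],
... )
>>> payload = req.model_dump(mode='json')  # JSON-serializable dict for POST /values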

class timedb.api.InsertResponse(*, batch_id: int | None = None, series_id: int, rows_inserted: int)[source]

Response after inserting data.

class timedb.api.RecordUpdateRequest(*, valid_time: datetime, series_id: int | None = None, name: str | None = None, labels: Dict[str, str] = <factory>, batch_id: int | None = None, known_time: datetime | None = None, value: float | None = None, annotation: str | None = None, tags: List[str] | None = None, changed_by: str | None = None)[source]

Request to update a record.

Supports both flat and overlapping series with different update semantics:

Flat series: In-place update by (series_id, valid_time). Overlapping series: Creates new version with known_time=now().

Identify the series by series_id OR by name + labels.

For overlapping series, three lookup methods (all optional):

  • batch_id + valid_time: latest version in that batch

  • known_time + valid_time: exact version lookup

  • just valid_time: latest version overall

For value, annotation, tags, and changed_by:

  • Omit the field to leave it unchanged

  • Set it to null to explicitly clear it

  • Set it to a value to update it

class timedb.api.UpdateRecordsResponse(*, updated: List[Dict[str, Any]])[source]

Response after updating records.

class timedb.api.CreateSeriesRequest(*, name: str, description: str | None = None, unit: str = 'dimensionless', labels: Dict[str, str] = <factory>, overlapping: bool = False, retention: str = 'medium')[source]

Request to create a new time series.

Series identity is determined by (name, labels). Two series with the same name but different labels are different series.

class timedb.api.CreateSeriesResponse(*, series_id: int, message: str)[source]

Response after creating a series.

class timedb.api.SeriesInfo(*, series_id: int, name: str, description: str | None = None, unit: str, labels: Dict[str, str] = <factory>, overlapping: bool = False, retention: str = 'medium')[source]

Information about a time series.

CLI Reference

The CLI provides command-line tools for administrative operations and server management.

API Server

The timedb api command starts the REST API server:

timedb api [OPTIONS]

Options:

  • --host TEXT (default: 127.0.0.1): Host to bind API server to

  • --port INTEGER (default: 8000): Port to bind API server to

  • --reload: Enable auto-reload for development (watch file changes)

Examples:

# Start server on localhost
timedb api

# Start on all interfaces on port 9000
timedb api --host 0.0.0.0 --port 9000

# Start with auto-reload for development
timedb api --reload

For additional CLI commands, see Command Line Interface (CLI).

Units and Validation

Unit handling is integrated into the SDK. The IncompatibleUnitError exception is raised when pint unit conversion fails (e.g., inserting kg values into an MW series).
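
For illustration, a sketch of handling the error (df_kg here is a hypothetical DataFrame whose value column carries an incompatible pint unit, e.g. kilograms):

>>> from timedb import TimeDataClient, IncompatibleUnitError
>>> td = TimeDataClient()
>>> try:
...     td.series('wind_power').where(site='offshore_1').insert(df_kg)  # df_kg: hypothetical kg-valued data
... except IncompatibleUnitError:
...     pass  # convert values to the series unit (MW) before retrying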

See the SDK documentation for details on pint unit support.