SDK Usage
The timedb SDK provides a high-level Python interface for working with time series data. It handles unit conversion, series management, and DataFrame operations automatically.
Getting Started
Import the SDK:
import timedb as td
import pandas as pd
import pint
Database Connection
The SDK reads its database connection from environment variables:
TIMEDB_DSN (preferred)
DATABASE_URL (alternative)
You can also use a .env file in your project root.
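For example, a minimal sketch of pointing the SDK at a local database (the DSN value is a hypothetical PostgreSQL connection string; adjust it for your setup):
import os
# set before using timedb; the same line in a .env file works too
os.environ["TIMEDB_DSN"] = "postgresql://user:password@localhost:5432/timedb"  # hypothetical DSN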
Schema Management
Creating the Schema
Before using the SDK, create the database schema:
td.create()
This creates all necessary tables. It’s safe to run multiple times.
Deleting the Schema
To delete all tables and data (use with caution):
td.delete()
WARNING: This will delete all data!
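Because this is destructive, a simple confirmation guard is a reasonable pattern (a sketch, not part of the SDK):
# require explicit confirmation before dropping all tables and data
if input("Type 'DELETE' to drop all TimeDB tables: ") == "DELETE":
    td.delete()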
Inserting Data
The main function for inserting data is insert_batch(). It automatically:
Detects series from DataFrame columns
Extracts units from Pint Quantity objects or pint-pandas Series
Creates/gets series with appropriate units
Converts all values to canonical units before storage
Basic Example
import timedb as td
import pandas as pd
import pint
from datetime import datetime, timezone, timedelta
# Create unit registry
ureg = pint.UnitRegistry()
# Create sample data with Pint Quantity objects
times = pd.date_range(
    start=datetime.now(timezone.utc),
    periods=24,
    freq='H',
    tz='UTC'
)
# Create DataFrame with Pint Quantity columns
df = pd.DataFrame({
    "valid_time": times,
    "power": [100.0, 105.0, 110.0] * 8 * ureg.kW,
    # degC is an offset unit in Pint, so build the Quantity explicitly
    # (multiplying a list by ureg.degC raises OffsetUnitCalculusError)
    "temperature": ureg.Quantity([20.0, 21.0, 22.0] * 8, "degC"),
})
# Insert the data
result = td.insert_batch(df=df)
# result.series_ids contains the mapping of series_key to series_id
print(result.series_ids)
# {'power': UUID('...'), 'temperature': UUID('...')}
Using pint-pandas Series
You can also use pint-pandas Series with dtype annotations:
df = pd.DataFrame({
    "valid_time": times,
    "power": pd.Series([100.0, 105.0, 110.0] * 8, dtype="pint[MW]"),
    "wind_speed": pd.Series([5.0, 6.0, 7.0] * 8, dtype="pint[m/s]"),
})
result = td.insert_batch(df=df)
Custom Series Keys
Override default series keys (column names):
result = td.insert_batch(
    df=df,
    series_key_overrides={
        'power': 'wind_power_forecast',
        'temperature': 'ambient_temperature'
    }
)
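Assuming result.series_ids is keyed by the final series_key (as the basic example above suggests), the overridden keys appear in the mapping:
print(result.series_ids)
# {'wind_power_forecast': UUID('...'), 'ambient_temperature': UUID('...')}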
Interval Values
For interval-based time series (e.g., energy over a time period):
start_times = pd.date_range(
    start=datetime.now(timezone.utc),
    periods=24,
    freq='H',
    tz='UTC'
)
end_times = start_times + timedelta(hours=1)
df_intervals = pd.DataFrame({
    "valid_time": start_times,
    "valid_time_end": end_times,
    "energy": [100.0, 105.0, 110.0] * 8 * ureg.MWh
})
result = td.insert_batch(
    df=df_intervals,
    valid_time_end_col='valid_time_end'
)
Advanced Options
Full function signature:
result = td.insert_batch(
    df=pd.DataFrame(...),
    tenant_id=None,               # Optional, defaults to zeros UUID
    run_id=None,                  # Optional, auto-generated if not provided
    workflow_id=None,             # Optional, defaults to "sdk-workflow"
    run_start_time=None,          # Optional, defaults to now()
    run_finish_time=None,         # Optional
    valid_time_col='valid_time',  # Column name for valid_time
    valid_time_end_col=None,      # Column name for valid_time_end
    known_time=None,              # Time of knowledge (for backfills)
    run_params=None,              # Optional dict of run parameters
    series_key_overrides=None,    # Optional dict mapping column names to series_key
)
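For example, a hedged sketch of a backfill that records when the data was actually known (the timestamp and identifiers are illustrative):
# backfill archived data, preserving its original time of knowledge
result = td.insert_batch(
    df=df,
    known_time=datetime(2024, 1, 1, tzinfo=timezone.utc),
    workflow_id="historical-backfill",
    run_params={"source": "archive-import"},
)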
Reading Data
The SDK provides several functions for reading data.
Basic Read
Read time series values:
df = td.read(
    series_id=None,               # Optional, filter by series ID
    tenant_id=None,               # Optional, defaults to zeros UUID
    start_valid=None,             # Optional, start of valid time range
    end_valid=None,               # Optional, end of valid time range
    return_mapping=False,         # If True, return (DataFrame, mapping_dict)
    all_versions=False,           # If True, include all versions
    return_value_id=False,        # If True, include value_id column
    tags_and_annotations=False,   # If True, include tags and annotation columns
)
Returns a DataFrame with:
Index: valid_time
Columns: series_key (one column per series)
Each column has a pint-pandas dtype based on series_unit.
Example:
# Read all series
df = td.read()
# Read specific series
series_id = result.series_ids['power']
df = td.read(series_id=series_id)
# Read with time range
df = td.read(
    start_valid=datetime(2024, 1, 1, tzinfo=timezone.utc),
    end_valid=datetime(2024, 1, 2, tzinfo=timezone.utc),
)
# Get mapping of series_id to series_key
df, mapping = td.read(return_mapping=True)
Reading All Versions
By default, read() returns only the latest version of each value. To see all historical versions:
df = td.read(
    series_id=series_id,
    all_versions=True,
    return_value_id=True  # Include value_id to distinguish versions
)
This is useful for:
Auditing changes to time series values
Viewing complete version history
Analyzing how values evolved over time
Flat Mode Read
Read values in flat mode (latest known_time per valid_time):
df = td.read_values_flat(
    series_id=None,
    tenant_id=None,
    start_valid=None,
    end_valid=None,
    start_known=None,      # Start of known_time range
    end_known=None,        # End of known_time range
    all_versions=False,
    return_mapping=False,
    units=True,            # If True, return pint-pandas DataFrame
    return_value_id=False,
)
Returns the latest version of each (valid_time, series_id) combination based on known_time.
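This makes flat mode a natural fit for "as of" queries; a sketch using only the parameters documented above:
# reconstruct the view of the series as it was known before a cutoff
df_asof = td.read_values_flat(
    series_id=series_id,
    end_known=datetime(2025, 1, 2, tzinfo=timezone.utc),
)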
Overlapping Mode Read
Read values in overlapping mode (all forecast revisions):
df = td.read_values_overlapping(
    series_id=None,
    tenant_id=None,
    start_valid=None,
    end_valid=None,
    start_known=None,
    end_known=None,
    all_versions=False,
    return_mapping=False,
    units=True,
)
Returns all versions of forecasts, showing how predictions evolve over time. Useful for analyzing forecast revisions and backtesting.
Example: Analyzing forecast revisions:
# Create multiple forecasts at different known_times
base_time = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc)
for i in range(4):
    known_time = base_time + timedelta(days=i)
    valid_times = [known_time + timedelta(hours=j) for j in range(72)]
    df = pd.DataFrame({
        "valid_time": valid_times,
        # generate_forecast is a placeholder for your own forecasting logic
        "power": generate_forecast(valid_times) * ureg.MW
    })
    td.insert_batch(
        df=df,
        known_time=known_time,  # When forecast was made
        workflow_id=f"forecast-run-{i}"
    )
# Read all forecast revisions (overlapping mode)
df_overlapping = td.read_values_overlapping(
    series_id=series_id,
    start_valid=base_time,
    end_valid=base_time + timedelta(days=5)
)
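If the overlapping result exposes each revision's known_time as a column (an assumption about the returned layout), the revisions can then be compared directly:
# hypothetical post-processing: summarize each forecast revision
for known_time, revision in df_overlapping.groupby("known_time"):
    print(known_time, "->", len(revision), "points")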
Updating Records
Update existing records with new values, annotations, or tags:
updates = [
    {
        'value_id': 123,          # Simplest: update by value_id
        'value': 150.0,
        'annotation': 'Manually corrected',
        'tags': ['reviewed', 'corrected'],
        'changed_by': 'user@example.com',
    }
]
result = td.update_records(updates)
For each update:
Omit a field to leave it unchanged
Set it to None (or [] for tags) to explicitly clear it
Set it to a value to update it
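For example, a sketch that clears the annotation and tags of one record while leaving its value untouched (the value_id is illustrative):
td.update_records([
    {
        'value_id': 123,
        'annotation': None,  # explicitly clear the annotation
        'tags': [],          # explicitly clear all tags
        # 'value' omitted, so it is left unchanged
        'changed_by': 'user@example.com',
    }
])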
The function returns:
{
    'updated': [...],         # List of updated records
    'skipped_no_ops': [...]   # List of skipped records (no changes)
}
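A quick way to report the outcome:
result = td.update_records(updates)
print(f"updated: {len(result['updated'])}, skipped: {len(result['skipped_no_ops'])}")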
API Server
The SDK provides functions to start and manage the TimeDB REST API server.
Starting the API Server (Blocking)
Start the API server in the current process:
td.start_api()
# With custom host/port
td.start_api(host="0.0.0.0", port=8080)
# With auto-reload (development)
td.start_api(reload=True)
This blocks until the server is stopped (Ctrl+C).
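A typical entry point for a small service script:
if __name__ == "__main__":
    td.start_api(host="0.0.0.0", port=8080)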
Starting the API Server (Non-blocking)
For use in notebooks or when you need the API to run in the background:
# Start in background thread
started = td.start_api_background()
if started:
    print("API server started")
else:
    print("API server was already running")
Checking API Status
Check if the API server is running:
if td.check_api():
    # API is running
    pass
else:
    # API is not running
    td.start_api_background()
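Since the background server may need a moment to start accepting connections, a short polling loop is a safe pattern (a sketch combining the two calls above):
import time

td.start_api_background()
# poll briefly until the server answers, then give up
for _ in range(20):
    if td.check_api():
        break
    time.sleep(0.25)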
Error Handling
The SDK raises specific exceptions:
ValueError: if tables don’t exist (run td.create() first)
IncompatibleUnitError: if unit conversion fails due to a dimensionality mismatch
ValueError: if the DataFrame structure is invalid or units are inconsistent
Example error handling:
from timedb import IncompatibleUnitError
try:
    result = td.insert_batch(df=df)
except ValueError as e:
    if "TimeDB tables do not exist" in str(e):
        print("Please create the schema first: td.create()")
    else:
        raise
except IncompatibleUnitError as e:
    print(f"Unit conversion error: {e}")
Best Practices
Always use timezone-aware datetimes: All time columns must be timezone-aware (UTC recommended)
Use Pint for units: Leverage Pint Quantity objects or pint-pandas Series for automatic unit handling
Create schema first: Run
td.create()before inserting dataUse known_time for backfills: When inserting historical data, set
known_timeto when the data was actually knownUse workflow_id: Set meaningful workflow IDs to track data sources
Complete Example
import timedb as td
import pandas as pd
import pint
from datetime import datetime, timezone, timedelta
# 1. Create schema
td.create()
# 2. Prepare data
ureg = pint.UnitRegistry()
times = pd.date_range(
    start=datetime.now(timezone.utc),
    periods=24,
    freq='H',
    tz='UTC'
)
df = pd.DataFrame({
    "valid_time": times,
    "power": [100.0 + i for i in range(24)] * ureg.kW,
    # degC is an offset unit, so construct the Quantity explicitly
    "temperature": ureg.Quantity([20.0 + i * 0.5 for i in range(24)], "degC"),
})
# 3. Insert data
result = td.insert_batch(
    df=df,
    workflow_id="forecast-v1",
    run_params={"model": "wind-forecast-v2", "version": "1.0"}
)
# 4. Read data
df_read = td.read(
    series_id=result.series_ids['power'],
    start_valid=times[0],
    end_valid=times[-1]
)
print(df_read.head())