Using the TimeDB REST API
This notebook demonstrates the REST API for reading and writing time series data:
- Setting up the database and starting the API server
- Creating series via POST /series
- Inserting data via POST /values (using name+labels or series_id)
- Reading data via GET /values (with series filtering and versions mode)
- Listing and filtering series via GET /series
- Updating records via PUT /values
[1]:
from timedb import TimeDataClient
import pandas as pd
import requests
import json
from datetime import datetime, timezone, timedelta
td = TimeDataClient()
API_BASE_URL = "http://127.0.0.1:8000"
headers = {"Content-Type": "application/json"}
print("Ready")
Ready
Part 1: Setup
Create the database schema via SDK (admin task — the API cannot create/delete schemas).
[2]:
# Delete any existing schema so we start from a clean database
# (comment out the line below if you want to keep existing data)
td.delete()
# Create database schema
td.create()
Creating database schema...
✓ Schema created successfully
Part 2: Start the API Server
Start the server before making API calls. In a notebook we run it as a background process.
[3]:
# Start the API server in a separate terminal:
# timedb api --host 127.0.0.1 --port 8000
# Or using subprocess (for notebook use):
import subprocess
import time
# Kill any existing API server
subprocess.run(["pkill", "-f", "uvicorn.*timedb"], capture_output=True)
time.sleep(1)
# Start API server in background
process = subprocess.Popen(
["timedb", "api", "--host", "127.0.0.1", "--port", "8000"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
time.sleep(3) # Wait for server to start
# Check if API is running
try:
response = requests.get(f"{API_BASE_URL}/")
print("✓ API is running")
print(f" Name: {response.json()['name']}")
print(f" Version: {response.json().get('version', 'unknown')}")
except Exception as e:
print(f"❌ API not running: {e}")
✓ API is running
Name: TimeDB API
Version: 0.2.0
Part 3: Insert Data Using the API
Now let’s create some sample time series data and insert it using the REST API.
[4]:
# First, create the time series using the /series endpoint
# power_forecast uses overlapping=true so we can demonstrate updates later
# (updates only work on overlapping series, not flat ones)
series_configs = [
{
"name": "wind_speed",
"description": "Wind speed measurements",
"unit": "m/s",
"labels": {"site": "Gotland", "type": "measurement"},
"overlapping": False
},
{
"name": "power_forecast",
"description": "Forecasted power values with overlapping revisions",
"unit": "MW",
"labels": {"model": "linear", "site": "Gotland", "type": "forecast"},
"overlapping": True
}
]
created_series = {}
for series_info in series_configs:
response = requests.post(
f"{API_BASE_URL}/series",
json=series_info,
headers=headers
)
response.raise_for_status()
result = response.json()
series_name = series_info["name"]
created_series[series_name] = result["series_id"]
print(f"✓ Created series '{series_name}': {result['series_id']}")
print(f" Message: {result['message']}")
print(f"\n✓ Created {len(created_series)} time series")
✓ Created series 'wind_speed': 1
Message: Series created successfully
✓ Created series 'power_forecast': 2
Message: Series created successfully
✓ Created 2 time series
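When a request fails, `raise_for_status` raises without showing the server's error detail. A small helper can surface the response body; this is a sketch, not part of the TimeDB client, and the exact error format depends on the server:

```python
import requests

API_BASE_URL = "http://127.0.0.1:8000"

def api_post(path: str, payload: dict) -> dict:
    """POST a JSON payload; on failure, raise with the response body attached."""
    resp = requests.post(f"{API_BASE_URL}{path}", json=payload, timeout=10)
    if not resp.ok:
        # Include the body so validation errors are visible, not just the status code
        raise RuntimeError(f"POST {path} failed ({resp.status_code}): {resp.text}")
    return resp.json()
```

For example, `api_post("/series", series_configs[0])` could replace the explicit `requests.post` calls above.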
[5]:
# Create sample time series data
base_time = datetime(2025, 1, 1, 0, 0, tzinfo=timezone.utc)
dates = [base_time + timedelta(hours=i) for i in range(24)]
# Prepare data for wind_speed (24 data points)
wind_speed_data = [
{"valid_time": date.isoformat(), "value": 20.0 + i * 0.3}
for i, date in enumerate(dates)
]
# Prepare data for power_forecast (24 data points)
power_forecast_data = [
{"valid_time": date.isoformat(), "value": 60.0 - i * 0.5}
for i, date in enumerate(dates)
]
print(f"Prepared {len(wind_speed_data)} wind_speed data points")
print(f"Prepared {len(power_forecast_data)} power_forecast data points")
print(f"Time range: {dates[0]} to {dates[-1]}")
Prepared 24 wind_speed data points
Prepared 24 power_forecast data points
Time range: 2025-01-01 00:00:00+00:00 to 2025-01-01 23:00:00+00:00
3.1: Insert Data via POST /values
The POST /values endpoint resolves the series by name+labels, just like the SDK. There is no batch_start_time or value_key to manage: provide name, labels, and data (or pass series_id directly). For overlapping series the response includes a batch_id; for non-overlapping series it is None, as the output below shows.
[6]:
# Insert wind_speed data via POST /values (using name + labels)
response = requests.post(
f"{API_BASE_URL}/values",
json={
"name": "wind_speed",
"labels": {"site": "Gotland", "type": "measurement"},
"data": wind_speed_data,
},
headers=headers,
)
response.raise_for_status()
result = response.json()
print(f"✓ Wind Speed: batch_id={result['batch_id']}, series_id={result['series_id']}, rows={result['rows_inserted']}")
# Insert power_forecast data (using series_id directly)
response = requests.post(
f"{API_BASE_URL}/values",
json={
"series_id": created_series["power_forecast"],
"data": power_forecast_data,
},
headers=headers,
)
response.raise_for_status()
result = response.json()
print(f"✓ Power Forecast: batch_id={result['batch_id']}, series_id={result['series_id']}, rows={result['rows_inserted']}")
# Store for later use
batch_id = result['batch_id']
✓ Wind Speed: batch_id=None, series_id=1, rows=24
✓ Power Forecast: batch_id=1, series_id=2, rows=24
3.2: List and Filter Series via GET /series
The GET /series endpoint supports filtering by name, labels, unit, and series_id.
[7]:
# List all series
response = requests.get(f"{API_BASE_URL}/series", headers=headers)
response.raise_for_status()
all_series = response.json()
print(f"✓ Found {len(all_series)} time series\n")
for s in all_series:
print(f" series_id={s['series_id']}: {s['name']} ({s['unit']}) labels={s['labels']} overlapping={s['overlapping']}")
# Filter by name
print("\nFilter by name='wind_speed':")
response = requests.get(f"{API_BASE_URL}/series", params={"name": "wind_speed"}, headers=headers)
filtered = response.json()
for s in filtered:
print(f" series_id={s['series_id']}: {s['name']} labels={s['labels']}")
# List unique label values
print("\nUnique 'site' values:")
response = requests.get(f"{API_BASE_URL}/series/labels", params={"label_key": "site"}, headers=headers)
print(f" {response.json()}")
# Count series
response = requests.get(f"{API_BASE_URL}/series/count", headers=headers)
print(f"\nTotal series count: {response.json()['count']}")
✓ Found 2 time series
series_id=2: power_forecast (MW) labels={'site': 'Gotland', 'type': 'forecast', 'model': 'linear'} overlapping=True
series_id=1: wind_speed (m/s) labels={'site': 'Gotland', 'type': 'measurement'} overlapping=False
Filter by name='wind_speed':
series_id=1: wind_speed labels={'site': 'Gotland', 'type': 'measurement'}
Unique 'site' values:
{'label_key': 'site', 'values': ['Gotland']}
Total series count: 2
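The filters can be combined; the cell above only filters by name. Below is a sketch of filtering by labels, under the assumption that GET /series accepts a JSON-encoded labels parameter the same way GET /values does (check the server's interactive docs to confirm):

```python
import json
import requests

API_BASE_URL = "http://127.0.0.1:8000"

def find_series(**filters):
    """GET /series with optional filters; dict-valued filters are JSON-encoded."""
    params = {
        k: json.dumps(v) if isinstance(v, dict) else v
        for k, v in filters.items()
    }
    resp = requests.get(f"{API_BASE_URL}/series", params=params, timeout=10)
    resp.raise_for_status()
    return resp.json()

# e.g. find_series(labels={"type": "forecast"}, unit="MW")
```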
Part 4: Read Data Using the API
Let’s read the time series data we just inserted using the API.
[8]:
# Read data via API — filter by series name
params = {
"name": "wind_speed",
"labels": json.dumps({"site": "Gotland", "type": "measurement"}),
"start_valid": base_time.isoformat(),
"end_valid": (base_time + timedelta(hours=24)).isoformat(),
}
response = requests.get(f"{API_BASE_URL}/values", params=params, headers=headers)
response.raise_for_status()
data = response.json()
print(f"✓ Retrieved {data['count']} wind_speed records via API")
if data['count'] > 0:
df_api = pd.DataFrame(data['data'])
df_api['valid_time'] = pd.to_datetime(df_api['valid_time'])
print("\nFirst few rows:")
print(df_api.head(6))
print(f"\nDataFrame shape: {df_api.shape}")
print(f"Columns: {list(df_api.columns)}")
✓ Retrieved 24 wind_speed records via API
First few rows:
valid_time value
0 2025-01-01 00:00:00+00:00 20.0
1 2025-01-01 01:00:00+00:00 20.3
2 2025-01-01 02:00:00+00:00 20.6
3 2025-01-01 03:00:00+00:00 20.9
4 2025-01-01 04:00:00+00:00 21.2
5 2025-01-01 05:00:00+00:00 21.5
DataFrame shape: (24, 2)
Columns: ['valid_time', 'value']
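Once the response is in a DataFrame, ordinary pandas tooling applies. For instance, downsampling the hourly series to 3-hour means; a sketch on synthetic rows shaped like the wind_speed response above:

```python
import pandas as pd
from datetime import datetime, timedelta, timezone

# Synthetic rows mirroring the wind_speed response payload
base = datetime(2025, 1, 1, tzinfo=timezone.utc)
rows = [
    {"valid_time": (base + timedelta(hours=i)).isoformat(), "value": 20.0 + i * 0.3}
    for i in range(24)
]

df = pd.DataFrame(rows)
df["valid_time"] = pd.to_datetime(df["valid_time"])

# Downsample: index on valid_time, then average each 3-hour bin
three_hourly = df.set_index("valid_time")["value"].resample("3h").mean()
print(three_hourly.head(3))  # first bin averages 20.0, 20.3, 20.6 -> 20.3
```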
4.1: Read with versions=true
Set versions=true to get all forecast revisions with their known_time (useful for backtesting):
[9]:
# Read with versions=true to see all forecast revisions
params_versions = {
"name": "power_forecast",
"start_valid": base_time.isoformat(),
"end_valid": (base_time + timedelta(hours=6)).isoformat(),
"versions": "true",
}
response = requests.get(f"{API_BASE_URL}/values", params=params_versions, headers=headers)
response.raise_for_status()
data_versions = response.json()
print(f"✓ Retrieved {data_versions['count']} records with versions=true")
if data_versions['count'] > 0:
df_versions = pd.DataFrame(data_versions['data'])
df_versions['valid_time'] = pd.to_datetime(df_versions['valid_time'])
if 'known_time' in df_versions.columns:
df_versions['known_time'] = pd.to_datetime(df_versions['known_time'])
print("\nFirst few rows (showing forecast revisions):")
print(df_versions.head(10))
✓ Retrieved 6 records with versions=true
First few rows (showing forecast revisions):
known_time valid_time value
0 2026-02-15 21:48:32.651934+00:00 2025-01-01 00:00:00+00:00 60.0
1 2026-02-15 21:48:32.651934+00:00 2025-01-01 01:00:00+00:00 59.5
2 2026-02-15 21:48:32.651934+00:00 2025-01-01 02:00:00+00:00 59.0
3 2026-02-15 21:48:32.651934+00:00 2025-01-01 03:00:00+00:00 58.5
4 2026-02-15 21:48:32.651934+00:00 2025-01-01 04:00:00+00:00 58.0
5 2026-02-15 21:48:32.651934+00:00 2025-01-01 05:00:00+00:00 57.5
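With versions=true, a valid_time can appear once per revision. Pivoting on known_time lines the revisions up side by side for comparison. This sketch uses hypothetical data with two revisions (the single insert above produces only one known_time column):

```python
import pandas as pd

# Hypothetical revisions: two known_times covering the same valid_times
rows = [
    {"known_time": "2025-01-01T06:00:00+00:00", "valid_time": "2025-01-01T00:00:00+00:00", "value": 60.0},
    {"known_time": "2025-01-01T06:00:00+00:00", "valid_time": "2025-01-01T01:00:00+00:00", "value": 59.5},
    {"known_time": "2025-01-01T12:00:00+00:00", "valid_time": "2025-01-01T00:00:00+00:00", "value": 61.2},
    {"known_time": "2025-01-01T12:00:00+00:00", "valid_time": "2025-01-01T01:00:00+00:00", "value": 58.9},
]
df = pd.DataFrame(rows)
df["valid_time"] = pd.to_datetime(df["valid_time"])
df["known_time"] = pd.to_datetime(df["known_time"])

# One row per valid_time, one column per revision (columns sort by known_time)
wide = df.pivot(index="valid_time", columns="known_time", values="value")
print(wide)
```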
Part 5: Update Records Using the API
Updates create a new version with a new known_time while preserving the original for the audit trail.
Series identification: use name + labels (like inserts) or series_id.
Three lookup methods for overlapping series (mirroring the SDK):

- `batch_id` + `valid_time`: target a specific batch
- `known_time` + `valid_time`: target an exact version (a known_time uniquely identifies a batch)
- `valid_time` alone: target the latest version overall (most convenient)
[10]:
forecast_labels = {"model": "linear", "site": "Gotland", "type": "forecast"}
# First, get the known_time from the inserted data (we'll need it for Method 2)
params_versions = {
"name": "power_forecast",
"labels": json.dumps(forecast_labels),
"start_valid": base_time.isoformat(),
"end_valid": (base_time + timedelta(hours=1)).isoformat(),
"versions": "true",
}
response = requests.get(f"{API_BASE_URL}/values", params=params_versions, headers=headers)
response.raise_for_status()
known_time_from_insert = response.json()["data"][0]["known_time"]
print(f"Known time from original insert: {known_time_from_insert}\n")
# ── Method 1: Update by batch_id ────────────────────────────────────────────
# Target a specific batch using batch_id (stored from the insert step)
update_1 = {
"updates": [{
"series_id": created_series["power_forecast"],
"batch_id": batch_id,
"valid_time": base_time.isoformat(),
"value": 75.5,
"annotation": "Corrected via batch_id lookup",
"changed_by": "analyst@example.com",
}]
}
response = requests.put(f"{API_BASE_URL}/values", json=update_1, headers=headers)
response.raise_for_status()
r1 = response.json()
# ── Method 2: Update by known_time ──────────────────────────────────────────
# Target an exact version using known_time (uniquely identifies a batch)
# Here we use name+labels instead of series_id
update_2 = {
"updates": [{
"name": "power_forecast",
"labels": forecast_labels,
"known_time": known_time_from_insert,
"valid_time": (base_time + timedelta(hours=1)).isoformat(),
"value": 80.0,
"annotation": "Corrected via known_time lookup",
"changed_by": "analyst@example.com",
}]
}
response = requests.put(f"{API_BASE_URL}/values", json=update_2, headers=headers)
response.raise_for_status()
r2 = response.json()
# ── Method 3: Update latest version (just valid_time) ───────────────────────
# Most convenient — targets the latest version overall, no batch_id or known_time needed
update_3 = {
"updates": [{
"name": "power_forecast",
"labels": forecast_labels,
"valid_time": (base_time + timedelta(hours=2)).isoformat(),
"value": 90.0,
"annotation": "Corrected via latest lookup (most convenient!)",
"tags": ["reviewed"],
"changed_by": "analyst@example.com",
}]
}
response = requests.put(f"{API_BASE_URL}/values", json=update_3, headers=headers)
response.raise_for_status()
r3 = response.json()
# ── Verify the updates ──────────────────────────────────────────────────────
params_verify = {
"name": "power_forecast",
"start_valid": base_time.isoformat(),
"end_valid": (base_time + timedelta(hours=3)).isoformat(),
}
response = requests.get(f"{API_BASE_URL}/values", params=params_verify, headers=headers)
response.raise_for_status()
data_verify = response.json()
print(f"\nVerified updated values:")
for row in data_verify['data']:
print(f" {row['valid_time']}: value={row['value']}")
Known time from original insert: 2026-02-15T21:48:32.651934+00:00
Verified updated values:
2025-01-01T00:00:00+00:00: value=75.5
2025-01-01T01:00:00+00:00: value=80.0
2025-01-01T02:00:00+00:00: value=90.0
Summary
Key Endpoints:
- POST /series: create a time series (with name, unit, labels, overlapping)
- POST /values: insert data (specify series by name+labels or series_id)
- GET /values: read values (filter by name, labels, series_id, time range; versions=true for all revisions)
- PUT /values: update existing records (creates a new version for overlapping series)
- GET /series: list/filter series by name, labels, unit
- GET /series/labels: list unique label values
- GET /series/count: count matching series
Starting the server:
timedb api --host 127.0.0.1 --port 8000
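When you are done, stop the background server started in Part 2. In the notebook, `process` is the Popen handle from the server-start cell; the sketch below uses a stand-in `sleep` child so the snippet is self-contained:

```python
import subprocess

# Stand-in for the `process` handle from the server-start cell
process = subprocess.Popen(["sleep", "60"])

process.terminate()              # polite SIGTERM first
try:
    process.wait(timeout=5)      # reap the child so it doesn't linger as a zombie
except subprocess.TimeoutExpired:
    process.kill()               # escalate if it ignores SIGTERM
    process.wait()
print("server stopped, return code:", process.returncode)
```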