Python SDK
The Python client library talks to Unimeter over the same binary protocol as the Go client. It is built on asyncio and exposes an AsyncClient that you use with async/await. This page covers the full surface area of the library.
Install
pip install unimeter-python

Requires Python 3.11 or newer.
Connect
Create a client with one or more seed node addresses. The client fetches the cluster topology on connect, opens persistent connections to every node, and refreshes the topology every thirty seconds in the background.
from unimeter import AsyncClient
async with AsyncClient(["localhost:7001"]) as client:
    # client is ready to use
    ...

If you prefer explicit lifecycle management over a context manager:
client = AsyncClient(["localhost:7001"])
await client.connect()
# ... use client ...
await client.close()

For a multi-node cluster, list two or three seed addresses so the client has fallbacks if the first seed is unavailable at startup.
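The fallback idea can be pictured as trying each seed in order until one answers. The sketch below is not the library's implementation: `first_reachable_seed`, `probe`, and `fake_probe` are hypothetical names, and the probe stands in for whatever handshake the real client performs against a seed.

```python
import asyncio
from typing import Awaitable, Callable, Sequence


async def first_reachable_seed(
    seeds: Sequence[str],
    probe: Callable[[str], Awaitable[None]],
    timeout: float = 1.0,
) -> str:
    """Return the first seed whose probe succeeds; raise if none do."""
    errors: list[str] = []
    for seed in seeds:
        try:
            await asyncio.wait_for(probe(seed), timeout)
            return seed
        except (OSError, asyncio.TimeoutError) as exc:
            # Remember why this seed failed and move on to the next one.
            errors.append(f"{seed}: {exc!r}")
    raise ConnectionError("no seed reachable: " + "; ".join(errors))


async def fake_probe(address: str) -> None:
    # Stand-in probe (assumption): pretend only the node on port 7002 is up.
    if not address.endswith(":7002"):
        raise ConnectionRefusedError(address)


chosen = asyncio.run(
    first_reachable_seed(
        ["localhost:7001", "localhost:7002", "localhost:7003"], fake_probe
    )
)
print(chosen)
```

With a single seed in the list, one unreachable node at startup would be enough to make the whole connect attempt fail, which is why listing several seeds is worthwhile.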
Define metrics
Before sending events for a metric code, register the metric with the server. Schemas are durable and cluster-wide.
from unimeter import (
    MetricSchema,
    AggType,
    PeriodType,
    DimensionFilter,
    AlertThreshold,
)
await client.metrics.create(MetricSchema(
    code="api_calls",
    agg_type=AggType.COUNT,
))
await client.metrics.create(MetricSchema(
    code="compute_seconds",
    agg_type=AggType.SUM,
    filters=[
        DimensionFilter(key="provider", values=["aws", "gcp", "azure"]),
    ],
    thresholds=[
        AlertThreshold(code="free_tier", value=10_000),
    ],
))

Calendar billing periods
By default metrics use fixed 30-day windows. For calendar-month billing:
await client.metrics.create(MetricSchema(
    code="api_calls",
    agg_type=AggType.COUNT,
    period_type=PeriodType.CALENDAR,
    billing_cycle_day=1,  # 1-28; day of month when the period starts
))

Period helpers for queries:
from unimeter import current_month, last_month, current_billing_period, last_billing_period
current_month()             # 1st to 1st of next month
last_month()                # previous calendar month
current_billing_period(15)  # current period starting on the 15th
last_billing_period(15)     # previous period starting on the 15th

List all registered metrics:
schemas = await client.metrics.list()
for s in schemas:
    print(f"{s.code}: {s.agg_type.name}")

Delete a metric by code:
await client.metrics.delete("api_calls")

Send events
Events are sent in batches. The client groups them by partition and fans out to the correct nodes in parallel.
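To make the grouping and fan-out concrete, here is a rough sketch of the pattern using plain asyncio. Everything in it is an assumption made for illustration: the partition count, the modulo routing rule in `partition_for`, the dict-shaped events, and the `send_batch` stand-in are hypothetical and do not describe how the library or the cluster actually assigns partitions.

```python
import asyncio
from collections import defaultdict

NUM_PARTITIONS = 4  # assumed value, for illustration only


def partition_for(account_id: int) -> int:
    # Assumed routing rule; the real mapping is decided by the cluster.
    return account_id % NUM_PARTITIONS


def group_by_partition(events: list[dict]) -> dict[int, list[dict]]:
    """Bucket events so each partition receives a single batch."""
    groups: dict[int, list[dict]] = defaultdict(list)
    for event in events:
        groups[partition_for(event["account_id"])].append(event)
    return dict(groups)


async def send_batch(partition: int, batch: list[dict]) -> int:
    # Stand-in for a network write to the node that owns the partition.
    await asyncio.sleep(0)
    return len(batch)


async def fan_out(events: list[dict]) -> int:
    """Send every per-partition batch concurrently and total the results."""
    groups = group_by_partition(events)
    stored = await asyncio.gather(
        *(send_batch(p, batch) for p, batch in groups.items())
    )
    return sum(stored)


events = [
    {"account_id": 42, "metric_code": "api_calls", "value": 1},
    {"account_id": 42, "metric_code": "api_calls", "value": 1},
    {"account_id": 7, "metric_code": "api_calls", "value": 1},
]
groups = group_by_partition(events)
total = asyncio.run(fan_out(events))
print(sorted(groups), total)
```

The practical consequence for callers is that one large `ingest` call is usually preferable to many small ones, since the batch is split once and the per-node writes overlap.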
from unimeter import Event, DeliveryMode, scale
await client.ingest([
    Event(account_id=42, metric_code="api_calls", value=1),
    Event(account_id=42, metric_code="api_calls", value=1),
    Event(account_id=42, metric_code="api_calls", value=1),
])

Attach properties for dimension filtering:
await client.ingest([
    Event(
        account_id=42,
        metric_code="compute_seconds",
        value=scale(45.0),
        properties={"provider": "aws"},
    ),
])

For critical events that must be fsynced and replicated before acknowledgement, use sync delivery:
result = await client.ingest(
    [Event(account_id=42, metric_code="payment", value=scale(9.99))],
    delivery=DeliveryMode.SYNC,
)
print(f"stored {result.n_stored} events")

For count unique metrics, set the operation type:
from unimeter import OperationType
await client.ingest([
    Event(
        account_id=100,
        metric_code="active_seats",
        value=user_id,
        operation_type=OperationType.ADD,
    ),
])

Query usage
Read the total for a customer in a billing period:
from unimeter import current_month, unscale
result = await client.query(42, "api_calls", current_month())
print(f"API calls this month: {result.value.count}")

Query with a dimension filter:
result = await client.query(
    42,
    "compute_seconds",
    current_month(),
    filters={"provider": "aws"},
)
print(f"AWS compute: {unscale(result.value.sum)} seconds")

Query with multiple dimensions (AND). Pass two or more keys and Unimeter returns only events matching all of them:
result = await client.query(
    42,
    "compute_seconds",
    current_month(),
    filters={"provider": "aws", "region": "us-east"},
)

For real-time dashboards, use query_realtime, which reads from memory without touching disk:
agg = await client.query_realtime(42, "api_calls")
print(f"current count: {agg.count}")

Error handling
The client raises typed exceptions for different failure modes:
from unimeter import BackpressureError, ServerError
import asyncio

try:
    await client.ingest(events)
except BackpressureError:
    # Server disk is full. Back off and retry.
    await asyncio.sleep(1)
    await client.ingest(events)
except ServerError as e:
    print(f"Server error: {e}")

Redirect responses from the cluster are handled transparently. If a node is no longer the leader for a partition, the client updates its routing cache and retries automatically.
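A single fixed one-second retry can itself hit backpressure again. One common alternative is a bounded retry loop with exponential backoff, sketched below. The helper `retry_with_backoff` is a hypothetical name and not part of the SDK; `FakeBackpressure` and `flaky_ingest` are stand-ins so the example runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    op: Callable[[], Awaitable[T]],
    retry_on: type[BaseException],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 8.0,
) -> T:
    """Call op(); on retry_on, sleep with a doubling delay and try again."""
    for attempt in range(attempts):
        try:
            return await op()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(min(max_delay, base_delay * 2 ** attempt))
    raise ValueError("attempts must be at least 1")


class FakeBackpressure(Exception):
    """Stand-in for BackpressureError (assumption for this sketch)."""


calls = 0


async def flaky_ingest() -> str:
    # Fails twice, then succeeds, to exercise the retry path.
    global calls
    calls += 1
    if calls < 3:
        raise FakeBackpressure()
    return "stored"


result = asyncio.run(
    retry_with_backoff(flaky_ingest, FakeBackpressure, base_delay=0.01)
)
print(result, calls)
```

With the real client, the call would presumably look like `await retry_with_backoff(lambda: client.ingest(events), BackpressureError)`. Capping the number of attempts keeps a persistently full disk from stalling the caller indefinitely.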
What comes next
For the operational side of running Unimeter in production, see Running a cluster. For the Go client, which offers the same capabilities with a synchronous API, see Go SDK.