
Go SDK

The Go client library handles the network protocol, connection pooling, cluster routing, and retries for you. Your application gets Ingest and Query methods and nothing else to learn. This page covers the full surface area of the library.

You need Go 1.22 or newer. Inside your Go module, add the client:

go get github.com/unimeter/go-unimeter@latest

Import it under a name of your choice. The convention in our examples is billing:

import billing "github.com/unimeter/go-unimeter"

A client is created by calling billing.New with a list of seed node addresses. The client connects, downloads the cluster topology, and opens persistent connections to every node.

client, err := billing.New([]string{"localhost:7001"})
if err != nil {
    log.Fatal(err)
}
defer client.Close()

For a multi-node cluster, list two or three seeds so the client has fallbacks if the first is unavailable at startup. The client will discover the rest through the partition map.

The library is concurrency-safe. One *billing.Client should be shared across all goroutines in your service, initialized at startup, closed at shutdown.

Every method that does network IO takes a context.Context as its first argument:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

usage, err := client.Query(ctx, billing.QueryRequest{
    AccountID:  42,
    MetricCode: "api_calls",
    Period:     billing.CurrentMonth(),
})

If your Unimeter is running in Docker on your laptop and your Go program is running on the host, use localhost:7001 as the seed, assuming you published the port with -p 7001:7001. If your Go program is running inside another container on the same Docker network, use the container name as the host, like unimeter:7001.

Before sending events for a metric code, register it with the server. Schemas are durable and cluster-wide.

err := client.Metrics.Create(ctx, billing.MetricSchema{
    Code:    "api_calls",
    AggType: billing.AggCount,
})

If the metric already exists, the call returns billing.ErrAlreadyExists. In most applications you want to treat this as success:

err := client.Metrics.Create(ctx, schema)
if err != nil && !errors.Is(err, billing.ErrAlreadyExists) {
    log.Fatal(err)
}

Five aggregation types are available:

billing.AggCount       // count events
billing.AggSum         // sum values
billing.AggMax         // keep the maximum
billing.AggLatest      // keep the most recent by timestamp
billing.AggCountUnique // count distinct values

If you want to slice usage by a tag at query time, add filters to the schema:

err := client.Metrics.Create(ctx, billing.MetricSchema{
    Code:    "compute_seconds",
    AggType: billing.AggSum,
    Filters: []billing.DimensionFilter{
        {Key: "provider", Values: []string{"aws", "gcp", "azure"}},
        {Key: "region", Values: []string{"us-east", "us-west", "europe"}},
    },
})

By default metrics use fixed 30-day windows. For calendar-month billing, set the period type:

err := client.Metrics.Create(ctx, billing.MetricSchema{
    Code:       "api_calls",
    AggType:    billing.AggCount,
    PeriodType: billing.PeriodCalendar,
})

If your billing cycle starts on a day other than the first, set BillingCycleDay (1–28):

err := client.Metrics.Create(ctx, billing.MetricSchema{
    Code:            "api_calls",
    AggType:         billing.AggCount,
    PeriodType:      billing.PeriodCalendar,
    BillingCycleDay: 15, // period runs from the 15th to the 14th of the next month
})

Period helpers for queries:

billing.CurrentMonth() // 1st to 1st of next month
billing.LastMonth() // previous calendar month
billing.CurrentBillingPeriod(15) // current period starting on the 15th
billing.LastBillingPeriod(15) // previous period starting on the 15th

A metric can declare up to eight thresholds that fire when the running aggregate crosses them:

err := client.Metrics.Create(ctx, billing.MetricSchema{
    Code:    "api_calls",
    AggType: billing.AggCount,
    Thresholds: []billing.AlertThreshold{
        {Code: "free_tier_exceeded", Value: 10_000, Recurring: false},
        {Code: "hard_cap", Value: 100_000, Recurring: false},
    },
})

List the registered schemas:

schemas, err := client.Metrics.List(ctx)
for _, s := range schemas {
    fmt.Printf("metric %q aggregates with %s\n", s.Code, s.AggType)
}

Delete a metric:

err := client.Metrics.Delete(ctx, "api_calls")

Events are sent in batches via Ingest. The client groups them by partition and fans out to the correct nodes in parallel.

result, err := client.Ingest(ctx, []billing.Event{{
    AccountID:  42,
    MetricCode: "api_calls",
    Value:      1,
}})

AccountID picks the customer. MetricCode references the metric you registered. Value is the quantity. Timestamp is optional; if zero, the server stamps the event at ingestion time.

Batches are much more efficient than one-event-per-call. Accumulate events for a short window (ten to fifty milliseconds) and send as a single batch:

events := make([]billing.Event, 0, 1000)
for _, call := range recentApiCalls {
    events = append(events, billing.Event{
        AccountID:  call.AccountID,
        MetricCode: "api_calls",
        Value:      1,
    })
}
result, err := client.Ingest(ctx, events)

If the metric has dimension filters, attach properties to events:

event := billing.Event{
    AccountID:  42,
    MetricCode: "compute_seconds",
    Value:      billing.Scale(45.0),
    Properties: map[string]string{
        "provider": "aws",
        "region":   "us-east",
    },
}

The billing.Scale helper converts a float to the integer representation the wire protocol uses, keeping six decimal places. Use billing.Unscale to convert back.

When the metric type is AggCountUnique, each event says whether it is adding or removing a value:

events := []billing.Event{
    {
        AccountID:     100,
        MetricCode:    "active_seats",
        Value:         uint64(userID),
        OperationType: billing.OperationAdd,
    },
    {
        AccountID:     100,
        MetricCode:    "active_seats",
        Value:         uint64(departedUserID),
        OperationType: billing.OperationRemove,
    },
}

By default Ingest returns as soon as the server has accepted the batch into its buffer (async). For stronger guarantees, set DeliveryMode to DeliverySync:

event := billing.Event{
    AccountID:    42,
    MetricCode:   "one_time_payment",
    Value:        billing.Scale(9.99),
    DeliveryMode: billing.DeliverySync,
}

Sync delivery waits for fsync and replica acknowledgement. The cost is a few milliseconds of latency. Sync and async events can coexist in the same batch; if any event is sync, the whole batch is treated as sync.

Ingest returns an IngestResult:

type IngestResult struct {
    NStored     uint32
    NDuplicates uint32
    Offsets     []uint64
}

Most applications only need to check err and move on.

QueryRealtime returns the running total from in-memory state in about a millisecond:

agg, err := client.QueryRealtime(ctx, 42, "api_calls")
fmt.Printf("account 42 has made %d calls this period\n", agg.Count)

The returned AggValue contains every statistic: Count, Sum, Max, LastValue, LastEventAt, AlertFlags.

Query takes a period and returns the total for that range:

usage, err := client.Query(ctx, billing.QueryRequest{
    AccountID:  42,
    MetricCode: "api_calls",
    Period:     billing.CurrentMonth(),
})
fmt.Printf("account 42 made %d calls in %s\n", usage.Value.Count, usage.Period.Start.Format("January 2006"))

Period helpers: CurrentMonth(), LastMonth(), CurrentPeriod(duration), CurrentBillingPeriod(cycleDay), LastBillingPeriod(cycleDay). Or construct a Period by hand with Start and End times.

Query a single dimension:

usage, err := client.Query(ctx, billing.QueryRequest{
    AccountID:  42,
    MetricCode: "compute_seconds",
    Period:     billing.CurrentMonth(),
    Filters: map[string]string{
        "provider": "aws",
    },
})

Query multiple dimensions at once (AND). Pass two or more entries in the Filters map and Unimeter returns only events that matched all of them:

usage, err := client.Query(ctx, billing.QueryRequest{
    AccountID:  42,
    MetricCode: "compute_seconds",
    Period:     billing.CurrentMonth(),
    Filters: map[string]string{
        "provider": "aws",
        "region":   "us-east",
    },
})

To inspect the raw events behind an aggregate, list them for an account over a time range:

events, err := client.ListEvents(ctx, 42, startTime, endTime)
for _, e := range events {
    fmt.Printf("%s: %s value=%d\n", e.Timestamp, e.MetricCode, e.Value)
}

Past threshold crossings can be read back from the alert log, starting after a given offset:

alerts, err := client.ListAlerts(ctx, 42, lastSeenOffset)
for _, a := range alerts {
    fmt.Printf("%s: %s crossed %s at value %d\n",
        a.TriggeredAt, a.MetricCode, a.ThresholdCode, a.ValueAtCross)
}

When a metric has thresholds, the server broadcasts every crossing to subscribed clients. The Go client turns those into a channel:

sub, err := client.Alerts.Subscribe(ctx, alerts.Filter{
    AccountIDs: []uint64{42, 43, 44},
})
if err != nil {
    log.Fatal(err)
}
defer sub.Close()

for alert := range sub.C {
    fmt.Printf("account %d crossed %s on %s (value %d)\n",
        alert.AccountID, alert.ThresholdCode, alert.MetricCode, alert.ValueAtCross)
}

Persist offsets from sub.Offsets() and pass them back via alerts.Filter.SinceOffset:

lastSeen := loadOffsetsFromDB()
sub, err := client.Alerts.Subscribe(ctx, alerts.Filter{
    AccountIDs:  []uint64{42, 43, 44},
    SinceOffset: lastSeen,
})

go func() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        saveOffsetsToDB(sub.Offsets())
    }
}()

At-least-once semantics mean you might see the same alert twice after a crash. Make handlers idempotent.

The channel has a buffer of 64 alerts. If your consumer falls behind, the SDK increments a counter readable via sub.Dropped(). Dropped alerts are not lost; they stay in the node’s alert log and appear on the next catchup.

A typical enforcement pattern pairs the subscription with a local cache checked on the hot path:

var (
    capsMu  sync.RWMutex
    overCap = make(map[uint64]bool)
)

func consumeAlerts(ctx context.Context, client *billing.Client) {
    sub, err := client.Alerts.Subscribe(ctx, alerts.Filter{})
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Close()
    for alert := range sub.C {
        switch alert.ThresholdCode {
        case "free_tier_exceeded":
            go sendUpgradeEmail(alert.AccountID)
        case "hard_cap":
            capsMu.Lock()
            overCap[alert.AccountID] = true
            capsMu.Unlock()
        }
    }
}

func apiHandler(w http.ResponseWriter, r *http.Request) {
    account := accountFromRequest(r)

    capsMu.RLock()
    capped := overCap[account]
    capsMu.RUnlock()
    if capped {
        http.Error(w, "usage cap exceeded", http.StatusPaymentRequired)
        return
    }

    client.Ingest(r.Context(), []billing.Event{{
        AccountID:  account,
        MetricCode: "api_calls",
        Value:      1,
    }})
}

The unimeter/examples repository collects working Go programs. Clone it, point UNIMETER_NODES at a running server, and run any example.

  • saas-api — HTTP middleware that calls client.Ingest per request.
  • infra-metering — DimensionFilter on compute seconds by provider and region.
  • seat-based — AggCountUnique with OperationAdd and OperationRemove.
  • high-throughput — Buffered flush pattern from a queue like Kafka or SQS.
  • free-tier-alerts — Two thresholds with live subscription and local-cache enforcement.

For the operational side of running Unimeter in production, see Running a cluster. For the Python client, which offers the same capabilities with an async API, see Python SDK.