
Ingesting Events

The SDK provides three ways to send events, each suited to different throughput and reliability needs.

Building Events

All ingestion methods accept Event instances built with the fluent builder:
use monk_sdk::Event;

let event = Event::builder()
    .customer_id("cust_123")          // or .external_customer_id("ext_456")
    .event_name("api_call")
    .property("model", "gpt-4")
    .property("tokens", 1500)
    .idempotency_key("evt_unique_123") // optional, auto-generated if omitted
    .timestamp(chrono::Utc::now())     // optional, defaults to now
    .build()?;

Event Fields

| Field | Required | Description |
| --- | --- | --- |
| `customer_id` | Yes* | Monk's internal customer UUID |
| `external_customer_id` | Yes* | Your external customer identifier |
| `event_name` | Yes | Identifies the type of event |
| `properties` | No | Key-value metadata (strings, numbers, booleans) |
| `timestamp` | No | Event time (defaults to now). Must not be more than 1 hour in the future. |
| `idempotency_key` | No | Unique key to prevent duplicates (auto-generated if omitted) |

*Exactly one of `customer_id` or `external_customer_id` must be provided.
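Conceptually, the idempotency key lets the server drop redelivered events: the first event carrying a given key is accepted, and any retry with the same key is ignored. A minimal std-only sketch of that behavior (the `Deduper` type is illustrative, not part of the SDK or the server):

```rust
use std::collections::HashSet;

/// Accepts each idempotency key once; repeats are dropped.
struct Deduper {
    seen: HashSet<String>,
}

impl Deduper {
    fn new() -> Self {
        Deduper { seen: HashSet::new() }
    }

    /// Returns true if the event is new and should be ingested.
    /// `HashSet::insert` returns false when the key was already present.
    fn accept(&mut self, idempotency_key: &str) -> bool {
        self.seen.insert(idempotency_key.to_string())
    }
}

fn main() {
    let mut d = Deduper::new();
    assert!(d.accept("evt_unique_123"));  // first delivery: ingested
    assert!(!d.accept("evt_unique_123")); // retry: dropped as duplicate
}
```

This is why supplying your own key is useful when your producer may retry sends: the same logical event never counts twice.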

Single Event

Send one event at a time with automatic retries on transient failures:
let resp = client.ingest(
    Event::builder()
        .customer_id("cust_123")
        .event_name("api_call")
        .property("model", "gpt-4")
        .property("tokens", 1500)
        .build()?
).await?;

println!("status: {}", resp.status);
println!("idempotency_key: {}", resp.idempotency_key);
println!("request_id: {}", resp.request_id);
Use this when events are infrequent or when you need confirmation that each event was accepted.
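The retry-on-transient-failure behavior can be pictured as a simple exponential backoff loop. The sketch below is illustrative only; the SDK's actual attempt limits and delays may differ:

```rust
use std::time::Duration;

/// Retries `op` up to `max_attempts` times, doubling the delay after each
/// failure. Returns the first success or the last error.
fn retry_with_backoff<T, E>(
    max_attempts: u32,
    base_delay: Duration,
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delay = base_delay;
    let mut attempt = 1;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            Err(e) if attempt >= max_attempts => return Err(e),
            Err(_) => {
                std::thread::sleep(delay);
                delay *= 2; // exponential backoff
                attempt += 1;
            }
        }
    }
}

fn main() {
    // Fails twice, then succeeds on the third attempt.
    let mut calls = 0;
    let result = retry_with_backoff(5, Duration::from_millis(1), || {
        calls += 1;
        if calls < 3 { Err("transient") } else { Ok("accepted") }
    });
    assert_eq!(result, Ok("accepted"));
    assert_eq!(calls, 3);
}
```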

Batch Ingestion

Send up to 10,000 events in a single HTTP request:
let events = vec![
    Event::builder()
        .customer_id("cust_123")
        .event_name("api_call")
        .property("tokens", 42)
        .build()?,
    Event::builder()
        .external_customer_id("ext_456")
        .event_name("storage_read")
        .property("bytes", 1024)
        .build()?,
];

let resp = client.ingest_batch(events).await?;
println!("ingested {} events", resp.count);
Use this for bulk imports, backfills, or when you already have a batch of events ready to send.
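If you hold more than 10,000 events, split them into batches before sending. A std-only sketch under that limit (the helper name is ours, not the SDK's):

```rust
const MAX_BATCH_SIZE: usize = 10_000;

/// Splits a flat event list into batches that respect the per-request limit.
fn into_batches<T>(events: Vec<T>) -> Vec<Vec<T>> {
    let mut batches = Vec::new();
    let mut current = Vec::new();
    for e in events {
        current.push(e);
        if current.len() == MAX_BATCH_SIZE {
            // Batch is full: move it out and start a fresh one.
            batches.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        batches.push(current); // trailing partial batch
    }
    batches
}

fn main() {
    let events: Vec<u32> = (0..25_000).collect();
    let batches = into_batches(events);
    let sizes: Vec<usize> = batches.iter().map(|b| b.len()).collect();
    assert_eq!(sizes, vec![10_000, 10_000, 5_000]);
}
```

Each resulting batch would then be passed to `client.ingest_batch(...)` in turn.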

Buffered Ingestion

For high-throughput pipelines, enqueue events into an in-memory buffer. The SDK automatically flushes them in batches using parallel HTTP workers:
for i in 0..100_000 {
    client.ingest_buffered(
        Event::builder()
            .customer_id("cust_123")
            .event_name("page_view")
            .property("page", format!("/page/{i}"))
            .build()?
    ).await?;
}

// Trigger an immediate flush
client.flush();

// Graceful shutdown (flushes remaining events, waits for in-flight requests)
client.close();

How Buffering Works

  1. ingest_buffered pushes the event into a bounded in-memory channel
  2. A background tokio task accumulates events
  3. When the batch reaches max_batch_size or the flush_interval elapses, a flush is triggered
  4. Multiple flushes run concurrently, limited by a semaphore (max_concurrent_flushes)
  5. If a flush fails after all retries, the on_flush_failure callback fires
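Steps 1–3 (plus graceful shutdown) can be sketched with standard-library primitives. Here a bounded channel and a background thread stand in for the SDK's channel and tokio task; the concurrency semaphore (step 4) and the failure callback (step 5) are omitted, and `max_batch_size`/`flush_interval` values are illustrative:

```rust
use std::sync::mpsc::{sync_channel, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Buffers `events` through a bounded channel and returns the batches
/// the background worker flushed.
fn run_pipeline(
    events: Vec<String>,
    max_batch_size: usize,
    flush_interval: Duration,
) -> Vec<Vec<String>> {
    // 1. Each enqueue pushes into a bounded in-memory channel.
    let (tx, rx) = sync_channel::<String>(1024);

    // 2. A background worker accumulates events into a batch.
    let worker = thread::spawn(move || {
        let mut flushed = Vec::new();
        let mut batch: Vec<String> = Vec::new();
        loop {
            match rx.recv_timeout(flush_interval) {
                Ok(event) => {
                    batch.push(event);
                    // 3a. Flush when the batch reaches max_batch_size...
                    if batch.len() == max_batch_size {
                        flushed.push(std::mem::take(&mut batch));
                    }
                }
                // 3b. ...or when flush_interval elapses with a partial batch.
                Err(RecvTimeoutError::Timeout) => {
                    if !batch.is_empty() {
                        flushed.push(std::mem::take(&mut batch));
                    }
                }
                // Sender dropped: one final flush, then shut down (like close()).
                Err(RecvTimeoutError::Disconnected) => {
                    if !batch.is_empty() {
                        flushed.push(batch);
                    }
                    return flushed;
                }
            }
        }
    });

    // Producer side: each "ingest" is just a cheap channel send.
    for e in events {
        tx.send(e).unwrap();
    }
    drop(tx); // graceful shutdown
    worker.join().unwrap()
}

fn main() {
    let events: Vec<String> = (0..7).map(|i| format!("page_view:/page/{i}")).collect();
    let flushed = run_pipeline(events, 3, Duration::from_millis(50));
    assert_eq!(flushed.iter().map(|b| b.len()).sum::<usize>(), 7);
    assert!(flushed.iter().all(|b| b.len() <= 3));
}
```

The key property to notice: the producer only pays for a channel send, so ingestion stays fast even while flushes run in the background.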

When to Use Each Method

| Method | Throughput | Confirmation | Use Case |
| --- | --- | --- | --- |
| `ingest` | Low | Per-event | Critical events, low volume |
| `ingest_batch` | Medium | Per-batch | Bulk imports, backfills |
| `ingest_buffered` | High | Fire-and-forget | Real-time pipelines, high-frequency events |