Netra AI Platform: Bulk Ingest API Guide
Integration Path Note: This document details data ingestion using the high-throughput Bulk Ingest API. This is one of two primary integration methods. If you prefer to grant read-only access to an existing ClickHouse database instead of pushing data, please refer to our ClickHouse integration guide. This API path is for teams who prefer a push-based model.
This document provides technical specifications for the Bulk Ingest API, which accepts a structured, AI-native data schema aligned with OpenTelemetry principles. It is designed for engineering teams looking to integrate their systems with our Co-Optimization Agent for deep analysis and real-time optimization.
1. Authentication
Access to the API is controlled via bearer tokens. Each request must include an API key in the Authorization header.
- Header Format: Authorization: Bearer <YOUR_API_KEY>
API keys are managed within the Netra platform's web interface. Keys should be stored securely as environment variables or using a secrets management service. Each key requires the ingestion:write permission.
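For example, a client can read the key from an environment variable at startup and attach it to every request. The sketch below follows the NETRA_API_KEY naming convention used by the client examples in section 7.

```python
import os

# Read the key from the environment rather than hard-coding it.
api_key = os.environ.get("NETRA_API_KEY")
if not api_key:
    raise RuntimeError("NETRA_API_KEY is not set")

# This header accompanies every ingestion request (see the client examples in section 7).
auth_header = {"Authorization": f"Bearer {api_key}"}
```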
2. API Endpoint
All data is sent to a single, versioned REST endpoint.
Method | URI Path | Description |
---|---|---|
POST | /v1/bulk | Submits a batch of events for ingestion. |
The full endpoint URL is: https://ingest.api.netrasystems.ai/v1/bulk
3. Payload Format: NDJSON with AI-Native Schema
The API requires payloads to be formatted as Newline Delimited JSON (NDJSON). Each line must be a single, complete JSON object representing one atomic LLM operation, conforming to the AI-Native schema detailed below.
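As a quick illustration of the framing (not of a complete, schema-valid entry), the sketch below serializes two placeholder events so that each occupies exactly one line, with a trailing newline after the final record. The Python client in section 7 builds its payload the same way.

```python
import json

# Placeholder events for illustration only; a real entry must also carry the
# required fields described in section 3.2 below.
events = [
    {"event_metadata": {"event_id": "11111111-1111-1111-1111-111111111111", "timestamp_utc": 1722384000000}},
    {"event_metadata": {"event_id": "22222222-2222-2222-2222-222222222222", "timestamp_utc": 1722384001000}},
]

# One complete JSON object per line.
ndjson_payload = "\n".join(json.dumps(event) for event in events) + "\n"
```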
3.1 Log Entry Schema Overview
The schema is organized into logical namespaces that capture the full context of an LLM operation.
Namespace | Status | Description |
---|---|---|
event_metadata | Required | Core metadata for the log itself, including a unique event_id and timestamp_utc . |
trace_context | Recommended | OpenTelemetry-compliant tracing information (trace_id , span_id ) to link operations into a complete workflow. |
application_context | Recommended | Data about the client application initiating the call, including app_name , service_name , and environment . |
request | Recommended | After required redaction: An OpenAI-compliant or Netra-normalized record of all parameters sent to the model. |
response | Recommended | After required redaction: The output from the model, including the completion content and normalized usage metrics. |
identity_context | Optional | Information for attribution and security, such as user_id , organization_id , and api_key_hash . |
performance | Optional | Pre-existing performance data (total_e2e , time_to_first_token ) and detailed metrics from self-hosted infrastructure. |
finops | Optional | Pre-existing financial data. This is often enriched by Netra post-ingestion. |
governance | Optional | Data related to risk management, security (PII detection), safety ratings, and quality evaluations. |
Most namespaces are enriched by Netra post-ingestion. Some namespace items may not be available from your existing logs; that is normal. After required redaction, submit your data as-is. For example, you can include governance data separate from request and response, so long as trace_context is included (see the sketch below).
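A minimal sketch of such a partial entry, shown as a Python dict in the style of the client example in section 7. The assumption here is that a separate governance pipeline emits its results on their own; the entry is linked to the original LLM call through the shared trace_id and span_id, and the field values are illustrative.

```python
# Governance data submitted separately from the request/response namespaces.
# Hypothetical values; the entry is tied to the original operation via trace_context.
governance_entry = {
    "event_metadata": {
        "event_id": "d4e5f6a7-b8c9-0123-4567-890abcdef123",
        "timestamp_utc": 1722384005000,
    },
    "trace_context": {"trace_id": "trace-xyz-123", "span_id": "span-abc-456"},
    "governance": {
        "security": {"pii_redacted": True},
        "safety": {"overall_safety_verdict": "pass"},
    },
}
```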
3.2 Detailed Field Requirements
While entire namespaces can be optional, a minimal valid log requires a few key fields from the required namespaces.
Core Required Fields:
Field Name | Type | Description |
---|---|---|
event_metadata.event_id | String | Auto-generated if not provided. A unique UUID for this log event. |
event_metadata.timestamp_utc | Integer | Auto-generated if not provided. Unix Epoch Milliseconds when the event occurred. |
request.model | Object | An object identifying the model used (e.g., {"name": "gpt-4o"} ). |
request.prompt | Object | The prompt sent to the model (e.g., {"messages": [...]} ). |
response.usage | Object | Token usage statistics (e.g., {"total_tokens": 100} ). |
response.completion | Object | The model's output (e.g., {"choices": [...]} ). |
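Putting the table above together, the smallest useful entry looks roughly like the sketch below (a Python dict, as in the client example in section 7). The event_id and timestamp_utc values are included for clarity even though they are auto-generated when omitted.

```python
# A minimal entry covering only the core required fields.
minimal_log = {
    "event_metadata": {
        "event_id": "c3d4e5f6-a7b8-9012-3456-7890abcdef12",
        "timestamp_utc": 1722384000000,
    },
    "request": {
        "model": {"name": "gpt-4o"},
        "prompt": {"messages": [{"role": "user", "content": "Hello, world!"}]},
    },
    "response": {
        "completion": {"choices": []},
        "usage": {"total_tokens": 10},
    },
}
```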
Highly Recommended Fields: Including these fields provides essential context for tracing and observability.
Field Name | Type | Description |
---|---|---|
trace_context.trace_id | String | Links all operations in a single workflow. |
trace_context.span_id | String | Uniquely identifies this specific operation. |
application_context.app_name | String | The logical name of your application. |
application_context.environment | String | The deployment environment (e.g., 'production'). |
Optional Fields & Namespaces:
All other fields and namespaces are optional and should be provided when available to enable advanced features. For example, providing the governance.security object allows for tracking PII.
3.3 Example NDJSON Payload
This example shows a single log entry with key fields populated across the various namespaces.
{"event_metadata":{"log_schema_version":"3.0.0","event_id":"a1b2c3d4-e5f6-7890-1234-567890abcdef","timestamp_utc":1722384000000,"ingestion_source":"my-app-v1.2"},"trace_context":{"trace_id":"trace-xyz-123","span_id":"span-abc-456","parent_span_id":null,"span_name":"SummarizeArticle","span_kind":"llm"},"identity_context":{"user_id":"hashed-user-987","organization_id":"org-abc-123","api_key_hash":"sha256-of-key..."},"application_context":{"app_name":"doc-summarizer","service_name":"summarization-service","environment":"production"},"request":{"model":{"provider":"openai","family":"gpt-4","name":"gpt-4o"},"prompt":{"messages":[{"role":"user","content":"Summarize the provided article."}]},"generation_config":{"temperature":0.5,"max_tokens_to_sample":512}},"response":{"completion":{"choices":[{"index":0,"finish_reason":"stop","message":{"role":"assistant","content":"This is the summary of the article..."}}]},"usage":{"prompt_tokens":850,"completion_tokens":150,"total_tokens":1000}},"performance":{"latency_ms":{"total_e2e":1250,"time_to_first_token":400}},"finops":{"cost":{"total_cost_usd":0.0055},"pricing_info":{"prompt_token_rate_usd_per_million":5.00,"completion_token_rate_usd_per_million":15.00}},"governance":{"security":{"pii_redacted":false},"safety":{"overall_safety_verdict":"pass"}}}
4. Request Headers
Header | Description | Required | Example Value |
---|---|---|---|
Authorization | Contains the bearer token for authentication. | Yes | Bearer <YOUR_API_KEY> |
Content-Type | Specifies the payload format. Must be application/x-ndjson . | Yes | application/x-ndjson |
Content-Encoding | Informs the server that the request body is compressed. Recommended value: gzip . | No | gzip |
5. API Responses
The API uses standard HTTP status codes. A successful submission is indicated by 202 Accepted. See the Best Practices section for handling errors and retries.
6. Best Practices
High-Throughput Ingestion
- Batch Size: Aim for an uncompressed batch size between 1 MB and 5 MB (see the batching sketch after this list).
- Compression: Always use Gzip compression.
- Persistent Connections & Parallelism: Use connection pooling and send multiple requests concurrently.
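A minimal batching sketch under those guidelines; the 4 MB threshold and the helper name batch_entries are illustrative choices, not part of the API.

```python
import json

MAX_BATCH_BYTES = 4 * 1024 * 1024  # stays inside the recommended 1-5 MB uncompressed range

def batch_entries(entries):
    """Yield NDJSON payload strings whose uncompressed size stays under MAX_BATCH_BYTES."""
    current_lines, current_size = [], 0
    for entry in entries:
        line = json.dumps(entry) + "\n"
        line_size = len(line.encode("utf-8"))
        if current_lines and current_size + line_size > MAX_BATCH_BYTES:
            yield "".join(current_lines)
            current_lines, current_size = [], 0
        current_lines.append(line)
        current_size += line_size
    if current_lines:
        yield "".join(current_lines)
```

Each yielded payload can then be gzip-compressed and sent as its own request, in parallel if needed.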
Error Handling and Retries
- Categorize Errors: Distinguish between retryable (429, 5xx) and non-retryable (other 4xx) errors.
- Implement Exponential Backoff: For all retryable errors, use an exponential backoff with jitter algorithm (see the retry sketch after this list).
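A sketch of that retry policy, reusing the session-based send pattern from the Python client in section 7; the attempt count and delay ceiling are illustrative.

```python
import random
import time

import requests

RETRYABLE_STATUS = {429, 500, 502, 503, 504}

def post_with_retries(session, url, compressed_payload, headers, max_attempts=5):
    """POST a compressed NDJSON batch, retrying 429/5xx with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            response = session.post(url, data=compressed_payload, headers=headers, timeout=10)
        except requests.exceptions.RequestException:
            pass  # network-level failures are treated as retryable
        else:
            if response.ok:  # success; 202 Accepted is the expected status
                return response
            if response.status_code not in RETRYABLE_STATUS:
                response.raise_for_status()  # non-retryable 4xx: fail fast
        # Exponential backoff with full jitter: sleep somewhere in [0, 2**attempt) seconds.
        time.sleep(random.uniform(0, 2 ** attempt))
    raise RuntimeError("Bulk ingest request failed after retries")
```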
7. Client Examples
cURL
# 1. Create a payload file with the new schema
cat <<EOF > payload.ndjson
{"event_metadata":{"event_id":"a1b2c3d4-e5f6-7890-1234-567890abcdef","timestamp_utc":1722384000000},"trace_context":{"trace_id":"trace-xyz-123","span_id":"span-abc-456"},"request":{"model":{"name":"gpt-4o"},"prompt":{"messages":[{"role":"user","content":"Hello, world!"}]}},"response":{"completion":{"choices":[]},"usage":{"total_tokens":10}}}
EOF
# 2. Compress the payload
gzip payload.ndjson
# 3. Send the request
curl -X POST "https://ingest.api.netrasystems.ai/v1/bulk" \
-H "Authorization: Bearer <YOUR_API_KEY>" \
-H "Content-Type: application/x-ndjson" \
-H "Content-Encoding: gzip" \
--data-binary "@payload.ndjson.gz"
Python
import requests
import json
import gzip
import os
import time
NETRA_API_KEY = os.getenv("NETRA_API_KEY")
NETRA_ENDPOINT = "https://ingest.api.netrasystems.ai/v1/bulk"

def send_logs(log_entries: list):
    """Prepares and sends a batch of log entries using the new schema."""
    if not NETRA_API_KEY:
        raise ValueError("NETRA_API_KEY environment variable not set.")
    ndjson_payload = "\n".join(json.dumps(entry) for entry in log_entries) + "\n"
    compressed_payload = gzip.compress(ndjson_payload.encode('utf-8'))
    headers = {
        "Authorization": f"Bearer {NETRA_API_KEY}",
        "Content-Type": "application/x-ndjson",
        "Content-Encoding": "gzip"
    }
    with requests.Session() as session:
        try:
            response = session.post(
                NETRA_ENDPOINT, data=compressed_payload, headers=headers, timeout=10
            )
            response.raise_for_status()
            print(f"Request successful with status code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

# Example Usage with the AI-Native Schema
log = {
    "event_metadata": {
        "log_schema_version": "3.0.0",
        "event_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
        "timestamp_utc": int(time.time() * 1000)
    },
    "trace_context": {"trace_id": "trace-xyz-123", "span_id": "span-abc-456"},
    "application_context": {"app_name": "python-sdk-test", "environment": "development"},
    "request": {
        "model": {"provider": "openai", "name": "gpt-4o"},
        "prompt": {"messages": [{"role": "user", "content": "Tell me a joke."}]}
    },
    "response": {
        "completion": {"choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "Why don't scientists trust atoms? Because they make up everything!"}}]},
        "usage": {"prompt_tokens": 8, "completion_tokens": 12, "total_tokens": 20}
    },
    "performance": {"latency_ms": {"total_e2e": 850}}
}

send_logs([log])
Go
package main
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"time"
)
// Define structs to match the new AI-Native Schema
type EventMetadata struct {
LogSchemaVersion string `json:"log_schema_version,omitempty"`
EventID string `json:"event_id"`
TimestampUTC int64 `json:"timestamp_utc"`
}
type TraceContext struct {
TraceID string `json:"trace_id"`
SpanID string `json:"span_id"`
}
type Model struct {
Provider string `json:"provider,omitempty"`
Name string `json:"name"`
}
type Message struct {
Role string `json:"role"`
Content string `json:"content"`
}
type Request struct {
Model Model `json:"model"`
Prompt struct {
Messages []Message `json:"messages"`
} `json:"prompt"`
}
type Completion struct {
Choices []struct{
Message Message `json:"message"`
} `json:"choices"`
}
type Usage struct {
TotalTokens int `json:"total_tokens"`
}
type Response struct {
Completion Completion `json:"completion"`
Usage Usage `json:"usage"`
}
type LogEntry struct {
EventMetadata EventMetadata `json:"event_metadata"`
TraceContext TraceContext `json:"trace_context"`
Request Request `json:"request"`
Response Response `json:"response"`
}
func main() {
apiKey := os.Getenv("NETRA_API_KEY")
endpoint := "https://ingest.api.netrasystems.ai/v1/bulk"
if apiKey == "" {
fmt.Println("Error: NETRA_API_KEY not set.")
return
}
logEntry := LogEntry{
EventMetadata: EventMetadata{
EventID: "b2c3d4e5-f6a7-8901-2345-67890abcdef1",
TimestampUTC: time.Now().UnixMilli(),
},
TraceContext: TraceContext{TraceID: "go-trace-456", SpanID: "go-span-789"},
Request: Request{
Model: Model{Provider: "anthropic", Name: "claude-3-sonnet"},
Prompt: struct{Messages []Message `json:"messages"`}{
Messages: []Message{{Role: "user", Content: "Hello from Go!"}},
},
},
Response: Response{
Completion: Completion{
Choices: []struct{Message Message `json:"message"`}{
{Message: Message{Role: "assistant", Content: "Hello from Netra!"}},
},
},
Usage: Usage{TotalTokens: 15},
},
}
payloadBytes, err := json.Marshal(logEntry)
if err != nil {
fmt.Printf("Error marshalling log: %v\n", err)
return
}
var compressedPayload bytes.Buffer
gzipWriter := gzip.NewWriter(&compressedPayload)
_, err = gzipWriter.Write(payloadBytes)
if err != nil {
fmt.Printf("Error compressing payload: %v\n", err)
return
}
// Add the required newline for NDJSON
gzipWriter.Write([]byte("\n"))
if err := gzipWriter.Close(); err != nil {
fmt.Printf("Error closing gzip writer: %v\n", err)
return
}
req, _ := http.NewRequest("POST", endpoint, &compressedPayload)
req.Header.Set("Authorization", "Bearer "+apiKey)
req.Header.Set("Content-Type", "application/x-ndjson")
req.Header.Set("Content-Encoding", "gzip")
client := &http.Client{Timeout: 15 * time.Second}
resp, err := client.Do(req)
if err != nil {
fmt.Printf("Error sending request: %v\n", err)
return
}
defer resp.Body.Close()
fmt.Printf("Received status code: %d\n", resp.StatusCode)
body, _ := io.ReadAll(resp.Body)
if len(body) > 0 {
fmt.Printf("Response body:\n%s\n", string(body))
}
}