Netra AI Platform: Bulk Ingest API Guide

Integration Path Note: This document details data ingestion using the high-throughput Bulk Ingest API. This is one of two primary integration methods. If you prefer to grant read-only access to an existing ClickHouse database instead of pushing data, please refer to our ClickHouse integration guide. This API path is for teams who prefer a push-based model.

This document provides technical specifications for the Bulk Ingest API, which accepts a structured, AI-native data schema aligned with OpenTelemetry principles. It is designed for engineering teams looking to integrate their systems with our Co-Optimization Agent for deep analysis and real-time optimization.

1. Authentication

Access to the API is controlled via bearer tokens. Each request must include an API key in the Authorization header.

  • Header Format: Authorization: Bearer <YOUR_API_KEY>

API keys are managed within the Netra platform's web interface. Keys should be stored securely as environment variables or using a secrets management service. Each key requires the ingestion:write permission.
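A minimal sketch of reading the key from an environment variable and building the header. It assumes the variable is named NETRA_API_KEY, the name used in this guide's client examples. `auth_header` is a hypothetical helper, not part of a Netra SDK. It fails fast when the key is missing rather than sending an unauthenticated request.

```python
import os


def auth_header(env_var: str = "NETRA_API_KEY") -> dict:
    """Builds the Authorization header from a key stored in an environment variable.

    Hypothetical helper: NETRA_API_KEY matches the name used in the client
    examples; point env_var wherever your secrets manager exposes the key.
    """
    api_key = os.environ.get(env_var)
    if not api_key:
        # Fail fast instead of sending a request that will be rejected.
        raise RuntimeError(f"{env_var} is not set")
    return {"Authorization": f"Bearer {api_key}"}
```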

2. API Endpoint

All data is sent to a single, versioned REST endpoint.

Method | URI Path | Description
------ | -------- | -----------
POST   | /v1/bulk | Submits a batch of events for ingestion.

The full endpoint URL is: https://ingest.api.netrasystems.ai/v1/bulk

3. Payload Format: NDJSON with AI-Native Schema

The API requires payloads to be formatted as Newline Delimited JSON (NDJSON). Each line must be a single, complete JSON object representing one atomic LLM operation, conforming to the AI-Native schema detailed below.
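A minimal sketch of producing an NDJSON body in Python. `to_ndjson` is a hypothetical helper name. `json.dumps` escapes newline characters inside string values, so each record stays on exactly one physical line. The compact separators only trim whitespace.

```python
import json


def to_ndjson(entries: list) -> bytes:
    """Serializes log entries as NDJSON: one compact JSON object per line."""
    # Every record, including the last one, is terminated by "\n".
    # json.dumps turns embedded newlines in strings into the two characters "\n".
    return "".join(
        json.dumps(entry, separators=(",", ":")) + "\n" for entry in entries
    ).encode("utf-8")
```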

3.1 Log Entry Schema Overview

The schema is organized into logical namespaces that capture the full context of an LLM operation.

Namespace | Status | Description
--------- | ------ | -----------
event_metadata | Required | Core metadata for the log itself, including a unique event_id and timestamp_utc.
trace_context | Recommended | OpenTelemetry-compliant tracing information (trace_id, span_id) that links operations into a complete workflow.
application_context | Recommended | Data about the client application initiating the call, including app_name, service_name, and environment.
request | Recommended | After required redaction: an OpenAI-compliant or Netra-normalized record of all parameters sent to the model.
response | Recommended | After required redaction: the output from the model, including the completion content and normalized usage metrics.
identity_context | Optional | Information for attribution and security, such as user_id, organization_id, and api_key_hash.
performance | Optional | Pre-existing performance data (total_e2e, time_to_first_token) and detailed metrics from self-hosted infrastructure.
finops | Optional | Pre-existing financial data. This is often enriched by Netra post-ingestion.
governance | Optional | Data related to risk management, security (PII detection), safety ratings, and quality evaluations.

Netra enriches most namespaces after ingestion.

It is normal for some namespace fields to be unavailable in your existing logs.

After required redaction, submit your data as is. For example, you can send governance data in a log entry separate from the request and response, as long as that entry includes trace_context so the records can be linked.
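A sketch of that pattern. It assumes records are linked by an identical trace_context; the guide itself only requires that trace_context be present. `split_governance` is a hypothetical helper. It moves the governance namespace into its own entry, and the other entry keeps everything else.

```python
import copy


def split_governance(entry: dict):
    """Splits one log entry into a core record and a governance-only record.

    Hypothetical helper: both records carry the same trace_context so they can
    be correlated after ingestion. Returns (core, None) if there is no governance.
    """
    core = copy.deepcopy(entry)  # leave the caller's dict untouched
    governance = core.pop("governance", None)
    if governance is None:
        return core, None
    if "trace_context" not in core:
        raise ValueError("trace_context is required to submit governance separately")
    governance_record = {
        "trace_context": copy.deepcopy(core["trace_context"]),
        "governance": governance,
    }
    return core, governance_record
```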

3.2 Detailed Field Requirements

Although entire namespaces can be optional, a minimal valid log entry requires the core fields below.

Core Required Fields:

Field Name | Type | Description
---------- | ---- | -----------
event_metadata.event_id | String | A unique UUID for this log event. Auto-generated if not provided.
event_metadata.timestamp_utc | Integer | Unix epoch milliseconds when the event occurred. Auto-generated if not provided.
request.model | Object | An object identifying the model used (e.g., {"name": "gpt-4o"}).
request.prompt | Object | The prompt sent to the model (e.g., {"messages": [...]}).
response.usage | Object | Token usage statistics (e.g., {"total_tokens": 100}).
response.completion | Object | The model's output (e.g., {"choices": [...]}).
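The following sketch builds an entry that contains the core fields and checks an entry for missing ones. `minimal_entry` and `missing_core_fields` are hypothetical helpers for illustration, not part of a Netra SDK. The check skips event_metadata because those fields are auto-generated when omitted.

```python
import time
import uuid

# Dotted paths of the core request/response fields from the table above.
CORE_FIELDS = ["request.model", "request.prompt", "response.usage", "response.completion"]


def minimal_entry(model_name, messages, completion, usage):
    """Builds a log entry with the core fields plus event_id and timestamp_utc."""
    return {
        "event_metadata": {
            "event_id": str(uuid.uuid4()),
            # Unix epoch milliseconds, as the schema requires.
            "timestamp_utc": int(time.time() * 1000),
        },
        "request": {"model": {"name": model_name}, "prompt": {"messages": messages}},
        "response": {"completion": completion, "usage": usage},
    }


def missing_core_fields(entry):
    """Returns the CORE_FIELDS paths that are absent from entry."""
    missing = []
    for path in CORE_FIELDS:
        node = entry
        for key in path.split("."):
            if not isinstance(node, dict) or key not in node:
                missing.append(path)
                break
            node = node[key]
    return missing
```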

Highly Recommended Fields: Including these fields provides essential context for tracing and observability.

Field Name | Type | Description
---------- | ---- | -----------
trace_context.trace_id | String | Links all operations in a single workflow.
trace_context.span_id | String | Uniquely identifies this specific operation.
application_context.app_name | String | The logical name of your application.
application_context.environment | String | The deployment environment (e.g., 'production').
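The schema types trace_id and span_id as plain strings, and the examples use values such as trace-xyz-123. If you want OpenTelemetry/W3C Trace Context style IDs instead, one option is 16 random bytes for the trace and 8 for each span, hex-encoded. This sketch assumes Netra accepts IDs in that format. It shows a child span that reuses its parent's trace_id and records parent_span_id, a field that appears in the example payload. The helper names are hypothetical.

```python
import secrets


def new_trace_id() -> str:
    # 16 random bytes -> 32 lowercase hex characters (W3C Trace Context length).
    return secrets.token_hex(16)


def new_span_id() -> str:
    # 8 random bytes -> 16 lowercase hex characters.
    return secrets.token_hex(8)


def child_context(parent: dict, span_name: str) -> dict:
    """Creates a trace_context for a child operation in the same workflow."""
    return {
        "trace_id": parent["trace_id"],      # same workflow as the parent
        "span_id": new_span_id(),            # identifies this operation
        "parent_span_id": parent["span_id"], # links back to the caller
        "span_name": span_name,
    }
```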

Optional Fields & Namespaces: All other fields and namespaces are optional and should be provided when available to enable advanced features. For example, providing the governance.security object allows for tracking PII.

3.3 Example NDJSON Payload

This example shows a single log entry with key fields populated across the various namespaces.

{"event_metadata":{"log_schema_version":"3.0.0","event_id":"a1b2c3d4-e5f6-7890-1234-567890abcdef","timestamp_utc":1722384000000,"ingestion_source":"my-app-v1.2"},"trace_context":{"trace_id":"trace-xyz-123","span_id":"span-abc-456","parent_span_id":null,"span_name":"SummarizeArticle","span_kind":"llm"},"identity_context":{"user_id":"hashed-user-987","organization_id":"org-abc-123","api_key_hash":"sha256-of-key..."},"application_context":{"app_name":"doc-summarizer","service_name":"summarization-service","environment":"production"},"request":{"model":{"provider":"openai","family":"gpt-4","name":"gpt-4o"},"prompt":{"messages":[{"role":"user","content":"Summarize the provided article."}]},"generation_config":{"temperature":0.5,"max_tokens_to_sample":512}},"response":{"completion":{"choices":[{"index":0,"finish_reason":"stop","message":{"role":"assistant","content":"This is the summary of the article..."}}]},"usage":{"prompt_tokens":850,"completion_tokens":150,"total_tokens":1000}},"performance":{"latency_ms":{"total_e2e":1250,"time_to_first_token":400}},"finops":{"cost":{"total_cost_usd":0.0065},"pricing_info":{"prompt_token_rate_usd_per_million":5.00,"completion_token_rate_usd_per_million":15.00}},"governance":{"security":{"pii_redacted":false},"safety":{"overall_safety_verdict":"pass"}}}
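The finops cost and pricing_info fields are related by simple arithmetic. With the usage and rates in the example above, 850 × 5.00 / 1,000,000 + 150 × 15.00 / 1,000,000 = 0.00425 + 0.00225 = 0.0065 USD. A small sketch of that calculation follows. `total_cost_usd` is a hypothetical helper. Netra often enriches finops after ingestion, so computing it yourself is optional.

```python
def total_cost_usd(usage: dict, pricing: dict) -> float:
    """Computes request cost from token counts and per-million-token USD rates."""
    prompt_cost = usage["prompt_tokens"] * pricing["prompt_token_rate_usd_per_million"] / 1_000_000
    completion_cost = (
        usage["completion_tokens"] * pricing["completion_token_rate_usd_per_million"] / 1_000_000
    )
    return prompt_cost + completion_cost
```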

4. Request Headers

Header | Description | Required | Example Value
------ | ----------- | -------- | -------------
Authorization | Contains the bearer token for authentication. | Yes | Bearer <YOUR_API_KEY>
Content-Type | Specifies the payload format. Must be application/x-ndjson. | Yes | application/x-ndjson
Content-Encoding | Informs the server that the request body is compressed. Recommended value: gzip. | No | gzip
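The headers above can be assembled as shown in this sketch. `build_request` is a hypothetical helper. It sets Content-Encoding only when the body is actually gzipped, because that header tells the server to decompress the body before parsing it.

```python
import gzip


def build_request(ndjson_body: bytes, api_key: str, compress: bool = True):
    """Returns (body, headers) for a POST to /v1/bulk."""
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/x-ndjson",
    }
    if compress:
        # Only advertise gzip when the body really is gzip-compressed.
        body = gzip.compress(ndjson_body)
        headers["Content-Encoding"] = "gzip"
    else:
        body = ndjson_body
    return body, headers
```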

5. API Responses

The API uses standard HTTP status codes. A successful submission is indicated by 202 Accepted. See the Best Practices section for handling errors and retries.

6. Best Practices

High-Throughput Ingestion

  • Batch Size: Aim for an uncompressed batch size between 1 MB and 5 MB.
  • Compression: Always use Gzip compression.
  • Persistent Connections & Parallelism: Use connection pooling and send multiple requests concurrently.
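One way to follow the batch-size guidance is to accumulate serialized lines and flush before a batch would exceed the limit. This sketch caps batches at 5,000,000 bytes, which is conservative whether "MB" means 10^6 or 2^20 bytes. `batch_ndjson` is a hypothetical name. A single entry larger than the cap is still emitted on its own, in an oversized batch.

```python
import json

# Upper end of the recommended 1-5 MB uncompressed range.
MAX_BATCH_BYTES = 5_000_000


def batch_ndjson(entries, max_bytes=MAX_BATCH_BYTES):
    """Yields NDJSON payloads whose uncompressed size stays at or below max_bytes."""
    batch, size = [], 0
    for entry in entries:
        line = (json.dumps(entry, separators=(",", ":")) + "\n").encode("utf-8")
        # Flush the current batch if this line would push it over the limit.
        if batch and size + len(line) > max_bytes:
            yield b"".join(batch)
            batch, size = [], 0
        batch.append(line)
        size += len(line)
    if batch:
        yield b"".join(batch)
```

Each yielded payload can then be gzip-compressed and posted.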

Error Handling and Retries

  • Categorize Errors: Distinguish between retryable errors (429 and 5xx) and non-retryable errors (all other 4xx).
  • Implement Exponential Backoff: For all retryable errors, retry using exponential backoff with jitter.
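A sketch of this retry policy using "full jitter", where each wait is drawn uniformly between zero and an exponentially growing cap. The defaults (5 attempts, 0.5 s base, 30 s cap) are illustrative assumptions, not documented Netra limits. `send` is a hypothetical callable you supply, for example one that wraps session.post(...) and returns the status code. Network-level exceptions such as timeouts are not handled here, and you may want to treat them as retryable too.

```python
import random
import time


def is_retryable(status: int) -> bool:
    # 429 (rate limited) and 5xx (server errors) are retryable; other 4xx are not.
    return status == 429 or 500 <= status <= 599


def post_with_retries(send, max_attempts=5, base_delay=0.5, max_delay=30.0, sleep=time.sleep):
    """Calls send() until it returns a non-retryable status or attempts run out.

    send: zero-argument callable returning an HTTP status code (hypothetical).
    Returns the last status observed.
    """
    status = None
    for attempt in range(max_attempts):
        status = send()
        if not is_retryable(status):
            return status  # 202 on success, or a 4xx that retrying will not fix
        if attempt < max_attempts - 1:
            # Full jitter: uniform wait in [0, min(max_delay, base_delay * 2^attempt)].
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** attempt)))
    return status
```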

7. Client Examples

cURL

# 1. Create a payload file using the AI-native schema
#    (quoting 'EOF' stops the shell from expanding $ or backticks inside the JSON)
cat <<'EOF' > payload.ndjson
{"event_metadata":{"event_id":"a1b2c3d4-e5f6-7890-1234-567890abcdef","timestamp_utc":1722384000000},"trace_context":{"trace_id":"trace-xyz-123","span_id":"span-abc-456"},"request":{"model":{"name":"gpt-4o"},"prompt":{"messages":[{"role":"user","content":"Hello, world!"}]}},"response":{"completion":{"choices":[]},"usage":{"total_tokens":10}}}
EOF

# 2. Compress the payload
gzip payload.ndjson

# 3. Send the request
curl -X POST "https://ingest.api.netrasystems.ai/v1/bulk" \
     -H "Authorization: Bearer <YOUR_API_KEY>" \
     -H "Content-Type: application/x-ndjson" \
     -H "Content-Encoding: gzip" \
     --data-binary "@payload.ndjson.gz"

Python

import requests
import json
import gzip
import os
import time

NETRA_API_KEY = os.getenv("NETRA_API_KEY")
NETRA_ENDPOINT = "https://ingest.api.netrasystems.ai/v1/bulk"

# Reuse one Session across calls so connections are pooled and kept alive.
session = requests.Session()

def send_logs(log_entries: list):
    """Serializes log entries as NDJSON, gzips the batch, and POSTs it."""
    if not NETRA_API_KEY:
        raise ValueError("NETRA_API_KEY environment variable not set.")

    # One JSON object per line, with a trailing newline after the last record.
    ndjson_payload = "\n".join(json.dumps(entry) for entry in log_entries) + "\n"
    compressed_payload = gzip.compress(ndjson_payload.encode('utf-8'))

    headers = {
        "Authorization": f"Bearer {NETRA_API_KEY}",
        "Content-Type": "application/x-ndjson",
        "Content-Encoding": "gzip"
    }

    try:
        response = session.post(
            NETRA_ENDPOINT, data=compressed_payload, headers=headers, timeout=10
        )
        # Raises for any 4xx/5xx; a successful submission returns 202 Accepted.
        response.raise_for_status()
        print(f"Request successful with status code: {response.status_code}")
    except requests.exceptions.RequestException as e:
        print(f"An error occurred: {e}")

# Example Usage with the AI-Native Schema
log = {
    "event_metadata": {
        "log_schema_version": "3.0.0",
        "event_id": "a1b2c3d4-e5f6-7890-1234-567890abcdef",
        "timestamp_utc": int(time.time() * 1000)
    },
    "trace_context": {"trace_id": "trace-xyz-123", "span_id": "span-abc-456"},
    "application_context": {"app_name": "python-sdk-test", "environment": "development"},
    "request": {
        "model": {"provider": "openai", "name": "gpt-4o"},
        "prompt": {"messages": [{"role": "user", "content": "Tell me a joke."}]}
    },
    "response": {
        "completion": {"choices": [{"finish_reason": "stop", "message": {"role": "assistant", "content": "Why don't scientists trust atoms? Because they make up everything!"}}]},
        "usage": {"prompt_tokens": 8, "completion_tokens": 12, "total_tokens": 20}
    },
    "performance": {"latency_ms": {"total_e2e": 850}}
}

send_logs([log])

Go

package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"time"
)

// Define structs to match the new AI-Native Schema
type EventMetadata struct {
	LogSchemaVersion string `json:"log_schema_version,omitempty"`
	EventID          string `json:"event_id"`
	TimestampUTC     int64  `json:"timestamp_utc"`
}

type TraceContext struct {
	TraceID string `json:"trace_id"`
	SpanID  string `json:"span_id"`
}

type Model struct {
	Provider string `json:"provider,omitempty"`
	Name     string `json:"name"`
}

type Message struct {
	Role    string `json:"role"`
	Content string `json:"content"`
}

type Request struct {
	Model  Model     `json:"model"`
	Prompt struct {
		Messages []Message `json:"messages"`
	} `json:"prompt"`
}

type Completion struct {
	Choices []struct {
		Message Message `json:"message"`
	} `json:"choices"`
}

type Usage struct {
	TotalTokens int `json:"total_tokens"`
}

type Response struct {
	Completion Completion `json:"completion"`
	Usage      Usage      `json:"usage"`
}

type LogEntry struct {
	EventMetadata EventMetadata `json:"event_metadata"`
	TraceContext  TraceContext  `json:"trace_context"`
	Request       Request       `json:"request"`
	Response      Response      `json:"response"`
}


func main() {
	apiKey := os.Getenv("NETRA_API_KEY")
	endpoint := "https://ingest.api.netrasystems.ai/v1/bulk"
	if apiKey == "" {
		fmt.Println("Error: NETRA_API_KEY not set.")
		return
	}

	logEntry := LogEntry{
		EventMetadata: EventMetadata{
			EventID:      "b2c3d4e5-f6a7-8901-2345-67890abcdef1",
			TimestampUTC: time.Now().UnixMilli(),
		},
		TraceContext: TraceContext{TraceID: "go-trace-456", SpanID: "go-span-789"},
		Request: Request{
			Model: Model{Provider: "anthropic", Name: "claude-3-sonnet"},
			Prompt: struct {
				Messages []Message `json:"messages"`
			}{
				Messages: []Message{{Role: "user", Content: "Hello from Go!"}},
			},
		},
		Response: Response{
			Completion: Completion{
				Choices: []struct {
					Message Message `json:"message"`
				}{
					{Message: Message{Role: "assistant", Content: "Hello from Netra!"}},
				},
			},
			Usage: Usage{TotalTokens: 15},
		},
	}

	payloadBytes, err := json.Marshal(logEntry)
	if err != nil {
		fmt.Printf("Error marshalling log: %v\n", err)
		return
	}

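	// NDJSON requires every record, including the last, to end with a newline.
	payloadBytes = append(payloadBytes, '\n')

	var compressedPayload bytes.Buffer
	gzipWriter := gzip.NewWriter(&compressedPayload)
	if _, err := gzipWriter.Write(payloadBytes); err != nil {
		fmt.Printf("Error compressing payload: %v\n", err)
		return
	}
	// Close flushes buffered data and writes the gzip footer; it must run before sending.
	if err := gzipWriter.Close(); err != nil {
		fmt.Printf("Error closing gzip writer: %v\n", err)
		return
	}

	req, err := http.NewRequest(http.MethodPost, endpoint, &compressedPayload)
	if err != nil {
		fmt.Printf("Error creating request: %v\n", err)
		return
	}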
	req.Header.Set("Authorization", "Bearer "+apiKey)
	req.Header.Set("Content-Type", "application/x-ndjson")
	req.Header.Set("Content-Encoding", "gzip")

	client := &http.Client{Timeout: 15 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		fmt.Printf("Error sending request: %v\n", err)
		return
	}
	defer resp.Body.Close()

	fmt.Printf("Received status code: %d\n", resp.StatusCode)
	body, _ := io.ReadAll(resp.Body)
	if len(body) > 0 {
		fmt.Printf("Response body:\n%s\n", string(body))
	}
}