HTTP Endpoints

This page describes the HTTP endpoints available in the WarpStream Kafka product.

HTTP Fetch APIs

Requires Agent version v755+.

The WarpStream Agent exposes HTTP/JSON endpoints for fetching records from Kafka topics without a native Kafka client. All endpoints are served on the Agent's HTTP port (8080 by default).

Authentication and Authorization

Authentication

All endpoints support optional HTTP Basic Auth. When the Agent is configured with SASL authentication, you must include your WarpStream credentials (the same ones used for Kafka SASL/PLAIN) as HTTP Basic Auth.

With curl, use the -u flag:

curl -u 'ccun_YOUR_USERNAME:ccp_YOUR_PASSWORD' ...

This is equivalent to setting the Authorization header manually with the base64-encoded username:password pair:

curl -H 'Authorization: Basic Y2N1bl9ZT1VSX1VTRVJOQU1FOmNjcF9ZT1VSX1BBU1NXT1JE' ...

If the Agent does not require authentication, these headers can be omitted.
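For clients that are not curl, the same header can be built by hand. The following is a minimal Python sketch using only the standard library; the function name `basic_auth_header` is illustrative and not part of any WarpStream SDK.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict[str, str]:
    """Build the Authorization header for HTTP Basic Auth."""
    # Basic Auth is the base64 encoding of "username:password".
    pair = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(pair).decode("ascii")
    return {"Authorization": f"Basic {token}"}


# Placeholder credentials copied from the curl example above.
headers = basic_auth_header("ccun_YOUR_USERNAME", "ccp_YOUR_PASSWORD")
print(headers["Authorization"])
```

The printed value should match the header used in the manual curl example, since both encode the same placeholder pair.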

Authorization (ACLs)

The HTTP fetch endpoints enforce the same Kafka ACLs as the native Kafka protocol. The ACL principal is derived from the Basic Auth username as User:<username>. For example, if you authenticate as ccun_abc123, the ACL principal will be User:ccun_abc123.

The endpoints require READ permission on each topic being fetched. If the authenticated user does not have READ access to a topic, the partition will be returned with a TOPIC_AUTHORIZATION_FAILED error code (for /v1/kafka/fetch) or a 400 error response (for the single-record endpoints).

Common Headers

| Header | Description | Default |
| --- | --- | --- |
| kafka-client-id | Identifies the client for logging and diagnostics | http-fetch-client / http-fetch-single-record-client |

Endpoints

GET or POST /v1/kafka/fetch

Full-featured fetch endpoint that mirrors the Kafka Fetch protocol. Supports fetching from multiple topics and partitions in a single request.

Request Body (JSON)

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| topics | array | yes | List of topics to fetch from |
| topics[].topic | string | yes | Topic name |
| topics[].partitions | array | yes | List of partitions to fetch from |
| topics[].partitions[].partition | integer | yes | Partition index (≥ 0) |
| topics[].partitions[].fetch_offset | integer | yes | Offset to start fetching from (≥ 0) |
| topics[].partitions[].partition_max_bytes | integer | yes | Max bytes to fetch for this partition (> 0) |
| max_bytes | integer | no | Max bytes for the entire fetch response (≥ 0). Default: 0 (agent-managed) |
| max_records | integer | no | Max total records to return across all partitions (≥ 0). 0 means unlimited. Parsing stops early once the budget is reached. |
| isolation_level | string | no | "read_uncommitted" (default) or "read_committed" |
| long_poll | boolean | no | If true, the agent waits up to 30s for new data instead of returning immediately. Default: false |

Example Request
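As a rough illustration of the request body described above, the following Python sketch builds a POST request for a single topic and partition. The base URL, the `Content-Type` header, the default of 1 MiB for `partition_max_bytes`, and the helper name `build_fetch_request` are all assumptions made for this example.

```python
import json
import urllib.request

# Assumption: the Agent is reachable locally on the default HTTP port.
AGENT_URL = "http://localhost:8080"


def build_fetch_request(topic, partition, fetch_offset, *,
                        partition_max_bytes=1_048_576,
                        max_records=0, long_poll=False, headers=None):
    """Build (but do not send) a POST /v1/kafka/fetch request."""
    body = {
        "topics": [{
            "topic": topic,
            "partitions": [{
                "partition": partition,
                "fetch_offset": fetch_offset,
                "partition_max_bytes": partition_max_bytes,
            }],
        }],
        "max_records": max_records,          # 0 means unlimited
        "isolation_level": "read_uncommitted",
        "long_poll": long_poll,
    }
    return urllib.request.Request(
        f"{AGENT_URL}/v1/kafka/fetch",
        data=json.dumps(body).encode("utf-8"),
        # Assumption: JSON content type; extra headers (e.g. Basic Auth) are merged in.
        headers={"Content-Type": "application/json", **(headers or {})},
        method="POST",
    )


req = build_fetch_request("my-topic", 0, 42, max_records=10)
```

The request can then be sent with `urllib.request.urlopen(req, timeout=35)`. When `long_poll` is true, a client timeout above 30 seconds avoids giving up before the Agent responds.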

Success Response (200 OK)

Note: key, value, and header value fields are base64-encoded byte arrays. A JSON null indicates a nil key/value.
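Decoding these fields on the client side might look like the sketch below. The sample record is hypothetical and shows only the `key` and `value` fields named in the note; the rest of the record's shape is not assumed here.

```python
import base64
from typing import Optional


def decode_field(encoded: Optional[str]) -> Optional[bytes]:
    """Decode a base64 field; a JSON null (None) stays None."""
    if encoded is None:
        return None
    return base64.b64decode(encoded)


# Hypothetical record fragment: a nil key and a base64-encoded value.
record = {"key": None, "value": "aGVsbG8="}
key = decode_field(record["key"])
value = decode_field(record["value"])
```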

Error Response (400 Bad Request / 500 Internal Server Error)


GET /v1/kafka/fetch_single_record

Convenience endpoint for fetching a single record at a specific offset using query parameters.

Query Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic | string | yes | Topic name |
| partition | integer | yes | Partition index (≥ 0) |
| offset | integer | yes | Exact offset to fetch (≥ 0) |

Example Request
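One way to assemble the query string is sketched below in Python. The base URL and the helper name `single_record_url` are assumptions made for illustration.

```python
from urllib.parse import urlencode


def single_record_url(base_url: str, topic: str, partition: int, offset: int) -> str:
    """Build the URL for GET /v1/kafka/fetch_single_record."""
    # Both values must be non-negative according to the parameter table.
    if partition < 0 or offset < 0:
        raise ValueError("partition and offset must be >= 0")
    query = urlencode({"topic": topic, "partition": partition, "offset": offset})
    return f"{base_url}/v1/kafka/fetch_single_record?{query}"


# Assumption: Agent on localhost with the default HTTP port.
url = single_record_url("http://localhost:8080", "my-topic", 0, 42)
```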

Success Response (200 OK)

Returns the record directly as a JSON object (not wrapped in the topic/partition structure):

Not Found Response (404 Not Found)

Returned when no record exists at the requested offset (e.g., the offset is beyond the high watermark or below the low watermark):

Error Response (400 Bad Request)


GET /v1/kafka/topics/{topic}/partitions/{partition}/records/{offset}

REST-style convenience endpoint for fetching a single record at a specific offset using path parameters. Functionally identical to /v1/kafka/fetch_single_record.

Path Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic | string | yes | Topic name |
| partition | integer | yes | Partition index (≥ 0) |
| offset | integer | yes | Exact offset to fetch (≥ 0) |

Example Request

Responses

Identical to /v1/kafka/fetch_single_record:

  • 200 OK — The record as a JSON object.

  • 404 Not Found — No record at the requested offset (OFFSET_OUT_OF_RANGE).

  • 400 Bad Request — Invalid path parameters.
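A client for the path-style endpoint could treat 404 as "no record" rather than as a failure. The Python sketch below shows one such approach; the helper names, the 10 second timeout, and the injectable `opener` parameter (present only so the function can be exercised without a network) are assumptions for this example.

```python
import json
import urllib.error
import urllib.request
from urllib.parse import quote


def record_url(base_url, topic, partition, offset):
    """Build the REST-style URL for a single record."""
    return (f"{base_url}/v1/kafka/topics/{quote(topic, safe='')}"
            f"/partitions/{partition}/records/{offset}")


def fetch_record(base_url, topic, partition, offset, opener=urllib.request.urlopen):
    """Return the record as a dict, or None when the Agent answers 404."""
    try:
        with opener(record_url(base_url, topic, partition, offset), timeout=10) as resp:
            return json.load(resp)
    except urllib.error.HTTPError as err:
        if err.code == 404:  # no record at the requested offset
            return None
        raise  # 400 and other errors are left to the caller
```

Returning `None` for 404 keeps "offset not available" separate from genuine request errors, which still surface as exceptions.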


Error Codes

The code field in error responses maps to Kafka protocol error codes:

| HTTP Status | Code | Meaning |
| --- | --- | --- |
| 400 | INVALID_REQUEST | Malformed request (bad JSON, missing fields, invalid values) |
| 401 | SASL_AUTHENTICATION_FAILED | Missing or invalid credentials |
| 403 | TOPIC_AUTHORIZATION_FAILED | ACL denied read access to the topic |
| 404 | OFFSET_OUT_OF_RANGE | No record exists at the requested offset |
| 500 | KAFKA_STORAGE_ERROR | Internal error fetching data |
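A client might use the table above to summarize failures. The Python sketch below reads the `code` field from an error body and falls back to the table when the body cannot be parsed. Treating only 5xx responses as retryable is an assumption of this example, not documented Agent behavior, and no other fields of the error body are assumed.

```python
import json

# Status-to-code mapping copied from the table above.
EXPECTED_CODES = {
    400: "INVALID_REQUEST",
    401: "SASL_AUTHENTICATION_FAILED",
    403: "TOPIC_AUTHORIZATION_FAILED",
    404: "OFFSET_OUT_OF_RANGE",
    500: "KAFKA_STORAGE_ERROR",
}


def describe_error(status: int, body: bytes) -> str:
    """Summarize an error response as '<status> <code> (<retry hint>)'."""
    try:
        payload = json.loads(body)
        code = payload.get("code") if isinstance(payload, dict) else None
    except ValueError:  # body was not valid JSON
        code = None
    code = code or EXPECTED_CODES.get(status, "UNKNOWN")
    # Assumption: only server-side failures are worth retrying.
    hint = "retryable" if status >= 500 else "not retryable"
    return f"{status} {code} ({hint})"
```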

Datadog HTTP Log Intake

Datadog Log Intake APIs

Requires Agent version v772+.

The WarpStream Agent exposes Datadog-compatible HTTP/JSON endpoints for accepting log batches from the Datadog Agent and producing them into Kafka topics. All endpoints are served on the Agent's HTTP port (8080 by default).

Authentication and Authorization

Authentication

These endpoints use the DD-API-KEY header rather than HTTP Basic Auth.

WarpStream interprets DD-API-KEY as Kafka SASL/PLAIN credentials encoded as:

$SASL_USERNAME:$SASL_PASSWORD

Behavior depends on how the Agent is configured:

  • If the Agent does not require SASL authentication, DD-API-KEY is optional.

  • If the Agent does require SASL authentication, DD-API-KEY must be present and must contain valid WarpStream Kafka credentials.

  • If DD-API-KEY is present but malformed and SASL auth is optional, it is ignored.

  • If DD-API-KEY is missing or malformed and SASL auth is required, the request fails with 401 SASL_AUTHENTICATION_FAILED.

  • mTLS authentication is not supported for these endpoints.

Example:
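As a sketch of the header format described above, the Python helper below joins the SASL username and password with a colon and places the result in DD-API-KEY. The helper name `datadog_headers` and the sample credentials are hypothetical.

```python
from typing import Optional


def datadog_headers(username: Optional[str] = None,
                    password: Optional[str] = None) -> dict[str, str]:
    """Build request headers for the Datadog-compatible intake endpoints."""
    headers = {"Content-Type": "application/json"}
    # DD-API-KEY carries "$SASL_USERNAME:$SASL_PASSWORD"; it is omitted
    # when no credentials are supplied (only valid if SASL is optional).
    if username is not None and password is not None:
        headers["DD-API-KEY"] = f"{username}:{password}"
    return headers


authed = datadog_headers("ccun_abc123", "ccp_secret")  # hypothetical credentials
anonymous = datadog_headers()
```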

Authorization (ACLs)

The endpoints enforce the same Kafka ACLs as the native Kafka produce path.

  • If authentication succeeds, the ACL principal is derived from the username as User:$SASL_USERNAME.

  • If authentication is optional and no valid DD-API-KEY is provided, the request is evaluated as the anonymous principal.

  • The endpoints require WRITE permission on the destination topic.

  • If the authenticated principal does not have write access to the topic, the request fails with 403 TOPIC_AUTHORIZATION_FAILED.

Common Headers

| Header | Description | Default |
| --- | --- | --- |
| DD-API-KEY | WarpStream Kafka SASL/PLAIN credentials encoded as username:password | omitted |
| kafka-client-id | Identifies the client for logging and diagnostics | http-datadog-logs-client |
| Content-Type | Must be application/json | none |
| Content-Encoding | Optional request compression | none |

Datadog Agent Configuration

When configuring the Datadog Agent, point logs_dd_url at the path prefix only. Do not include /api/v2/logs or /v1/input in the configured URL, because the Datadog Agent appends the intake suffix itself.
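To make the suffix rule concrete, the Python sketch below models how a configured prefix combines with the suffix the Datadog Agent appends. The prefix value in the usage line is hypothetical, and the exact value format that logs_dd_url expects is not specified here; the function only illustrates why the suffix must be left out of the configured URL.

```python
INTAKE_SUFFIXES = ("/api/v2/logs", "/v1/input")


def intake_url(prefix: str, use_v2_api: bool = True) -> str:
    """Model the final URL after the Datadog Agent appends its intake suffix."""
    prefix = prefix.rstrip("/")
    # Including the suffix in the configured value would duplicate it.
    if prefix.endswith(INTAKE_SUFFIXES):
        raise ValueError("configure the path prefix only; the suffix is appended automatically")
    return prefix + (INTAKE_SUFFIXES[0] if use_v2_api else INTAKE_SUFFIXES[1])


# Hypothetical prefix for a topic named "my-logs".
primary = intake_url("http://localhost:8080/dd/my-logs")
compat = intake_url("http://localhost:8080/dd/my-logs", use_v2_api=False)
```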

Example Datadog Agent configuration for the primary endpoint:

If you want Datadog to use the compatibility endpoint instead, set:

Common Behavior

These endpoints share the following behavior:

  • The $TOPIC_NAME path parameter is the exact Kafka topic name to produce to.

  • The topic must already exist.

  • The topic name must be a valid Kafka topic name.

  • The request body limit is 64 MiB after decompression.

  • Each JSON object in the submitted array becomes one Kafka record.

  • The JSON object is stored as the Kafka record value exactly as submitted.

  • No Kafka key is set.

  • No Datadog-specific fields are interpreted or transformed.

  • All records from a single HTTP request are written to the same partition.

  • Across requests, the Agent rotates partitions over time for rough byte-based balancing.

Raw Endpoint Specs

The raw endpoint specifications are not required to use this integration. Following the instructions above to configure the Datadog Agent is sufficient. The endpoints are documented below for reference.

POST /dd/$TOPIC_NAME/api/v2/logs

Primary Datadog-compatible log intake endpoint.

This is the preferred route for Datadog Agents configured with use_v2_api: true.

Path Parameters

| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| topic | string | yes | Destination Kafka topic name |

Request Body (JSON)

The request body must be one of:

| Shape | Description |
| --- | --- |
| [{...}, {...}] | A JSON array of log objects. Each array element becomes one Kafka record value. |
| {} | Connectivity probe. Accepted but does not produce any records. |

The Agent treats each array element as an opaque JSON object. Common Datadog fields like message, service, ddsource, ddtags, hostname, status, and timestamp are preserved exactly as sent.

Example Request
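For senders other than the Datadog Agent, a request to this endpoint can be assembled as in the Python sketch below. The base URL, topic name, log fields, and helper name `build_logs_request` are assumptions for illustration only.

```python
import json
import urllib.request


def build_logs_request(base_url, topic, logs, api_key=None):
    """Build (but do not send) a POST /dd/$TOPIC_NAME/api/v2/logs request."""
    headers = {"Content-Type": "application/json"}
    if api_key is not None:
        headers["DD-API-KEY"] = api_key  # "username:password"
    return urllib.request.Request(
        f"{base_url}/dd/{topic}/api/v2/logs",
        # Each element of the JSON array becomes one Kafka record value.
        data=json.dumps(logs).encode("utf-8"),
        headers=headers,
        method="POST",
    )


logs = [
    {"message": "user logged in", "service": "auth", "status": "info"},
    {"message": "disk almost full", "service": "storage", "status": "warn"},
]
req = build_logs_request("http://localhost:8080", "my-logs", logs)
```

Because all records from a single request land in one partition, batching related logs into one array keeps them together.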

Success Response (200 OK)

POST /dd/$TOPIC_NAME/v1/input

Compatibility alias for Datadog Agents configured with use_v2_api: false.

This endpoint has the same authentication, authorization, request-body, partitioning, and response semantics as /dd/$TOPIC_NAME/api/v2/logs.

Example Request

Success Response (200 OK)

Connectivity Probe

The Datadog Agent may send an empty JSON object to check HTTP connectivity:

This is accepted and returns 200 OK with an empty JSON response body:

No Kafka records are produced for connectivity probes.

Request Compression

These endpoints support compressed request bodies via Content-Encoding.

Supported encodings for Datadog-style traffic:

  • gzip

  • zstd

The 64 MiB request-size limit is enforced after decompression.
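A sender can check the uncompressed size before compressing, since that is the size the limit applies to. The following Python sketch uses gzip from the standard library; the helper name `compress_logs` is hypothetical.

```python
import gzip
import json

# Limit from the text above; it applies to the body after decompression.
MAX_BODY_BYTES = 64 * 1024 * 1024


def compress_logs(logs):
    """Return (gzip-compressed body, headers) for a log batch."""
    raw = json.dumps(logs).encode("utf-8")
    # Check the uncompressed size: that is what the Agent measures.
    if len(raw) > MAX_BODY_BYTES:
        raise ValueError("batch exceeds 64 MiB before compression; split it")
    headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
    return gzip.compress(raw), headers


body, headers = compress_logs([{"message": "hello"}])
```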

Error Codes

The code field in error responses maps either to Kafka protocol error codes or endpoint-specific validation errors.

| HTTP Status | Code | Meaning |
| --- | --- | --- |
| 400 | INVALID_REQUEST | Malformed JSON, unsupported content type, non-empty top-level object, trailing JSON tokens, or other request-shape validation failure |
| 400 | INVALID_TOPIC_EXCEPTION | Invalid topic name or topic does not exist |
| 401 | SASL_AUTHENTICATION_FAILED | Missing, malformed, or invalid DD-API-KEY when authentication is required |
| 403 | TOPIC_AUTHORIZATION_FAILED | ACL denied write access to the topic |
| 413 | PAYLOAD_TOO_LARGE | Request body exceeds 64 MiB after decompression |
| 413 | MESSAGE_TOO_LARGE | One or more produced records exceed the Agent's maximum record size |
| 500 | KAFKA_STORAGE_ERROR | Internal metadata lookup, authorization, or produce failure |

Example Error Response

Notes

  • These endpoints are intended for Datadog-compatible log ingestion, not general-purpose Kafka-over-HTTP produce.

  • Partitioning is request-scoped: all records in one HTTP request go to one partition.

  • Partition selection starts from a random partition per topic and rotates round-robin over time based on accumulated request bytes.

  • If you need different routing, use a different $TOPIC_NAME path prefix for each Datadog sender configuration.
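The partitioning notes above can be pictured with a toy model. The Python sketch below is not the Agent's implementation: the class name, the `rotate_after_bytes` threshold, and the reset-on-rotate behavior are all hypothetical. It only illustrates the idea of starting at a random partition and advancing round-robin as request bytes accumulate.

```python
import random


class BytesRoundRobin:
    """Toy model: one partition per request, rotating by accumulated bytes."""

    def __init__(self, num_partitions: int, rotate_after_bytes: int):
        self.num_partitions = num_partitions
        self.rotate_after_bytes = rotate_after_bytes  # hypothetical threshold
        self.current = random.randrange(num_partitions)  # random starting partition
        self.accumulated = 0

    def pick(self, request_bytes: int) -> int:
        """Return the partition for this request, then update the rotation state."""
        partition = self.current  # the whole request goes to one partition
        self.accumulated += request_bytes
        if self.accumulated >= self.rotate_after_bytes:
            # Enough bytes written: move to the next partition.
            self.current = (self.current + 1) % self.num_partitions
            self.accumulated = 0
        return partition


picker = BytesRoundRobin(num_partitions=3, rotate_after_bytes=100)
first = picker.pick(60)
second = picker.pick(60)   # threshold crossed after this request
third = picker.pick(10)
```

In this model the first two requests share a partition and the third moves to the next one, which is the rough byte-based balancing the notes describe.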
