# HTTP Endpoints

## HTTP Fetch APIs

{% hint style="info" %}
Requires Agent version v755+.
{% endhint %}

The WarpStream Agent exposes HTTP/JSON endpoints for fetching records from Kafka topics without a native Kafka client. All endpoints are served on the Agent's HTTP port (`8080` by default).

### Authentication and Authorization

#### Authentication

All endpoints support optional HTTP Basic Auth. When the Agent is configured with SASL authentication, you must include your WarpStream credentials (the same ones used for Kafka SASL/PLAIN) as HTTP Basic Auth.

With curl, use the `-u` flag:

```bash
curl -u 'ccun_YOUR_USERNAME:ccp_YOUR_PASSWORD' ...
```

This is equivalent to setting the `Authorization` header manually with the base64-encoded `username:password` pair:

```bash
curl -H 'Authorization: Basic Y2N1bl9ZT1VSX1VTRVJOQU1FOmNjcF9ZT1VSX1BBU1NXT1JE' ...
```

If the Agent does not require authentication, these headers can be omitted.
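For clients that build the header themselves, the encoding can be sketched in a few lines of Python. This is only an illustration using the standard library; `basic_auth_header` is a hypothetical helper name, not part of any WarpStream SDK.

```python
import base64


def basic_auth_header(username: str, password: str) -> str:
    """Return the Authorization header value for HTTP Basic Auth."""
    pair = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(pair).decode("ascii")


# Reproduces the header value used in the curl example above.
print(basic_auth_header("ccun_YOUR_USERNAME", "ccp_YOUR_PASSWORD"))
# -> Basic Y2N1bl9ZT1VSX1VTRVJOQU1FOmNjcF9ZT1VSX1BBU1NXT1JE
```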

#### Authorization (ACLs)

The HTTP fetch endpoints enforce the same Kafka ACLs as the native Kafka protocol. The ACL principal is derived from the Basic Auth username as `User:<username>`. For example, if you authenticate as `ccun_abc123`, the ACL principal will be `User:ccun_abc123`.

The endpoints require `READ` permission on each topic being fetched. If the authenticated user does not have `READ` access to a topic, the partition will be returned with a `TOPIC_AUTHORIZATION_FAILED` error code (for `/v1/kafka/fetch`) or a `400` error response (for the single-record endpoints).

### Common Headers

| Header            | Description                                       | Default                                                 |
| ----------------- | ------------------------------------------------- | ------------------------------------------------------- |
| `kafka-client-id` | Identifies the client for logging and diagnostics | `http-fetch-client` / `http-fetch-single-record-client` |

### Endpoints

#### `GET` or `POST /v1/kafka/fetch`

Full-featured fetch endpoint that mirrors the Kafka Fetch protocol. Supports fetching from multiple topics and partitions in a single request.

**Request Body (JSON)**

| Field                                       | Type    | Required | Description                                                                                                                   |
| ------------------------------------------- | ------- | -------- | ----------------------------------------------------------------------------------------------------------------------------- |
| `topics`                                    | array   | **yes**  | List of topics to fetch from                                                                                                  |
| `topics[].topic`                            | string  | **yes**  | Topic name                                                                                                                    |
| `topics[].partitions`                       | array   | **yes**  | List of partitions to fetch from                                                                                              |
| `topics[].partitions[].partition`           | integer | **yes**  | Partition index (≥ 0)                                                                                                         |
| `topics[].partitions[].fetch_offset`        | integer | **yes**  | Offset to start fetching from (≥ 0)                                                                                           |
| `topics[].partitions[].partition_max_bytes` | integer | **yes**  | Max bytes to fetch for this partition (> 0)                                                                                   |
| `max_bytes`                                 | integer | no       | Max bytes for the entire fetch response (≥ 0). Default: `0` (agent-managed)                                                   |
| `max_records`                               | integer | no       | Max total records to return across all partitions (≥ 0). `0` means unlimited. Parsing stops early once the budget is reached. |
| `isolation_level`                           | string  | no       | `"read_uncommitted"` (default) or `"read_committed"`                                                                          |
| `long_poll`                                 | boolean | no       | If `true`, the agent waits up to 30s for new data instead of returning immediately. Default: `false`                          |

**Example Request**

```bash
curl -X GET \
  'http://localhost:8080/v1/kafka/fetch' \
  -H 'Content-Type: application/json' \
  -u 'username:password' \
  -d '{
    "max_bytes": 1048576,
    "max_records": 10,
    "topics": [
      {
        "topic": "my-topic",
        "partitions": [
          {
            "partition": 0,
            "fetch_offset": 0,
            "partition_max_bytes": 1048576
          }
        ]
      }
    ]
  }'
```
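When the request body is generated by a program rather than written by hand, it can help to check the documented field constraints before sending. The following is a minimal sketch for a single topic and partition; `build_fetch_request` and its default values are hypothetical and only mirror the request table above.

```python
def build_fetch_request(topic, partition, fetch_offset,
                        partition_max_bytes=1048576,
                        max_records=0, long_poll=False):
    """Build a /v1/kafka/fetch body for one partition, enforcing the documented bounds."""
    if partition < 0 or fetch_offset < 0:
        raise ValueError("partition and fetch_offset must be >= 0")
    if partition_max_bytes <= 0:
        raise ValueError("partition_max_bytes must be > 0")
    if max_records < 0:
        raise ValueError("max_records must be >= 0")
    return {
        "max_records": max_records,  # 0 means unlimited
        "long_poll": long_poll,
        "topics": [{
            "topic": topic,
            "partitions": [{
                "partition": partition,
                "fetch_offset": fetch_offset,
                "partition_max_bytes": partition_max_bytes,
            }],
        }],
    }
```

The returned dictionary can be serialized with `json.dumps` and sent as the request body. If `long_poll` is enabled, a client-side timeout comfortably above 30 seconds is probably a sensible choice, since the Agent may hold the request open that long.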

**Success Response (`200 OK`)**

```json
{
  "throttle_time_ms": 0,
  "topics": [
    {
      "topic": "my-topic",
      "partitions": [
        {
          "partition": 0,
          "error_code": "NONE",
          "high_watermark": 150,
          "last_stable_offset": 150,
          "log_start_offset": 0,
          "records": [
            {
              "offset": 0,
              "timestamp": 1707744000000,
              "key": "dGVzdC1rZXk=",
              "value": "dGVzdC12YWx1ZQ==",
              "headers": [
                {
                  "key": "header-name",
                  "value": "aGVhZGVyLXZhbHVl"
                }
              ]
            }
          ]
        }
      ]
    }
  ]
}
```

> **Note:** `key`, `value`, and header `value` fields are base64-encoded byte arrays. A JSON `null` indicates a nil key/value.
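As an illustration of the encoding described in the note, the sketch below turns one record from the response into raw bytes. It assumes the record shape shown in the example response; `decode_record` is a hypothetical helper name.

```python
import base64


def _b64_or_none(value):
    """Decode a base64 string to bytes, passing JSON null (None) through."""
    return None if value is None else base64.b64decode(value)


def decode_record(record: dict) -> dict:
    """Return a copy of a fetched record with key, value, and header values as bytes."""
    return {
        "offset": record["offset"],
        "timestamp": record["timestamp"],
        "key": _b64_or_none(record.get("key")),
        "value": _b64_or_none(record.get("value")),
        # Header keys are plain strings; only header values are base64-encoded.
        "headers": [(h["key"], _b64_or_none(h.get("value")))
                    for h in record.get("headers") or []],
    }
```

Applied to the record in the example response, this yields the key `b"test-key"`, the value `b"test-value"`, and a single header `("header-name", b"header-value")`.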

**Error Response (`400 Bad Request` / `500 Internal Server Error`)**

```json
{
  "code": "INVALID_REQUEST",
  "message": "invalid request: topics must not be empty"
}
```
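Because a `200 OK` response can still carry a per-partition `error_code`, a client usually needs to inspect each partition before using its records. The sketch below assumes the response shape from the success example and follows the usual Kafka convention that the next `fetch_offset` is one past the last offset received; `summarize_fetch_response` is a hypothetical helper name.

```python
def summarize_fetch_response(response: dict):
    """Return (next_offsets, errors), both keyed by (topic, partition)."""
    next_offsets, errors = {}, {}
    for topic in response.get("topics", []):
        for part in topic.get("partitions", []):
            tp = (topic["topic"], part["partition"])
            code = part.get("error_code", "NONE")
            if code != "NONE":
                # e.g. TOPIC_AUTHORIZATION_FAILED for a topic without READ access
                errors[tp] = code
                continue
            records = part.get("records") or []
            if records:
                next_offsets[tp] = records[-1]["offset"] + 1
    return next_offsets, errors
```

Partitions that returned no records are left out of `next_offsets`, so the caller can simply retry them at the same offset.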

***

#### `GET /v1/kafka/fetch_single_record`

Convenience endpoint for fetching a single record at a specific offset using query parameters.

**Query Parameters**

| Parameter   | Type    | Required | Description                 |
| ----------- | ------- | -------- | --------------------------- |
| `topic`     | string  | **yes**  | Topic name                  |
| `partition` | integer | **yes**  | Partition index (≥ 0)       |
| `offset`    | integer | **yes**  | Exact offset to fetch (≥ 0) |

**Example Request**

```bash
curl -X GET \
  'http://localhost:8080/v1/kafka/fetch_single_record?topic=my-topic&partition=0&offset=42' \
  -u 'username:password'
```

**Success Response (`200 OK`)**

Returns the record directly as a JSON object (not wrapped in the topic/partition structure):

```json
{
  "offset": 42,
  "timestamp": 1707744000000,
  "key": "dGVzdC1rZXk=",
  "value": "dGVzdC12YWx1ZQ==",
  "headers": []
}
```

**Not Found Response (`404 Not Found`)**

Returned when no record exists at the requested offset (e.g., the offset is beyond the high watermark or below the log start offset):

```json
{
  "code": "OFFSET_OUT_OF_RANGE",
  "message": "The requested offset is not within the range of offsets maintained by the server.: no record found at topic=my-topic partition=0 offset=999999"
}
```

**Error Response (`400 Bad Request`)**

```json
{
  "code": "INVALID_REQUEST",
  "message": "missing required query parameter: topic"
}
```
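One way a client might map these three response shapes onto application logic is sketched below. It treats `404` with `OFFSET_OUT_OF_RANGE` as "no record" rather than as a failure; that choice, along with the names `FetchError` and `interpret_single_record_response`, is an assumption made for this example.

```python
import json


class FetchError(Exception):
    """Carries the HTTP status plus the `code` and `message` fields of an error body."""

    def __init__(self, status, code, message):
        super().__init__(f"{status} {code}: {message}")
        self.status, self.code, self.message = status, code, message


def interpret_single_record_response(status: int, body: str):
    """Return the record dict for 200, None for a missing offset, else raise FetchError."""
    payload = json.loads(body)
    if status == 200:
        return payload
    if status == 404 and payload.get("code") == "OFFSET_OUT_OF_RANGE":
        return None
    raise FetchError(status, payload.get("code"), payload.get("message"))
```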

***

#### `GET /v1/kafka/topics/{topic}/partitions/{partition}/records/{offset}`

REST-style convenience endpoint for fetching a single record at a specific offset using path parameters. Functionally identical to `/v1/kafka/fetch_single_record`.

**Path Parameters**

| Parameter   | Type    | Required | Description                 |
| ----------- | ------- | -------- | --------------------------- |
| `topic`     | string  | **yes**  | Topic name                  |
| `partition` | integer | **yes**  | Partition index (≥ 0)       |
| `offset`    | integer | **yes**  | Exact offset to fetch (≥ 0) |

**Example Request**

```bash
curl -X GET \
  'http://localhost:8080/v1/kafka/topics/my-topic/partitions/0/records/42' \
  -u 'username:password'
```

**Responses**

Identical to `/v1/kafka/fetch_single_record`:

* **`200 OK`** — The record as a JSON object.
* **`404 Not Found`** — No record at the requested offset (`OFFSET_OUT_OF_RANGE`).
* **`400 Bad Request`** — Invalid path parameters.
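Since the two single-record endpoints differ only in how the parameters are passed, the corresponding URLs can be built side by side. This sketch uses Python's `urllib.parse`; the helper names are hypothetical, and percent-encoding the topic in the path is a precaution that should be a no-op for valid Kafka topic names.

```python
from urllib.parse import quote, urlencode


def query_style_url(base_url, topic, partition, offset):
    """URL for GET /v1/kafka/fetch_single_record."""
    query = urlencode({"topic": topic, "partition": partition, "offset": offset})
    return f"{base_url}/v1/kafka/fetch_single_record?{query}"


def path_style_url(base_url, topic, partition, offset):
    """URL for GET /v1/kafka/topics/{topic}/partitions/{partition}/records/{offset}."""
    return (f"{base_url}/v1/kafka/topics/{quote(topic, safe='')}"
            f"/partitions/{partition}/records/{offset}")


print(query_style_url("http://localhost:8080", "my-topic", 0, 42))
print(path_style_url("http://localhost:8080", "my-topic", 0, 42))
```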

***

### Error Codes

The `code` field in error responses maps to Kafka protocol error codes:

| HTTP Status | Code                         | Meaning                                                      |
| ----------- | ---------------------------- | ------------------------------------------------------------ |
| `400`       | `INVALID_REQUEST`            | Malformed request (bad JSON, missing fields, invalid values) |
| `401`       | `SASL_AUTHENTICATION_FAILED` | Missing or invalid credentials                               |
| `403`       | `TOPIC_AUTHORIZATION_FAILED` | ACL denied read access to the topic                          |
| `404`       | `OFFSET_OUT_OF_RANGE`        | No record exists at the requested offset                     |
| `500`       | `KAFKA_STORAGE_ERROR`        | Internal error fetching data                                 |

## Datadog HTTP Log Intake

### Datadog Log Intake APIs

{% hint style="info" %}
Requires Agent version v772+.
{% endhint %}

The WarpStream Agent exposes Datadog-compatible HTTP/JSON endpoints for accepting log batches from the Datadog Agent and producing them into Kafka topics. All endpoints are served on the Agent's HTTP port (`8080` by default).

### Authentication and Authorization

#### Authentication

These endpoints use the `DD-API-KEY` header rather than HTTP Basic Auth.

WarpStream interprets `DD-API-KEY` as Kafka SASL/PLAIN credentials encoded as:

`$SASL_USERNAME:$SASL_PASSWORD`

Behavior depends on how the Agent is configured:

* If the Agent does not require SASL authentication, `DD-API-KEY` is optional.
* If the Agent does require SASL authentication, `DD-API-KEY` must be present and must contain valid WarpStream Kafka credentials.
* If `DD-API-KEY` is present but malformed and SASL auth is optional, it is ignored.
* If `DD-API-KEY` is missing or malformed and SASL auth is required, the request fails with `401 SASL_AUTHENTICATION_FAILED`.
* mTLS authentication is not supported for these endpoints.

Example:

{% code overflow="wrap" %}

```bash
curl -X POST \
  'http://localhost:8080/dd/my-topic/api/v2/logs' \
  -H 'Content-Type: application/json' \
  -H 'DD-API-KEY: YOUR_USERNAME:YOUR_PASSWORD' \
  -d '[{"message":"hello from datadog","service":"my-service"}]'
```

{% endcode %}
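The configuration-dependent rules for `DD-API-KEY` can be summarized as a small decision function. This is an illustrative model of the behavior described in this section, not the Agent's implementation: it assumes "malformed" means the header lacks a non-empty `username:password` shape, it splits on the first colon, and it does not verify the credentials themselves. All names are hypothetical.

```python
from typing import Optional, Tuple


class AuthenticationFailed(Exception):
    """Stands in for a 401 SASL_AUTHENTICATION_FAILED response."""


def parse_dd_api_key(header: Optional[str],
                     sasl_required: bool) -> Optional[Tuple[str, str]]:
    """Return (username, password), or None when the request proceeds unauthenticated."""
    credentials = None
    if header:
        username, sep, password = header.partition(":")
        if sep and username and password:
            credentials = (username, password)
    if credentials is None and sasl_required:
        # Missing or malformed header while SASL auth is required.
        raise AuthenticationFailed("SASL_AUTHENTICATION_FAILED")
    # With optional auth, a missing or malformed header is simply ignored.
    return credentials
```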

#### Authorization (ACLs)

The endpoints enforce the same Kafka ACLs as the native Kafka produce path.

* If authentication succeeds, the ACL principal is derived from the username as `User:$SASL_USERNAME`.
* If authentication is optional and no valid `DD-API-KEY` is provided, the request is evaluated as the anonymous principal.
* The endpoints require `WRITE` permission on the destination topic.
* If the authenticated principal does not have write access to the topic, the request fails with `403 TOPIC_AUTHORIZATION_FAILED`.

### Common Headers

| Header             | Description                                                            | Default                    |
| ------------------ | ---------------------------------------------------------------------- | -------------------------- |
| `DD-API-KEY`       | WarpStream Kafka SASL/PLAIN credentials encoded as `username:password` | omitted                    |
| `kafka-client-id`  | Identifies the client for logging and diagnostics                      | `http-datadog-logs-client` |
| `Content-Type`     | Must be `application/json`                                             | none                       |
| `Content-Encoding` | Optional request compression                                           | none                       |

### Datadog Agent Configuration

When configuring the Datadog Agent, point `logs_dd_url` at the path prefix only. Do not include `/api/v2/logs` or `/v1/input` in the configured URL, because the Datadog Agent appends the intake suffix itself.

Example Datadog Agent configuration for the primary endpoint:

{% code overflow="wrap" %}

```yaml
logs_enabled: true
logs_config:
    use_v2_api: true
    logs_dd_url: https://warpstream.example.com/dd/$TOPIC_NAME
```

{% endcode %}

If you want Datadog to use the compatibility endpoint instead, set:

{% code overflow="wrap" %}

```yaml
logs_enabled: true
logs_config:
    use_v2_api: false
    logs_dd_url: https://warpstream.example.com/dd/$TOPIC_NAME
```

{% endcode %}

### Common Behavior

These endpoints share the following behavior:

* The `$TOPIC_NAME` path parameter is the exact Kafka topic name to produce to.
* The topic must already exist.
* The topic name must be a valid Kafka topic name.
* The request body limit is `64 MiB` after decompression.
* Each JSON object in the submitted array becomes one Kafka record.
* The JSON object is stored as the Kafka record value exactly as submitted.
* No Kafka key is set.
* No Datadog-specific fields are interpreted or transformed.
* All records from a single HTTP request are written to the same partition.
* Across requests, the Agent rotates partitions over time for rough byte-based balancing.

### Raw Endpoint Specs

{% hint style="info" %}
The raw endpoint specifications are not required to use this integration. Simply follow the instructions above to configure the Datadog Agent and you'll be good to go. That said, the endpoints are documented here for completeness.
{% endhint %}

#### `POST /dd/$TOPIC_NAME/api/v2/logs`

Primary Datadog-compatible log intake endpoint.

This is the preferred route for Datadog Agents configured with `use_v2_api: true`.

**Path Parameters**

| Parameter     | Type   | Required | Description                  |
| ------------- | ------ | -------- | ---------------------------- |
| `$TOPIC_NAME` | string | yes      | Destination Kafka topic name |

**Request Body (JSON)**

The request body must be one of:

| Shape            | Description                                                                     |
| ---------------- | ------------------------------------------------------------------------------- |
| `[{...}, {...}]` | A JSON array of log objects. Each array element becomes one Kafka record value. |
| `{}`             | Connectivity probe. Accepted but does not produce any records.                  |

The Agent treats each array element as an opaque JSON object. Common Datadog fields like `message`, `service`, `ddsource`, `ddtags`, `hostname`, `status`, and `timestamp` are preserved exactly as sent.

**Example Request**

{% code overflow="wrap" %}

```bash
curl -X POST \
  'http://localhost:8080/dd/my-topic/api/v2/logs' \
  -H 'Content-Type: application/json' \
  -H 'DD-API-KEY: YOUR_USERNAME:YOUR_PASSWORD' \
  -d '[
    {"message": "application started", "service": "payments", "ddsource": "kubernetes", "ddtags": "env:staging,team:data"},
    {"message": "worker ready", "service": "payments", "ddsource": "kubernetes", "ddtags": "env:staging,team:data"}
  ]'
```

{% endcode %}

**Success Response (`200 OK`)**

{% code overflow="wrap" %}

```json
{}
```

{% endcode %}

#### `POST /dd/$TOPIC_NAME/v1/input`

Compatibility alias for Datadog Agents configured with `use_v2_api: false`.

This endpoint has the same authentication, authorization, request-body, partitioning, and response semantics as `/dd/$TOPIC_NAME/api/v2/logs`.

**Example Request**

{% code overflow="wrap" %}

```bash
curl -X POST \
  'http://localhost:8080/dd/my-topic/v1/input' \
  -H 'Content-Type: application/json' \
  -H 'DD-API-KEY: YOUR_USERNAME:YOUR_PASSWORD' \
  -d '[{"message": "legacy intake example","service": "payments"}]'
```

{% endcode %}

**Success Response (`200 OK`)**

{% code overflow="wrap" %}

```json
{}
```

{% endcode %}

### Connectivity Probe

The Datadog Agent may send an empty JSON object to check HTTP connectivity:

{% code overflow="wrap" %}

```json
{}
```

{% endcode %}

This is accepted and returns `200 OK` with an empty JSON response body:

{% code overflow="wrap" %}

```json
{}
```

{% endcode %}

No Kafka records are produced for connectivity probes.
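The accepted body shapes can be modeled with a short validation function. The sketch below is an approximation of the documented rules (empty object, JSON array, everything else rejected), not the Agent's actual parser; `classify_payload` is a hypothetical name, and the function returns the list of values that would become Kafka records.

```python
import json


def classify_payload(body: bytes) -> list:
    """Return the log objects to produce; an empty list means nothing is produced."""
    try:
        # json.loads also rejects trailing tokens after the first JSON value.
        parsed = json.loads(body)
    except ValueError as exc:
        raise ValueError("INVALID_REQUEST: malformed JSON") from exc
    if isinstance(parsed, dict) and not parsed:
        return []  # connectivity probe: accepted, no records
    if isinstance(parsed, list):
        return parsed  # each element becomes one record value
    raise ValueError(
        "INVALID_REQUEST: payload must be either an empty JSON object or a JSON array"
    )
```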

### Request Compression

These endpoints support compressed request bodies via `Content-Encoding`.

Supported encodings for Datadog-style traffic:

* `gzip`
* `zstd`

The `64 MiB` request-size limit is enforced after decompression.
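For senders other than the Datadog Agent, a compressed request could be assembled roughly as follows. This sketch uses only Python's standard library and checks the size limit against the uncompressed body, since that is where the limit applies; `build_logs_request` and `MAX_BODY_BYTES` are hypothetical names.

```python
import gzip
import json

MAX_BODY_BYTES = 64 * 1024 * 1024  # documented limit, applied after decompression


def build_logs_request(base_url, topic, logs, username=None, password=None):
    """Return (url, headers, body) for a gzip-compressed POST to the v2 logs endpoint."""
    raw = json.dumps(logs).encode("utf-8")
    if len(raw) > MAX_BODY_BYTES:
        raise ValueError("uncompressed body exceeds 64 MiB")
    headers = {"Content-Type": "application/json", "Content-Encoding": "gzip"}
    if username is not None and password is not None:
        headers["DD-API-KEY"] = f"{username}:{password}"
    url = f"{base_url}/dd/{topic}/api/v2/logs"
    return url, headers, gzip.compress(raw)
```

The result can be passed to any HTTP client, for example `urllib.request.Request(url, data=body, headers=headers, method="POST")`.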

### Error Codes

The `code` field in error responses maps either to Kafka protocol error codes or endpoint-specific validation errors.

| HTTP Status | Code                         | Meaning                                                                                                                               |
| ----------- | ---------------------------- | ------------------------------------------------------------------------------------------------------------------------------------- |
| `400`       | `INVALID_REQUEST`            | Malformed JSON, unsupported content type, non-empty top-level object, trailing JSON tokens, or other request-shape validation failure |
| `400`       | `INVALID_TOPIC_EXCEPTION`    | Invalid topic name or topic does not exist                                                                                            |
| `401`       | `SASL_AUTHENTICATION_FAILED` | Missing, malformed, or invalid `DD-API-KEY` when authentication is required                                                           |
| `403`       | `TOPIC_AUTHORIZATION_FAILED` | ACL denied write access to the topic                                                                                                  |
| `413`       | `PAYLOAD_TOO_LARGE`          | Request body exceeds `64 MiB` after decompression                                                                                     |
| `413`       | `MESSAGE_TOO_LARGE`          | One or more produced records exceed the Agent's maximum record size                                                                   |
| `500`       | `KAFKA_STORAGE_ERROR`        | Internal metadata lookup, authorization, or produce failure                                                                           |

#### Example Error Response

{% code overflow="wrap" %}

```json
{"code": "INVALID_REQUEST","message": "payload must be either an empty JSON object or a JSON array"}
```

{% endcode %}

### Notes

* These endpoints are intended for Datadog-compatible log ingestion, not general-purpose Kafka-over-HTTP produce.
* Partitioning is request-scoped: all records in one HTTP request go to one partition.
* Partition selection starts from a random partition per topic and rotates round-robin over time based on accumulated request bytes.
* If you need different routing, use a different `$TOPIC_NAME` path prefix for each Datadog sender configuration.
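To make the partitioning notes more concrete, here is a toy model of request-scoped, byte-based rotation. It is purely illustrative: the rotation threshold `rotate_after_bytes` is a hypothetical parameter (this page does not state the Agent's actual threshold), and the class is not the Agent's implementation.

```python
import random


class PartitionRotator:
    """Toy model: one partition per request, advancing after enough bytes accumulate."""

    def __init__(self, num_partitions, rotate_after_bytes, start=None):
        self.num_partitions = num_partitions
        self.rotate_after_bytes = rotate_after_bytes  # hypothetical threshold
        # Start from a random partition unless one is given explicitly.
        self.current = random.randrange(num_partitions) if start is None else start
        self.accumulated = 0

    def pick(self, request_bytes):
        """Return the partition for this whole request, then update the rotation state."""
        partition = self.current
        self.accumulated += request_bytes
        if self.accumulated >= self.rotate_after_bytes:
            self.accumulated = 0
            self.current = (self.current + 1) % self.num_partitions
        return partition
```

In this model every record of a request lands on the partition returned by `pick`, and larger requests cause the rotation to advance sooner, which gives rough byte-based balancing across partitions.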

