Links
Comment on page

"Hello World" for AWS Kinesis

First, install the WarpStream Agent for your platform.

Running the WarpStream Agent

warpstream playground
The playground command will start an Agent on localhost, store all the data for the Agent in your current directory, and sign up for a temporary account for you to play around with.

Create a Stream

Let's create a stream using the aws-cli tool.
aws kinesis create-stream \
--no-sign-request \
--endpoint "http://localhost:8080" \
--stream-name helloworld \
--shard-count 1
If you don't receive an error back, your request to create the stream succeeded.

Add Records To The Stream

Now let's write a record to the stream. We'll write a record with the partition key "hello" and the data "world". Record data in Kinesis is base64-encoded, which is why the Data field in the example command below is base64-encoded.
aws kinesis put-records \
--no-sign-request \
--endpoint "http://localhost:8080" \
--records "Data=\"d29ybGQ=\",PartitionKey=hello" \
--stream-name helloworld
You should receive the response as JSON in your terminal that looks something like this:
{
"FailedRecordCount": 0,
"Records": [
{
"SequenceNumber": "0",
"ShardId": "shardId-000000000000"
}
],
"EncryptionType": "NONE"
}

Read Records From The Stream

aws kinesis get-shard-iterator \
--no-sign-request \
--endpoint "http://localhost:8080" \
--stream-name helloworld \
--shard-id shardId-000000000000 \
--shard-iterator-type TRIM_HORIZON
Once you've received a shard iterator back from the Agent, you can start using it to read the record you just wrote in the last step.
aws kinesis get-records \
--no-sign-request \
--endpoint "http://localhost:8080" \
--shard-iterator "SHARD_ITERATOR"
Now you should see your record you wrote previously printed to the console. The PartitionKey field is "hello" and the Data field is the same base64-encoded "world" string as before. You can repeat the process of writing another record and running get-records command again to read and write more records.
{
"Records": [
{
"SequenceNumber": "0",
"Data": "d29ybGQ=",
"PartitionKey": "hello",
"EncryptionType": "NONE"
}
],
"NextShardIterator": "CgtoZWxsb3dvcmxkMxgB",
"MillisBehindLatest": 0
}
You don't need to run the get-shard-iterator command again. You can re-use the shard iterator value returned after each successful get-records.
And that's it! You've successfully set up a WarpStream Agent Pool to power your Virtual Cluster, created a stream, and processed some data through it.
You can now move on to trying to create a real application which reads or writes from the stream, or connect your existing tools like Flink to WarpStream.
Apache, Apache Kafka, Kafka, and associated open source project names are trademarks of the Apache Software Foundation. Kinesis is a trademark of Amazon Web Services.