Amazon Kinesis enables real-time processing of streaming data at massive scale. Kinesis Data Streams is useful for rapidly moving data off data producers and then continuously processing it, whether to transform the data before emitting it to a data store, to run real-time metrics and analytics, or to derive a more complex data stream for further processing.
How Kinesis Works
Benefits of Using Kinesis Data Streams
- Enables ingesting, buffering, and processing streams of data in real time.
- Facilitates the architectural shift from an API-driven system to an event-driven one.
- Ensures durability and elasticity for data placed into the stream.
- Allows multiple Kinesis Data Streams applications to consume data from the same stream, so that multiple actions, like archiving and data processing, can take place concurrently and independently.
Publishing and Consuming
- To send data to the stream, configure producers using the Kinesis Streams PUT API operation or the Amazon Kinesis Producer Library (KPL). Learn more about setting up this configuration.
- For custom processing and real-time data analytics of streaming data, use the Kinesis Client Library (KCL).
Amazon Kinesis Client Library
The Kinesis Client Library (KCL) enables fault-tolerant consumption of data from streams and provides scaling support for Kinesis Data Streams applications. KCL is built on top of the AWS Kinesis SDK rather than replacing it. For a simple application, the SDK provides an API to poll data from a stream continuously; this works reasonably well when the stream has only a single shard. In a multi-shard scenario, a client built directly on the SDK must deal with significant complexity. KCL handles all of the following tasks on the application's behalf:
- Connects to the stream
- Enumerates the shards
- Coordinates shard associations with other workers (if any)
- Instantiates a record processor for every shard it manages
- Pulls data records from the stream
- Pushes the records to the corresponding record processor
- Checkpoints processed records
- Balances shard-worker associations when the worker instance count changes
- Balances shard-worker associations when shards are split or merged
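To make that burden concrete, here is roughly what consuming even a single shard with the raw SDK looks like: obtain a shard iterator, then loop on GetRecords, following the returned NextShardIterator. The client interface below is a stub standing in for the AWS SDK (the types and names are simplified for illustration); everything in the list above, from shard enumeration to lease coordination and checkpointing, is left to the caller:

```go
package main

import "fmt"

// record is a minimal stand-in for a Kinesis data record.
type record struct {
	SequenceNumber string
	Data           []byte
}

// getRecordsOutput mirrors the shape of a GetRecords response.
type getRecordsOutput struct {
	Records           []record
	NextShardIterator string // empty when the shard is closed
}

// shardReader stubs the two SDK calls a single-shard poller needs.
type shardReader interface {
	GetShardIterator(shardID string) string
	GetRecords(iterator string) getRecordsOutput
}

// pollShard is the simple single-shard loop described above: obtain an
// iterator, fetch batches, and follow NextShardIterator until the shard
// is closed. No lease coordination, no checkpointing, no resharding
// support -- that is the complexity KCL adds on top.
func pollShard(client shardReader, shardID string, handle func(record)) {
	it := client.GetShardIterator(shardID)
	for it != "" {
		out := client.GetRecords(it)
		for _, r := range out.Records {
			handle(r)
		}
		it = out.NextShardIterator
	}
}

// fakeStream serves canned batches, closing the shard after the last one.
type fakeStream struct{ batches []getRecordsOutput }

func (f *fakeStream) GetShardIterator(string) string { return "it-0" }
func (f *fakeStream) GetRecords(it string) getRecordsOutput {
	var i int
	fmt.Sscanf(it, "it-%d", &i)
	return f.batches[i]
}

func main() {
	stream := &fakeStream{batches: []getRecordsOutput{
		{Records: []record{{"1", []byte("a")}, {"2", []byte("b")}}, NextShardIterator: "it-1"},
		{Records: []record{{"3", []byte("c")}}, NextShardIterator: ""},
	}}
	pollShard(stream, "shardId-000000000000", func(r record) {
		fmt.Printf("seq=%s data=%s\n", r.SequenceNumber, r.Data)
	})
}
```

Multiply this loop by every shard, add workers joining and leaving, and add shards splitting and merging, and the need for a library like KCL becomes obvious.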
Therefore, in production, almost no one uses the AWS Kinesis SDK directly. For a better understanding of how Kinesis works and the complexity of Kinesis consumption, especially how KCL solves the problem, see Marvin Theimer's presentation at AWS re:Invent (BDT311). KCL is the de facto standard for Kinesis consumption. Unfortunately, KCL is a Java-only implementation; there is no native Go implementation of KCL.
In order to support other languages like Python and Go, AWS has implemented a Java-based daemon called MultiLangDaemon that does all the heavy lifting. MultiLangDaemon is itself a Java KCL application with an internal record processor. It spawns a sub-process that runs a second record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate over STDIN and STDOUT using a defined protocol, with a one-to-one correspondence among record processors, child processes, and shards. The major drawbacks of this approach are:
- Java dependency
- Inefficiency: two record processors per shard, an internal one in Java and an external one in the other language
VMware-Go-KCL
VMware-Go-KCL brings to the Go/Kubernetes community a native Go implementation of KCL that matches the API and functional specification of the original Java KCL v2.0, without the resource overhead of installing the Java-based MultiLangDaemon.
Interface:
```go
type (
	// IRecordProcessor is the interface for the callback functions invoked by KCL.
	// The main task of using KCL is to provide an implementation of the IRecordProcessor interface.
	// Note: this is exactly the same interface as Amazon KCL IRecordProcessor v2.
	IRecordProcessor interface {
		/**
		 * Invoked by the Amazon Kinesis Client Library before data records are delivered to the
		 * RecordProcessor instance (via processRecords).
		 *
		 * @param initializationInput Provides information related to initialization
		 */
		Initialize(initializationInput *InitializationInput)

		/**
		 * Process data records. The Amazon Kinesis Client Library will invoke this method to deliver
		 * data records to the application.
		 * Upon failover, the new instance will get records with sequence number > checkpoint position
		 * for each partition key.
		 *
		 * @param processRecordsInput Provides the records to be processed as well as information and
		 * capabilities related to them (e.g. checkpointing).
		 */
		ProcessRecords(processRecordsInput *ProcessRecordsInput)

		/**
		 * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records
		 * to this RecordProcessor instance.
		 *
		 * <h2><b>Warning</b></h2>
		 *
		 * When the value of {@link ShutdownInput#getShutdownReason()} is
		 * {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it
		 * is required that you checkpoint. Failure to do so will result in an IllegalArgumentException,
		 * and the KCL no longer making progress.
		 *
		 * @param shutdownInput Provides information and capabilities (e.g. checkpointing) related to
		 * shutdown of this record processor.
		 */
		Shutdown(shutdownInput *ShutdownInput)
	}

	// IRecordProcessorFactory is the interface for creating an IRecordProcessor. Each Worker can have
	// multiple threads for processing shards. The client can choose either to create one processor per
	// shard or to share them.
	IRecordProcessorFactory interface {
		/**
		 * Returns a record processor to be used for processing data records for an (assigned) shard.
		 *
		 * @return Returns a processor object.
		 */
		CreateProcessor() IRecordProcessor
	}
)
```
Usage Example:
```go
func main() {
	kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
		WithInitialPositionInStream(cfg.LATEST).
		WithMaxRecords(10).
		WithMaxLeasesForWorker(1).
		WithShardSyncIntervalMillis(5000).
		WithFailoverTimeMillis(300000)

	// configure cloudwatch as the metrics system
	metricsConfig := &metrics.MonitoringConfiguration{
		MonitoringService: "cloudwatch",
		Region:            regionName,
		CloudWatch: metrics.CloudWatchMonitoringService{
			// These values should come from kclConfig
			MetricsBufferTimeMillis: 10000,
			MetricsMaxQueueSize:     20,
		},
	}

	worker := wk.NewWorker(recordProcessorFactory(), kclConfig, metricsConfig)
	worker.Start()

	// Put some data into the stream.
	for i := 0; i < 100; i++ {
		// Use a random string as the partition key to ensure even distribution across shards
		err := worker.Publish(streamName, utils.RandStringBytesMaskImpr(10), []byte("hello world"))
		if err != nil {
			log.Printf("Error in Publish. %+v", err)
		}
	}

	// wait a few seconds before shutdown processing
	time.Sleep(10 * time.Second)
	worker.Shutdown()
}

// recordProcessorFactory creates the RecordProcessor factory
func recordProcessorFactory() kc.IRecordProcessorFactory {
	return &dumpRecordProcessorFactory{}
}

// dumpRecordProcessorFactory creates a simple record processor that dumps everything
type dumpRecordProcessorFactory struct{}

func (d *dumpRecordProcessorFactory) CreateProcessor() kc.IRecordProcessor {
	return &dumpRecordProcessor{}
}

// dumpRecordProcessor prints out all data from each record
type dumpRecordProcessor struct{}

func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
	log.Printf("Processing ShardId: %v at checkpoint: %v", input.ShardId,
		aws.StringValue(input.ExtendedSequenceNumber.SequenceNumber))
}

func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
	log.Print("Processing Records...")

	// don't process an empty batch
	if len(input.Records) == 0 {
		return
	}

	for _, v := range input.Records {
		log.Printf("Record = %s", v.Data)
	}

	// checkpoint after processing this batch
	lastRecordSequenceNumber := input.Records[len(input.Records)-1].SequenceNumber
	log.Printf("Checkpoint progress at: %v, MillisBehindLatest = %v", lastRecordSequenceNumber, input.MillisBehindLatest)
	input.Checkpointer.Checkpoint(lastRecordSequenceNumber)
}

func (dd *dumpRecordProcessor) Shutdown(input *kc.ShutdownInput) {
	log.Printf("Shutdown Reason: %v", aws.StringValue(kc.ShutdownReasonMessage(input.ShutdownReason)))

	// When the shutdown reason is TERMINATE you are required to checkpoint.
	// Failure to do so stops the KCL from making progress.
	if input.ShutdownReason == kc.TERMINATE {
		input.Checkpointer.Checkpoint(nil)
	}
}
```
Reference and Tutorial
Because VMware-Go-KCL matches exactly the same interface and programming model as the original Amazon Kinesis Client Library, the best source of reference material, tutorials, and service documentation is Amazon itself.
Support and Contact
Open source embodies a model for people to work together, building something greater than they can create on their own. For any project or code related issues and questions, please create an issue in VMware-Go-KCL repository.
Stay tuned to the Open Source Blog for more project deep-dives and follow us on Twitter (@vmwopensource).