Documentation ¶
Overview ¶
GKES (Go Kafka Event Source) attempts to fill the gaps in the Go/Kafka library ecosystem. It supplies Exactly Once Semantics (EOS), local state stores and incremental consumer rebalancing to Go Kafka consumers, making it a viable alternative to a traditional Kafka Streams application written in Java.
What it is ¶
GKES is a Go/Kafka library tailored towards the development of Event Sourcing applications, providing a high-throughput, low-latency Kafka client framework. Using Kafka transactions, it provides EOS, data integrity and high availability. If you wish to use GKES as a straight Kafka consumer, it will fit the bill as well, though there are plenty of libraries for that, and researching which best fits your use case is time well spent.
GKES is not an all-in-one, do-everything black box. Some elements, in particular the StateStore, have been left without comprehensive implementations.
StateStores ¶
A useful and performant local state store rarely has a flat data structure. If your state store does, there are some convenient implementations provided. However, to achieve optimum performance, you will not only need to write a StateStore implementation, but will also need to understand what the proper data structures are for your use case (trees, heaps, maps, disk-based LSM trees or combinations thereof). You can use the provided github.com/aws/go-kafka-event-source/streams/stores.SimpleStore as a starting point.
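For flat data, wrapping the provided SimpleStore in your own type is often enough; a minimal sketch mirroring the ContactStore pattern used in the Examples below (Contact and its Key() method are illustrative):

    // a minimal flat StateStore built on the provided SimpleStore;
    // Contact and its Key() method are illustrative placeholders
    type Contact struct {
        Id string
    }

    func (c Contact) Key() string { return c.Id }

    type ContactStore struct {
        *stores.SimpleStore[Contact]
    }

    // StateStoreFactory used when constructing the EventSource
    func NewContactStore(tp streams.TopicPartition) ContactStore {
        return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
    }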
Vending State ¶
GKES purposefully does not provide a pre-canned way for exposing StateStore data, other than producing to another Kafka topic. There are as many ways to vend data as there are web applications. Rather than putting effort into inventing yet another one, GKES provides the mechanisms to query StateStores via Interjections. This mechanism can be plugged into whatever request/response mechanism suits your use case (gRPC, RESTful HTTP service...any number of web frameworks already in the Go ecosystem). [TODO: provide a simple http example]
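As a rough illustration of the idea (not part of the library), an HTTP handler could read from a StateStore by interjecting into the partition that owns the requested key. Here eventSource, ContactStore and partitionForKey are assumptions:

    // hypothetical read-path: answer an HTTP request by interjecting into the
    // partition that owns the key; partitionForKey must match the topic's partitioning scheme
    http.HandleFunc("/contact", func(w http.ResponseWriter, r *http.Request) {
        id := r.URL.Query().Get("id")
        partition := partitionForKey(id)
        errChan := eventSource.Interject(partition, func(ec *streams.EventContext[ContactStore], _ time.Time) streams.ExecutionState {
            if contact, ok := ec.Store().Get(id); ok {
                json.NewEncoder(w).Encode(contact)
            } else {
                w.WriteHeader(http.StatusNotFound)
            }
            return streams.Complete
        })
        if err := <-errChan; err != nil {
            http.Error(w, err.Error(), http.StatusServiceUnavailable)
        }
    })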
Interjections ¶
For those familiar with the Kafka Streams API, GKES provides stream `Punctuators`, but we call them `Interjections` (because it sounds cool). Interjections allow you to insert actions into your EventSource at a specified interval per assigned partition via streams.EventSource.ScheduleInterjection, or at any time via streams.EventSource.Interject. This is useful for bookkeeping activities, aggregated metric production or even error handling. Interjections have full access to the StateStore associated with an EventSource and can interact with output topics like any other EventProcessor.
Incremental Consumer Rebalancing ¶
One issue that Kafka consumer applications have long suffered from is latency spikes during a consumer rebalance. The cooperative sticky rebalancing introduced by Kafka and implemented by kgo helps resolve this issue. However, once StateStores are thrown into the mix, things get a bit more complicated because initializing the StateStore on a host involves consuming a compacted TopicPartition from start to end. GKES solves this with the IncrementalRebalancer and takes it one step further. The IncrementalRebalancer rebalances consumer partitions in a controlled fashion, minimizing latency spikes and limiting the blast radius of a bad deployment.
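Enabling it is a configuration concern. A minimal sketch, with group, topic and broker names as placeholders; note that IncrementalBalanceStrategy is already the default via DefaultBalanceStrategies:

    // IncrementalBalanceStrategy is the default (see DefaultBalanceStrategies),
    // but it can be set explicitly; names below are placeholders
    sourceConfig := streams.EventSourceConfig{
        GroupId:           "my-group",
        Topic:             "my-topic",
        NumPartitions:     10,
        SourceCluster:     streams.SimpleCluster([]string{"127.0.0.1:9092"}),
        BalanceStrategies: []streams.BalanceStrategy{streams.IncrementalBalanceStrategy},
    }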
Async Processing ¶
GKES provides conventions for asynchronously processing events on the same Kafka partition while still maintaining data/stream integrity. The AsyncBatcher and AsyncJobScheduler allow you to split a TopicPartition into sub-streams by key, ensuring all events for a particular key are processed in order while allowing for parallel processing on a given TopicPartition.
For more details, see Async Processing Examples
High-Throughput/Low-Latency EOS ¶
A Kafka transaction is a powerful tool which allows for Exactly Once Semantics (EOS) by linking a consumer offset commit to one or more records that are being produced by your application (a StateStore record for example). The history of Kafka EOS is a long and complicated one with varied degrees of performance and efficiency.
Early iterations required one producer transaction per consumer partition, which was very inefficient, as a topic with 1000 partitions would also require 1000 producer clients in order to provide EOS. This has since been addressed, but depending on client implementations, there is a high risk of running into "producer fenced" errors as well as reduced throughput.
In a traditional Java Kafka Streams application, transactions are committed according to the auto-commit frequency, which defaults to 100ms. This means that your application will only produce readable records every 100ms per partition. The effect of this is that no matter what you do, your tail latency will be at least 100ms and downstream consumers will receive records in bursts rather than a steady stream. For many use cases, this is unacceptable.
GKES solves this issue by using a configurable transactional producer pool and a type of "Nagle's algorithm". Uncommitted offsets are added to the transaction pool in sequence. Once a producer has reached its record limit, or enough time has elapsed (10ms by default), the head transaction will wait for any incomplete events to finish, then flush and commit. While this transaction is committing, GKES continues to process events and optimistically begins a new transaction and produces records on the next producer in the pool. Since transactions are produced in sequence, there is no danger of commit offset overlap or duplicate message processing in the case of a failure.
To ensure EOS, your EventSource must use either the IncrementalRebalancer or kgo's cooperative sticky implementation. If you're using a StateStore, the IncrementalRebalancer should be used to avoid lengthy periods of inactivity during application deployments.
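The knobs for this behavior live on EosConfig (documented below). A minimal sketch, assuming sourceConfig is the EventSourceConfig being built and using the documented defaults where sensible:

    // tuning the producer pool / "Nagle" batching; values here are illustrative,
    // and DefaultEosConfig is used when EosConfig is left empty
    sourceConfig.EosConfig = streams.EosConfig{
        PoolSize:        streams.DefaultPoolSize,        // 3 transactional producers
        PendingTxnCount: streams.DefaultPendingTxnCount, // 1 pending (committing) txn
        TargetBatchSize: 2000,                           // try to commit once 2000 events/records accumulate
        MaxBatchSize:    streams.DefaultMaxBatchSize,    // hard cap of 10000
        BatchDelay:      10 * time.Millisecond,          // the recommended minimum tail latency
    }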
Kafka Client Library ¶
Rather than create yet another Kafka driver, GKES is built on top of kgo. This Kafka client was chosen as it (in our testing) has superior throughput and latency profiles compared to other client libraries currently available to Go developers.
One other key advantage is that it provides a migration path to cooperative consumer rebalancing, which is required for our EOS implementation. Other Go Kafka libraries provide cooperative rebalancing, but do not allow you to migrate from a non-cooperative rebalancing strategy (range, sticky, etc.). This is a major roadblock for existing deployments, as the only migration paths are an entirely new consumer group, or bringing your application completely down and re-deploying with a new rebalance strategy. These migration plans, to put it mildly, are a big challenge for zero-downtime/live applications. The kgo package now makes this migration possible with zero downtime.
Kgo also has the proper hooks needed to implement the IncrementalGroupRebalancer, which is necessary for safe deployments when using a local state store. Kudos to kgo!
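If you need a raw kgo.Client that reuses the same Cluster definition (for a standalone producer or admin task, say), NewClient is exposed for convenience. A small sketch, with the broker address and client id as placeholders:

    // build a plain kgo.Client from the same Cluster definition used by GKES;
    // the broker address is a placeholder
    cluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
    client, err := streams.NewClient(cluster, kgo.ClientID("my-side-client"))
    if err != nil {
        // handle the error
    }
    defer client.Close()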
Index ¶
- Constants
- Variables
- func DeleteSource(sourceConfig EventSourceConfig) error
- func JsonItemDecoder[T any](record IncomingRecord) (T, error)
- func NewClient(cluster Cluster, options ...kgo.Opt) (*kgo.Client, error)
- func RegisterDefaultHandler[T StateStore](es *EventSource[T], recordProcessor EventProcessor[T, IncomingRecord], ...)
- func RegisterEventType[T StateStore, V any](es *EventSource[T], transformer IncomingRecordDecoder[V], ...)
- func SetRecordType(r *kgo.Record, recordType string)
- type AsyncBatcher
- type AsyncCompleter
- type AsyncJob
- type AsyncJobFinalizer
- type AsyncJobProcessor
- type AsyncJobScheduler
- func CreateAsyncJobScheduler[S StateStore, K comparable, V any](eventSource *EventSource[S], processor AsyncJobProcessor[K, V], ...) (*AsyncJobScheduler[S, K, V], error)
- func NewAsyncJobScheduler[S StateStore, K comparable, V any](runStatus sak.RunStatus, processor AsyncJobProcessor[K, V], ...) (*AsyncJobScheduler[S, K, V], error)
- type BalanceStrategy
- type BatchCallback
- type BatchExecutor
- type BatchItem
- type BatchItems
- type BatchProducer
- type BatchProducerCallback
- type ChangeLogEntry
- func (cle ChangeLogEntry) KeyWriter() *bytes.Buffer
- func (cle ChangeLogEntry) ValueWriter() *bytes.Buffer
- func (cle ChangeLogEntry) WithEntryType(entryType string) ChangeLogEntry
- func (cle ChangeLogEntry) WithHeader(key string, value []byte) ChangeLogEntry
- func (cle ChangeLogEntry) WithKey(key ...[]byte) ChangeLogEntry
- func (cle ChangeLogEntry) WithKeyString(key ...string) ChangeLogEntry
- func (cle ChangeLogEntry) WithValue(value ...[]byte) ChangeLogEntry
- func (cle ChangeLogEntry) WriteKey(bs ...[]byte)
- func (cle ChangeLogEntry) WriteKeyString(ss ...string)
- func (cle ChangeLogEntry) WriteValue(bs ...[]byte)
- func (cle ChangeLogEntry) WriteValueString(ss ...string)
- type ChangeLogReceiver
- type CleanupPolicy
- type Cluster
- type Codec
- type DeserializationErrorHandler
- type Destination
- type EosConfig
- type ErrorContext
- type ErrorResponse
- type EventContext
- func (ec *EventContext[T]) AsyncJobComplete(finalize func() ExecutionState)
- func (ec *EventContext[T]) Fail(err error) ExecutionState
- func (ec *EventContext[T]) Forward(records ...*Record)
- func (ec *EventContext[T]) Input() (IncomingRecord, bool)
- func (ec *EventContext[T]) IsInterjection() bool
- func (ec *EventContext[T]) Offset() int64
- func (ec *EventContext[T]) RecordChange(entries ...ChangeLogEntry)
- func (ec *EventContext[T]) Store() T
- func (ec *EventContext[T]) TopicPartition() TopicPartition
- type EventContextProducer
- type EventProcessor
- type EventSource
- func (es *EventSource[T]) ConsumeEvents()
- func (es *EventSource[T]) Done() <-chan struct{}
- func (es *EventSource[T]) EmitMetric(m Metric)
- func (es *EventSource[T]) ForkRunStatus() sak.RunStatus
- func (es *EventSource[T]) Interject(partition int32, cmd Interjector[T]) <-chan error
- func (es *EventSource[T]) InterjectAll(interjector Interjector[T])
- func (es *EventSource[T]) InterjectAllSync(interjector Interjector[T])
- func (es *EventSource[T]) ScheduleInterjection(interjector Interjector[T], every, jitter time.Duration)
- func (es *EventSource[T]) Source() *Source
- func (es *EventSource[T]) State() EventSourceState
- func (es *EventSource[T]) Stop()
- func (es *EventSource[T]) StopNow()
- func (es *EventSource[T]) WaitForChannel(c chan struct{}, callback func())
- func (es *EventSource[T]) WaitForSignals(preHook func(os.Signal) bool, signals ...os.Signal)
- type EventSourceConfig
- type EventSourceState
- type ExecutionState
- type GlobalChangeLog
- type IncomingRecord
- func (r IncomingRecord) HeaderValue(name string) []byte
- func (r IncomingRecord) Headers() []kgo.RecordHeader
- func (r IncomingRecord) Key() []byte
- func (r IncomingRecord) LeaderEpoch() int32
- func (r IncomingRecord) Offset() int64
- func (r IncomingRecord) RecordType() string
- func (r IncomingRecord) Timestamp() time.Time
- func (r IncomingRecord) TopicPartition() TopicPartition
- func (r IncomingRecord) Value() []byte
- type IncomingRecordDecoder
- type IncrGroupMemberInstructions
- type IncrGroupMemberMeta
- type IncrGroupPartitionState
- type IncrRebalanceInstructionHandler
- type IncrementalGroupRebalancer
- type Interjector
- type JsonCodec
- type LogLevel
- type Logger
- type MemberStatus
- type Metric
- type MetricsHandler
- type OptionalPartitioner
- type Producer
- type Record
- func (r *Record) AsIncomingRecord() IncomingRecord
- func (r *Record) Error() error
- func (r *Record) KeyWriter() *bytes.Buffer
- func (r *Record) Offset() int64
- func (r *Record) Release()
- func (r *Record) ToKafkaRecord() *kgo.Record
- func (r *Record) TopicPartition() TopicPartition
- func (r *Record) ValueWriter() *bytes.Buffer
- func (r *Record) WithError(err error) *Record
- func (r *Record) WithHeader(key string, value []byte) *Record
- func (r *Record) WithKey(key ...[]byte) *Record
- func (r *Record) WithKeyString(key ...string) *Record
- func (r *Record) WithPartition(partition int32) *Record
- func (r *Record) WithRecordType(recordType string) *Record
- func (r *Record) WithTopic(topic string) *Record
- func (r *Record) WithValue(value ...[]byte) *Record
- func (r *Record) WriteKey(bs ...[]byte)
- func (r *Record) WriteKeyString(ss ...string)
- func (r *Record) WriteValue(bs ...[]byte)
- func (r *Record) WriteValueString(ss ...string)
- type SchedulerConfig
- type SimpleCluster
- type SimpleLogger
- type Source
- func (s *Source) AsDestination() Destination
- func (s *Source) BalanceStrategies() []BalanceStrategy
- func (s *Source) CommitLogTopicNameForGroupId() string
- func (s *Source) Config() EventSourceConfig
- func (s *Source) GroupId() string
- func (s *Source) NumPartitions() int
- func (s *Source) State() EventSourceState
- func (s *Source) StateStoreTopicName() string
- func (s *Source) Topic() string
- type SourcePartitionEventHandler
- type StateStore
- type StateStoreFactory
- type TopicPartition
- type TopicPartitionCallback
- type TopicPartitionSet
- type TxnErrorHandler
Examples ¶
Constants ¶
const AutoAssign = int32(-1)
const DefaultBatchDelay = 10 * time.Millisecond
const DefaultMaxBatchSize = 10000
const DefaultPendingTxnCount = 1
const DefaultPoolSize = 3
const DefaultTargetBatchSize = 1000
const IncrementalCoopProtocol = "incr_coop"
const LexInt64Size = int(unsafe.Sizeof(uint64(1))) + 1
const PartitionPreppedOperation = "PartitionPrepped"
const RecordTypeHeaderKey = "__grt__" // let's keep it small. every byte counts
The record.Header key that GKES uses to transmit type information about an IncomingRecord or a ChangeLogEntry.
const TxnCommitOperation = "TxnCommit"
Variables ¶
var ComputeConfig = SchedulerConfig{ Concurrency: runtime.NumCPU(), WorkerQueueDepth: 1000, MaxConcurrentKeys: 10000, }
var DefaultBalanceStrategies = []BalanceStrategy{IncrementalBalanceStrategy}
var DefaultConfig = SchedulerConfig{ Concurrency: runtime.NumCPU(), WorkerQueueDepth: 1000, MaxConcurrentKeys: 10000, }
var DefaultEosConfig = EosConfig{ PoolSize: DefaultPoolSize, PendingTxnCount: DefaultPendingTxnCount, TargetBatchSize: DefaultTargetBatchSize, MaxBatchSize: DefaultMaxBatchSize, BatchDelay: DefaultBatchDelay, }
var ErrPartitionNotAssigned = errors.New("partition is not assigned")
var ErrPartitionNotReady = errors.New("partition is not ready")
var FastNetworkConfig = SchedulerConfig{ Concurrency: runtime.NumCPU() * 4, WorkerQueueDepth: 100, MaxConcurrentKeys: 10000, }
var Int32Codec = intCodec[int32]{}
Convenience codec for working with int32 types. Will never induce an error unless there is an OOM condition, so errors are safe to ignore on Encode/Decode.
var Int64Codec = intCodec[int64]{}
Convenience codec for working with int64 types. Will never induce an error unless there is an OOM condition, so errors are safe to ignore on Encode/Decode.
var IntCodec = intCodec[int]{}
Convenience codec for working with int types. Will never induce an error unless there is an OOM condition, so they are safe to ignore on Encode/Decode
var LexoInt64Codec = lexoInt64Codec{}
A convenience Codec for integers where the encoded value is suitable for sorting in data structures which use []byte as keys (such as an LSM-based db like BadgerDB or RocksDB). Useful if you need to persist items in order by timestamp or some other integer value. Decode will generate an error if the input []byte size is not LexInt64Size.
var SlowNetworkConfig = SchedulerConfig{ Concurrency: runtime.NumCPU() * 16, WorkerQueueDepth: 100, MaxConcurrentKeys: 10000, }
var WideNetworkConfig = SchedulerConfig{ Concurrency: runtime.NumCPU() * 32, WorkerQueueDepth: 1000, MaxConcurrentKeys: 10000, }
Functions ¶
func DeleteSource ¶
func DeleteSource(sourceConfig EventSourceConfig) error
Deletes all topics associated with a Source. Provided for local testing purposes only. Do not call this in deployed applications unless your topics are transient in nature.
func JsonItemDecoder ¶
func JsonItemDecoder[T any](record IncomingRecord) (T, error)
A convenience function for decoding an IncomingRecord. Conforms to streams.IncomingRecordDecoder interface needed for streams.RegisterEventType
    streams.RegisterEventType(myEventSource, codec.JsonItemDecoder[myType], myHandler, "myType")

    // or standalone
    myDecoder := codec.JsonItemDecoder[myType]
    myItem, err := myDecoder(incomingRecord)
func NewClient ¶
NewClient creates a kgo.Client from the options returned from the provided Cluster and additional `options`. Used internally and exposed for convenience.
func RegisterDefaultHandler ¶ added in v1.0.4
func RegisterDefaultHandler[T StateStore](es *EventSource[T], recordProcessor EventProcessor[T, IncomingRecord], eventType string)
A convenience method to avoid chicken-and-egg scenarios when initializing an EventSource. Must not be called after `EventSource.ConsumeEvents()`
func RegisterEventType ¶
func RegisterEventType[T StateStore, V any](es *EventSource[T], transformer IncomingRecordDecoder[V], eventProcessor EventProcessor[T, V], eventType string)
Registers eventType with a transformer (usually a codec.Codec) and the supplied EventProcessor. Must not be called after `EventSource.ConsumeEvents()`
func SetRecordType ¶
A convenience function provided in case you are working with a raw kgo producer and want to integrate with streams. This will ensure that the EventSource will route the record to the proper handler without falling back to the defaultHandler
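A small sketch of that scenario, assuming rawKgoClient is an existing kgo.Client and the topic, key and payload are placeholders:

    // tag a raw kgo.Record so the EventSource routes it to the handler
    // registered for "CreateContact"; topic and payload are placeholders
    record := &kgo.Record{
        Topic: "ExampleEventSource",
        Key:   []byte("123"),
        Value: payloadBytes,
    }
    streams.SetRecordType(record, "CreateContact")
    rawKgoClient.Produce(context.Background(), record, nil)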
Types ¶
type AsyncBatcher ¶
type AsyncBatcher[S any, K comparable, V any] struct { // contains filtered or unexported fields }
AsyncBatcher performs a similar function to the AsyncJobScheduler, but is intended for performing actions for multiple events at a time. This is particularly useful when interacting with systems which provide a batch API.
For detailed examples, see https://github.com/aws/go-kafka-event-source/docs/asynprocessing.md
func NewAsyncBatcher ¶
func NewAsyncBatcher[S StateStore, K comparable, V any](executor BatchExecutor[K, V], maxBatchSize, maxConcurrentBatches int, delay time.Duration) *AsyncBatcher[S, K, V]
Create a new AsyncBatcher. Each invocation of `executor` will have a maximum of `maxBatchSize` items. No more than `maxConcurrentBatches` will be executing at any given time. AsyncBatcher will accumulate items until `delay` has elapsed, or `maxBatchSize` items have been received.
func (*AsyncBatcher[S, K, V]) Add ¶
func (ab *AsyncBatcher[S, K, V]) Add(batch *BatchItems[S, K, V]) ExecutionState
Schedules items in BatchItems to be executed when capacity is available.
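A rough wiring sketch under stated assumptions: ContactStore and Contact reuse the illustrative types from the Examples, saveBatch stands in for a call to some external batch API, and error handling is elided.

    // hypothetical executor which writes a batch of items to an external batch API;
    // individual failures can be surfaced by setting BatchItem.Err
    func saveBatch(batch []*streams.BatchItem[string, Contact]) {
        // call your batch API here
    }

    var batcher = streams.NewAsyncBatcher[ContactStore, string, Contact](saveBatch, 1000, 4, 10*time.Millisecond)

    // invoked once every item in the BatchItems container has been executed
    func onBatchComplete(ec *streams.EventContext[ContactStore], items *streams.BatchItems[ContactStore, string, Contact]) streams.ExecutionState {
        return streams.Complete
    }

    // an EventProcessor which hands its event off to the batcher
    func handleContact(ec *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
        items := streams.NewBatchItems(ec, contact.Id, onBatchComplete).Add(contact)
        return batcher.Add(items)
    }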
type AsyncCompleter ¶ added in v1.0.4
type AsyncJob ¶ added in v1.0.4
type AsyncJob[T any] struct { // contains filtered or unexported fields }
func (AsyncJob[T]) Finalize ¶ added in v1.0.4
func (aj AsyncJob[T]) Finalize() ExecutionState
type AsyncJobFinalizer ¶
type AsyncJobFinalizer[T any, K comparable, V any] func(*EventContext[T], K, V, error) ExecutionState
A callback invoked when a previously scheduled AsyncJob has been completed.
type AsyncJobProcessor ¶
type AsyncJobProcessor[K comparable, V any] func(K, V) error
A handler invoked when a previously scheduled AsyncJob should be performed.
type AsyncJobScheduler ¶
type AsyncJobScheduler[S StateStore, K comparable, V any] struct { // contains filtered or unexported fields }
The AsyncJobScheduler provides a generic work scheduler/job serializer which takes a key/value as input via Schedule. All work is organized into queues by 'key'. So for a given key, all work is serial, allowing the use of the single-writer principle in an asynchronous fashion. In practice, it divides a stream partition into its individual keys and processes the keys in parallel.
After scheduling is complete for a key/value, the Scheduler will call the `processor` callback defined at initialization. The output of this call will be passed to the `finalizer` callback. If `finalizer` is nil, the event is marked as `Complete` once the job is finished, ignoring any errors.
For detailed examples, see https://github.com/aws/go-kafka-event-source/docs/asynprocessing.md
Example ¶
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/go-kafka-event-source/streams"
    "github.com/aws/go-kafka-event-source/streams/sak"
    "github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
    Id          string
    PhoneNumber string
    Email       string
    FirstName   string
    LastName    string
    LastContact time.Time
}

type NotifyContactEvent struct {
    ContactId        string
    NotificationType string
}

type EmailNotification struct {
    ContactId string
    Address   string
    Payload   string
}

func (c Contact) Key() string {
    return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
    contactStore := ctx.Store()
    ctx.RecordChange(contactStore.Put(contact))
    fmt.Printf("Created contact: %s\n", contact.Id)
    return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
    *stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
    return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

var notificationScheduler *streams.AsyncJobScheduler[ContactStore, string, EmailNotification]

func notifyContactAsync(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
    contactStore := ctx.Store()
    if contact, ok := contactStore.Get(notification.ContactId); ok {
        fmt.Printf("Notifying contact: %s asynchronously by %s\n", contact.Id, notification.NotificationType)
        return notificationScheduler.Schedule(ctx, contact.Email, EmailNotification{
            ContactId: contact.Id,
            Address:   contact.Email,
            Payload:   "sending you mail...from a computer!",
        })
    } else {
        fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
    }
    return streams.Complete
}

func sendEmailToContact(key string, notification EmailNotification) error {
    fmt.Printf("Processing an email job with key: '%s'. This may take some time, emails are tricky!\n", key)
    time.Sleep(500 * time.Millisecond)
    return nil
}

func emailToContactComplete(ctx *streams.EventContext[ContactStore], _ string, email EmailNotification, err error) streams.ExecutionState {
    contactStore := ctx.Store()
    if contact, ok := contactStore.Get(email.ContactId); ok {
        fmt.Printf("Notified contact: %s, address: %s, payload: '%s'\n", contact.Id, email.Address, email.Payload)
        contact.LastContact = time.Now()
        contactStore.Put(contact)
    }
    return streams.Complete
}

func main() {
    streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)
    contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
    sourceConfig := streams.EventSourceConfig{
        GroupId:       "ExampleAsyncJobSchedulerGroup",
        Topic:         "ExampleAsyncJobScheduler",
        NumPartitions: 10,
        SourceCluster: contactsCluster,
    }
    destination := streams.Destination{
        Cluster:      sourceConfig.SourceCluster,
        DefaultTopic: sourceConfig.Topic,
    }
    eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

    streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
    streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContactAsync, "NotifyContact")

    notificationScheduler = sak.Must(streams.CreateAsyncJobScheduler(eventSource,
        sendEmailToContact, emailToContactComplete, streams.DefaultConfig))

    eventSource.ConsumeEvents()

    contact := Contact{
        Id:          "123",
        Email:       "billy@bob.com",
        PhoneNumber: "+18005551212",
        FirstName:   "Billy",
        LastName:    "Bob",
    }
    notification := NotifyContactEvent{
        ContactId:        "123",
        NotificationType: "email",
    }

    producer := streams.NewProducer(destination)

    createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
    createContactRecord.WriteKeyString(contact.Id)

    notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
    notificationRecord.WriteKeyString(notification.ContactId)

    producer.Produce(context.Background(), createContactRecord)
    producer.Produce(context.Background(), notificationRecord)

    eventSource.WaitForSignals(nil)

    // Expected Output: Created contact: 123
    // Notifying contact: 123 asynchronously by email
    // Processing an email job with key: 'billy@bob.com'. This may take some time, emails are tricky!
    // Notified contact: 123, address: billy@bob.com, payload: 'sending you mail...from a computer!'
}
Output:
func CreateAsyncJobScheduler ¶
func CreateAsyncJobScheduler[S StateStore, K comparable, V any]( eventSource *EventSource[S], processor AsyncJobProcessor[K, V], finalizer AsyncJobFinalizer[S, K, V], config SchedulerConfig) (*AsyncJobScheduler[S, K, V], error)
Creates an AsyncJobScheduler which is tied to the RunStatus of EventSource.
func NewAsyncJobScheduler ¶ added in v1.0.4
func NewAsyncJobScheduler[S StateStore, K comparable, V any]( runStatus sak.RunStatus, processor AsyncJobProcessor[K, V], finalizer AsyncJobFinalizer[S, K, V], config SchedulerConfig) (*AsyncJobScheduler[S, K, V], error)
Creates an AsyncJobScheduler which will continue to run while runStatus.Running()
func (*AsyncJobScheduler[S, K, V]) Schedule ¶
func (ap *AsyncJobScheduler[S, K, V]) Schedule(ec *EventContext[S], key K, value V) ExecutionState
Schedules the value for processing in order by key. The finalizer will be invoked once processing is complete.
func (*AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys ¶
func (ap *AsyncJobScheduler[S, K, V]) SetMaxConcurrentKeys(size int)
Dynamically update the MaxConcurrentKeys for the current scheduler.
func (*AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth ¶
func (ap *AsyncJobScheduler[S, K, V]) SetWorkerQueueDepth(size int)
type BalanceStrategy ¶
type BalanceStrategy int
const (
    RangeBalanceStrategy             BalanceStrategy = 0
    RoundRobinBalanceStrategy        BalanceStrategy = 1
    CooperativeStickyBalanceStrategy BalanceStrategy = 2
    IncrementalBalanceStrategy       BalanceStrategy = 3
)
type BatchCallback ¶
type BatchCallback[S any, K comparable, V any] func(*EventContext[S], *BatchItems[S, K, V]) ExecutionState
type BatchExecutor ¶
type BatchExecutor[K comparable, V any] func(batch []*BatchItem[K, V])
type BatchItem ¶
type BatchItem[K comparable, V any] struct {
    Value    V
    Err      error
    UserData any
    // contains filtered or unexported fields
}
type BatchItems ¶ added in v1.0.4
type BatchItems[S any, K comparable, V any] struct {
    UserData any
    // contains filtered or unexported fields
}
func NewBatchItems ¶ added in v1.0.4
func NewBatchItems[S any, K comparable, V any](ec *EventContext[S], key K, cb BatchCallback[S, K, V]) *BatchItems[S, K, V]
Creates a container for BatchItems and ties them to an EventContext. Once all items in BatchItems.Items() have been executed, the provided BatchCallback will be executed.
func (*BatchItems[S, K, V]) Add ¶ added in v1.0.4
func (b *BatchItems[S, K, V]) Add(values ...V) *BatchItems[S, K, V]
Adds items to BatchItems container. Values added in this method will inherit their key from the BatchItems container.
func (*BatchItems[S, K, V]) AddWithKey ¶ added in v1.0.4
func (b *BatchItems[S, K, V]) AddWithKey(key K, values ...V) *BatchItems[S, K, V]
AddWithKey() is similar to Add(), but the items added do not inherit their key from the BatchItems. Useful for interjectors that may need to batch items that belong to multiple keys.
func (*BatchItems[S, K, V]) Items ¶ added in v1.0.4
func (b *BatchItems[S, K, V]) Items() []BatchItem[K, V]
func (*BatchItems[S, K, V]) Key ¶ added in v1.0.4
func (b *BatchItems[S, K, V]) Key() K
type BatchProducer ¶
type BatchProducer[S any] struct { // contains filtered or unexported fields }
func NewBatchProducer ¶
func NewBatchProducer[S any](destination Destination, opts ...kgo.Opt) *BatchProducer[S]
Provides similar functionality to AsyncBatcher, but in the context of producing Kafka records. Since the underlying Kafka producer already batches in an ordered fashion, there is no need to add the overhead of the AsyncBatcher. Records produced by a BatchProducer are not transactional, and therefore duplicates could be created. The use cases for the BatchProducer vs EventContext.Forward are as follows:
- The topic you are producing to is not on the same Kafka cluster as your EventSource
- Duplicates are OK and you do not want to wait for the transaction to complete before the consumers of these records can see the data (lower latency)
If your use case does not fall into the above buckets, it is recommended to just use [EventContext.Forward].
func (*BatchProducer[S]) Produce ¶
func (p *BatchProducer[S]) Produce(ec *EventContext[S], records []*Record, cb BatchProducerCallback[S], userData any) ExecutionState
Produces `records` and invokes BatchProducerCallback once all records have been produced or have errored out. If there was an error in producing, it can be retrieved with record.Error()
It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the Record(s) after BatchProducerCallback has been invoked.
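A rough sketch of the flow; the destination, topic name, ContactStore/Contact types and the auditContact handler are illustrative, and JsonItemEncoder is used here only because it appears in the Examples in this documentation:

    // produce a low-latency, non-transactional side output; destination and
    // record contents are illustrative
    var sideOutput = streams.NewBatchProducer[ContactStore](streams.Destination{
        Cluster:      streams.SimpleCluster([]string{"127.0.0.1:9092"}),
        DefaultTopic: "contact-audit",
    })

    // invoked once all records have been produced or have errored out
    func onProduced(ec *streams.EventContext[ContactStore], records []*streams.Record, _ any) streams.ExecutionState {
        for _, r := range records {
            if r.Error() != nil {
                // a record failed to produce; log or retry as appropriate
            }
        }
        return streams.Complete
    }

    // an EventProcessor which emits a non-transactional audit record
    func auditContact(ec *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
        record := streams.JsonItemEncoder("ContactAudited", contact)
        record.WriteKeyString(contact.Id)
        return sideOutput.Produce(ec, []*streams.Record{record}, onProduced, nil)
    }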
type BatchProducerCallback ¶
type BatchProducerCallback[S any] func(eventContext *EventContext[S], records []*Record, userData any) ExecutionState
type ChangeLogEntry ¶
type ChangeLogEntry struct {
// contains filtered or unexported fields
}
ChangeLogEntry represents a Kafka record which will be produced to the StateStore for your EventSource. Note that you cannot set a topic or partition on a ChangeLogEntry. These values are managed by GKES.
func CreateChangeLogEntry ¶
func CreateChangeLogEntry[T any](item T, codec Codec[T]) (ChangeLogEntry, error)
A shortcut method for creating a ChangeLogEntry with a value encoded using the provided codec.
    cle, err := CreateChangeLogEntry(myValue, myCodec)
    if err != nil {
        // handle the encoding error
    }
    cle = cle.WithKeyString(myKey).WithEntryType(myType)
    eventContext.RecordChange(cle)
func CreateJsonChangeLogEntry ¶
func CreateJsonChangeLogEntry[T any](item T) (ChangeLogEntry, error)
A shortcut method for creating a ChangeLogEntry with a JSON-encoded value.
    cle, err := CreateJsonChangeLogEntry(myValue)
    if err != nil {
        // handle the encoding error
    }
    cle = cle.WithKeyString(myKey).WithEntryType(myType)
    eventContext.RecordChange(cle)
func EncodeJsonChangeLogEntryValue ¶ added in v1.0.4
func EncodeJsonChangeLogEntryValue[T any](entryType string, item T) ChangeLogEntry
A convenience function for encoding an item into a ChangeLogEntry suitable for writing to a StateStore. Please note that the Key on the entry will be left uninitialized. Usage:
    entry := codec.EncodeJsonChangeLogEntryValue("myType", myItem)
    entry.WriteKeyString(myItem.Key)
func NewChangeLogEntry ¶
func NewChangeLogEntry() ChangeLogEntry
func (ChangeLogEntry) KeyWriter ¶
func (cle ChangeLogEntry) KeyWriter() *bytes.Buffer
func (ChangeLogEntry) ValueWriter ¶
func (cle ChangeLogEntry) ValueWriter() *bytes.Buffer
func (ChangeLogEntry) WithEntryType ¶
func (cle ChangeLogEntry) WithEntryType(entryType string) ChangeLogEntry
func (ChangeLogEntry) WithHeader ¶ added in v1.0.5
func (cle ChangeLogEntry) WithHeader(key string, value []byte) ChangeLogEntry
func (ChangeLogEntry) WithKey ¶
func (cle ChangeLogEntry) WithKey(key ...[]byte) ChangeLogEntry
func (ChangeLogEntry) WithKeyString ¶
func (cle ChangeLogEntry) WithKeyString(key ...string) ChangeLogEntry
func (ChangeLogEntry) WithValue ¶
func (cle ChangeLogEntry) WithValue(value ...[]byte) ChangeLogEntry
func (ChangeLogEntry) WriteKey ¶
func (cle ChangeLogEntry) WriteKey(bs ...[]byte)
func (ChangeLogEntry) WriteKeyString ¶
func (cle ChangeLogEntry) WriteKeyString(ss ...string)
func (ChangeLogEntry) WriteValue ¶
func (cle ChangeLogEntry) WriteValue(bs ...[]byte)
func (ChangeLogEntry) WriteValueString ¶
func (cle ChangeLogEntry) WriteValueString(ss ...string)
type ChangeLogReceiver ¶
type ChangeLogReceiver interface {
ReceiveChange(IncomingRecord) error
}
type CleanupPolicy ¶
type CleanupPolicy int
const (
    CompactCleanupPolicy CleanupPolicy = iota
    DeleteCleanupPolicy
)
type Cluster ¶
type Cluster interface {
    // Returns the list of kgo.Opt(s) that will be used whenever a connection is made to this cluster.
    // At minimum, it should return the kgo.SeedBrokers() option.
    Config() ([]kgo.Opt, error)
}
An interface for implementing a reusable Kafka client configuration. TODO: document reserved options
type DeserializationErrorHandler ¶
type DeserializationErrorHandler func(ec ErrorContext, eventType string, err error) ErrorResponse
type Destination ¶
type Destination struct {
    // The topic to use for records being produced which have empty topic data
    DefaultTopic string

    // Optional, used in CreateDestination call.
    NumPartitions int

    // Optional, used in CreateDestination call.
    ReplicationFactor int

    // Optional, used in CreateDestination call.
    MinInSync int

    // The Kafka cluster where this destination resides.
    Cluster Cluster
}
func CreateDestination ¶
func CreateDestination(destination Destination) (resolved Destination, err error)
type EosConfig ¶
type EosConfig struct {
    // PoolSize is the number of transactional producer clients in the pool.
    PoolSize int

    // The maximum number of pending transactions to be allowed in the pool at any given point in time.
    PendingTxnCount int

    // TargetBatchSize is the target number of events or records (whichever is greater) for a transaction before a commit is attempted.
    TargetBatchSize int

    // MaxBatchSize is the maximum number of events or records (whichever is greater) for a transaction before it will stop accepting new events.
    // Once a transaction reaches MaxBatchSize, it must be committed.
    MaxBatchSize int

    // The maximum amount of time to wait before committing a transaction. Once this time has elapsed, the transaction will commit
    // even if TargetBatchSize has not been achieved. This number will be the tail latency of the consume/produce cycle during periods of low activity.
    // Under high load, this setting has little impact unless set too low. If this value is too low, produce batch sizes will be extremely small
    // and Kafka will need to manage an excessive number of transactions.
    // The recommended value is 10ms and the minimum allowed value is 1ms.
    BatchDelay time.Duration
}
EOS pipeline: incoming EventContexts are appended to the on-deck transaction. When the on-deck transaction fills (or the batch delay elapses), it is pushed onto the pending transaction channel. The commit go-routine then, for each pending transaction in sequence: (1) receives the transaction, (2) waits for its EventContexts to complete, (3) flushes the produced records, and (4) commits.
func (EosConfig) IsZero ¶
IsZero returns true if EosConfig is uninitialized, or all values equal zero. Used to determine whether the EventSource should fall back to DefaultEosConfig.
type ErrorContext ¶
type ErrorContext interface {
    TopicPartition() TopicPartition
    Offset() int64
    Input() (IncomingRecord, bool)
}
type ErrorResponse ¶
type ErrorResponse int
Instructs GKES on how to proceed when an error is encountered.
const (
    // Instructs GKES to ignore any error state and continue processing as normal. If this is used in response to a
    // Kafka transaction error, there will likely be data loss or corruption. This ErrorResponse is not recommended, as it is unlikely that
    // a consumer will be able to recover gracefully from a transaction error. In almost all situations, FailConsumer is preferred.
    Continue ErrorResponse = iota

    // Instructs GKES to immediately stop processing and the consumer to immediately leave the group.
    // This is preferable to a FatallyExit as Kafka will immediately recognize the consumer as exiting the group
    // (if there is still communication with the cluster) and processing of the
    // failed partitions will begin without waiting for the session timeout value.
    FailConsumer

    // As the name implies, the application will fatally exit. The partitions owned by this consumer will not be reassigned until the configured
    // session timeout on the broker.
    FatallyExit
)
func DefaultDeserializationErrorHandler ¶
func DefaultDeserializationErrorHandler(ec ErrorContext, eventType string, err error) ErrorResponse
The default DeserializationErrorHandler. Simply logs the error and returns Continue.
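If Continue is not the right behavior for your application, a DeserializationErrorHandler is just a function with the documented signature. A hedged sketch that fails the consumer instead (the logging call and sourceConfig variable are illustrative):

    // a stricter alternative to the default: log and fail the consumer so the
    // partition can move to a healthy host
    func strictDeserializationErrorHandler(ec streams.ErrorContext, eventType string, err error) streams.ErrorResponse {
        log.Printf("failed to deserialize %s event at %+v, offset %d: %v",
            eventType, ec.TopicPartition(), ec.Offset(), err)
        return streams.FailConsumer
    }

    // wire it up via the EventSourceConfig being built
    sourceConfig.DeserializationErrorHandler = strictDeserializationErrorHandler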
func DefaultTxnErrorHandler ¶
func DefaultTxnErrorHandler(err error) ErrorResponse
The default and recommended TxnErrorHandler. Returns FailConsumer on txn errors.
type EventContext ¶
type EventContext[T any] struct { // contains filtered or unexported fields }
Contains information about the current event. Is passed to EventProcessors and Interjections
func MockEventContext ¶ added in v1.0.4
func MockEventContext[T any](ctx context.Context, input *Record, stateStoreTopc string, store T, asyncCompleter AsyncCompleter[T], producer EventContextProducer[T]) *EventContext[T]
A convenience function for creating unit tests for an EventContext from an incoming Kafka Record. All arguments other than `ctx` are optional unless you are interacting with those resources. For example, if you call EventContext.Forward/RecordChange, you will need to provide a mock producer. If you run the EventContext through an async process, you will need to provide a mock AsyncCompleter.
    func TestMyHandler(t *testing.T) {
        eventContext := streams.MockEventContext(context.TODO(), mockRecord(), "storeTopic", mockStore(), mockCompleter(), mockProducer())
        record, _ := eventContext.Input()
        if err := testMyHandler(eventContext, record); err != nil {
            t.Error(err)
        }
    }
func MockInterjectionEventContext ¶ added in v1.0.4
func MockInterjectionEventContext[T any](ctx context.Context, topicPartition TopicPartition, stateStoreTopc string, store T, asyncCompleter AsyncCompleter[T], producer EventContextProducer[T]) *EventContext[T]
A convenience function for creating unit tests for an EventContext from an interjection. All arguments other than `ctx` are optional unless you are interacting with those resources. For example, if you call EventContext.Forward/RecordChange, you will need to provide a mock producer. If you run the EventContext through an async process, you will need to provide a mock AsyncCompleter.
    func TestMyInterjector(t *testing.T) {
        eventContext := streams.MockInterjectionEventContext(context.TODO(), myTopicPartition, "storeTopic", mockStore(), mockCompleter(), mockProducer())
        if err := testMyInterjector(eventContext, time.Now()); err != nil {
            t.Error(err)
        }
    }
func (*EventContext[T]) AsyncJobComplete ¶
func (ec *EventContext[T]) AsyncJobComplete(finalize func() ExecutionState)
AsyncJobComplete should be called when an async event processor has performed its function. The finalize function should return Complete if there are no other pending asynchronous jobs for the event context in question, regardless of error state. `finalize` does not accept any arguments, so your callback should encapsulate any pertinent data needed for processing. If you are using the AsyncBatcher, AsyncJobScheduler or BatchProducer, you should not need to interact with this method directly.
func (*EventContext[T]) Fail ¶ added in v1.0.6
func (ec *EventContext[T]) Fail(err error) ExecutionState
Fail the event with the given error and return Fail, which has the effect of marking the consumer as unhealthy and shutting it down.
func (*EventContext[T]) Forward ¶
func (ec *EventContext[T]) Forward(records ...*Record)
Forward produces records on the transactional producer for your EventSource. If the transaction fails, records produced in this fashion will not be visible to other consumers which have a fetch isolation of `read_committed`. An isolation level of `read_committed` is required for Exactly Once Semantics.
It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the Record(s) after Forward has been invoked.
func (*EventContext[T]) Input ¶
func (ec *EventContext[T]) Input() (IncomingRecord, bool)
Returns the raw input record for this event, or an uninitialized record and false if the EventContext represents an Interjection.
func (*EventContext[T]) IsInterjection ¶
func (ec *EventContext[T]) IsInterjection() bool
Returns true if this EventContext represents an Interjection
func (*EventContext[T]) Offset ¶
func (ec *EventContext[T]) Offset() int64
The offset for this event, -1 for an Interjection
func (*EventContext[T]) RecordChange ¶
func (ec *EventContext[T]) RecordChange(entries ...ChangeLogEntry)
Forwards records to the transactional producer for your EventSource. When you add an item to your StateStore, you must call this method for that change to be recorded in the stream. This ensures that when the TopicPartition for this change is transferred to a new consumer, it will also have this change. If the transaction fails, records produced in this fashion will not be visible to other consumers which have a fetch isolation of `read_committed`. An isolation level of `read_committed` is required for Exactly Once Semantics.
It is important to note that GKES uses a Record pool. After the transaction has completed for this record, it is returned to the pool for reuse. Your application should not hold on to references to the ChangeLogEntry(s) after RecordChange has been invoked.
func (*EventContext[T]) Store ¶
func (ec *EventContext[T]) Store() T
Returns the StateStore for this event/TopicPartition
func (*EventContext[T]) TopicPartition ¶
func (ec *EventContext[T]) TopicPartition() TopicPartition
The TopicPartition for this event. It is present for both normal events and Interjections.
type EventContextProducer ¶ added in v1.0.4
type EventContextProducer[T any] interface { ProduceRecord(*EventContext[T], *Record, func(*Record, error)) }
type EventProcessor ¶
type EventProcessor[T any, V any] func(*EventContext[T], V) ExecutionState
A callback invoked when a new record has been received from the EventSource, after it has been transformed via IncomingRecordTransformer.
type EventSource ¶
type EventSource[T StateStore] struct { // contains filtered or unexported fields }
EventSource provides an abstraction over raw kgo.Record/streams.IncomingRecord consumption, allowing the use of strongly typed event handlers. One of the key features of the EventSource is to allow for the routing of events based off of a type header. See RegisterEventType for details.
Example ¶
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/aws/go-kafka-event-source/streams"
    "github.com/aws/go-kafka-event-source/streams/sak"
    "github.com/aws/go-kafka-event-source/streams/stores"
)

type Contact struct {
    Id          string
    PhoneNumber string
    Email       string
    FirstName   string
    LastName    string
    LastContact time.Time
}

type NotifyContactEvent struct {
    ContactId        string
    NotificationType string
}

func (c Contact) Key() string {
    return c.Id
}

func createContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
    contactStore := ctx.Store()
    ctx.RecordChange(contactStore.Put(contact))
    fmt.Printf("Created contact: %s\n", contact.Id)
    return streams.Complete
}

func deleteContact(ctx *streams.EventContext[ContactStore], contact Contact) streams.ExecutionState {
    contactStore := ctx.Store()
    if entry, ok := contactStore.Delete(contact); ok {
        ctx.RecordChange(entry)
        fmt.Printf("Deleted contact: %s\n", contact.Id)
    }
    return streams.Complete
}

func notifyContact(ctx *streams.EventContext[ContactStore], notification NotifyContactEvent) streams.ExecutionState {
    contactStore := ctx.Store()
    if contact, ok := contactStore.Get(notification.ContactId); ok {
        fmt.Printf("Notifying contact: %s by %s\n", contact.Id, notification.NotificationType)
    } else {
        fmt.Printf("Contact %s does not exist!\n", notification.ContactId)
    }
    return streams.Complete
}

// simply providing an example of how you might wrap the store into your own type
type ContactStore struct {
    *stores.SimpleStore[Contact]
}

func NewContactStore(tp streams.TopicPartition) ContactStore {
    return ContactStore{stores.NewJsonSimpleStore[Contact](tp)}
}

func main() {
    streams.InitLogger(streams.SimpleLogger(streams.LogLevelError), streams.LogLevelError)
    contactsCluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
    sourceConfig := streams.EventSourceConfig{
        GroupId:       "ExampleEventSourceGroup",
        Topic:         "ExampleEventSource",
        NumPartitions: 10,
        SourceCluster: contactsCluster,
    }
    destination := streams.Destination{
        Cluster:      sourceConfig.SourceCluster,
        DefaultTopic: sourceConfig.Topic,
    }
    eventSource := sak.Must(streams.NewEventSource(sourceConfig, NewContactStore, nil))

    streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], createContact, "CreateContact")
    streams.RegisterEventType(eventSource, streams.JsonItemDecoder[Contact], deleteContact, "DeleteContact")
    streams.RegisterEventType(eventSource, streams.JsonItemDecoder[NotifyContactEvent], notifyContact, "NotifyContact")

    eventSource.ConsumeEvents()

    contact := Contact{
        Id:          "123",
        PhoneNumber: "+18005551212",
        FirstName:   "Billy",
        LastName:    "Bob",
    }
    notification := NotifyContactEvent{
        ContactId:        "123",
        NotificationType: "email",
    }

    producer := streams.NewProducer(destination)

    createContactRecord := streams.JsonItemEncoder("CreateContact", contact)
    createContactRecord.WriteKeyString(contact.Id)

    deleteContactRecord := streams.JsonItemEncoder("DeleteContact", contact)
    deleteContactRecord.WriteKeyString(contact.Id)

    notificationRecord := streams.JsonItemEncoder("NotifyContact", notification)
    notificationRecord.WriteKeyString(notification.ContactId)

    producer.Produce(context.Background(), createContactRecord)
    producer.Produce(context.Background(), notificationRecord)
    producer.Produce(context.Background(), deleteContactRecord)
    producer.Produce(context.Background(), notificationRecord)

    eventSource.WaitForSignals(nil)

    // Expected Output: Created contact: 123
    // Notifying contact: 123 by email
    // Deleted contact: 123
    // Contact 123 does not exist!
}
Output:
func NewEventSource ¶
func NewEventSource[T StateStore](sourceConfig EventSourceConfig, stateStoreFactory StateStoreFactory[T], defaultProcessor EventProcessor[T, IncomingRecord], additionalClientOptions ...kgo.Opt) (*EventSource[T], error)
Create an EventSource. `defaultProcessor` will be invoked if a suitable EventProcessor cannot be found, or the IncomingRecord has no RecordType header. `additionalClientOptions` allows you to add additional options to the underlying kgo.Client. There are some restrictions here, however. The following options are reserved:
    kgo.Balancers
    kgo.ConsumerGroup
    kgo.ConsumeTopics
    kgo.OnPartitionsAssigned
    kgo.OnPartitionsRevoked
    kgo.AdjustFetchOffsetsFn
In addition, if you wish to set a TopicPartitioner for use in EventContext.Forward(), the partitioner must be of the supplied OptionalPartitioner as StateStore entries require manual partitioning and are produced on the same client as used by the EventContext for producing records. The default partitioner is initialized as follows, which should give parity with the canonical Java murmur2 partitioner:
kgo.RecordPartitioner(NewOptionalPartitioner(kgo.StickyKeyPartitioner(nil)))
func (*EventSource[T]) ConsumeEvents ¶
func (es *EventSource[T]) ConsumeEvents()
ConsumeEvents starts the underlying Kafka consumer. This call is non-blocking, so if called from main(), it should be followed by some other blocking call to prevent the application from exiting. See streams.EventSource.WaitForSignals for an example.
func (*EventSource[T]) Done ¶
func (es *EventSource[T]) Done() <-chan struct{}
Done returns a channel which can be used to block while the underlying Kafka consumer is active.
func (*EventSource[T]) EmitMetric ¶
func (es *EventSource[T]) EmitMetric(m Metric)
func (*EventSource[T]) ForkRunStatus ¶ added in v1.0.5
func (es *EventSource[T]) ForkRunStatus() sak.RunStatus
func (*EventSource[T]) Interject ¶
func (es *EventSource[T]) Interject(partition int32, cmd Interjector[T]) <-chan error
Executes `cmd` in the context of the given partition.
func (*EventSource[T]) InterjectAll ¶
func (es *EventSource[T]) InterjectAll(interjector Interjector[T])
InterjectAll is a convenience function which allows you to Interject into every active partition assigned to the consumer without creating an individual timer per partition. The equivalent of calling Interject() on each active partition, blocking until all are performed. It is worth noting that the interjections are run in parallel, so care must be taken not to create a deadlock between partitions via locking mechanisms such as a Mutex. If parallel processing is not of concern, streams.EventSource.InterjectAllSync is an alternative. Useful for gathering store statistics, but can be used in place of a standard Interjection. Example:
    preCount := int64(0)
    postCount := int64(0)
    eventSource.InterjectAll(func(ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
        store := ec.Store()
        atomic.AddInt64(&preCount, int64(store.Len()))
        store.performBookeepingTasks()
        atomic.AddInt64(&postCount, int64(store.Len()))
        return streams.Complete
    })
    fmt.Printf("Number of items before: %d, after: %d\n", preCount, postCount)
func (*EventSource[T]) InterjectAllSync ¶
func (es *EventSource[T]) InterjectAllSync(interjector Interjector[T])
InterjectAllSync performs the same function as streams.EventSource.InterjectAll, however it blocks on each iteration. It may be useful if parallel processing is not of concern and you want to avoid locking on a shared data structure. Example:
    itemCount := 0
    eventSource.InterjectAllSync(func(ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
        store := ec.Store()
        itemCount += store.Len()
        return streams.Complete
    })
    fmt.Println("Number of items: ", itemCount)
func (*EventSource[T]) ScheduleInterjection ¶
func (es *EventSource[T]) ScheduleInterjection(interjector Interjector[T], every, jitter time.Duration)
ScheduleInterjection sets a timer for `interjector` to be run at the `every` time interval, plus or minus a random time.Duration not greater than the absolute value of `jitter` on every invocation. `interjector` will have access to EventContext.Store() and can create/delete store items, or forward events just as a standard EventProcessor. Example:
    func cleanupStaleItems(ec *EventContext[myStateStore], when time.Time) streams.ExecutionState {
        ec.Store().cleanup(when)
        return streams.Complete
    }

    // schedules cleanupStaleItems to be executed every 900ms - 1100ms
    eventSource.ScheduleInterjection(cleanupStaleItems, time.Second, 100*time.Millisecond)
func (*EventSource[T]) Source ¶ added in v1.0.2
func (es *EventSource[T]) Source() *Source
The Source used by the EventSource.
func (*EventSource[T]) State ¶
func (es *EventSource[T]) State() EventSourceState
Returns the EventSourceState of the underlying Source, Healthy or Unhealthy. When the EventSource encounters an unrecoverable error (unable to execute a transaction, for example), it will enter an Unhealthy state. Intended to be used by a health check process for rolling back during a bad deployment.
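As an illustration (the HTTP wiring is an assumption, not part of GKES), a health check endpoint might translate the state into an HTTP status:

    // hypothetical health endpoint backed by EventSource.State()
    http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if eventSource.State() == streams.Unhealthy {
            http.Error(w, "event source unhealthy", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    })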
func (*EventSource[T]) Stop ¶
func (es *EventSource[T]) Stop()
Signals the underlying *kgo.Client that the underlying consumer should exit the group. If you are using an IncrementalGroupRebalancer, this will trigger a graceful exit where owned partitions are surrendered according to its configuration. If you are not, this call has the same effect as streams.EventSource.StopNow.
Calls to Stop are not blocking. To block during the shut down process, this call should be followed by `<-eventSource.Done()`
To simplify running from main(), the streams.EventSource.WaitForSignals and streams.EventSource.WaitForChannel calls have been provided. So unless you have extremely complex application shutdown logic, you should not need to interact with this method directly.
func (*EventSource[T]) StopNow ¶
func (es *EventSource[T]) StopNow()
Immediately stops the underlying consumer *kgo.Client by invoking sc.client.Close() This has the effect of immediately surrendering all owned partitions, then closing the client. If you are using an IncrementalGroupRebalancer, this can be used as a force quit.
func (*EventSource[T]) WaitForChannel ¶
func (es *EventSource[T]) WaitForChannel(c chan struct{}, callback func())
WaitForChannel is similar to WaitForSignals, but blocks on a `chan struct{}` then invokes `callback` when finished. Useful when you have multiple EventSources in a single application. Example:
    func main() {
        myEventSource1 := initEventSource1()
        myEventSource1.ConsumeEvents()
        myEventSource2 := initEventSource2()
        myEventSource2.ConsumeEvents()

        wg := &sync.WaitGroup{}
        wg.Add(2)

        eventSourceChannel := make(chan struct{})
        go myEventSource1.WaitForChannel(eventSourceChannel, wg.Done)
        go myEventSource2.WaitForChannel(eventSourceChannel, wg.Done)

        osChannel := make(chan os.Signal)
        signal.Notify(osChannel, syscall.SIGINT, syscall.SIGHUP)
        <-osChannel

        close(eventSourceChannel)
        wg.Wait()
        fmt.Println("exiting")
    }
func (*EventSource[T]) WaitForSignals ¶
WaitForSignals is a convenience function suitable for use in a main() function. Blocks until `signals` are received, then gracefully closes the consumer by calling streams.EventSource.Stop. If `signals` are not provided, syscall.SIGINT and syscall.SIGHUP are used. If `preHook` is non-nil, it will be invoked before Stop() is invoked. If the preHook returns false, this call continues to block. If true is returned, `signal.Reset(signals...)` is invoked and the consumer shutdown process begins. Simple example:
    func main() {
        myEventSource := initEventSource()
        myEventSource.ConsumeEvents()
        myEventSource.WaitForSignals(nil)
        fmt.Println("exiting")
    }
Prehook example:
    func main() {
        myEventSource := initEventSource()
        myEventSource.ConsumeEvents()
        myEventSource.WaitForSignals(func(s os.Signal) bool {
            fmt.Printf("starting shutdown from signal %v\n", s)
            shutDownSomeOtherProcess()
            return true
        })
        fmt.Println("exiting")
    }
In this example, the consumer will close on syscall.SIGINT or syscall.SIGHUP but not syscall.SIGUSR1:
    func main() {
        myEventSource := initEventSource()
        myEventSource.ConsumeEvents()
        myEventSource.WaitForSignals(func(s os.Signal) bool {
            if s == syscall.SIGUSR1 {
                fmt.Println("user signal received")
                performSomeTask()
                return false
            }
            return true
        }, syscall.SIGINT, syscall.SIGHUP, syscall.SIGUSR1)
        fmt.Println("exiting")
    }
type EventSourceConfig ¶
type EventSourceConfig struct {
    // The group id for the underlying Kafka consumer group.
    GroupId string

    // The Kafka topic to consume
    Topic string

    // The compacted Kafka topic on which to publish/consume [StateStore] data. If not provided, GKES will generate a name which includes
    // Topic and GroupId.
    StateStoreTopic string

    // The desired number of partitions for Topic.
    NumPartitions int

    // The desired replication factor for Topic. Defaults to 1.
    ReplicationFactor int

    // The desired min-insync-replicas for Topic. Defaults to 1.
    MinInSync int

    // The number of Kafka partitions to use for the application's commit log. Defaults to 5 if unset.
    CommitLogPartitions int

    // The Kafka cluster on which Topic resides, or the source of incoming events.
    SourceCluster Cluster

    // StateCluster is the Kafka cluster on which the commit log and the StateStore topic reside. If left unset (recommended), defaults to SourceCluster.
    StateCluster Cluster

    // The consumer rebalance strategies to use for the underlying Kafka consumer group.
    BalanceStrategies []BalanceStrategy

    /*
        CommitOffsets should be set to true if you are migrating from a traditional consumer group. This will ensure that the offsets are
        committed to the consumer group when in a mixed fleet scenario (migrating into an EventSource from a standard consumer).
        If the deployment fails, the original non-EventSource application can then resume consuming from the committed offsets.
        Once the EventSource application is well-established, this setting should be switched to false, as offsets are managed by another topic.
        In an EventSource application, committing offsets via the standard mechanism only consumes resources and provides no benefit.
    */
    CommitOffsets bool

    /*
        The config used for the eos producer pool. If empty, [DefaultEosConfig] is used.
        If an EventSource is initialized with an invalid [EosConfig], the application will panic.
    */
    EosConfig EosConfig

    // If non-nil, the EventSource will emit [Metric] objects of varying types. This is backed by a channel. If the channel is full
    // (presumably because the MetricsHandler is not able to keep up),
    // GKES will drop the metric and log at WARN level to prevent processing slow down.
    MetricsHandler MetricsHandler

    // Called when a partition has been assigned to the EventSource consumer client. This does not indicate that the partition is being processed.
    OnPartitionAssigned SourcePartitionEventHandler

    // Called when a previously assigned partition has been activated, meaning the EventSource will start processing events for this partition.
    // At the time this handler is called, the StateStore associated with this partition has been bootstrapped and is ready for use.
    OnPartitionActivated SourcePartitionEventHandler

    // Called when a partition is about to be revoked from the EventSource consumer client.
    // This is a blocking call and, as such, should return quickly.
    OnPartitionWillRevoke SourcePartitionEventHandler

    // Called when a partition has been revoked from the EventSource consumer client.
    // This handler is invoked after GKES has stopped processing and has finished removing any associated resources for the partition.
    OnPartitionRevoked SourcePartitionEventHandler

    // The error handler invoked when record deserialization errors occur for a given [EventSource]
    DeserializationErrorHandler DeserializationErrorHandler

    // The error handler invoked when errors occur in the transactional producer for a given [EventSource]
    TxnErrorHandler TxnErrorHandler
}
type EventSourceState ¶
type EventSourceState uint64
const (
	Healthy EventSourceState = iota
	Unhealthy
)
type ExecutionState ¶
type ExecutionState int
Returned by an EventProcessor or Interjector in response to an EventContext. ExecutionState should not be conflated with concepts of error state, such as Success or Failure.
const (
	// Complete signals the EventSource that the event or interjection is completely processed.
	// Once Complete is returned, the offset for the associated EventContext will be committed.
	Complete ExecutionState = 0
	// Incomplete signals the EventSource that the event or interjection is still ongoing, and
	// that your application promises to fulfill the EventContext in the future.
	// The offset for the associated EventContext will not be committed.
	Incomplete ExecutionState = 1
	// Signifies that the consumer should no longer continue processing events. The EventSource will go into an unhealthy state and execute [StopNow].
	Fail ExecutionState = 2
	// Signifies that the consumer should panic.
	Fatal ExecutionState = 3
)
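For illustration, a handler might communicate its outcome through ExecutionState as in the sketch below. The handler signature and the orderEvent/orderStore types are assumptions for this example (modeled on the EventContext-based function types in this package), not definitions from GKES; only the ExecutionState semantics are taken from the constants above.

import "github.com/aws/go-kafka-event-source/streams"

// orderEvent and orderStore are illustrative placeholders, not GKES types.
type orderEvent struct{ OrderId string }

type orderStore struct{ orders map[string]orderEvent }

// handleOrder reports its outcome to the EventSource via ExecutionState.
func handleOrder(ctx *streams.EventContext[orderStore], event orderEvent) streams.ExecutionState {
	if event.OrderId == "" {
		// Unrecoverable for this consumer: stop processing. The EventSource transitions to Unhealthy.
		return streams.Fail
	}
	// ... apply the event to this partition's StateStore ...
	return streams.Complete // the offset for this EventContext will be committed
}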
func DefaultProcessingErrorHandler ¶ added in v1.0.6
func DefaultProcessingErrorHandler(userData any, err error) ExecutionState
The default ProcessingErrorHandler. Invoked by EventContext.Fail and returns Complete.
type GlobalChangeLog ¶
type GlobalChangeLog[T ChangeLogReceiver] struct {
	// contains filtered or unexported fields
}
A GlobalChangeLog is simply a consumer which continuously consumes all partitions of the given topic and forwards all records to its StateStore. GlobalChangeLogs can be useful for sharing small amounts of data between a group of hosts. For example, GKES uses a global change log to keep track of consumer group offsets.
func NewGlobalChangeLog ¶
func NewGlobalChangeLog[T ChangeLogReceiver](cluster Cluster, receiver T, numPartitions int, topic string, cleanupPolicy CleanupPolicy) GlobalChangeLog[T]
Creates a GlobalChangeLog consumer and forwards all records to `receiver`.
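A hedged sketch of wiring one up. The offsetTracker type, topic name, and partition count are illustrative, and the ReceiveChange signature on the receiver mirrors StateStore as an assumption; consult ChangeLogReceiver and CleanupPolicy in this package for the actual contract.

import "github.com/aws/go-kafka-event-source/streams"

// offsetTracker is an illustrative ChangeLogReceiver.
type offsetTracker struct{ state map[string][]byte }

// ReceiveChange records the latest value seen for each key (assumed signature).
func (t *offsetTracker) ReceiveChange(r streams.IncomingRecord) error {
	t.state[string(r.Key())] = r.Value()
	return nil
}

func startSharedState(cleanupPolicy streams.CleanupPolicy) {
	cluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
	receiver := &offsetTracker{state: map[string][]byte{}}
	cl := streams.NewGlobalChangeLog(cluster, receiver, 5, "my-shared-state", cleanupPolicy)
	cl.Start()
}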
func NewGlobalChangeLogWithRunStatus ¶ added in v1.0.5
func NewGlobalChangeLogWithRunStatus[T ChangeLogReceiver](runStatus sak.RunStatus, cluster Cluster, receiver T, numPartitions int, topic string, cleanupPolicy CleanupPolicy) GlobalChangeLog[T]
Creates a GlobalChangeLog consumer and forwards all records to `receiver`.
func (GlobalChangeLog[T]) Pause ¶
func (cl GlobalChangeLog[T]) Pause(partition int32)
Pauses consumption of a partition.
func (GlobalChangeLog[T]) PauseAllPartitions ¶
func (cl GlobalChangeLog[T]) PauseAllPartitions()
Pauses consumption of all partitions.
func (GlobalChangeLog[T]) ResumePartitionAt ¶
func (cl GlobalChangeLog[T]) ResumePartitionAt(partition int32, offset int64)
Resumes consumption of a partition at offset.
func (GlobalChangeLog[T]) Start ¶
func (cl GlobalChangeLog[T]) Start()
func (GlobalChangeLog[T]) Stop ¶
func (cl GlobalChangeLog[T]) Stop()
type IncomingRecord ¶
type IncomingRecord struct {
// contains filtered or unexported fields
}
func (IncomingRecord) HeaderValue ¶
func (r IncomingRecord) HeaderValue(name string) []byte
func (IncomingRecord) Headers ¶
func (r IncomingRecord) Headers() []kgo.RecordHeader
func (IncomingRecord) Key ¶
func (r IncomingRecord) Key() []byte
func (IncomingRecord) LeaderEpoch ¶
func (r IncomingRecord) LeaderEpoch() int32
func (IncomingRecord) Offset ¶
func (r IncomingRecord) Offset() int64
func (IncomingRecord) RecordType ¶
func (r IncomingRecord) RecordType() string
func (IncomingRecord) Timestamp ¶
func (r IncomingRecord) Timestamp() time.Time
func (IncomingRecord) TopicPartition ¶
func (r IncomingRecord) TopicPartition() TopicPartition
func (IncomingRecord) Value ¶
func (r IncomingRecord) Value() []byte
type IncomingRecordDecoder ¶
type IncomingRecordDecoder[V any] func(IncomingRecord) (V, error)
A callback invoked when a new record has been received from the EventSource.
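A minimal decoder sketch, assuming a JSON-encoded record value and an illustrative userEvent type (neither is part of GKES):

import (
	"encoding/json"

	"github.com/aws/go-kafka-event-source/streams"
)

// userEvent is an illustrative domain type.
type userEvent struct {
	UserId string `json:"userId"`
	Action string `json:"action"`
}

// decodeUserEvent satisfies IncomingRecordDecoder[userEvent].
func decodeUserEvent(r streams.IncomingRecord) (userEvent, error) {
	var e userEvent
	err := json.Unmarshal(r.Value(), &e)
	return e, err
}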
type IncrGroupMemberInstructions ¶
type IncrGroupMemberInstructions struct {
	Prepare []TopicPartition
	Forget  []TopicPartition // not currently used
}
type IncrGroupMemberMeta ¶
type IncrGroupMemberMeta struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
	Status    MemberStatus
	LeftAt    int64
}
type IncrGroupPartitionState ¶
type IncrGroupPartitionState struct {
	Preparing []TopicPartition
	Ready     []TopicPartition
}
type IncrRebalanceInstructionHandler ¶
type IncrRebalanceInstructionHandler interface {
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that this partition is destined for this consumer.
	// In the case of the EventSource, preparation involves pre-populating the StateStore for this partition.
	PrepareTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. Signals the instruction handler that it is safe to forget this previously prepped TopicPartition.
	ForgetPreparedTopicPartition(tp TopicPartition)
	// Called by the IncrementalGroupRebalancer. A valid *kgo.Client, which is on the same cluster as the Source.Topic, must be returned.
	Client() *kgo.Client
}
Defines the interface needed for the IncrementalGroupRebalancer to function. EventSource fulfills this interface. If you are using EventSource, there is nothing else for you to implement.
type IncrementalGroupRebalancer ¶
type IncrementalGroupRebalancer interface {
	kgo.GroupBalancer
	// Must be called by the InstructionHandler once a TopicPartition is ready for consumption.
	PartitionPrepared(TopicPartition)
	// Must be called by the InstructionHandler if it fails to prepare a TopicPartition it was previously instructed to prepare.
	PartitionPreparationFailed(TopicPartition)
	// Must be called by the InstructionHandler once it receives an assignment.
	PartitionsAssigned(...TopicPartition)
	// Must be called by the InstructionHandler if it wishes to leave the consumer group in a graceful fashion.
	GracefullyLeaveGroup() <-chan struct{}
}
The IncrementalGroupRebalancer interface is an extension to kgo.GroupBalancer. This balancer allows for slowly moving partitions during consumer topology changes. This helps reduce blast radius in the case of failures, as well as keep the inherent latency penalty of transitioning partitions to a minimum.
func IncrementalRebalancer ¶
func IncrementalRebalancer(instructionHandler IncrRebalanceInstructionHandler) IncrementalGroupRebalancer
Creates an IncrementalRebalancer suitable for use by the kgo Kafka driver. In most cases, the instructionHandler is the EventSource. `activeTransitions` defines how many partitions may be in receivership at any given point in time.
For example, when `activeTransitions` is 1 and the group state is imbalanced (a new member is added or a member signals it wishes to leave the group), the IncrementalGroupRebalancer will choose 1 partition to move. Once the receiver of that partition signals it is ready for the partition, the rebalancer will assign it, then choose another partition to move. This process continues until the group has reached a balanced state.
In all cases, any unassigned partitions will be assigned immediately. If a consumer host crashes, for example, its partitions will be assigned immediately, regardless of preparation state.
receivership - the state of being dealt with by an official receiver.
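For consumers that are not built on EventSource, a hypothetical IncrRebalanceInstructionHandler might look like the sketch below. All names and the bootstrap logic are illustrative; an EventSource application never needs to write this.

import (
	"github.com/aws/go-kafka-event-source/streams"
	"github.com/twmb/franz-go/pkg/kgo"
)

// customHandler is a hypothetical handler for a hand-rolled consumer.
type customHandler struct {
	client     *kgo.Client
	rebalancer streams.IncrementalGroupRebalancer
}

func (h *customHandler) PrepareTopicPartition(tp streams.TopicPartition) {
	go func() {
		// ... warm whatever local state this partition needs ...
		h.rebalancer.PartitionPrepared(tp) // report readiness back to the rebalancer
	}()
}

func (h *customHandler) ForgetPreparedTopicPartition(tp streams.TopicPartition) {
	// Discard any state prepared for tp; it is now owned by another member.
}

func (h *customHandler) Client() *kgo.Client {
	return h.client
}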
type Interjector ¶
type Interjector[T any] func(*EventContext[T], time.Time) ExecutionState
Defines the method signature needed by the EventSource to perform a stream interjection. See EventSource.Interject.
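As a sketch, an interjection that performs periodic bookkeeping might look like the following. The counterStore type and the work inside are illustrative; scheduling the interjection is done separately on the EventSource.

import (
	"time"

	"github.com/aws/go-kafka-event-source/streams"
)

// counterStore is an illustrative StateStore type parameter.
type counterStore struct{ counts map[string]int }

// flushCounts satisfies Interjector[counterStore].
func flushCounts(ctx *streams.EventContext[counterStore], when time.Time) streams.ExecutionState {
	// ... read this partition's StateStore, produce aggregate records, etc. ...
	return streams.Complete
}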
type JsonCodec ¶
type JsonCodec[T any] struct{}
A generic JSON en/decoder. Uses "github.com/json-iterator/go".ConfigCompatibleWithStandardLibrary for en/decoding JSON in a performant way
type Logger ¶
type Logger interface {
	Tracef(msg string, args ...any)
	Debugf(msg string, args ...any)
	Infof(msg string, args ...any)
	Warnf(msg string, args ...any)
	Errorf(msg string, args ...any)
}
Provides the interface needed by GKES to integrate with your logging mechanism. Example:
import ( "mylogger" "github.com/aws/go-kafka-event-source/streams" ) func main() { // GKES will emit log at whatever level is defined by NewLogger() // kgo will emit logs at LogLevelError streams.InitLogger(mylogger.NewLogger(), streams.LogLevelError) }
func InitLogger ¶
Initializes the GKES logger. `kafkaDriverLogLevel` defines the log level for the underlying kgo clients. This call should be the first interaction with the GKES module. Subsequent calls will have no effect. If never called, the default uninitialized logger writes to STDOUT at LogLevelError for both GKES and kgo. Example:
import "github.com/aws/go-kafka-event-source/streams" func main() { streams.InitLogger(streams.SimpleLogger(streams.LogLevelInfo), streams.LogLevelError) // ... initialize your application }
func WrapLogger ¶
WrapLogger allows GKES to emit logs at a higher level than your own Logger. Useful if you need debug level logging for your own application, but do not want to clutter your logs with GKES output. Example:
import ( "mylogger" "github.com/aws/go-kafka-event-source/streams" ) func main() { // your application will emit logs at "Debug" // GKES will emit logs at LogLevelError // kgo will emit logs at LogLevelNone gkesLogger := streams.WrapLogger(mylogger.NewLogger("Debug"), streams.LogLevelError) streams.InitLogger(gkesLogger, streams.LogLevelNone) }
type MemberStatus ¶
type MemberStatus int
The status of a consumer group member.
const (
	ActiveMember MemberStatus = iota
	InactiveMember
	Defunct
)
type Metric ¶
type Metric struct {
	StartTime      time.Time
	ExecuteTime    time.Time
	EndTime        time.Time
	Count          int
	Bytes          int
	PartitionCount int
	Partition      int32
	Operation      string
	Topic          string
	GroupId        string
}
func (Metric) ExecuteDuration ¶
type MetricsHandler ¶
type MetricsHandler func(Metric)
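A sketch of a handler that logs slow operations; the 500ms threshold and the logging destination are illustrative. Keep in mind the handler is fed from a channel, so it should return quickly or metrics will be dropped. It is wired up via EventSourceConfig.MetricsHandler.

import (
	"log"
	"time"

	"github.com/aws/go-kafka-event-source/streams"
)

// logSlowMetrics satisfies MetricsHandler.
func logSlowMetrics(m streams.Metric) {
	if d := m.EndTime.Sub(m.StartTime); d > 500*time.Millisecond {
		log.Printf("slow %s on %s/%d: %v (%d records, %d bytes)",
			m.Operation, m.Topic, m.Partition, d, m.Count, m.Bytes)
	}
}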
type OptionalPartitioner ¶
type OptionalPartitioner struct {
// contains filtered or unexported fields
}
func NewOptionalPartitioner ¶
func NewOptionalPartitioner(partitioner kgo.Partitioner) OptionalPartitioner
A kgo compatible partitioner which respects Record partitions that are manually assigned. If the record partition is AutoAssign, the provided kgo.Partitioner will be used for partition assignment. Note: NewRecord will return a record with a partition of AutoAssign.
func NewOptionalPerTopicPartitioner ¶ added in v1.0.4
func NewOptionalPerTopicPartitioner(defaultPartitioner kgo.Partitioner, topicPartitioners map[string]kgo.Partitioner) OptionalPartitioner
A kgo compatible partitioner which respects Record partitions that are manually assigned. Allows you to set a different partitioner per topic. If a topic is encountered that has not been defined, defaultPartitioner will be used.
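A sketch, assuming a hypothetical "audit-log" topic that should be spread round-robin while everything else uses sticky key partitioning:

import (
	"github.com/aws/go-kafka-event-source/streams"
	"github.com/twmb/franz-go/pkg/kgo"
)

func newAuditAwareProducer(destination streams.Destination) *streams.Producer {
	partitioner := streams.NewOptionalPerTopicPartitioner(
		kgo.StickyKeyPartitioner(nil),
		map[string]kgo.Partitioner{
			"audit-log": kgo.RoundRobinPartitioner(), // hypothetical topic name
		},
	)
	return streams.NewProducer(destination, kgo.RecordPartitioner(partitioner))
}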
func (OptionalPartitioner) ForTopic ¶
func (op OptionalPartitioner) ForTopic(topic string) kgo.TopicPartitioner
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
A simple Kafka producer.
func NewProducer ¶
func NewProducer(destination Destination, opts ...kgo.Opt) *Producer
Creates a new Producer. Destination provides cluster connect information. Default options are: kgo.ProducerLinger(5 * time.Millisecond) and kgo.RecordPartitioner(NewOptionalPartitioner(kgo.StickyKeyPartitioner(nil)))
func (*Producer) Produce ¶
Produces a record, blocking until complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.
func (*Producer) ProduceAsync ¶
Produces a record asynchronously. If callback is non-nil, it will be executed when the call is complete. If the record has no topic, the DefaultTopic of the producer's Destination will be used.
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
func JsonItemEncoder ¶
A convenience function for encoding an item into a Record suitable for sending to a producer. Please note that the Key on the record will be left uninitialized. Usage:
record := codec.JsonItemEncoder("myType", myItem)
record.WriteKeyString(myItem.Key)
func (*Record) AsIncomingRecord ¶ added in v1.0.4
func (r *Record) AsIncomingRecord() IncomingRecord
A convenience function for unit testing. This method should not need to be invoked in production code.
func (*Record) ToKafkaRecord ¶
Creates a newly allocated kgo.Record. The Key and Value fields are freshly allocated bytes, copied from streams.Record.
func (*Record) TopicPartition ¶ added in v1.0.4
func (r *Record) TopicPartition() TopicPartition
func (*Record) ValueWriter ¶
func (*Record) WithKeyString ¶
func (*Record) WithPartition ¶
func (*Record) WithRecordType ¶
func (*Record) WriteKeyString ¶
func (*Record) WriteValue ¶
func (*Record) WriteValueString ¶
type SchedulerConfig ¶
type SchedulerConfig struct {
Concurrency, WorkerQueueDepth, MaxConcurrentKeys int
}
type SimpleCluster ¶
type SimpleCluster []string
A Cluster implementation useful for local development/testing. Establishes a plain text connection to a Kafka cluster. For a more advanced example, see github.com/aws/go-kafka-event-source/msk.
cluster := streams.SimpleCluster([]string{"127.0.0.1:9092"})
type SimpleLogger ¶
type SimpleLogger LogLevel
SimpleLogger implements Logger and writes to STDOUT. Good for development purposes.
func (SimpleLogger) Debugf ¶
func (sl SimpleLogger) Debugf(msg string, args ...any)
func (SimpleLogger) Errorf ¶
func (sl SimpleLogger) Errorf(msg string, args ...any)
func (SimpleLogger) Infof ¶
func (sl SimpleLogger) Infof(msg string, args ...any)
func (SimpleLogger) Tracef ¶
func (sl SimpleLogger) Tracef(msg string, args ...any)
func (SimpleLogger) Warnf ¶
func (sl SimpleLogger) Warnf(msg string, args ...any)
type Source ¶
type Source struct {
// contains filtered or unexported fields
}
A readonly wrapper of EventSourceConfig. When an EventSource is initialized, it reconciles the actual Topic configuration (NumPartitions) from the Kafka cluster (or creates it if missing) and wraps the corrected EventSourceConfig.
func CreateSource ¶
func CreateSource(sourceConfig EventSourceConfig) (resolved *Source, err error)
Creates all necessary topics in the appropriate Kafka clusters as defined by Source. Automatically invoked as part of NewSourceConsumer(). Ignores TOPIC_ALREADY_EXISTS errors. Returns a corrected Source where NumPartitions and CommitLogPartitions are pulled from a ListTopics call. This is to prevent drift errors. Returns an error if the details for Source topics could not be retrieved, or if there is a mismatch in partition counts for the source topic and change log topic.
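A sketch of resolving a Source at startup; the config values (group id, topic name, partition count, broker address) are illustrative:

import (
	"log"

	"github.com/aws/go-kafka-event-source/streams"
)

func resolveSource() *streams.Source {
	cfg := streams.EventSourceConfig{
		GroupId:       "my-group",
		Topic:         "incoming-events",
		NumPartitions: 10,
		SourceCluster: streams.SimpleCluster([]string{"127.0.0.1:9092"}),
	}
	// CreateSource reconciles the actual topic configuration with cfg.
	source, err := streams.CreateSource(cfg)
	if err != nil {
		log.Fatalf("could not resolve source: %v", err)
	}
	return source
}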
func (*Source) AsDestination ¶
func (s *Source) AsDestination() Destination
A convenience method for creating a Destination from your Source. Can be used for creating a Producer or BatchProducer which publishes to your EventSource.
func (*Source) BalanceStrategies ¶
func (s *Source) BalanceStrategies() []BalanceStrategy
func (*Source) CommitLogTopicNameForGroupId ¶
Returns the formatted topic name used for the commit log of Source
func (*Source) Config ¶
func (s *Source) Config() EventSourceConfig
func (*Source) NumPartitions ¶
func (*Source) State ¶
func (s *Source) State() EventSourceState
func (*Source) StateStoreTopicName ¶ added in v1.0.2
Returns the formatted topic name used for the StateStore of Source
type StateStore ¶
type StateStore interface {
	ReceiveChange(IncomingRecord) error
	Revoked()
}
type StateStoreFactory ¶
type StateStoreFactory[T StateStore] TopicPartitionCallback[T]
A callback invoked when a new TopicPartition has been assigned to an EventSource. Your callback should return an empty StateStore.
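A sketch of a trivial factory and store. wordCountStore and its value encoding are illustrative; a real store should pick data structures suited to its access patterns.

import (
	"encoding/binary"

	"github.com/aws/go-kafka-event-source/streams"
)

// wordCountStore is an illustrative StateStore.
type wordCountStore struct {
	counts map[string]uint64
}

// ReceiveChange rehydrates state from the change log topic during bootstrap.
func (s *wordCountStore) ReceiveChange(r streams.IncomingRecord) error {
	if len(r.Value()) == 8 {
		s.counts[string(r.Key())] = binary.BigEndian.Uint64(r.Value())
	}
	return nil
}

// Revoked is called when the partition is no longer owned by this host.
func (s *wordCountStore) Revoked() {
	s.counts = nil
}

// newWordCountStore satisfies StateStoreFactory[*wordCountStore]: it returns
// an empty store for the newly assigned partition.
func newWordCountStore(tp streams.TopicPartition) *wordCountStore {
	return &wordCountStore{counts: make(map[string]uint64)}
}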
type TopicPartition ¶
type TopicPartitionCallback ¶
type TopicPartitionCallback[T any] func(TopicPartition) T
Defines a method which accepts a TopicPartition argument and returns T.
type TopicPartitionSet ¶
type TopicPartitionSet struct { *btree.BTreeG[TopicPartition] }
A convenience data structure. It is what the name implies, a Set of TopicPartitions. This data structure is not thread-safe; you will need to provide your own locking mechanism (see the guarded-usage sketch following the method listing below).
func NewTopicPartitionSet ¶
func NewTopicPartitionSet() TopicPartitionSet
Returns a new, empty TopicPartitionSet.
func (TopicPartitionSet) Contains ¶
func (tps TopicPartitionSet) Contains(tp TopicPartition) bool
Returns true if tp is currently a member of the TopicPartitionSet.
func (TopicPartitionSet) Insert ¶
func (tps TopicPartitionSet) Insert(tp TopicPartition) bool
Inserts the TopicPartition. Returns true if the item was inserted, false if the item was already present.
func (TopicPartitionSet) Items ¶
func (tps TopicPartitionSet) Items() []TopicPartition
Converts the set to a newly allocated slice of TopicPartitions.
func (TopicPartitionSet) Remove ¶
func (tps TopicPartitionSet) Remove(tp TopicPartition) bool
Removes tp from the TopicPartitionSet. Returns true if the item was present.
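Since the set provides no locking of its own, a typical pattern is to guard it with a mutex. A sketch (guardedSet and its method names are illustrative):

import (
	"sync"

	"github.com/aws/go-kafka-event-source/streams"
)

// guardedSet wraps TopicPartitionSet with a mutex for concurrent use.
type guardedSet struct {
	mu  sync.Mutex
	set streams.TopicPartitionSet
}

func newGuardedSet() *guardedSet {
	return &guardedSet{set: streams.NewTopicPartitionSet()}
}

func (g *guardedSet) assign(tp streams.TopicPartition) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.set.Insert(tp)
}

func (g *guardedSet) owned(tp streams.TopicPartition) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.set.Contains(tp)
}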
type TxnErrorHandler ¶
type TxnErrorHandler func(err error) ErrorResponse
Source Files ¶
- async_batcher.go
- async_item_queue.go
- async_scheduler.go
- balancer_options.go
- codec.go
- commit_log.go
- doc.go
- eos_config.go
- eos_producer.go
- error_handling.go
- event_context.go
- event_source.go
- function_types.go
- global_change_log.go
- group_state.go
- incr_rebalancer.go
- interjection.go
- log.go
- metric.go
- partition_worker.go
- partitioned_store.go
- producer.go
- record.go
- source.go
- source_consumer.go
- state_store_consumer.go
- topics.go