Documentation ¶
Overview ¶
Package nakadi is a client library for the Nakadi event broker. It provides convenient access to many features of Nakadi's API. The package can be used to manage event type definitions.
The EventAPI can be used to inspect existing event types, to create new event types, and to update existing ones. The SubscriptionAPI provides subscription management: existing subscriptions can be fetched from Nakadi, and new ones can be created. The PublishAPI of this package is used to publish events of all event type categories via Nakadi. Finally, the package implements a StreamAPI, which enables event processing on top of Nakadi's subscription-based high level API.
To make the communication with Nakadi more resilient, all sub APIs of this package can be configured to retry failed requests using an exponential back-off algorithm.
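The retry behavior described above can be illustrated with a small standard-library sketch. It is not the package's implementation: the `retryOptions` type and the `retry`, `flaky`, and `demo` functions are hypothetical, and only the three field names are taken from the option structs documented below.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// retryOptions is a hypothetical type that mirrors the retry-related fields
// documented for the option structs of this package.
type retryOptions struct {
	InitialRetryInterval time.Duration
	MaxRetryInterval     time.Duration
	MaxElapsedTime       time.Duration
}

// retry calls op until it succeeds or the next wait would exceed
// MaxElapsedTime. The wait doubles after every failure and is capped at
// MaxRetryInterval.
func retry(opts retryOptions, op func() error) (attempts int, err error) {
	start := time.Now()
	interval := opts.InitialRetryInterval
	for {
		attempts++
		if err = op(); err == nil {
			return attempts, nil
		}
		if time.Since(start)+interval >= opts.MaxElapsedTime {
			return attempts, fmt.Errorf("giving up after %d attempts: %w", attempts, err)
		}
		time.Sleep(interval)
		interval *= 2
		if interval > opts.MaxRetryInterval {
			interval = opts.MaxRetryInterval
		}
	}
}

// flaky returns an operation that fails the first n times it is called.
func flaky(n int) func() error {
	calls := 0
	return func() error {
		calls++
		if calls <= n {
			return errors.New("temporary failure")
		}
		return nil
	}
}

// demo retries an operation that fails three times before succeeding.
func demo() (int, error) {
	opts := retryOptions{
		InitialRetryInterval: time.Millisecond,
		MaxRetryInterval:     8 * time.Millisecond,
		MaxElapsedTime:       time.Second,
	}
	return retry(opts, flaky(3))
}

func main() {
	attempts, err := demo()
	fmt.Println(attempts, err) // prints "4 <nil>"
}
```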
Example (Complete) ¶
// create a new client
client := nakadi.New("http://localhost:8080", &nakadi.ClientOptions{ConnectionTimeout: 500 * time.Millisecond})

// create an event API and a new event type
eventAPI := nakadi.NewEventAPI(client, &nakadi.EventOptions{Retry: true})
eventType := &nakadi.EventType{
	Name:                 "test-type",
	OwningApplication:    "test-app",
	Category:             "data",
	EnrichmentStrategies: []string{"metadata_enrichment"},
	PartitionStrategy:    "random",
	Schema: &nakadi.EventTypeSchema{
		Type:   "json_schema",
		Schema: `{"properties":{"test":{"type":"string"}}}`,
	},
}
err := eventAPI.Create(eventType)
if err != nil {
	log.Fatal(err)
}

// create a new subscription API and a new subscription
subAPI := nakadi.NewSubscriptionAPI(client, &nakadi.SubscriptionOptions{Retry: true})
sub := &nakadi.Subscription{
	OwningApplication: "another-app",
	EventTypes:        []string{"test-type"},
	ReadFrom:          "begin",
}
sub, err = subAPI.Create(sub)
if err != nil {
	log.Fatal(err)
}

// create a publish API and publish events
pubAPI := nakadi.NewPublishAPI(client, eventType.Name, nil)
event := nakadi.DataChangeEvent{
	Metadata: nakadi.EventMetadata{
		EID:        "9aabcd94-7ebd-11e7-898b-97df92934aa5",
		OccurredAt: time.Now(),
	},
	Data:     map[string]string{"test": "some value"},
	DataOP:   "U",
	DataType: "test",
}
err = pubAPI.PublishDataChangeEvent([]nakadi.DataChangeEvent{event})
if err != nil {
	log.Fatal(err)
}
fmt.Println("event published")

// create a new stream and read one event
stream := nakadi.NewStream(client, sub.ID, nil)
cursor, _, err := stream.NextEvents()
if err != nil {
	log.Fatal(err)
}
fmt.Println("1 event received")

// commit the cursor and clean up
stream.CommitCursor(cursor)
stream.Close()
subAPI.Delete(sub.ID)
eventAPI.Delete(eventType.Name)

Output:
event published
1 event received
Index ¶
- type BatchItemResponse
- type BatchItemsError
- type BusinessEvent
- type Client
- type ClientOptions
- type Cursor
- type DataChangeEvent
- type EventAPI
- type EventMetadata
- type EventOptions
- type EventType
- type EventTypeOptions
- type EventTypeSchema
- type EventTypeStatistics
- type Operation
- type PartitionStats
- type Processor
- type ProcessorOptions
- type PublishAPI
- type PublishOptions
- type StreamAPI
- type StreamOptions
- type Subscription
- type SubscriptionAPI
- func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error)
- func (s *SubscriptionAPI) Delete(id string) error
- func (s *SubscriptionAPI) Get(id string) (*Subscription, error)
- func (s *SubscriptionAPI) GetStats(id string) ([]*SubscriptionStats, error)
- func (s *SubscriptionAPI) List() ([]*Subscription, error)
- type SubscriptionOptions
- type SubscriptionStats
- type UndefinedEvent
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BatchItemResponse ¶
type BatchItemResponse struct {
	EID              string `json:"eid"`
	PublishingStatus string `json:"publishing_status"`
	Step             string `json:"step"`
	Detail           string `json:"detail"`
}
BatchItemResponse describes the publishing result of a single event in a batch. If a batch is only partially published, each batch item response states whether the respective event was successfully published or not.
type BatchItemsError ¶
type BatchItemsError []BatchItemResponse
BatchItemsError represents an error which contains information about the publishing status of each single event in a batch.
func (BatchItemsError) Error ¶
func (err BatchItemsError) Error() string
Error implements the error interface for BatchItemsError.
type BusinessEvent ¶
type BusinessEvent struct {
	Metadata    EventMetadata `json:"metadata"`
	OrderNumber string        `json:"order_number"`
}
BusinessEvent represents a Nakadi event from the category "business".
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
A Client represents a basic configuration to access a Nakadi instance. The client is used to configure other sub APIs of the `go-nakadi` package.
func New ¶
func New(url string, options *ClientOptions) *Client
New creates a new Nakadi client. New receives the URL of the Nakadi instance the client should connect to. In addition the second parameter options can be used to configure the behavior of the client and of all sub APIs in this package. The options may be nil.
type ClientOptions ¶
ClientOptions contains all optional parameters used to instantiate the Nakadi client.
type Cursor ¶
type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	NakadiStreamID string `json:"-"`
}
A Cursor marks the current read position in a stream. It is returned along with each received batch of events and is also used to commit that batch (as well as all previous events).
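The struct tags above determine how a cursor is serialized. The following sketch uses a local copy of the Cursor type so that it compiles without the package; the `encodeCursor` and `demo` helpers and all field values are made up for illustration. It shows that NakadiStreamID, tagged with `json:"-"`, never appears in the JSON encoding.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Cursor is a local copy of the type documented above.
type Cursor struct {
	Partition      string `json:"partition"`
	Offset         string `json:"offset"`
	EventType      string `json:"event_type"`
	CursorToken    string `json:"cursor_token"`
	NakadiStreamID string `json:"-"`
}

// encodeCursor is a hypothetical helper that returns the JSON form of a cursor.
func encodeCursor(c Cursor) (string, error) {
	b, err := json.Marshal(c)
	return string(b), err
}

// demo encodes a cursor with illustrative values.
func demo() (string, error) {
	return encodeCursor(Cursor{
		Partition:      "0",
		Offset:         "5",
		EventType:      "test-type",
		CursorToken:    "token",
		NakadiStreamID: "stream-1", // dropped by the `json:"-"` tag
	})
}

func main() {
	s, err := demo()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
	// prints {"partition":"0","offset":"5","event_type":"test-type","cursor_token":"token"}
}
```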
type DataChangeEvent ¶
type DataChangeEvent struct {
	Metadata EventMetadata `json:"metadata"`
	Data     interface{}   `json:"data"`
	DataOP   string        `json:"data_op"`
	DataType string        `json:"data_type"`
}
DataChangeEvent is a Nakadi event from the event category "data".
type EventAPI ¶
type EventAPI struct {
// contains filtered or unexported fields
}
EventAPI is a sub API for inspecting and managing event types on a Nakadi instance.
func NewEventAPI ¶
func NewEventAPI(client *Client, options *EventOptions) *EventAPI
NewEventAPI creates a new instance of an EventAPI implementation which can be used to manage event types on a specific Nakadi service. The last parameter is a struct containing only optional parameters. The options may be nil.
type EventMetadata ¶
type EventMetadata struct {
	EID        string     `json:"eid"`
	OccurredAt time.Time  `json:"occurred_at"`
	EventType  string     `json:"event_type,omitempty"`
	Partition  string     `json:"partition,omitempty"`
	ParentEIDs []string   `json:"parent_eids,omitempty"`
	FlowID     string     `json:"flow_id,omitempty"`
	ReceivedAt *time.Time `json:"received_at,omitempty"`
}
EventMetadata represents the meta information which comes along with all Nakadi events. For publishing purposes only the fields eid and occurred_at must be present.
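As a sketch of what "only eid and occurred_at must be present" means on the wire, the following program marshals a local copy of EventMetadata with just those two fields set. The `minimalMetadataJSON` helper and the timestamp are made up for illustration; all other fields carry `omitempty` and therefore disappear from the output.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EventMetadata is a local copy of the type documented above.
type EventMetadata struct {
	EID        string     `json:"eid"`
	OccurredAt time.Time  `json:"occurred_at"`
	EventType  string     `json:"event_type,omitempty"`
	Partition  string     `json:"partition,omitempty"`
	ParentEIDs []string   `json:"parent_eids,omitempty"`
	FlowID     string     `json:"flow_id,omitempty"`
	ReceivedAt *time.Time `json:"received_at,omitempty"`
}

// minimalMetadataJSON encodes metadata that sets only the two fields
// required for publishing.
func minimalMetadataJSON() (string, error) {
	meta := EventMetadata{
		EID:        "9aabcd94-7ebd-11e7-898b-97df92934aa5",
		OccurredAt: time.Date(2017, 8, 11, 12, 0, 0, 0, time.UTC),
	}
	b, err := json.Marshal(meta)
	return string(b), err
}

func main() {
	s, err := minimalMetadataJSON()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
	// prints {"eid":"9aabcd94-7ebd-11e7-898b-97df92934aa5","occurred_at":"2017-08-11T12:00:00Z"}
}
```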
type EventOptions ¶
type EventOptions struct {
	// Whether or not methods of the EventAPI retry when a request fails. If
	// set to false InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request
	// fails with an error.
	MaxElapsedTime time.Duration
}
EventOptions is a set of optional parameters used to configure the EventAPI.
type EventType ¶
type EventType struct {
	Name                 string               `json:"name"`
	OwningApplication    string               `json:"owning_application"`
	Category             string               `json:"category"`
	EnrichmentStrategies []string             `json:"enrichment_strategies,omitempty"`
	PartitionStrategy    string               `json:"partition_strategy,omitempty"`
	CompatibilityMode    string               `json:"compatibility_mode,omitempty"`
	Schema               *EventTypeSchema     `json:"schema"`
	PartitionKeyFields   []string             `json:"partition_key_fields"`
	DefaultStatistics    *EventTypeStatistics `json:"default_statistics,omitempty"`
	Options              *EventTypeOptions    `json:"options,omitempty"`
	CreatedAt            time.Time            `json:"created_at,omitempty"`
	UpdatedAt            time.Time            `json:"updated_at,omitempty"`
}
An EventType defines a kind of event that can be processed on a Nakadi service.
type EventTypeOptions ¶
type EventTypeOptions struct {
RetentionTime int64 `json:"retention_time"`
}
EventTypeOptions provide additional parameters for tuning Nakadi.
type EventTypeSchema ¶
type EventTypeSchema struct {
	Version   string    `json:"version,omitempty"`
	Type      string    `json:"type"`
	Schema    string    `json:"schema"`
	CreatedAt time.Time `json:"created_at,omitempty"`
}
EventTypeSchema is a mandatory description of the schema of an event type.
type EventTypeStatistics ¶
type EventTypeStatistics struct {
	MessagesPerMinute int `json:"messages_per_minute"`
	MessageSize       int `json:"message_size"`
	ReadParallelism   int `json:"read_parallelism"`
	WriteParallelism  int `json:"write_parallelism"`
}
EventTypeStatistics describes operational statistics for an event type. These statistics are used by Nakadi to optimize the throughput of events of a certain kind. They are provided on event type creation.
type Operation ¶ added in v1.2.0
Operation defines a procedure that consumes the event data from a processor. An operation can either be successful or fail with an error. Operation receives three parameters: the stream number or position in the processor, the Nakadi stream ID of the underlying stream, and the JSON encoded event payload.
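A minimal sketch of such an operation follows. The type definition is not shown on this page, so the signature `func(int, string, []byte) error` used by the local `operation` type is an assumption derived from the description above, as is the idea that the payload is a JSON array of events. The `orderEvent` type and the `collectOrders` and `demo` functions are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// operation is a local stand-in for the package's Operation type. The
// signature is assumed from the description: stream number, Nakadi stream
// ID, and the JSON encoded payload.
type operation func(streamNo int, streamID string, payload []byte) error

// orderEvent is a hypothetical event shape used only in this sketch.
type orderEvent struct {
	OrderNumber string `json:"order_number"`
}

// collectOrders returns an operation that decodes a batch (assumed here to
// be a JSON array) and appends every order number to dst.
func collectOrders(dst *[]string) operation {
	return func(streamNo int, streamID string, payload []byte) error {
		var events []orderEvent
		if err := json.Unmarshal(payload, &events); err != nil {
			// Returning an error signals that the batch was not processed.
			return fmt.Errorf("stream %d (%s): %w", streamNo, streamID, err)
		}
		for _, e := range events {
			*dst = append(*dst, e.OrderNumber)
		}
		return nil
	}
}

// demo feeds one hand-written batch to the operation.
func demo() ([]string, error) {
	var orders []string
	op := collectOrders(&orders)
	err := op(0, "stream-1", []byte(`[{"order_number":"A-1"},{"order_number":"A-2"}]`))
	return orders, err
}

func main() {
	orders, err := demo()
	fmt.Println(orders, err) // prints "[A-1 A-2] <nil>"
}
```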
type PartitionStats ¶ added in v1.1.0
type PartitionStats struct {
	Partition        string `json:"partition"`
	State            string `json:"state"`
	UnconsumedEvents int    `json:"unconsumed_events"`
	StreamID         string `json:"stream_id"`
}
PartitionStats represents statistical information for a particular partition.
type Processor ¶ added in v1.2.0
A Processor for Nakadi events. The Processor is a high level API for consuming events from Nakadi. It can process event batches from multiple partitions (streams) and can be configured with an event rate that limits the number of processed events per minute. The cursors of event batches that were successfully processed are automatically committed.
func NewProcessor ¶ added in v1.2.0
func NewProcessor(client *Client, subscriptionID string, options *ProcessorOptions) *Processor
NewProcessor creates a new processor for a given subscription ID. The constructor receives a configured Nakadi client as first parameter. Furthermore a valid subscription ID must be provided. The last parameter is a struct containing only optional parameters. The options may be nil, in this case the processor falls back to the defaults defined in the ProcessorOptions.
func (*Processor) Start ¶ added in v1.2.0
Start begins event processing. All event batches received from the underlying streams are passed to the operation function. If the operation function terminates without error, the respective cursor is automatically committed to Nakadi. If the operation terminates with an error, the underlying stream is halted and a new stream continues to pass event batches to the operation function.
Event processing goes on indefinitely unless the processor is stopped via its Stop method. Start returns an error if the processor is already running.
type ProcessorOptions ¶ added in v1.2.0
type ProcessorOptions struct {
	// The maximum number of events in each chunk (and therefore per partition) of the stream (default: 1).
	BatchLimit uint
	// Maximum time in seconds to wait for the flushing of each chunk (per partition) (default: 30).
	FlushTimeout uint
	// The number of parallel streams the Processor will use to consume events (default: 1).
	StreamCount uint
	// Limits the number of events that the processor will handle per minute. This value represents an
	// upper bound: if some streams are not healthy, e.g. if StreamCount exceeds the number of partitions,
	// or the actual batch size is lower than BatchLimit, the actual number of processed events can be
	// much lower. 0 is interpreted as no limit at all (default: no limit).
	EventsPerMinute uint
	// The number of uncommitted events Nakadi will stream before pausing the stream. When the stream is
	// paused and a commit arrives, the stream resumes. If MaxUncommittedEvents is lower than BatchLimit,
	// the effective batch size is bounded by MaxUncommittedEvents (default: 10, minimum: 1).
	MaxUncommittedEvents uint
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits.
	MaxRetryInterval time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	CommitMaxElapsedTime time.Duration
	// NotifyErr is called when an error occurs that leads to a retry. This notify function can be used to
	// detect unhealthy streams. The first parameter indicates the number of the stream that encountered
	// the error.
	NotifyErr func(uint, error, time.Duration)
	// NotifyOK is called whenever a successful operation was completed. This notify function can be used
	// to detect that a stream is healthy again. The first parameter indicates the number of the stream
	// that just regained health.
	NotifyOK func(uint)
}
ProcessorOptions contains optional parameters that are used to create a Processor.
type PublishAPI ¶
type PublishAPI struct {
// contains filtered or unexported fields
}
PublishAPI is a sub API for publishing Nakadi events. All publish methods emit events as a single batch. If a publish method returns an error, the caller should check whether the error is a BatchItemsError in order to verify which events of a batch have been published.
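The check recommended above can be sketched with `errors.As`. The two types below are local copies of BatchItemResponse and BatchItemsError so that the example compiles without the package. The `Error` text, the `failedEIDs` and `fakePublish` helpers, and the assumption that successfully published items carry the status "submitted" are illustrative and not taken from this page.

```go
package main

import (
	"errors"
	"fmt"
)

// BatchItemResponse and BatchItemsError are local copies of the documented types.
type BatchItemResponse struct {
	EID              string `json:"eid"`
	PublishingStatus string `json:"publishing_status"`
	Step             string `json:"step"`
	Detail           string `json:"detail"`
}

type BatchItemsError []BatchItemResponse

// Error implements the error interface. The message is made up for this
// sketch; the package's message may differ.
func (err BatchItemsError) Error() string {
	return fmt.Sprintf("%d batch item responses", len(err))
}

// failedEIDs returns the IDs of all events in a batch that were not
// published. It assumes that published items have the status "submitted".
func failedEIDs(err error) []string {
	var batchErr BatchItemsError
	if !errors.As(err, &batchErr) {
		return nil // not a batch error, nothing to inspect
	}
	var eids []string
	for _, item := range batchErr {
		if item.PublishingStatus != "submitted" {
			eids = append(eids, item.EID)
		}
	}
	return eids
}

// fakePublish stands in for a publish call that partially failed.
func fakePublish() error {
	return BatchItemsError{
		{EID: "a", PublishingStatus: "submitted"},
		{EID: "b", PublishingStatus: "failed", Step: "validating", Detail: "schema mismatch"},
	}
}

func main() {
	fmt.Println(failedEIDs(fakePublish())) // prints "[b]"
}
```

Events identified this way could then be republished or logged, depending on the delivery guarantees the application needs.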
func NewPublishAPI ¶
func NewPublishAPI(client *Client, eventType string, options *PublishOptions) *PublishAPI
NewPublishAPI creates a new instance of the PublishAPI which can be used to publish Nakadi events. As for all sub APIs of the `go-nakadi` package NewPublishAPI receives a configured Nakadi client. Furthermore the name of the event type must be provided. The last parameter is a struct containing only optional parameters. The options may be nil.
func (*PublishAPI) Publish ¶
func (p *PublishAPI) Publish(events interface{}) error
Publish is used to emit a batch of undefined events, but it can also be used to publish data change or business events. Depending on the options used when creating the PublishAPI, this method retries publishing the events if they were not successfully published.
func (*PublishAPI) PublishBusinessEvent ¶
func (p *PublishAPI) PublishBusinessEvent(events []BusinessEvent) error
PublishBusinessEvent emits a batch of business events. Depending on the options used when creating the PublishAPI, this method retries publishing the events if they were not successfully published.
func (*PublishAPI) PublishDataChangeEvent ¶
func (p *PublishAPI) PublishDataChangeEvent(events []DataChangeEvent) error
PublishDataChangeEvent emits a batch of data change events. Depending on the options used when creating the PublishAPI, this method retries publishing the events if they were not successfully published.
type PublishOptions ¶
type PublishOptions struct {
	// Whether or not publish methods retry when publishing fails. If set to false
	// InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have no effect
	// (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when publishing events. Once
	// this value is reached the exponential backoff is halted and the events will not be
	// published.
	MaxElapsedTime time.Duration
}
PublishOptions is a set of optional parameters used to configure the PublishAPI.
type StreamAPI ¶
type StreamAPI struct {
// contains filtered or unexported fields
}
A StreamAPI is a sub API which is used to consume events from a specific subscription using Nakadi's high level stream API. In order to ensure that only successfully processed events are committed, it is crucial to commit cursors of respective event batches in the same order they were received.
func NewStream ¶
func NewStream(client *Client, subscriptionID string, options *StreamOptions) *StreamAPI
NewStream is used to instantiate a new stream processing sub API. As for all sub APIs of the `go-nakadi` package NewStream receives a configured Nakadi client. Furthermore a valid subscription ID must be provided. Use the SubscriptionAPI in order to obtain subscriptions. The options parameter can be used to configure the behavior of the stream. The options may be nil.
func (*StreamAPI) CommitCursor ¶
CommitCursor commits a cursor to Nakadi.
func (*StreamAPI) NextEvents ¶
NextEvents reads the next batch of events from the stream and returns the encoded events along with the respective cursor. It blocks until the batch of events can be read from the stream, or the stream is closed.
type StreamOptions ¶
type StreamOptions struct {
	// The maximum number of events in each chunk (and therefore per partition) of the stream (default: 1).
	BatchLimit uint
	// Maximum time in seconds to wait for the flushing of each chunk (per partition) (default: 30).
	FlushTimeout uint
	// The number of uncommitted events Nakadi will stream before pausing the stream. When the stream is
	// paused and a commit arrives, the stream resumes. If MaxUncommittedEvents is lower than BatchLimit,
	// the effective batch size is bounded by MaxUncommittedEvents (default: 10, minimum: 1).
	MaxUncommittedEvents uint
	// The initial (minimal) retry interval used for the exponential backoff. This value is applied for
	// stream initialization as well as for cursor commits.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches this value
	// the retry intervals remain constant. This value is applied for stream initialization as well as
	// for cursor commits.
	MaxRetryInterval time.Duration
	// CommitMaxElapsedTime is the maximum time spent on retries when committing a cursor. Once this value
	// is reached the exponential backoff is halted and the cursor will not be committed.
	CommitMaxElapsedTime time.Duration
	// Whether or not CommitCursor will retry when a request fails. If
	// set to false InitialRetryInterval, MaxRetryInterval, and CommitMaxElapsedTime have
	// no effect for commit requests (default: false).
	CommitRetry bool
	// NotifyErr is called when an error occurs that leads to a retry. This notify function can be used to
	// detect unhealthy streams.
	NotifyErr func(error, time.Duration)
	// NotifyOK is called whenever a successful operation was completed. This notify function can be used
	// to detect that a stream is healthy again.
	NotifyOK func()
}
StreamOptions contains optional parameters that are used to create a StreamAPI.
type Subscription ¶
type Subscription struct {
	ID                string    `json:"id,omitempty"`
	OwningApplication string    `json:"owning_application"`
	EventTypes        []string  `json:"event_types"`
	ConsumerGroup     string    `json:"consumer_group,omitempty"`
	ReadFrom          string    `json:"read_from,omitempty"`
	CreatedAt         time.Time `json:"created_at,omitempty"`
}
Subscription represents a subscription as used by the Nakadi high level API.
type SubscriptionAPI ¶
type SubscriptionAPI struct {
// contains filtered or unexported fields
}
SubscriptionAPI is a sub API that is used to manage subscriptions.
func NewSubscriptionAPI ¶
func NewSubscriptionAPI(client *Client, options *SubscriptionOptions) *SubscriptionAPI
NewSubscriptionAPI creates a new instance of the SubscriptionAPI. As for all sub APIs of the `go-nakadi` package NewSubscriptionAPI receives a configured Nakadi client. The last parameter is a struct containing only optional parameters. The options may be nil.
func (*SubscriptionAPI) Create ¶
func (s *SubscriptionAPI) Create(subscription *Subscription) (*Subscription, error)
Create initializes a new subscription. If the subscription already exists, the pre-existing subscription is returned.
func (*SubscriptionAPI) Delete ¶
func (s *SubscriptionAPI) Delete(id string) error
Delete removes an existing subscription.
func (*SubscriptionAPI) Get ¶
func (s *SubscriptionAPI) Get(id string) (*Subscription, error)
Get obtains a single subscription identified by its ID.
func (*SubscriptionAPI) GetStats ¶ added in v1.1.0
func (s *SubscriptionAPI) GetStats(id string) ([]*SubscriptionStats, error)
GetStats returns statistical information for the subscription with the given ID.
func (*SubscriptionAPI) List ¶
func (s *SubscriptionAPI) List() ([]*Subscription, error)
List returns all available subscriptions.
type SubscriptionOptions ¶
type SubscriptionOptions struct {
	// Whether or not methods of the SubscriptionAPI retry when a request fails. If
	// set to false InitialRetryInterval, MaxRetryInterval, and MaxElapsedTime have
	// no effect (default: false).
	Retry bool
	// The initial (minimal) retry interval used for the exponential backoff algorithm
	// when retry is enabled.
	InitialRetryInterval time.Duration
	// MaxRetryInterval is the maximum retry interval. Once the exponential backoff reaches
	// this value the retry intervals remain constant.
	MaxRetryInterval time.Duration
	// MaxElapsedTime is the maximum time spent on retries when performing a request.
	// Once this value is reached the exponential backoff is halted and the request
	// fails with an error.
	MaxElapsedTime time.Duration
}
SubscriptionOptions is a set of optional parameters used to configure the SubscriptionAPI.
type SubscriptionStats ¶ added in v1.1.0
type SubscriptionStats struct {
	EventType  string            `json:"event_type"`
	Partitions []*PartitionStats `json:"partitions"`
}
SubscriptionStats represents detailed statistics for a subscription.
type UndefinedEvent ¶
type UndefinedEvent struct {
Metadata EventMetadata `json:"metadata"`
}
UndefinedEvent can be embedded in structs representing Nakadi events from the event category "undefined".
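Embedding can be sketched as follows. The example uses trimmed local copies of EventMetadata and UndefinedEvent so that it compiles on its own; the `userRegistered` type, its `user_name` field, and the `demo` function are hypothetical. Because the embedded struct has no JSON tag of its own, its `metadata` field is promoted to the top level of the encoded event.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// EventMetadata is a trimmed local copy that keeps only the two fields
// required for publishing.
type EventMetadata struct {
	EID        string    `json:"eid"`
	OccurredAt time.Time `json:"occurred_at"`
}

// UndefinedEvent is a local copy of the type documented above.
type UndefinedEvent struct {
	Metadata EventMetadata `json:"metadata"`
}

// userRegistered is a hypothetical event of the category "undefined".
type userRegistered struct {
	UndefinedEvent
	UserName string `json:"user_name"`
}

// demo builds one event and returns its JSON encoding.
func demo() (string, error) {
	event := userRegistered{
		UndefinedEvent: UndefinedEvent{
			Metadata: EventMetadata{
				EID:        "e1",
				OccurredAt: time.Date(2017, 8, 11, 12, 0, 0, 0, time.UTC),
			},
		},
		UserName: "alice",
	}
	b, err := json.Marshal(event)
	return string(b), err
}

func main() {
	s, err := demo()
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s)
	// prints {"metadata":{"eid":"e1","occurred_at":"2017-08-11T12:00:00Z"},"user_name":"alice"}
}
```

A slice of such events is the kind of value that could be passed to Publish, whose parameter is declared as interface{}.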