Documentation ¶
Index ¶
- Constants
- Variables
- func GetPrincipal(app, subnet, domain string) string
- func GetRetryTime(attempt int, retryTime, maxRetryTime time.Duration) time.Duration
- func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)
- func SafeClose(ch chan struct{}) (justClosed bool)
- func SafeCloseBool(ch chan bool) (justClosed bool)
- func SafeCloseBytes(ch chan []byte) (justClosed bool)
- func SafeCloseErr(ch chan error) (justClosed bool)
- func SafeCloseMessage(ch chan Message) (justClosed bool)
- func SafeSendBool(ch chan bool, val bool) (err error)
- func SafeSendBytes(ch chan []byte, val []byte) (err error)
- func SafeSendErr(ch chan error, val error) (err error)
- func SafeSendProducerMessage(ch chan<- *sarama.ProducerMessage, val *sarama.ProducerMessage) (err error)
- func SetMaxMessageSize(maxSize int32)
- func UnwrapLogData(err error) log.Data
- func WaitWithTimeout(wg *sync.WaitGroup, tout time.Duration) bool
- type Acls
- type AdminConfig
- type Batch
- type BatchHandler
- type Commiter
- type ConsumerGroup
- func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
- func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error
- func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)
- func (cg *ConsumerGroup) Initialise(ctx context.Context) error
- func (cg *ConsumerGroup) IsInitialised() bool
- func (cg *ConsumerGroup) LogErrors(ctx context.Context)
- func (cg *ConsumerGroup) OnHealthUpdate(status string)
- func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
- func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error
- func (cg *ConsumerGroup) Start() error
- func (cg *ConsumerGroup) State() State
- func (cg *ConsumerGroup) StateWait(state State)
- func (cg *ConsumerGroup) Stop() error
- func (cg *ConsumerGroup) StopAndWait() error
- type ConsumerGroupChannels
- type ConsumerGroupConfig
- type ConsumerStateChannels
- type Error
- type Handler
- type HealthInfo
- type HealthInfoMap
- type IConsumerGroup
- type IProducer
- type Message
- type OptFunc
- type Producer
- func (p *Producer) AddHeader(key, value string)
- func (p *Producer) Channels() *ProducerChannels
- func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error
- func (p *Producer) Close(ctx context.Context) (err error)
- func (p *Producer) Initialise(ctx context.Context) error
- func (p *Producer) IsInitialised() bool
- func (p *Producer) LogErrors(ctx context.Context)
- func (p *Producer) Send(schema *avro.Schema, event interface{}) error
- type ProducerChannels
- type ProducerConfig
- type SaramaMessage
- func (m SaramaMessage) Commit()
- func (m SaramaMessage) CommitAndRelease()
- func (M SaramaMessage) Context() context.Context
- func (m SaramaMessage) GetData() []byte
- func (M SaramaMessage) GetHeader(key string) string
- func (m SaramaMessage) Mark()
- func (m SaramaMessage) Offset() int64
- func (m SaramaMessage) Release()
- func (m SaramaMessage) UpstreamDone() chan struct{}
- type SecurityConfig
- type State
- type StateChan
- type StateMachine
- type TopicAuth
- type TopicAuthList
Constants ¶
const ( Errors = "Errors" Initialised = "Initialised" Consume = "Consume" Closer = "Closer" Closed = "Closed" Upstream = "Upstream" Output = "Output" )
channel names
const ( OffsetNewest = sarama.OffsetNewest OffsetOldest = sarama.OffsetOldest )
Consumer config constants
const ErrorChanBufferSize = 20
const MsgHealthyConsumerGroup = "kafka consumer group is healthy"
MsgHealthyConsumerGroup Check message returned when Kafka consumer group is healthy.
const MsgHealthyProducer = "kafka producer is healthy"
MsgHealthyProducer Check message returned when Kafka producer is healthy.
const ServiceName = "Kafka"
ServiceName is the name of this service: Kafka.
const (
TraceIDHeaderKey = string(request.RequestIdKey)
)
Variables ¶
var ErrTLSCannotLoadCACerts = errors.New("cannot load CA Certs")
ErrTLSCannotLoadCACerts is returned when the certs file cannot be loaded
Functions ¶
func GetPrincipal ¶
func GetRetryTime ¶
GetRetryTime will return a duration based on the attempt and initial retry time. It uses the algorithm `2^n` where `n` is the modulerised attempt number, so that we don't get values greater than 'maxRetryTime'. A randomization factor of ±25% is added to the initial retry time so that the server isn't being hit at the same time by many clients. The first attempt is assumed to be number 1, any negative or 0 value will be assumed to be 1.
func NewAdmin ¶
func NewAdmin(brokerAddrs []string, adminCfg *AdminConfig) (sarama.ClusterAdmin, error)
NewAdmin creates an admin-based client
func SafeClose ¶
func SafeClose(ch chan struct{}) (justClosed bool)
SafeClose closes a struct{} channel and ignores the panic if the channel was already closed
func SafeCloseBool ¶
SafeCloseBool closes a bool channel and ignores the panic if the channel was already closed
func SafeCloseBytes ¶
SafeCloseBytes closes a byte array channel and ignores the panic if the channel was already closed
func SafeCloseErr ¶
SafeCloseErr closes an error channel and ignores the panic if the channel was already closed
func SafeCloseMessage ¶
SafeCloseMessage closes a Message channel and ignores the panic if the channel was already closed
func SafeSendBool ¶
SafeSendBool sends a provided bool value to the provided bool chan and returns an error instead of panicking if the channel is closed
func SafeSendBytes ¶
SafeSendBytes sends a provided byte array value to the provided byte array chan and returns an error instead of panicking if the channel is closed
func SafeSendErr ¶
SafeSendErr sends a provided error value to the provided error chan and returns an error instead of panicking if the channel is closed
func SafeSendProducerMessage ¶
func SafeSendProducerMessage(ch chan<- *sarama.ProducerMessage, val *sarama.ProducerMessage) (err error)
SafeSendProducerMessage sends a provided ProducerMessage value to the provided ProducerMessage chan and returns an error instead of panicking if the channel is closed
func SetMaxMessageSize ¶
func SetMaxMessageSize(maxSize int32)
SetMaxMessageSize sets the Sarama MaxRequestSize and MaxResponseSize values to the provided maxSize
func UnwrapLogData ¶
UnwrapLogData recursively unwraps logData from an error. This allows an error to be wrapped with log.Data at each level of the call stack, and then extracted and combined here as a single log.Data entry. This allows us to log errors only once but maintain the context provided by log.Data at each level.
Types ¶
type Acls ¶
type Acls []*sarama.AclCreation
type AdminConfig ¶
type AdminConfig struct { KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryMax *int SecurityConfig *SecurityConfig }
AdminConfig exposes the optional configurable parameters for an admin client to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.
type Batch ¶
type Batch struct {
// contains filtered or unexported fields
}
Batch handles adding raw messages to a batch of ObservationExtracted events.
func (*Batch) Commit ¶
func (batch *Batch) Commit()
Commit is called when the batch has been processed. The last message has been released already, so at this point we just need to commit
type BatchHandler ¶
BatchHandler represents a handler for processing a batch of kafka messages. This method will be called by only one go-routine at a time.
type Commiter ¶
type Commiter interface {
Commit() bool
}
Commiter represents an error type that defines a bool method to enable or disable a message/batch to be committed.
type ConsumerGroup ¶
type ConsumerGroup struct {
// contains filtered or unexported fields
}
ConsumerGroup is a Kafka consumer group instance.
func NewConsumerGroup ¶
func NewConsumerGroup(ctx context.Context, cgConfig *ConsumerGroupConfig) (*ConsumerGroup, error)
NewConsumerGroup creates a new consumer group with the provided parameters
func (*ConsumerGroup) Channels ¶
func (cg *ConsumerGroup) Channels() *ConsumerGroupChannels
Channels returns the ConsumerGroup channels for this consumer group
func (*ConsumerGroup) Checker ¶
func (cg *ConsumerGroup) Checker(ctx context.Context, state *healthcheck.CheckState) error
Checker checks health of Kafka consumer-group and updates the provided CheckState accordingly
func (*ConsumerGroup) Close ¶
func (cg *ConsumerGroup) Close(ctx context.Context, optFuncs ...OptFunc) (err error)
Close safely closes the consumer and releases all resources. Pass in a context with a timeout or deadline.
func (*ConsumerGroup) Initialise ¶
func (cg *ConsumerGroup) Initialise(ctx context.Context) error
Initialise creates a new Sarama ConsumerGroup and the consumer/error loops, only if it was not already initialised.
func (*ConsumerGroup) IsInitialised ¶
func (cg *ConsumerGroup) IsInitialised() bool
IsInitialised returns true only if Sarama ConsumerGroup has been correctly initialised.
func (*ConsumerGroup) LogErrors ¶
func (cg *ConsumerGroup) LogErrors(ctx context.Context)
LogErrors creates a go-routine that waits on Errors channel and logs any error received. It exits on Closer channel closed.
func (*ConsumerGroup) OnHealthUpdate ¶
func (cg *ConsumerGroup) OnHealthUpdate(status string)
OnHealthUpdate implements the healthcheck Subscriber interface so that the kafka consumer can be notified of state changes. This method is intended to be used for managed start/stop's only, and should not be called manually. WARNING: Having more than one notifier calling this method will result in unexpected behavior. - On Health status OK: start consuming - On Warning or Critical: stop consuming
func (*ConsumerGroup) RegisterBatchHandler ¶
func (cg *ConsumerGroup) RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error
func (*ConsumerGroup) RegisterHandler ¶
func (cg *ConsumerGroup) RegisterHandler(ctx context.Context, h Handler) error
func (*ConsumerGroup) Start ¶
func (cg *ConsumerGroup) Start() error
Start has different effects depending on the state: - Initialising: the consumer will try to start consuming straight away once it's initialised - Starting/Consuming: no change will happen - Stopping/Stopped: the consumer will start consuming - Closing: an error will be returned
func (*ConsumerGroup) State ¶
func (cg *ConsumerGroup) State() State
State returns the state of the consumer group
func (*ConsumerGroup) StateWait ¶ added in v3.1.0
func (cg *ConsumerGroup) StateWait(state State)
StateWait blocks the calling thread until the provided state is reached
func (*ConsumerGroup) Stop ¶
func (cg *ConsumerGroup) Stop() error
Stop has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consuming: the consumer will stop consuming - Stopping/Stopped: no change will happen - Closing: an error will be returned. This method does not wait until the consumerGroup reaches the stopped state, it only triggers the stopping action. Any error while trying to trigger the stop will be returned.
func (*ConsumerGroup) StopAndWait ¶
func (cg *ConsumerGroup) StopAndWait() error
StopAndWait has different effects depending on the state: - Initialising: the consumer will remain in the Stopped state once initialised, without consuming messages - Starting/Consuming: the consumer will stop consuming - Stopping/Stopped: no change will happen - Closing: an error will be returned. This method waits until the consumerGroup reaches the stopped state if it was starting/consuming. Any error while trying to trigger the stop will be returned and the thread will not be blocked.
type ConsumerGroupChannels ¶
type ConsumerGroupChannels struct { Upstream chan Message Errors chan error Initialised chan struct{} Consume chan bool Closer chan struct{} Closed chan struct{} State *ConsumerStateChannels }
ConsumerGroupChannels represents the channels used by ConsumerGroup.
func CreateConsumerGroupChannels ¶
func CreateConsumerGroupChannels(upstreamBufferSize, errorsBufferSize int) *ConsumerGroupChannels
CreateConsumerGroupChannels initialises a ConsumerGroupChannels with new channels. You can provide the buffer size to determine the number of messages that will be buffered in the upstream channel (to receive messages) The State channels are not initialised until the state machine is created.
func (*ConsumerGroupChannels) Validate ¶
func (consumerChannels *ConsumerGroupChannels) Validate() error
Validate returns an Error with a list of missing channels if any consumer channel is nil
type ConsumerGroupConfig ¶
type ConsumerGroupConfig struct { // Sarama config overrides KafkaVersion *string KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries int) time.Duration Offset *int64 SecurityConfig *SecurityConfig MessageConsumeTimeout *time.Duration // dp-kafka specific config overrides NumWorkers *int BatchSize *int BatchWaitTime *time.Duration MinRetryPeriod *time.Duration MaxRetryPeriod *time.Duration MinBrokersHealthy *int Topic string GroupName string BrokerAddrs []string }
ConsumerGroupConfig exposes the configurable parameters for a consumer group to overwrite default config values and any other default config values set by dp-kafka. Any value that is not provided will use the default Sarama config value, or the default dp-kafka value. The only 3 compulsory values are: - Topic - GroupName - BrokerAddrs
func (*ConsumerGroupConfig) Get ¶
func (c *ConsumerGroupConfig) Get() (*sarama.Config, error)
Get creates a default sarama config for a consumer-group and overwrites any values provided in cgConfig. If any required value is not provided or any override is invalid, an error will be returned
func (*ConsumerGroupConfig) Validate ¶
func (c *ConsumerGroupConfig) Validate() error
Validate that compulsory values are provided in config
type ConsumerStateChannels ¶
type ConsumerStateChannels struct { Initialising *StateChan Stopped *StateChan Starting *StateChan Consuming *StateChan Stopping *StateChan Closing *StateChan }
ConsumerStateChannels represents the channels that are used to notify of consumer-group state changes
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error is the handler package's error type. It is not meant to be compared as a type, but information should be extracted via the interfaces it implements with callback functions. It is not guaranteed to remain exported so shouldn't be treated as such.
type Handler ¶
Handler represents a handler for processing a single kafka message. IMPORTANT: if maxConsumers > 1 then this method needs to be thread safe.
type HealthInfo ¶
HealthInfo contains the health information for one broker
type HealthInfoMap ¶
type HealthInfoMap struct {
// contains filtered or unexported fields
}
HealthInfoMap contains the health information for a set of brokers with a common topic expected to be available in all of them
func Healthcheck ¶
func Healthcheck(ctx context.Context, brokers []interfaces.SaramaBroker, topic string, cfg *sarama.Config) HealthInfoMap
Healthcheck validates all the provided brokers for the provided topic. It returns a HealthInfoMap containing all the information.
func (*HealthInfoMap) Set ¶
func (h *HealthInfoMap) Set(broker interfaces.SaramaBroker, healthInfo HealthInfo)
Set creates or overrides a HealthInfo value for the provided broker
func (*HealthInfoMap) UpdateStatus ¶
func (h *HealthInfoMap) UpdateStatus(state *health.CheckState, minHealthyThreshold int, msgHealthy string) error
UpdateStatus updates the provided health.Check state with the current state according to the provided minimum number of healthy brokers for the group to be considered healthy. If the health status is OK, the provided msgHealthy will be used as status message.
type IConsumerGroup ¶
type IConsumerGroup interface { Channels() *ConsumerGroupChannels State() State StateWait(state State) RegisterHandler(ctx context.Context, h Handler) error RegisterBatchHandler(ctx context.Context, batchHandler BatchHandler) error Checker(ctx context.Context, state *healthcheck.CheckState) error IsInitialised() bool Initialise(ctx context.Context) error OnHealthUpdate(status string) Start() error Stop() error StopAndWait() error LogErrors(ctx context.Context) Close(ctx context.Context, optFuncs ...OptFunc) error }
IConsumerGroup is an interface representing a Kafka Consumer Group, as implemented in dp-kafka/consumer
type IProducer ¶
type IProducer interface { Channels() *ProducerChannels Checker(ctx context.Context, state *healthcheck.CheckState) error LogErrors(ctx context.Context) IsInitialised() bool Initialise(ctx context.Context) error Send(schema *avro.Schema, event interface{}) error Close(ctx context.Context) (err error) AddHeader(key, value string) }
IProducer is an interface representing a Kafka Producer, as implemented in dp-kafka/producer
type Message ¶
type Message interfaces.Message
type OptFunc ¶ added in v3.7.0
type OptFunc func()
OptFunc is an optional function that is run once the upstream channel is closed and before the consumer closer is called. For example, during a graceful shutdown you may have received messages that have not been processed yet: you may have released a message when you received it from upstream and added it to a batch, and the messages are only committed back once the batch is processed after a certain time. If you don't process this batch during the graceful shutdown, this can create a lag in the consumer group.
The optional functions can do this for you. During the graceful shutdown you can pass, for the above case, the batch processing function to process the unprocessed batch and commit its messages back to the consumer group while making sure no new messages are received. The optional function is run once the upstream channel is closed, to make sure no new messages are received, and before the consumer is closed, so that the remaining messages can be processed and committed back to the consumer group.
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
Producer is a producer of Kafka messages
func NewProducer ¶
func NewProducer(ctx context.Context, pConfig *ProducerConfig) (producer *Producer, err error)
NewProducer returns a new producer instance using the provided config and channels. The rest of the config is set to defaults. If any channel parameter is nil, an error will be returned.
func (*Producer) Channels ¶
func (p *Producer) Channels() *ProducerChannels
Channels returns the Producer channels for this producer
func (*Producer) Checker ¶
func (p *Producer) Checker(ctx context.Context, state *healthcheck.CheckState) error
Checker checks health of Kafka producer and updates the provided CheckState accordingly
func (*Producer) Close ¶
Close safely closes the producer and releases all resources. Pass in a context with a timeout or deadline.
func (*Producer) Initialise ¶
Initialise creates a new Sarama AsyncProducer and the channel redirection, only if it was not already initialised.
func (*Producer) IsInitialised ¶
IsInitialised returns true only if Sarama producer has been correctly initialised.
type ProducerChannels ¶
type ProducerChannels struct { Output chan []byte Errors chan error Initialised chan struct{} Closer chan struct{} Closed chan struct{} }
ProducerChannels represents the channels used by Producer.
func CreateProducerChannels ¶
func CreateProducerChannels() *ProducerChannels
CreateProducerChannels initialises a ProducerChannels with new channels.
func (*ProducerChannels) Validate ¶
func (producerChannels *ProducerChannels) Validate() error
Validate returns an error with a list of missing channels if any producer channel is nil
type ProducerConfig ¶
type ProducerConfig struct { // Sarama config overrides KafkaVersion *string MaxMessageBytes *int RetryMax *int KeepAlive *time.Duration RetryBackoff *time.Duration RetryBackoffFunc *func(retries, maxRetries int) time.Duration SecurityConfig *SecurityConfig MinBrokersHealthy *int // dp-kafka specific config Topic string BrokerAddrs []string MinRetryPeriod *time.Duration MaxRetryPeriod *time.Duration }
ProducerConfig exposes the optional configurable parameters for a producer to overwrite default Sarama config values. Any value that is not provided will use the default Sarama config value.
func (*ProducerConfig) Get ¶
func (p *ProducerConfig) Get() (*sarama.Config, error)
Get creates a default sarama config and overwrites any values provided in pConfig
func (*ProducerConfig) Validate ¶
func (p *ProducerConfig) Validate() error
Validate that compulsory values are provided in config
type SaramaMessage ¶
type SaramaMessage struct {
// contains filtered or unexported fields
}
SaramaMessage represents a Sarama specific Kafka message
func NewSaramaMessage ¶
func NewSaramaMessage(m *sarama.ConsumerMessage, s sarama.ConsumerGroupSession, ud chan struct{}) *SaramaMessage
func (SaramaMessage) Commit ¶
func (m SaramaMessage) Commit()
Commit marks the message as consumed, and then commits the offset to the backend
func (SaramaMessage) CommitAndRelease ¶
func (m SaramaMessage) CommitAndRelease()
CommitAndRelease marks the message as consumed, commits the offset to the backend and releases the UpstreamDone channel
func (SaramaMessage) Context ¶ added in v3.4.0
func (M SaramaMessage) Context() context.Context
Context returns a context with traceid.
func (SaramaMessage) GetData ¶
func (m SaramaMessage) GetData() []byte
GetData returns the message contents.
func (SaramaMessage) GetHeader ¶ added in v3.4.0
func (M SaramaMessage) GetHeader(key string) string
GetHeader takes a key for the header and returns the value if the key exists in the header.
func (SaramaMessage) Mark ¶
func (m SaramaMessage) Mark()
Mark marks the message as consumed, but doesn't commit the offset to the backend
func (SaramaMessage) Offset ¶
func (m SaramaMessage) Offset() int64
Offset returns the message offset
func (SaramaMessage) Release ¶
func (m SaramaMessage) Release()
Release closes the UpstreamDone channel, but doesn't mark the message or commit the offset
func (SaramaMessage) UpstreamDone ¶
func (m SaramaMessage) UpstreamDone() chan struct{}
UpstreamDone returns the upstreamDone channel. Closing this channel notifies that the message has been consumed (same effect as calling Release)
type SecurityConfig ¶
type SecurityConfig struct { RootCACerts string ClientCert string ClientKey string InsecureSkipVerify bool }
SecurityConfig is common to producers and consumer configs, above
func GetSecurityConfig ¶
func GetSecurityConfig(caCerts, clientCert, clientKey string, skipVerify bool) *SecurityConfig
type StateChan ¶ added in v3.1.0
type StateChan struct {
// contains filtered or unexported fields
}
StateChan provides a concurrency-safe channel for state machines, representing one state.
func NewStateChan ¶ added in v3.1.0
func NewStateChan() *StateChan
NewStateChan creates a new StateChan with a new struct channel and read-write mutex
func (*StateChan) Channel ¶ added in v3.1.0
func (sc *StateChan) Channel() chan struct{}
Channel returns the channel wrapped by this StateChan struct in a concurrency-safe manner. Note: if you intend to wait on this channel while a concurrent sc.leave() call may happen, you will need to acquire a read lock on RWMutex().
type StateMachine ¶
type StateMachine struct {
// contains filtered or unexported fields
}
func NewConsumerStateMachine ¶
func NewConsumerStateMachine() *StateMachine
func (*StateMachine) GetChan ¶ added in v3.1.0
func (sm *StateMachine) GetChan(s State) *StateChan
GetChan returns the StateChan pointer corresponding to the provided state
func (*StateMachine) Set ¶
func (sm *StateMachine) Set(newState State)
Set sets the state machine to the provided state value
func (*StateMachine) SetIf ¶
func (sm *StateMachine) SetIf(allowed []State, newState State) error
SetIf sets the state machine to the provided state value only if the current state is one of the values provided in the list
func (*StateMachine) String ¶
func (sm *StateMachine) String() string
String returns the string representation of the current state
type TopicAuth ¶
type TopicAuthList ¶
func (TopicAuthList) Apply ¶
func (t TopicAuthList) Apply(adm sarama.ClusterAdmin) error
Source Files ¶
Directories ¶
Path | Synopsis |
---|---|
Package avro provides a user functionality to return the avro encoding of s.
|
Package avro provides a user functionality to return the avro encoding of s. |
examples
|
|