Documentation ¶
Index ¶
- Variables
- type Consumer
- type Global
- type Option
- func ConsumerOptions(fn func(c *Consumer)) Option
- func DisableEnvironmentConfig() Option
- func InmemListen() Option
- func InmemStore(s stream.Store) Option
- func KafkaBroker(s string) Option
- func KafkaCommitInterval(d time.Duration) Option
- func KafkaCompressionCodec(s kafkaconfig.Compression) Option
- func KafkaDebug() Option
- func KafkaGroupID(s string) Option
- func KafkaGroupIDRandom() Option
- func KafkaHeartbeatInterval(d time.Duration) Option
- func KafkaID(s string) Option
- func KafkaMaxDeliveryRetries(i int) Option
- func KafkaMaxInFlightRequests(i int) Option
- func KafkaMaxQueueBufferDuration(d time.Duration) Option
- func KafkaMaxQueueSizeKBytes(i int) Option
- func KafkaMaxQueueSizeMessages(i int) Option
- func KafkaOffsetHead(i uint32) Option
- func KafkaOffsetInitial(s kafkaconfig.Offset) Option
- func KafkaOffsetTail(i uint32) Option
- func KafkaOrderedDelivery() Option
- func KafkaRequireAllAck() Option
- func KafkaRequireLeaderAck() Option
- func KafkaRequireNoAck() Option
- func KafkaSSL(capath, certpath, crlpath, keypassword, keypath, keystorepassword, ... string) Option
- func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option
- func KafkaSessionTimeout(d time.Duration) Option
- func KafkaStatisticsInterval(d time.Duration) Option
- func KafkaTopic(s string) Option
- func Logger(l *zap.Logger) Option
- func ManualErrorHandling() Option
- func ManualInterruptHandling() Option
- func Name(s string) Option
- func ProducerOptions(fn func(p *Producer)) Option
- func StandardstreamReader(w io.ReadCloser) Option
- func StandardstreamWriter(w io.Writer) Option
- func TestConsumerOptions(tb testing.TB, options ...Option) []Option
- type Producer
Constants ¶
This section is empty.
Variables ¶
var ConsumerDefaults = Consumer{Global: GlobalDefaults}
ConsumerDefaults holds the default values for Consumer.
var GlobalDefaults = Global{
	HandleErrors:                       true,
	HandleInterrupt:                    true,
	Name:                               "",
	AllowEnvironmentBasedConfiguration: true,
}
GlobalDefaults holds the default values for the global preferences.
var ProducerDefaults = Producer{Global: GlobalDefaults}
ProducerDefaults holds the default values for Producer.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct {
	Inmem          inmemconfig.Consumer
	Kafka          kafkaconfig.Consumer
	Pubsub         pubsubconfig.Consumer
	Standardstream standardstreamconfig.Consumer

	Global
}
Consumer contains the configuration for all the different consumers that implement the stream.Consumer interface. When a consumer is instantiated, these options can be passed into the new consumer to determine its behavior. If the consumer only has to support a single implementation of the interface, then all other configuration values can be ignored.
func NewConsumer ¶
NewConsumer returns a new Consumer configuration struct, containing the values passed into the function. If any error occurs during configuration validation, an error is returned as the second argument.
func TestNewConsumer ¶
TestNewConsumer returns a new consumer configuration struct, optionally with the default values removed.
func (Consumer) FromEnv ¶
FromEnv populates the Consumer from the environment variables whose names start with the prefix derived from the consumer name.
func (Consumer) WithOptions ¶
WithOptions takes the current Consumer, applies the supplied Options, and returns the resulting Consumer.
type Global ¶
type Global struct {
	// AllowEnvironmentBasedConfiguration allows you to disable configuring the
	// stream client based on predefined environment variables. This is enabled by
	// default, but can be disabled if you want full control over the behavior of
	// the stream client without any outside influence.
	AllowEnvironmentBasedConfiguration bool `ignored:"true"`

	// HandleErrors determines whether the consumer should handle any stream
	// errors by itself, and terminate the application if any error occurs. This
	// defaults to true. If manually set to false, the `Errors()` channel needs to
	// be consumed manually, and any appropriate action needs to be taken when an
	// error occurs, otherwise the consumer will get stuck once an error occurs.
	HandleErrors bool `ignored:"true"`

	// HandleInterrupt determines whether the consumer should close itself
	// gracefully when an interrupt signal (^C) is received. This defaults to true
	// to increase first-time ease-of-use, but if the application wants to handle
	// these signals manually, this flag disables the automated implementation.
	HandleInterrupt bool `ignored:"true"`

	// Logger is the configurable logger instance to log messages. If left
	// undefined, a no-op logger will be used.
	Logger *zap.Logger `ignored:"true"`

	// Name is the name of the current processor. It is currently only used to
	// determine the prefix for environment-variable based configuration values.
	// For example, if Name is set to `MyProcessor`, then all environment
	// variable-based configurations need to start with `MYPROCESSOR_`. If no name
	// is set, then the prefix "consumer" is used, so you prepend all
	// environment variables with "CONSUMER_".
	Name string `ignored:"true"`

	// Type is the type of the current processor.
	Type string `envconfig:"client_type"`
}
Global is a common set of preferences shared between consumers and producers.
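The prefix rule described in the Name field comment can be illustrated with a small sketch. The helper `envPrefix` is hypothetical and not part of this package; it only reproduces the two documented cases (`MyProcessor` becomes `MYPROCESSOR_`, and an empty name falls back to `CONSUMER_`). How the package treats names containing spaces or punctuation is not documented here, so the sketch makes no attempt to handle them.

```go
package main

import (
	"fmt"
	"strings"
)

// envPrefix is a hypothetical helper, not part of the package API. It mirrors
// the documented rule: the upper-cased name followed by an underscore, using
// the fallback when no name is set.
func envPrefix(name, fallback string) string {
	if name == "" {
		name = fallback
	}
	return strings.ToUpper(name) + "_"
}

func main() {
	fmt.Println(envPrefix("MyProcessor", "consumer")) // MYPROCESSOR_
	fmt.Println(envPrefix("", "consumer"))            // CONSUMER_
}
```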
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
An Option configures a Consumer and/or Producer.
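Because Option only contains unexported methods, callers can pass options around but cannot implement new ones outside the package. The following is a minimal sketch of that pattern, assuming a single unexported `apply` method and an `optionFunc` adapter; both names, and the simplified `Consumer` and `Producer` structs, are illustrative stand-ins rather than the package's actual internals.

```go
package main

import "fmt"

// Consumer and Producer are simplified stand-ins for the real config structs.
type Consumer struct{ Name string }
type Producer struct{ Name string }

// Option has only an unexported method, so code outside the package can hold
// and pass options but cannot define its own implementations.
type Option interface {
	apply(c *Consumer, p *Producer)
}

// optionFunc adapts a plain function to the Option interface (assumed name).
type optionFunc func(c *Consumer, p *Producer)

func (f optionFunc) apply(c *Consumer, p *Producer) { f(c, p) }

// Name sets the name on whichever config is being built. A nil pointer means
// the option is currently being applied to the other kind of config.
func Name(s string) Option {
	return optionFunc(func(c *Consumer, p *Producer) {
		if c != nil {
			c.Name = s
		}
		if p != nil {
			p.Name = s
		}
	})
}

// WithOptions has a value receiver, so it works on a copy of the Consumer
// and leaves the original untouched.
func (c Consumer) WithOptions(opts ...Option) Consumer {
	for _, o := range opts {
		o.apply(&c, nil)
	}
	return c
}

func main() {
	base := Consumer{}
	named := base.WithOptions(Name("MyProcessor"))
	fmt.Println(named.Name, base.Name == "") // MyProcessor true
}
```

Passing both config pointers to every option is one way to let a single Option type serve consumers and producers alike; an option that only concerns one side simply ignores the other pointer.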
func ConsumerOptions ¶
ConsumerOptions is a convenience accessor to manually set consumer options.
func DisableEnvironmentConfig ¶
func DisableEnvironmentConfig() Option
DisableEnvironmentConfig prevents the consumer or producer from being configured via environment variables. By default, environment variable-based configuration is allowed.
func InmemListen ¶
func InmemListen() Option
InmemListen configures the inmem consumer to continuously listen for any new messages in the configured store.
This option has no effect when applied to a producer.
func InmemStore ¶
InmemStore adds a store to the inmem consumer and producer.
func KafkaBroker ¶
KafkaBroker adds a broker to the list of configured Kafka brokers.
func KafkaCommitInterval ¶
KafkaCommitInterval sets the consumer's CommitInterval.
This option has no effect when applied to a producer.
func KafkaCompressionCodec ¶
func KafkaCompressionCodec(s kafkaconfig.Compression) Option
KafkaCompressionCodec sets the compression codec for the produced messages.
This option has no effect when applied to a consumer.
func KafkaGroupID ¶
KafkaGroupID sets the group ID for the consumer.
This option has no effect when applied to a producer.
func KafkaGroupIDRandom ¶
func KafkaGroupIDRandom() Option
KafkaGroupIDRandom sets the group ID for the consumer to a random ID. This can be used to configure one-off consumers that should not share their state in a consumer group.
This option has no effect when applied to a producer.
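To illustrate the idea of a one-off group ID, here is a sketch that builds an ID from random bytes. The helper `randomGroupID`, the prefix, and the ID format are all assumptions made for this example; the format that KafkaGroupIDRandom actually generates is not specified in this documentation.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// randomGroupID is a hypothetical helper. It returns the prefix followed by
// 16 hexadecimal characters derived from 8 random bytes.
func randomGroupID(prefix string) (string, error) {
	b := make([]byte, 8)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return prefix + hex.EncodeToString(b), nil
}

func main() {
	id, err := randomGroupID("oneoff-")
	if err != nil {
		panic(err)
	}
	fmt.Println(id)
}
```

Since every run yields a different group ID, such a consumer never resumes from offsets committed by an earlier run, which is the point of a one-off consumer.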
func KafkaHeartbeatInterval ¶
KafkaHeartbeatInterval sets the consumer or producer HeartbeatInterval.
func KafkaMaxDeliveryRetries ¶
KafkaMaxDeliveryRetries sets the MaxDeliveryRetries.
This option has no effect when applied to a consumer.
func KafkaMaxInFlightRequests ¶
KafkaMaxInFlightRequests sets the maximum allowed in-flight requests for both consumers and producers.
func KafkaMaxQueueBufferDuration ¶
KafkaMaxQueueBufferDuration sets the MaxQueueBufferDuration.
This option has no effect when applied to a consumer.
func KafkaMaxQueueSizeKBytes ¶
KafkaMaxQueueSizeKBytes sets the MaxQueueSizeKBytes.
This option has no effect when applied to a consumer.
func KafkaMaxQueueSizeMessages ¶
KafkaMaxQueueSizeMessages sets the MaxQueueSizeMessages.
This option has no effect when applied to a consumer.
func KafkaOffsetHead ¶
KafkaOffsetHead sets the OffsetDefault.
This option has no effect when applied to a producer.
func KafkaOffsetInitial ¶
func KafkaOffsetInitial(s kafkaconfig.Offset) Option
KafkaOffsetInitial sets the OffsetInitial.
This option has no effect when applied to a producer.
func KafkaOffsetTail ¶
KafkaOffsetTail sets the OffsetDefault.
This option has no effect when applied to a producer.
func KafkaOrderedDelivery ¶
func KafkaOrderedDelivery() Option
KafkaOrderedDelivery sets `MaxInFlightRequests` to `1` for the producer, to guarantee ordered delivery of messages.
see: https://git.io/vpgiV
see: https://git.io/vpgDg
This option has no effect when applied to a consumer.
func KafkaRequireAllAck ¶
func KafkaRequireAllAck() Option
KafkaRequireAllAck configures the producer to wait for acks from all brokers available in the Kafka cluster.
This option has no effect when applied to a consumer.
func KafkaRequireLeaderAck ¶
func KafkaRequireLeaderAck() Option
KafkaRequireLeaderAck configures the producer to wait for a single ack by the Kafka cluster leader broker.
This option has no effect when applied to a consumer.
func KafkaRequireNoAck ¶
func KafkaRequireNoAck() Option
KafkaRequireNoAck configures the producer not to wait for any broker acks.
This option has no effect when applied to a consumer.
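The three ack options correspond to the values of Kafka's producer acknowledgement setting: 0 (do not wait), 1 (leader only), and -1 (all; in Kafka's own terms, all in-sync replicas). The sketch below shows that mapping. The `Ack` type and its constant names are hypothetical; how kafkaconfig represents the setting internally is not shown in this documentation.

```go
package main

import "fmt"

// Ack is a hypothetical type modelling Kafka's producer acks setting.
type Ack int

const (
	AckNone   Ack = 0  // do not wait for any broker (KafkaRequireNoAck)
	AckLeader Ack = 1  // wait for the partition leader (KafkaRequireLeaderAck)
	AckAll    Ack = -1 // wait for all in-sync replicas (KafkaRequireAllAck)
)

// describe summarizes the trade-off of each level.
func describe(a Ack) string {
	switch a {
	case AckNone:
		return "lowest latency, messages may be lost without notice"
	case AckLeader:
		return "balanced, messages may be lost if the leader fails before replication"
	case AckAll:
		return "highest durability, highest latency"
	default:
		return "unknown ack level"
	}
}

func main() {
	for _, a := range []Ack{AckNone, AckLeader, AckAll} {
		fmt.Printf("acks=%d: %s\n", int(a), describe(a))
	}
}
```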
func KafkaSSL ¶
func KafkaSSL(capath, certpath, crlpath, keypassword, keypath, keystorepassword, keystorepath string) Option
KafkaSSL configures the producer or consumer to use the specified SSL config.
func KafkaSecurityProtocol ¶
func KafkaSecurityProtocol(s kafkaconfig.Protocol) Option
KafkaSecurityProtocol configures the producer or consumer to use the specified security protocol.
func KafkaSessionTimeout ¶
KafkaSessionTimeout configures the producer or consumer to use the specified session timeout.
func KafkaStatisticsInterval ¶
KafkaStatisticsInterval configures the producer or consumer to use the specified statistics interval.
func KafkaTopic ¶
KafkaTopic configures the producer or consumer to use the specified topic. In case of the consumer, this option can be used multiple times to consume from more than one topic. In case of the producer, the last usage of this option will set the final topic to produce to.
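The difference between the two cases can be sketched as follows. The types `consumerConfig` and `producerConfig` and the helper `addTopic` are simplified, hypothetical stand-ins; they only model the documented behavior that a consumer accumulates topics while a producer keeps the last one.

```go
package main

import "fmt"

// consumerConfig and producerConfig are hypothetical stand-ins for the Kafka
// sections of the real configuration structs.
type consumerConfig struct{ Topics []string }
type producerConfig struct{ Topic string }

// addTopic models one use of the option: the consumer collects every topic,
// the producer is overwritten each time.
func addTopic(c *consumerConfig, p *producerConfig, topic string) {
	c.Topics = append(c.Topics, topic)
	p.Topic = topic
}

func main() {
	var c consumerConfig
	var p producerConfig
	for _, t := range []string{"orders", "payments"} {
		addTopic(&c, &p, t)
	}
	fmt.Println(c.Topics, p.Topic) // [orders payments] payments
}
```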
func ManualErrorHandling ¶
func ManualErrorHandling() Option
ManualErrorHandling prevents the consumer or producer from automatically handling stream errors. When this option is passed, the application itself needs to listen to, and act on, the `Errors()` channel.
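With manual error handling, the application has to keep draining the `Errors()` channel, since the HandleErrors documentation notes that the consumer gets stuck otherwise. The sketch below shows one way to do that. The `errorSource` type, the `streamError` type, and the `watchErrors` helper are hypothetical; only the existence of an `Errors()` channel comes from this documentation, and its exact signature is assumed here.

```go
package main

import "fmt"

// streamError is a hypothetical error type used to feed the demo.
type streamError string

func (e streamError) Error() string { return string(e) }

// errorSource is a hypothetical stand-in for a consumer or producer that
// exposes an Errors() channel (signature assumed).
type errorSource struct{ errs chan error }

func (s *errorSource) Errors() <-chan error { return s.errs }

// watchErrors drains the channel until it is closed, passing every error to
// handle, and returns how many errors it saw.
func watchErrors(errs <-chan error, handle func(error)) int {
	n := 0
	for err := range errs {
		handle(err)
		n++
	}
	return n
}

func main() {
	src := &errorSource{errs: make(chan error, 2)}
	src.errs <- streamError("broker unavailable")
	src.errs <- streamError("message too large")
	close(src.errs)

	n := watchErrors(src.Errors(), func(err error) {
		fmt.Println("stream error:", err)
	})
	fmt.Println("handled", n)
}
```

In a real application, `watchErrors` would typically run in its own goroutine next to the message loop, and `handle` would decide whether to log, retry, or shut down.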
func ManualInterruptHandling ¶
func ManualInterruptHandling() Option
ManualInterruptHandling prevents the consumer or producer from automatically handling interrupt signals. When this option is passed, the application itself needs to handle Unix interrupt signals to properly close the consumer or producer when required.
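A minimal sketch of handling the interrupt signal in the application, using the standard library's os/signal package. The `resource` type stands in for a consumer or producer and is hypothetical, as are the helper functions. The demo injects the signal itself so that it terminates without anyone pressing ^C.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"os/signal"
)

// resource is a hypothetical stand-in for a consumer or producer.
type resource struct{ closed bool }

func (r *resource) Close() error {
	r.closed = true
	return nil
}

// listenForInterrupt returns a channel that receives os.Interrupt (^C).
func listenForInterrupt() chan os.Signal {
	sig := make(chan os.Signal, 1) // buffered, since signal.Notify does not block on send
	signal.Notify(sig, os.Interrupt)
	return sig
}

// simulateInterrupt stands in for the user pressing ^C in this demo.
func simulateInterrupt(sig chan<- os.Signal) {
	select {
	case sig <- os.Interrupt:
	default: // a real signal is already pending
	}
}

// closeOnSignal blocks until a signal arrives, then closes c.
func closeOnSignal(sig <-chan os.Signal, c io.Closer) error {
	<-sig
	return c.Close()
}

func main() {
	sig := listenForInterrupt()
	defer signal.Stop(sig)

	r := &resource{}
	simulateInterrupt(sig) // a real program would wait for the actual signal
	if err := closeOnSignal(sig, r); err != nil {
		fmt.Println("close failed:", err)
	}
	fmt.Println("closed:", r.closed)
}
```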
func ProducerOptions ¶
ProducerOptions is a convenience accessor to manually set producer options.
func StandardstreamReader ¶
func StandardstreamReader(w io.ReadCloser) Option
StandardstreamReader sets the reader to use as the message stream from which to read.
This option has no effect when applied to a producer.
func StandardstreamWriter ¶
StandardstreamWriter sets the writer to use as the message stream to write to.
This option has no effect when applied to a consumer.
type Producer ¶
type Producer struct {
	Inmem          inmemconfig.Producer
	Kafka          kafkaconfig.Producer
	Pubsub         pubsubconfig.Producer
	Standardstream standardstreamconfig.Producer

	Global
}
Producer contains the configuration for all the different producers that implement the stream.Producer interface. When a producer is instantiated, these options can be passed into the new producer to determine its behavior. If the producer only has to support a single implementation of the interface, then all other configuration values can be ignored.
func NewProducer ¶
NewProducer returns a new Producer configuration struct, containing the values passed into the function. If any error occurs during configuration validation, an error is returned as the second argument.
func TestNewProducer ¶
TestNewProducer returns a new producer configuration struct, optionally with the default values removed.
func (Producer) FromEnv ¶
FromEnv populates the Producer from the environment variables whose names start with the prefix derived from the producer name.
func (Producer) WithOptions ¶
WithOptions takes the current Producer, applies the supplied Options, and returns the resulting Producer.