Documentation
¶
Index ¶
- Variables
- func MustNew(opts ...Option) []goduck.Stream
- func New(opts ...Option) ([]goduck.Stream, error)
- type KafkaOption
- func WithKafkaBrokers(brokers ...string) KafkaOption
- func WithKafkaConfigMap(cm kafka.ConfigMap) KafkaOption
- func WithKafkaConfigValue(name string, value kafka.ConfigValue) KafkaOption
- func WithKafkaGroupID(id string) KafkaOption
- func WithKafkaSaslPlainAuthentication(username, password string) KafkaOption
- func WithKafkaTopic(topics ...string) KafkaOption
- type Option
- type Provider
Constants ¶
This section is empty.
Variables ¶
var ( // ErrZeroStreams is returned if the chosen number of streams is less than one. ErrZeroStreams = errors.New("invalid number of streams, must be higher than zero") // ErrNoProvider is returned if no provider is set. ErrNoProvider = errors.New("no provider configured") // ErrConflictionAppOptions is returned with inputstream is provided with conflicting app configuration, like // setting a specific app.App but also using the defualt app. ErrConflictionAppOptions = errors.New("confliction app options detected") )
var ( // ErrEmptyKafkaTopic is returned when the provided topic is an empty string. ErrEmptyKafkaTopic = errors.New("empty kafka topic") // ErrNoKafkaTopic is returned when the topics are not set. ErrNoKafkaTopic = errors.New("no kafka topic provided") )
Functions ¶
func New ¶
New returns a new slice of goduck streams. By default it returns 1 stream. This can be configured by passing WithNumberOfStreams option. The new streams uses the default foundationkit app. This can be configured by passing WithApp option. The shudown priority defaults to zero. This can be configured by passing WithShutdownPriority option.
Types ¶
type KafkaOption ¶
type KafkaOption func(*kafkaProvider)
KafkaOption configures the kafka provider.
func WithKafkaBrokers ¶
func WithKafkaBrokers(brokers ...string) KafkaOption
WithKafkaBrokers sets the kafka topics for the input stream.
func WithKafkaConfigMap ¶
func WithKafkaConfigMap(cm kafka.ConfigMap) KafkaOption
WithKafkaConfigMap configures goducks inner librdkafka. All values in the provided ConfigMap are copied to the ConfigMap inside the provider. So it's to call this with maps with different keys, otherwise keys will be replaced.
func WithKafkaConfigValue ¶
func WithKafkaConfigValue(name string, value kafka.ConfigValue) KafkaOption
WithKafkaConfigValue configures goducks inner librdkafka. The provided value sets or replaces the existing value in the ConfigMap inside the provider.
func WithKafkaGroupID ¶
func WithKafkaGroupID(id string) KafkaOption
WithKafkaGroupID sets the kafka group id.
func WithKafkaSaslPlainAuthentication ¶
func WithKafkaSaslPlainAuthentication(username, password string) KafkaOption
WithKafkaSaslPlainAuthentication configures kafka sasl authentication.
func WithKafkaTopic ¶
func WithKafkaTopic(topics ...string) KafkaOption
WithKafkaTopic sets the kafka topic or topics.
type Option ¶
type Option func(*options) error
Option allows for configuring the input stream.
func WithApp ¶
WithApp tells the pipeline that we are using a foundationkit's app so it may register the shutdown handler for the streams it creates on the given app. This will override the default which is to use de default app.
func WithKafkaProvider ¶
func WithKafkaProvider(opts ...KafkaOption) Option
WithKafkaProvider configures the input stream with a kafka provider.
func WithNumberOfStreams ¶
WithNumberOfStreams sets the number of streams returned by New. Defaults to 1 stream if not provided.
func WithShutdownPriority ¶
func WithShutdownPriority(p app.ShutdownPriority) Option
WithShutdownPriority sets the shutdown priority when configuring the graceful shudown for the input streams. If not provided, the default is zero, the lowest priority.