Documentation ¶
Index ¶
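- type ConnectionConfig
- func (c *ConnectionConfig) Validate() error
- type ConnectionsConfig
- func (c *ConnectionsConfig) Validate() error
- type Container
- func NewContainer() *Container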
- func (cont *Container) AddConnection(name string, cfg *ConnectionConfig) error
- func (cont *Container) CreateConsumer(connectionName string, consumerGroup string, topics []string) (*kafka.Reader, error)
- func (cont *Container) CreateProducer(connectionName string) (*kafka.Writer, error)
- func (cont *Container) Exists(name string) bool
- type StartOffset
Types ¶
type ConnectionConfig ¶
type ConnectionConfig struct {
	// Broker Address. Comma-separated list of "host:port" expected
	Address     string      `mapstructure:"address"`
	StartOffset StartOffset `mapstructure:"start_offset"`
}
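The Address field is documented as a comma-separated list of "host:port" entries. The sketch below shows one way such a value could be split and checked before use. The helper name splitBrokers is hypothetical and not part of this package; the documentation does not say how the package itself parses or validates the field.

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// splitBrokers is a hypothetical helper (not part of the documented package).
// It splits a comma-separated Address value into individual "host:port"
// entries and rejects entries that are missing a host or a port.
func splitBrokers(address string) ([]string, error) {
	if strings.TrimSpace(address) == "" {
		return nil, fmt.Errorf("address is empty")
	}
	parts := strings.Split(address, ",")
	brokers := make([]string, 0, len(parts))
	for _, p := range parts {
		p = strings.TrimSpace(p)
		host, port, err := net.SplitHostPort(p)
		if err != nil {
			return nil, fmt.Errorf("invalid broker %q: %w", p, err)
		}
		if host == "" || port == "" {
			return nil, fmt.Errorf("invalid broker %q: host and port are required", p)
		}
		brokers = append(brokers, p)
	}
	return brokers, nil
}

func main() {
	// The broker names here are placeholders for illustration only.
	brokers, err := splitBrokers("kafka-1:9092, kafka-2:9092")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(brokers)
}
```

Trimming whitespace around each entry is a design choice of this sketch; it makes values such as "a:1, b:2" acceptable, which the package itself may or may not allow.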
func (*ConnectionConfig) Validate ¶
func (c *ConnectionConfig) Validate() error
type ConnectionsConfig ¶
type ConnectionsConfig map[string]*ConnectionConfig
func (*ConnectionsConfig) Validate ¶
func (c *ConnectionsConfig) Validate() error
type Container ¶
type Container struct {
// contains filtered or unexported fields
}
Container is a simple container for holding named kafka connections.
func NewContainer ¶
func NewContainer() *Container
func (*Container) AddConnection ¶
func (cont *Container) AddConnection(name string, cfg *ConnectionConfig) error
AddConnection adds a named connection to the container. A consumer or producer can be created on that connection later using CreateConsumer or CreateProducer.
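To illustrate the idea of a container of named connections, here is a minimal sketch of a registry keyed by connection name. It is not this package's implementation: the type registry, the constructor newRegistry, the mutex, and the choice to reject nil configs and duplicate names are all assumptions made for the example. ConnectionConfig is redeclared locally without struct tags so the snippet compiles on its own.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// ConnectionConfig is a local stand-in for the documented struct.
type ConnectionConfig struct {
	Address     string
	StartOffset string
}

// registry is a hypothetical stand-in for Container.
type registry struct {
	mu    sync.RWMutex
	conns map[string]*ConnectionConfig
}

func newRegistry() *registry {
	return &registry{conns: make(map[string]*ConnectionConfig)}
}

// AddConnection stores cfg under name. Rejecting nil configs and duplicate
// names is an assumption of this sketch, not documented behavior.
func (r *registry) AddConnection(name string, cfg *ConnectionConfig) error {
	if cfg == nil {
		return errors.New("connection config is nil")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.conns[name]; ok {
		return fmt.Errorf("connection %q already exists", name)
	}
	r.conns[name] = cfg
	return nil
}

// Exists reports whether a connection with the given name was added.
func (r *registry) Exists(name string) bool {
	r.mu.RLock()
	defer r.mu.RUnlock()
	_, ok := r.conns[name]
	return ok
}

func main() {
	r := newRegistry()
	// "events" and the broker address are placeholder values.
	err := r.AddConnection("events", &ConnectionConfig{Address: "localhost:9092", StartOffset: "last"})
	fmt.Println(err, r.Exists("events"), r.Exists("missing"))
}
```

With a registry of this shape, creating a producer or consumer later only needs the connection name, which matches the signatures of CreateProducer and CreateConsumer.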
func (*Container) CreateConsumer ¶
func (cont *Container) CreateConsumer(connectionName string, consumerGroup string, topics []string) (*kafka.Reader, error)
CreateConsumer creates a new Kafka consumer for the named connection and subscribes it, as part of the given consumer group, to the given topics.
func (*Container) CreateProducer ¶
func (cont *Container) CreateProducer(connectionName string) (*kafka.Writer, error)
CreateProducer creates a new Kafka producer for the named connection.
type StartOffset ¶
type StartOffset string
const (
	StartOffsetFirst StartOffset = "first"
	StartOffsetLast  StartOffset = "last"
)
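The string values "first" and "last" presumably have to be translated into whatever the underlying Kafka client expects. The sketch below assumes the client is segmentio/kafka-go, where FirstOffset is -2 and LastOffset is -1; those two numbers are redeclared locally so the snippet has no external dependency. The function toReaderOffset is hypothetical, and the documentation does not state how the package performs this mapping.

```go
package main

import "fmt"

// StartOffset and its constants are copied from the documentation above.
type StartOffset string

const (
	StartOffsetFirst StartOffset = "first"
	StartOffsetLast  StartOffset = "last"
)

// Assumed to mirror kafka.FirstOffset and kafka.LastOffset from
// segmentio/kafka-go; redeclared here to keep the example self-contained.
const (
	firstOffset int64 = -2
	lastOffset  int64 = -1
)

// toReaderOffset is a hypothetical helper that maps a configured
// StartOffset to the numeric offset sentinel used by the client.
func toReaderOffset(o StartOffset) (int64, error) {
	switch o {
	case StartOffsetFirst:
		return firstOffset, nil
	case StartOffsetLast:
		return lastOffset, nil
	default:
		return 0, fmt.Errorf("unknown start offset %q", o)
	}
}

func main() {
	for _, o := range []StartOffset{StartOffsetFirst, StartOffsetLast, "middle"} {
		v, err := toReaderOffset(o)
		fmt.Println(o, v, err)
	}
}
```

Returning an error for unrecognized values, rather than silently falling back to a default, is a choice made for this sketch.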