Documentation ¶
Index ¶
- func NoopCommit()
- type BrokerAcknowledgment
- type Controller
- type ControllerOption
- func WithAutoCommit(enabled bool) ControllerOption
- func WithConnectionTest(enabled bool) ControllerOption
- func WithGroupID(groupID string) ControllerOption
- func WithLogger(logger extensions.Logger) ControllerOption
- func WithMaxBytes(maxBytes int) ControllerOption
- func WithPartition(partition int) ControllerOption
- func WithSasl(sasl sasl.Mechanism) ControllerOption
- func WithTLS(tls *tls.Config) ControllerOption
- type MessagesHandler
Functions ¶
Types ¶
type BrokerAcknowledgment ¶ added in v0.35.0
type BrokerAcknowledgment struct {
// contains filtered or unexported fields
}
BrokerAcknowledgment is the acknowledgment type for the Kafka broker. Kafka does not support negative acknowledgments (naks): committing a message is the only way to mark it as handled. The subscriber is responsible for proper error handling.
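The commit-only model described above can be illustrated with a minimal, self-contained sketch. Everything here is hypothetical: the `ack` type, its `commit` field, and the `process` helper are stand-ins written for illustration, and treating a nak as a no-op is an assumption, not the package's confirmed implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// ack is a hypothetical stand-in for BrokerAcknowledgment.
// The real type keeps its fields unexported and may look different.
type ack struct {
	commit func() // hypothetical: commits the message offset
}

// AckMessage commits the message, which is the only way to mark it handled.
func (a ack) AckMessage() { a.commit() }

// NakMessage does nothing in this sketch (assumption): with no broker-side
// nak, the message is simply left uncommitted.
func (a ack) NakMessage() {}

// process is a hypothetical subscriber callback. It carries its own error
// handling, because the broker will not redeliver on a nak.
func process(payload string, a ack) error {
	if payload == "" {
		a.NakMessage()
		return errors.New("empty payload")
	}
	a.AckMessage()
	return nil
}

func main() {
	commits := 0
	a := ack{commit: func() { commits++ }}

	fmt.Println(process("hello", a)) // valid payload: committed
	fmt.Println(process("", a))      // invalid payload: not committed
	fmt.Println("commits:", commits)
}
```

In this sketch only the valid payload leads to a commit, so anything the subscriber wants to do with a failed message (logging, retrying, dead-lettering) has to happen in its own code.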
func (BrokerAcknowledgment) AckMessage ¶ added in v0.35.0
func (k BrokerAcknowledgment) AckMessage()
AckMessage acknowledges the message.
func (BrokerAcknowledgment) NakMessage ¶ added in v0.35.0
func (k BrokerAcknowledgment) NakMessage()
NakMessage negatively acknowledges the message.
type Controller ¶
type Controller struct {
// contains filtered or unexported fields
}
Controller is the Kafka implementation for asyncapi-codegen.
func NewController ¶
func NewController(hosts []string, options ...ControllerOption) (*Controller, error)
NewController creates a new KafkaController that fulfills the BrokerLinker interface.
func (*Controller) Publish ¶
func (c *Controller) Publish(ctx context.Context, channel string, um extensions.BrokerMessage) error
Publish sends a message to the broker on the given channel.
func (*Controller) Subscribe ¶
func (c *Controller) Subscribe(ctx context.Context, channel string) (extensions.BrokerChannelSubscription, error)
Subscribe subscribes to messages from the broker on the given channel.
type ControllerOption ¶
type ControllerOption func(controller *Controller)
ControllerOption is a function that can be used to configure a Kafka controller. Examples: WithGroupID(), WithPartition(), WithMaxBytes(), WithLogger().
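ControllerOption follows the functional options pattern. The sketch below reproduces that pattern with standard-library code only. The lowercase `controller`, `option`, `withGroupID`, `withPartition`, `withMaxBytes`, and `newController` names are hypothetical stand-ins, and the default values are invented for illustration; the real Controller's fields and defaults are not documented here.

```go
package main

import "fmt"

// controller is a hypothetical stand-in for Controller.
type controller struct {
	hosts     []string
	groupID   string
	partition int
	maxBytes  int
}

// option mirrors the shape of ControllerOption: a function that
// mutates the controller being built.
type option func(*controller)

func withGroupID(id string) option { return func(c *controller) { c.groupID = id } }
func withPartition(p int) option   { return func(c *controller) { c.partition = p } }
func withMaxBytes(n int) option    { return func(c *controller) { c.maxBytes = n } }

// newController sets (hypothetical) defaults first, then applies the
// options in order, so a later option overrides an earlier one.
func newController(hosts []string, opts ...option) *controller {
	c := &controller{hosts: hosts, groupID: "default-group", maxBytes: 1 << 20}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	c := newController(
		[]string{"localhost:9092"},
		withGroupID("billing"),
		withPartition(2),
	)
	fmt.Println(c.groupID, c.partition, c.maxBytes)
}
```

Options that are not passed leave the defaults untouched, which is why callers can supply only the settings they care about.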
func WithAutoCommit ¶ added in v0.35.0
func WithAutoCommit(enabled bool) ControllerOption
WithAutoCommit sets whether an AutoCommitMessagesHandler or a ManualCommitMessagesHandler is used to process messages.
func WithConnectionTest ¶ added in v0.36.0
func WithConnectionTest(enabled bool) ControllerOption
WithConnectionTest sets the connectionTest feature toggle, which controls whether NewController validates the connection on creation.
func WithGroupID ¶
func WithGroupID(groupID string) ControllerOption
WithGroupID sets a custom group ID for channel subscription.
func WithLogger ¶
func WithLogger(logger extensions.Logger) ControllerOption
WithLogger sets a custom logger that logs operations on the broker controller.
func WithMaxBytes ¶
func WithMaxBytes(maxBytes int) ControllerOption
WithMaxBytes sets the maximum size of a message.
func WithPartition ¶
func WithPartition(partition int) ControllerOption
WithPartition sets the partition to use for the topic.
func WithSasl ¶ added in v0.36.0
func WithSasl(sasl sasl.Mechanism) ControllerOption
WithSasl sets the sasl.Mechanism that will be used for kafka.Dial, kafka.Reader and kafka.Writer.
func WithTLS ¶ added in v0.36.0
func WithTLS(tls *tls.Config) ControllerOption
WithTLS sets the tls.Config that will be used for kafka.Dial, kafka.Reader and kafka.Writer.
type MessagesHandler ¶ added in v0.35.0
type MessagesHandler func(ctx context.Context, r *kafka.Reader, sub extensions.BrokerChannelSubscription)
MessagesHandler is a function that can be used to process messages from the broker.