Documentation ¶
Index ¶
- Constants
- func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessage.HandlerFunc, ...) kafkaproxy.KeyHandler
- func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error)
- type ProxyConfig
- type ProxyOption
- func WithDebug(enabled bool) ProxyOption
- func WithDialAddressMapping(mapping []string) ProxyOption
- func WithExtra(extra []string) ProxyOption
- func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption
- func WithMessagePublisher(publisher watermillmessage.Publisher, topic string) ProxyOption
- func WithMessageSubscriber(subscriber watermillmessage.Subscriber) ProxyOption
- type TLSConfig
Constants ¶
const (
// RequestAPIKeyProduce is the Kafka request API Key for the Produce Request.
RequestAPIKeyProduce = 0
)
Kafka request API Keys. See https://kafka.apache.org/protocol#protocol_api_keys.
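As a rough illustration of where this constant comes into play, the sketch below reads the `api_key` field from a raw Kafka request and compares it with the Produce key. In the Kafka wire protocol a request starts with a 4-byte size prefix, followed by a big-endian int16 `api_key`. The helper `apiKeyFromRequest` and the locally redeclared constant are assumptions made for the example and are not part of this package.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// RequestAPIKeyProduce is redeclared here so the sketch compiles on its own.
const RequestAPIKeyProduce = 0

// apiKeyFromRequest is a hypothetical helper (not part of this package).
// It skips the 4-byte size prefix and decodes the big-endian int16 api_key
// that follows it.
func apiKeyFromRequest(raw []byte) (int16, error) {
	if len(raw) < 6 {
		return 0, errors.New("request too short to contain an API key")
	}
	return int16(binary.BigEndian.Uint16(raw[4:6])), nil
}

func main() {
	// Truncated request for illustration: size prefix, api_key = 0, api_version = 7.
	raw := []byte{0, 0, 0, 4, 0, 0, 0, 7}

	key, err := apiKeyFromRequest(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(key == RequestAPIKeyProduce)
}
```

A proxy that only cares about Produce requests could use a check like this to decide which requests to hand to a key handler and which to pass through untouched.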
Functions ¶
func NewProduceRequestHandler ¶
func NewProduceRequestHandler(r *watermillmessage.Router, handler watermillmessage.HandlerFunc, publisher watermillmessage.Publisher, publishToTopic string) kafkaproxy.KeyHandler
NewProduceRequestHandler creates a new request key handler for the Produce Request.
func NewProxy ¶
func NewProxy(c *ProxyConfig, r *watermillmessage.Router) (proxy.Proxy, error)
NewProxy creates a new Kafka Proxy based on a given configuration.
Types ¶
type ProxyConfig ¶
type ProxyConfig struct {
	// Address for this proxy. Should be reachable by your clients. Most probably a domain name.
	// If not set, 0.0.0.0 will be used.
	Address            string
	BrokersMapping     []string
	DialAddressMapping []string
	ExtraConfig        []string
	MessageHandler     watermillmessage.HandlerFunc
	MessagePublisher   watermillmessage.Publisher
	PublishToTopic     string
	MessageSubscriber  watermillmessage.Subscriber
	TLS                *TLSConfig
	Debug              bool
}
ProxyConfig holds the configuration for the Kafka Proxy.
func NewProxyConfig ¶
func NewProxyConfig(brokersMapping []string, opts ...ProxyOption) (*ProxyConfig, error)
NewProxyConfig creates a new ProxyConfig.
func (*ProxyConfig) Validate ¶
func (c *ProxyConfig) Validate() error
Validate validates ProxyConfig.
type ProxyOption ¶
type ProxyOption func(*ProxyConfig) error
ProxyOption represents a functional configuration for the Proxy.
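The functional-option shape of `ProxyOption` can be sketched with local stand-in types. Everything below (`config`, `option`, `withDebug`, `withDialAddressMapping`, `newConfig`) is hypothetical and only mirrors the documented signatures. How the package actually applies or validates options is not shown in this reference, so the empty-mapping check is an assumption.

```go
package main

import (
	"errors"
	"fmt"
)

// config is a hypothetical stand-in for ProxyConfig with a few of its fields.
type config struct {
	BrokersMapping     []string
	DialAddressMapping []string
	Debug              bool
}

// option mirrors the documented shape of ProxyOption: func(*ProxyConfig) error.
type option func(*config) error

// withDebug has the same shape as WithDebug and only sets a flag.
func withDebug(enabled bool) option {
	return func(c *config) error {
		c.Debug = enabled
		return nil
	}
}

// withDialAddressMapping has the same shape as WithDialAddressMapping.
// Rejecting an empty mapping is an assumption made for this sketch.
func withDialAddressMapping(mapping []string) option {
	return func(c *config) error {
		if len(mapping) == 0 {
			return errors.New("dial address mapping is empty")
		}
		c.DialAddressMapping = mapping
		return nil
	}
}

// newConfig applies the options in order and stops at the first error.
func newConfig(brokersMapping []string, opts ...option) (*config, error) {
	c := &config{BrokersMapping: brokersMapping}
	for _, opt := range opts {
		if err := opt(c); err != nil {
			return nil, err
		}
	}
	return c, nil
}

func main() {
	// "placeholder-mapping" is not a real mapping value; the expected format
	// is not specified in this reference.
	c, err := newConfig([]string{"placeholder-mapping"}, withDebug(true))
	if err != nil {
		panic(err)
	}
	fmt.Println(c.Debug, len(c.BrokersMapping))
}
```

Because each option returns an error, a constructor written this way can reject a bad setting at construction time instead of deferring every check to a later validation step.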
func WithDialAddressMapping ¶
func WithDialAddressMapping(mapping []string) ProxyOption
WithDialAddressMapping configures Dial Address Mapping.
func WithMessageHandler ¶
func WithMessageHandler(handler watermillmessage.HandlerFunc) ProxyOption
WithMessageHandler configures a handler that will handle all incoming messages.
func WithMessagePublisher ¶
func WithMessagePublisher(publisher watermillmessage.Publisher, topic string) ProxyOption
WithMessagePublisher configures a publisher where the messages will be published after being handled.
func WithMessageSubscriber ¶
func WithMessageSubscriber(subscriber watermillmessage.Subscriber) ProxyOption
WithMessageSubscriber configures a subscriber that receives the messages published by the configured MessagePublisher.
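The handler, publisher, and subscriber options describe a small pipeline: a message is handled, the handler's output is published to a topic, and a subscriber reads from that topic. The sketch below imitates that flow with in-memory stand-ins. The types `msg`, `handlerFunc`, and `chanPubSub` and the function `process` are hypothetical. They loosely follow the shape of Watermill's `HandlerFunc`, `Publisher`, and `Subscriber`, and do not show how this package wires them together.

```go
package main

import (
	"fmt"
	"strings"
)

// msg is a hypothetical stand-in for a Watermill message.
type msg struct{ Payload string }

// handlerFunc follows the shape of Watermill's HandlerFunc:
// one message in, zero or more messages out.
type handlerFunc func(m *msg) ([]*msg, error)

// chanPubSub is a toy in-memory publisher and subscriber backed by
// buffered channels. It is not safe for concurrent use.
type chanPubSub struct{ topics map[string]chan *msg }

func newChanPubSub() *chanPubSub {
	return &chanPubSub{topics: map[string]chan *msg{}}
}

// topic returns the channel for a topic, creating it on first use.
func (p *chanPubSub) topic(name string) chan *msg {
	ch, ok := p.topics[name]
	if !ok {
		ch = make(chan *msg, 16)
		p.topics[name] = ch
	}
	return ch
}

// Publish sends every message to the topic's channel.
func (p *chanPubSub) Publish(topic string, msgs ...*msg) error {
	for _, m := range msgs {
		p.topic(topic) <- m
	}
	return nil
}

// Subscribe returns a receive-only view of the topic's channel.
func (p *chanPubSub) Subscribe(topic string) <-chan *msg {
	return p.topic(topic)
}

// process runs one message through the handler and publishes the results.
func process(in *msg, h handlerFunc, p *chanPubSub, topic string) error {
	out, err := h(in)
	if err != nil {
		return err
	}
	return p.Publish(topic, out...)
}

func main() {
	upper := func(m *msg) ([]*msg, error) {
		return []*msg{{Payload: strings.ToUpper(m.Payload)}}, nil
	}

	ps := newChanPubSub()
	if err := process(&msg{Payload: "hello"}, upper, ps, "handled"); err != nil {
		panic(err)
	}
	got := <-ps.Subscribe("handled")
	fmt.Println(got.Payload)
}
```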