Documentation
Overview ¶
Package mq defines an interface for asynchronous messaging. The default implementation is Kafka.
Index ¶
- Constants
- Variables
- type Codecer
- type Event
- type Handler
- type JsonCodec
- type Logger
- type MQ
- type MQConfig
- type Message
- type Option
- func Addresses(addrs ...string) Option
- func Codec(c Codecer) Option
- func Context(c context.Context) Option
- func ContextWithValue(k, v interface{}) Option
- func ErrorHandler(h Handler) Option
- func Log(log Logger) Option
- func Otel(b bool) Option
- func Sasl(user, pass, algorithm string) Option
- func Secure(b bool) Option
- func SetTLSConfig(t *tls.Config) Option
- func Version(version sarama.KafkaVersion) Option
- type Options
- type PublishOption
- type PublishOptions
- type Strategy
- type SubscribeOption
- type SubscribeOptions
- type Subscriber
- type TLSConfig
Constants ¶
const (
	StrategyKindRetry    = "retry"
	StrategyKindDoOnce   = "do_once"
	StrategyKindSendBack = "send_back"
)
Variables ¶
var (
	StrategyRetry    = strategyImpl(StrategyKindRetry)
	StrategyDoOnce   = strategyImpl(StrategyKindDoOnce)
	StrategySendBack = strategyImpl(StrategyKindSendBack)
)
Functions ¶
This section is empty.
Types ¶
type Codecer ¶
type Codecer interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}
Codecer is a simple encoding interface used by the mq transport.
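As a rough illustration of what satisfying Codecer involves, the sketch below implements the three methods on top of encoding/json. The `jsonCodec` type and the `roundTrip` helper are hypothetical names introduced here; they are not the package's own JsonCodec, whose behavior this page does not document. The interface is redeclared locally so the example compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Codecer mirrors the interface documented above.
type Codecer interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}

// jsonCodec is a hypothetical implementation for illustration only.
type jsonCodec struct{}

func (jsonCodec) Marshal(v interface{}) ([]byte, error)   { return json.Marshal(v) }
func (jsonCodec) Unmarshal(b []byte, v interface{}) error { return json.Unmarshal(b, v) }
func (jsonCodec) String() string                          { return "json" }

// roundTrip encodes in with c and decodes the bytes back into out.
func roundTrip(c Codecer, in, out interface{}) error {
	b, err := c.Marshal(in)
	if err != nil {
		return err
	}
	return c.Unmarshal(b, out)
}

func main() {
	var c Codecer = jsonCodec{}
	out := map[string]string{}
	if err := roundTrip(c, map[string]string{"id": "42"}, &out); err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Println(c.String(), out["id"])
}
```

A codec of this shape would presumably be supplied through the Codec option listed in the index.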
type Event ¶
type Event interface {
	// Topic returns the topic of the message.
	Topic() string
	// Message returns the message body.
	Message() *Message
	// Ack acknowledges the message (the reply operation).
	Ack() error
	// Error returns the error from message consumption.
	Error() error
	// Extra returns important information other than the message body.
	Extra() map[string]interface{}
}
Event is given to a subscription handler for processing.
type Handler ¶
Handler processes messages received through a topic subscription. It is passed an Event, which carries the message and an optional Ack method for acknowledging receipt.
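The Handler declaration itself is not shown on this page. The sketch below assumes it has the shape `func(Event) error` and shows one possible handler that acknowledges a message only after processing succeeds. `fakeEvent` and `ackOnSuccess` are hypothetical helpers, and Message is reduced to its two exported fields.

```go
package main

import (
	"errors"
	"fmt"
)

// Message mirrors the exported fields documented on this page.
type Message struct {
	Header map[string]string
	Body   []byte
}

// Event mirrors the interface documented above.
type Event interface {
	Topic() string
	Message() *Message
	Ack() error
	Error() error
	Extra() map[string]interface{}
}

// Handler is ASSUMED to have this signature; the page does not show it.
type Handler func(Event) error

// fakeEvent is a hypothetical in-memory Event used to exercise a handler.
type fakeEvent struct {
	topic string
	msg   *Message
	acked bool
}

func (e *fakeEvent) Topic() string                 { return e.topic }
func (e *fakeEvent) Message() *Message             { return e.msg }
func (e *fakeEvent) Ack() error                    { e.acked = true; return nil }
func (e *fakeEvent) Error() error                  { return nil }
func (e *fakeEvent) Extra() map[string]interface{} { return nil }

// ackOnSuccess rejects empty messages, runs process, and acknowledges
// the event only when process returns nil.
func ackOnSuccess(process func(topic string, body []byte) error) Handler {
	return func(e Event) error {
		m := e.Message()
		if m == nil || len(m.Body) == 0 {
			return errors.New("empty message")
		}
		if err := process(e.Topic(), m.Body); err != nil {
			return err
		}
		return e.Ack()
	}
}

func main() {
	h := ackOnSuccess(func(topic string, body []byte) error {
		fmt.Printf("topic=%s body=%s\n", topic, body)
		return nil
	})
	ev := &fakeEvent{topic: "orders", msg: &Message{Body: []byte("hello")}}
	fmt.Println("handler error:", h(ev), "acked:", ev.acked)
}
```

Calling Ack explicitly like this would matter mainly when automatic acknowledgement is turned off, for example via DisableAutoAck.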
type Logger ¶
type Message ¶
type Message struct {
	Header map[string]string
	Body   []byte
	// contains filtered or unexported fields
}
Message is the message entity.
func (Message) MessageKey ¶
MessageKey returns the flag that represents the message.
func (*Message) SetMessageKey ¶
SetMessageKey sets a flag that represents the message.
type Option ¶
type Option func(*Options)
func ContextWithValue ¶
func ContextWithValue(k, v interface{}) Option
func Version ¶
func Version(version sarama.KafkaVersion) Option
Version sets the Kafka version used by sarama.
type Options ¶
type Options struct {
	Addresses []string
	Version   sarama.KafkaVersion
	Secure    bool
	Codec     Codecer
	Username  string
	Password  string
	Algorithm string
	// Handler executed when an error happens in mq message processing
	ErrorHandler Handler
	TLSConfig    *tls.Config
	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
	Log     Logger
	// Whether otel tracing is enabled
	Otel bool
}
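Option is a function that mutates Options, which is the usual Go functional-options pattern. The sketch below shows how such options could be composed, using a reduced Options struct with three of the documented fields. The option function names and signatures match the index, but their bodies and the `newOptions` helper are assumptions made for illustration.

```go
package main

import "fmt"

// Options is a reduced copy of the documented struct (three fields only).
type Options struct {
	Addresses []string
	Secure    bool
	Otel      bool
}

// Option mirrors the documented type.
type Option func(*Options)

// Addresses, Secure and Otel have the signatures listed in the index;
// the bodies below are assumed, not taken from the package source.
func Addresses(addrs ...string) Option {
	return func(o *Options) { o.Addresses = addrs }
}

func Secure(b bool) Option {
	return func(o *Options) { o.Secure = b }
}

func Otel(b bool) Option {
	return func(o *Options) { o.Otel = b }
}

// newOptions is a hypothetical helper that applies each option in order,
// so later options override earlier ones.
func newOptions(opts ...Option) Options {
	var o Options
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	o := newOptions(Addresses("127.0.0.1:9092"), Secure(true))
	fmt.Printf("%+v\n", o)
}
```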
type PublishOption ¶
type PublishOption func(*PublishOptions)
func PublishContext ¶
func PublishContext(ctx context.Context) PublishOption
PublishContext sets the context.
type PublishOptions ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func DisableAutoAck ¶
func DisableAutoAck() SubscribeOption
DisableAutoAck will disable auto acking of messages after they have been handled.
func Queue ¶
func Queue(name string) SubscribeOption
Queue sets the name of the queue to share messages on
func SubscribeContext ¶
func SubscribeContext(ctx context.Context) SubscribeOption
SubscribeContext sets the context.
func SubscribeRetryNum ¶
func SubscribeRetryNum(v int) SubscribeOption
SubscribeRetryNum sets RetryNum
func SubscribeStrategy ¶
func SubscribeStrategy(v Strategy) SubscribeOption
SubscribeStrategy sets Strategy
type SubscribeOptions ¶
type SubscribeOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acknowledged automatically.
	AutoAck bool
	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string
	// RetryNum specifies the number of retries when handling fails
	RetryNum int
	// Strategy specifies the strategy for handling messages
	Strategy Strategy
	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}
func NewSubscribeOptions ¶
func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions
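The sketch below shows one way a constructor like NewSubscribeOptions could honor the documented default (AutoAck is true unless disabled) before applying caller options. The lowercase `newSubscribeOptions`, the option bodies, and the `context.Background()` default are all assumptions. The Strategy field is left out because its interface is not shown on this page.

```go
package main

import (
	"context"
	"fmt"
)

// SubscribeOptions is a reduced copy of the documented struct
// (the Strategy field is omitted).
type SubscribeOptions struct {
	AutoAck  bool
	Queue    string
	RetryNum int
	Context  context.Context
}

// SubscribeOption mirrors the documented type.
type SubscribeOption func(*SubscribeOptions)

// The option bodies below are assumed; only the signatures are documented.
func DisableAutoAck() SubscribeOption {
	return func(o *SubscribeOptions) { o.AutoAck = false }
}

func Queue(name string) SubscribeOption {
	return func(o *SubscribeOptions) { o.Queue = name }
}

func SubscribeRetryNum(v int) SubscribeOption {
	return func(o *SubscribeOptions) { o.RetryNum = v }
}

// newSubscribeOptions is a hypothetical stand-in for NewSubscribeOptions:
// it sets the defaults first, then lets each option override them.
func newSubscribeOptions(opts ...SubscribeOption) SubscribeOptions {
	o := SubscribeOptions{
		AutoAck: true,                 // documented default
		Context: context.Background(), // assumed default
	}
	for _, apply := range opts {
		apply(&o)
	}
	return o
}

func main() {
	def := newSubscribeOptions()
	custom := newSubscribeOptions(Queue("billing"), SubscribeRetryNum(3), DisableAutoAck())
	fmt.Println(def.AutoAck, custom.AutoAck, custom.Queue, custom.RetryNum)
}
```

Setting defaults before running the options keeps the zero value of a bool from silently turning AutoAck off.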
type Subscriber ¶
type Subscriber interface {
	Options() SubscribeOptions
	Topics() []string
	Unsubscribe() error
}
Subscriber is a convenience return type for the Subscribe method.
type TLSConfig ¶
type TLSConfig struct {
	// CertFile the optional certificate file for client authentication
	CertFile string `json:"cert_file,omitempty"`
	// KeyFile the optional key file for client authentication
	KeyFile string `json:"key_file,omitempty"`
	// CAFile the optional certificate authority file for TLS client authentication
	CAFile string `json:"ca_file,omitempty"`
	// VerifySSL optional verify ssl certificates chain
	VerifySSL bool `json:"verify_ssl,omitempty"`
}
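This page does not say how a TLSConfig becomes the *tls.Config that SetTLSConfig accepts. The sketch below is one plausible conversion using only the standard library. `buildTLS` is a hypothetical helper, and mapping `VerifySSL == false` to `InsecureSkipVerify == true` is an assumption about the intended meaning of that field.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// TLSConfig mirrors the struct documented above.
type TLSConfig struct {
	CertFile  string `json:"cert_file,omitempty"`
	KeyFile   string `json:"key_file,omitempty"`
	CAFile    string `json:"ca_file,omitempty"`
	VerifySSL bool   `json:"verify_ssl,omitempty"`
}

// buildTLS is a hypothetical helper that turns file paths into a *tls.Config.
func buildTLS(c TLSConfig) (*tls.Config, error) {
	// Assumption: VerifySSL=false means certificate verification is skipped.
	out := &tls.Config{InsecureSkipVerify: !c.VerifySSL}

	// Load the client certificate only when both halves are provided.
	if c.CertFile != "" && c.KeyFile != "" {
		cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
		if err != nil {
			return nil, err
		}
		out.Certificates = []tls.Certificate{cert}
	}

	// Trust a custom certificate authority when one is provided.
	if c.CAFile != "" {
		pem, err := os.ReadFile(c.CAFile)
		if err != nil {
			return nil, err
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("no certificates found in CA file")
		}
		out.RootCAs = pool
	}
	return out, nil
}

func main() {
	cfg, err := buildTLS(TLSConfig{VerifySSL: true})
	if err != nil {
		fmt.Println("tls setup failed:", err)
		return
	}
	fmt.Println("skip verify:", cfg.InsecureSkipVerify)
}
```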