mq

package
v0.0.0-...-b2af994 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 14, 2023 License: Apache-2.0 Imports: 7 Imported by: 5

Documentation

Overview

Package mq is an interface used for asynchronous messaging, the default implementation is kafka

Index

Constants

View Source
const (
	StrategyKindRetry    = "retry"
	StrategyKindDoOnce   = "do_once"
	StrategyKindSendBack = "send_back"
)

Variables

View Source
var (
	StrategyRetry    = strategyImpl(StrategyKindRetry)
	StrategyDoOnce   = strategyImpl(StrategyKindDoOnce)
	StrategySendBack = strategyImpl(StrategyKindSendBack)
)

Functions

This section is empty.

Types

type Codecer

type Codecer interface {
	Marshal(interface{}) ([]byte, error)
	Unmarshal([]byte, interface{}) error
	String() string
}

Codec is a simple encoding interface used for the mq/transport

type Event

type Event interface {
	// Topic return the topic of the message
	Topic() string
	// Message return the message body
	Message() *Message
	// ACK message reply operation
	Ack() error
	// Error get the error of message consumption
	Error() error
	// Extra the important information other than the message body
	Extra() map[string]interface{}
}

Event is given to a subscription handler for processing.

type Handler

type Handler func(Event) error

Handler is used to process messages via a subscription of a topic. The handler is passed a publication interface which contains the message and optional Ack method to acknowledge receipt of the message.

type JsonCodec

type JsonCodec struct{}

func (JsonCodec) Marshal

func (jc JsonCodec) Marshal(v interface{}) ([]byte, error)

func (JsonCodec) String

func (jc JsonCodec) String() string

func (JsonCodec) Unmarshal

func (jc JsonCodec) Unmarshal(data []byte, v interface{}) error

type Logger

type Logger interface {
	Info(args ...interface{})
	Warn(args ...interface{})
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Infof(format string, args ...interface{})
}

func NewLogger

func NewLogger() Logger

type MQ

type MQ interface {
	Init(...Option) error
	Options() Options
	Address() string
	Connect() error
	Disconnect() error
	Publish(topic string, m *Message, opts ...PublishOption) error
	Subscribe(h Handler, topics []string, opts ...SubscribeOption) (Subscriber, error)
	String() string
}

type MQConfig

type MQConfig struct {
	// Addresses of the mq cluster
	Addresses []string `json:"addresses,omitempty"`

	TLSConfig
}

type Message

type Message struct {
	Header map[string]string
	Body   []byte
	// contains filtered or unexported fields
}

Message is the message entity.

func (Message) MessageKey

func (msg Message) MessageKey() string

MessageKey get the flag that represents the message

func (*Message) SetMessageKey

func (msg *Message) SetMessageKey(key string)

SetMessageKey set a flag that represents the message

type Option

type Option func(*Options)

func Addresses

func Addresses(addrs ...string) Option

Addresses set the host addresses to be used by the mq

func Codec

func Codec(c Codecer) Option

Codec sets the codec used for encoding/decoding used where

func Context

func Context(c context.Context) Option

func ContextWithValue

func ContextWithValue(k, v interface{}) Option

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler set the error handler

func Log

func Log(log Logger) Option

func Sasl

func Sasl(user, pass, algorithm string) Option

user/password/algorithm are needed by SASL auth

func Secure

func Secure(b bool) Option

Secure communication with the mq

func SetTLSConfig

func SetTLSConfig(t *tls.Config) Option

SetTLSConfig Specify TLS Config

func Version

func Version(version sarama.KafkaVersion) Option

Version set the kafka version for sarama

type Options

type Options struct {
	Addresses []string
	Version   sarama.KafkaVersion
	Secure    bool
	Codec     Codecer
	Username  string
	Password  string
	Algorithm string

	// Handler executed when error happens in mq message processing
	ErrorHandler Handler

	TLSConfig *tls.Config

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context

	Log Logger
}

type PublishOption

type PublishOption func(*PublishOptions)

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext set context

type PublishOptions

type PublishOptions struct {
	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

type Strategy

type Strategy interface {
	Strategy() string
}

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext set context

func SubscribeRetryNum

func SubscribeRetryNum(v int) SubscribeOption

SubscribeRetryNum sets RetryNum

func SubscribeStrategy

func SubscribeStrategy(v Strategy) SubscribeOption

SubscribeStrategy sets Strategy

type SubscribeOptions

type SubscribeOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is receipt already.
	AutoAck bool

	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string

	// RetryNum specifies the one that retry when handle failed
	RetryNum int

	// Strategy specifies the one for handling message
	Strategy Strategy

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

type Subscriber

type Subscriber interface {
	Options() SubscribeOptions
	Topics() []string
	Unsubscribe() error
}

Subscriber is a convenience return type for the Subscribe method.

type TLSConfig

type TLSConfig struct {
	// CertFile the optional certificate file for client authentication
	CertFile string `json:"cert_file,omitempty"`

	// KeyFile the optional key file for client authentication
	KeyFile string `json:"key_file,omitempty"`

	// CAFile  the optional certificate authority file for TLS client authentication
	CAFile string `json:"ca_file,omitempty"`

	// VerifySSL optional verify ssl certificates chain
	VerifySSL bool `json:"verify_ssl,omitempty"`
}

func (*TLSConfig) TLSConfig

func (tc *TLSConfig) TLSConfig() (t *tls.Config, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL