rabbit

package
v1.11.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 26, 2023 License: MIT Imports: 9 Imported by: 2

Documentation

Index

Constants

View Source
const DefaultExchange = "amq.direct"

Variables

View Source
var (
	ErrNotConnected = errors.New("not connected to a server")
	ErrTxBufferFull = errors.New("tx queue is full")
)
View Source
var MaxTxBuffer = 1024
View Source
var TxWorkerOption = func(workNum int) SessionOption {
	return func(session *sessionRabbit) {
		log.Fine("set tx workers:%d", workNum)
		session.multiTx = workNum
	}
}
View Source
var WithPublishConfirm = func(usePublishConfirm bool) SessionOption {
	return func(session *sessionRabbit) {
		session.publishConfirm = usePublishConfirm
	}
}

WithPublishConfirm 启用publish confirm模式

Functions

This section is empty.

Types

type Delay

type Delay struct {
	// contains filtered or unexported fields
}

func NewDelay

func NewDelay(min int, max int, randomIncrement bool) (d *Delay)

func (*Delay) Delay

func (delay *Delay) Delay(after ...func(curDelayDuration int))

func (*Delay) Reset

func (delay *Delay) Reset()

type Message

type Message struct {
	Payload interface{}
	Router  Route
	Options []PublishOption
}

func (*Message) SetOption

func (message *Message) SetOption(option ...PublishOption)

type PublishOption

type PublishOption func(m amqp.Publishing, payload interface{}) (amqp.Publishing, error)
var (
	JsonEncodeOption PublishOption = func(m amqp.Publishing, payload interface{}) (p amqp.Publishing, err error) {
		bs, err := json.Marshal(payload)
		if err != nil {
			return
		}
		p = m
		p.ContentType = "application/json"
		p.Body = bs
		return
	}
	TextEncodeOption PublishOption = func(m amqp.Publishing, payload interface{}) (p amqp.Publishing, err error) {
		var bs []byte
		switch payload.(type) {
		case string:
			s := payload.(string)
			bs = []byte(s)
		case []byte:
			bs = payload.([]byte)
		default:
			err = fmt.Errorf("need txt payload")
			log.Fine(err)
			return
		}
		p = m
		p.ContentType = "text/plain"
		p.Body = bs
		return
	}
	UUIDMsgIdOption PublishOption = func(m amqp.Publishing, payload interface{}) (p amqp.Publishing, err error) {
		p = m
		id, err := uuid.NewRandom()
		if err != nil {
			return
		}
		p.MessageId = id.String()
		return
	}
)

type Route

type Route struct {
	Exchange string
	Key      string
	Queue    string
}

type RxHandler

type RxHandler func(msg amqp.Delivery, txHandler TxHandler, serverContext ServerContext) (err error)

type ServerContext added in v1.8.4

type ServerContext struct {
	Consumers int
	Messages  int
	Queue     string
	Router    Route
}

type Session

type Session interface {
	Publish(m Message) error
	Close() error
	Subscribe(r Route, rxHandler RxHandler)
}

func New

func New(ctx context.Context, addr string, options ...SessionOption) Session

type SessionOption added in v1.8.7

type SessionOption func(session *sessionRabbit)

type TxHandler

type TxHandler func(m Message) (err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL