queue

package
v2.0.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 4, 2023 License: Apache-2.0 Imports: 3 Imported by: 12

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func WithRabbitMqConsumeOptionsBindingExchangeName

func WithRabbitMqConsumeOptionsBindingExchangeName(name string) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsBindingExchangeName returns a function that sets the exchange name the queue will be bound to

func WithRabbitMqConsumeOptionsBindingExchangeType

func WithRabbitMqConsumeOptionsBindingExchangeType(kind string) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsBindingExchangeType returns a function that sets the binding exchange kind/type

func WithRabbitMqConsumeOptionsBindingRoutingKeys

func WithRabbitMqConsumeOptionsBindingRoutingKeys(keys []string) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsBindingRoutingKeys returns a function that sets the exchange name the RoutingKeys will be bound to

func WithRabbitMqConsumeOptionsConcurrency

func WithRabbitMqConsumeOptionsConcurrency(concurrency int) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsConcurrency returns a function that sets the concurrency, which means that many goroutines will be spawned to run the provided handler on messages

func WithRabbitMqConsumeOptionsConsumerAutoAck

func WithRabbitMqConsumeOptionsConsumerAutoAck(autoAck bool) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer if unset the default will be used (false)

func WithRabbitMqConsumeOptionsConsumerName

func WithRabbitMqConsumeOptionsConsumerName(consumerName string) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsConsumerName returns a function that sets the name on the server of this consumer if unset a random name will be given

func WithRabbitMqConsumeOptionsExchangeDeclare added in v2.0.15

func WithRabbitMqConsumeOptionsExchangeDeclare(declare bool) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsExchangeDeclare returns a function that sets the exchange is a passive exchange

func WithRabbitMqConsumeOptionsExchangeDurable added in v2.0.16

func WithRabbitMqConsumeOptionsExchangeDurable(durable bool) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsExchangeDurable returns a function that sets the exchange is a durable exchange

func WithRabbitMqConsumeOptionsExchangePassive added in v2.0.15

func WithRabbitMqConsumeOptionsExchangePassive(passive bool) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsExchangePassive returns a function that sets the exchange is a passive exchange

func WithRabbitMqConsumeOptionsQOSPrefetch

func WithRabbitMqConsumeOptionsQOSPrefetch(prefetchCount int) func(*ConsumeOptions)

WithRabbitMqConsumeOptionsQOSPrefetch returns a function that sets the prefetch count, which means that many messages will be fetched from the server in advance to help with throughput. This doesn't affect the handler, messages are still processed one at a time.

func WithRabbitMqPublishGroupName added in v2.0.18

func WithRabbitMqPublishGroupName(name string) func(*PublishOptions)

WithRabbitMqPublishGroupName set group name address

func WithRabbitMqPublishOptionsAppID

func WithRabbitMqPublishOptionsAppID(appID string) func(*PublishOptions)

WithRabbitMqPublishOptionsAppID returns a function that sets the application id

func WithRabbitMqPublishOptionsContentType

func WithRabbitMqPublishOptionsContentType(contentType string) func(*PublishOptions)

WithRabbitMqPublishOptionsContentType returns a function that sets the content type, i.e. "application/json"

func WithRabbitMqPublishOptionsExchange

func WithRabbitMqPublishOptionsExchange(exchange string) func(*PublishOptions)

WithRabbitMqPublishOptionsExchange returns a function that sets the exchange to publish to

func WithRabbitMqPublishOptionsMessageID

func WithRabbitMqPublishOptionsMessageID(messageID string) func(*PublishOptions)

WithRabbitMqPublishOptionsMessageID returns a function that sets the message identifier

func WithRabbitMqPublishOptionsReplyTo

func WithRabbitMqPublishOptionsReplyTo(replyTo string) func(*PublishOptions)

WithRabbitMqPublishOptionsReplyTo returns a function that sets the reply to field

func WithRabbitMqPublishOptionsUserID

func WithRabbitMqPublishOptionsUserID(userID string) func(*PublishOptions)

WithRabbitMqPublishOptionsUserID returns a function that sets the user id i.e. "user"

func WithRocketMqAutoCommit

func WithRocketMqAutoCommit(auto bool) func(*ConsumeOptions)

func WithRocketMqGroupName

func WithRocketMqGroupName(name string) func(*ConsumeOptions)

WithRocketMqGroupName set group name address

func WithRocketMqMaxReconsumeTimes

func WithRocketMqMaxReconsumeTimes(times int32) func(*ConsumeOptions)

WithRocketMqMaxReconsumeTimes set MaxReconsumeTimes of options, if message reconsume greater than MaxReconsumeTimes, it will be sent to retry or dlq topic. more info reference by examples/consumer/retry.

func WithRocketMqPublishGroupName

func WithRocketMqPublishGroupName(name string) func(*PublishOptions)

WithRocketMqPublishGroupName set group name address

func WithRocketMqPublishRetry

func WithRocketMqPublishRetry(times int) func(*PublishOptions)

WithRocketMqPublishRetry return an Option that specifies the retry times when send failed.

Types

type BindingExchangeOptions

type BindingExchangeOptions struct {
	Name       string
	Kind       string // possible values: empty string for default exchange or direct, topic, fanout
	Durable    bool
	AutoDelete bool
	Internal   bool
	NoWait     bool
	Passive    bool // if false, a missing exchange will be created on the server
	Args       rabbitmq.Table
	Declare    bool
}

BindingExchangeOptions are used when binding to an exchange. it will verify the exchange is created before binding to it.

type ConsumeOptions

type ConsumeOptions struct {
	// rabbitmq
	BindingRoutingKeys []string
	BindingExchange    *BindingExchangeOptions
	Concurrency        int
	QOSPrefetch        int
	ConsumerName       string
	ConsumerAutoAck    bool
	// rocketmq
	GroupName         string
	MaxReconsumeTimes int32
	AutoCommit        bool
}

ConsumeOptions are used to describe how a new consumer will be created.

func GetDefaultConsumeOptions

func GetDefaultConsumeOptions() ConsumeOptions

GetDefaultConsumeOptions descibes the options that will be used when a value isn't provided

type ConsumerFunc

type ConsumerFunc func(ctx context.Context, msg message.IMessage) error

type IQueue

type IQueue interface {
	String() string
	Publish(ctx context.Context, message message.IMessage, optionFuncs ...func(*PublishOptions)) error

	RpcRequest(ctx context.Context, key string, data []byte, optionFuncs ...func(*PublishOptions)) ([]byte, error)
	Consumer(ctx context.Context, name string, f ConsumerFunc, optionFuncs ...func(*ConsumeOptions))
	Run(ctx context.Context)
	Shutdown(ctx context.Context)
}

type PublishOptions

type PublishOptions struct {
	Exchange string
	// MIME content type
	ContentType string
	// address to reply to (ex: RPC)
	ReplyTo string
	// message identifier
	MessageID string
	// creating user id - ex: "guest"
	UserID string
	// creating application id
	AppID string

	// rocketmq
	GroupName  string
	RetryTimes int
}

PublishOptions are used to control how data is published

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL