manager

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2024 License: AGPL-3.0 Imports: 19 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	TaskManage  TaskManager
	EventManage EventManager
	DlqManage   DlqManager
)
View Source
var (
	ErrMsgHandlerNotExist = errors.New("msg handler not exist")

	ErrStreamNotExist = errors.New("stream not exist")
)

Functions

func UpdateTestBackOff

func UpdateTestBackOff()

UpdateTestBackOff Most test samples are suspended for 30 seconds to wait for message consumption, and test whether retry and dead letter queues work properly, so to ensure that the test samples execute properly, you need to retry at least 10 times within 30 seconds.

Types

type ConsumerManger added in v1.0.1

type ConsumerManger interface {
	NewConsumer(context.Context, func(*jetstream.ConsumerConfig) error, consumerMessageHandler) (
		jetstream.Consumer, error,
	)
	Consume(context.Context, jetstream.Consumer, consumerMessageHandler) error
	ReConsume(ctx context.Context, consumerName string, msg *jetstream.RawStreamMsg) error
	UpdateAllConsumerConfig(func(*jetstream.ConsumerConfig) error, context.Context) error
}

ConsumerManger is used to manage the consumer group in the stream It records the consumption method of the consumption group, which will help with the retry of messages in the dead letter queue

func NewConsumerManger added in v1.0.1

func NewConsumerManger(_ context.Context, stream jetstream.Stream, consumer jetstream.Consumer, logger *zap.Logger) (
	ConsumerManger, error,
)

type DlqManager

type DlqManager interface {
	RepublishBatch(batch int, ctx context.Context) (msgNum int, err error)
}

The DlqManager is used to manage dead letters. Use a dead letter queue by passing in a jetstream.Stream registration. Messages in DlqManager will use stream_seq to query the complete message on the registered stream, provided that the message still exists, and the message will be published as a new message on the registered stream.

type Event

type Event string
const EventRetryTriggerTask Event = "retry_trigger_task"

type EventManager

type EventManager interface {
	Publish(event Event, payload []byte) bool
	Subscribe(event Event, triggerTask Task, fetchTaskData func(eventData []byte) ([]byte, error))
	SubscribeToNewConsumer(event Event, name string, handler MessageHandler)
	// contains filtered or unexported methods
}

EventManager is used to manage events. EventManager has a dedicated consumer group that publishes tasks to TaskManage when events are triggered, and new consumer groups can be created to Consume events. These consumer groups use the same stream.

type MessageHandler

type MessageHandler func(payload []byte) error

type MsgType

type MsgType interface {
	// contains filtered or unexported methods
}

type RetryTriggerTask

type RetryTriggerTask struct {
	Task Task
	Data []byte
}

type Task

type Task string

type TaskManager

type TaskManager interface {
	Publish(task Task, payload []byte) bool
	Subscribe(task Task, handler MessageHandler)
	GetMessageHandler(task Task) (MessageHandler, error)
}

TaskManager is used to manage tasks. All tasks will be placed in a consumer group.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL