rocketmq

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 7, 2023 License: MIT, MIT Imports: 9 Imported by: 0

Documentation

Overview

Package rocketmq is a fork of github.com/yflau/watermill-rocketmq@ea447b0.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func DefaultSendAsyncCallback

func DefaultSendAsyncCallback(ctx context.Context, result *primitive.SendResult, err error)

DefaultSendAsyncCallback is the default SendAsyncCallback.

Types

type DefaultMarshaler

type DefaultMarshaler struct{}

DefaultMarshaler is the default message marshaler.

func (DefaultMarshaler) Marshal

func (DefaultMarshaler) Marshal(topic string, msg *message.Message) ([]*primitive.Message, error)

Marshal implements MarshalerUnmarshaler.

func (DefaultMarshaler) Unmarshal

func (DefaultMarshaler) Unmarshal(msgs []*primitive.MessageExt) ([]*message.Message, error)

Unmarshal implements MarshalerUnmarshaler.

type Marshaler

type Marshaler interface {
	Marshal(topic string, msg *message.Message) ([]*primitive.Message, error)
}

Marshaler marshals Watermill's message to RocketMQ messages.

type MarshalerUnmarshaler

type MarshalerUnmarshaler interface {
	Marshaler
	Unmarshaler
}

MarshalerUnmarshaler is the marshaler/unmarshaler interface for RocketMQ messages.

type Publisher

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher is the RocketMQ publisher.

func NewPublisher

func NewPublisher(
	config PublisherConfig,
	logger watermill.LoggerAdapter,
) (*Publisher, error)

NewPublisher creates a new RocketMQ Publisher.

func (*Publisher) Close

func (p *Publisher) Close() error

Close closes the publisher

func (*Publisher) Publish

func (p *Publisher) Publish(ctx context.Context, topic string, msgs ...*message.Message) error

Publish publishes message to RocketMQ.

Publish is blocking and waits for an ack from RocketMQ. When delivery of one of the messages fails, the function is interrupted.

type PublisherConfig

type PublisherConfig struct {
	GroupName             string
	InstanceName          string
	Namespace             string
	SendMsgTimeout        time.Duration
	VIPChannelEnabled     bool
	RetryTimes            int
	Interceptors          []primitive.Interceptor
	Selector              producer.QueueSelector
	Credentials           *primitive.Credentials
	DefaultTopicQueueNums int
	CreateTopicKey        string

	SendMode SendMode // ["sync", "async", "one_way"]
	SendAsyncCallback

	// Marshaler is used to marshal messages from Watermill format into Rocketmq format.
	Marshaler Marshaler
}

PublisherConfig is the RocketMQ publisher configuration.

func (*PublisherConfig) Options

func (c *PublisherConfig) Options() []producer.Option

Options generates the producer options.

func (PublisherConfig) Validate

func (c PublisherConfig) Validate() error

Validate validates the publisher config.

type SendAsyncCallback

type SendAsyncCallback func(ctx context.Context, result *primitive.SendResult, err error)

SendAsyncCallback is the callback for the async send result of each message.

type SendMode

type SendMode string

SendMode is the send mode.

const (
	// Sync is the sync mode
	Sync SendMode = "sync"
	// Async the async mode
	Async SendMode = "async"
	// OneWay is the one-way mode, no result
	OneWay SendMode = "one_way"
)

type Unmarshaler

type Unmarshaler interface {
	Unmarshal([]*primitive.MessageExt) ([]*message.Message, error)
}

Unmarshaler unmarshals RocketMQ messages to Watermill's messages.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL