publisher

package
v0.1.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 24, 2024 License: MIT Imports: 8 Imported by: 0

Documentation

Index

Constants

View Source
const (
	DefaultInitialRetryInterval       = 50 * time.Millisecond
	DefaultFlushDelayThreshold        = time.Millisecond * 10
	DefaultMaxRetryCount        int16 = 3
	DefaultBatchSize            int32 = 1000
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BatchResult

type BatchResult struct {
	Results []*BufferedPublishResult
}

BatchResult - result from batch publishing. It contains reference Id to the original message published.

type BfPublisher

type BfPublisher struct {
	sync.Mutex

	Done chan struct{}
	// contains filtered or unexported fields
}

BfPublisher - BufferedPublisher struct implementation.

func (*BfPublisher) Close

func (p *BfPublisher) Close(ctx context.Context) error

Close - close the BufferedPublisher and all the related goroutines.

func (*BfPublisher) Publish

func (p *BfPublisher) Publish(ctx context.Context, topicName string, payloadBatch []messaging.Message) (*BatchResult, error)

Publish - Publish a Batch of Messages in Json

type BuffPublisherWithRetry

type BuffPublisherWithRetry struct {
	sync.Mutex

	TopicCache *TopicCache
	Done       chan struct{}
	RetryCh    chan retryBatch
	// contains filtered or unexported fields
}

BuffPublisherWithRetry - buffered publisher struct implementation.

func (*BuffPublisherWithRetry) Close

Close - close the BufferedPublisher and all the related goroutines.

func (*BuffPublisherWithRetry) Flush

Flush all the messages for all the topics.

func (*BuffPublisherWithRetry) GetBufferedMessages

func (p *BuffPublisherWithRetry) GetBufferedMessages(topic string) []*messaging.Message

GetBufferedMessages - get hte messages in the buyffer. Useful for testing.

func (*BuffPublisherWithRetry) Publish

func (p *BuffPublisherWithRetry) Publish(ctx context.Context, topic string, message messaging.Message) error

Publish - publish a message in Json format and buffer it in an internal buffer of a given size, before Flushing in batch. The batching mechanism is abstracted from the user that just need to publish one message at time.

type BufferedPublishResult

type BufferedPublishResult struct {
	MsgRefId string
	Success  bool
	Err      error
}

BufferedPublishResult - published result.

type BufferedPublisher

type BufferedPublisher interface {
	Publish(ctx context.Context, topicName string, payloadBatch []messaging.Message) (*BatchResult, error)
	Close(ctx context.Context) error
}

BufferedPublisher - interface for the publisher

func NewBufferedPublisher

func NewBufferedPublisher(
	client Client,
	publishConfig TopicPublishConfig) (BufferedPublisher, error)

NewBufferedPublisher - Constructor of BufferedPublisher.

type BufferedPublisherWithRetry

type BufferedPublisherWithRetry interface {
	Publish(ctx context.Context, topic string, message messaging.Message) error
	Flush(ctx context.Context) error
	Close(ctx context.Context) error
	GetBufferedMessages(topic string) []*messaging.Message
}

BufferedPublisherWithRetry - interface for the publisher

func NewBufferedPublisherWithRetry

func NewBufferedPublisherWithRetry(
	ctx context.Context,
	client Client,
	publishConfig TopicPublishConfig) (BufferedPublisherWithRetry, error)

NewBufferedPublisherWithRetry - Constructor.

type Client

type Client interface {
	Topic(id string) Topic
	Close() error
}

Client - Client wrapper interface.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error - General PubSub Error.

func NewMessagingError

func NewMessagingError(err error, msg string, args ...any) *Error

NewMessagingError - NewMessagingError constructor.

func NewMessagingErrorCode

func NewMessagingErrorCode(code ErrorCode, err error) *Error

NewMessagingErrorCode - MessagingError constructor given a predefined Error Code.

func (*Error) Error

func (ge *Error) Error() string

type ErrorCode

type ErrorCode int

ErrorCode - error code enum.

const (
	ErrorPublisherClosed ErrorCode = iota
	ErrorInitializingPubsubClient
	ErrorConvertingToProto
	ErrorSerializingProtoMessage
	ErrorSerializingJsonMessage
	ErrorClosingPubsubClient
)

type PublishResult

type PublishResult interface {
	Get(ctx context.Context) (string, error)
	Ready() <-chan struct{}
}

PublishResult - PubSub Publish Result wrapper interface.

type Topic

type Topic interface {
	Publish(ctx context.Context, msg messaging.Message) PublishResult
	Stop()
	Flush()
	String() string
	ConfigPublishSettings(config TopicPublishConfig)
}

Topic - Topic wrapper interface.

type TopicCache

type TopicCache struct {
	sync.Mutex
	Cache map[string]*TopicCacheItem
}

TopicCache - Topic Cache with mutex access.

type TopicCacheItem

type TopicCacheItem struct {
	// contains filtered or unexported fields
}

TopicCacheItem - topic cache item.

func NewTopicCacheItem

func NewTopicCacheItem(topic Topic, initialValue int32) *TopicCacheItem

NewTopicCacheItem -

type TopicPublishConfig

type TopicPublishConfig struct {
	BatchSize            int32
	FlushDelayThreshold  time.Duration
	InitialRetryInterval time.Duration
	MaxRetryCount        int16
}

TopicPublishConfig - configuration struct for the publisher.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL