standalone

package
v0.0.0-...-32ff608 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 28, 2023 License: Apache-2.0 Imports: 12 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetOffsetFromEvent

func GetOffsetFromEvent(event *ce.Event) int64

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is a manager of the message queues for different topics.

func GetBroker

func GetBroker() *Broker

GetBroker returns the broker; all topics share the same broker.

func (*Broker) CreateNewQueueIfAbsent

func (b *Broker) CreateNewQueueIfAbsent(topicName string) (err error)

func (*Broker) ExistTopic

func (b *Broker) ExistTopic(topicName string) (exist bool)

func (*Broker) PutMessage

func (b *Broker) PutMessage(topicName string, event *ce.Event) (message *Message, err error)

func (*Broker) TakeMessage

func (b *Broker) TakeMessage(topicName string) (*Message, error)

func (*Broker) TakeMessageByOffset

func (b *Broker) TakeMessageByOffset(topicName string, offset int64) (*Message, error)

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer() *Consumer

func (*Consumer) InitConsumer

func (c *Consumer) InitConsumer(properties map[string]string) error

func (*Consumer) IsClosed

func (c *Consumer) IsClosed() bool

func (*Consumer) IsStarted

func (c *Consumer) IsStarted() bool

func (*Consumer) RegisterEventListener

func (c *Consumer) RegisterEventListener(listener *connector.EventListener)

func (*Consumer) Shutdown

func (c *Consumer) Shutdown() error

func (*Consumer) Start

func (c *Consumer) Start() error

func (*Consumer) Subscribe

func (c *Consumer) Subscribe(topicName string) error

func (*Consumer) Unsubscribe

func (c *Consumer) Unsubscribe(topicName string) error

func (*Consumer) UpdateOffset

func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error

type Factory

type Factory struct {
	plugin.Plugin
	// contains filtered or unexported fields
}

func (*Factory) GetConsumer

func (f *Factory) GetConsumer() (connector.Consumer, error)

func (*Factory) GetProducer

func (f *Factory) GetProducer() (connector.Producer, error)

func (*Factory) GetResource

func (f *Factory) GetResource() (connector.Resource, error)

func (*Factory) Setup

func (f *Factory) Setup(name string, dec plugin.Decoder) error

func (*Factory) Type

func (f *Factory) Type() string

type Message

type Message struct {
	// contains filtered or unexported fields
}

func (*Message) GetOffset

func (m *Message) GetOffset() int64

func (*Message) GetTopicName

func (m *Message) GetTopicName() string

func (*Message) SetOffset

func (m *Message) SetOffset(offset int64)

SetOffset sets the offset. Since the CloudEvents Go SDK doesn't support int64 in its v1 version, the offset needs to be stored as a string.

type MessageExpireWorker

type MessageExpireWorker struct {
	// contains filtered or unexported fields
}

MessageExpireWorker periodically evicts expired messages.

type MessageQueue

type MessageQueue struct {
	// contains filtered or unexported fields
}

MessageQueue is the message storage of the standalone broker.

func NewMessageQueue

func NewMessageQueue() (*MessageQueue, error)

func NewMessageQueueWithCapacity

func NewMessageQueueWithCapacity(capacity int) (*MessageQueue, error)

func (*MessageQueue) Get

func (q *MessageQueue) Get() (*Message, error)

Get fetches the message at the top of the queue, blocking if the message queue is empty.

func (*MessageQueue) GetByOffset

func (q *MessageQueue) GetByOffset(offset int64) (*Message, error)

GetByOffset fetches the message at a certain offset. It returns an error if the message has been deleted, and blocks if the message at that offset is not currently available.

func (*MessageQueue) GetIfNotEmpty

func (q *MessageQueue) GetIfNotEmpty() *Message

GetIfNotEmpty fetches the message at the top of the queue, returning nil if the message queue is empty.

func (*MessageQueue) PopMessage

func (q *MessageQueue) PopMessage() *Message

PopMessage removes the message at the top of the queue, returning nil if the message queue is empty.

func (*MessageQueue) Put

func (q *MessageQueue) Put(message *Message)

func (*MessageQueue) Size

func (q *MessageQueue) Size() int

type Producer

type Producer struct {
	// contains filtered or unexported fields
}

Producer is the standalone producer.

func NewProducer

func NewProducer() *Producer

func (*Producer) CheckTopicExist

func (p *Producer) CheckTopicExist(topicName string) (exist bool, err error)

func (*Producer) InitProducer

func (p *Producer) InitProducer(properties map[string]string) error

func (*Producer) IsClosed

func (p *Producer) IsClosed() bool

func (*Producer) IsStarted

func (p *Producer) IsStarted() bool

func (*Producer) Publish

func (p *Producer) Publish(ctx context.Context, event *ce.Event, callback *connector.SendCallback) (err error)

func (*Producer) Reply

func (p *Producer) Reply(ctx context.Context, event *ce.Event, callback *connector.SendCallback) error

func (*Producer) Request

func (p *Producer) Request(ctx context.Context, event *ce.Event, callback *connector.RequestReplyCallback, timeout time.Duration) error

func (*Producer) SendOneway

func (p *Producer) SendOneway(ctx context.Context, event *ce.Event) (err error)

func (*Producer) SetExtFields

func (p *Producer) SetExtFields() error

func (*Producer) Shutdown

func (p *Producer) Shutdown() error

func (*Producer) Start

func (p *Producer) Start() error

type Resource

type Resource struct {
}

func (Resource) Init

func (r Resource) Init() error

func (Resource) Release

func (r Resource) Release() error

type SubscribeWorker

type SubscribeWorker struct {
	// contains filtered or unexported fields
}

SubscribeWorker polls messages from a topic and manages the consume offset.

func (*SubscribeWorker) Stop

func (w *SubscribeWorker) Stop()

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL