subscribers

package
v0.73.0-rc.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 26, 2023 License: AGPL-3.0 Imports: 7 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Base

type Base struct {
	// contains filtered or unexported fields
}

func NewBase

func NewBase(ctx context.Context, buf int, ack bool) *Base

func (*Base) Ack

func (b *Base) Ack() bool

Ack returns whether or not this is a synchronous/async subscriber.

func (*Base) C

func (b *Base) C() chan<- []events.Event

C returns the event channel for optional subscribers.

func (*Base) Closed

func (b *Base) Closed() <-chan struct{}

Closed indicates to the broker that the subscriber is closed for business.

func (*Base) Halt

func (b *Base) Halt()

Halt is called by the broker on shutdown; it closes the open channels.

func (*Base) ID

func (b *Base) ID() int

ID returns the subscriber ID.

func (*Base) Pause

func (b *Base) Pause()

Pause pauses the current subscriber: while paused, it will not receive events from the channel.

func (*Base) Resume

func (b *Base) Resume()

Resume unpauses the subscriber.

func (*Base) SetID

func (b *Base) SetID(id int)

SetID sets the ID (exposed only to the broker).

func (*Base) Skip

func (b *Base) Skip() <-chan struct{}

Skip lets the broker know that the subscriber is not receiving events.

type Broker

type Broker interface {
	Subscribe(s broker.Subscriber) int
	Unsubscribe(id int)
}

type EventFilter

type EventFilter func(events.Event) bool

type MarketStreamEvent

type MarketStreamEvent interface {
	StreamEvent
	StreamMarketMessage() *eventspb.BusEvent
}

type Service

type Service struct {
	// contains filtered or unexported fields
}

func NewService

func NewService(log *logging.Logger, broker Broker, maxBufferSize int) *Service

func (*Service) ObserveEvents

func (s *Service) ObserveEvents(ctx context.Context, retries int, eTypes []events.Type, batchSize int, filters ...EventFilter) (<-chan []*eventspb.BusEvent, chan<- int)

func (*Service) ObserveEventsOnStream

func (s *Service) ObserveEventsOnStream(ctx context.Context, retries int,
	sub StreamSubscription,
) (<-chan []*eventspb.BusEvent, chan<- int)

type StreamEvent

type StreamEvent interface {
	events.Event
	StreamMessage() *eventspb.BusEvent
}

type StreamSub

type StreamSub struct {
	*Base
	// contains filtered or unexported fields
}

func NewStreamSub

func NewStreamSub(ctx context.Context, types []events.Type, batchSize int, filters ...EventFilter) *StreamSub

func (*StreamSub) GetData

func (s *StreamSub) GetData(ctx context.Context) []*eventspb.BusEvent

GetData returns events from the buffer: all of them if bufSize == 0, otherwise at most the max buffer size (the rest are kept in the data slice).

func (*StreamSub) Halt

func (s *StreamSub) Halt()

func (*StreamSub) Push

func (s *StreamSub) Push(evts ...events.Event)

func (StreamSub) Types

func (s StreamSub) Types() []events.Type

func (*StreamSub) UpdateBatchSize

func (s *StreamSub) UpdateBatchSize(ctx context.Context, size int) []*eventspb.BusEvent

UpdateBatchSize changes the batch size and returns whatever the current buffer contains; it is effectively a poll of current events that ignores the requested batch size.

type StreamSubscription

type StreamSubscription interface {
	Halt()
	Push(evts ...events.Event)
	UpdateBatchSize(ctx context.Context, size int) []*eventspb.BusEvent
	Types() []events.Type
	GetData(ctx context.Context) []*eventspb.BusEvent
	C() chan<- []events.Event
	Closed() <-chan struct{}
	Skip() <-chan struct{}
	SetID(id int)
	ID() int
	Ack() bool
}

Directories

Path Synopsis
Package mocks is a generated GoMock package.
Package mocks is a generated GoMock package.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL