pbq

package
v0.5.6
Warning: This package is not in the latest version of its module.
Published: Sep 20, 2022 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var COBErr error = errors.New("error while writing to pbq, pbq is closed")

Functions

func DefaultOptions

func DefaultOptions() *options

Types

type Manager

type Manager struct {

	// we need lock to access pbqMap, since deregister will be called inside pbq
	// and each pbq will be inside a go routine, and also entire PBQ could be managed
	// through a go routine (depends on the orchestrator)
	sync.RWMutex
	// contains filtered or unexported fields
}

Manager manages the lifecycle of PBQ instances.
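The comment on the embedded sync.RWMutex explains why map access must be locked: deregistration is triggered from inside a PBQ, and each PBQ may run in its own goroutine. The following is a minimal sketch of that locking pattern only, not the package's implementation. The names registry, queue, register, deregister, get, and size are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for a PBQ instance.
type queue struct {
	partitionID string
}

// registry is a hypothetical stand-in for Manager. The embedded RWMutex
// guards the map, because registration and deregistration may be called
// from different goroutines.
type registry struct {
	sync.RWMutex
	queues map[string]*queue
}

func newRegistry() *registry {
	return &registry{queues: make(map[string]*queue)}
}

// register adds a queue for the partition under the write lock.
func (r *registry) register(id string) *queue {
	r.Lock()
	defer r.Unlock()
	q := &queue{partitionID: id}
	r.queues[id] = q
	return q
}

// deregister removes the partition's queue under the write lock.
func (r *registry) deregister(id string) {
	r.Lock()
	defer r.Unlock()
	delete(r.queues, id)
}

// get looks up a queue under the read lock; it returns nil when absent.
func (r *registry) get(id string) *queue {
	r.RLock()
	defer r.RUnlock()
	return r.queues[id]
}

// size reports the number of registered queues under the read lock.
func (r *registry) size() int {
	r.RLock()
	defer r.RUnlock()
	return len(r.queues)
}

func main() {
	r := newRegistry()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			id := fmt.Sprintf("partition-%d", n)
			r.register(id)
			if n%2 == 1 {
				// Odd partitions deregister themselves from their own goroutine.
				r.deregister(id)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println(r.size())
}
```

Readers take the shared lock and writers take the exclusive lock, so concurrent lookups do not block each other while the map is not being modified.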

func NewManager

func NewManager(ctx context.Context, opts ...PBQOption) (*Manager, error)

NewManager returns a new instance of Manager. It is not intended to be called from multiple goroutines.

func (*Manager) CreateNewPBQ

func (m *Manager) CreateNewPBQ(ctx context.Context, partitionID string) (ReadWriteCloser, error)

CreateNewPBQ creates a new PBQ for a partition.

func (*Manager) GetPBQ

func (m *Manager) GetPBQ(partitionID string) ReadWriteCloser

GetPBQ returns the PBQ for the given partitionID.

func (*Manager) ListPartitions

func (m *Manager) ListPartitions() []*PBQ

ListPartitions returns all the PBQ instances.

func (*Manager) Replay

func (m *Manager) Replay(ctx context.Context)

Replay replays the messages that are persisted in the PBQ store.

func (*Manager) ShutDown

func (m *Manager) ShutDown(ctx context.Context)

ShutDown performs a clean shutdown: it flushes pending messages to the store and closes the store.

func (*Manager) StartUp

func (m *Manager) StartUp(_ context.Context) error

StartUp restores the state of the PBQ manager.

type PBQ

type PBQ struct {
	// contains filtered or unexported fields
}

PBQ is a buffer queue backed by a persistent store. Each partition has a PBQ associated with it.

func (*PBQ) Close

func (p *PBQ) Close() error

Close is used by the writer to indicate that its context has been closed; pending messages should be flushed to the store at that point.

func (*PBQ) CloseOfBook

func (p *PBQ) CloseOfBook()

CloseOfBook closes the output channel.

func (*PBQ) GC

func (p *PBQ) GC() error

GC is invoked after the Reader (ProcessAndForward) has finished forwarding the output to ISB.

func (*PBQ) ReadCh

func (p *PBQ) ReadCh() <-chan *isb.Message

ReadCh exposes the read channel for reading messages from the PBQ. A close of the read channel indicates COB (close of book).
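Because the close of the read channel is the COB signal, a reader can range over the channel and treat loop exit as close of book. The sketch below illustrates only that channel idiom. The message type and the drain function are hypothetical stand-ins, and no part of the real package is used.

```go
package main

import "fmt"

// message is a hypothetical stand-in for *isb.Message.
type message struct {
	payload string
}

// drain reads from ch until it is closed and returns the payloads it saw.
// The range loop ends only when the producer closes the channel, which is
// how a reader would observe a close-of-book signal.
func drain(ch <-chan *message) []string {
	var out []string
	for m := range ch {
		out = append(out, m.payload)
	}
	return out
}

func main() {
	ch := make(chan *message, 2)
	go func() {
		// Closing the channel after the last write plays the role of COB.
		defer close(ch)
		for _, p := range []string{"a", "b", "c"} {
			ch <- &message{payload: p}
		}
	}()
	fmt.Println(drain(ch))
}
```

Messages already buffered in the channel are still delivered after the close, so the reader does not lose data that was written before COB.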

func (*PBQ) Write

func (p *PBQ) Write(ctx context.Context, message *isb.Message) error

Write writes a message to the PBQ and to the persistent store.

type PBQOption

type PBQOption func(options *options) error

func WithChannelBufferSize

func WithChannelBufferSize(size int64) PBQOption

WithChannelBufferSize sets the channel buffer size option.

func WithPBQStoreOptions

func WithPBQStoreOptions(opts ...store.StoreOption) PBQOption

WithPBQStoreOptions sets the PBQ store options.

func WithReadBatchSize

func WithReadBatchSize(size int64) PBQOption

WithReadBatchSize sets the read batch size option.

func WithReadTimeout

func WithReadTimeout(seconds time.Duration) PBQOption

WithReadTimeout sets the read timeout option.

type ReadWriteCloser

type ReadWriteCloser interface {
	Reader
	WriteCloser
}

ReadWriteCloser is a unified interface combining the PBQ read and write interfaces. Close applies only to the writer.

type Reader

type Reader interface {
	// ReadCh exposes channel to read from PBQ
	ReadCh() <-chan *isb.Message
	// GC does garbage collection, it deletes all the persisted data from the store
	GC() error
}

Reader provides methods to read from PBQ.

type WriteCloser

type WriteCloser interface {
	// Write writes message to PBQ
	Write(ctx context.Context, msg *isb.Message) error
	// CloseOfBook (cob) closes PBQ, no writes will be accepted after cob
	CloseOfBook()
	// Close to handle context close on writer
	// Any pending data can be flushed to the persistent store at this point.
	Close() error
}

WriteCloser provides methods to write data to the PBQ and to close the PBQ. No data can be written to the PBQ after COB.
