Documentation ¶
Index ¶
- Variables
- func DefaultOptions() *options
- type Manager
- func (m *Manager) CreateNewPBQ(ctx context.Context, partitionID string) (ReadWriteCloser, error)
- func (m *Manager) GetPBQ(partitionID string) ReadWriteCloser
- func (m *Manager) ListPartitions() []*PBQ
- func (m *Manager) Replay(ctx context.Context)
- func (m *Manager) ShutDown(ctx context.Context)
- func (m *Manager) StartUp(_ context.Context) error
- type PBQ
- type PBQOption
- type ReadWriteCloser
- type Reader
- type WriteCloser
Constants ¶
This section is empty.
Variables ¶
var COBErr error = errors.New("error while writing to pbq, pbq is closed")
Functions ¶
func DefaultOptions ¶
func DefaultOptions() *options
Types ¶
type Manager ¶
type Manager struct {
	// we need lock to access pbqMap, since deregister will be called inside pbq
	// and each pbq will be inside a go routine, and also entire PBQ could be managed
	// through a go routine (depends on the orchestrator)
	sync.RWMutex
	// contains filtered or unexported fields
}
Manager manages the lifecycle of PBQ instances.
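The locking note in the struct comment can be illustrated with a small stand-alone sketch. It is not the package's implementation: `registry`, `queue`, and every method below are hypothetical stand-ins, assuming only what the comment states (an embedded `sync.RWMutex` guarding a map that queues deregister themselves from, each in its own goroutine).

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for a PBQ instance.
type queue struct{ partitionID string }

// registry is a hypothetical stand-in for Manager: an embedded RWMutex
// guarding a map keyed by partition ID.
type registry struct {
	sync.RWMutex
	pbqMap map[string]*queue
}

func newRegistry() *registry { return &registry{pbqMap: make(map[string]*queue)} }

// register adds a queue under the write lock.
func (r *registry) register(id string) *queue {
	r.Lock()
	defer r.Unlock()
	q := &queue{partitionID: id}
	r.pbqMap[id] = q
	return q
}

// get looks up a queue under the read lock; it returns nil if absent.
func (r *registry) get(id string) *queue {
	r.RLock()
	defer r.RUnlock()
	return r.pbqMap[id]
}

// deregister removes a queue under the write lock.
func (r *registry) deregister(id string) {
	r.Lock()
	defer r.Unlock()
	delete(r.pbqMap, id)
}

// size reports how many queues are registered.
func (r *registry) size() int {
	r.RLock()
	defer r.RUnlock()
	return len(r.pbqMap)
}

// churn registers n queues, lets each one deregister itself from its own
// goroutine, and returns the registry size before and after.
func churn(n int) (before, after int) {
	r := newRegistry()
	for i := 0; i < n; i++ {
		r.register(fmt.Sprintf("partition-%d", i))
	}
	before = r.size()

	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(id string) {
			defer wg.Done()
			if r.get(id) != nil {
				r.deregister(id)
			}
		}(fmt.Sprintf("partition-%d", i))
	}
	wg.Wait()
	return before, r.size()
}

func main() {
	before, after := churn(8)
	fmt.Println(before, after)
}
```

Without the lock, the concurrent `delete` calls on the shared map would be a data race. Lookups take the read lock, so they do not block one another.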
func NewManager ¶
NewManager returns a new instance of Manager. It is not intended to be called from multiple goroutines.
func (*Manager) CreateNewPBQ ¶
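func (m *Manager) CreateNewPBQ(ctx context.Context, partitionID string) (ReadWriteCloser, error)
CreateNewPBQ creates a new PBQ for a partition.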
func (*Manager) GetPBQ ¶
func (m *Manager) GetPBQ(partitionID string) ReadWriteCloser
GetPBQ returns the PBQ for the given partitionID.
func (*Manager) ListPartitions ¶
func (m *Manager) ListPartitions() []*PBQ
ListPartitions returns all the PBQ instances.
type PBQ ¶
type PBQ struct {
// contains filtered or unexported fields
}
PBQ is a buffer queue backed by a persisted store. Each partition has a PBQ associated with it.
func (*PBQ) Close ¶
Close is used by the writer to indicate that its context has closed. Pending messages should be flushed to the store at this point.
func (*PBQ) GC ¶
GC is invoked after the Reader (ProcessAndForward) has finished forwarding the output to ISB.
type PBQOption ¶
type PBQOption func(options *options) error
func WithChannelBufferSize ¶
WithChannelBufferSize sets buffer size option
func WithPBQStoreOptions ¶
func WithPBQStoreOptions(opts ...store.StoreOption) PBQOption
WithPBQStoreOptions sets different pbq store options
func WithReadBatchSize ¶
WithReadBatchSize sets read batch size option
func WithReadTimeout ¶
WithReadTimeout sets read timeout option
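PBQOption has the shape of Go's functional options pattern: each option is a function that mutates an options struct and may return an error. The sketch below shows how such options are typically applied on top of defaults. The struct fields, default values, parameter types, validation, and the lowercase helper names are all assumptions for illustration; only the `PBQOption` signature comes from this page.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// options is a stand-in for the package's unexported options struct.
// Field names are assumptions; the real struct is not shown in the docs.
type options struct {
	channelBufferSize int
	readBatchSize     int
	readTimeout       time.Duration
}

// PBQOption mirrors the documented signature.
type PBQOption func(options *options) error

// defaultOptions plays the role of DefaultOptions; the values are made up.
func defaultOptions() *options {
	return &options{channelBufferSize: 10, readBatchSize: 100, readTimeout: time.Second}
}

// withChannelBufferSize is a hypothetical counterpart of WithChannelBufferSize.
// The int parameter and the positivity check are assumptions.
func withChannelBufferSize(size int) PBQOption {
	return func(o *options) error {
		if size <= 0 {
			return errors.New("channel buffer size must be positive")
		}
		o.channelBufferSize = size
		return nil
	}
}

// withReadTimeout is a hypothetical counterpart of WithReadTimeout.
func withReadTimeout(d time.Duration) PBQOption {
	return func(o *options) error {
		o.readTimeout = d
		return nil
	}
}

// applyOptions starts from the defaults and applies each option in order,
// stopping at the first error.
func applyOptions(opts ...PBQOption) (*options, error) {
	o := defaultOptions()
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}
	return o, nil
}

func main() {
	o, err := applyOptions(withChannelBufferSize(50), withReadTimeout(2*time.Second))
	if err != nil {
		fmt.Println("invalid option:", err)
		return
	}
	fmt.Println(o.channelBufferSize, o.readBatchSize, o.readTimeout)
}
```

Options that are not passed keep their default values, so callers only name the settings they want to change.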
type ReadWriteCloser ¶
type ReadWriteCloser interface {
	Reader
	WriteCloser
}
ReadWriteCloser is a unified interface to the PBQ read and write interfaces. Close is only for Writer.
type Reader ¶
type Reader interface {
	// ReadCh exposes channel to read from PBQ
	ReadCh() <-chan *isb.Message
	// GC does garbage collection, it deletes all the persisted data from the store
	GC() error
}
Reader provides methods to read from PBQ.
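A reader would typically drain ReadCh and call GC once everything has been forwarded. The following is a minimal sketch under stated assumptions: `message` stands in for `isb.Message`, `memReader` is an in-memory substitute with no real store, and the channel is assumed to be closed when no more data will arrive.

```go
package main

import "fmt"

// message is a hypothetical stand-in for isb.Message.
type message struct{ payload string }

// reader mirrors the documented Reader interface, with message in place
// of isb.Message.
type reader interface {
	ReadCh() <-chan *message
	GC() error
}

// memReader is an in-memory stand-in; persisted plays the role of the store.
type memReader struct {
	ch        chan *message
	persisted []string
}

// newMemReader preloads the channel and the fake store, then closes the
// channel. Assumption: a closed channel signals that no more data will arrive.
func newMemReader(payloads ...string) *memReader {
	r := &memReader{ch: make(chan *message, len(payloads))}
	for _, p := range payloads {
		r.persisted = append(r.persisted, p)
		r.ch <- &message{payload: p}
	}
	close(r.ch)
	return r
}

func (r *memReader) ReadCh() <-chan *message { return r.ch }

// GC drops everything held in the fake store.
func (r *memReader) GC() error {
	r.persisted = nil
	return nil
}

// drain reads until the channel is closed, then calls GC. A real consumer
// would process and forward each message inside the loop.
func drain(r reader) ([]string, error) {
	var got []string
	for msg := range r.ReadCh() {
		got = append(got, msg.payload)
	}
	return got, r.GC()
}

func main() {
	r := newMemReader("a", "b", "c")
	got, err := drain(r)
	fmt.Println(got, err, len(r.persisted))
}
```

Calling GC only after the loop ends means persisted data stays available for the whole time messages are still being read.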
type WriteCloser ¶
type WriteCloser interface {
	// Write writes message to PBQ
	Write(ctx context.Context, msg *isb.Message) error
	// CloseOfBook (cob) closes PBQ, no writes will be accepted after cob
	CloseOfBook()
	// Close to handle context close on writer
	// Any pending data can be flushed to the persistent store at this point.
	Close() error
}
WriteCloser provides methods to write data to the PBQ and close the PBQ. No data can be written to PBQ after cob.
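The close-of-book rule can be sketched with an in-memory stand-in. This is not the package's PBQ: `memPBQ`, `message`, `errCOB`, and `demo` are hypothetical, and closing the read channel inside CloseOfBook is an assumption. Only the method set and the error text (taken from COBErr) come from this page.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// errCOB mirrors the documented COBErr value.
var errCOB = errors.New("error while writing to pbq, pbq is closed")

// message is a hypothetical stand-in for isb.Message.
type message struct{ payload string }

// writeCloser mirrors the documented WriteCloser interface.
type writeCloser interface {
	Write(ctx context.Context, msg *message) error
	CloseOfBook()
	Close() error
}

// memPBQ is an in-memory stand-in with no persistent store.
type memPBQ struct {
	mu  sync.Mutex
	cob bool
	out chan *message
}

var _ writeCloser = (*memPBQ)(nil)

func newMemPBQ(size int) *memPBQ { return &memPBQ{out: make(chan *message, size)} }

// Write rejects messages after close of book; otherwise it enqueues the
// message or gives up when the context is done.
func (p *memPBQ) Write(ctx context.Context, msg *message) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.cob {
		return errCOB
	}
	select {
	case p.out <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// CloseOfBook marks the queue closed and closes the read channel
// (an assumption) so readers can finish draining.
func (p *memPBQ) CloseOfBook() {
	p.mu.Lock()
	defer p.mu.Unlock()
	if !p.cob {
		p.cob = true
		close(p.out)
	}
}

// Close is a no-op here; a real implementation could flush pending data.
func (p *memPBQ) Close() error { return nil }

// demo writes two messages, closes the book, attempts a late write, and
// reports what was readable and whether the late write was rejected.
func demo() (read []string, lateWriteRejected bool) {
	ctx := context.Background()
	p := newMemPBQ(4)
	for _, s := range []string{"a", "b"} {
		if err := p.Write(ctx, &message{payload: s}); err != nil {
			return nil, false
		}
	}
	p.CloseOfBook()
	err := p.Write(ctx, &message{payload: "late"})
	for msg := range p.out {
		read = append(read, msg.payload)
	}
	return read, errors.Is(err, errCOB)
}

func main() {
	read, rejected := demo()
	fmt.Println(read, rejected)
}
```

Callers can compare a write error against the sentinel with `errors.Is` to tell a close-of-book rejection apart from a context cancellation.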