pq

package
v0.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 3, 2020 License: Apache-2.0 Imports: 11 Imported by: 87

Documentation

Index

Constants

View Source
const (

	// SzRoot is the size of the queue header in bytes.
	SzRoot = int(unsafe.Sizeof(queuePage{}))
)

Variables

This section is empty.

Functions

func IsQueueCorrupt added in v0.0.2

func IsQueueCorrupt(err error) bool

IsQueueCorrupt checks if the error value indicates a corrupted queue, which can not be used anymore.

func MakeRoot

func MakeRoot() [SzRoot]byte

MakeRoot prepares the queue header (empty queue). When creating a queue with `New`, the queue header must be available. Still, a delegate is allowed to create the queue header lazily.

Types

type ACKStats added in v0.0.4

type ACKStats struct {
	Duration time.Duration
	Failed   bool

	Events uint // number of released events
	Pages  uint // number of released pages
}

ACKStats reports stats on the most recent ACK transaction.

type Delegate

type Delegate interface {
	// PageSize reports the page size to be used by the backing file.
	PageSize() int

	// Root returns the queues root on file.
	Root() (txfile.PageID, uintptr)

	Offset(id txfile.PageID, offset uintptr) uintptr

	SplitOffset(uintptr) (txfile.PageID, uintptr)

	// BeginWrite must create a read-write transaction for use by the writer.
	// The transaction will be used to allocate pages and flush the current write
	// buffer.
	BeginWrite() (*txfile.Tx, error)

	// BeginRead must return a readonly transaction.
	BeginRead() (*txfile.Tx, error)

	// BeginCleanup must return a read-write transaction for the ACK handling to
	// remove events. No new contents will be written, but pages will be freed
	// and the queue root page being updated.
	BeginCleanup() (*txfile.Tx, error)
}

Delegate is used by the persistent queue to query common parameters and start transactions when required.

func NewStandaloneDelegate

func NewStandaloneDelegate(f *txfile.File) (Delegate, error)

NewStandaloneDelegate creates a standaonle Delegate from an txfile.File instance. This function will allocate and initialize the queue root page.

type ErrKind added in v0.0.2

type ErrKind int

ErrKind provides the pq related error kinds

const (
	NoError            ErrKind = iota // no error
	InitFailed                        // failed to initialize queue
	InvalidParam                      // invalid parameter
	InvalidPageSize                   // invalid page size
	InvalidConfig                     // invalid queue config
	QueueClosed                       // queue is already closed
	ReaderClosed                      // reader is already closed
	WriterClosed                      // writer is already closed
	NoQueueRoot                       // no queue root
	InvalidQueueRoot                  // queue root is invalid
	QueueVersion                      // unsupported queue version
	ACKEmptyQueue                     // invalid ack on empty queue
	ACKTooMany                        // too many events acked
	SeekFail                          // failed to seek to next page
	ReadFail                          // failed to read page
	InactiveTx                        // no active transaction
	UnexpectedActiveTx                // unexpected active transaction
)

func (ErrKind) Error added in v0.0.2

func (k ErrKind) Error() string

Error returns a user readable error message.

func (ErrKind) String added in v0.0.2

func (i ErrKind) String() string

type Error added in v0.0.2

type Error struct {
	// contains filtered or unexported fields
}

Error is the actual error type returned by all functions/methods within the pq package. The Error is compatible to error and txerr.Error, but adds a few additional meta-data for applications to report and handle errors.

func (*Error) Cause added in v0.0.2

func (e *Error) Cause() error

Cause returns the causing error, if any.

func (*Error) Context added in v0.0.2

func (e *Error) Context() string

Context returns a formatted string of the related meta-data as key/value pairs.

func (*Error) Error added in v0.0.2

func (e *Error) Error() string

Error returns the error message. The cause will not be included in the error string. Use fmt with %+v to create a formatted multiline error.

func (*Error) Errors added in v0.0.2

func (e *Error) Errors() []error

Errors is similar to `Cause()`, but returns a slice of errors. This way the error value can be consumed and formatted by zap (and propably other loggers).

func (*Error) Format added in v0.0.2

func (e *Error) Format(s fmt.State, c rune)

Format adds support for fmt.Formatter to Error. The format patterns %v and %s print the top-level error message only (similar to `(*Error).Error()`). The format pattern "q" is similar to "%s", but adds double quotes before and after the message. Use %+v to create a multiline string containing the full trace of errors.

func (*Error) Kind added in v0.0.2

func (e *Error) Kind() error

Kind returns the error kind of the error. The kind should be used by applications to check if it is possible to recover from an error condition. Kind return nil if the error value does not define a kind. Better use `txerr.Is` or `txerr.GetKind` to query the error kind.

func (*Error) Message added in v0.0.2

func (e *Error) Message() string

Message returns the user-focused error message.

func (*Error) Op added in v0.0.2

func (e *Error) Op() string

Op returns the operation the error occured at. Returns "" if the error value is used to wrap another error. Better use `txerr.GetOp(err)` to query an error value for the causing operation.

type FlushStats added in v0.0.4

type FlushStats struct {
	Duration       time.Duration // duration of flush operation
	Oldest, Newest time.Time     // timestamp of oldest/newest event in buffer

	Failed      bool // set to true if flush operation failed
	OutOfMemory bool // set to true if flush failed due to the file being full

	Pages    uint // number of pages to be flushed
	Allocate uint // number of pages to allocate during flush operation
	Events   uint // number of events to be flushed

	BytesTotal uint // total number of bytes written (ignoring headers, just event sizes)
	BytesMin   uint // size of 'smallest' event in current transaction
	BytesMax   uint // size of 'biggest' event in current transaction
}

FlushStats reports internal stats on the most recent flush operation.

type Observer added in v0.0.4

type Observer interface {
	OnQueueInit(headerOffset uintptr, version uint32, available uint)

	OnQueueFlush(headerOffset uintptr, stats FlushStats)

	OnQueueRead(headerOffset uintptr, stats ReadStats)

	OnQueueACK(headerOffset uintptr, stats ACKStats)
}

Observer defines common callbacks to observe operations, outcomes and stats on queues. Each callback reports the header offset for uniquely identifying a queue in case a file holds many queues.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue implements the on-disk queue data structure. The queue requires a Delegate, so to start transactions at any time. The Queue provides a reader and writer. While it is safe to use the Reader and Writer concurrently, the Reader and Writer themselves are not thread-safe.

func New

func New(delegate Delegate, settings Settings) (*Queue, error)

New creates a new Queue. The delegate is required to access the file and start transactions. An error is returned if the delegate is nil, the queue header is invalid, some settings are invalid, or if some IO error occurred.

func (*Queue) ACK

func (q *Queue) ACK(n uint) error

ACK signals the queue, the most n events at the front of the queue have been processed. The queue will try to remove these asynchronously.

func (*Queue) Active

func (q *Queue) Active() (uint, error)

Active returns the number of active, not yet ACKed events.

func (*Queue) Close

func (q *Queue) Close() error

Close will try to flush the current write buffer, but after closing the queue, no more reads or writes can be executed

func (*Queue) Pending

func (q *Queue) Pending() (int, error)

Pending returns the total number of enqueued, but unacked events.

func (*Queue) Reader

func (q *Queue) Reader() *Reader

Reader returns the queue reader for reading a new events from the queue. A queue has only one single reader instance. The reader is not thread safe.

func (*Queue) Writer

func (q *Queue) Writer() (*Writer, error)

Writer returns the queue writer for inserting new events into the queue. A queue has only one single writer instance, which is returned by GetWriter. The writer is is not thread safe.

type ReadStats added in v0.0.4

type ReadStats struct {
	Duration time.Duration // duration of read transaction

	Skipped uint // number of events skipped (e.g. upon error while reading/parsing)
	Read    uint // number of events read

	BytesTotal   uint // total number of bytes read (ignoring headers). Include partially but skipped events
	BytesSkipped uint // number of event bytes skipped
	BytesMin     uint // size of 'smallest' event fully read in current transaction
	BytesMax     uint // size of 'biggest' event fully read in current transaction
}

ReadStats reports stats on the most recent transaction for reading events.

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is used to iterate events stored in the queue.

func (*Reader) Available

func (r *Reader) Available() (uint, error)

Available returns the number of unread events that can be read.

func (*Reader) Begin added in v0.0.2

func (r *Reader) Begin() error

Begin starts a new read transaction, shared between multiple read calls. User must execute Done, to close the file transaction.

func (*Reader) Done added in v0.0.2

func (r *Reader) Done()

Done closes the active read transaction.

func (*Reader) Next

func (r *Reader) Next() (int, error)

Next advances to the next event to be read. The event size in bytes is returned. A size of 0 is reported if no more event is available in the queue. If Begin is not been called before Next, a temporary read transaction is created.

func (*Reader) Read

func (r *Reader) Read(b []byte) (int, error)

Read reads the contents of the current event into the buffer. Returns 0 without reading if end of the current event has been reached. Use `Next` to skip/continue reading the next event. If Begin is not been called before Read, a temporary read transaction is created.

type Settings

type Settings struct {
	// Queue write buffer size. If a single event is bigger then the
	// write-buffer, the write-buffer will grow. In this case will the write
	// buffer be flushed and reset to its original size.
	WriteBuffer uint

	// Optional Flushed callback. Will be used to notify n events being
	// successfully committed.
	Flushed func(n uint)

	// Optional ACK callback. Will be use to notify number of events being successfully
	// ACKed and pages being freed.
	ACKed func(event, pages uint)

	Observer Observer
}

Settings configures a queue when being instantiated with `New`.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is used to push new events onto the queue. The writer uses a write buffer, which is flushed once the buffer is full or if Flush is called. Only complete events are flushed. If an event is bigger then the configured write buffer, the write buffer will grow with the event size.

func (*Writer) Flush

func (w *Writer) Flush() error

Flush flushes the write buffer. Returns an error if the queue is closed, some error occurred or no more space is available in the file.

func (*Writer) Next

func (w *Writer) Next() error

Next is used to indicate the end of the current event. If write is used with a streaming encoder, the buffers of the actual writer must be flushed before calling Next on this writer. Upon next, the queue writer will add the event framing header and footer.

func (*Writer) Write

func (w *Writer) Write(p []byte) (int, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL