Documentation ¶
Index ¶
Constants ¶
const ( // SzRoot is the size of the queue header in bytes. SzRoot = int(unsafe.Sizeof(queuePage{})) )
Variables ¶
This section is empty.
Functions ¶
func IsQueueCorrupt ¶ added in v0.0.2
IsQueueCorrupt checks if the error value indicates a corrupted queue, which can not be used anymore.
Types ¶
type ACKStats ¶ added in v0.0.4
type ACKStats struct { Duration time.Duration Failed bool Events uint // number of released events Pages uint // number of released pages }
ACKStats reports stats on the most recent ACK transaction.
type Delegate ¶
type Delegate interface { // PageSize reports the page size to be used by the backing file. PageSize() int // Root returns the queues root on file. Root() (txfile.PageID, uintptr) Offset(id txfile.PageID, offset uintptr) uintptr SplitOffset(uintptr) (txfile.PageID, uintptr) // BeginWrite must create a read-write transaction for use by the writer. // The transaction will be used to allocate pages and flush the current write // buffer. BeginWrite() (*txfile.Tx, error) // BeginRead must return a readonly transaction. BeginRead() (*txfile.Tx, error) // BeginCleanup must return a read-write transaction for the ACK handling to // remove events. No new contents will be written, but pages will be freed // and the queue root page being updated. BeginCleanup() (*txfile.Tx, error) }
Delegate is used by the persistent queue to query common parameters and start transactions when required.
func NewStandaloneDelegate ¶
NewStandaloneDelegate creates a standaonle Delegate from an txfile.File instance. This function will allocate and initialize the queue root page.
type ErrKind ¶ added in v0.0.2
type ErrKind int
ErrKind provides the pq related error kinds
const ( NoError ErrKind = iota // no error InitFailed // failed to initialize queue InvalidParam // invalid parameter InvalidPageSize // invalid page size InvalidConfig // invalid queue config QueueClosed // queue is already closed ReaderClosed // reader is already closed WriterClosed // writer is already closed NoQueueRoot // no queue root InvalidQueueRoot // queue root is invalid QueueVersion // unsupported queue version ACKEmptyQueue // invalid ack on empty queue ACKTooMany // too many events acked SeekFail // failed to seek to next page ReadFail // failed to read page InactiveTx // no active transaction UnexpectedActiveTx // unexpected active transaction )
type Error ¶ added in v0.0.2
type Error struct {
// contains filtered or unexported fields
}
Error is the actual error type returned by all functions/methods within the pq package. The Error is compatible to error and txerr.Error, but adds a few additional meta-data for applications to report and handle errors.
func (*Error) Context ¶ added in v0.0.2
Context returns a formatted string of the related meta-data as key/value pairs.
func (*Error) Error ¶ added in v0.0.2
Error returns the error message. The cause will not be included in the error string. Use fmt with %+v to create a formatted multiline error.
func (*Error) Errors ¶ added in v0.0.2
Errors is similar to `Cause()`, but returns a slice of errors. This way the error value can be consumed and formatted by zap (and propably other loggers).
func (*Error) Format ¶ added in v0.0.2
Format adds support for fmt.Formatter to Error. The format patterns %v and %s print the top-level error message only (similar to `(*Error).Error()`). The format pattern "q" is similar to "%s", but adds double quotes before and after the message. Use %+v to create a multiline string containing the full trace of errors.
func (*Error) Kind ¶ added in v0.0.2
Kind returns the error kind of the error. The kind should be used by applications to check if it is possible to recover from an error condition. Kind return nil if the error value does not define a kind. Better use `txerr.Is` or `txerr.GetKind` to query the error kind.
type FlushStats ¶ added in v0.0.4
type FlushStats struct { Duration time.Duration // duration of flush operation Oldest, Newest time.Time // timestamp of oldest/newest event in buffer Failed bool // set to true if flush operation failed OutOfMemory bool // set to true if flush failed due to the file being full Pages uint // number of pages to be flushed Allocate uint // number of pages to allocate during flush operation Events uint // number of events to be flushed BytesTotal uint // total number of bytes written (ignoring headers, just event sizes) BytesMin uint // size of 'smallest' event in current transaction BytesMax uint // size of 'biggest' event in current transaction }
FlushStats reports internal stats on the most recent flush operation.
type Observer ¶ added in v0.0.4
type Observer interface { OnQueueInit(headerOffset uintptr, version uint32, available uint) OnQueueFlush(headerOffset uintptr, stats FlushStats) OnQueueRead(headerOffset uintptr, stats ReadStats) OnQueueACK(headerOffset uintptr, stats ACKStats) }
Observer defines common callbacks to observe operations, outcomes and stats on queues. Each callback reports the header offset for uniquely identifying a queue in case a file holds many queues.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue implements the on-disk queue data structure. The queue requires a Delegate, so to start transactions at any time. The Queue provides a reader and writer. While it is safe to use the Reader and Writer concurrently, the Reader and Writer themselves are not thread-safe.
func New ¶
New creates a new Queue. The delegate is required to access the file and start transactions. An error is returned if the delegate is nil, the queue header is invalid, some settings are invalid, or if some IO error occurred.
func (*Queue) ACK ¶
ACK signals the queue, the most n events at the front of the queue have been processed. The queue will try to remove these asynchronously.
func (*Queue) Close ¶
Close will try to flush the current write buffer, but after closing the queue, no more reads or writes can be executed
type ReadStats ¶ added in v0.0.4
type ReadStats struct { Duration time.Duration // duration of read transaction Skipped uint // number of events skipped (e.g. upon error while reading/parsing) Read uint // number of events read BytesTotal uint // total number of bytes read (ignoring headers). Include partially but skipped events BytesSkipped uint // number of event bytes skipped BytesMin uint // size of 'smallest' event fully read in current transaction BytesMax uint // size of 'biggest' event fully read in current transaction }
ReadStats reports stats on the most recent transaction for reading events.
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader is used to iterate events stored in the queue.
func (*Reader) Begin ¶ added in v0.0.2
Begin starts a new read transaction, shared between multiple read calls. User must execute Done, to close the file transaction.
func (*Reader) Done ¶ added in v0.0.2
func (r *Reader) Done()
Done closes the active read transaction.
func (*Reader) Next ¶
Next advances to the next event to be read. The event size in bytes is returned. A size of 0 is reported if no more event is available in the queue. If Begin is not been called before Next, a temporary read transaction is created.
type Settings ¶
type Settings struct { // Queue write buffer size. If a single event is bigger then the // write-buffer, the write-buffer will grow. In this case will the write // buffer be flushed and reset to its original size. WriteBuffer uint // Optional Flushed callback. Will be used to notify n events being // successfully committed. Flushed func(n uint) // Optional ACK callback. Will be use to notify number of events being successfully // ACKed and pages being freed. ACKed func(event, pages uint) Observer Observer }
Settings configures a queue when being instantiated with `New`.
type Writer ¶
type Writer struct {
// contains filtered or unexported fields
}
Writer is used to push new events onto the queue. The writer uses a write buffer, which is flushed once the buffer is full or if Flush is called. Only complete events are flushed. If an event is bigger then the configured write buffer, the write buffer will grow with the event size.
func (*Writer) Flush ¶
Flush flushes the write buffer. Returns an error if the queue is closed, some error occurred or no more space is available in the file.