toyqueue

v0.1.5 Latest
Warning

This package is not in the latest version of its module.

Published: Mar 28, 2024 License: MIT Imports: 3 Imported by: 2

README

Toy queue

This micro-library contains [][]byte-centric primitives: interfaces and job queues. The default approach in Go is something like "protobuf for serialization, channels for job queues". Protobuf implies request-response RPC and full parsing; channels imply item-by-item push/pop. Neither is very handy for real-time streams of tiny records, such as network packet processing or database ops. If your serialization format is well specified and you can parse records lazily, this will probably work better:

type Records [][]byte

This approach encourages batching; for example, a RecordQueue consumer takes all the queued records at once. Also, [][]byte works well with writev() and net.Buffers.
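The net.Buffers remark can be illustrated with a minimal sketch. It redeclares Records locally so that it compiles without the library, and writeBatch is a hypothetical helper, not part of toyqueue. Note that the standard library only uses writev() when the destination is a suitable network connection; with the in-memory buffer used here, WriteTo falls back to one Write call per record.

```go
package main

import (
	"bytes"
	"fmt"
	"net"
)

// Records mirrors the library's type so this sketch is self-contained.
type Records [][]byte

// writeBatch (hypothetical helper) writes a whole batch with a single
// WriteTo call and reports what ended up in the sink.
func writeBatch(recs Records) (string, int64, error) {
	var sink bytes.Buffer
	// net.Buffers.WriteTo consumes the buffers it writes, so work on a
	// copy of the slice to leave the caller's batch untouched.
	bufs := make(net.Buffers, len(recs))
	copy(bufs, recs)
	n, err := bufs.WriteTo(&sink)
	return sink.String(), n, err
}

func main() {
	out, n, err := writeBatch(Records{[]byte("ab"), []byte("cd"), []byte("e")})
	fmt.Println(out, n, err) // prints "abcde 5 <nil>"
}
```

When the batch is not needed afterwards, the copy can be skipped and the conversion net.Buffers(recs) used directly, since both types share the underlying type [][]byte.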

Documentation

Constants

This section is empty.

Variables

var ErrClosed = errors.New("queue is closed")
var ErrNotKnown = errors.New("unknown drain")
var ErrWouldBlock = errors.New("the queue is over capacity")

Functions

func FanoutQueue added in v0.1.5

func FanoutQueue(limit int) (fanout *Fanout, queue DrainCloser)

func Pump

func Pump(feeder Feeder, drainer Drainer) (err error)

func PumpN added in v0.1.5

func PumpN(feeder Feeder, drainer Drainer, n int) (err error)

func PumpThenClose added in v0.1.5

func PumpThenClose(feed FeedCloser, drain DrainCloser) error

func Relay added in v0.1.5

func Relay(feeder Feeder, drainer Drainer) error

Types

type DrainCloser

type DrainCloser interface {
	Drainer
	io.Closer
}

type DrainSeekCloser

type DrainSeekCloser interface {
	Drainer
	io.Seeker
	io.Closer
}

type DrainSeeker

type DrainSeeker interface {
	Drainer
	io.Seeker
}

type Drainer

type Drainer interface {
	Drain(recs Records) error
}
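A Drainer receives a whole batch per call. As a minimal sketch, the hypothetical countingDrain below satisfies the interface and tallies what it is given; Records and Drainer are redeclared locally so the example compiles without the library.

```go
package main

import "fmt"

// Records and Drainer mirror the library's declarations.
type Records [][]byte

type Drainer interface {
	Drain(recs Records) error
}

// countingDrain is a hypothetical Drainer that only keeps statistics.
type countingDrain struct {
	batches int // number of Drain calls
	records int // number of records seen
	bytes   int // total payload size
}

// Drain accepts one batch and updates the counters.
func (d *countingDrain) Drain(recs Records) error {
	d.batches++
	d.records += len(recs)
	for _, r := range recs {
		d.bytes += len(r)
	}
	return nil
}

func main() {
	d := &countingDrain{}
	var sink Drainer = d
	_ = sink.Drain(Records{[]byte("ab"), []byte("c")})
	_ = sink.Drain(Records{[]byte("def")})
	fmt.Println(d.batches, d.records, d.bytes) // prints "2 3 6"
}
```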

type Fanout added in v0.1.5

type Fanout struct {
	// contains filtered or unexported fields
}

func FanoutFeeder added in v0.1.5

func FanoutFeeder(feeder FeedCloser) *Fanout

func (*Fanout) AddDrain added in v0.1.5

func (f2ds *Fanout) AddDrain(drain DrainCloser)

func (*Fanout) HasDrain added in v0.1.5

func (f2ds *Fanout) HasDrain(drain DrainCloser) (has bool)

func (*Fanout) RemoveDrain added in v0.1.5

func (f2ds *Fanout) RemoveDrain(drain DrainCloser) (err error)

func (*Fanout) Run added in v0.1.5

func (f2ds *Fanout) Run()

Run shovels the data from the feeder to the drains.

type FeedCloser

type FeedCloser interface {
	Feeder
	io.Closer
}

type FeedDrainCloser

type FeedDrainCloser interface {
	Feeder
	Drainer
	io.Closer
}

func BlockingRecordQueuePair

func BlockingRecordQueuePair(limit int) (i, o FeedDrainCloser)

func RecordQueuePair

func RecordQueuePair(limit int) (i, o FeedDrainCloser)

type FeedDrainer

type FeedDrainer interface {
	Feeder
	Drainer
}

func JoinedFeedDrainer

func JoinedFeedDrainer(feeder Feeder, drainer Drainer) FeedDrainer

type FeedSeekCloser

type FeedSeekCloser interface {
	Feeder
	io.Seeker
	io.Closer
}

type FeedSeeker

type FeedSeeker interface {
	Feeder
	io.Seeker
}

type Feeder

type Feeder interface {
	// Feed reads and returns records.
	// The EOF convention follows that of io.Reader:
	// Feed can either return `records, io.EOF` or
	// `records, nil` followed by `nil/{}, io.EOF`.
	Feed() (recs Records, err error)
}
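The end-of-stream convention matters mostly to callers: a batch may arrive together with the end-of-stream error, so the records should be used before the error is inspected. The sketch below assumes that the sentinel is io.EOF, as the reference to io.Reader suggests. batchFeeder and feedAll are hypothetical names, and the loop is a hand-written illustration, not the library's Pump.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Records and Feeder mirror the library's declarations.
type Records [][]byte

type Feeder interface {
	Feed() (recs Records, err error)
}

// batchFeeder is a hypothetical Feeder that hands out prepared batches.
type batchFeeder struct {
	batches []Records
}

// Feed returns the next batch. The last batch is returned together with
// io.EOF; an exhausted feeder returns nil, io.EOF.
func (f *batchFeeder) Feed() (Records, error) {
	if len(f.batches) == 0 {
		return nil, io.EOF
	}
	recs := f.batches[0]
	f.batches = f.batches[1:]
	if len(f.batches) == 0 {
		return recs, io.EOF
	}
	return recs, nil
}

// feedAll collects every record and copes with both EOF styles.
func feedAll(f Feeder) (Records, error) {
	var all Records
	for {
		recs, err := f.Feed()
		all = append(all, recs...) // consume the data before checking err
		if errors.Is(err, io.EOF) {
			return all, nil
		}
		if err != nil {
			return all, err
		}
	}
}

func main() {
	f := &batchFeeder{batches: []Records{
		{[]byte("a"), []byte("b")},
		{[]byte("c")},
	}}
	all, err := feedAll(f)
	fmt.Println(len(all), err) // prints "3 <nil>"
}
```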

type RecordQueue

type RecordQueue struct {
	Limit int
	// contains filtered or unexported fields
}

func (*RecordQueue) Blocking

func (q *RecordQueue) Blocking() FeedDrainCloser

func (*RecordQueue) Close

func (q *RecordQueue) Close() error

func (*RecordQueue) Drain

func (q *RecordQueue) Drain(recs Records) error

func (*RecordQueue) Feed

func (q *RecordQueue) Feed() (recs Records, err error)

type Records

type Records [][]byte

Records is a batch of records, a very universal primitive, especially for database ops, network ops, and packet processing. Batching allows for writev() and other performance optimizations. Also, if you use cryptography, blobs are way handier than structs. Records converts easily to net.Buffers.

func (Records) ExactSuffix

func (recs Records) ExactSuffix(total int64) (suffix Records)

func (Records) TotalLen

func (recs Records) TotalLen() (total int64)

func (Records) WholeRecordPrefix

func (recs Records) WholeRecordPrefix(limit int64) (prefix Records, remainder int64)
