chancacher

package
v4.0.0-...-5981c31 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: BSD-2-Clause Imports: 10 Imported by: 0

Documentation

Overview

Package chancacher implements a pipeline of channels (in->out) that provides internal buffering (via a simple buffered channel), and caching data to disk.

Index

Constants

View Source
const MaxDepth = 1000000

The maximum channel depth, which is also used when the channel depth is set to 0. We could set this to MaxInt but we'd likely just run out of memory without a clean way to triage. It's best to just enforce a sensible maximum.

Variables

View Source
var (
	ErrInvalidCachePath = errors.New("Invalid cache path")
)

Functions

func NewFileCounter

func NewFileCounter(f *os.File) (*fileCounter, error)

Types

type ChanCacher

type ChanCacher struct {
	In  chan interface{}
	Out chan interface{}
	// contains filtered or unexported fields
}

A ChanCacher is a pipeline of channels with a variable-sized internal buffer. The buffer can also cache to disk. The user is expected to connect ChanCacher.In and ChanCacher.Out.

func NewChanCacher

func NewChanCacher(maxDepth int, cachePath string, maxSize int) (*ChanCacher, error)

Create a new ChanCacher with maximum depth, and optional backing file. If maxDepth == 0, the ChanCacher will be unbuffered. If maxDepth == -1, the ChanCacher depth will be set to MaxDepth. To enable a backing store, provide a path to backingPath. chancachers create two files using this prefix named cache_a and cache_b.

The maxSize argument sets the maximum amount of disk commit, in bytes.

When a new ChanCacher is made, if cachePath points to existing cache files, the ChanCacher will immediately attempt to drain them from disk. In this way, you can recover data sent to disk on a crash or previous use of Commit().

func (*ChanCacher) BufferSize

func (c *ChanCacher) BufferSize() int

Returns the number of elements on the internal buffer.

func (*ChanCacher) CacheHasData

func (c *ChanCacher) CacheHasData() bool

Return if the cache has outstanding data not written to the output channel.

func (*ChanCacher) CacheStart

func (c *ChanCacher) CacheStart()

Enable a stopped cache.

func (*ChanCacher) CacheStop

func (c *ChanCacher) CacheStop()

Stop a running cache. Calling Stop() will prevent the ChanCacher from writing any new data to the backing file, but will not stop it from reading (draining) the cache to the output channel.

func (*ChanCacher) Commit

func (c *ChanCacher) Commit()

Commit drains the buffer to the backing file and shuts down the cache. Commit should be called after closing the input channel if the buffer needs to be saved. Commit will block until the In channel is closed. The ChanCacher will not close the output channel until it's empty, so a typical production would look like:

close(c.In)
drainSomeDataFrom(c.Out)

// commit the rest of my data to disk
c.Commit()

// c.Out is now closed

Once Commit() is called, draining the cache cannot be restarted, though writing to the cache will still work. Commit should only be used for teardown scenarios.

func (*ChanCacher) Drain

func (c *ChanCacher) Drain()

Drain blocks until the internal buffer is empty. It's possible that new data is still being consumed, so care should be taken when using Drain(). You probably don't want to use Drain(), but instead close ChanCacher.In and wait for the ChanCacher.Out to close, which does carry guarantees that the internal buffers and cache are fully drained.

func (*ChanCacher) Size

func (c *ChanCacher) Size() int

Returns the number of bytes committed to disk. This does not include data in the in-memory buffer.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL