Documentation ¶
Index ¶
- Constants
- Variables
- func NewOneShotConsumer(opts Options, src Source, onUpdate UpdateFunc) (err error)
- func NewOneShotConsumerWithContext(ctx context.Context, opts Options, src Source, onUpdate UpdateFunc) (err error)
- func Read(filename string, p Processor) (err error)
- type BatchFn
- type Block
- type Consumer
- type File
- type Filename
- type IOSource
- func (i *IOSource) Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error)
- func (i *IOSource) Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) (err error)
- func (i *IOSource) GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error)
- func (i *IOSource) Import(ctx context.Context, prefix, filename string, w io.Writer) (err error)
- type Ledger
- type Meta
- type NOOP
- func (n *NOOP) Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error)
- func (n *NOOP) Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) error
- func (n *NOOP) GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error)
- func (n *NOOP) Import(ctx context.Context, prefix, filename string, w io.Writer) error
- type Options
- type Processor
- type Producer
- func (p *Producer) Batch(fn BatchFn) (err error)
- func (p *Producer) BatchBlock(value []byte) (err error)
- func (p *Producer) Close() (err error)
- func (p *Producer) Meta() (meta Meta, err error)
- func (p *Producer) Snapshot(fn func(*Snapshot) error) (err error)
- func (p *Producer) Transaction(fn TransactionFn) (err error)
- type Reader
- type Snapshot
- type Source
- type Transaction
- type TransactionFn
- type Type
- type UpdateFunc
- type Writer
Constants ¶
const ( // ErrConsumerNilSource is returned when a mirror is initialized with a nil source ErrConsumerNilSource = errors.Error("mirrors cannot have a nil source") // ErrConsumerTransaction is returned when a mirror attempts a transaction ErrConsumerTransaction = errors.Error("mirrors cannot perform transactions") // ErrConsumerSnapshot is returned when a mirror attempts to snapshot ErrConsumerSnapshot = errors.Error("mirrors cannot perform snapshots") )
const ( // ErrEmptyDirectory is returned when a directory is empty ErrEmptyDirectory = errors.Error("invalid directory, cannot be empty") // ErrEmptyName is returned when a name is empty ErrEmptyName = errors.Error("invalid name, cannot be empty") )
const ( // DefaultEndOfResultsDelay is the default value for EndOfResultsDelay DefaultEndOfResultsDelay = time.Second * 10 // DefaultErrorDelay is the default value for ErrorDelay DefaultErrorDelay = time.Second * 30 // DefaultBatchDuration is the default value for BatchDuration DefaultBatchDuration = time.Second * 10 )
Variables ¶
var ErrEmptyBlock = errors.New("invalid block, cannot be empty")
Functions ¶
func NewOneShotConsumer ¶ added in v0.11.0
func NewOneShotConsumer(opts Options, src Source, onUpdate UpdateFunc) (err error)
NewOneShotConsumer will initialize a new one-shot Consumer instance
func NewOneShotConsumerWithContext ¶ added in v0.11.0
func NewOneShotConsumerWithContext(ctx context.Context, opts Options, src Source, onUpdate UpdateFunc) (err error)
NewOneShotConsumerWithContext will initialize a new one-shot Consumer instance with a provided context.Context
Types ¶
type BatchFn ¶ added in v0.10.5
type BatchFn func(*Transaction)
type Block ¶
type Block []byte
Block represents a block of data stored within history
func (Block) MarshalEnkodo ¶
MarshalEnkodo is an enkodo encoding helper func
type Consumer ¶ added in v0.11.0
type Consumer struct {
// contains filtered or unexported fields
}
Consumer represents a read-only instance of historical DB entries. Note: The mirror is updated through its Importer
func NewConsumer ¶ added in v0.11.0
func NewConsumer(opts Options, src Source, onUpdate UpdateFunc) (mp *Consumer, err error)
NewConsumer will initialize a new Consumer instance
func NewConsumerWithContext ¶ added in v0.11.0
func NewConsumerWithContext(ctx context.Context, opts Options, src Source, onUpdate UpdateFunc) (c *Consumer, err error)
NewConsumerWithContext will initialize a new Consumer instance with a provided context.Context
type Filename ¶ added in v0.11.0
func ParseFilename ¶ added in v0.12.6
type IOSource ¶ added in v0.11.0
type IOSource struct {
// contains filtered or unexported fields
}
func NewIOSource ¶ added in v0.11.0
type Meta ¶
type Meta struct { // LastProcessedTimestamp is the last processed timestamp LastProcessedTimestamp int64 `json:"lastProcessedTimestamp"` LastProcessedType Type `json:"type"` }
Meta represents the historical meta data
type NOOP ¶ added in v0.9.6
type NOOP struct { }
type Options ¶
type Options struct { Dir string `toml:"dir" json:"dir"` Name string `toml:"name" json:"name"` Namespace string `toml:"namespace" json:"namespace"` OnLog func(message string) OnError func(err error) OnResume func() AvoidExportOnClose bool `toml:"avoid_export_on_close" json:"avoidExportOnClose"` AvoidProcessOnClose bool `toml:"avoid_merge_on_close" json:"avoidMergeOnClose"` ConsumerFileLimit int64 `toml:"consumer_file_limit" json:"consumerFileLimit"` // BatchDuration represents the amount of time to keep a transaction open for a // Batch operation BatchDuration time.Duration `toml:"batch_duration" json:"batchDuration"` // EndOfResultsDelay represents the amount of time to wait before pulling "Next" after // receiving empty results (Default is 10 seconds). EndOfResultsDelay time.Duration `toml:"end_of_results_delay" json:"endOfResultsDelay"` // ErrorDelay represents the amount of time to wait before pulling "Next" after // receiving an error ErrorDelay time.Duration `toml:"error_delay" json:"errorDelay"` // RangeStart will determine the moment in time from which syncs will begin RangeStart time.Time `toml:"range_start" json:"rangeStart"` // RangeEnd will determine the moment in time from which syncs will end // Note: This feature is slated to be implemented within the following // release. As of now, this will act as a field placeholder RangeEnd time.Time `toml:"range_end" json:"rangeEnd"` }
Options represents Kiroku options
func MakeOptions ¶ added in v0.2.0
MakeOptions will create new Options
type Producer ¶ added in v0.10.5
type Producer struct {
// contains filtered or unexported fields
}
Producer represents historical DB entries
func NewProducer ¶ added in v0.10.5
NewProducer will initialize a new Producer instance. Note: Processor and Options are optional
func NewProducerWithContext ¶ added in v0.10.5
NewProducerWithContext will initialize a new Producer instance with a provided context.Context. Note: Processor and Options are optional
func (*Producer) BatchBlock ¶ added in v0.10.5
BatchBlock will engage a new history batch transaction
func (*Producer) Transaction ¶ added in v0.10.5
func (p *Producer) Transaction(fn TransactionFn) (err error)
Transaction will engage a new history transaction
type Reader ¶
type Reader struct {
// contains filtered or unexported fields
}
Reader will parse and read a history chunk
func (*Reader) ReadSeeker ¶ added in v0.1.2
func (r *Reader) ReadSeeker() io.ReadSeeker
ReadSeeker will return the Reader's underlying ReadSeeker
type Snapshot ¶
type Snapshot struct {
// contains filtered or unexported fields
}
Snapshot manages a Kiroku snapshot
type Source ¶ added in v0.6.0
type Source interface { Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error) Import(ctx context.Context, prefix, filename string, w io.Writer) error Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) error GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error) }
Source is used for importing and exporting
type Transaction ¶
type Transaction struct {
// contains filtered or unexported fields
}
Transaction manages a Kiroku transaction
func (*Transaction) Write ¶ added in v0.11.0
func (t *Transaction) Write(value []byte) (err error)
Write will add a row
type TransactionFn ¶ added in v0.10.5
type TransactionFn func(*Transaction) error
type Type ¶
type Type uint8