kiroku

package module
v0.12.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 17, 2023 License: MIT Imports: 18 Imported by: 9

README

Kiroku GoDoc Status Go Report Card Go Test Coverage

Kiroku is a general purpose historical record system which utilizes data blocks. It was built to be used as the action persistence layer for Mojura.

Usage

NewProducer
func ExampleNewProducer() {
	var err error
	if testProducer, err = NewProducer("./test_data", "tester", nil); err != nil {
		log.Fatal(err)
		return
	}
}
Producer.Transaction
// ExampleKiroku_Transaction writes a single block to testProducer from within
// a transaction.
func ExampleKiroku_Transaction() {
	err := testProducer.Transaction(func(t *Transaction) error {
		// Transaction exposes Write(value []byte); blocks are written without an
		// explicit action type.
		return t.Write([]byte("hello world!"))
	})
	if err != nil {
		// log.Fatal terminates the process, so nothing needs to follow it
		log.Fatal(err)
	}
}
Producer.Snapshot
func ExampleKiroku_Snapshot() {
	var err error
	if err = testProducer.Snapshot(func(s *Snapshot) (err error) {
		return s.Write([]byte("hello world!"))
	}); err != nil {
		log.Fatal(err)
		return
	}
}
NewWriter
// ExampleNewWriter initializes the package-level testWriter used by the
// Writer examples.
// NOTE(review): NewWriter does not appear in this version's documentation
// index — confirm it is still available before relying on this example.
func ExampleNewWriter() {
	w, err := NewWriter("./test_data", "testie")
	if err != nil {
		log.Fatal(err)
	}

	testWriter = w
}
Writer.AddBlock
// ExampleWriter_AddBlock adds a single block to testWriter.
// The Writer method for adding a block is Write, which takes only the block
// value; the example keeps its historical name.
func ExampleWriter_AddBlock() {
	// A []byte value is assignable to the Block parameter of Write
	if err := testWriter.Write([]byte("Hello world!")); err != nil {
		// log.Fatalf terminates the process, so nothing needs to follow it
		log.Fatalf("error adding row: %v", err)
	}
}
NewReader
// ExampleNewReader opens a chunk file and initializes the package-level
// testReader used by the Reader examples.
func ExampleNewReader() {
	f, err := os.Open("filename.kir")
	if err != nil {
		// log.Fatalf terminates the process, so nothing needs to follow it
		log.Fatalf("error opening: %v", err)
	}

	// NewReader returns only a *Reader (no error value), so there is nothing
	// to check here. The file is intentionally left open: testReader is used
	// by the examples that follow.
	testReader = NewReader(f)
}
Reader.Meta
func ExampleReader_Meta() {
	var m Meta
	m = testReader.Meta()
	fmt.Println("Meta!", m)
}
Reader.ForEach
// ExampleReader_ForEach prints the contents of every block in testReader,
// starting from seek position 0.
func ExampleReader_ForEach() {
	err := testReader.ForEach(0, func(b Block) error {
		// Block is a []byte, so it is converted directly; it has no Value field
		fmt.Println("Block data:", string(b))
		return nil
	})
	if err != nil {
		log.Fatalf("Error iterating through blocks: %v", err)
	}
}
Reader.Copy
// ExampleReader_Copy copies the entire contents of testReader into a newly
// created file.
func ExampleReader_Copy() {
	dst, err := os.Create("chunk.copy.kir")
	if err != nil {
		log.Fatal(err)
	}
	defer dst.Close()

	// The number of bytes copied is not needed by this example
	if _, err = testReader.Copy(dst); err != nil {
		log.Fatalf("Error copying chunk: %v", err)
	}
}
Read
// ExampleRead reads a chunk file through a temporary Reader, printing its
// Meta followed by the contents of every block.
func ExampleRead() {
	err := Read("filename.kir", func(r *Reader) error {
		// Use the Reader handed to this callback rather than the package-level
		// testReader, which may refer to a different file (or be unset).
		// NOTE(review): Reader.Meta does not appear in this version's
		// documentation index — confirm it is still available.
		m := r.Meta()
		fmt.Println("Meta!", m)

		iterErr := r.ForEach(0, func(b Block) error {
			// Block is a []byte, so it is converted directly; it has no Value field
			fmt.Println("Block data:", string(b))
			return nil
		})
		if iterErr != nil {
			log.Fatalf("Error iterating through blocks: %v", iterErr)
		}

		return nil
	})
	if err != nil {
		// log.Fatal terminates the process, so nothing needs to follow it
		log.Fatal(err)
	}
}

Documentation

Index

Constants

View Source
// Consumer errors. The messages use the term "mirror" for a Consumer.
const (
	// ErrConsumerNilSource is returned when a Consumer (mirror) is initialized with a nil source
	ErrConsumerNilSource = errors.Error("mirrors cannot have a nil source")
	// ErrConsumerTransaction is returned when a Consumer (mirror) attempts a transaction
	ErrConsumerTransaction = errors.Error("mirrors cannot perform transactions")
	// ErrConsumerSnapshot is returned when a Consumer (mirror) attempts to snapshot
	ErrConsumerSnapshot = errors.Error("mirrors cannot perform snapshots")
)
View Source
// Errors reporting empty required values.
// NOTE(review): presumably these refer to empty Options.Dir / Options.Name
// strings (rather than a directory containing no files) — confirm.
const (
	// ErrEmptyDirectory is returned when a directory is empty
	ErrEmptyDirectory = errors.Error("invalid directory, cannot be empty")
	// ErrEmptyName is returned when a name is empty
	ErrEmptyName = errors.Error("invalid name, cannot be empty")
)
View Source
// Default values for the duration fields of Options.
const (
	// DefaultEndOfResultsDelay is the default value for Options.EndOfResultsDelay (10 seconds)
	DefaultEndOfResultsDelay = time.Second * 10
	// DefaultErrorDelay is the default value for Options.ErrorDelay (30 seconds)
	DefaultErrorDelay = time.Second * 30
	// DefaultBatchDuration is the default value for Options.BatchDuration (10 seconds)
	DefaultBatchDuration = time.Second * 10
)

Variables

View Source
// ErrEmptyBlock is returned when a block is empty
// NOTE(review): declared as a var via errors.New, unlike the other sentinel
// errors in this package which are errors.Error constants — confirm intended.
var ErrEmptyBlock = errors.New("invalid block, cannot be empty")

Functions

func NewOneShotConsumer added in v0.11.0

func NewOneShotConsumer(opts Options, src Source, onUpdate UpdateFunc) (err error)

NewOneShotConsumer will initialize a new one-shot Consumer instance

func NewOneShotConsumerWithContext added in v0.11.0

func NewOneShotConsumerWithContext(ctx context.Context, opts Options, src Source, onUpdate UpdateFunc) (err error)

NewOneShotConsumerWithContext will initialize a new one-shot Consumer instance with a provided context.Context

func Read

func Read(filename string, p Processor) (err error)

Read will read a filename and provide a temporary reader

Types

type BatchFn added in v0.10.5

// BatchFn is the callback type accepted by Producer.Batch; it receives the
// *Transaction and returns no error
type BatchFn func(*Transaction)

type Block

type Block []byte

Block represents a block of data stored within history

func (Block) MarshalEnkodo

func (b Block) MarshalEnkodo(enc *enkodo.Encoder) (err error)

MarshalEnkodo is an enkodo encoding helper func

func (*Block) UnmarshalEnkodo

func (b *Block) UnmarshalEnkodo(dec *enkodo.Decoder) (err error)

UnmarshalEnkodo is an enkodo decoding helper func

type Consumer added in v0.11.0

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer represents a read-only instance of historical DB entries. Note: The mirror is updated through its Importer

func NewConsumer added in v0.11.0

func NewConsumer(opts Options, src Source, onUpdate UpdateFunc) (mp *Consumer, err error)

NewConsumer will initialize a new Consumer instance

func NewConsumerWithContext added in v0.11.0

func NewConsumerWithContext(ctx context.Context, opts Options, src Source, onUpdate UpdateFunc) (c *Consumer, err error)

NewConsumerWithContext will initialize a new Consumer instance with a provided context.Context

func (*Consumer) Close added in v0.11.0

func (c *Consumer) Close() (err error)

Close will close the selected instance of Consumer

func (*Consumer) Meta added in v0.11.0

func (c *Consumer) Meta() (meta Meta, err error)

Meta will return a copy of the current Meta

type File added in v0.5.0

// File is the combination of seek, sequential read, and positional read
// capabilities that NewReader requires of its input
type File interface {
	io.Seeker
	io.Reader
	io.ReaderAt
}

type Filename added in v0.11.0

// Filename holds the components of a file name; values are produced by
// ParseFilename and rendered back to a string by String
type Filename struct {
	// Name is the name component
	Name      string
	// CreatedAt is the creation time component
	// NOTE(review): stored as int64; the unit (seconds vs nanoseconds) is not visible here — confirm
	CreatedAt int64
	// Filetype is the Type component
	Filetype  Type
}

func ParseFilename added in v0.12.6

func ParseFilename(filename string) (parsed Filename, err error)

func (Filename) String added in v0.11.0

func (f Filename) String() string

type IOSource added in v0.11.0

type IOSource struct {
	// contains filtered or unexported fields
}

func NewIOSource added in v0.11.0

func NewIOSource(dir string) (ip *IOSource, err error)

func (*IOSource) Export added in v0.11.0

func (i *IOSource) Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error)

func (*IOSource) Get added in v0.11.0

func (i *IOSource) Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) (err error)

func (*IOSource) GetNext added in v0.11.0

func (i *IOSource) GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error)

func (*IOSource) Import added in v0.11.0

func (i *IOSource) Import(ctx context.Context, prefix, filename string, w io.Writer) (err error)

type Ledger added in v0.4.0

// Ledger is an interface covering meta retrieval, transactions, snapshots,
// filename retrieval, and closing
type Ledger interface {
	// Meta returns a Meta value, or an error
	Meta() (m Meta, err error)
	// Transaction takes a function which is given a *Transaction
	Transaction(fn func(*Transaction) error) (err error)
	// Snapshot takes a function which is given a *Snapshot
	Snapshot(fn func(*Snapshot) error) (err error)
	// Filename returns a filename, or an error
	Filename() (filename string, err error)
	// Close closes the Ledger
	Close() (err error)
}

type Meta

// Meta represents the historical meta data
type Meta struct {
	// LastProcessedTimestamp is the last processed timestamp
	// NOTE(review): stored as int64; the unit (seconds vs nanoseconds) is not visible here — confirm
	LastProcessedTimestamp int64 `json:"lastProcessedTimestamp"`
	// LastProcessedType is serialized under the JSON key "type"
	// NOTE(review): presumably the Type associated with the last processed entry — confirm
	LastProcessedType      Type  `json:"type"`
}

Meta represents the historical meta data

func (*Meta) IsEmpty added in v0.11.0

func (m *Meta) IsEmpty() bool

type NOOP added in v0.9.6

type NOOP struct {
}

func (*NOOP) Export added in v0.9.6

func (n *NOOP) Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error)

func (*NOOP) Get added in v0.9.6

func (n *NOOP) Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) error

func (*NOOP) GetNext added in v0.9.6

func (n *NOOP) GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error)

func (*NOOP) Import added in v0.9.6

func (n *NOOP) Import(ctx context.Context, prefix, filename string, w io.Writer) error

type Options

// Options represent Kiroku options.
// MakeOptions builds an Options value from a directory and a name, and
// Validate checks that the required fields are set.
type Options struct {
	// Dir is the directory value (see ErrEmptyDirectory)
	Dir       string `toml:"dir" json:"dir"`
	// Name is the name value (see ErrEmptyName)
	Name      string `toml:"name" json:"name"`
	// Namespace is the namespace value
	// NOTE(review): presumably combined with Name by FullName — confirm
	Namespace string `toml:"namespace" json:"namespace"`

	// OnLog, OnError, and OnResume are callback hooks; they carry no toml/json
	// tags. When each is invoked is not visible from this declaration.
	OnLog    func(message string)
	OnError  func(err error)
	OnResume func()

	// AvoidExportOnClose is a flag related to exporting on close
	AvoidExportOnClose  bool `toml:"avoid_export_on_close" json:"avoidExportOnClose"`
	// AvoidProcessOnClose is a flag related to processing on close.
	// Note: the serialized keys use "merge" (avoid_merge_on_close / avoidMergeOnClose)
	// rather than "process"
	AvoidProcessOnClose bool `toml:"avoid_merge_on_close" json:"avoidMergeOnClose"`

	// ConsumerFileLimit is a file limit applying to consumers
	// NOTE(review): whether this counts files or bytes is not visible here — confirm
	ConsumerFileLimit int64 `toml:"consumer_file_limit" json:"consumerFileLimit"`

	// BatchDuration represents the amount of time to keep a transaction open for a
	// Batch operation (see DefaultBatchDuration)
	BatchDuration time.Duration `toml:"batch_duration" json:"batchDuration"`

	// EndOfResultsDelay represents the amount of time to wait before pulling "Next" after
	// receiving empty results (Default is 10 seconds).
	EndOfResultsDelay time.Duration `toml:"end_of_results_delay" json:"endOfResultsDelay"`

	// ErrorDelay represents the amount of time to wait before pulling "Next" after
	// receiving an error (see DefaultErrorDelay)
	ErrorDelay time.Duration `toml:"error_delay" json:"errorDelay"`

	// RangeStart will determine the moment in time from which syncs will begin
	RangeStart time.Time `toml:"range_start" json:"rangeStart"`
	// RangeEnd will determine the moment in time at which syncs will end
	// Note: This feature is slated to be implemented within the following
	// release. As of now, this will act as a field placeholder
	RangeEnd time.Time `toml:"range_end" json:"rangeEnd"`
}

Options represents Kiroku options

func MakeOptions added in v0.2.0

func MakeOptions(dir, name string) (o Options)

MakeOptions will create new Options

func (*Options) FullName added in v0.10.1

func (o *Options) FullName() string

func (*Options) Validate added in v0.2.0

func (o *Options) Validate() (err error)

Validate ensures that the Options have all the required fields set

type Processor

type Processor func(*Reader) error

Processor will process chunks

type Producer added in v0.10.5

type Producer struct {
	// contains filtered or unexported fields
}

Producer represents historical DB entries

func NewProducer added in v0.10.5

func NewProducer(o Options, src Source) (kp *Producer, err error)

NewProducer will initialize a new Producer instance. Note: Processor and Options are optional

func NewProducerWithContext added in v0.10.5

func NewProducerWithContext(ctx context.Context, o Options, src Source) (kp *Producer, err error)

NewProducerWithContext will initialize a new Producer instance with a provided context.Context. Note: Processor and Options are optional

func (*Producer) Batch added in v0.10.5

func (p *Producer) Batch(fn BatchFn) (err error)

Batch will engage a new history batch transaction

func (*Producer) BatchBlock added in v0.10.5

func (p *Producer) BatchBlock(value []byte) (err error)

BatchBlock will add the provided value as a block within a history batch transaction

func (*Producer) Close added in v0.10.5

func (p *Producer) Close() (err error)

Close will close the selected instance of Producer

func (*Producer) Meta added in v0.11.0

func (p *Producer) Meta() (meta Meta, err error)

Meta will return a copy of the current Meta

func (*Producer) Snapshot added in v0.11.0

func (p *Producer) Snapshot(fn func(*Snapshot) error) (err error)

Snapshot will engage a new history snapshot

func (*Producer) Transaction added in v0.10.5

func (p *Producer) Transaction(fn TransactionFn) (err error)

Transaction will engage a new history transaction

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader will parse and read a history chunk

func NewReader

func NewReader(f File) (rp *Reader)

NewReader will initialize a new chunk reader

func (*Reader) Copy

func (r *Reader) Copy(destination io.Writer) (n int64, err error)

Copy will copy the entire reader

func (*Reader) ForEach

func (r *Reader) ForEach(seek int64, fn func(Block) error) (err error)

ForEach will iterate through all the blocks within the reader

func (*Reader) ReadSeeker added in v0.1.2

func (r *Reader) ReadSeeker() io.ReadSeeker

ReadSeeker will return the Reader's underlying ReadSeeker

type Snapshot

type Snapshot struct {
	// contains filtered or unexported fields
}

Snapshot manages a Kiroku snapshot

func (*Snapshot) Write

func (s *Snapshot) Write(value []byte) (err error)

Write will add a write block to a writer

type Source added in v0.6.0

// Source is the interface passed to the Producer and Consumer constructors.
// IOSource and NOOP in this package declare the same four methods.
type Source interface {
	// Export takes a prefix, a filename, and a reader, returning a new filename
	Export(ctx context.Context, prefix, filename string, r io.Reader) (newFilename string, err error)
	// Import takes a prefix, a filename, and a writer
	Import(ctx context.Context, prefix, filename string, w io.Writer) error
	// Get takes a prefix, a filename, and a function which is given an io.Reader
	Get(ctx context.Context, prefix, filename string, fn func(io.Reader) error) error
	// GetNext takes a prefix and the last filename, returning a filename
	GetNext(ctx context.Context, prefix, lastFilename string) (filename string, err error)
}

Source is used for importing and exporting

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

Transaction manages a Kiroku transaction

func (*Transaction) Write added in v0.11.0

func (t *Transaction) Write(value []byte) (err error)

Write will add a row

type TransactionFn added in v0.10.5

// TransactionFn is the callback type accepted by Producer.Transaction; it
// receives the *Transaction and returns an error
type TransactionFn func(*Transaction) error

type Type

// Type is a uint8 enumeration; it is used by Filename.Filetype and
// Meta.LastProcessedType
type Type uint8
const (
	// TypeInvalid is the zero value of Type (0)
	TypeInvalid Type = iota
	// TypeChunk is the chunk type (1)
	TypeChunk
	// TypeSnapshot is the snapshot type (2)
	TypeSnapshot
	// TypeTemporary is the temporary type (3)
	TypeTemporary
)

func (Type) MarshalJSON added in v0.11.0

func (t Type) MarshalJSON() (bs []byte, err error)

func (Type) String

func (t Type) String() (out string)

func (*Type) UnmarshalJSON added in v0.11.0

func (t *Type) UnmarshalJSON(bs []byte) (err error)

func (Type) Validate

func (t Type) Validate() (err error)

type UpdateFunc added in v0.11.0

// UpdateFunc is the onUpdate callback type accepted by the Consumer
// constructors; it receives a Type and a *Reader and returns an error
type UpdateFunc func(Type, *Reader) error

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer will write a history chunk

func (*Writer) Close added in v0.1.4

func (w *Writer) Close() (err error)

Close will close a writer

func (*Writer) Write added in v0.11.0

func (w *Writer) Write(value Block) (err error)

Write will add a row

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL