stream_sum

package
v0.82.2
Published: Oct 4, 2019 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package stream_sum is an example application consisting of three stages:

  1. A `chunker` job randomly generates a number of unique "streams", with stream content emitted across a number of interleaved data chunks.
  2. A `summer` consumer accumulates stream chunks and computes a running CRC64 sum of each stream's content. When a stream completes, the `summer` consumer emits its final sum to an output journal.
  3. Having written a complete stream, the `chunker` job confirms that the correct sum is written to the output journal.

The `chunker` and `summer` tasks may be independently scaled, and are invariant to process failures and restarts.

Constants

const FinalSumsJournal pb.Journal = "examples/stream-sum/sums"

FinalSumsJournal to which final stream sums are written.

Functions

func GenerateAndVerifyStreams

func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error

GenerateAndVerifyStreams is the main routine of the `chunker` job. It generates and verifies streams based on the ChunkerConfig.

Types

type Chunk

type Chunk struct {
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // Monotonic sequence number, starting from 1.
	Data  []byte   // Raw data included in the Value. If empty, this is the stream's final chunk.
}

Chunk is an ordered slice of stream content.
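
The field comments above imply a simple framing rule: SeqNo counts up from 1, and a Chunk with empty Data closes the stream. A minimal sketch of that rule follows. `splitStream` is a hypothetical helper written for illustration and is not part of this package; the StreamID and Chunk declarations are copied from this page so the example compiles on its own.

```go
package main

import "fmt"

// StreamID and Chunk mirror the declarations documented on this page.
type StreamID [16]byte

type Chunk struct {
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // Monotonic sequence number, starting from 1.
	Data  []byte   // If empty, this is the stream's final chunk.
}

// splitStream is a hypothetical helper (not part of stream_sum). It cuts
// content into pieces of at most size bytes, numbers them from 1, and
// appends one trailing Chunk with empty Data to mark the end of the stream.
func splitStream(id StreamID, content []byte, size int) []Chunk {
	if size < 1 {
		size = 1 // Guard against a non-positive piece size.
	}
	var out []Chunk
	seq := 1
	for len(content) > 0 {
		n := size
		if n > len(content) {
			n = len(content)
		}
		out = append(out, Chunk{ID: id, SeqNo: seq, Data: content[:n]})
		content = content[n:]
		seq++
	}
	// The final chunk carries no data.
	return append(out, Chunk{ID: id, SeqNo: seq})
}

func main() {
	for _, c := range splitStream(StreamID{1}, []byte("hello world"), 4) {
		fmt.Printf("seq=%d data=%q final=%t\n", c.SeqNo, c.Data, len(c.Data) == 0)
	}
}
```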

type ChunkerConfig

type ChunkerConfig struct {
	Chunker struct {
		mbp.ZoneConfig
		Streams int           `long:"streams" default:"-1" description:"Total number of streams to create. <0 for infinite"`
		Chunks  int           `long:"chunks" default:"100" description:"Number of chunks per stream"`
		Delay   time.Duration `long:"delay" default:"30s" description:"Maximum delay tolerance for an expected chunk"`
	} `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"`

	Broker      mbp.ClientConfig      `group:"Broker" namespace:"broker" env-namespace:"BROKER"`
	Log         mbp.LogConfig         `group:"Logging" namespace:"log" env-namespace:"LOG"`
	Diagnostics mbp.DiagnosticsConfig `group:"Debug" namespace:"debug" env-namespace:"DEBUG"`
}

ChunkerConfig is the configuration used by the `chunker` job binary.

type StreamID

type StreamID [16]byte

StreamID uniquely identifies a stream.

type Sum

type Sum struct {
	ID    StreamID // Unique ID of the stream.
	SeqNo int      // SeqNo of last Chunk summed over.
	Value uint64   // Computed sum through SeqNo.
}

Sum represents a partial or final CRC64 sum of a stream.

func (*Sum) Update

func (s *Sum) Update(chunk Chunk) (done bool, err error)

Update folds a Chunk into this Sum and returns whether it is the last Chunk of the stream. Update requires that SeqNo be totally ordered; replays of a previous SeqNo are ignored.
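
The following is a minimal sketch of how the documented behavior could be realized; it is not taken from the package source. The CRC64 polynomial (ECMA), the error returned on a mismatched ID, and the error returned when a SeqNo skips ahead are all assumptions made for illustration.

```go
package main

import (
	"fmt"
	"hash/crc64"
)

// StreamID, Chunk and Sum mirror the declarations documented on this page.
type StreamID [16]byte

type Chunk struct {
	ID    StreamID
	SeqNo int
	Data  []byte
}

type Sum struct {
	ID    StreamID
	SeqNo int
	Value uint64
}

// crcTable uses the ECMA polynomial. This choice is an assumption.
var crcTable = crc64.MakeTable(crc64.ECMA)

// Update is a sketch of the documented contract: replays are ignored,
// an empty chunk reports done, and any other chunk extends the running sum.
func (s *Sum) Update(chunk Chunk) (done bool, err error) {
	switch {
	case chunk.ID != s.ID:
		// Assumed behavior: reject chunks belonging to another stream.
		return false, fmt.Errorf("chunk ID %x does not match sum ID %x", chunk.ID, s.ID)
	case chunk.SeqNo <= s.SeqNo:
		return false, nil // Replay of an already-folded chunk.
	case chunk.SeqNo != s.SeqNo+1:
		// Assumed behavior: a gap in the sequence is an error.
		return false, fmt.Errorf("chunk SeqNo %d skips ahead of %d", chunk.SeqNo, s.SeqNo)
	}
	s.SeqNo = chunk.SeqNo
	if len(chunk.Data) == 0 {
		return true, nil // Final chunk: the sum is complete.
	}
	s.Value = crc64.Update(s.Value, crcTable, chunk.Data)
	return false, nil
}

func main() {
	var sum Sum
	for _, c := range []Chunk{
		{SeqNo: 1, Data: []byte("hello ")},
		{SeqNo: 1, Data: []byte("hello ")}, // Replay, ignored.
		{SeqNo: 2, Data: []byte("world")},
		{SeqNo: 3}, // Final chunk.
	} {
		done, err := sum.Update(c)
		fmt.Printf("seq=%d done=%t err=%v value=%016x\n", c.SeqNo, done, err, sum.Value)
	}
}
```

Because the checksum is extended incrementally, a stream summed in several chunks yields the same Value as the same bytes summed in one chunk, which is what lets the `chunker` verify the result independently.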

type Summer

type Summer struct{}

Summer consumes stream chunks, aggregates chunk data, and emits final sums. It implements the runconsumer.Application interface.

func (Summer) ConsumeMessage

func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error

ConsumeMessage folds a Chunk into its respective partial stream sum. If the Chunk represents a stream EOF, it emits a final sum. consumer.Application implementation.
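
To illustrate the per-stream bookkeeping this describes, here is a simplified sketch that tracks partial sums for interleaved streams. It is an illustration under stated assumptions, not the package's implementation: `partials` and `consume` are hypothetical names, an in-memory map stands in for the consumer.Store, a callback stands in for publishing to the output journal, the ECMA polynomial is assumed, and sequence-gap checks are omitted for brevity.

```go
package main

import (
	"fmt"
	"hash/crc64"
)

// StreamID, Chunk and Sum mirror the declarations documented on this page.
type StreamID [16]byte

type Chunk struct {
	ID    StreamID
	SeqNo int
	Data  []byte
}

type Sum struct {
	ID    StreamID
	SeqNo int
	Value uint64
}

var table = crc64.MakeTable(crc64.ECMA) // Assumed polynomial.

// partials is a hypothetical in-memory stand-in for the consumer.Store.
type partials map[StreamID]*Sum

// consume folds chunk into its stream's partial sum. On a final (empty)
// chunk it calls emit with the finished Sum. The entry is retained, so a
// later replay of the final chunk is ignored rather than emitted twice.
func (p partials) consume(chunk Chunk, emit func(Sum)) {
	sum, ok := p[chunk.ID]
	if !ok {
		sum = &Sum{ID: chunk.ID}
		p[chunk.ID] = sum
	}
	if chunk.SeqNo <= sum.SeqNo {
		return // Replay of an already-folded chunk.
	}
	sum.SeqNo = chunk.SeqNo
	if len(chunk.Data) != 0 {
		sum.Value = crc64.Update(sum.Value, table, chunk.Data)
		return
	}
	emit(*sum) // Stream EOF: hand off the final sum.
}

func main() {
	a, b := StreamID{'a'}, StreamID{'b'}
	p := partials{}
	emit := func(s Sum) { fmt.Printf("final id=%x seq=%d value=%016x\n", s.ID[:1], s.SeqNo, s.Value) }

	// Chunks of two streams arrive interleaved.
	for _, c := range []Chunk{
		{ID: a, SeqNo: 1, Data: []byte("foo")},
		{ID: b, SeqNo: 1, Data: []byte("bar")},
		{ID: a, SeqNo: 2}, // EOF of stream a.
		{ID: b, SeqNo: 2, Data: []byte("baz")},
		{ID: b, SeqNo: 3}, // EOF of stream b.
	} {
		p.consume(c, emit)
	}
}
```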

func (Summer) FinalizeTxn

func (Summer) FinalizeTxn(shard consumer.Shard, store consumer.Store) error

FinalizeTxn marshals partial stream sums to the |store| to ensure persistence across consumer transactions. consumer.Application implementation.

func (Summer) InitApplication

func (Summer) InitApplication(args runconsumer.InitArgs) error

InitApplication is a no-op, as Summer provides no client-facing APIs.

func (Summer) NewConfig

func (Summer) NewConfig() runconsumer.Config

NewConfig returns a new BaseConfig.

func (Summer) NewMessage

func (Summer) NewMessage(*pb.JournalSpec) (message.Message, error)

NewMessage returns a Chunk message. consumer.Application implementation.

func (Summer) NewStore

func (Summer) NewStore(shard consumer.Shard, dir string, rec *recoverylog.Recorder) (consumer.Store, error)

NewStore builds a RocksDB store for the Shard. consumer.Application implementation.

Directories

Path Synopsis
Package summer runs the stream_sum.Summer consumer.
