Documentation ¶
Overview ¶
Package stream_sum is an example application consisting of three stages:
- A `chunker` job randomly generates a number of unique "streams", with stream content emitted across a number of interleaved data chunks.
- A `summer` consumer accumulates stream chunks and computes a running SHA1-sum of each stream's content. When the stream is completed, the `summer` consumer emits a final sum to an output journal.
- Having written a complete stream, the `chunker` job confirms that the correct sum is written to the output journal.
The `chunker` and `summer` tasks may be independently scaled, and are invariant to process failures and restarts.
Index ¶
- Constants
- func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error
- type Chunk
- type ChunkerConfig
- type StreamID
- type Sum
- type Summer
- func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error
- func (Summer) FinalizeTxn(shard consumer.Shard, store consumer.Store) error
- func (Summer) InitApplication(args runconsumer.InitArgs) error
- func (Summer) NewConfig() runconsumer.Config
- func (Summer) NewMessage(*pb.JournalSpec) (message.Message, error)
- func (Summer) NewStore(shard consumer.Shard, dir string, rec *recoverylog.Recorder) (consumer.Store, error)
Constants ¶
const FinalSumsJournal pb.Journal = "examples/stream-sum/sums"
FinalSumsJournal to which final stream sums are written.
Variables ¶
This section is empty.
Functions ¶
func GenerateAndVerifyStreams ¶
func GenerateAndVerifyStreams(ctx context.Context, cfg *ChunkerConfig) error
GenerateAndVerifyStreams is the main routine of the `chunker` job. It generates and verifies streams based on the ChunkerConfig.
Types ¶
type Chunk ¶
type Chunk struct { ID StreamID // Unique ID of the stream. SeqNo int // Monotonic sequence number, starting from 1. Data []byte // Raw data included in the Value. If empty, this is the stream's final chunk. }
Chunk is an ordered slice of stream content.
type ChunkerConfig ¶
type ChunkerConfig struct { Chunker struct { mbp.ZoneConfig Streams int `long:"streams" default:"-1" description:"Total number of streams to create. <0 for infinite"` Chunks int `long:"chunks" default:"100" description:"Number of chunks per stream"` Delay time.Duration `long:"delay" default:"30s" description:"Maximum delay tolerance for an expected chunk"` } `group:"Chunker" namespace:"chunker" env-namespace:"CHUNKER"` Broker mbp.ClientConfig `group:"Broker" namespace:"broker" env-namespace:"BROKER"` Log mbp.LogConfig `group:"Logging" namespace:"log" env-namespace:"LOG"` Diagnostics mbp.DiagnosticsConfig `group:"Debug" namespace:"debug" env-namespace:"DEBUG"` }
ChunkerConfig is the configuration used by the `chunker` job binary.
type Sum ¶
type Sum struct { ID StreamID // Unique ID of the stream. SeqNo int // SeqNo of last Chunk summed over. Value uint64 // Computed sum through SeqNo. }
Sum represents a partial or final CRC64 sum of a stream.
type Summer ¶
type Summer struct{}
Summer consumes stream chunks, aggregates chunk data, and emits final sums. It implements the runconsumer.Application interface.
func (Summer) ConsumeMessage ¶
func (Summer) ConsumeMessage(shard consumer.Shard, store consumer.Store, env message.Envelope) error
ConsumeMessage folds a Chunk into its respective partial stream sum. If the Chunk represents a stream EOF, it emits a final sum. consumer.Application implementation.
func (Summer) FinalizeTxn ¶
FinalizeTxn marshals partial stream sums to the |store| to ensure persistence across consumer transactions. consumer.Application implementation.
func (Summer) InitApplication ¶
func (Summer) InitApplication(args runconsumer.InitArgs) error
InitApplication is a no-op, as Summer provides no client-facing APIs.
func (Summer) NewConfig ¶
func (Summer) NewConfig() runconsumer.Config
NewConfig returns a new BaseConfig.
func (Summer) NewMessage ¶
NewMessage returns a Chunk message. consumer.Application implementation.