Documentation
¶
Index ¶
- type ApplyFunc
- type DoneFunc
- type FilterFunc
- type InitFunc
- type PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) Apply(fn ApplyFunc) *PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) Build() *StreamProjection
- func (b *PartionedProjectionBuilder) Done(fn DoneFunc) *PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) Init(fn InitFunc) *PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) Persist(name string, obj interface{}) *PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) ToStream(name string) *PartionedProjectionBuilder
- func (b *PartionedProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *PartionedProjectionBuilder
- type PartitionFunc
- type Projection
- type ProjectionState
- type ReduceFunc
- type SplitToStreamsFunc
- type StreamProjection
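- type StreamProjectionBuilder
- func FromStream(name string) *StreamProjectionBuilder
- func FromStreams(names ...string) *StreamProjectionBuilder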
- func (b *StreamProjectionBuilder) Apply(fn ApplyFunc) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Build() *StreamProjection
- func (b *StreamProjectionBuilder) Daily(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) Done(fn DoneFunc) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Filter(fn FilterFunc) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Hourly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) Init(fn InitFunc) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Monthly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) PartitionBy(fns ...PartitionFunc) *PartionedProjectionBuilder
- func (b *StreamProjectionBuilder) Persist(name string, obj interface{}) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Quarterly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) Reduce(fn ReduceFunc, initialValue interface{}) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) TimeSeries(tsPartitioner TimePartitioner, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) ToStream(name string) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *StreamProjectionBuilder
- func (b *StreamProjectionBuilder) Weekly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- func (b *StreamProjectionBuilder) Yearly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
- type StreamProjectionEngine
- func New(bus streams.Bus, persister streams.StreamPersister) StreamProjectionEngine
- type TimePartitioner
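- type TimePartitionerBase
- func NewTimeSeriesPartitionerBase(extractTimeFn TimeSeriesPartitionFunc, stepFn func(time.Time) time.Time, format string) *TimePartitionerBase
- func (p *TimePartitionerBase) FillMissingValues(data []*timeProjectionState) []*timeProjectionState
- func (p *TimePartitionerBase) GetFormat() string
- func (p *TimePartitionerBase) Window(preceeding, following int, data []interface{}) []*WindowSlice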
- type TimeSeriesPartitionFunc
- type TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) Apply(fn ApplyFunc) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) Build() *StreamProjection
- func (b *TimeSeriesProjectionBuilder) Done(fn DoneFunc) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) Init(fn InitFunc) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) Persist(name string, obj interface{}) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) ToStream(name string) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *TimeSeriesProjectionBuilder
- func (b *TimeSeriesProjectionBuilder) Window(preceeding, following int, format string, fn TimeWindowApplyFunc) *TimeSeriesProjectionBuilder
- type TimeSeriesProjectionState
- type TimeSeriesWindowFunc
- type TimeWindowApplyFunc
- type WindowSlice
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type FilterFunc ¶
type FilterFunc func(msg interface{}) bool
type PartionedProjectionBuilder ¶
type PartionedProjectionBuilder struct {
*StreamProjectionBuilder
// contains filtered or unexported fields
}
func (*PartionedProjectionBuilder) Apply ¶
func (b *PartionedProjectionBuilder) Apply(fn ApplyFunc) *PartionedProjectionBuilder
func (*PartionedProjectionBuilder) Build ¶
func (b *PartionedProjectionBuilder) Build() *StreamProjection
func (*PartionedProjectionBuilder) Done ¶
func (b *PartionedProjectionBuilder) Done(fn DoneFunc) *PartionedProjectionBuilder
func (*PartionedProjectionBuilder) Init ¶
func (b *PartionedProjectionBuilder) Init(fn InitFunc) *PartionedProjectionBuilder
func (*PartionedProjectionBuilder) Persist ¶
func (b *PartionedProjectionBuilder) Persist(name string, obj interface{}) *PartionedProjectionBuilder
func (*PartionedProjectionBuilder) ToStream ¶
func (b *PartionedProjectionBuilder) ToStream(name string) *PartionedProjectionBuilder
func (*PartionedProjectionBuilder) ToStreams ¶
func (b *PartionedProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *PartionedProjectionBuilder
type PartitionFunc ¶
type PartitionFunc func(msg interface{}) (key string, value interface{})
type Projection ¶
type Projection interface {
Run(in streams.Readable) []ProjectionState
}
type ProjectionState ¶
type ProjectionState interface{}
type ReduceFunc ¶
type ReduceFunc func(accumulator interface{}, currentValue interface{}) interface{}
type SplitToStreamsFunc ¶
type SplitToStreamsFunc func(s []ProjectionState) map[string][]ProjectionState
type StreamProjection ¶
type StreamProjection struct {
FromStreams []string
ToStreams SplitToStreamsFunc
Projection Projection
PersistTo string
PersistObject interface{}
}
type StreamProjectionBuilder ¶
type StreamProjectionBuilder struct {
// contains filtered or unexported fields
}
func FromStream ¶
func FromStream(name string) *StreamProjectionBuilder
func FromStreams ¶
func FromStreams(names ...string) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Apply ¶
func (b *StreamProjectionBuilder) Apply(fn ApplyFunc) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Build ¶
func (b *StreamProjectionBuilder) Build() *StreamProjection
func (*StreamProjectionBuilder) Daily ¶
func (b *StreamProjectionBuilder) Daily(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) Done ¶
func (b *StreamProjectionBuilder) Done(fn DoneFunc) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Filter ¶
func (b *StreamProjectionBuilder) Filter(fn FilterFunc) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Hourly ¶
func (b *StreamProjectionBuilder) Hourly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) Init ¶
func (b *StreamProjectionBuilder) Init(fn InitFunc) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Monthly ¶
func (b *StreamProjectionBuilder) Monthly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) PartitionBy ¶
func (b *StreamProjectionBuilder) PartitionBy(fns ...PartitionFunc) *PartionedProjectionBuilder
func (*StreamProjectionBuilder) Persist ¶
func (b *StreamProjectionBuilder) Persist(name string, obj interface{}) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Quarterly ¶
func (b *StreamProjectionBuilder) Quarterly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) Reduce ¶
func (b *StreamProjectionBuilder) Reduce(fn ReduceFunc, initialValue interface{}) *StreamProjectionBuilder
func (*StreamProjectionBuilder) TimeSeries ¶
func (b *StreamProjectionBuilder) TimeSeries(tsPartitioner TimePartitioner, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) ToStream ¶
func (b *StreamProjectionBuilder) ToStream(name string) *StreamProjectionBuilder
func (*StreamProjectionBuilder) ToStreams ¶
func (b *StreamProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *StreamProjectionBuilder
func (*StreamProjectionBuilder) Weekly ¶
func (b *StreamProjectionBuilder) Weekly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
func (*StreamProjectionBuilder) Yearly ¶
func (b *StreamProjectionBuilder) Yearly(fn TimeSeriesPartitionFunc, fns ...PartitionFunc) *TimeSeriesProjectionBuilder
type StreamProjectionEngine ¶
type StreamProjectionEngine interface {
SetLogger(logger log.Logger)
Register(streamProjection *StreamProjection)
}
func New ¶
func New(bus streams.Bus, persister streams.StreamPersister) StreamProjectionEngine
type TimePartitioner ¶
type TimePartitioner interface {
Partition(msg interface{}) time.Time
GetFormat() string
FillMissingValues(data []*timeProjectionState) []*timeProjectionState
Window(preceeding, following int, data []interface{}) []*WindowSlice
}
type TimePartitionerBase ¶
type TimePartitionerBase struct {
ExtractTimeFn TimeSeriesPartitionFunc
StepFn func(time.Time) time.Time
Format string
}
func NewTimeSeriesPartitionerBase ¶
func NewTimeSeriesPartitionerBase(extractTimeFn TimeSeriesPartitionFunc, stepFn func(time.Time) time.Time, format string) *TimePartitionerBase
func (*TimePartitionerBase) FillMissingValues ¶
func (p *TimePartitionerBase) FillMissingValues(data []*timeProjectionState) []*timeProjectionState
func (*TimePartitionerBase) GetFormat ¶
func (p *TimePartitionerBase) GetFormat() string
func (*TimePartitionerBase) Window ¶
func (p *TimePartitionerBase) Window(preceeding, following int, data []interface{}) []*WindowSlice
type TimeSeriesPartitionFunc ¶
type TimeSeriesProjectionBuilder ¶
type TimeSeriesProjectionBuilder struct {
*PartionedProjectionBuilder
// contains filtered or unexported fields
}
func (*TimeSeriesProjectionBuilder) Apply ¶
func (b *TimeSeriesProjectionBuilder) Apply(fn ApplyFunc) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) Build ¶
func (b *TimeSeriesProjectionBuilder) Build() *StreamProjection
func (*TimeSeriesProjectionBuilder) Done ¶
func (b *TimeSeriesProjectionBuilder) Done(fn DoneFunc) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) Init ¶
func (b *TimeSeriesProjectionBuilder) Init(fn InitFunc) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) Persist ¶
func (b *TimeSeriesProjectionBuilder) Persist(name string, obj interface{}) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) ToStream ¶
func (b *TimeSeriesProjectionBuilder) ToStream(name string) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) ToStreams ¶
func (b *TimeSeriesProjectionBuilder) ToStreams(fn SplitToStreamsFunc) *TimeSeriesProjectionBuilder
func (*TimeSeriesProjectionBuilder) Window ¶
func (b *TimeSeriesProjectionBuilder) Window(preceeding, following int, format string, fn TimeWindowApplyFunc) *TimeSeriesProjectionBuilder
type TimeSeriesProjectionState ¶
type TimeSeriesProjectionState struct {
ProjectionState
}
type TimeSeriesWindowFunc ¶
type TimeWindowApplyFunc ¶
type TimeWindowApplyFunc interface{}
type WindowSlice ¶
type WindowSlice struct {
// contains filtered or unexported fields
}
Click to show internal directories.
Click to hide internal directories.