Documentation ¶
Index ¶
- Variables
- func AreStreamsEqual(ctx context.Context, first, second RecordStream) error
- func AreStreamsEqualNoOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, ...) error
- func AreStreamsEqualNoOrderingWithCount(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, ...) error
- func AreStreamsEqualNoOrderingWithIDCheck(ctx context.Context, stateStorage storage.Storage, ...) error
- func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking(ctx context.Context, stateStorage storage.Storage, got, want RecordStream, ...) error
- func AreStreamsEqualWithOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream) error
- func DefaultEquality(record1 *Record, record2 *Record) error
- func EqualityOfAll(fs ...RecordEqualityFunc) func(record1 *Record, record2 *Record) error
- func EqualityOfEventTimeField(record1 *Record, record2 *Record) error
- func EqualityOfEverythingButIDs(record1 *Record, record2 *Record) error
- func EqualityOfFieldsAndValues(record1 *Record, record2 *Record) error
- func EqualityOfID(record1 *Record, record2 *Record) error
- func EqualityOfUndo(record1 *Record, record2 *Record) error
- func GetAndStartAllShuffles(ctx context.Context, stateStorage storage.Storage, rootStreamID *StreamID, ...) ([]RecordStream, []*ExecutionOutput, error)
- func GetRawStringID() string
- func GetSourceStringID(tx storage.StateTransaction, inputName octosql.Value) (string, error)
- func NewErrWaitForChanges(subscription *storage.Subscription) error
- func ParseType(str string) octosql.Value
- func SystemField(field string) octosql.VariableName
- type Aggregate
- type AggregatePrototype
- type AliasedExpression
- type And
- type AreEqualConfig
- type AreEqualOpt
- type BatchSizeManager
- func (bsm *BatchSizeManager) CommitAborted()
- func (bsm *BatchSizeManager) CommitSuccessful()
- func (bsm *BatchSizeManager) CommitTooBig()
- func (bsm *BatchSizeManager) MarkRecordsProcessed(count int)
- func (bsm *BatchSizeManager) RecordsLeftToTake() int
- func (bsm *BatchSizeManager) Reset()
- func (bsm *BatchSizeManager) ShouldTakeNextRecord() bool
- type Constant
- type ConstantStrategy
- type ConstantStrategyPrototype
- type ConstantValue
- type CountingTrigger
- type Datatype
- type DelayTrigger
- type Distinct
- type DistinctStream
- type DummyNode
- type Equal
- type ErrWaitForChanges
- type ExecutionOutput
- type Expression
- type Field
- type Filter
- type FilteredStream
- type Formula
- type Function
- type FunctionExpression
- type GetTestStreamOption
- type GreaterEqual
- type GroupBy
- type GroupByStream
- type HashMap
- type In
- type InMemoryStream
- type IntermediateRecordStore
- type Iterator
- type JobOutputQueueIntermediateRecordStore
- func (j *JobOutputQueueIntermediateRecordStore) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, ...) error
- func (j *JobOutputQueueIntermediateRecordStore) Close(ctx context.Context, storage storage.Storage) error
- func (j *JobOutputQueueIntermediateRecordStore) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
- func (j *JobOutputQueueIntermediateRecordStore) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
- func (j *JobOutputQueueIntermediateRecordStore) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
- func (j *JobOutputQueueIntermediateRecordStore) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
- func (j *JobOutputQueueIntermediateRecordStore) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
- func (j *JobOutputQueueIntermediateRecordStore) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
- func (j *JobOutputQueueIntermediateRecordStore) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
- type JoinType
- type JoinedStream
- type KeyHashingStrategy
- type KeyHashingStrategyPrototype
- type LessEqual
- type LessThan
- type Like
- type LogicExpression
- type LookupJoin
- type LookupJoinStream
- func (rs *LookupJoinStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, ...) error
- func (rs *LookupJoinStream) Close(ctx context.Context, storage storage.Storage) error
- func (rs *LookupJoinStream) GetNextRecord(ctx context.Context, tx storage.StateTransaction) (*Record, error)
- func (rs *LookupJoinStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
- func (rs *LookupJoinStream) HandleControlMessages(ctx context.Context, tx storage.StateTransaction) error
- func (rs *LookupJoinStream) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
- func (rs *LookupJoinStream) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
- func (rs *LookupJoinStream) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
- func (rs *LookupJoinStream) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
- func (rs *LookupJoinStream) RunScheduler(ctx context.Context)
- func (rs *LookupJoinStream) RunWorker(ctx context.Context, id *RecordID) error
- func (j *LookupJoinStream) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
- func (rs *LookupJoinStream) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
- type Map
- type MappedStream
- type Metadata
- func (*Metadata) Descriptor() ([]byte, []int)
- func (m *Metadata) GetEventTimeField() string
- func (m *Metadata) GetId() *RecordID
- func (m *Metadata) GetUndo() bool
- func (*Metadata) ProtoMessage()
- func (m *Metadata) Reset()
- func (m *Metadata) String() string
- func (m *Metadata) XXX_DiscardUnknown()
- func (m *Metadata) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Metadata) XXX_Merge(src proto.Message)
- func (m *Metadata) XXX_Size() int
- func (m *Metadata) XXX_Unmarshal(b []byte) error
- type MoreThan
- type MultiTrigger
- type NamedExpression
- type NextShuffleMetadataChange
- type Node
- type NodeExpression
- type Not
- type NotEqual
- type NotIn
- type Or
- type OrderBy
- type OrderByKey
- type OrderByStream
- type OrderDirection
- type OutputOptions
- type OutputQueue
- type PipelineMetadata
- type Predicate
- type ProcessByKey
- func (p *ProcessByKey) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, ...) error
- func (p *ProcessByKey) Close(ctx context.Context, storage storage.Storage) error
- func (p *ProcessByKey) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
- func (p *ProcessByKey) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
- func (p *ProcessByKey) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
- func (p *ProcessByKey) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
- func (p *ProcessByKey) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
- func (p *ProcessByKey) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
- func (p *ProcessByKey) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
- type ProcessFunction
- type PullEngine
- type QueueElement
- func (*QueueElement) Descriptor() ([]byte, []int)
- func (m *QueueElement) GetEndOfStream() bool
- func (m *QueueElement) GetError() string
- func (m *QueueElement) GetRecord() *Record
- func (m *QueueElement) GetType() isQueueElement_Type
- func (m *QueueElement) GetWatermark() *timestamp.Timestamp
- func (*QueueElement) ProtoMessage()
- func (m *QueueElement) Reset()
- func (m *QueueElement) String() string
- func (m *QueueElement) XXX_DiscardUnknown()
- func (m *QueueElement) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *QueueElement) XXX_Merge(src proto.Message)
- func (*QueueElement) XXX_OneofWrappers() []interface{}
- func (m *QueueElement) XXX_Size() int
- func (m *QueueElement) XXX_Unmarshal(b []byte) error
- type QueueElement_EndOfStream
- type QueueElement_Error
- type QueueElement_Record
- type QueueElement_Watermark
- type Record
- func NewRecord(fields []octosql.VariableName, data map[octosql.VariableName]octosql.Value, ...) *Record
- func NewRecordFromRecord(record *Record, opts ...RecordOption) *Record
- func NewRecordFromSlice(fields []octosql.VariableName, data []octosql.Value, opts ...RecordOption) *Record
- func NewRecordFromSliceWithNormalize(fields []octosql.VariableName, data []interface{}, opts ...RecordOption) *Record
- func Normalize(rec *Record) *Record
- func ReadAll(ctx context.Context, stateStorage storage.Storage, stream RecordStream) ([]*Record, error)
- func ReadAllWithCount(ctx context.Context, stateStorage storage.Storage, stream RecordStream, ...) ([]*Record, error)
- func (r *Record) AsTuple() octosql.Value
- func (r *Record) AsVariables() octosql.Variables
- func (*Record) Descriptor() ([]byte, []int)
- func (r *Record) Equal(other *Record) bool
- func (r *Record) EventTime() octosql.Value
- func (r *Record) EventTimeField() octosql.VariableName
- func (r *Record) Fields() []Field
- func (m *Record) GetData() []*octosql.Value
- func (m *Record) GetFieldNames() []string
- func (m *Record) GetMetadata() *Metadata
- func (r *Record) GetVariableNames() []octosql.VariableName
- func (r *Record) Hash() (uint64, error)
- func (r *Record) ID() *RecordID
- func (r *Record) IsUndo() bool
- func (*Record) ProtoMessage()
- func (m *Record) Reset()
- func (r *Record) Show() string
- func (r *Record) ShowFields() []Field
- func (m *Record) String() string
- func (r *Record) Value(field octosql.VariableName) octosql.Value
- func (m *Record) XXX_DiscardUnknown()
- func (m *Record) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *Record) XXX_Merge(src proto.Message)
- func (m *Record) XXX_Size() int
- func (m *Record) XXX_Unmarshal(b []byte) error
- type RecordEqualityFunc
- type RecordExpression
- type RecordID
- func (id *RecordID) AsPrefix() []byte
- func (*RecordID) Descriptor() ([]byte, []int)
- func (m *RecordID) GetID() string
- func (id *RecordID) MonotonicMarshal() []byte
- func (id *RecordID) MonotonicUnmarshal(data []byte) error
- func (*RecordID) ProtoMessage()
- func (m *RecordID) Reset()
- func (id RecordID) Show() string
- func (m *RecordID) String() string
- func (m *RecordID) XXX_DiscardUnknown()
- func (m *RecordID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *RecordID) XXX_Merge(src proto.Message)
- func (m *RecordID) XXX_Size() int
- func (m *RecordID) XXX_Unmarshal(b []byte) error
- type RecordOption
- type RecordStream
- type Regexp
- type Relation
- type RequalifiedStream
- type Requalifier
- type Shuffle
- type ShuffleData
- type ShuffleID
- func (id *ShuffleID) AsMapKey() string
- func (id *ShuffleID) AsPrefix() []byte
- func (*ShuffleID) Descriptor() ([]byte, []int)
- func (m *ShuffleID) GetId() string
- func (*ShuffleID) ProtoMessage()
- func (m *ShuffleID) Reset()
- func (m *ShuffleID) String() string
- func (m *ShuffleID) XXX_DiscardUnknown()
- func (m *ShuffleID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *ShuffleID) XXX_Merge(src proto.Message)
- func (m *ShuffleID) XXX_Size() int
- func (m *ShuffleID) XXX_Unmarshal(b []byte) error
- type ShuffleReceiver
- type ShuffleSender
- func (node *ShuffleSender) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, ...) error
- func (node *ShuffleSender) Close(ctx context.Context, storage storage.Storage) error
- func (node *ShuffleSender) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
- func (node *ShuffleSender) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
- func (node *ShuffleSender) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
- func (node *ShuffleSender) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
- func (node *ShuffleSender) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
- func (node *ShuffleSender) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
- func (node *ShuffleSender) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
- type ShuffleStrategy
- type ShuffleStrategyPrototype
- type StarExpression
- type StreamID
- func (id *StreamID) AsPrefix() []byte
- func (*StreamID) Descriptor() ([]byte, []int)
- func (m *StreamID) GetId() string
- func (*StreamID) ProtoMessage()
- func (m *StreamID) Reset()
- func (m *StreamID) String() string
- func (m *StreamID) XXX_DiscardUnknown()
- func (m *StreamID) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
- func (m *StreamID) XXX_Merge(src proto.Message)
- func (m *StreamID) XXX_Size() int
- func (m *StreamID) XXX_Unmarshal(b []byte) error
- type StreamJoin
- type Task
- type Trigger
- type TriggerPrototype
- type TupleExpression
- type UnionWatermarkGenerator
- type Validator
- type Variable
- type WatermarkSource
- type WatermarkTrigger
- type ZeroWatermarkGenerator
Constants ¶
This section is empty.
Variables ¶
var ErrEndOfStream = errors.New("end of stream")
var ErrNewTransactionRequired = fmt.Errorf("new transaction required")
var MaxWatermark = time.Date(9999, 1, 1, 0, 0, 0, 0, time.UTC)
Based on protocol buffer max timestamp value.
var SystemSource string = "sys"
Functions ¶
func AreStreamsEqual ¶
func AreStreamsEqual(ctx context.Context, first, second RecordStream) error
func AreStreamsEqualNoOrdering ¶
func AreStreamsEqualNoOrdering(ctx context.Context, stateStorage storage.Storage, first, second RecordStream, opts ...AreEqualOpt) error
func AreStreamsEqualNoOrderingWithCount ¶ added in v0.3.0
func AreStreamsEqualNoOrderingWithIDCheck ¶ added in v0.3.0
func AreStreamsEqualNoOrderingWithIDCheck(ctx context.Context, stateStorage storage.Storage, gotStream, wantStream RecordStream, opts ...AreEqualOpt) error
func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking ¶ added in v0.3.0
func AreStreamsEqualNoOrderingWithRetractionReductionAndIDChecking(ctx context.Context, stateStorage storage.Storage, got, want RecordStream, opts ...AreEqualOpt) error
func AreStreamsEqualWithOrdering ¶ added in v0.3.0
func DefaultEquality ¶ added in v0.3.0
func EqualityOfAll ¶ added in v0.3.0
func EqualityOfAll(fs ...RecordEqualityFunc) func(record1 *Record, record2 *Record) error
func EqualityOfEventTimeField ¶ added in v0.3.0
func EqualityOfEverythingButIDs ¶ added in v0.3.0
func EqualityOfFieldsAndValues ¶ added in v0.3.0
func EqualityOfID ¶ added in v0.3.0
func EqualityOfUndo ¶ added in v0.3.0
func GetAndStartAllShuffles ¶ added in v0.3.0
func GetAndStartAllShuffles(ctx context.Context, stateStorage storage.Storage, rootStreamID *StreamID, nodes []Node, variables octosql.Variables) ([]RecordStream, []*ExecutionOutput, error)
This is used to start the whole plan. It starts each phase (separated by shuffles) one by one and takes care to properly pass shuffle ID's to shuffle receivers and senders.
func GetRawStringID ¶ added in v0.3.0
func GetRawStringID() string
func GetSourceStringID ¶ added in v0.3.0
GetSourceStreamID loads the StreamID of the given input stream in case it exists (from a previous run maybe?) Otherwise it allocates a new StreamID and saves it.
func NewErrWaitForChanges ¶ added in v0.3.0
func NewErrWaitForChanges(subscription *storage.Subscription) error
func ParseType ¶
ParseType tries to parse the given string into any type it succeeds to. Returns back the string on failure.
func SystemField ¶ added in v0.3.0
func SystemField(field string) octosql.VariableName
Types ¶
type Aggregate ¶
type Aggregate interface { docs.Documented AddValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error RetractValue(ctx context.Context, tx storage.StateTransaction, value octosql.Value) error GetValue(ctx context.Context, tx storage.StateTransaction) (octosql.Value, error) String() string }
type AggregatePrototype ¶
type AggregatePrototype func() Aggregate
type AliasedExpression ¶
type AliasedExpression struct {
// contains filtered or unexported fields
}
func NewAliasedExpression ¶
func NewAliasedExpression(name octosql.VariableName, expr Expression) *AliasedExpression
func (*AliasedExpression) ExpressionValue ¶
func (*AliasedExpression) Name ¶
func (alExpr *AliasedExpression) Name() octosql.VariableName
type AreEqualConfig ¶ added in v0.3.0
type AreEqualConfig struct {
Equality RecordEqualityFunc
}
type AreEqualOpt ¶ added in v0.3.0
type AreEqualOpt func(*AreEqualConfig)
func WithEqualityBasedOn ¶ added in v0.3.0
func WithEqualityBasedOn(fs ...RecordEqualityFunc) AreEqualOpt
type BatchSizeManager ¶ added in v0.3.0
type BatchSizeManager struct {
// contains filtered or unexported fields
}
The batch size manager decides if a batch should take more records. It tries to satisfy the target latency and will try not to ever surpass it. It will also grow the batch size on successful commit by at least 1. In case the commit is too big to finalize, it will drastically reduce the batch size.
func NewBatchSizeManager ¶ added in v0.3.0
func NewBatchSizeManager(latencyTarget time.Duration) *BatchSizeManager
func (*BatchSizeManager) CommitAborted ¶ added in v0.3.0
func (bsm *BatchSizeManager) CommitAborted()
func (*BatchSizeManager) CommitSuccessful ¶ added in v0.3.0
func (bsm *BatchSizeManager) CommitSuccessful()
func (*BatchSizeManager) CommitTooBig ¶ added in v0.3.0
func (bsm *BatchSizeManager) CommitTooBig()
func (*BatchSizeManager) MarkRecordsProcessed ¶ added in v0.3.0
func (bsm *BatchSizeManager) MarkRecordsProcessed(count int)
You can use this to process records in a way other than one by one.
func (*BatchSizeManager) RecordsLeftToTake ¶ added in v0.3.0
func (bsm *BatchSizeManager) RecordsLeftToTake() int
You can use this to process records in a way other than one by one.
func (*BatchSizeManager) Reset ¶ added in v0.3.0
func (bsm *BatchSizeManager) Reset()
func (*BatchSizeManager) ShouldTakeNextRecord ¶ added in v0.3.0
func (bsm *BatchSizeManager) ShouldTakeNextRecord() bool
type ConstantStrategy ¶ added in v0.3.0
type ConstantStrategy struct {
// contains filtered or unexported fields
}
func (*ConstantStrategy) CalculatePartition ¶ added in v0.3.0
type ConstantStrategyPrototype ¶ added in v0.3.0
type ConstantStrategyPrototype struct {
// contains filtered or unexported fields
}
func (*ConstantStrategyPrototype) Get ¶ added in v0.3.0
func (s *ConstantStrategyPrototype) Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)
type ConstantValue ¶ added in v0.3.0
type ConstantValue struct {
// contains filtered or unexported fields
}
func NewConstantValue ¶ added in v0.3.0
func NewConstantValue(value octosql.Value) *ConstantValue
func (*ConstantValue) ExpressionValue ¶ added in v0.3.0
type CountingTrigger ¶ added in v0.3.0
type CountingTrigger struct {
// contains filtered or unexported fields
}
func NewCountingTrigger ¶ added in v0.3.0
func NewCountingTrigger(count Expression) *CountingTrigger
type DelayTrigger ¶ added in v0.3.0
type DelayTrigger struct {
// contains filtered or unexported fields
}
func NewDelayTrigger ¶ added in v0.3.0
func NewDelayTrigger(delay Expression) *DelayTrigger
type Distinct ¶
type Distinct struct {
// contains filtered or unexported fields
}
func NewDistinct ¶
type DistinctStream ¶
type DistinctStream struct {
// contains filtered or unexported fields
}
type DummyNode ¶
type DummyNode struct {
// contains filtered or unexported fields
}
func NewDummyNode ¶
type ErrWaitForChanges ¶ added in v0.3.0
type ErrWaitForChanges struct {
*storage.Subscription
}
func GetErrWaitForChanges ¶ added in v0.3.0
func GetErrWaitForChanges(err error) *ErrWaitForChanges
func (*ErrWaitForChanges) Error ¶ added in v0.3.0
func (e *ErrWaitForChanges) Error() string
type ExecutionOutput ¶ added in v0.3.0
type ExecutionOutput struct { // Watermark source is the highest (in the execution tree) // watermark source available, which the record consumer should consume. WatermarkSource WatermarkSource // Next shuffles contains information about the next shuffles down the execution plan // which need to be started. NextShuffles map[string]ShuffleData // Tasks to run are functions which need to be run asynchronously, // after the storage initialization has been committed (and will thus be available for reading). TasksToRun []Task }
This struct represents additional metadata to be returned with Get() and used recursively (like WatermarkSource)
func NewExecutionOutput ¶ added in v0.3.0
func NewExecutionOutput(ws WatermarkSource, nextShuffles map[string]ShuffleData, tasksToRun []Task) *ExecutionOutput
type Expression ¶
type Expression interface {
ExpressionValue(ctx context.Context, variables octosql.Variables) (octosql.Value, error)
}
func NewRecordExpression ¶ added in v0.3.0
func NewRecordExpression() Expression
type Field ¶
type Field struct {
Name octosql.VariableName
}
type FilteredStream ¶
type FilteredStream struct {
// contains filtered or unexported fields
}
type Function ¶
type Function struct { Name string ArgumentNames [][]string Description docs.Documentation Validator Validator Logic func(...octosql.Value) (octosql.Value, error) }
func (*Function) Document ¶
func (f *Function) Document() docs.Documentation
type FunctionExpression ¶
type FunctionExpression struct {
// contains filtered or unexported fields
}
func NewFunctionExpression ¶
func NewFunctionExpression(fun *Function, args []Expression) *FunctionExpression
func (*FunctionExpression) ExpressionValue ¶
type GetTestStreamOption ¶ added in v0.3.0
type GetTestStreamOption func(*StreamID)
func GetTestStreamWithStreamID ¶ added in v0.3.0
func GetTestStreamWithStreamID(id *StreamID) GetTestStreamOption
type GreaterEqual ¶
type GreaterEqual struct { }
func (*GreaterEqual) Apply ¶
func (rel *GreaterEqual) Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)
type GroupBy ¶
type GroupBy struct {
// contains filtered or unexported fields
}
func NewGroupBy ¶
func NewGroupBy(storage storage.Storage, source Node, key []Expression, fields []octosql.VariableName, aggregatePrototypes []AggregatePrototype, eventTimeField octosql.VariableName, as []octosql.VariableName, outEventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *GroupBy
type GroupByStream ¶
type GroupByStream struct {
// contains filtered or unexported fields
}
type HashMap ¶
type HashMap struct {
// contains filtered or unexported fields
}
func NewHashMap ¶
func NewHashMap() *HashMap
func (*HashMap) GetIterator ¶
type InMemoryStream ¶
type InMemoryStream struct {
// contains filtered or unexported fields
}
func NewInMemoryStream ¶
func NewInMemoryStream(ctx context.Context, data []*Record) *InMemoryStream
type IntermediateRecordStore ¶ added in v0.3.0
type IntermediateRecordStore interface { // ReadyForMore is used to check if the intermediate record store is able to consume more data. // This allows it to communicate back-pressure. ReadyForMore(ctx context.Context, tx storage.StateTransaction) error AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error Next(ctx context.Context, tx storage.StateTransaction) (*Record, error) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error MarkError(ctx context.Context, tx storage.StateTransaction, err error) error Close(ctx context.Context, storage storage.Storage) error }
type JobOutputQueueIntermediateRecordStore ¶ added in v0.3.0
type JobOutputQueueIntermediateRecordStore struct {
// contains filtered or unexported fields
}
func (*JobOutputQueueIntermediateRecordStore) AddRecord ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
func (*JobOutputQueueIntermediateRecordStore) GetWatermark ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
func (*JobOutputQueueIntermediateRecordStore) MarkEndOfStream ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
func (*JobOutputQueueIntermediateRecordStore) MarkError ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
func (*JobOutputQueueIntermediateRecordStore) Next ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
func (*JobOutputQueueIntermediateRecordStore) ReadyForMore ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
func (*JobOutputQueueIntermediateRecordStore) TriggerKeys ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
func (*JobOutputQueueIntermediateRecordStore) UpdateWatermark ¶ added in v0.3.0
func (j *JobOutputQueueIntermediateRecordStore) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
type JoinedStream ¶ added in v0.3.0
type JoinedStream struct {
// contains filtered or unexported fields
}
type KeyHashingStrategy ¶ added in v0.3.0
type KeyHashingStrategy struct {
// contains filtered or unexported fields
}
func (*KeyHashingStrategy) CalculatePartition ¶ added in v0.3.0
func (s *KeyHashingStrategy) CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error)
TODO: The key should really be calculated by the preceding map. Like all group by values.
type KeyHashingStrategyPrototype ¶ added in v0.3.0
type KeyHashingStrategyPrototype struct {
// contains filtered or unexported fields
}
func (*KeyHashingStrategyPrototype) Get ¶ added in v0.3.0
func (s *KeyHashingStrategyPrototype) Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)
type LogicExpression ¶
type LogicExpression struct {
// contains filtered or unexported fields
}
func NewLogicExpression ¶
func NewLogicExpression(formula Formula) *LogicExpression
func (*LogicExpression) ExpressionValue ¶
type LookupJoin ¶ added in v0.3.0
type LookupJoin struct {
// contains filtered or unexported fields
}
func NewLookupJoin ¶ added in v0.3.0
func (*LookupJoin) Get ¶ added in v0.3.0
func (node *LookupJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
type LookupJoinStream ¶ added in v0.3.0
type LookupJoinStream struct {
// contains filtered or unexported fields
}
func (*LookupJoinStream) AddRecord ¶ added in v0.3.0
func (rs *LookupJoinStream) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
func (*LookupJoinStream) GetNextRecord ¶ added in v0.3.0
func (rs *LookupJoinStream) GetNextRecord(ctx context.Context, tx storage.StateTransaction) (*Record, error)
func (*LookupJoinStream) GetWatermark ¶ added in v0.3.0
func (rs *LookupJoinStream) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
func (*LookupJoinStream) HandleControlMessages ¶ added in v0.3.0
func (rs *LookupJoinStream) HandleControlMessages(ctx context.Context, tx storage.StateTransaction) error
func (*LookupJoinStream) MarkEndOfStream ¶ added in v0.3.0
func (rs *LookupJoinStream) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
func (*LookupJoinStream) MarkError ¶ added in v0.3.0
func (rs *LookupJoinStream) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
func (*LookupJoinStream) Next ¶ added in v0.3.0
func (rs *LookupJoinStream) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
func (*LookupJoinStream) ReadyForMore ¶ added in v0.3.0
func (rs *LookupJoinStream) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
func (*LookupJoinStream) RunScheduler ¶ added in v0.3.0
func (rs *LookupJoinStream) RunScheduler(ctx context.Context)
The scheduler takes records from the toBeJoined queue, and starts jobs to do joins. Control messages (records too, to satisfy the initial ordering of messages) are put on a controlMessages queue, where they will be handled by the receiver.
func (*LookupJoinStream) RunWorker ¶ added in v0.3.0
func (rs *LookupJoinStream) RunWorker(ctx context.Context, id *RecordID) error
The worker drives streams to completion, puts received records to output queues scoped by record id. In the end, it puts an EndOfStream message on the queue.
func (*LookupJoinStream) TriggerKeys ¶ added in v0.3.0
func (j *LookupJoinStream) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
func (*LookupJoinStream) UpdateWatermark ¶ added in v0.3.0
func (rs *LookupJoinStream) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
type MappedStream ¶
type MappedStream struct {
// contains filtered or unexported fields
}
type Metadata ¶ added in v0.3.0
type Metadata struct { Id *RecordID `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` Undo bool `protobuf:"varint,2,opt,name=undo,proto3" json:"undo,omitempty"` EventTimeField string `protobuf:"bytes,3,opt,name=eventTimeField,proto3" json:"eventTimeField,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*Metadata) Descriptor ¶ added in v0.3.0
func (*Metadata) GetEventTimeField ¶ added in v0.3.0
func (*Metadata) ProtoMessage ¶ added in v0.3.0
func (*Metadata) ProtoMessage()
func (*Metadata) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *Metadata) XXX_DiscardUnknown()
func (*Metadata) XXX_Marshal ¶ added in v0.3.0
func (*Metadata) XXX_Unmarshal ¶ added in v0.3.0
type MultiTrigger ¶ added in v0.3.0
type MultiTrigger struct {
// contains filtered or unexported fields
}
func NewMultiTrigger ¶ added in v0.3.0
func NewMultiTrigger(triggers ...TriggerPrototype) *MultiTrigger
type NamedExpression ¶
type NamedExpression interface { Expression Name() octosql.VariableName }
type NextShuffleMetadataChange ¶ added in v0.3.0
func NewNextShuffleMetadataChange ¶ added in v0.3.0
func NewNextShuffleMetadataChange(shuffleIDAddSuffix string, partition int, source Node) *NextShuffleMetadataChange
func (*NextShuffleMetadataChange) Get ¶ added in v0.3.0
func (n *NextShuffleMetadataChange) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
type Node ¶
type Node interface {
Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
}
type NodeExpression ¶
type NodeExpression struct {
// contains filtered or unexported fields
}
func NewNodeExpression ¶
func NewNodeExpression(node Node, stateStorage storage.Storage) *NodeExpression
func (*NodeExpression) ExpressionValue ¶
type OrderBy ¶
type OrderBy struct {
// contains filtered or unexported fields
}
func NewOrderBy ¶
func NewOrderBy(storage storage.Storage, source Node, exprs []Expression, directions []OrderDirection, eventTimeField octosql.VariableName, triggerPrototype TriggerPrototype) *OrderBy
type OrderByKey ¶ added in v0.3.0
type OrderByKey struct {
// contains filtered or unexported fields
}
func NewOrderByKey ¶ added in v0.3.0
func NewOrderByKey(key []byte) *OrderByKey
func (*OrderByKey) MonotonicMarshal ¶ added in v0.3.0
func (k *OrderByKey) MonotonicMarshal() []byte
func (*OrderByKey) MonotonicUnmarshal ¶ added in v0.3.0
func (k *OrderByKey) MonotonicUnmarshal(data []byte) error
type OrderByStream ¶ added in v0.3.0
type OrderByStream struct {
// contains filtered or unexported fields
}
type OrderDirection ¶
type OrderDirection string
const ( Ascending OrderDirection = "asc" Descending OrderDirection = "desc" )
type OutputOptions ¶ added in v0.3.0
type OutputOptions struct { OrderByExpressions []Expression OrderByDirections []OrderDirection Limit Expression Offset Expression }
func NewOutputOptions ¶ added in v0.3.0
func NewOutputOptions( orderByExpressions []Expression, orderByDirections []OrderDirection, limit Expression, offset Expression, ) *OutputOptions
type OutputQueue ¶ added in v0.3.0
type OutputQueue struct {
// contains filtered or unexported fields
}
func NewOutputQueue ¶ added in v0.3.0
func NewOutputQueue(tx storage.StateTransaction) *OutputQueue
type PipelineMetadata ¶ added in v0.3.0
type Predicate ¶
type Predicate struct { Left Expression Relation Relation Right Expression }
func NewPredicate ¶
func NewPredicate(left Expression, relation Relation, right Expression) *Predicate
type ProcessByKey ¶ added in v0.3.0
type ProcessByKey struct {
// contains filtered or unexported fields
}
func (*ProcessByKey) AddRecord ¶ added in v0.3.0
func (p *ProcessByKey) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
func (*ProcessByKey) GetWatermark ¶ added in v0.3.0
func (p *ProcessByKey) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
func (*ProcessByKey) MarkEndOfStream ¶ added in v0.3.0
func (p *ProcessByKey) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
func (*ProcessByKey) MarkError ¶ added in v0.3.0
func (p *ProcessByKey) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
func (*ProcessByKey) Next ¶ added in v0.3.0
func (p *ProcessByKey) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
func (*ProcessByKey) ReadyForMore ¶ added in v0.3.0
func (p *ProcessByKey) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
func (*ProcessByKey) TriggerKeys ¶ added in v0.3.0
func (p *ProcessByKey) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
func (*ProcessByKey) UpdateWatermark ¶ added in v0.3.0
func (p *ProcessByKey) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
type ProcessFunction ¶ added in v0.3.0
type PullEngine ¶ added in v0.3.0
type PullEngine struct {
// contains filtered or unexported fields
}
func NewPullEngine ¶ added in v0.3.0
func NewPullEngine(irs IntermediateRecordStore, storage storage.Storage, sources []RecordStream, streamID *StreamID, watermarkSource WatermarkSource, shouldPrefixStreamID bool, ctx context.Context) *PullEngine
func (*PullEngine) GetWatermark ¶ added in v0.3.0
func (engine *PullEngine) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
func (*PullEngine) Next ¶ added in v0.3.0
func (engine *PullEngine) Next(ctx context.Context) (*Record, error)
func (*PullEngine) Run ¶ added in v0.3.0
func (engine *PullEngine) Run()
type QueueElement ¶ added in v0.3.0
type QueueElement struct { // Types that are valid to be assigned to Type: // *QueueElement_Record // *QueueElement_Watermark // *QueueElement_EndOfStream // *QueueElement_Error Type isQueueElement_Type `protobuf_oneof:"type"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func (*QueueElement) Descriptor ¶ added in v0.3.0
func (*QueueElement) Descriptor() ([]byte, []int)
func (*QueueElement) GetEndOfStream ¶ added in v0.3.0
func (m *QueueElement) GetEndOfStream() bool
func (*QueueElement) GetError ¶ added in v0.3.0
func (m *QueueElement) GetError() string
func (*QueueElement) GetRecord ¶ added in v0.3.0
func (m *QueueElement) GetRecord() *Record
func (*QueueElement) GetType ¶ added in v0.3.0
func (m *QueueElement) GetType() isQueueElement_Type
func (*QueueElement) GetWatermark ¶ added in v0.3.0
func (m *QueueElement) GetWatermark() *timestamp.Timestamp
func (*QueueElement) ProtoMessage ¶ added in v0.3.0
func (*QueueElement) ProtoMessage()
func (*QueueElement) Reset ¶ added in v0.3.0
func (m *QueueElement) Reset()
func (*QueueElement) String ¶ added in v0.3.0
func (m *QueueElement) String() string
func (*QueueElement) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *QueueElement) XXX_DiscardUnknown()
func (*QueueElement) XXX_Marshal ¶ added in v0.3.0
func (m *QueueElement) XXX_Marshal(b []byte, deterministic bool) ([]byte, error)
func (*QueueElement) XXX_Merge ¶ added in v0.3.0
func (m *QueueElement) XXX_Merge(src proto.Message)
func (*QueueElement) XXX_OneofWrappers ¶ added in v0.3.0
func (*QueueElement) XXX_OneofWrappers() []interface{}
XXX_OneofWrappers is for the internal use of the proto package.
func (*QueueElement) XXX_Size ¶ added in v0.3.0
func (m *QueueElement) XXX_Size() int
func (*QueueElement) XXX_Unmarshal ¶ added in v0.3.0
func (m *QueueElement) XXX_Unmarshal(b []byte) error
type QueueElement_EndOfStream ¶ added in v0.3.0
type QueueElement_EndOfStream struct {
EndOfStream bool `protobuf:"varint,3,opt,name=endOfStream,proto3,oneof"`
}
type QueueElement_Error ¶ added in v0.3.0
type QueueElement_Error struct {
Error string `protobuf:"bytes,4,opt,name=error,proto3,oneof"`
}
type QueueElement_Record ¶ added in v0.3.0
type QueueElement_Record struct {
Record *Record `protobuf:"bytes,1,opt,name=record,proto3,oneof"`
}
type QueueElement_Watermark ¶ added in v0.3.0
type Record ¶
type Record struct { Metadata *Metadata `protobuf:"bytes,1,opt,name=metadata,proto3" json:"metadata,omitempty"` FieldNames []string `protobuf:"bytes,2,rep,name=fieldNames,proto3" json:"fieldNames,omitempty"` Data []*octosql.Value `protobuf:"bytes,3,rep,name=data,proto3" json:"data,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func NewRecord ¶
func NewRecord(fields []octosql.VariableName, data map[octosql.VariableName]octosql.Value, opts ...RecordOption) *Record
func NewRecordFromRecord ¶ added in v0.3.0
func NewRecordFromRecord(record *Record, opts ...RecordOption) *Record
func NewRecordFromSlice ¶
func NewRecordFromSlice(fields []octosql.VariableName, data []octosql.Value, opts ...RecordOption) *Record
func NewRecordFromSliceWithNormalize ¶
func NewRecordFromSliceWithNormalize(fields []octosql.VariableName, data []interface{}, opts ...RecordOption) *Record
func ReadAllWithCount ¶ added in v0.3.0
func (*Record) AsVariables ¶
func (*Record) Descriptor ¶ added in v0.3.0
func (*Record) EventTimeField ¶ added in v0.3.0
func (r *Record) EventTimeField() octosql.VariableName
func (*Record) GetFieldNames ¶ added in v0.3.0
func (*Record) GetMetadata ¶ added in v0.3.0
func (*Record) GetVariableNames ¶ added in v0.3.0
func (r *Record) GetVariableNames() []octosql.VariableName
func (*Record) ProtoMessage ¶ added in v0.3.0
func (*Record) ProtoMessage()
func (*Record) ShowFields ¶ added in v0.3.0
func (*Record) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *Record) XXX_DiscardUnknown()
func (*Record) XXX_Marshal ¶ added in v0.3.0
func (*Record) XXX_Unmarshal ¶ added in v0.3.0
type RecordEqualityFunc ¶ added in v0.3.0
type RecordExpression ¶ added in v0.3.0
type RecordExpression struct{}
func (*RecordExpression) ExpressionValue ¶ added in v0.3.0
type RecordID ¶ added in v0.3.0
type RecordID struct { ID string `protobuf:"bytes,1,opt,name=ID,proto3" json:"ID,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func NewRecordID ¶ added in v0.3.0
GetRandomRecordID can be used to get a new random RecordID.
func NewRecordIDFromStreamIDWithOffset ¶ added in v0.3.0
NewRecordIDFromStreamIDWithOffset can be used to get a new RecordID deterministically based on the streamID and record offset.
func (*RecordID) AsPrefix ¶ added in v0.3.0
This is a helper function to use a record ID as a storage prefix.
func (*RecordID) Descriptor ¶ added in v0.3.0
func (*RecordID) MonotonicMarshal ¶ added in v0.3.0
func (*RecordID) MonotonicUnmarshal ¶ added in v0.3.0
func (*RecordID) ProtoMessage ¶ added in v0.3.0
func (*RecordID) ProtoMessage()
func (*RecordID) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *RecordID) XXX_DiscardUnknown()
func (*RecordID) XXX_Marshal ¶ added in v0.3.0
func (*RecordID) XXX_Unmarshal ¶ added in v0.3.0
type RecordOption ¶ added in v0.2.0
type RecordOption func(stream *Record)
func WithEventTimeField ¶ added in v0.3.0
func WithEventTimeField(field octosql.VariableName) RecordOption
func WithID ¶ added in v0.3.0
func WithID(id *RecordID) RecordOption
func WithMetadataFrom ¶ added in v0.2.0
func WithMetadataFrom(base *Record) RecordOption
func WithNoUndo ¶ added in v0.3.0
func WithNoUndo() RecordOption
func WithUndo ¶ added in v0.2.0
func WithUndo() RecordOption
type RecordStream ¶
type RecordStream interface { Next(ctx context.Context) (*Record, error) Close(ctx context.Context, storage storage.Storage) error }
func GetTestStream ¶ added in v0.3.0
func GetTestStream(t *testing.T, stateStorage storage.Storage, variables octosql.Variables, node Node, opts ...GetTestStreamOption) RecordStream
type Relation ¶
type Relation interface {
Apply(ctx context.Context, variables octosql.Variables, left, right Expression) (bool, error)
}
func NewGreaterEqual ¶
func NewGreaterEqual() Relation
func NewLessEqual ¶
func NewLessEqual() Relation
func NewLessThan ¶
func NewLessThan() Relation
func NewMoreThan ¶
func NewMoreThan() Relation
func NewNotEqual ¶
func NewNotEqual() Relation
type RequalifiedStream ¶
type RequalifiedStream struct {
// contains filtered or unexported fields
}
type Requalifier ¶
type Requalifier struct {
// contains filtered or unexported fields
}
func NewRequalifier ¶
func NewRequalifier(qualifier string, child Node) *Requalifier
func (*Requalifier) Get ¶
func (node *Requalifier) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
type Shuffle ¶ added in v0.3.0
type Shuffle struct {
// contains filtered or unexported fields
}
func NewShuffle ¶ added in v0.3.0
func NewShuffle(outputPartitionCount int, strategyPrototype ShuffleStrategyPrototype, sources []Node) *Shuffle
type ShuffleData ¶ added in v0.3.0
type ShuffleID ¶ added in v0.3.0
type ShuffleID struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
func GetSourceShuffleID ¶ added in v0.3.0
func NewShuffleID ¶ added in v0.3.0
func (*ShuffleID) Descriptor ¶ added in v0.3.0
func (*ShuffleID) ProtoMessage ¶ added in v0.3.0
func (*ShuffleID) ProtoMessage()
func (*ShuffleID) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *ShuffleID) XXX_DiscardUnknown()
func (*ShuffleID) XXX_Marshal ¶ added in v0.3.0
func (*ShuffleID) XXX_Unmarshal ¶ added in v0.3.0
type ShuffleReceiver ¶ added in v0.3.0
type ShuffleReceiver struct {
// contains filtered or unexported fields
}
ShuffleReceiver is a RecordStream abstraction on a shuffle and receives records from it for a partition.
func NewShuffleReceiver ¶ added in v0.3.0
func NewShuffleReceiver(streamID *StreamID, shuffleID *ShuffleID, sourcePartitionCount int, partition int) *ShuffleReceiver
func (*ShuffleReceiver) GetWatermark ¶ added in v0.3.0
func (rs *ShuffleReceiver) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
type ShuffleSender ¶ added in v0.3.0
type ShuffleSender struct {
// contains filtered or unexported fields
}
ShuffleSender is used to send data to a shuffle from a given partition.
func NewShuffleSender ¶ added in v0.3.0
func NewShuffleSender(streamID *StreamID, shuffleID *ShuffleID, shuffleStrategy ShuffleStrategy, outputPartitionCount int, partition int) *ShuffleSender
func (*ShuffleSender) AddRecord ¶ added in v0.3.0
func (node *ShuffleSender) AddRecord(ctx context.Context, tx storage.StateTransaction, inputIndex int, record *Record) error
func (*ShuffleSender) GetWatermark ¶ added in v0.3.0
func (node *ShuffleSender) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
func (*ShuffleSender) MarkEndOfStream ¶ added in v0.3.0
func (node *ShuffleSender) MarkEndOfStream(ctx context.Context, tx storage.StateTransaction) error
func (*ShuffleSender) MarkError ¶ added in v0.3.0
func (node *ShuffleSender) MarkError(ctx context.Context, tx storage.StateTransaction, err error) error
func (*ShuffleSender) Next ¶ added in v0.3.0
func (node *ShuffleSender) Next(ctx context.Context, tx storage.StateTransaction) (*Record, error)
func (*ShuffleSender) ReadyForMore ¶ added in v0.3.0
func (node *ShuffleSender) ReadyForMore(ctx context.Context, tx storage.StateTransaction) error
func (*ShuffleSender) TriggerKeys ¶ added in v0.3.0
func (node *ShuffleSender) TriggerKeys(ctx context.Context, tx storage.StateTransaction, batchSize int) (int, error)
func (*ShuffleSender) UpdateWatermark ¶ added in v0.3.0
func (node *ShuffleSender) UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error
type ShuffleStrategy ¶ added in v0.3.0
type ShuffleStrategy interface { // Return output partition index based on the record and output partition count. CalculatePartition(ctx context.Context, record *Record, outputs int) (int, error) }
func NewConstantStrategy ¶ added in v0.3.0
func NewConstantStrategy(partition int) ShuffleStrategy
func NewKeyHashingStrategy ¶ added in v0.3.0
func NewKeyHashingStrategy(variables octosql.Variables, key []Expression) ShuffleStrategy
type ShuffleStrategyPrototype ¶ added in v0.3.0
type ShuffleStrategyPrototype interface {
Get(ctx context.Context, variables octosql.Variables) (ShuffleStrategy, error)
}
func NewConstantStrategyPrototype ¶ added in v0.3.0
func NewConstantStrategyPrototype(partition int) ShuffleStrategyPrototype
func NewKeyHashingStrategyPrototype ¶ added in v0.3.0
func NewKeyHashingStrategyPrototype(key []Expression) ShuffleStrategyPrototype
type StarExpression ¶ added in v0.3.0
type StarExpression struct {
// contains filtered or unexported fields
}
func NewStarExpression ¶ added in v0.3.0
func NewStarExpression(qualifier string) *StarExpression
func (*StarExpression) ExpressionValue ¶ added in v0.3.0
func (*StarExpression) Fields ¶ added in v0.3.0
func (se *StarExpression) Fields(variables octosql.Variables) []octosql.VariableName
func (*StarExpression) Name ¶ added in v0.3.0
func (se *StarExpression) Name() octosql.VariableName
type StreamID ¶ added in v0.3.0
type StreamID struct { Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
StreamID is a unique identifier for a RecordStream node. This StreamID should prefix all state storage keys this node uses.
func GetRawStreamID ¶ added in v0.3.0
func GetRawStreamID() *StreamID
GetRawStreamID can be used to get a new random StreamID without saving it.
func GetSourceStreamID ¶ added in v0.3.0
func NewStreamID ¶ added in v0.3.0
NewStreamID can be used to create a StreamID without saving it.
func (*StreamID) AsPrefix ¶ added in v0.3.0
A RecordStream node should use its StreamID as a prefix to all storage operations. This is a helper function to make that easier.
func (*StreamID) Descriptor ¶ added in v0.3.0
func (*StreamID) ProtoMessage ¶ added in v0.3.0
func (*StreamID) ProtoMessage()
func (*StreamID) XXX_DiscardUnknown ¶ added in v0.3.0
func (m *StreamID) XXX_DiscardUnknown()
func (*StreamID) XXX_Marshal ¶ added in v0.3.0
func (*StreamID) XXX_Unmarshal ¶ added in v0.3.0
type StreamJoin ¶ added in v0.3.0
type StreamJoin struct {
// contains filtered or unexported fields
}
func NewStreamJoin ¶ added in v0.3.0
func NewStreamJoin(leftSource, rightSource Node, leftKey, rightKey []Expression, storage storage.Storage, eventTimeField octosql.VariableName, joinType JoinType, triggerPrototype TriggerPrototype) *StreamJoin
func (*StreamJoin) Get ¶ added in v0.3.0
func (node *StreamJoin) Get(ctx context.Context, variables octosql.Variables, streamID *StreamID) (RecordStream, *ExecutionOutput, error)
type Trigger ¶ added in v0.3.0
type Trigger interface { docs.Documented RecordReceived(ctx context.Context, tx storage.StateTransaction, key octosql.Value, eventTime time.Time) error UpdateWatermark(ctx context.Context, tx storage.StateTransaction, watermark time.Time) error PollKeysToFire(ctx context.Context, tx storage.StateTransaction, batchSize int) ([]octosql.Value, error) KeysFired(ctx context.Context, tx storage.StateTransaction, key []octosql.Value) error }
type TriggerPrototype ¶ added in v0.3.0
type TupleExpression ¶
type TupleExpression struct {
// contains filtered or unexported fields
}
func NewTuple ¶
func NewTuple(expressions []Expression) *TupleExpression
func (*TupleExpression) ExpressionValue ¶
type UnionWatermarkGenerator ¶ added in v0.3.0
type UnionWatermarkGenerator struct {
// contains filtered or unexported fields
}
func NewUnionWatermarkGenerator ¶ added in v0.3.0
func NewUnionWatermarkGenerator(sources []WatermarkSource) *UnionWatermarkGenerator
func (*UnionWatermarkGenerator) GetWatermark ¶ added in v0.3.0
func (uwg *UnionWatermarkGenerator) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
type Variable ¶
type Variable struct {
// contains filtered or unexported fields
}
func NewVariable ¶
func NewVariable(name octosql.VariableName) *Variable
func (*Variable) ExpressionValue ¶
func (*Variable) Name ¶
func (v *Variable) Name() octosql.VariableName
type WatermarkSource ¶ added in v0.3.0
type WatermarkTrigger ¶ added in v0.3.0
type WatermarkTrigger struct { }
func NewWatermarkTrigger ¶ added in v0.3.0
func NewWatermarkTrigger() *WatermarkTrigger
type ZeroWatermarkGenerator ¶ added in v0.3.0
type ZeroWatermarkGenerator struct { }
func NewZeroWatermarkGenerator ¶ added in v0.3.0
func NewZeroWatermarkGenerator() *ZeroWatermarkGenerator
func (*ZeroWatermarkGenerator) GetWatermark ¶ added in v0.3.0
func (s *ZeroWatermarkGenerator) GetWatermark(ctx context.Context, tx storage.StateTransaction) (time.Time, error)
Source Files ¶
- batch.go
- distinct.go
- engine.go
- execution.go
- execution.pb.go
- filter.go
- function.go
- group_by.go
- hashmap.go
- in_memory_stream.go
- logic.go
- lookup_join.go
- map.go
- order_by.go
- output_queue.go
- process.go
- process.pb.go
- record.go
- record.pb.go
- relation.go
- requalifier.go
- shuffle.go
- shuffle.pb.go
- shuffle_strategy.go
- sourcestorage.go
- stream_join.go
- test_utils.go
- trigger.go
- types.go