Documentation ¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var MaxDiffWatermark = logical.TableValuedFunctionDescription{ TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument { outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument) source, mapping := args["source"].(*logical.TableValuedFunctionArgumentValueTable). Typecheck(ctx, env, logicalEnv) outArgs["source"] = logical.TableValuedFunctionTypecheckedArgument{Mapping: mapping, Argument: source} outArgs["max_diff"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["max_diff"].(*logical.TableValuedFunctionArgumentValueExpression). Typecheck(ctx, env, logicalEnv), } outArgs["time_field"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["time_field"].(*logical.TableValuedFunctionArgumentValueDescriptor). Typecheck(ctx, env, logicalEnv.WithRecordUniqueVariableNames(mapping)), } if _, ok := args["resolution"]; ok { outArgs["resolution"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["resolution"].(*logical.TableValuedFunctionArgumentValueExpression). Typecheck(ctx, env, logicalEnv), } } return outArgs }, Descriptors: []logical.TableValuedFunctionDescriptor{ { Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{ "source": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable, Table: &logical.TableValuedFunctionArgumentMatcherTable{}, }, "max_diff": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Duration, }, }, "time_field": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeDescriptor, Descriptor: &logical.TableValuedFunctionArgumentMatcherDescriptor{}, }, "resolution": { Required: false, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Duration, }, }, }, OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) { source := args["source"].Argument.Table.Table timeField := args["time_field"].Argument.Descriptor.Descriptor timeFieldIndex := -1 for i, field := range source.Schema.Fields { if timeField != field.Name { continue } if field.Type.TypeID != octosql.TypeIDTime { return physical.Schema{}, nil, fmt.Errorf("time_field must reference field with type Time, is %s", field.Type.String()) } timeFieldIndex = i break } if timeFieldIndex == -1 { return physical.Schema{}, nil, fmt.Errorf("no %s field in source stream", timeField) } return physical.Schema{ Fields: source.Schema.Fields, TimeField: timeFieldIndex, NoRetractions: source.Schema.NoRetractions, }, args["source"].Mapping, nil }, Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) { source, err := args["source"].Table.Table.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize source table: %w", err) } maxDifference, err := args["max_diff"].Expression.Expression.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize max_diff: %w", err) } timeField := args["time_field"].Descriptor.Descriptor timeFieldIndex := -1 for i, field := range args["source"].Table.Table.Schema.Fields { if timeField == field.Name { timeFieldIndex = i break } } var resolution execution.Expression = execution.NewConstant(octosql.NewDuration(time.Second)) if arg, ok := args["resolution"]; ok { resolution, err = arg.Expression.Expression.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize resolution: %w", err) } } return &maxDifferenceWatermarkGenerator{ source: source, maxDifference: maxDifference, resolution: resolution, timeFieldIndex: timeFieldIndex, }, nil }, }, }, }
View Source
var Poll = logical.TableValuedFunctionDescription{ TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument { outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument) source, mapping := args["source"].(*logical.TableValuedFunctionArgumentValueTable). Typecheck(ctx, env, logicalEnv) outArgs["source"] = logical.TableValuedFunctionTypecheckedArgument{Mapping: mapping, Argument: source} if _, ok := args["poll_interval"]; ok { outArgs["poll_interval"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["poll_interval"].(*logical.TableValuedFunctionArgumentValueDescriptor). Typecheck(ctx, env, logicalEnv.WithRecordUniqueVariableNames(mapping)), } } return outArgs }, Descriptors: []logical.TableValuedFunctionDescriptor{ { Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{ "source": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable, Table: &logical.TableValuedFunctionArgumentMatcherTable{}, }, "poll_interval": { Required: false, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeDescriptor, Descriptor: &logical.TableValuedFunctionArgumentMatcherDescriptor{}, }, }, OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) { source := args["source"].Argument.Table.Table outFields := make([]physical.SchemaField, len(source.Schema.Fields)+1) copy(outFields[1:], source.Schema.Fields) outMapping := make(map[string]string) for k, v := range args["source"].Mapping { outMapping[k] = v } uniqueTime := logicalEnv.GetUnique("time") outMapping["time"] = uniqueTime outFields[0] = physical.SchemaField{ Name: uniqueTime, Type: octosql.Time, } return physical.Schema{ Fields: outFields, TimeField: 0, }, outMapping, nil }, Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) { source, err := args["source"].Table.Table.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize source table: %w", err) } var interval execution.Expression if intervalExpr, ok := args["poll_interval"]; ok { interval, err = intervalExpr.Expression.Expression.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize interval: %w", err) } } else { interval = execution.NewConstant(octosql.NewDuration(time.Second)) } return &poll{ source: source, interval: interval, }, nil }, }, }, }
View Source
var Range = logical.TableValuedFunctionDescription{ TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument { outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument) outArgs["start"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["start"].(*logical.TableValuedFunctionArgumentValueExpression).Typecheck(ctx, env, logicalEnv), } outArgs["end"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["end"].(*logical.TableValuedFunctionArgumentValueExpression).Typecheck(ctx, env, logicalEnv), } return outArgs }, Descriptors: []logical.TableValuedFunctionDescriptor{ { Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{ "start": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Int, }, }, "end": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Int, }, }, }, OutputSchema: func( ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument, ) (physical.Schema, map[string]string, error) { unique := logicalEnv.GetUnique("i") return physical.Schema{ Fields: []physical.SchemaField{ { Name: unique, Type: octosql.Int, }, }, TimeField: -1, NoRetractions: true, }, map[string]string{ "i": unique, }, nil }, Materialize: func( ctx context.Context, environment physical.Environment, args map[string]physical.TableValuedFunctionArgument, ) (execution.Node, error) { start, err := args["start"].Expression.Expression.Materialize(ctx, environment) if err != nil { return nil, fmt.Errorf("couldn't materialize start: %w", err) } end, err := args["end"].Expression.Expression.Materialize(ctx, environment) if err != nil { return nil, fmt.Errorf("couldn't materialize end: %w", err) } return &rangeNode{ start: start, end: end, }, err }, }, }, }
View Source
var Tumble = logical.TableValuedFunctionDescription{ TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument { outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument) source, mapping := args["source"].(*logical.TableValuedFunctionArgumentValueTable). Typecheck(ctx, env, logicalEnv) outArgs["source"] = logical.TableValuedFunctionTypecheckedArgument{Mapping: mapping, Argument: source} outArgs["window_length"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["window_length"].(*logical.TableValuedFunctionArgumentValueExpression). Typecheck(ctx, env, logicalEnv), } if _, ok := args["time_field"]; ok { outArgs["time_field"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["time_field"].(*logical.TableValuedFunctionArgumentValueDescriptor). Typecheck(ctx, env, logicalEnv.WithRecordUniqueVariableNames(mapping)), } } if _, ok := args["offset"]; ok { outArgs["offset"] = logical.TableValuedFunctionTypecheckedArgument{ Argument: args["offset"].(*logical.TableValuedFunctionArgumentValueExpression). Typecheck(ctx, env, logicalEnv), } } return outArgs }, Descriptors: []logical.TableValuedFunctionDescriptor{ { Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{ "source": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable, Table: &logical.TableValuedFunctionArgumentMatcherTable{}, }, "window_length": { Required: true, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Duration, }, }, "time_field": { Required: false, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeDescriptor, Descriptor: &logical.TableValuedFunctionArgumentMatcherDescriptor{}, }, "offset": { Required: false, TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression, Expression: &logical.TableValuedFunctionArgumentMatcherExpression{ Type: octosql.Duration, }, }, }, OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) { source := args["source"].Argument.Table.Table if timeFieldDescriptor, ok := args["time_field"]; ok { timeField := timeFieldDescriptor.Argument.Descriptor.Descriptor found := false for _, field := range source.Schema.Fields { if field.Name != timeField { continue } if field.Type.TypeID != octosql.TypeIDTime { return physical.Schema{}, nil, fmt.Errorf("time_field must reference Time typed field, is %s", field.Type.String()) } found = true break } if !found { return physical.Schema{}, nil, fmt.Errorf("no %s field in source stream", timeField) } } else { if source.Schema.TimeField == -1 { return physical.Schema{}, nil, fmt.Errorf("the source table has no implicit watermarked time field, time_field must be specified explicitly") } } outMapping := make(map[string]string) for k, v := range args["source"].Mapping { outMapping[k] = v } outFields := make([]physical.SchemaField, len(source.Schema.Fields)+2) copy(outFields, source.Schema.Fields) uniqueWindowStart := logicalEnv.GetUnique("window_start") outMapping["window_start"] = uniqueWindowStart outFields[len(source.Schema.Fields)] = physical.SchemaField{ Name: uniqueWindowStart, Type: octosql.Time, } uniqueWindowEnd := logicalEnv.GetUnique("window_end") outMapping["window_end"] = uniqueWindowEnd outFields[len(source.Schema.Fields)+1] = physical.SchemaField{ Name: uniqueWindowEnd, Type: octosql.Time, } return physical.Schema{ Fields: outFields, TimeField: len(source.Schema.Fields) + 1, NoRetractions: source.Schema.NoRetractions, }, outMapping, nil }, Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) { source, err := args["source"].Table.Table.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize source table: %w", err) } windowLength, err := args["window_length"].Expression.Expression.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize window_length: %w", err) } var timeFieldIndex int if timeFieldDescriptor, ok := args["time_field"]; ok { timeField := timeFieldDescriptor.Descriptor.Descriptor for i, field := range args["source"].Table.Table.Schema.Fields { if field.Name == timeField { timeFieldIndex = i break } } } else { timeFieldIndex = args["source"].Table.Table.Schema.TimeField } var offset execution.Expression if offsetExpr, ok := args["offset"]; ok { offset, err = offsetExpr.Expression.Expression.Materialize(ctx, env) if err != nil { return nil, fmt.Errorf("couldn't materialize offset: %w", err) } } else { offset = execution.NewConstant(octosql.NewDuration(0)) } return &tumble{ source: source, timeFieldIndex: timeFieldIndex, windowLength: windowLength, offset: offset, }, nil }, }, }, }
Functions ¶
This section is empty.
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.