table_valued_functions

package
v0.6.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 15, 2022 | License: MPL-2.0 | Imports: 7 | Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// MaxDiffWatermark describes a table valued function that wraps a source
// table and marks one of its Time-typed fields as the time field of the
// output schema.
//
// Arguments:
//   - source (table, required): the wrapped table.
//   - max_diff (Duration expression, required): handed to the execution node
//     as its maxDifference.
//   - time_field (descriptor, required): name of a field in source; it must
//     be of type Time.
//   - resolution (Duration expression, optional): defaults to one second.
var MaxDiffWatermark = logical.TableValuedFunctionDescription{
	TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument {
		// The source is typechecked first because its mapping is needed to
		// resolve the time_field descriptor.
		sourceArg := args["source"].(*logical.TableValuedFunctionArgumentValueTable)
		typedSource, sourceMapping := sourceArg.Typecheck(ctx, env, logicalEnv)

		maxDiffArg := args["max_diff"].(*logical.TableValuedFunctionArgumentValueExpression)
		typedMaxDiff := maxDiffArg.Typecheck(ctx, env, logicalEnv)

		timeFieldArg := args["time_field"].(*logical.TableValuedFunctionArgumentValueDescriptor)
		typedTimeField := timeFieldArg.Typecheck(ctx, env, logicalEnv.WithRecordUniqueVariableNames(sourceMapping))

		typechecked := map[string]logical.TableValuedFunctionTypecheckedArgument{
			"source":     {Mapping: sourceMapping, Argument: typedSource},
			"max_diff":   {Argument: typedMaxDiff},
			"time_field": {Argument: typedTimeField},
		}

		// resolution is optional, so it is only typechecked when supplied.
		if resolutionArg, ok := args["resolution"]; ok {
			typedResolution := resolutionArg.(*logical.TableValuedFunctionArgumentValueExpression).
				Typecheck(ctx, env, logicalEnv)
			typechecked["resolution"] = logical.TableValuedFunctionTypecheckedArgument{Argument: typedResolution}
		}

		return typechecked
	},
	Descriptors: []logical.TableValuedFunctionDescriptor{
		{
			Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{
				"source": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable,
					Table:                                  &logical.TableValuedFunctionArgumentMatcherTable{},
				},
				"max_diff": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Duration,
					},
				},
				"time_field": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeDescriptor,
					Descriptor:                             &logical.TableValuedFunctionArgumentMatcherDescriptor{},
				},
				"resolution": {
					Required:                               false,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Duration,
					},
				},
			},
			// OutputSchema keeps the source fields and mapping unchanged and
			// points TimeField at the field named by time_field. It fails if
			// that field is missing or is not of type Time.
			OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) {
				src := args["source"].Argument.Table.Table
				wanted := args["time_field"].Argument.Descriptor.Descriptor

				for idx := range src.Schema.Fields {
					candidate := src.Schema.Fields[idx]
					if candidate.Name != wanted {
						continue
					}
					if candidate.Type.TypeID != octosql.TypeIDTime {
						return physical.Schema{}, nil, fmt.Errorf("time_field must reference field with type Time, is %s", candidate.Type.String())
					}
					// The first field with a matching name decides the outcome.
					return physical.Schema{
						Fields:        src.Schema.Fields,
						TimeField:     idx,
						NoRetractions: src.Schema.NoRetractions,
					}, args["source"].Mapping, nil
				}

				return physical.Schema{}, nil, fmt.Errorf("no %s field in source stream", wanted)
			},
			// Materialize builds the execution node from the materialized
			// source, the max_diff and resolution expressions, and the index
			// of the time field within the source schema.
			Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) {
				srcNode, err := args["source"].Table.Table.Materialize(ctx, env)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize source table: %w", err)
				}
				maxDiffExpr, err := args["max_diff"].Expression.Expression.Materialize(ctx, env)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize max_diff: %w", err)
				}

				// Stays at -1 when no source field carries the requested name.
				wanted := args["time_field"].Descriptor.Descriptor
				timeIdx := -1
				for idx, candidate := range args["source"].Table.Table.Schema.Fields {
					if candidate.Name == wanted {
						timeIdx = idx
						break
					}
				}

				// One second is used unless a resolution argument was given.
				resolutionExpr := execution.Expression(execution.NewConstant(octosql.NewDuration(time.Second)))
				if resolutionArg, ok := args["resolution"]; ok {
					resolutionExpr, err = resolutionArg.Expression.Expression.Materialize(ctx, env)
					if err != nil {
						return nil, fmt.Errorf("couldn't materialize resolution: %w", err)
					}
				}

				node := &maxDifferenceWatermarkGenerator{
					source:         srcNode,
					maxDifference:  maxDiffExpr,
					resolution:     resolutionExpr,
					timeFieldIndex: timeIdx,
				}
				return node, nil
			},
		},
	},
}
View Source
// Poll describes a table valued function that wraps a source table and
// prepends a Time-typed field, exposed under the name "time", which becomes
// the time field of the output schema.
//
// Arguments:
//   - source (table, required): the wrapped table.
//   - poll_interval (Duration expression, optional): defaults to one second.
//
// poll_interval is declared and typechecked as an expression because
// Materialize reads it through the argument's Expression member. Declaring it
// as a descriptor would leave that member unset.
var Poll = logical.TableValuedFunctionDescription{
	TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument {
		outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument)

		source, mapping := args["source"].(*logical.TableValuedFunctionArgumentValueTable).
			Typecheck(ctx, env, logicalEnv)
		outArgs["source"] = logical.TableValuedFunctionTypecheckedArgument{Mapping: mapping, Argument: source}

		// poll_interval is optional, so it is only typechecked when supplied.
		if intervalArg, ok := args["poll_interval"]; ok {
			outArgs["poll_interval"] = logical.TableValuedFunctionTypecheckedArgument{
				Argument: intervalArg.(*logical.TableValuedFunctionArgumentValueExpression).
					Typecheck(ctx, env, logicalEnv),
			}
		}

		return outArgs
	},
	Descriptors: []logical.TableValuedFunctionDescriptor{
		{
			Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{
				"source": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable,
					Table:                                  &logical.TableValuedFunctionArgumentMatcherTable{},
				},
				"poll_interval": {
					Required:                               false,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Duration,
					},
				},
			},
			// OutputSchema returns the source fields shifted right by one,
			// with a fresh Time field at index 0 that is also the schema's
			// time field. The source mapping is copied, not mutated, and
			// extended with "time".
			OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) {
				source := args["source"].Argument.Table.Table

				outFields := make([]physical.SchemaField, len(source.Schema.Fields)+1)
				copy(outFields[1:], source.Schema.Fields)

				outMapping := make(map[string]string, len(args["source"].Mapping)+1)
				for k, v := range args["source"].Mapping {
					outMapping[k] = v
				}

				uniqueTime := logicalEnv.GetUnique("time")
				outMapping["time"] = uniqueTime
				outFields[0] = physical.SchemaField{
					Name: uniqueTime,
					Type: octosql.Time,
				}

				return physical.Schema{
					Fields:    outFields,
					TimeField: 0,
				}, outMapping, nil
			},
			// Materialize builds the poll execution node from the
			// materialized source and the poll interval expression.
			Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) {
				source, err := args["source"].Table.Table.Materialize(ctx, env)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize source table: %w", err)
				}
				var interval execution.Expression
				if intervalExpr, ok := args["poll_interval"]; ok {
					interval, err = intervalExpr.Expression.Expression.Materialize(ctx, env)
					if err != nil {
						return nil, fmt.Errorf("couldn't materialize interval: %w", err)
					}
				} else {
					// No poll_interval given: use one second.
					interval = execution.NewConstant(octosql.NewDuration(time.Second))
				}

				return &poll{
					source:   source,
					interval: interval,
				}, nil
			},
		},
	},
}
View Source
// Range describes a table valued function that takes no source table and
// produces a single Int-typed field exposed under the name "i".
//
// Arguments:
//   - start (Int expression, required)
//   - end (Int expression, required)
//
// The output schema has no time field (TimeField is -1) and declares
// NoRetractions.
var Range = logical.TableValuedFunctionDescription{
	TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument {
		outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument)
		outArgs["start"] = logical.TableValuedFunctionTypecheckedArgument{
			Argument: args["start"].(*logical.TableValuedFunctionArgumentValueExpression).Typecheck(ctx, env, logicalEnv),
		}
		outArgs["end"] = logical.TableValuedFunctionTypecheckedArgument{
			Argument: args["end"].(*logical.TableValuedFunctionArgumentValueExpression).Typecheck(ctx, env, logicalEnv),
		}
		return outArgs
	},
	Descriptors: []logical.TableValuedFunctionDescriptor{
		{
			Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{
				"start": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Int,
					},
				},
				"end": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Int,
					},
				},
			},
			// OutputSchema does not depend on the arguments: it always
			// reports one Int field under a freshly generated unique name,
			// mapped from "i".
			OutputSchema: func(
				ctx context.Context,
				env physical.Environment,
				logicalEnv logical.Environment,
				args map[string]logical.TableValuedFunctionTypecheckedArgument,
			) (physical.Schema, map[string]string, error) {
				unique := logicalEnv.GetUnique("i")

				return physical.Schema{
						Fields: []physical.SchemaField{
							{
								Name: unique,
								Type: octosql.Int,
							},
						},
						TimeField:     -1,
						NoRetractions: true,
					}, map[string]string{
						"i": unique,
					}, nil
			},
			// Materialize builds the range execution node from the
			// materialized start and end expressions.
			Materialize: func(
				ctx context.Context,
				environment physical.Environment,
				args map[string]physical.TableValuedFunctionArgument,
			) (execution.Node, error) {
				start, err := args["start"].Expression.Expression.Materialize(ctx, environment)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize start: %w", err)
				}
				end, err := args["end"].Expression.Expression.Materialize(ctx, environment)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize end: %w", err)
				}

				// Both errors were handled above, so success is reported
				// with a literal nil instead of the leftover err variable.
				return &rangeNode{
					start: start,
					end:   end,
				}, nil
			},
		},
	},
}
View Source
// Tumble describes a table valued function that wraps a source table and
// appends two Time-typed fields, exposed as "window_start" and "window_end".
// The appended window_end field becomes the time field of the output schema.
//
// Arguments:
//   - source (table, required): the wrapped table.
//   - window_length (Duration expression, required)
//   - time_field (descriptor, optional): name of a Time-typed field in
//     source. When omitted, the source schema's own time field is used, and
//     the source must have one.
//   - offset (Duration expression, optional): defaults to a zero duration.
var Tumble = logical.TableValuedFunctionDescription{
	TypecheckArguments: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionArgumentValue) map[string]logical.TableValuedFunctionTypecheckedArgument {
		outArgs := make(map[string]logical.TableValuedFunctionTypecheckedArgument)

		// The source is typechecked first because its mapping is needed to
		// resolve the time_field descriptor.
		source, mapping := args["source"].(*logical.TableValuedFunctionArgumentValueTable).
			Typecheck(ctx, env, logicalEnv)
		outArgs["source"] = logical.TableValuedFunctionTypecheckedArgument{Mapping: mapping, Argument: source}

		outArgs["window_length"] = logical.TableValuedFunctionTypecheckedArgument{
			Argument: args["window_length"].(*logical.TableValuedFunctionArgumentValueExpression).
				Typecheck(ctx, env, logicalEnv),
		}
		if timeFieldArg, ok := args["time_field"]; ok {
			outArgs["time_field"] = logical.TableValuedFunctionTypecheckedArgument{
				Argument: timeFieldArg.(*logical.TableValuedFunctionArgumentValueDescriptor).
					Typecheck(ctx, env, logicalEnv.WithRecordUniqueVariableNames(mapping)),
			}
		}
		if offsetArg, ok := args["offset"]; ok {
			outArgs["offset"] = logical.TableValuedFunctionTypecheckedArgument{
				Argument: offsetArg.(*logical.TableValuedFunctionArgumentValueExpression).
					Typecheck(ctx, env, logicalEnv),
			}
		}

		return outArgs
	},
	Descriptors: []logical.TableValuedFunctionDescriptor{
		{
			Arguments: map[string]logical.TableValuedFunctionArgumentMatcher{
				"source": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeTable,
					Table:                                  &logical.TableValuedFunctionArgumentMatcherTable{},
				},
				"window_length": {
					Required:                               true,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Duration,
					},
				},
				"time_field": {
					Required:                               false,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeDescriptor,
					Descriptor:                             &logical.TableValuedFunctionArgumentMatcherDescriptor{},
				},
				"offset": {
					Required:                               false,
					TableValuedFunctionArgumentMatcherType: physical.TableValuedFunctionArgumentTypeExpression,
					Expression: &logical.TableValuedFunctionArgumentMatcherExpression{
						Type: octosql.Duration,
					},
				},
			},
			// OutputSchema validates the time field selection, then returns
			// the source fields followed by window_start and window_end. The
			// source mapping is copied, not mutated, before the two new
			// names are added.
			OutputSchema: func(ctx context.Context, env physical.Environment, logicalEnv logical.Environment, args map[string]logical.TableValuedFunctionTypecheckedArgument) (physical.Schema, map[string]string, error) {
				source := args["source"].Argument.Table.Table
				if timeFieldDescriptor, ok := args["time_field"]; ok {
					// An explicit time_field must name an existing Time field.
					timeField := timeFieldDescriptor.Argument.Descriptor.Descriptor
					found := false
					for _, field := range source.Schema.Fields {
						if field.Name != timeField {
							continue
						}
						if field.Type.TypeID != octosql.TypeIDTime {
							return physical.Schema{}, nil, fmt.Errorf("time_field must reference Time typed field, is %s", field.Type.String())
						}
						found = true
						break
					}
					if !found {
						return physical.Schema{}, nil, fmt.Errorf("no %s field in source stream", timeField)
					}
				} else if source.Schema.TimeField == -1 {
					// Without time_field, the source must bring its own time field.
					return physical.Schema{}, nil, fmt.Errorf("the source table has no implicit watermarked time field, time_field must be specified explicitly")
				}

				outMapping := make(map[string]string, len(args["source"].Mapping)+2)
				for k, v := range args["source"].Mapping {
					outMapping[k] = v
				}
				outFields := make([]physical.SchemaField, len(source.Schema.Fields)+2)
				copy(outFields, source.Schema.Fields)

				uniqueWindowStart := logicalEnv.GetUnique("window_start")
				outMapping["window_start"] = uniqueWindowStart
				outFields[len(source.Schema.Fields)] = physical.SchemaField{
					Name: uniqueWindowStart,
					Type: octosql.Time,
				}

				uniqueWindowEnd := logicalEnv.GetUnique("window_end")
				outMapping["window_end"] = uniqueWindowEnd
				outFields[len(source.Schema.Fields)+1] = physical.SchemaField{
					Name: uniqueWindowEnd,
					Type: octosql.Time,
				}
				return physical.Schema{
					Fields: outFields,
					// window_end, the last field, is the output time field.
					TimeField:     len(source.Schema.Fields) + 1,
					NoRetractions: source.Schema.NoRetractions,
				}, outMapping, nil
			},
			// Materialize builds the tumble execution node from the
			// materialized source, the window length and offset expressions,
			// and the index of the time field within the source schema.
			Materialize: func(ctx context.Context, env physical.Environment, args map[string]physical.TableValuedFunctionArgument) (execution.Node, error) {
				source, err := args["source"].Table.Table.Materialize(ctx, env)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize source table: %w", err)
				}
				windowLength, err := args["window_length"].Expression.Expression.Materialize(ctx, env)
				if err != nil {
					return nil, fmt.Errorf("couldn't materialize window_length: %w", err)
				}
				var timeFieldIndex int
				if timeFieldDescriptor, ok := args["time_field"]; ok {
					timeField := timeFieldDescriptor.Descriptor.Descriptor
					// Start from -1 so a failed lookup is detected instead
					// of silently selecting the first source field.
					timeFieldIndex = -1
					for i, field := range args["source"].Table.Table.Schema.Fields {
						if field.Name == timeField {
							timeFieldIndex = i
							break
						}
					}
					if timeFieldIndex == -1 {
						return nil, fmt.Errorf("no %s field in source stream", timeField)
					}
				} else {
					timeFieldIndex = args["source"].Table.Table.Schema.TimeField
				}
				var offset execution.Expression
				if offsetExpr, ok := args["offset"]; ok {
					offset, err = offsetExpr.Expression.Expression.Materialize(ctx, env)
					if err != nil {
						return nil, fmt.Errorf("couldn't materialize offset: %w", err)
					}
				} else {
					// No offset given: use a zero duration.
					offset = execution.NewConstant(octosql.NewDuration(0))
				}

				return &tumble{
					source:         source,
					timeFieldIndex: timeFieldIndex,
					windowLength:   windowLength,
					offset:         offset,
				}, nil
			},
		},
	},
}

Functions

This section is empty.

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL