bulker

package
v0.0.0-...-c6cbf5f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 7, 2023 License: MIT Imports: 5 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	BatchSizeOption = ImplementationOption[int]{
		Key:          "batchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// BatchFrequencyOption frequency of running batches in minutes
	BatchFrequencyOption = ImplementationOption[float64]{
		Key:          "frequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	RetryBatchSizeOption = ImplementationOption[int]{
		Key:          "retryBatchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}
	// RetryFrequencyOption frequency of running retry consumer in minutes
	RetryFrequencyOption = ImplementationOption[float64]{
		Key:          "retryFrequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	ModeOption = ImplementationOption[BulkMode]{Key: "mode", ParseFunc: func(serialized any) (BulkMode, error) {
		switch v := serialized.(type) {
		case string:
			if v == "stream" {
				return Stream, nil
			} else if v == "batch" {
				return Batch, nil
			} else {
				return Unknown, fmt.Errorf("unknown mode: %s", v)
			}
		default:
			return Unknown, fmt.Errorf("invalid value type of mode option: %T", v)
		}
	},
	}

	PrimaryKeyOption = ImplementationOption[utils.Set[string]]{
		Key:          "primaryKey",
		DefaultValue: utils.Set[string]{},
		AdvancedParseFunc: func(o *ImplementationOption[utils.Set[string]], serializedValue any) (StreamOption, error) {
			switch v := serializedValue.(type) {
			case []string:
				return withPrimaryKey(o, v...), nil
			case string:
				if v == "" {
					return func(options *StreamOptions) {}, nil
				}
				return withPrimaryKey(o, v), nil
			default:
				return nil, fmt.Errorf("failed to parse 'primaryKey' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	MergeRowsOption = ImplementationOption[bool]{
		Key:          "deduplicate",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	PartitionIdOption = ImplementationOption[string]{
		Key:       "partitionId",
		ParseFunc: utils.ParseString,
	}

	// TimestampOption - field name that contains timestamp. For creating sorting indexes or partitions by that field in destination tables
	TimestampOption = ImplementationOption[string]{
		Key:       "timestampColumn",
		ParseFunc: utils.ParseString,
	}

	// Not used by bulker. Just added here to be treated as known options
	FunctionsOption  = ImplementationOption[any]{Key: "functions", ParseFunc: func(serialized any) (any, error) { return nil, nil }}
	DataLayoutOption = ImplementationOption[string]{Key: "dataLayout", ParseFunc: utils.ParseString}
	EventsOption     = ImplementationOption[string]{Key: "events", ParseFunc: utils.ParseString}
	HostsOption      = ImplementationOption[string]{Key: "hosts", ParseFunc: utils.ParseString}
)
View Source
var BulkerRegistry = make(map[string]InitFunction)

BulkerRegistry registry of init functions for bulker implementations. Used By CreateBulker factory method

Functions

func RegisterBulker

func RegisterBulker(bulkerType string, initFunc InitFunction)

RegisterBulker registers function to create new bulker instance

func RegisterOption

func RegisterOption[V any](option *ImplementationOption[V])

Types

type BulkMode

type BulkMode string
const (
	//Stream - bulker stream immediately commits each consumed object to the database
	//Useful when working with live stream of objects
	Stream BulkMode = "stream"

	//Batch - bulker stream commits all consumed object on Complete call
	//Useful when working with large number of objects or to optimize performance or costs using batch processing
	//Any error with just one object will fail the whole transaction and no objects will be written to the database
	Batch BulkMode = "batch"

	//ReplacePartition stream replaces all rows associated with the chosen partition column value in a single transaction (where applicable).
	//It is useful when it is required to reprocess all objects associates with specific partition id.
	//E.g. for processing and reprocessing one day of reporting data
	//If data of your stream may be reprocessed in some point in time it is recommended to always use ReplacePartition mode for that stream
	//
	//ReplacePartition implies Batch, meaning that the new data will be available only after BulkerStream.complete() call
	ReplacePartition BulkMode = "replace_partition"

	//ReplaceTable - atomically replaces target table with a new one filled with the object injected to current stream.
	//To sync entire collection of objects at once without leaving target table in unfinished state
	//Useful when collection contains finite number of object, and when it is required that target table always represent complete state for some point of time.
	//
	//ReplaceTable implies Batch, meaning that the new data will be available only after BulkerStream.complete() call
	ReplaceTable BulkMode = "replace_table"

	Unknown BulkMode = ""

	BatchNumberCtxKey = "batch_number"
)

type Bulker

type Bulker interface {
	io.Closer
	// CreateStream create a BulkerStream instance that will store objects to the target table in a data warehouse.
	// bulker BulkerStream creates a new table with provided tableName if it does not exist.
	// Table schema is based on flattened object structure but may be overridden by providing WithTable option.
	// bulker BulkerStream will add new column to a table on the fly if new properties appear in object and table schema is not overridden.
	// TODO: escape special symbols in table names
	CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)
}

Bulker interface allows streaming object to data warehouses using different modes. See BulkMode for more details.

func CreateBulker

func CreateBulker(config Config) (Bulker, error)

type BulkerStream

type BulkerStream interface {
	//Consume - put object to the stream. If stream is in Stream mode it will be immediately committed to the database.
	//Otherwise, it will be buffered and committed on Complete call.
	Consume(ctx context.Context, object types.Object) (state State, processedObjects []types.Object, err error)
	//Abort - abort stream and rollback all uncommitted objects. For stream in Stream mode does nothing.
	//Returns stream statistics. BulkerStream cannot be used after Abort call.
	Abort(ctx context.Context) (State, error)
	//Complete - commit all uncommitted objects to the database. For stream in Stream mode does nothing.
	//Returns stream statistics. BulkerStream cannot be used after Complete call.
	Complete(ctx context.Context) (State, error)
}

TODO: Commit() method that commits transaction and start new one ??

type Config

type Config struct {
	//id of Bulker instance for logging and metrics
	Id string `mapstructure:"id"  json:"id"`
	//bulkerType - type of bulker implementation will stream data to
	BulkerType string `mapstructure:"type" json:"type"`
	//destinationConfig - config of destination - may be struct type supported by destination implementation of map[string]any
	DestinationConfig any `mapstructure:"credentials" json:"credentials"`
	//TODO: think about logging approach for library
	LogLevel LogLevel `mapstructure:"logLevel,omitempty"`
}

type ImplementationOption

type ImplementationOption[V any] struct {
	Key               string
	DefaultValue      V
	ParseFunc         func(serialized any) (V, error)
	AdvancedParseFunc func(*ImplementationOption[V], any) (StreamOption, error)
}

func (*ImplementationOption[V]) Get

func (io *ImplementationOption[V]) Get(so *StreamOptions) V

func (*ImplementationOption[V]) Parse

func (io *ImplementationOption[V]) Parse(serializedValue any) (StreamOption, error)

func (*ImplementationOption[V]) Set

func (io *ImplementationOption[V]) Set(so *StreamOptions, value V)

type InitFunction

type InitFunction func(Config) (Bulker, error)

type LogLevel

type LogLevel int
const (
	Off LogLevel = iota
	Default
	Verbose
	Full
)

type ParseableOption

type ParseableOption interface {
	Parse(serialized any) (StreamOption, error)
}

type State

type State struct {
	//Representation of message processing. For SQL warehouses it is table schema
	Representation any    `json:"representation"`
	Status         Status `json:"status"`
	LastError      error  `json:"-"`
	LastErrorText  string `json:"error,omitempty"`
	ProcessedRows  int    `json:"processedRows"`
	SuccessfulRows int    `json:"successfulRows"`
	ErrorRowIndex  int    `json:"errorRowIndex,omitempty"`
}

State is used as a Batch storing result

func (*State) SetError

func (s *State) SetError(err error)

SetError sets error to the state

type Status

type Status string
const (
	//Completed - stream was completed successfully
	Completed Status = "COMPLETED"
	//Aborted - stream was aborted by user
	Aborted Status = "ABORTED"
	//Failed - failed to complete stream
	Failed Status = "FAILED"
	//Active - stream is active
	Active Status = "ACTIVE"
)

type StreamConfig

type StreamConfig struct {
	TableName string `mapstructure:"defaultTableName" json:"defaultTableName"`
	//Options by option name (key) in serialized form
	Options map[string]any `mapstructure:"options" json:"options"`
}

type StreamOption

type StreamOption func(*StreamOptions)

func ParseOption

func ParseOption(name string, serialized any) (StreamOption, error)

func WithMergeRows

func WithMergeRows() StreamOption

WithMergeRows - when true merge rows on primary keys collision.

func WithPartition

func WithPartition(partitionId string) StreamOption

WithPartition settings for bulker.ReplacePartition mode only partitionId - value of `__partition_id` for current BulkerStream e.g. id of current partition TODO: For bigquery require string in special format

func WithPrimaryKey

func WithPrimaryKey(pkFields ...string) StreamOption

func WithTimestamp

func WithTimestamp(timestampField string) StreamOption

func WithoutMergeRows

func WithoutMergeRows() StreamOption

type StreamOptions

type StreamOptions struct {

	// options slice. To pass to CreateStream method
	Options []StreamOption
	// contains filtered or unexported fields
}

func (*StreamOptions) Add

func (so *StreamOptions) Add(option StreamOption)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL