bulkerlib

package module
v0.0.0-...-ec019eb Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 29, 2024 License: MIT Imports: 10 Imported by: 2

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	// BatchSizeOption - batch size (key "batchSize", default 0).
	// NOTE(review): presumably the max number of objects per batch, with 0 meaning "not set" - TODO confirm with the consumer.
	BatchSizeOption = ImplementationOption[int]{
		Key:          "batchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// BatchFrequencyOption frequency of running batches in minutes
	BatchFrequencyOption = ImplementationOption[float64]{
		Key:          "frequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	// RetryBatchSizeOption - batch size of the retry consumer (key "retryBatchSize", default 0).
	// NOTE(review): presumably the retry counterpart of BatchSizeOption - TODO confirm.
	RetryBatchSizeOption = ImplementationOption[int]{
		Key:          "retryBatchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}
	// RetryFrequencyOption frequency of running retry consumer in minutes
	RetryFrequencyOption = ImplementationOption[float64]{
		Key:          "retryFrequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	// ModeOption - bulk mode of the stream (key "mode"). Accepts a string or a BulkMode value.
	// Only Stream ("stream") and Batch ("batch") can be selected through this option; any other
	// value, including the replace modes, is rejected with an error.
	ModeOption = ImplementationOption[BulkMode]{
		Key: "mode",
		ParseFunc: func(serialized any) (BulkMode, error) {
			var mode BulkMode
			switch v := serialized.(type) {
			case string:
				mode = BulkMode(v)
			case BulkMode:
				// Accept the typed value too, so programmatic callers need no conversion.
				mode = v
			default:
				return Unknown, fmt.Errorf("invalid value type of mode option: %T", v)
			}
			switch mode {
			case Stream, Batch:
				return mode, nil
			default:
				return Unknown, fmt.Errorf("unknown mode: %s", mode)
			}
		},
	}

	// PrimaryKeyOption - primary key fields of the stream (key "primaryKey").
	// Accepts a []string, a []any of strings (the form JSON arrays take after decoding into
	// map[string]any) or a comma-separated string. Blank entries of the string form are ignored,
	// and a string that contains no key at all yields a no-op option.
	PrimaryKeyOption = ImplementationOption[types2.OrderedSet[string]]{
		Key:          "primaryKey",
		DefaultValue: types2.OrderedSet[string]{},
		AdvancedParseFunc: func(o *ImplementationOption[types2.OrderedSet[string]], serializedValue any) (StreamOption, error) {
			switch v := serializedValue.(type) {
			case []string:
				return withPrimaryKey(o, v...), nil
			case []any:
				keys, err := stringsFromAnySlice("primaryKey", v)
				if err != nil {
					return nil, err
				}
				return withPrimaryKey(o, keys...), nil
			case string:
				keys := splitCommaSeparated(v)
				if len(keys) == 0 {
					// Nothing to set: "" (or only separators/whitespace) leaves the options untouched.
					return func(options *StreamOptions) {}, nil
				}
				return withPrimaryKey(o, keys...), nil
			default:
				return nil, fmt.Errorf("failed to parse 'primaryKey' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	// DeduplicateOption - when true, rows that collide on primary key are merged (key "deduplicate", default false).
	DeduplicateOption = ImplementationOption[bool]{
		Key:          "deduplicate",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	// PartitionIdOption - partition id of the stream (key "partitionId"), i.e. the value of
	// `__partition_id` used by ReplacePartition mode.
	PartitionIdOption = ImplementationOption[string]{
		Key:       "partitionId",
		ParseFunc: utils.ParseString,
	}

	// NamespaceOption - namespace of the stream (key "namespace").
	// NOTE(review): presumably the destination schema/dataset of the table - TODO confirm.
	NamespaceOption = ImplementationOption[string]{
		Key:       "namespace",
		ParseFunc: utils.ParseString,
	}

	// TimestampOption - field name that contains timestamp. For creating sorting indexes or partitions by that field in destination tables
	TimestampOption = ImplementationOption[string]{
		Key:       "timestampColumn",
		ParseFunc: utils.ParseString,
	}

	// ToSameCaseOption - when true all field and table names will be converted to the same case (lowercase for most db, uppercase for snowflake)
	ToSameCaseOption = ImplementationOption[bool]{
		Key:          "toSameCase",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	// DiscriminatorFieldOption - array represents path to the object property. When deduplicate is true and multiple rows have
	// the same primary key, the row with the highest value of this field will be selected.
	// Accepts a []string, a []any of strings or a comma-separated string (blank entries are ignored).
	DiscriminatorFieldOption = ImplementationOption[[]string]{
		Key:          "discriminatorField",
		DefaultValue: nil,
		ParseFunc: func(serializedValue any) ([]string, error) {
			switch v := serializedValue.(type) {
			case []string:
				return v, nil
			case []any:
				return stringsFromAnySlice("discriminatorField", v)
			case string:
				keys := splitCommaSeparated(v)
				if len(keys) == 0 {
					return nil, nil
				}
				return keys, nil
			default:
				return nil, fmt.Errorf("failed to parse 'discriminatorField' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	// SchemaOption - explicit schema of the stream (key "schema").
	// Accepts a types.Schema value or its JSON encoding as a string.
	SchemaOption = ImplementationOption[types.Schema]{
		Key: "schema",
		ParseFunc: func(serialized any) (types.Schema, error) {
			switch v := serialized.(type) {
			case types.Schema:
				return v, nil
			case string:
				schema := types.Schema{}
				err := jsoniter.Unmarshal([]byte(v), &schema)
				if err != nil {
					// %w keeps the decoder error inspectable via errors.Is/errors.As.
					return types.Schema{}, fmt.Errorf("failed to parse schema: %w", err)
				}
				return schema, nil
			default:
				return types.Schema{}, fmt.Errorf("invalid value type of schema option: %T", v)
			}
		},
	}
)

// splitCommaSeparated splits s on commas, trims surrounding whitespace from every part
// and drops parts that are empty after trimming. It returns nil when no part remains
// (e.g. for "" or " , ").
func splitCommaSeparated(s string) []string {
	var parts []string
	for _, part := range strings.Split(s, ",") {
		if part = strings.TrimSpace(part); part != "" {
			parts = append(parts, part)
		}
	}
	return parts
}

// stringsFromAnySlice converts items into a []string. Every element must be a string,
// otherwise an error mentioning optionName and the offending element is returned.
func stringsFromAnySlice(optionName string, items []any) ([]string, error) {
	out := make([]string, 0, len(items))
	for i, item := range items {
		s, ok := item.(string)
		if !ok {
			return nil, fmt.Errorf("failed to parse '%s' option: element %d (%v) incorrect type: %T expected string", optionName, i, item, item)
		}
		out = append(out, s)
	}
	return out, nil
}
View Source
// BulkerRegistry holds the init functions of bulker implementations, presumably keyed by
// bulker type (see RegisterBulker). It is used by the CreateBulker factory method.
var BulkerRegistry = map[string]InitFunction{}

BulkerRegistry is the registry of init functions for bulker implementations. It is used by the CreateBulker factory method.

Functions

func RegisterBulker

func RegisterBulker(bulkerType string, initFunc InitFunction)

RegisterBulker registers a function that creates new bulker instances for the given bulker type.

func RegisterOption

func RegisterOption[V any](option *ImplementationOption[V])

Types

type BulkMode

// BulkMode selects how a BulkerStream commits the objects it consumes.
type BulkMode string

const (
	// Unknown is the zero BulkMode; ModeOption returns it together with a parse error.
	Unknown BulkMode = ""

	// Stream commits every consumed object to the database immediately.
	// It suits live streams of objects.
	Stream BulkMode = "stream"

	// Batch buffers consumed objects and commits all of them on Complete.
	// It suits large numbers of objects, or optimizing performance or costs with batch processing.
	// An error in any single object fails the whole transaction, and then no objects are
	// written to the database.
	Batch BulkMode = "batch"

	// ReplacePartition replaces all rows associated with the chosen partition column value
	// in a single transaction (where applicable).
	// It is useful when all objects associated with a specific partition id must be reprocessed,
	// e.g. processing and reprocessing one day of reporting data.
	// If the data of a stream may be reprocessed at some point in time, ReplacePartition is the
	// recommended mode for that stream.
	//
	// ReplacePartition implies Batch: new data becomes available only after BulkerStream.Complete is called.
	ReplacePartition BulkMode = "replace_partition"

	// ReplaceTable atomically replaces the target table with a new one filled with the objects
	// consumed by the current stream, so the target table is never left in an unfinished state.
	// It is useful when a collection contains a finite number of objects and the target table
	// must always represent a complete state as of some point in time.
	//
	// ReplaceTable implies Batch: new data becomes available only after BulkerStream.Complete is called.
	ReplaceTable BulkMode = "replace_table"
)

// BatchNumberCtxKey is the context value key "batch_number". It is an untyped string constant,
// not a BulkMode.
// NOTE(review): presumably it carries the current batch number - TODO confirm.
const BatchNumberCtxKey = "batch_number"

type Bulker

// Bulker streams objects to data warehouses using different modes (see BulkMode).
// It embeds io.Closer, so implementations also provide Close.
type Bulker interface {
	io.Closer
	// CreateStream creates a BulkerStream that stores objects in the target table of a data warehouse.
	// The stream creates a table named tableName if it does not exist.
	// The table schema is based on the flattened object structure but may be overridden by providing the WithTable option.
	// NOTE(review): no WithTable option is listed in this package; WithSchema may be meant - confirm.
	// The stream adds new columns to the table on the fly when new properties appear in objects and the table schema is not overridden.
	// TODO: escape special symbols in table names
	CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)
}

Bulker interface allows streaming objects to data warehouses using different modes. See BulkMode for more details.

func CreateBulker

func CreateBulker(config Config) (Bulker, error)

type BulkerStream

// BulkerStream consumes objects and writes them to a destination table according to its BulkMode.
type BulkerStream interface {

	// Consume puts an object into the stream. In Stream mode it is committed to the database immediately;
	// otherwise it is buffered and committed on the Complete call.
	// It returns the stream state and the object as processed by the stream.
	Consume(ctx context.Context, object types.Object) (state State, processedObject types.Object, err error)
	// ConsumeJSON is like Consume but takes the object as raw JSON bytes.
	// NOTE(review): presumably json must encode a single object - TODO confirm.
	ConsumeJSON(ctx context.Context, json []byte) (state State, processedObject types.Object, err error)
	// ConsumeMap is like Consume but takes the object as a map[string]any.
	ConsumeMap(ctx context.Context, mp map[string]any) (state State, processedObject types.Object, err error)
	// Abort aborts the stream and rolls back all uncommitted objects. For a stream in Stream mode it does nothing.
	// It returns the stream statistics. A BulkerStream cannot be used after Abort is called.
	Abort(ctx context.Context) State
	// Complete commits all uncommitted objects to the database. For a stream in Stream mode it does nothing.
	// It returns the stream statistics. A BulkerStream cannot be used after Complete is called.
	Complete(ctx context.Context) (State, error)
}

TODO: add a Commit() method that commits the current transaction and starts a new one?

type Config

// Config is the configuration of a Bulker instance; it is the argument of CreateBulker and of InitFunction.
type Config struct {
	// Id - id of the Bulker instance, used for logging and metrics.
	Id string `mapstructure:"id"  json:"id"`
	// BulkerType - type of the bulker implementation that data will be streamed to.
	BulkerType string `mapstructure:"type" json:"type"`
	// DestinationConfig - config of the destination (serialized as "credentials"). May be a struct type
	// supported by the destination implementation or a map[string]any.
	DestinationConfig any `mapstructure:"credentials" json:"credentials"`
	// LogLevel - logging verbosity of the bulker.
	// NOTE(review): there is no json tag here, so encoding/json uses the key "LogLevel"
	// (unlike the camelCase keys above) - confirm this is intended.
	// TODO: think about logging approach for library
	LogLevel LogLevel `mapstructure:"logLevel,omitempty"`
}

type DummyBulker

// DummyBulker has the Bulker method set (Close and CreateStream).
// NOTE(review): presumably Error is what its methods return, to simulate a failing bulker - TODO confirm.
type DummyBulker struct {
	Error error
}

func (*DummyBulker) Close

func (d *DummyBulker) Close() error

func (*DummyBulker) CreateStream

func (d *DummyBulker) CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)

type ImplementationOption

// ImplementationOption is a stream option identified by Key whose typed value is V.
//
// Fields:
//   - Key: option name as it appears in serialized options (e.g. StreamConfig.Options).
//   - DefaultValue: presumably the value Get returns when the option is not set - TODO confirm.
//   - ParseFunc: converts a serialized value into V.
//   - AdvancedParseFunc: alternative to ParseFunc that builds the StreamOption itself
//     (PrimaryKeyOption uses it instead of ParseFunc).
//
// NOTE(review): which function Parse prefers when both are set is not visible here - confirm in Parse.
type ImplementationOption[V any] struct {
	Key               string
	DefaultValue      V
	ParseFunc         func(serialized any) (V, error)
	AdvancedParseFunc func(*ImplementationOption[V], any) (StreamOption, error)
}

func (*ImplementationOption[V]) Get

func (io *ImplementationOption[V]) Get(so *StreamOptions) V

func (*ImplementationOption[V]) Parse

func (io *ImplementationOption[V]) Parse(serializedValue any) (StreamOption, error)

func (*ImplementationOption[V]) Set

func (io *ImplementationOption[V]) Set(so *StreamOptions, value V)

type InitFunction

// InitFunction creates a Bulker from a Config. RegisterBulker accepts one per bulker type,
// and BulkerRegistry maps names to them.
type InitFunction func(Config) (Bulker, error)

type LogLevel

// LogLevel is the logging verbosity configured via Config.LogLevel.
// Levels are ordered from Off (0) up to Full (3).
type LogLevel int

// Log levels, numbered explicitly in increasing verbosity.
const (
	Off     LogLevel = 0
	Default LogLevel = 1
	Verbose LogLevel = 2
	Full    LogLevel = 3
)

type ParseableOption

// ParseableOption is implemented by options that can turn a serialized value into a StreamOption.
// *ImplementationOption[V] has a Parse method with this signature.
type ParseableOption interface {
	Parse(serialized any) (StreamOption, error)
}

type State

// State is the result of storing a batch: stream status and mode, row counters,
// the last error and warehouse statistics.
//
// Serialization notes (encoding/json semantics):
//   - LastError is excluded (json:"-"); LastErrorText, serialized as "error",
//     presumably carries its text (see SetError) - TODO confirm.
//   - ErrorRowIndex uses omitempty, so an error at row index 0 is not serialized.
//   - WarehouseState is a struct value; encoding/json never treats structs as empty,
//     so its omitempty option has no effect and "statistics" is always present.
type State struct {
	// Representation - representation of message processing. For SQL warehouses it is the table schema.
	Representation    any            `json:"representation"`
	Status            Status         `json:"status"`
	Mode              BulkMode       `json:"mode"`
	LastError         error          `json:"-"`
	LastErrorText     string         `json:"error,omitempty"`
	ProcessedRows     int            `json:"processedRows"`
	SuccessfulRows    int            `json:"successfulRows"`
	ErrorRowIndex     int            `json:"errorRowIndex,omitempty"`
	ProcessingTimeSec float64        `json:"processingTimeSec"`
	WarehouseState    WarehouseState `json:"statistics,omitempty"`
}

State is used as the result of storing a batch.

func (*State) AddWarehouseState

func (s *State) AddWarehouseState(ws WarehouseState)

func (*State) Merge

func (s *State) Merge(second State)

func (*State) PrintWarehouseState

func (s *State) PrintWarehouseState() string

func (*State) SetError

func (s *State) SetError(err error)

SetError sets the error on the state.

func (*State) String

func (s *State) String() string

String returns a string representation of the state.

type Status

// Status is the lifecycle status of a stream, as reported in State.Status.
type Status string

// Stream statuses.
const (
	// Active - the stream is active.
	Active Status = "ACTIVE"
	// Completed - the stream was completed successfully.
	Completed Status = "COMPLETED"
	// Aborted - the stream was aborted by the user.
	Aborted Status = "ABORTED"
	// Failed - the stream failed to complete.
	Failed Status = "FAILED"
)

type StreamConfig

// StreamConfig is the serialized configuration of a stream: its default target table name and its options.
type StreamConfig struct {
	// TableName - default target table name (serialized as "defaultTableName").
	TableName string `mapstructure:"defaultTableName" json:"defaultTableName"`
	// Options - option values keyed by option name (e.g. "primaryKey", "mode"), in serialized form.
	// NOTE(review): presumably each entry is converted with ParseOption - confirm.
	Options map[string]any `mapstructure:"options" json:"options"`
}

type StreamOption

// StreamOption modifies a StreamOptions value. The With* functions return StreamOptions,
// and Bulker.CreateStream accepts them as variadic arguments.
type StreamOption func(*StreamOptions)

func ParseOption

func ParseOption(name string, serialized any) (StreamOption, error)

func WithDeduplicate

func WithDeduplicate() StreamOption

WithDeduplicate enables deduplication: rows that collide on primary key are merged.

func WithDiscriminatorField

func WithDiscriminatorField(discriminatorField []string) StreamOption

func WithNamespace

func WithNamespace(namespace string) StreamOption

func WithOption

func WithOption[T any](o *ImplementationOption[T], value T) StreamOption

func WithPartition

func WithPartition(partitionId string) StreamOption

WithPartition sets the partition for bulker.ReplacePartition mode only. partitionId is the value of `__partition_id` for the current BulkerStream, e.g. the id of the current partition. TODO: for BigQuery, require the string to be in a special format.

func WithPrimaryKey

func WithPrimaryKey(pkFields ...string) StreamOption

func WithSchema

func WithSchema(schema types.Schema) StreamOption

func WithTimestamp

func WithTimestamp(timestampField string) StreamOption

func WithToSameCase

func WithToSameCase() StreamOption

type StreamOptions

// StreamOptions holds the options of a stream; StreamOption functions modify it.
type StreamOptions struct {

	// Options - slice of stream options, to pass to the CreateStream method.
	Options []StreamOption
	// contains filtered or unexported fields
}

func (*StreamOptions) Add

func (so *StreamOptions) Add(option StreamOption)

type WarehouseState

// WarehouseState holds warehouse-side statistics: name, bytes processed, processing time in
// milliseconds, estimated cost, additional info and an ordered map of nested states.
//
// NOTE(review): encoding/json does not support the ",inline" tag option. With the standard
// library, a non-empty AdditionalInfo is serialized as a nested "AdditionalInfo" object rather
// than being merged into the parent object - confirm whether a custom marshaller handles this.
type WarehouseState struct {
	Name            string                          `json:"name"`
	BytesProcessed  int                             `json:"bytesProcessed,omitempty"`
	TimeProcessedMs int64                           `json:"timeProcessedMs"`
	EstimatedCost   float64                         `json:"estimatedCost"`
	AdditionalInfo  map[string]any                  `json:",inline,omitempty"`
	States          *types2.OrderedMap[string, any] `json:"states,omitempty"`
}

func (*WarehouseState) Merge

func (ws *WarehouseState) Merge(second WarehouseState)

func (*WarehouseState) String

func (ws *WarehouseState) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL