Documentation ¶
Index ¶
- Variables
- func RegisterBulker(bulkerType string, initFunc InitFunction)
- func RegisterOption[V any](option *ImplementationOption[V])
- type BulkMode
- type Bulker
- type BulkerStream
- type Config
- type DummyBulker
- type ImplementationOption
- type InitFunction
- type LogLevel
- type ParseableOption
- type State
- type Status
- type StreamConfig
- type StreamOption
- func ParseOption(name string, serialized any) (StreamOption, error)
- func WithDeduplicate() StreamOption
- func WithDiscriminatorField(discriminatorField []string) StreamOption
- func WithNamespace(namespace string) StreamOption
- func WithOption[T any](o *ImplementationOption[T], value T) StreamOption
- func WithPartition(partitionId string) StreamOption
- func WithPrimaryKey(pkFields ...string) StreamOption
- func WithSchema(schema types.Schema) StreamOption
- func WithTimestamp(timestampField string) StreamOption
- func WithToSameCase() StreamOption
- type StreamOptions
- type WarehouseState
Constants ¶
This section is empty.
Variables ¶
var (
	BatchSizeOption = ImplementationOption[int]{
		Key:          "batchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// BatchFrequencyOption - frequency of running batches, in minutes
	BatchFrequencyOption = ImplementationOption[float64]{
		Key:          "frequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	RetryBatchSizeOption = ImplementationOption[int]{
		Key:          "retryBatchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// RetryFrequencyOption - frequency of running the retry consumer, in minutes
	RetryFrequencyOption = ImplementationOption[float64]{
		Key:          "retryFrequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	ModeOption = ImplementationOption[BulkMode]{Key: "mode",
		ParseFunc: func(serialized any) (BulkMode, error) {
			switch v := serialized.(type) {
			case string:
				if v == "stream" {
					return Stream, nil
				} else if v == "batch" {
					return Batch, nil
				} else {
					return Unknown, fmt.Errorf("unknown mode: %s", v)
				}
			default:
				return Unknown, fmt.Errorf("invalid value type of mode option: %T", v)
			}
		},
	}

	PrimaryKeyOption = ImplementationOption[types2.OrderedSet[string]]{
		Key:          "primaryKey",
		DefaultValue: types2.OrderedSet[string]{},
		AdvancedParseFunc: func(o *ImplementationOption[types2.OrderedSet[string]], serializedValue any) (StreamOption, error) {
			switch v := serializedValue.(type) {
			case []string:
				return withPrimaryKey(o, v...), nil
			case string:
				if v == "" {
					return func(options *StreamOptions) {}, nil
				}
				keys := utils.ArrayMap(strings.Split(v, ","), strings.TrimSpace)
				return withPrimaryKey(o, keys...), nil
			default:
				return nil, fmt.Errorf("failed to parse 'primaryKey' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	DeduplicateOption = ImplementationOption[bool]{
		Key:          "deduplicate",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	PartitionIdOption = ImplementationOption[string]{
		Key:       "partitionId",
		ParseFunc: utils.ParseString,
	}

	NamespaceOption = ImplementationOption[string]{
		Key:       "namespace",
		ParseFunc: utils.ParseString,
	}

	// TimestampOption - name of the field that contains the timestamp. Used for creating sorting indexes or partitions by that field in destination tables
	TimestampOption = ImplementationOption[string]{
		Key:       "timestampColumn",
		ParseFunc: utils.ParseString,
	}

	// ToSameCaseOption - when true, all field and table names are converted to the same case (lowercase for most databases, uppercase for Snowflake)
	ToSameCaseOption = ImplementationOption[bool]{
		Key:          "toSameCase",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	// DiscriminatorFieldOption - array representing the path to an object property.
	// When deduplicate is true and multiple rows have the same primary key, the row with the highest value of this field is selected
	DiscriminatorFieldOption = ImplementationOption[[]string]{
		Key:          "discriminatorField",
		DefaultValue: nil,
		ParseFunc: func(serializedValue any) ([]string, error) {
			switch v := serializedValue.(type) {
			case []string:
				return v, nil
			case string:
				if v == "" {
					return nil, nil
				}
				keys := utils.ArrayMap(strings.Split(v, ","), strings.TrimSpace)
				return keys, nil
			default:
				return nil, fmt.Errorf("failed to parse 'discriminatorField' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	SchemaOption = ImplementationOption[types.Schema]{
		Key: "schema",
		ParseFunc: func(serialized any) (types.Schema, error) {
			switch v := serialized.(type) {
			case types.Schema:
				return v, nil
			case string:
				schema := types.Schema{}
				err := jsoniter.Unmarshal([]byte(v), &schema)
				if err != nil {
					return types.Schema{}, fmt.Errorf("failed to parse schema: %v", err)
				}
				return schema, nil
			default:
				return types.Schema{}, fmt.Errorf("invalid value type of schema option: %T", v)
			}
		},
	}
)
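As a sketch of how these option definitions are consumed (the import path is an assumption, and it is assumed that StreamOptions.Add both records and applies an option), raw values from a destination config can be parsed by key via ParseOption and read back in typed form via Get:

package main

import (
	"fmt"
	"log"

	bulker "github.com/jitsucom/bulker/bulkerlib" // assumed import path
)

func main() {
	// Raw option values as they might arrive from a JSON/YAML destination config.
	raw := map[string]any{
		"mode":        "batch",
		"primaryKey":  "id, email", // the comma-separated string form is accepted
		"deduplicate": true,
	}
	so := &bulker.StreamOptions{}
	for name, value := range raw {
		opt, err := bulker.ParseOption(name, value)
		if err != nil {
			log.Fatalf("invalid option %q: %v", name, err)
		}
		so.Add(opt)
	}
	// Typed read-back: Get returns the parsed value or the option's DefaultValue.
	fmt.Println(bulker.ModeOption.Get(so))        // batch
	fmt.Println(bulker.DeduplicateOption.Get(so)) // true
}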
var BulkerRegistry = make(map[string]InitFunction)
BulkerRegistry is a registry of init functions for bulker implementations. Used by the CreateBulker factory method.
Functions ¶
func RegisterBulker ¶
func RegisterBulker(bulkerType string, initFunc InitFunction)
RegisterBulker registers a function that creates a new bulker instance.
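A minimal registration sketch. The InitFunction signature is not shown on this page, so the factory shape below, func(Config) (Bulker, error), is an assumption implied by the BulkerRegistry map and the CreateBulker factory; newMyWarehouseBulker is a hypothetical constructor:

// Sketch: wiring a custom implementation into the registry.
func init() {
	bulker.RegisterBulker("mywarehouse", func(cfg bulker.Config) (bulker.Bulker, error) {
		return newMyWarehouseBulker(cfg) // hypothetical constructor
	})
}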
func RegisterOption ¶
func RegisterOption[V any](option *ImplementationOption[V])
Types ¶
type BulkMode ¶
type BulkMode string
const (
	// Stream - the bulker stream immediately commits each consumed object to the database.
	// Useful when working with a live stream of objects
	Stream BulkMode = "stream"

	// Batch - the bulker stream commits all consumed objects on the Complete call.
	// Useful when working with a large number of objects, or to optimize performance or costs using batch processing.
	// An error with just one object fails the whole transaction, and no objects are written to the database
	Batch BulkMode = "batch"

	// ReplacePartition - the stream replaces all rows associated with the chosen partition column value in a single transaction (where applicable).
	// Useful when it is required to reprocess all objects associated with a specific partition id,
	// e.g. for processing and reprocessing one day of reporting data.
	// If the data of your stream may be reprocessed at some point in time, it is recommended to always use ReplacePartition mode for that stream.
	//
	// ReplacePartition implies Batch, meaning that the new data becomes available only after the BulkerStream.Complete() call
	ReplacePartition BulkMode = "replace_partition"

	// ReplaceTable - atomically replaces the target table with a new one filled with the objects injected into the current stream.
	// Syncs an entire collection of objects at once without leaving the target table in an unfinished state.
	// Useful when the collection contains a finite number of objects, and when the target table must always represent a complete state as of some point in time.
	//
	// ReplaceTable implies Batch, meaning that the new data becomes available only after the BulkerStream.Complete() call
	ReplaceTable BulkMode = "replace_table"

	Unknown BulkMode = ""

	BatchNumberCtxKey = "batch_number"
)
type Bulker ¶
type Bulker interface {
	io.Closer

	// CreateStream creates a BulkerStream instance that will store objects to the target table in a data warehouse.
	// BulkerStream creates a new table with the provided tableName if it does not exist.
	// The table schema is based on the flattened object structure, but may be overridden by providing the WithTable option.
	// BulkerStream will add new columns to the table on the fly if new properties appear in objects and the table schema is not overridden.
	// TODO: escape special symbols in table names
	CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)
}
The Bulker interface allows streaming objects to data warehouses using different modes. See BulkMode for more details.
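A usage sketch. The "postgres" type string, the credentials map, and the CreateBulker signature (assumed here to be func(Config) (Bulker, error), per the registry description above) are assumptions:

cfg := bulker.Config{
	Id:                "postgres-destination",
	BulkerType:        "postgres", // must match a type registered via RegisterBulker
	DestinationConfig: map[string]any{"host": "localhost", "port": 5432},
}
b, err := bulker.CreateBulker(cfg) // assumed signature
if err != nil {
	log.Fatal(err)
}
defer b.Close()

stream, err := b.CreateStream("events-sync", "events", bulker.Batch,
	bulker.WithPrimaryKey("id"))
if err != nil {
	log.Fatal(err)
}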
func CreateBulker ¶
type BulkerStream ¶
type BulkerStream interface {
	// Consume - put an object into the stream. If the stream is in Stream mode, the object is immediately committed to the database.
	// Otherwise, it is buffered and committed on the Complete call.
	Consume(ctx context.Context, object types.Object) (state State, processedObject types.Object, err error)
	ConsumeJSON(ctx context.Context, json []byte) (state State, processedObject types.Object, err error)
	ConsumeMap(ctx context.Context, mp map[string]any) (state State, processedObject types.Object, err error)

	// Abort - abort the stream and roll back all uncommitted objects. For a stream in Stream mode it does nothing.
	// Returns stream statistics. The BulkerStream cannot be used after the Abort call.
	Abort(ctx context.Context) State

	// Complete - commit all uncommitted objects to the database. For a stream in Stream mode it does nothing.
	// Returns stream statistics. The BulkerStream cannot be used after the Complete call.
	Complete(ctx context.Context) (State, error)
}
TODO: a Commit() method that commits the transaction and starts a new one?
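Continuing the sketch above, a typical batch lifecycle: buffer objects with Consume/ConsumeJSON, then Complete to commit, or Abort to roll back:

ctx := context.Background()
for _, line := range [][]byte{
	[]byte(`{"id": 1, "name": "a"}`),
	[]byte(`{"id": 2, "name": "b"}`),
} {
	if _, _, err := stream.ConsumeJSON(ctx, line); err != nil {
		// Roll back everything buffered so far; the stream is unusable afterwards.
		state := stream.Abort(ctx)
		log.Fatalf("aborted after %d rows: %v", state.ProcessedRows, err)
	}
}
state, err := stream.Complete(ctx)
if err != nil {
	log.Fatalf("batch failed: %v", err)
}
log.Printf("committed %d/%d rows in %.2fs",
	state.SuccessfulRows, state.ProcessedRows, state.ProcessingTimeSec)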
type Config ¶
type Config struct {
	// Id of the Bulker instance, used for logging and metrics
	Id string `mapstructure:"id" json:"id"`
	// BulkerType - type of the bulker implementation that will stream data to the destination
	BulkerType string `mapstructure:"type" json:"type"`
	// DestinationConfig - config of the destination. May be a struct type supported by the destination implementation, or a map[string]any
	DestinationConfig any `mapstructure:"credentials" json:"credentials"`
	// TODO: think about a logging approach for the library
	LogLevel LogLevel `mapstructure:"logLevel,omitempty"`
}
type DummyBulker ¶
type DummyBulker struct {
Error error
}
func (*DummyBulker) Close ¶
func (d *DummyBulker) Close() error
func (*DummyBulker) CreateStream ¶
func (d *DummyBulker) CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)
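The name and the Error field suggest DummyBulker is a no-op placeholder (e.g. for destinations that failed to initialize). How exactly the stored Error surfaces is not documented here, so the sketch below is an assumption:

// Assumption: the configured Error is reported when the dummy is used.
var b bulker.Bulker = &bulker.DummyBulker{Error: fmt.Errorf("destination not configured")}
_, err := b.CreateStream("id", "events", bulker.Stream)
// err presumably reflects the configured Error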
type ImplementationOption ¶
type ImplementationOption[V any] struct {
	Key               string
	DefaultValue      V
	ParseFunc         func(serialized any) (V, error)
	AdvancedParseFunc func(*ImplementationOption[V], any) (StreamOption, error)
}
func (*ImplementationOption[V]) Get ¶
func (io *ImplementationOption[V]) Get(so *StreamOptions) V
func (*ImplementationOption[V]) Parse ¶
func (io *ImplementationOption[V]) Parse(serializedValue any) (StreamOption, error)
func (*ImplementationOption[V]) Set ¶
func (io *ImplementationOption[V]) Set(so *StreamOptions, value V)
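ParseFunc covers simple scalar options, while AdvancedParseFunc gives full control over the resulting StreamOption (as PrimaryKeyOption above does). A sketch of a custom scalar option, registered so that ParseOption can resolve it by key; the name chunkSize is hypothetical, and the fmt and strconv imports are omitted:

var ChunkSizeOption = bulker.ImplementationOption[int]{
	Key:          "chunkSize",
	DefaultValue: 1000,
	ParseFunc: func(serialized any) (int, error) {
		switch v := serialized.(type) {
		case int:
			return v, nil
		case string:
			return strconv.Atoi(v)
		default:
			return 0, fmt.Errorf("invalid chunkSize type: %T", v)
		}
	},
}

func init() {
	// Makes ParseOption("chunkSize", ...) work for this option.
	bulker.RegisterOption(&ChunkSizeOption)
}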
type InitFunction ¶
type ParseableOption ¶
type ParseableOption interface {
Parse(serialized any) (StreamOption, error)
}
type State ¶
type State struct {
	// Representation of message processing. For SQL warehouses it is the table schema
	Representation    any            `json:"representation"`
	Status            Status         `json:"status"`
	Mode              BulkMode       `json:"mode"`
	LastError         error          `json:"-"`
	LastErrorText     string         `json:"error,omitempty"`
	ProcessedRows     int            `json:"processedRows"`
	SuccessfulRows    int            `json:"successfulRows"`
	ErrorRowIndex     int            `json:"errorRowIndex,omitempty"`
	ProcessingTimeSec float64        `json:"processingTimeSec"`
	WarehouseState    WarehouseState `json:"statistics,omitempty"`
}
State is used to store the result of processing a batch.
func (*State) AddWarehouseState ¶
func (s *State) AddWarehouseState(ws WarehouseState)
func (*State) PrintWarehouseState ¶
type StreamConfig ¶
type StreamOption ¶
type StreamOption func(*StreamOptions)
func ParseOption ¶
func ParseOption(name string, serialized any) (StreamOption, error)
func WithDeduplicate ¶
func WithDeduplicate() StreamOption
WithDeduplicate - when enabled, rows are merged on primary key collision.
func WithDiscriminatorField ¶
func WithDiscriminatorField(discriminatorField []string) StreamOption
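Taken together with WithPrimaryKey and WithDeduplicate, this lets a stream keep only the freshest version of each row. A sketch, continuing from the bulker created above (the field names are illustrative):

stream, err := b.CreateStream("users-sync", "users", bulker.Batch,
	bulker.WithPrimaryKey("id"),
	bulker.WithDeduplicate(),
	// On primary key collision, keep the row with the highest updatedAt.
	bulker.WithDiscriminatorField([]string{"updatedAt"}),
)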
func WithNamespace ¶
func WithNamespace(namespace string) StreamOption
func WithOption ¶
func WithOption[T any](o *ImplementationOption[T], value T) StreamOption
func WithPartition ¶
func WithPartition(partitionId string) StreamOption
WithPartition - settings for bulker.ReplacePartition mode only. partitionId - the value of `__partition_id` for the current BulkerStream, e.g. the id of the current partition. TODO: for BigQuery, require a string in a special format.
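For example, to atomically reprocess one day of reporting data (the partition id value is illustrative):

stream, err := b.CreateStream("reports-reprocess", "reports",
	bulker.ReplacePartition,
	// All existing rows with __partition_id = "2024-01-15" are replaced
	// by this stream's objects on Complete.
	bulker.WithPartition("2024-01-15"),
)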
func WithPrimaryKey ¶
func WithPrimaryKey(pkFields ...string) StreamOption
func WithSchema ¶
func WithSchema(schema types.Schema) StreamOption
func WithTimestamp ¶
func WithTimestamp(timestampField string) StreamOption
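A sketch combining the two options above (the namespace and column names are illustrative): route the stream into a dedicated schema/dataset and declare which column drives sorting indexes or partitions:

stream, err := b.CreateStream("pageviews", "pageviews", bulker.Batch,
	bulker.WithNamespace("analytics"),
	bulker.WithTimestamp("timestamp"),
)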
func WithToSameCase ¶
func WithToSameCase() StreamOption
type StreamOptions ¶
type StreamOptions struct {
	// Options slice, to pass to the CreateStream method
	Options []StreamOption
	// contains filtered or unexported fields
}
func (*StreamOptions) Add ¶
func (so *StreamOptions) Add(option StreamOption)
type WarehouseState ¶
type WarehouseState struct {
	Name            string                          `json:"name"`
	BytesProcessed  int                             `json:"bytesProcessed,omitempty"`
	TimeProcessedMs int64                           `json:"timeProcessedMs"`
	EstimatedCost   float64                         `json:"estimatedCost"`
	AdditionalInfo  map[string]any                  `json:",inline,omitempty"`
	States          *types2.OrderedMap[string, any] `json:"states,omitempty"`
}
func (*WarehouseState) Merge ¶
func (ws *WarehouseState) Merge(second WarehouseState)
func (*WarehouseState) String ¶
func (ws *WarehouseState) String() string
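A sketch of aggregating warehouse statistics across several completed batches; batchStates is a hypothetical slice of State values collected from Complete calls:

total := bulker.WarehouseState{Name: "total"}
for _, st := range batchStates {
	total.Merge(st.WarehouseState)
}
fmt.Println(total.String())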