Documentation ¶
Index ¶
- Variables
- func RegisterBulker(bulkerType string, initFunc InitFunction)
- func RegisterOption[V any](option *ImplementationOption[V])
- type BulkMode
- type Bulker
- type BulkerStream
- type Config
- type ImplementationOption
- type InitFunction
- type LogLevel
- type ParseableOption
- type State
- type Status
- type StreamConfig
- type StreamOption
- type StreamOptions
Constants ¶
This section is empty.
Variables ¶
var (
	BatchSizeOption = ImplementationOption[int]{
		Key:          "batchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// BatchFrequencyOption frequency of running batches in minutes
	BatchFrequencyOption = ImplementationOption[float64]{
		Key:          "frequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	RetryBatchSizeOption = ImplementationOption[int]{
		Key:          "retryBatchSize",
		DefaultValue: 0,
		ParseFunc:    utils.ParseInt,
	}

	// RetryFrequencyOption frequency of running retry consumer in minutes
	RetryFrequencyOption = ImplementationOption[float64]{
		Key:          "retryFrequency",
		DefaultValue: 0,
		ParseFunc:    utils.ParseFloat,
	}

	ModeOption = ImplementationOption[BulkMode]{Key: "mode", ParseFunc: func(serialized any) (BulkMode, error) {
		switch v := serialized.(type) {
		case string:
			if v == "stream" {
				return Stream, nil
			} else if v == "batch" {
				return Batch, nil
			} else {
				return Unknown, fmt.Errorf("unknown mode: %s", v)
			}
		default:
			return Unknown, fmt.Errorf("invalid value type of mode option: %T", v)
		}
	},
	}

	PrimaryKeyOption = ImplementationOption[utils.Set[string]]{
		Key:          "primaryKey",
		DefaultValue: utils.Set[string]{},
		AdvancedParseFunc: func(o *ImplementationOption[utils.Set[string]], serializedValue any) (StreamOption, error) {
			switch v := serializedValue.(type) {
			case []string:
				return withPrimaryKey(o, v...), nil
			case string:
				if v == "" {
					return func(options *StreamOptions) {}, nil
				}
				return withPrimaryKey(o, v), nil
			default:
				return nil, fmt.Errorf("failed to parse 'primaryKey' option: %v incorrect type: %T expected string or []string", v, v)
			}
		},
	}

	MergeRowsOption = ImplementationOption[bool]{
		Key:          "deduplicate",
		DefaultValue: false,
		ParseFunc:    utils.ParseBool,
	}

	PartitionIdOption = ImplementationOption[string]{
		Key:       "partitionId",
		ParseFunc: utils.ParseString,
	}

	// TimestampOption - field name that contains timestamp. For creating sorting indexes or partitions by that field in destination tables
	TimestampOption = ImplementationOption[string]{
		Key:       "timestampColumn",
		ParseFunc: utils.ParseString,
	}

	// Not used by bulker. Just added here to be treated as known options
	FunctionsOption = ImplementationOption[any]{Key: "functions", ParseFunc: func(serialized any) (any, error) {
		return nil, nil
	}}

	DataLayoutOption = ImplementationOption[string]{Key: "dataLayout", ParseFunc: utils.ParseString}
	EventsOption     = ImplementationOption[string]{Key: "events", ParseFunc: utils.ParseString}
	HostsOption      = ImplementationOption[string]{Key: "hosts", ParseFunc: utils.ParseString}
)
var BulkerRegistry = make(map[string]InitFunction)
BulkerRegistry is a registry of init functions for bulker implementations. Used by the CreateBulker factory method.
Functions ¶
func RegisterBulker ¶
func RegisterBulker(bulkerType string, initFunc InitFunction)
RegisterBulker registers a function that creates a new bulker instance.
func RegisterOption ¶
func RegisterOption[V any](option *ImplementationOption[V])
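Below is a minimal sketch of defining and registering a custom option. The labelOption variable and its "label" key are hypothetical and only illustrate the pattern used by the built-in options under Variables; registering the option is presumably what makes it known by its key (as the built-in options are).

var labelOption = ImplementationOption[string]{
	Key:          "label", // hypothetical key, not a built-in bulker option
	DefaultValue: "",
	ParseFunc: func(serialized any) (string, error) {
		s, ok := serialized.(string)
		if !ok {
			return "", fmt.Errorf("expected string, got %T", serialized)
		}
		return s, nil
	},
}

func init() {
	// Register the option so it is treated as a known option key.
	RegisterOption(&labelOption)
}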
Types ¶
type BulkMode ¶
type BulkMode string
const (
	// Stream - bulker stream immediately commits each consumed object to the database.
	// Useful when working with a live stream of objects
	Stream BulkMode = "stream"
	// Batch - bulker stream commits all consumed objects on the Complete call.
	// Useful when working with a large number of objects or to optimize performance or costs using batch processing.
	// An error with just one object will fail the whole transaction, and no objects will be written to the database
	Batch BulkMode = "batch"
	// ReplacePartition - stream replaces all rows associated with the chosen partition column value in a single transaction (where applicable).
	// Useful when it is required to reprocess all objects associated with a specific partition id,
	// e.g. for processing and reprocessing one day of reporting data.
	// If the data of your stream may be reprocessed at some point in time, it is recommended to always use ReplacePartition mode for that stream.
	//
	// ReplacePartition implies Batch, meaning that the new data will be available only after the BulkerStream.Complete() call
	ReplacePartition BulkMode = "replace_partition"
	// ReplaceTable - atomically replaces the target table with a new one filled with the objects injected into the current stream.
	// Syncs an entire collection of objects at once without leaving the target table in an unfinished state.
	// Useful when the collection contains a finite number of objects and the target table must always represent a complete state for some point in time.
	//
	// ReplaceTable implies Batch, meaning that the new data will be available only after the BulkerStream.Complete() call
	ReplaceTable BulkMode = "replace_table"

	Unknown BulkMode = ""

	BatchNumberCtxKey = "batch_number"
)
type Bulker ¶
type Bulker interface {
	io.Closer

	// CreateStream creates a BulkerStream instance that will store objects to the target table in a data warehouse.
	// BulkerStream creates a new table with the provided tableName if it does not exist.
	// The table schema is based on the flattened object structure but may be overridden by providing the WithTable option.
	// BulkerStream will add new columns to the table on the fly if new properties appear in objects and the table schema is not overridden.
	// TODO: escape special symbols in table names
	CreateStream(id, tableName string, mode BulkMode, streamOptions ...StreamOption) (BulkerStream, error)
}
The Bulker interface allows streaming objects to data warehouses using different modes. See BulkMode for more details.
func CreateBulker ¶
type BulkerStream ¶
type BulkerStream interface {
	// Consume - put an object to the stream. If the stream is in Stream mode, the object is immediately committed to the database.
	// Otherwise, it is buffered and committed on the Complete call.
	Consume(ctx context.Context, object types.Object) (state State, processedObjects []types.Object, err error)
	// Abort - abort the stream and roll back all uncommitted objects. For a stream in Stream mode it does nothing.
	// Returns stream statistics. BulkerStream cannot be used after the Abort call.
	Abort(ctx context.Context) (State, error)
	// Complete - commit all uncommitted objects to the database. For a stream in Stream mode it does nothing.
	// Returns stream statistics. BulkerStream cannot be used after the Complete call.
	Complete(ctx context.Context) (State, error)
}
TODO: Commit() method that commits the transaction and starts a new one?
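A minimal end-to-end sketch of the Bulker and BulkerStream interfaces. It assumes a Bulker instance b already exists (e.g. obtained from the CreateBulker factory) and that events is a slice of types.Object; the stream id and table name are illustrative, and the context, fmt, and types imports are assumed.

func loadEvents(ctx context.Context, b Bulker, events []types.Object) error {
	// Batch mode: objects are buffered and become visible only after Complete.
	stream, err := b.CreateStream("events-stream", "events", Batch)
	if err != nil {
		return err
	}
	for _, obj := range events {
		if _, _, err = stream.Consume(ctx, obj); err != nil {
			_, _ = stream.Abort(ctx) // roll back uncommitted objects
			return err
		}
	}
	state, err := stream.Complete(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("status=%v processed=%d successful=%d\n", state.Status, state.ProcessedRows, state.SuccessfulRows)
	return nil
}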
type Config ¶
type Config struct {
	// Id of the Bulker instance, used for logging and metrics
	Id string `mapstructure:"id" json:"id"`
	// BulkerType - type of bulker implementation the data will be streamed to
	BulkerType string `mapstructure:"type" json:"type"`
	// DestinationConfig - config of the destination. May be a struct type supported by the destination implementation or a map[string]any
	DestinationConfig any `mapstructure:"credentials" json:"credentials"`
	// TODO: think about logging approach for library
	LogLevel LogLevel `mapstructure:"logLevel,omitempty"`
}
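A sketch of constructing a Config. The "postgres" bulker type and the credential keys are assumptions for illustration only; the type must match a bulker registered via RegisterBulker, and the credentials must match whatever format that implementation expects.

cfg := Config{
	Id:         "orders-destination",
	BulkerType: "postgres", // illustrative; must match a registered bulker type
	// map form of credentials; a struct type supported by the implementation also works
	DestinationConfig: map[string]any{
		"host":     "localhost",
		"port":     5432,
		"database": "events",
	},
}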
type ImplementationOption ¶
type ImplementationOption[V any] struct {
	Key               string
	DefaultValue      V
	ParseFunc         func(serialized any) (V, error)
	AdvancedParseFunc func(*ImplementationOption[V], any) (StreamOption, error)
}
func (*ImplementationOption[V]) Get ¶
func (io *ImplementationOption[V]) Get(so *StreamOptions) V
func (*ImplementationOption[V]) Parse ¶
func (io *ImplementationOption[V]) Parse(serializedValue any) (StreamOption, error)
func (*ImplementationOption[V]) Set ¶
func (io *ImplementationOption[V]) Set(so *StreamOptions, value V)
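A sketch of reading typed option values with Get inside a destination implementation. The describeStream helper is hypothetical, and streamOptions is assumed to be a *StreamOptions already populated from the StreamOption values passed to CreateStream.

func describeStream(streamOptions *StreamOptions) {
	mode := ModeOption.Get(streamOptions)        // BulkMode, e.g. Batch
	dedupe := MergeRowsOption.Get(streamOptions) // bool
	tsCol := TimestampOption.Get(streamOptions)  // string column name, may be empty
	fmt.Printf("mode=%v deduplicate=%v timestampColumn=%q\n", mode, dedupe, tsCol)
}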
type InitFunction ¶
type ParseableOption ¶
type ParseableOption interface {
Parse(serialized any) (StreamOption, error)
}
type State ¶
type State struct {
	// Representation of message processing. For SQL warehouses it is table schema
	Representation any    `json:"representation"`
	Status         Status `json:"status"`
	LastError      error  `json:"-"`
	LastErrorText  string `json:"error,omitempty"`
	ProcessedRows  int    `json:"processedRows"`
	SuccessfulRows int    `json:"successfulRows"`
	ErrorRowIndex  int    `json:"errorRowIndex,omitempty"`
}
State stores the result of processing a batch of objects.
type StreamConfig ¶
type StreamOption ¶
type StreamOption func(*StreamOptions)
func ParseOption ¶
func ParseOption(name string, serialized any) (StreamOption, error)
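A sketch of turning serialized configuration values into StreamOption values by option key. The raw map is illustrative; its keys match the built-in options listed under Variables.

raw := map[string]any{ // e.g. decoded from a destination's JSON or YAML config
	"mode":        "batch",
	"deduplicate": true,
	"primaryKey":  "id",
}
opts := make([]StreamOption, 0, len(raw))
for name, value := range raw {
	opt, err := ParseOption(name, value)
	if err != nil {
		// handle unknown option key or unparsable value
		continue
	}
	opts = append(opts, opt)
}
// opts can now be passed to Bulker.CreateStream(id, tableName, mode, opts...)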
func WithMergeRows ¶
func WithMergeRows() StreamOption
WithMergeRows - merge rows on primary key collisions.
func WithPartition ¶
func WithPartition(partitionId string) StreamOption
WithPartition - settings for bulker.ReplacePartition mode only.
partitionId - value of `__partition_id` for the current BulkerStream, e.g. the id of the current partition.
TODO: For BigQuery, require a string in a special format.
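A sketch of opening a ReplacePartition stream for one day of data; the stream id, table name, and partition id value are illustrative, and b is an assumed Bulker instance.

// All rows previously stored with `__partition_id` = "2023-10-01" are replaced
// by the newly consumed objects once Complete is called.
stream, err := b.CreateStream("daily-report", "report_rows", ReplacePartition, WithPartition("2023-10-01"))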
func WithPrimaryKey ¶
func WithPrimaryKey(pkFields ...string) StreamOption
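A sketch combining WithPrimaryKey with WithMergeRows (documented above) so that objects colliding on the key are merged instead of producing duplicate rows; b, the stream id, table, and field name are illustrative.

stream, err := b.CreateStream("users-sync", "users", Batch,
	WithPrimaryKey("id"), // primary key column(s) used to detect collisions
	WithMergeRows(),      // merge colliding rows instead of inserting duplicates
)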
func WithTimestamp ¶
func WithTimestamp(timestampField string) StreamOption
func WithoutMergeRows ¶
func WithoutMergeRows() StreamOption
type StreamOptions ¶
type StreamOptions struct {
	// Options slice. To pass to the CreateStream method
	Options []StreamOption
	// contains filtered or unexported fields
}
func (*StreamOptions) Add ¶
func (so *StreamOptions) Add(option StreamOption)