Documentation ¶
Index ¶
- Variables
- func LogBatchingStat(logger log.Logger, input []abstract.ChangeItem, ...)
- func MakeFormatSettingsWithTopicPrefix(format model.SerializationFormat, topicPrefix string, topicFullPath string) model.SerializationFormat
- func MergeBack(in []*task) map[abstract.TablePartID][]SerializedMessage
- func MergeWithMaxMessageSize(in []*task, maxMessageSize int) map[abstract.TablePartID][]SerializedMessage
- func MultithreadingSerialize(serializer serializerOneThread, sessionPackers packer.SessionPackers, ...) (map[abstract.TablePartID][]SerializedMessage, error)
- func Split(serializer serializerOneThread, sessionPackers packer.SessionPackers, ...) []*task
- type DebeziumSerializer
- func (s *DebeziumSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
- func (s *DebeziumSerializer) SerializeImpl(input []abstract.ChangeItem, threadsNum, chunkSize int) (map[abstract.TablePartID][]SerializedMessage, error)
- func (s *DebeziumSerializer) SerializeOneThread(input []abstract.ChangeItem, packerCache packer.SessionPackers) (map[abstract.TablePartID][]SerializedMessage, error)
- type JSONSerializer
- type MirrorSerializer
- func (s *MirrorSerializer) GroupAndSerializeLB(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, ...)
- func (s *MirrorSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
- func (s *MirrorSerializer) SerializeLB(changeItem *abstract.ChangeItem) ([]SerializedMessage, error)
- type NativeSerializer
- type RawColumnSerializer
- type SerializedMessage
- type Serializer
Constants ¶
This section is empty.
Variables ¶
var UnsupportedItemKinds = map[abstract.Kind]bool{ abstract.UpdateKind: true, abstract.DeleteKind: true, }
Functions ¶
func LogBatchingStat ¶
func LogBatchingStat(logger log.Logger, input []abstract.ChangeItem, in map[abstract.TablePartID][]SerializedMessage, startTime time.Time)
func MakeFormatSettingsWithTopicPrefix ¶
func MakeFormatSettingsWithTopicPrefix(format model.SerializationFormat, topicPrefix string, topicFullPath string) model.SerializationFormat
func MergeBack ¶
func MergeBack(in []*task) map[abstract.TablePartID][]SerializedMessage
func MergeWithMaxMessageSize ¶
func MergeWithMaxMessageSize(in []*task, maxMessageSize int) map[abstract.TablePartID][]SerializedMessage
func MultithreadingSerialize ¶
func MultithreadingSerialize(serializer serializerOneThread, sessionPackers packer.SessionPackers, input []abstract.ChangeItem, threadsNum, chunkSize, maxMessageSize int) (map[abstract.TablePartID][]SerializedMessage, error)
func Split ¶
func Split(serializer serializerOneThread, sessionPackers packer.SessionPackers, items []abstract.ChangeItem, chunkSize int) []*task
Types ¶
type DebeziumSerializer ¶
type DebeziumSerializer struct {
// contains filtered or unexported fields
}
func NewDebeziumSerializer ¶
func (*DebeziumSerializer) Serialize ¶
func (s *DebeziumSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
Serialize - serializes []abstract.ChangeItem into map: topic->[]SerializedMessage via the debezium emitter. It's an optimized version — with multithreading and cache optimizations.
func (*DebeziumSerializer) SerializeImpl ¶
func (s *DebeziumSerializer) SerializeImpl(input []abstract.ChangeItem, threadsNum, chunkSize int) (map[abstract.TablePartID][]SerializedMessage, error)
func (*DebeziumSerializer) SerializeOneThread ¶
func (s *DebeziumSerializer) SerializeOneThread(input []abstract.ChangeItem, packerCache packer.SessionPackers) (map[abstract.TablePartID][]SerializedMessage, error)
type JSONSerializer ¶
type JSONSerializer struct {
// contains filtered or unexported fields
}
func NewJSONSerializer ¶
func (*JSONSerializer) Serialize ¶
func (s *JSONSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
type MirrorSerializer ¶
type MirrorSerializer struct {
// contains filtered or unexported fields
}
func NewMirrorSerializer ¶
func NewMirrorSerializer(logger log.Logger) (*MirrorSerializer, error)
func (*MirrorSerializer) GroupAndSerializeLB ¶
func (s *MirrorSerializer) GroupAndSerializeLB(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, map[abstract.TablePartID]map[string]string, error)
GroupAndSerializeLB - for the logbroker destination the logic should be entirely different: ChangeItems should be grouped by Key (which is the ProducerID), and for every Key the extras should be extracted (extras are unique for every producer).
func (*MirrorSerializer) Serialize ¶
func (s *MirrorSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
Serialize is a naive implementation — it can be boosted by multi-threading.
func (*MirrorSerializer) SerializeLB ¶
func (s *MirrorSerializer) SerializeLB(changeItem *abstract.ChangeItem) ([]SerializedMessage, error)
type NativeSerializer ¶
type NativeSerializer struct {
// contains filtered or unexported fields
}
NativeSerializer - kept for legacy compatibility: used by the transfers named realty-rent-prod/realty-rent-test.
func NewNativeSerializer ¶
func NewNativeSerializer(batchingSettings model.Batching, saveTxOrder bool) (*NativeSerializer, error)
func (*NativeSerializer) Serialize ¶
func (s *NativeSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
Serialize - serializes []abstract.ChangeItem into map: topic->[]SerializedMessage via JSON marshalling. Naive implementation — can be boosted by multi-threading.
type RawColumnSerializer ¶
type RawColumnSerializer struct {
// contains filtered or unexported fields
}
func NewRawColumnSerializer ¶
func NewRawColumnSerializer(columnName string, logger log.Logger) *RawColumnSerializer
func (*RawColumnSerializer) Serialize ¶
func (s *RawColumnSerializer) Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
type SerializedMessage ¶
func BatchJSON ¶
func BatchJSON(batchingSettings model.Batching, in []abstract.ChangeItem) ([]SerializedMessage, error)
func BatchNative ¶
func BatchNative(batchingSettings model.Batching, in []abstract.ChangeItem) []SerializedMessage
type Serializer ¶
type Serializer interface {
Serialize(input []abstract.ChangeItem) (map[abstract.TablePartID][]SerializedMessage, error)
}
Serializer - takes an array of changeItems and returns queue messages grouped by some groupID (a string). All messages of one group should go to the same partition (that's why there is a TopicName field in the SerializedMessagesGroup struct).
This separation — groupID vs topicPath — is useful mostly for the logbroker sink (where groupID is part of sourceID).
The mirror serializer should keep ordering within a sourceID, so for mirror the groupID is the sourceID. Other serializers should be able to write different tables into different partitions simultaneously (for the sharded-runtime case — when a lot of workers write into lb simultaneously), so for the other serializers the groupID is abstract.TablePartID.
func New ¶
func New(format model.SerializationFormat, saveTxOrder, dropKeys, isSnapshot bool, logger log.Logger) (Serializer, error)