canal

package
v0.0.0-...-91902aa Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 31, 2024 License: Apache-2.0 Imports: 34 Imported by: 0

Documentation

Index

Constants

View Source
// Canal packet and protocol version numbers, and the name of the server
// character encoding.
// NOTE(review): presumably these values are written into canal protocol
// packets — confirm against the encoder implementation.
const (
	CanalPacketVersion   int32  = 1
	CanalProtocolVersion int32  = 1
	CanalServerEncode    string = "UTF-8"
)

Compatible with canal-1.1.4; see https://github.com/alibaba/canal/tree/canal-1.1.4

Variables

This section is empty.

Functions

func NewBatchDecoder

func NewBatchDecoder(
	ctx context.Context, codecConfig *common.Config, db *sql.DB,
) (codec.RowEventDecoder, error)

NewBatchDecoder returns a decoder for canal-json.

func NewBatchEncoderBuilder

func NewBatchEncoderBuilder(config *common.Config) codec.RowEventEncoderBuilder

NewBatchEncoderBuilder creates a canal batchEncoderBuilder.

func NewCanalJSONTxnEventDecoder

func NewCanalJSONTxnEventDecoder(
	codecConfig *common.Config,
) *canalJSONTxnEventDecoder

NewCanalJSONTxnEventDecoder returns a new canalJSONTxnEventDecoder.

func NewJSONRowEventEncoderBuilder

func NewJSONRowEventEncoderBuilder(ctx context.Context, config *common.Config) (codec.RowEventEncoderBuilder, error)

NewJSONRowEventEncoderBuilder creates a canal-json batchEncoderBuilder.

func NewJSONTxnEventEncoderBuilder

func NewJSONTxnEventEncoderBuilder(config *common.Config) codec.TxnEventEncoderBuilder

NewJSONTxnEventEncoderBuilder creates a jsonTxnEventEncoderBuilder.

Types

type BatchEncoder

type BatchEncoder struct {
	// contains filtered or unexported fields
}

BatchEncoder encodes events into the bytes of a batch.

func (*BatchEncoder) AppendRowChangedEvent

func (d *BatchEncoder) AppendRowChangedEvent(
	_ context.Context,
	_ string,
	e *model.RowChangedEvent,
	callback func(),
) error

AppendRowChangedEvent implements the RowEventEncoder interface

func (*BatchEncoder) Build

func (d *BatchEncoder) Build() []*common.Message

Build implements the RowEventEncoder interface

func (*BatchEncoder) EncodeCheckpointEvent

func (d *BatchEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)

EncodeCheckpointEvent implements the RowEventEncoder interface

func (*BatchEncoder) EncodeDDLEvent

func (d *BatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error)

EncodeDDLEvent implements the RowEventEncoder interface

type JSONMessage

// JSONMessage is the flat JSON form of a canal message. Each field is
// serialized under the key named in its `json` tag; for example, Schema is
// written as "database", EventType as "type", ExecutionTime as "es" and
// BuildTime as "ts".
// NOTE(review): Data and Old presumably hold the current and previous column
// values of each row, keyed by column name — confirm against the encoder.
type JSONMessage struct {
	// ignored by consumers
	ID        int64    `json:"id"`
	Schema    string   `json:"database"`
	Table     string   `json:"table"`
	PKNames   []string `json:"pkNames"`
	IsDDL     bool     `json:"isDdl"`
	EventType string   `json:"type"`
	// officially the timestamp of the event-time of the message, in milliseconds since Epoch.
	ExecutionTime int64 `json:"es"`
	// officially the timestamp of building the message, in milliseconds since Epoch.
	BuildTime int64 `json:"ts"`
	// SQL that generated the change event, DDL or Query
	Query string `json:"sql"`
	// only works for INSERT / UPDATE / DELETE events, records each column's java representation type.
	SQLType map[string]int32 `json:"sqlType"`
	// only works for INSERT / UPDATE / DELETE events, records each column's mysql representation type.
	MySQLType map[string]string `json:"mysqlType"`
	// A Datum should be a string or nil
	Data []map[string]interface{} `json:"data"`
	Old  []map[string]interface{} `json:"old"`
}

JSONMessage adapted from https://github.com/alibaba/canal/blob/b54bea5e3337c9597c427a53071d214ff04628d1/protocol/src/main/java/com/alibaba/otter/canal/protocol/FlatMessage.java#L1

type JSONRowEventEncoder

type JSONRowEventEncoder struct {
	// contains filtered or unexported fields
}

JSONRowEventEncoder encodes row event in JSON format

func (*JSONRowEventEncoder) AppendRowChangedEvent

func (c *JSONRowEventEncoder) AppendRowChangedEvent(
	ctx context.Context,
	_ string,
	e *model.RowChangedEvent,
	callback func(),
) error

AppendRowChangedEvent implements the RowEventEncoder interface

func (*JSONRowEventEncoder) Build

func (c *JSONRowEventEncoder) Build() []*common.Message

Build implements the RowEventEncoder interface

func (*JSONRowEventEncoder) EncodeCheckpointEvent

func (c *JSONRowEventEncoder) EncodeCheckpointEvent(ts uint64) (*common.Message, error)

EncodeCheckpointEvent implements the RowEventEncoder interface

func (*JSONRowEventEncoder) EncodeDDLEvent

func (c *JSONRowEventEncoder) EncodeDDLEvent(e *model.DDLEvent) (*common.Message, error)

EncodeDDLEvent encodes DDL events

type JSONTxnEventEncoder

type JSONTxnEventEncoder struct {
	// contains filtered or unexported fields
}

JSONTxnEventEncoder encodes txn event in JSON format

func (*JSONTxnEventEncoder) AppendTxnEvent

func (j *JSONTxnEventEncoder) AppendTxnEvent(
	txn *model.SingleTableTxn,
	callback func(),
) error

AppendTxnEvent appends a txn event to the encoder.

func (*JSONTxnEventEncoder) Build

func (j *JSONTxnEventEncoder) Build() []*common.Message

Build builds a message from the encoder and resets the encoder.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL