ddlproducer

package
v0.0.0-...-ddbfbf2 Latest
Warning

This package is not in the latest version of its module.
Published: Jan 21, 2025 License: Apache-2.0 Imports: 16 Imported by: 3

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DDLProducer

type DDLProducer interface {
	// SyncBroadcastMessage broadcasts a message synchronously.
	SyncBroadcastMessage(
		ctx context.Context, topic string, totalPartitionsNum int32, message *common.Message,
	) error
	// SyncSendMessage sends a message for a partition synchronously.
	SyncSendMessage(
		ctx context.Context, topic string, partitionNum int32, message *common.Message,
	) error
	// Close closes the producer.
	Close()
}

DDLProducer is the interface for DDL message producers.

func NewKafkaDDLProducer

func NewKafkaDDLProducer(_ context.Context,
	changefeedID model.ChangeFeedID,
	syncProducer kafka.SyncProducer,
) DDLProducer

NewKafkaDDLProducer creates a new Kafka producer for replicating DDL.

func NewMockDDLProducer

func NewMockDDLProducer(_ context.Context, _ model.ChangeFeedID, _ kafka.SyncProducer) DDLProducer

NewMockDDLProducer creates a mock producer.

func NewMockPulsarProducerDDL

func NewMockPulsarProducerDDL(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *config.PulsarConfig,
	client pulsar.Client,
	sinkConfig *config.SinkConfig,
) (DDLProducer, error)

NewMockPulsarProducerDDL creates a mock Pulsar producer that is returned as a DDLProducer.

func NewPulsarProducer

func NewPulsarProducer(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *config.PulsarConfig,
	client pulsar.Client,
	sinkConfig *config.SinkConfig,
) (DDLProducer, error)

NewPulsarProducer creates a Pulsar producer.

type Factory

type Factory func(ctx context.Context, changefeedID model.ChangeFeedID,
	syncProducer kafka.SyncProducer) DDLProducer

Factory is a function to create a producer.
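
A function type like Factory lets a caller receive the constructor as a parameter, so a test can pass a mock constructor where production code passes the Kafka one. The following is a minimal sketch of that pattern under stated assumptions: `ChangeFeedID`, `SyncProducer`, the reduced `DDLProducer` interface, `namedProducer`, `newSink`, and `kindOf` are hypothetical stand-ins written for this example and are not the module's own types.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for model.ChangeFeedID, kafka.SyncProducer and
// DDLProducer, so that this sketch compiles without the real module.
type ChangeFeedID struct{ Namespace, ID string }

type SyncProducer interface{}

type DDLProducer interface{ Close() }

// Factory has the same shape as the documented Factory type.
type Factory func(ctx context.Context, changefeedID ChangeFeedID,
	syncProducer SyncProducer) DDLProducer

// namedProducer remembers which constructor built it.
type namedProducer struct{ kind string }

func (p *namedProducer) Close() {}

func newRealProducer(_ context.Context, _ ChangeFeedID, _ SyncProducer) DDLProducer {
	return &namedProducer{kind: "real"}
}

func newMockProducer(_ context.Context, _ ChangeFeedID, _ SyncProducer) DDLProducer {
	return &namedProducer{kind: "mock"}
}

// newSink is a hypothetical caller that does not know which producer it
// gets; it only calls the factory it was handed.
func newSink(ctx context.Context, id ChangeFeedID, sp SyncProducer, factory Factory) DDLProducer {
	return factory(ctx, id, sp)
}

// kindOf builds a producer through the given factory and reports its kind.
func kindOf(factory Factory) string {
	id := ChangeFeedID{Namespace: "default", ID: "demo"}
	p := newSink(context.Background(), id, nil, factory)
	defer p.Close()
	return p.(*namedProducer).kind
}

func main() {
	fmt.Println(kindOf(newRealProducer), kindOf(newMockProducer))
}
```

Both constructors are assignable to `Factory` because their signatures match it exactly, which is the same property that lets NewKafkaDDLProducer and NewMockDDLProducer be used interchangeably where a Factory is expected.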

type MockDDLProducer

type MockDDLProducer struct {
	// contains filtered or unexported fields
}

MockDDLProducer is a mock producer for tests.

func (*MockDDLProducer) Close

func (m *MockDDLProducer) Close()

Close does nothing.

func (*MockDDLProducer) GetAllEvents

func (m *MockDDLProducer) GetAllEvents() []*common.Message

GetAllEvents returns the events received by the mock producer.

func (*MockDDLProducer) GetEvents

func (m *MockDDLProducer) GetEvents(topic string,
	partitionNum int32,
) []*common.Message

GetEvents returns the events filtered by the key (topic and partition number).

func (*MockDDLProducer) SyncBroadcastMessage

func (m *MockDDLProducer) SyncBroadcastMessage(ctx context.Context, topic string,
	totalPartitionsNum int32, message *common.Message,
) error

SyncBroadcastMessage stores a message to all partitions of the topic.

func (*MockDDLProducer) SyncSendMessage

func (m *MockDDLProducer) SyncSendMessage(_ context.Context, topic string,
	partitionNum int32, message *common.Message,
) error

SyncSendMessage stores a message to a partition of the topic.
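
The recording behaviour described for MockDDLProducer can be pictured as a map from (topic, partition) to the messages stored there. The sketch below is an illustration only, not the package's implementation: `Message`, `key`, `recordingProducer`, and `record` are hypothetical names, and the real mock's internal fields are unexported and not documented here.

```go
package main

import (
	"context"
	"fmt"
)

// Message is a hypothetical stand-in for common.Message.
type Message struct{ Value []byte }

// key identifies one partition of one topic.
type key struct {
	topic     string
	partition int32
}

// recordingProducer keeps every message it is given, grouped by key.
type recordingProducer struct {
	events map[key][]*Message
}

func newRecordingProducer() *recordingProducer {
	return &recordingProducer{events: make(map[key][]*Message)}
}

// SyncSendMessage stores the message under a single (topic, partition) key.
func (m *recordingProducer) SyncSendMessage(_ context.Context, topic string,
	partitionNum int32, message *Message,
) error {
	k := key{topic: topic, partition: partitionNum}
	m.events[k] = append(m.events[k], message)
	return nil
}

// SyncBroadcastMessage stores the message under every partition of the topic.
func (m *recordingProducer) SyncBroadcastMessage(ctx context.Context, topic string,
	totalPartitionsNum int32, message *Message,
) error {
	for i := int32(0); i < totalPartitionsNum; i++ {
		if err := m.SyncSendMessage(ctx, topic, i, message); err != nil {
			return err
		}
	}
	return nil
}

// GetEvents returns the messages stored for one topic and partition.
func (m *recordingProducer) GetEvents(topic string, partitionNum int32) []*Message {
	return m.events[key{topic: topic, partition: partitionNum}]
}

// GetAllEvents returns every stored message, in no particular order.
func (m *recordingProducer) GetAllEvents() []*Message {
	var all []*Message
	for _, msgs := range m.events {
		all = append(all, msgs...)
	}
	return all
}

// record broadcasts one message to 2 partitions, then sends one to partition 0.
func record() (*recordingProducer, error) {
	m := newRecordingProducer()
	ctx := context.Background()
	if err := m.SyncBroadcastMessage(ctx, "ddl", 2, &Message{Value: []byte("broadcast")}); err != nil {
		return nil, err
	}
	if err := m.SyncSendMessage(ctx, "ddl", 0, &Message{Value: []byte("single")}); err != nil {
		return nil, err
	}
	return m, nil
}

func main() {
	m, err := record()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(len(m.GetEvents("ddl", 0)), len(m.GetEvents("ddl", 1)), len(m.GetAllEvents()))
}
```

In this sketch a broadcast counts once per partition, so a test that asserts on the total from `GetAllEvents` needs to account for the partition count it passed in.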

type PulsarFactory

type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID,
	pConfig *config.PulsarConfig, client pulsar.Client, sinkConfig *config.SinkConfig) (DDLProducer, error)

PulsarFactory is a function to create a pulsar producer.

type PulsarMockProducers

type PulsarMockProducers struct {
	// contains filtered or unexported fields
}

PulsarMockProducers is a mock Pulsar producer.

func NewMockPulsarProducer

func NewMockPulsarProducer(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	pConfig *config.PulsarConfig,
	client pulsar.Client,
) (*PulsarMockProducers, error)

NewMockPulsarProducer creates a mock Pulsar producer.

func (*PulsarMockProducers) Close

func (p *PulsarMockProducers) Close()

Close closes all producers.

func (*PulsarMockProducers) Flush

func (p *PulsarMockProducers) Flush(ctx context.Context) error

Flush waits for all the messages in the async producer to be sent to Pulsar. Note: this method is not thread-safe. Do not call AsyncSendMessage and Flush from different threads; otherwise Flush will not work as expected. It may never finish, or it may flush the wrong messages, because inflight would be modified incorrectly.
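
One possible way for a caller to respect this constraint is to guard both operations with a single mutex so that a send and a flush can never overlap. The sketch below shows that idea under stated assumptions: `flusher` is a hypothetical stand-in with an unprotected `inflight` field, and `serialized` and `run` are names invented for this example. AsyncSendMessage is named in the note above but is not part of the method set documented for PulsarMockProducers, so it is modelled here rather than called on the real type.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// flusher is a hypothetical producer whose send and flush methods must not
// run concurrently, because inflight is not protected by the producer.
type flusher struct {
	inflight int
	flushed  int
}

func (f *flusher) AsyncSendMessage(_ context.Context, _ []byte) {
	f.inflight++
}

func (f *flusher) Flush(_ context.Context) error {
	f.flushed += f.inflight
	f.inflight = 0
	return nil
}

// serialized wraps a flusher so that sends and flushes never overlap.
type serialized struct {
	mu sync.Mutex
	f  *flusher
}

func (s *serialized) Send(ctx context.Context, payload []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.f.AsyncSendMessage(ctx, payload)
}

func (s *serialized) Flush(ctx context.Context) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.f.Flush(ctx)
}

// run sends n messages from n goroutines, waits for all sends to return,
// then flushes once and reports the flushed and remaining in-flight counts.
func run(n int) (flushed, inflight int, err error) {
	s := &serialized{f: &flusher{}}
	ctx := context.Background()
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.Send(ctx, []byte("ddl"))
		}()
	}
	wg.Wait()
	if err := s.Flush(ctx); err != nil {
		return 0, 0, err
	}
	return s.f.flushed, s.f.inflight, nil
}

func main() {
	flushed, inflight, err := run(8)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(flushed, inflight)
}
```

A mutex only prevents overlapping calls; a caller that can keep all sends and flushes on one goroutine avoids the problem without any locking.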

func (*PulsarMockProducers) GetAllEvents

func (p *PulsarMockProducers) GetAllEvents() []*pulsar.ProducerMessage

GetAllEvents returns the events received by the mock producer.

func (*PulsarMockProducers) GetEvents

func (p *PulsarMockProducers) GetEvents(topic string) []*pulsar.ProducerMessage

GetEvents returns the events filtered by topic.

func (*PulsarMockProducers) GetProducerByTopic

func (p *PulsarMockProducers) GetProducerByTopic(topicName string) (producer pulsar.Producer, err error)

GetProducerByTopic returns a producer by topic name.

func (*PulsarMockProducers) SyncBroadcastMessage

func (p *PulsarMockProducers) SyncBroadcastMessage(ctx context.Context, topic string,
	totalPartitionsNum int32, message *common.Message,
) error

SyncBroadcastMessage broadcasts a message. Pulsar consumes all partitions.

func (*PulsarMockProducers) SyncSendMessage

func (p *PulsarMockProducers) SyncSendMessage(ctx context.Context, topic string,
	partitionNum int32, message *common.Message,
) error

SyncSendMessage sends a message. partitionNum is not used; Pulsar consumes all partitions.
