Documentation
¶
Index ¶
- type DMLProducer
- func NewDMLMockProducer(_ context.Context, _ model.ChangeFeedID, asyncProducer kafka.AsyncProducer, ...) DMLProducer
- func NewKafkaDMLProducer(ctx context.Context, changefeedID model.ChangeFeedID, ...) DMLProducer
- func NewPulsarDMLProducer(ctx context.Context, changefeedID model.ChangeFeedID, client pulsar.Client, ...) (DMLProducer, error)
- func NewPulsarDMLProducerMock(ctx context.Context, changefeedID model.ChangeFeedID, client pulsar.Client, ...) (DMLProducer, error)
- type Factory
- type MockDMLProducer
- func (m *MockDMLProducer) AsyncSendMessage(_ context.Context, topic string, partition int32, message *common.Message) error
- func (m *MockDMLProducer) Close()
- func (m *MockDMLProducer) GetAllEvents() []*common.Message
- func (m *MockDMLProducer) GetEvents(topic string, partition int32) []*common.Message
- type PulsarFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DMLProducer ¶
type DMLProducer interface { // AsyncSendMessage sends a message asynchronously. AsyncSendMessage( ctx context.Context, topic string, partition int32, message *common.Message, ) error // Close closes the producer and client(s). Close() }
DMLProducer is the interface for message producer.
func NewDMLMockProducer ¶
func NewDMLMockProducer(_ context.Context, _ model.ChangeFeedID, asyncProducer kafka.AsyncProducer, _ kafka.MetricsCollector, _ chan error, _ chan error, ) DMLProducer
NewDMLMockProducer creates a mock producer.
func NewKafkaDMLProducer ¶
func NewKafkaDMLProducer( ctx context.Context, changefeedID model.ChangeFeedID, asyncProducer kafka.AsyncProducer, metricsCollector kafka.MetricsCollector, errCh chan error, failpointCh chan error, ) DMLProducer
NewKafkaDMLProducer creates a new kafka producer.
func NewPulsarDMLProducer ¶
func NewPulsarDMLProducer( ctx context.Context, changefeedID model.ChangeFeedID, client pulsar.Client, sinkConfig *config.SinkConfig, errCh chan error, failpointCh chan error, ) (DMLProducer, error)
NewPulsarDMLProducer creates a new pulsar producer.
func NewPulsarDMLProducerMock ¶
func NewPulsarDMLProducerMock( ctx context.Context, changefeedID model.ChangeFeedID, client pulsar.Client, sinkConfig *config.SinkConfig, errCh chan error, failpointCh chan error, ) (DMLProducer, error)
NewPulsarDMLProducerMock creates a new pulsar producer.
type Factory ¶
type Factory func(ctx context.Context, changefeedID model.ChangeFeedID, asyncProducer kafka.AsyncProducer, metricsCollector kafka.MetricsCollector, errCh chan error, failpointCh chan error, ) DMLProducer
Factory is a function to create a producer. errCh is used to report error to the caller(i.e. processor,owner). Because the caller passes errCh to many goroutines, there is no way to safely close errCh by the sender. So we let the GC close errCh. It's usually a buffered channel.
type MockDMLProducer ¶
type MockDMLProducer struct {
// contains filtered or unexported fields
}
MockDMLProducer is a mock producer for test.
func (*MockDMLProducer) AsyncSendMessage ¶
func (m *MockDMLProducer) AsyncSendMessage(_ context.Context, topic string, partition int32, message *common.Message, ) error
AsyncSendMessage appends a message to the mock producer.
func (*MockDMLProducer) GetAllEvents ¶
func (m *MockDMLProducer) GetAllEvents() []*common.Message
GetAllEvents returns the events received by the mock producer.
type PulsarFactory ¶
type PulsarFactory func(ctx context.Context, changefeedID model.ChangeFeedID, client pulsar.Client, sinkConfig *config.SinkConfig, errCh chan error, failpointCh chan error) (DMLProducer, error)
PulsarFactory is a function to create a pulsar producer.