reader

package
v0.0.0-...-10ddbbe Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 27, 2024 License: Apache-2.0 Imports: 47 Imported by: 3

Documentation

Index

Constants

View Source
const (
	TomeObject = "_tome" // marker for an object that has been marked as deleted

	SkipCollectionState = pb.CollectionState(-100)
	SkipPartitionState  = pb.PartitionState(-100)
)
View Source
const (
	AllCollection = "*"
)
View Source
const BufferSize = "4"

Variables

View Source
var (
	TSMetricVec = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "milvus",
			Subsystem: "cdc",
			Name:      "center_tt",
			Help:      "the center ts, unit: ms",
		}, []string{"channel_name"})
)

Functions

func CheckAndFixVirtualChannel

func CheckAndFixVirtualChannel(vchannel string) string

func ForeachChannel

func ForeachChannel(sourcePChannels, targetPChannels []string, f func(sourcePChannel, targetPChannel string) error) error

func GetMsgDispatcherClient

func GetMsgDispatcherClient(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgdispatcher.Client, error)

GetMsgDispatcherClient TODO: the client does not include the message at the current position; however, it should include that message when the position is provided by the backup tool.

func GetStreamFactory

func GetStreamFactory(creator FactoryCreator, mqConfig config.MQConfig, ttMsgStream bool) (msgstream.Factory, error)

func GetTSManager

func GetTSManager() *tsManager

func GetVChannelByPChannel

func GetVChannelByPChannel(pChannel string, vChannels []string) string

func GreedyConsumeChan

func GreedyConsumeChan(packChan chan *api.ReplicateMsg, forward bool, f func(bool, *api.ReplicateMsg))

func IsCollectionNotFoundError

func IsCollectionNotFoundError(err error) bool

func IsDatabaseNotFoundError

func IsDatabaseNotFoundError(err error) bool

func IsDroppedObject

func IsDroppedObject(name string) bool

func IsVirtualChannel

func IsVirtualChannel(vchannel string) bool

func NewChannelReader

func NewChannelReader(channelName, seekPosition string,
	dispatchClient msgdispatcher.Client,
	taskID string,
	dataHandler func(context.Context, *msgstream.MsgPack) bool,
) (api.Reader, error)

func NewCollectionReader

func NewCollectionReader(id string,
	channelManager api.ChannelManager, metaOp api.MetaOp,
	seekPosition map[int64]map[string]*msgpb.MsgPosition,
	shouldReadFunc ShouldReadFunc,
	readerConfig config.ReaderConfig,
) (api.Reader, error)

func NewEtcdOp

func NewEtcdOp(etcdServerConfig config.EtcdServerConfig, defaultPartitionName string,
	etcdConfig config.EtcdRetryConfig, target api.TargetAPI,
) (api.MetaOp, error)

func NewEtcdOpWithAddress

func NewEtcdOpWithAddress(endpoints []string,
	rootPath, metaPath, defaultPartitionName string,
	etcdConfig config.EtcdRetryConfig, target api.TargetAPI,
) (api.MetaOp, error)

func NewReplicateChannelManagerWithDispatchClient

func NewReplicateChannelManagerWithDispatchClient(
	dispatchClient msgdispatcher.Client,
	factory msgstream.Factory,
	client api.TargetAPI,
	readConfig config.ReaderConfig,
	metaOp api.MetaOp,
	msgPackCallback func(string, *msgstream.MsgPack),
	downstream string,
) (api.ChannelManager, error)

func NewTarget

func NewTarget(ctx context.Context, config TargetConfig) (api.TargetAPI, error)

Types

type Barrier

type Barrier struct {
	Dest        int
	BarrierChan chan uint64
	CloseChan   chan struct{}
}

func NewBarrier

func NewBarrier(count int, f func(msgTs uint64, b *Barrier)) *Barrier

type ChannelReader

type ChannelReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}

func (*ChannelReader) QuitRead

func (c *ChannelReader) QuitRead(ctx context.Context)

func (*ChannelReader) StartRead

func (c *ChannelReader) StartRead(ctx context.Context)

type CollectionInfo

type CollectionInfo struct {
	// contains filtered or unexported fields
}

type CollectionReader

type CollectionReader struct {
	api.DefaultReader
	// contains filtered or unexported fields
}

func (*CollectionReader) ErrorChan

func (reader *CollectionReader) ErrorChan() <-chan error

func (*CollectionReader) QuitRead

func (reader *CollectionReader) QuitRead(ctx context.Context)

func (*CollectionReader) StartRead

func (reader *CollectionReader) StartRead(ctx context.Context)

type DefaultFactoryCreator

type DefaultFactoryCreator struct{}

func (*DefaultFactoryCreator) NewKmsFactory

func (d *DefaultFactoryCreator) NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory

func (*DefaultFactoryCreator) NewPmsFactory

func (d *DefaultFactoryCreator) NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory

type DisptachClientStreamCreator

type DisptachClientStreamCreator struct {
	// contains filtered or unexported fields
}

func NewDisptachClientStreamCreator

func NewDisptachClientStreamCreator(factory msgstream.Factory, dispatchClient msgdispatcher.Client) *DisptachClientStreamCreator

func (*DisptachClientStreamCreator) CheckConnection

func (dcsc *DisptachClientStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error

func (*DisptachClientStreamCreator) GetChannelLatestMsgID

func (dcsc *DisptachClientStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)

func (*DisptachClientStreamCreator) GetStreamChan

func (dcsc *DisptachClientStreamCreator) GetStreamChan(ctx context.Context,
	vchannel string,
	seekPosition *msgstream.MsgPosition,
) (<-chan *msgstream.MsgPack, io.Closer, error)

type EtcdOp

type EtcdOp struct {
	// contains filtered or unexported fields
}

func (*EtcdOp) GetAllCollection

func (e *EtcdOp) GetAllCollection(ctx context.Context, filter api.CollectionFilter) ([]*pb.CollectionInfo, error)

func (*EtcdOp) GetAllDroppedObj

func (e *EtcdOp) GetAllDroppedObj() map[string]map[string]uint64

func (*EtcdOp) GetAllPartition

func (e *EtcdOp) GetAllPartition(ctx context.Context, filter api.PartitionFilter) ([]*pb.PartitionInfo, error)

func (*EtcdOp) GetCollectionNameByID

func (e *EtcdOp) GetCollectionNameByID(ctx context.Context, id int64) string

func (*EtcdOp) GetDatabaseInfoForCollection

func (e *EtcdOp) GetDatabaseInfoForCollection(ctx context.Context, id int64) model.DatabaseInfo

func (*EtcdOp) StartWatch

func (e *EtcdOp) StartWatch()

func (*EtcdOp) SubscribeCollectionEvent

func (e *EtcdOp) SubscribeCollectionEvent(taskID string, consumer api.CollectionEventConsumer)

func (*EtcdOp) SubscribePartitionEvent

func (e *EtcdOp) SubscribePartitionEvent(taskID string, consumer api.PartitionEventConsumer)

func (*EtcdOp) UnsubscribeEvent

func (e *EtcdOp) UnsubscribeEvent(taskID string, eventType api.WatchEventType)

func (*EtcdOp) WatchCollection

func (e *EtcdOp) WatchCollection(ctx context.Context, filter api.CollectionFilter)

func (*EtcdOp) WatchPartition

func (e *EtcdOp) WatchPartition(ctx context.Context, filter api.PartitionFilter)

type FactoryCreator

type FactoryCreator interface {
	NewPmsFactory(cfg *config.PulsarConfig) msgstream.Factory
	NewKmsFactory(cfg *config.KafkaConfig) msgstream.Factory
}

func NewDefaultFactoryCreator

func NewDefaultFactoryCreator() FactoryCreator

type FactoryStreamCreator

type FactoryStreamCreator struct {
	// contains filtered or unexported fields
}

func (*FactoryStreamCreator) CheckConnection

func (fsc *FactoryStreamCreator) CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error

func (*FactoryStreamCreator) GetChannelLatestMsgID

func (fsc *FactoryStreamCreator) GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)

func (*FactoryStreamCreator) GetStreamChan

func (fsc *FactoryStreamCreator) GetStreamChan(ctx context.Context,
	vchannel string,
	seekPosition *msgstream.MsgPosition,
) (<-chan *msgstream.MsgPack, io.Closer, error)

type ShouldReadFunc

type ShouldReadFunc func(*pb.CollectionInfo) bool

type StreamCloser

type StreamCloser func()

func (StreamCloser) Close

func (sc StreamCloser) Close() error

type StreamCreator

type StreamCreator interface {
	GetStreamChan(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) (<-chan *msgstream.MsgPack, io.Closer, error)
	CheckConnection(ctx context.Context, vchannel string, seekPosition *msgstream.MsgPosition) error
	GetChannelLatestMsgID(ctx context.Context, channelName string) ([]byte, error)
}

type TargetClient

type TargetClient struct {
	// contains filtered or unexported fields
}

func (*TargetClient) GetCollectionInfo

func (t *TargetClient) GetCollectionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)

func (*TargetClient) GetDatabaseName

func (t *TargetClient) GetDatabaseName(ctx context.Context, collectionName, databaseName string) (string, error)

func (*TargetClient) GetPartitionInfo

func (t *TargetClient) GetPartitionInfo(ctx context.Context, collectionName, databaseName string) (*model.CollectionInfo, error)

type TargetConfig

type TargetConfig struct {
	URI        string
	Token      string
	APIKey     string
	DialConfig util.DialConfig
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL