Documentation ¶
Index ¶
- Constants
- Variables
- func BaseKey(clusterID string) string
- func CaptureInfoKeyPrefix(clusterID string) string
- func CaptureOwnerKey(clusterID string) string
- func ChangefeedStatusKeyPrefix(clusterID, namespace string) string
- func CreateRawEtcdClient(securityConf *security.Credential, grpcDialOption grpc.DialOption, ...) (*clientv3.Client, error)
- func GcServiceIDForTest() string
- func GetEtcdKeyCaptureInfo(clusterID, id string) string
- func GetEtcdKeyChangeFeedInfo(clusterID string, changefeedID model.ChangeFeedID) string
- func GetEtcdKeyChangeFeedList(clusterID, namespace string) string
- func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string
- func GetEtcdKeyTaskPosition(clusterID string, changefeedID model.ChangeFeedID, captureID string) string
- func InitMetrics(registry *prometheus.Registry)
- func IsHealthy(ctx context.Context, client *clientv3.Client) bool
- func MigrateBackupKey(version int, backupKey string) string
- func NamespacedPrefix(clusterID, namespace string) string
- func SetupEmbedEtcd(dir string) (clientURL *url.URL, e *embed.Etcd, err error)
- func TaskPositionKeyPrefix(clusterID, namespace string) string
- type CDCEtcdClient
- type CDCEtcdClientImpl
- func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) error
- func (c *CDCEtcdClientImpl) ClearAllCDCInfo(ctx context.Context) error
- func (c *CDCEtcdClientImpl) Close() error
- func (c *CDCEtcdClientImpl) CreateChangefeedInfo(ctx context.Context, upstreamInfo *model.UpstreamInfo, ...) error
- func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID string) error
- func (c *CDCEtcdClientImpl) DeleteChangeFeedInfo(ctx context.Context, id model.ChangeFeedID) error
- func (c *CDCEtcdClientImpl) GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error)
- func (c *CDCEtcdClientImpl) GetAllChangeFeedInfo(ctx context.Context) (map[model.ChangeFeedID]*model.ChangeFeedInfo, error)
- func (c *CDCEtcdClientImpl) GetCaptureInfo(ctx context.Context, id string) (info *model.CaptureInfo, err error)
- func (c *CDCEtcdClientImpl) GetCaptureLeases(ctx context.Context) (map[string]int64, error)
- func (c *CDCEtcdClientImpl) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
- func (c *CDCEtcdClientImpl) GetChangeFeedInfo(ctx context.Context, id model.ChangeFeedID) (*model.ChangeFeedInfo, error)
- func (c *CDCEtcdClientImpl) GetChangeFeedStatus(ctx context.Context, id model.ChangeFeedID) (*model.ChangeFeedStatus, int64, error)
- func (c *CDCEtcdClientImpl) GetChangeFeeds(ctx context.Context) (int64, map[model.ChangeFeedID]*mvccpb.KeyValue, error)
- func (c *CDCEtcdClientImpl) GetClusterID() string
- func (c *CDCEtcdClientImpl) GetEnsureGCServiceID(tag string) string
- func (c *CDCEtcdClientImpl) GetEtcdClient() *Client
- func (c *CDCEtcdClientImpl) GetGCServiceID() string
- func (c *CDCEtcdClientImpl) GetOwnerID(ctx context.Context) (string, error)
- func (c *CDCEtcdClientImpl) GetOwnerRevision(ctx context.Context, captureID string) (rev int64, err error)
- func (c *CDCEtcdClientImpl) GetUpstreamInfo(ctx context.Context, upstreamID model.UpstreamID, namespace string) (*model.UpstreamInfo, error)
- func (c *CDCEtcdClientImpl) PutCaptureInfo(ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID) error
- func (c *CDCEtcdClientImpl) RevokeAllLeases(ctx context.Context, leases map[string]int64) error
- func (c *CDCEtcdClientImpl) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, ...) error
- func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, ...) error
- type CDCKey
- type CDCKeyType
- type Client
- func (c *Client) Delete(ctx context.Context, key string, opts ...clientV3.OpOption) (resp *clientV3.DeleteResponse, err error)
- func (c *Client) Get(ctx context.Context, key string, opts ...clientV3.OpOption) (resp *clientV3.GetResponse, err error)
- func (c *Client) Grant(ctx context.Context, ttl int64) (resp *clientV3.LeaseGrantResponse, err error)
- func (c *Client) Put(ctx context.Context, key, val string, opts ...clientV3.OpOption) (resp *clientV3.PutResponse, err error)
- func (c *Client) RequestProgress(ctx context.Context) error
- func (c *Client) Revoke(ctx context.Context, id clientV3.LeaseID) (resp *clientV3.LeaseRevokeResponse, err error)
- func (c *Client) TimeToLive(ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption) (resp *clientV3.LeaseTimeToLiveResponse, err error)
- func (c *Client) Txn(ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op) (resp *clientV3.TxnResponse, err error)
- func (c *Client) Unwrap() *clientV3.Client
- func (c *Client) Watch(ctx context.Context, key string, role string, opts ...clientV3.OpOption) clientV3.WatchChan
- func (c *Client) WatchWithChan(ctx context.Context, outCh chan<- clientV3.WatchResponse, key string, ...)
- type OwnerCaptureInfoClient
- type Tester
Constants ¶
const ( EtcdPut = "Put" EtcdGet = "Get" EtcdTxn = "Txn" EtcdDel = "Del" EtcdGrant = "Grant" EtcdRevoke = "Revoke" )
etcd operation names
const ( // ChangefeedInfoKey is the key path for changefeed info ChangefeedInfoKey = "/changefeed/info" // ChangefeedStatusKey is the key path for changefeed status ChangefeedStatusKey = "/changefeed/status" // DeletionCounterKey is the key path for the counter of deleted keys DeletionCounterKey = metaPrefix + "/meta/ticdc-delete-etcd-key-count" // DefaultClusterAndNamespacePrefix is the default prefix of changefeed data DefaultClusterAndNamespacePrefix = "/tidb/cdc/default/default" // DefaultClusterAndMetaPrefix is the default prefix of cluster meta DefaultClusterAndMetaPrefix = "/tidb/cdc/default" + metaPrefix )
const DefaultCDCClusterID = "default"
DefaultCDCClusterID is the default value of cdc cluster id
Variables ¶
var ( // TxnEmptyOpsElse is a no-op operation. TxnEmptyOpsElse = []clientV3.Op{} )
Functions ¶
func BaseKey ¶
BaseKey is the common prefix of the keys with cluster id in CDC
Note that tidb-dashboard depends on this prefix to filter out TiCDC keys. Ref: https://github.com/pingcap/tidb-dashboard/blob/1f39ee09c5352adbf23af8ec7e15020147ef9ca4/pkg/utils/topology/ticdc.go#L22
func CaptureInfoKeyPrefix ¶
CaptureInfoKeyPrefix is the capture info path that is saved to etcd
func CaptureOwnerKey ¶
CaptureOwnerKey is the capture owner path that is saved to etcd
func ChangefeedStatusKeyPrefix ¶
ChangefeedStatusKeyPrefix is the prefix of changefeed status keys
func CreateRawEtcdClient ¶
func CreateRawEtcdClient(securityConf *security.Credential, grpcDialOption grpc.DialOption, endpoints ...string) (*clientv3.Client, error)
CreateRawEtcdClient creates etcd v3 client with detecting endpoints. It will check the health of endpoints periodically, and update endpoints if needed.
func GcServiceIDForTest ¶
func GcServiceIDForTest() string
GcServiceIDForTest returns the gc service ID for tests
func GetEtcdKeyCaptureInfo ¶
GetEtcdKeyCaptureInfo returns the key of a capture info
func GetEtcdKeyChangeFeedInfo ¶
func GetEtcdKeyChangeFeedInfo(clusterID string, changefeedID model.ChangeFeedID) string
GetEtcdKeyChangeFeedInfo returns the key of a changefeed config
func GetEtcdKeyChangeFeedList ¶
GetEtcdKeyChangeFeedList returns the prefix key of all changefeed config
func GetEtcdKeyJob ¶
func GetEtcdKeyJob(clusterID string, changeFeedID model.ChangeFeedID) string
GetEtcdKeyJob returns the key for a job status
func GetEtcdKeyTaskPosition ¶
func GetEtcdKeyTaskPosition(clusterID string, changefeedID model.ChangeFeedID, captureID string, ) string
GetEtcdKeyTaskPosition returns the key of a task position
func InitMetrics ¶
func InitMetrics(registry *prometheus.Registry)
InitMetrics registers the etcd request counter.
func MigrateBackupKey ¶
MigrateBackupKey is the key of backup data during a migration.
func NamespacedPrefix ¶
NamespacedPrefix returns the etcd prefix of changefeed data
func SetupEmbedEtcd ¶
SetupEmbedEtcd starts an embedded etcd server
func TaskPositionKeyPrefix ¶
TaskPositionKeyPrefix is the prefix of task position keys
Types ¶
type CDCEtcdClient ¶
type CDCEtcdClient interface { OwnerCaptureInfoClient GetClusterID() string GetEtcdClient() *Client GetAllCDCInfo(ctx context.Context) ([]*mvccpb.KeyValue, error) GetChangeFeedInfo(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedInfo, error) GetAllChangeFeedInfo(ctx context.Context) ( map[model.ChangeFeedID]*model.ChangeFeedInfo, error, ) GetChangeFeedStatus(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedStatus, int64, error) GetUpstreamInfo(ctx context.Context, upstreamID model.UpstreamID, namespace string, ) (*model.UpstreamInfo, error) GetGCServiceID() string GetEnsureGCServiceID(tag string) string SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID, ) error CreateChangefeedInfo(context.Context, *model.UpstreamInfo, *model.ChangeFeedInfo, ) error UpdateChangefeedAndUpstream(ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, ) error PutCaptureInfo(context.Context, *model.CaptureInfo, clientv3.LeaseID) error DeleteCaptureInfo(context.Context, model.CaptureID) error CheckMultipleCDCClusterExist(ctx context.Context) error }
CDCEtcdClient extracts the methods of CDCEtcdClientImpl that are used by apiv2.
type CDCEtcdClientImpl ¶
type CDCEtcdClientImpl struct { Client *Client ClusterID string // contains filtered or unexported fields }
CDCEtcdClientImpl is a wrapper around the etcd client
func NewCDCEtcdClient ¶
func NewCDCEtcdClient(ctx context.Context, cli *clientv3.Client, clusterID string, ) (*CDCEtcdClientImpl, error)
NewCDCEtcdClient returns a new CDCEtcdClient
func (*CDCEtcdClientImpl) CheckMultipleCDCClusterExist ¶
func (c *CDCEtcdClientImpl) CheckMultipleCDCClusterExist(ctx context.Context) error
CheckMultipleCDCClusterExist checks if other cdc clusters exist, and returns an error if so; in that case the user should use --server instead
func (*CDCEtcdClientImpl) ClearAllCDCInfo ¶
func (c *CDCEtcdClientImpl) ClearAllCDCInfo(ctx context.Context) error
ClearAllCDCInfo deletes all keys created by CDC
func (*CDCEtcdClientImpl) Close ¶
func (c *CDCEtcdClientImpl) Close() error
Close releases resources in CDCEtcdClient
func (*CDCEtcdClientImpl) CreateChangefeedInfo ¶
func (c *CDCEtcdClientImpl) CreateChangefeedInfo( ctx context.Context, upstreamInfo *model.UpstreamInfo, info *model.ChangeFeedInfo, ) error
CreateChangefeedInfo creates a change feed info in etcd and fails if it already exists.
func (*CDCEtcdClientImpl) DeleteCaptureInfo ¶
func (c *CDCEtcdClientImpl) DeleteCaptureInfo(ctx context.Context, captureID string) error
DeleteCaptureInfo deletes all capture-related info from etcd.
func (*CDCEtcdClientImpl) DeleteChangeFeedInfo ¶
func (c *CDCEtcdClientImpl) DeleteChangeFeedInfo(ctx context.Context, id model.ChangeFeedID, ) error
DeleteChangeFeedInfo deletes a changefeed config from etcd
func (*CDCEtcdClientImpl) GetAllCDCInfo ¶
GetAllCDCInfo gets all keys created by CDC
func (*CDCEtcdClientImpl) GetAllChangeFeedInfo ¶
func (c *CDCEtcdClientImpl) GetAllChangeFeedInfo(ctx context.Context) ( map[model.ChangeFeedID]*model.ChangeFeedInfo, error, )
GetAllChangeFeedInfo queries all changefeed information
func (*CDCEtcdClientImpl) GetCaptureInfo ¶
func (c *CDCEtcdClientImpl) GetCaptureInfo( ctx context.Context, id string, ) (info *model.CaptureInfo, err error)
GetCaptureInfo gets capture info from etcd. Returns ErrCaptureNotExist if the capture does not exist.
func (*CDCEtcdClientImpl) GetCaptureLeases ¶
GetCaptureLeases returns a map mapping from capture ID to its lease
func (*CDCEtcdClientImpl) GetCaptures ¶
func (c *CDCEtcdClientImpl) GetCaptures(ctx context.Context) (int64, []*model.CaptureInfo, error)
GetCaptures returns kv revision and CaptureInfo list
func (*CDCEtcdClientImpl) GetChangeFeedInfo ¶
func (c *CDCEtcdClientImpl) GetChangeFeedInfo(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedInfo, error)
GetChangeFeedInfo queries the config of a given changefeed
func (*CDCEtcdClientImpl) GetChangeFeedStatus ¶
func (c *CDCEtcdClientImpl) GetChangeFeedStatus(ctx context.Context, id model.ChangeFeedID, ) (*model.ChangeFeedStatus, int64, error)
GetChangeFeedStatus queries the checkpointTs and resolvedTs of a given changefeed
func (*CDCEtcdClientImpl) GetChangeFeeds ¶
func (c *CDCEtcdClientImpl) GetChangeFeeds(ctx context.Context) ( int64, map[model.ChangeFeedID]*mvccpb.KeyValue, error, )
GetChangeFeeds returns kv revision and a map mapping from changefeedID to changefeed detail mvccpb.KeyValue
func (*CDCEtcdClientImpl) GetClusterID ¶
func (c *CDCEtcdClientImpl) GetClusterID() string
GetClusterID gets CDC cluster ID.
func (*CDCEtcdClientImpl) GetEnsureGCServiceID ¶
func (c *CDCEtcdClientImpl) GetEnsureGCServiceID(tag string) string
GetEnsureGCServiceID returns the prefix for the gc service id when a changefeed is being created
func (*CDCEtcdClientImpl) GetEtcdClient ¶
func (c *CDCEtcdClientImpl) GetEtcdClient() *Client
GetEtcdClient gets Client.
func (*CDCEtcdClientImpl) GetGCServiceID ¶
func (c *CDCEtcdClientImpl) GetGCServiceID() string
GetGCServiceID returns the cdc gc service ID
func (*CDCEtcdClientImpl) GetOwnerID ¶
func (c *CDCEtcdClientImpl) GetOwnerID(ctx context.Context) (string, error)
GetOwnerID returns the owner id by querying etcd
func (*CDCEtcdClientImpl) GetOwnerRevision ¶
func (c *CDCEtcdClientImpl) GetOwnerRevision( ctx context.Context, captureID string, ) (rev int64, err error)
GetOwnerRevision gets the Etcd revision for the elected owner.
func (*CDCEtcdClientImpl) GetUpstreamInfo ¶
func (c *CDCEtcdClientImpl) GetUpstreamInfo(ctx context.Context, upstreamID model.UpstreamID, namespace string, ) (*model.UpstreamInfo, error)
GetUpstreamInfo gets an upstreamInfo from the etcd server
func (*CDCEtcdClientImpl) PutCaptureInfo ¶
func (c *CDCEtcdClientImpl) PutCaptureInfo( ctx context.Context, info *model.CaptureInfo, leaseID clientv3.LeaseID, ) error
PutCaptureInfo puts capture info into etcd; this happens when the capture starts.
func (*CDCEtcdClientImpl) RevokeAllLeases ¶
RevokeAllLeases revokes all leases passed from parameter
func (*CDCEtcdClientImpl) SaveChangeFeedInfo ¶
func (c *CDCEtcdClientImpl) SaveChangeFeedInfo(ctx context.Context, info *model.ChangeFeedInfo, changeFeedID model.ChangeFeedID, ) error
SaveChangeFeedInfo stores change feed info into etcd. TODO: this should be called from outer system, such as from a TiDB client
func (*CDCEtcdClientImpl) UpdateChangefeedAndUpstream ¶
func (c *CDCEtcdClientImpl) UpdateChangefeedAndUpstream( ctx context.Context, upstreamInfo *model.UpstreamInfo, changeFeedInfo *model.ChangeFeedInfo, ) error
UpdateChangefeedAndUpstream updates the changefeed's info and its upstream info into etcd
type CDCKey ¶
type CDCKey struct { Tp CDCKeyType ChangefeedID model.ChangeFeedID CaptureID string OwnerLeaseID string ClusterID string UpstreamID model.UpstreamID Namespace string }
CDCKey represents an etcd key which is defined by TiCDC
Usage: we can parse a raw etcd key: ``` k := new(CDCKey) rawKey := "/tidb/cdc/changefeed/info/test/changefeed" err := k.Parse(rawKey) c.Assert(k, check.DeepEquals, &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: "test/changefeed", }) ``` and we can generate a raw key from CDCKey ``` k := &CDCKey{ Tp: CDCKeyTypeChangefeedInfo, ChangefeedID: "test/changefeed", } c.Assert(k.String(), check.Equals, "/tidb/cdc/changefeed/info/test/changefeed") ```
type CDCKeyType ¶
type CDCKeyType = int
CDCKeyType is the type of etcd key
const ( CDCKeyTypeUnknown CDCKeyType = iota CDCKeyTypeOwner CDCKeyTypeCapture CDCKeyTypeChangefeedInfo CDCKeyTypeChangeFeedStatus CDCKeyTypeTaskPosition CDCKeyTypeMetaVersion CDCKeyTypeUpStream )
the types of etcd key
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a simple wrapper that adds retry to etcd RPC
func (*Client) Delete ¶
func (c *Client) Delete( ctx context.Context, key string, opts ...clientV3.OpOption, ) (resp *clientV3.DeleteResponse, err error)
Delete delegates request to clientV3.KV.Delete
func (*Client) Get ¶
func (c *Client) Get( ctx context.Context, key string, opts ...clientV3.OpOption, ) (resp *clientV3.GetResponse, err error)
Get delegates request to clientV3.KV.Get
func (*Client) Grant ¶
func (c *Client) Grant( ctx context.Context, ttl int64, ) (resp *clientV3.LeaseGrantResponse, err error)
Grant delegates request to clientV3.Lease.Grant
func (*Client) Put ¶
func (c *Client) Put( ctx context.Context, key, val string, opts ...clientV3.OpOption, ) (resp *clientV3.PutResponse, err error)
Put delegates request to clientV3.KV.Put
func (*Client) RequestProgress ¶
RequestProgress requests a progress notify response be sent in all watch channels.
func (*Client) Revoke ¶
func (c *Client) Revoke( ctx context.Context, id clientV3.LeaseID, ) (resp *clientV3.LeaseRevokeResponse, err error)
Revoke delegates request to clientV3.Lease.Revoke
func (*Client) TimeToLive ¶
func (c *Client) TimeToLive( ctx context.Context, lease clientV3.LeaseID, opts ...clientV3.LeaseOption, ) (resp *clientV3.LeaseTimeToLiveResponse, err error)
TimeToLive delegates request to clientV3.Lease.TimeToLive
func (*Client) Txn ¶
func (c *Client) Txn( ctx context.Context, cmps []clientV3.Cmp, opsThen, opsElse []clientV3.Op, ) (resp *clientV3.TxnResponse, err error)
Txn delegates request to clientV3.KV.Txn. The error returned can only be a non-retryable error, such as context.Canceled, context.DeadlineExceeded, errors.ErrReachMaxTry.
type OwnerCaptureInfoClient ¶
type OwnerCaptureInfoClient interface { GetOwnerID(context.Context) (model.CaptureID, error) GetOwnerRevision(context.Context, model.CaptureID) (int64, error) GetCaptures(context.Context) (int64, []*model.CaptureInfo, error) }
OwnerCaptureInfoClient is the sub-interface of CDCEtcdClient that is used to get owner capture information