Documentation
¶
Index ¶
- Constants
- Variables
- func Base64DecodeMsgPosition(position string) (*msgstream.MsgPosition, error)
- func Base64Encode(obj []byte) string
- func Base64JSON(obj any) string
- func Base64Msg(msg msgstream.TsMsg) string
- func Base64MsgPosition(position *msgstream.MsgPosition) string
- func Base64ProtoObj(obj proto.Message) string
- func ConvertKVPairToMap(pair []*commonpb.KeyValuePair) map[string]string
- func EtcdDelete(etcdCli KVApi, key string, opts ...clientv3.OpOption) error
- func EtcdGet(etcdCli KVApi, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)deprecated
- func EtcdGetWithContext(ctx context.Context, etcdCli KVApi, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)
- func EtcdPut(etcdCli KVApi, key, val string, opts ...clientv3.OpOption) error
- func EtcdStatus(etcdCli KVApi) error
- func EtcdTxn(etcdCli KVApi, fun func(txn clientv3.Txn) error) error
- func GetCollectionIDFromMsgPack(msgPack *msgstream.MsgPack) int64
- func GetCollectionIDFromRequest(req any) (int64, bool)
- func GetCollectionInfoKeys(collectionName, dbName string) (string, string)
- func GetCollectionNameFromFull(fullName string) (string, string)
- func GetCollectionNameFromMsgPack(msgPack *msgstream.MsgPack) string
- func GetCollectionNameFromRequest(req any) (string, bool)
- func GetCreateInfoKey(key string) string
- func GetCtxWithTaskID(ctx context.Context, taskID string) context.Context
- func GetDBInfoKeys(dbName string) (string, string)
- func GetDatabaseNameFromMsgPack(msgPack *msgstream.MsgPack) string
- func GetDialOptions(config DialConfig) ([]grpc.DialOption, error)
- func GetDropInfoKey(key string) string
- func GetEtcdConfig(etcdServerConfig config.EtcdServerConfig) (clientv3.Config, error)
- func GetEtcdSSLCfg(endpoints []string, certFile string, keyFile string, caCertFile string, ...) (*clientv3.Config, error)
- func GetFullCollectionName(collectionName string, databaseName string) string
- func GetPartitionInfoKeys(partitionName, collectionName, dbName string) (string, string)
- func GetRetryDefaultOptions() []retry.Option
- func GetRetryOptions(c config.RetrySettings) []retry.Option
- func GetTaskIDFromCtx(ctx context.Context) string
- func GetToken(username, password string) string
- func GetURI(address string, port int, enableTLS bool) string
- func GetUUID() string
- func GetVChannel(pchannel, mark string) string
- func InitMilvusPkgParam()
- func IsTombstone(data []byte) bool
- func IsUnRecoverable(err error) bool
- func IsUserRoleMessage(msgPack *msgstream.MsgPack) bool
- func MockEtcdClient(new func(cfg clientv3.Config) (KVApi, error), f func())
- func NewMsgStreamFactory(factory msgstream.Factory, ttMsgStream bool) msgstream.Factory
- func NoRetryOption() []retry.Option
- func OnceFuncWithContext(f func(context.Context)) func(context.Context)
- func ToBytes(s string) []byte
- func ToPhysicalChannel(vchannel string) string
- func ToString(b []byte) string
- func Unrecoverable(err error) error
- type ChannelInfo
- type ChannelMapping
- func (c *ChannelMapping) AddKeyValue(source, target string)
- func (c *ChannelMapping) AverageCnt() int
- func (c *ChannelMapping) CheckKeyExist(source, target string) bool
- func (c *ChannelMapping) CheckKeyNotExist(source, target string) bool
- func (c *ChannelMapping) GetMapKey(source, target string) string
- func (c *ChannelMapping) GetMapValue(source, target string) string
- func (c *ChannelMapping) UsingSourceKey() bool
- type CollectionIDGetter
- type CollectionNameGetter
- type DialConfig
- type ErrorList
- type KVApi
- type Map
- func (m *Map[K, V]) Delete(key K)
- func (m *Map[K, V]) GetUnsafeMap() map[K]V
- func (m *Map[K, V]) Load(key K) (value V, ok bool)
- func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)
- func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)
- func (m *Map[K, V]) LoadWithDefault(key K, value V) V
- func (m *Map[K, V]) Range(f func(key K, value V) bool) bool
- func (m *Map[K, V]) Store(key K, value V)
- type MilvusClientResourceManager
- type MsgStreamFactory
- type SafeArray
- type Value
Constants ¶
View Source
const ( MilvusClientResourceTyp = "milvus_client" MilvusClientExpireTime = 30 * time.Second DefaultDbName = "default" )
Variables ¶
View Source
var ( EtcdOpTimeout = 10 * time.Second EtcdOpRetryTime uint = 5 )
View Source
var ( DroppedDatabaseKey = "database" DroppedCollectionKey = "collection" DroppedPartitionKey = "partition" )
View Source
var CtxTaskKey = ctxTaskType{}
View Source
var DefaultReplicateTTInterval = 500
View Source
var NotFoundDatabase = errors.New("database not found")
View Source
var SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} // base64 value: "4pu8"
Functions ¶
func Base64DecodeMsgPosition ¶
func Base64DecodeMsgPosition(position string) (*msgstream.MsgPosition, error)
func Base64Encode ¶
func Base64JSON ¶
func Base64MsgPosition ¶
func Base64MsgPosition(position *msgstream.MsgPosition) string
func Base64ProtoObj ¶
func ConvertKVPairToMap ¶
func ConvertKVPairToMap(pair []*commonpb.KeyValuePair) map[string]string
func EtcdGetWithContext ¶
func EtcdStatus ¶
func GetCollectionInfoKeys ¶
func GetCreateInfoKey ¶
func GetDBInfoKeys ¶
func GetDialOptions ¶
func GetDialOptions(config DialConfig) ([]grpc.DialOption, error)
func GetDropInfoKey ¶
func GetEtcdConfig ¶
func GetEtcdConfig(etcdServerConfig config.EtcdServerConfig) (clientv3.Config, error)
func GetEtcdSSLCfg ¶
func GetFullCollectionName ¶
func GetPartitionInfoKeys ¶
func GetRetryDefaultOptions ¶
GetRetryDefaultOptions 2 days retry
func GetRetryOptions ¶
func GetRetryOptions(c config.RetrySettings) []retry.Option
func GetTaskIDFromCtx ¶
func GetVChannel ¶
func InitMilvusPkgParam ¶
func InitMilvusPkgParam()
func IsTombstone ¶
func IsUnRecoverable ¶
IsUnRecoverable is used to judge whether the error is wrapped by unrecoverableError.
func IsUserRoleMessage ¶
func MockEtcdClient ¶
deprecated
func NewMsgStreamFactory ¶
func NoRetryOption ¶
func OnceFuncWithContext ¶
OnceFuncWithContext copy from the oncefunc.go
func ToPhysicalChannel ¶
ToPhysicalChannel get physical channel name from virtual channel name
func Unrecoverable ¶
Unrecoverable method wrap an error to unrecoverableError. This will make retry quick return.
Types ¶
type ChannelInfo ¶
func ParseVChannel ¶
func ParseVChannel(virtualName string) (ChannelInfo, error)
type ChannelMapping ¶
type ChannelMapping struct {
// contains filtered or unexported fields
}
func (*ChannelMapping) AddKeyValue ¶
func (c *ChannelMapping) AddKeyValue(source, target string)
func (*ChannelMapping) AverageCnt ¶
func (c *ChannelMapping) AverageCnt() int
func (*ChannelMapping) CheckKeyExist ¶
func (c *ChannelMapping) CheckKeyExist(source, target string) bool
func (*ChannelMapping) CheckKeyNotExist ¶
func (c *ChannelMapping) CheckKeyNotExist(source, target string) bool
CheckKeyNotExist which means the key isn't existed, it will return true if the value is no mapping key, else return false
func (*ChannelMapping) GetMapKey ¶
func (c *ChannelMapping) GetMapKey(source, target string) string
func (*ChannelMapping) GetMapValue ¶
func (c *ChannelMapping) GetMapValue(source, target string) string
func (*ChannelMapping) UsingSourceKey ¶
func (c *ChannelMapping) UsingSourceKey() bool
type CollectionIDGetter ¶
type CollectionIDGetter interface {
GetCollectionID() int64
}
type CollectionNameGetter ¶
type CollectionNameGetter interface {
GetCollectionName() string
}
type DialConfig ¶
type DialConfig struct { ServerName string `json:"server_name,omitempty" mapstructure:"server_name,omitempty"` ServerPemPath string `json:"server_pem_path,omitempty" mapstructure:"server_pem_path,omitempty"` CaPemPath string `json:"ca_pem_path,omitempty" mapstructure:"ca_pem_path,omitempty"` ClientPemPath string `json:"client_pem_path,omitempty" mapstructure:"client_pem_path,omitempty"` ClientKeyPath string `json:"client_key_path,omitempty" mapstructure:"client_key_path,omitempty"` }
type KVApi ¶
type KVApi interface { clientv3.KV clientv3.Watcher // Status From clientv3.Maintenance interface Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) Endpoints() []string }
func GetEtcdClient ¶
func GetEtcdClient(etcdServerConfig config.EtcdServerConfig) (KVApi, error)
type Map ¶
type Map[K comparable, V any] struct { // contains filtered or unexported fields }
func (*Map[K, V]) GetUnsafeMap ¶
func (m *Map[K, V]) GetUnsafeMap() map[K]V
func (*Map[K, V]) LoadAndDelete ¶
func (*Map[K, V]) LoadOrStore ¶
func (*Map[K, V]) LoadWithDefault ¶
func (m *Map[K, V]) LoadWithDefault(key K, value V) V
type MilvusClientResourceManager ¶
type MilvusClientResourceManager struct {
// contains filtered or unexported fields
}
func GetMilvusClientManager ¶
func GetMilvusClientManager() *MilvusClientResourceManager
func (*MilvusClientResourceManager) DeleteMilvusClient ¶
func (m *MilvusClientResourceManager) DeleteMilvusClient(address, database string)
func (*MilvusClientResourceManager) GetMilvusClient ¶
func (m *MilvusClientResourceManager) GetMilvusClient(ctx context.Context, address, token, database string, dialConfig DialConfig) (client.Client, error)
type MsgStreamFactory ¶
type MsgStreamFactory struct {
// contains filtered or unexported fields
}
func (*MsgStreamFactory) NewMsgStream ¶
func (*MsgStreamFactory) NewMsgStreamDisposer ¶
func (*MsgStreamFactory) NewTtMsgStream ¶
type Value ¶
type Value[T any] struct { // contains filtered or unexported fields }
func (*Value[T]) CompareAndSwap ¶
func (*Value[T]) CompareAndSwapWithFunc ¶
func (value *Value[T]) CompareAndSwapWithFunc(f func(old T) T)
Click to show internal directories.
Click to hide internal directories.