util

package
v0.0.0-...-47eb5cf Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 7, 2025 License: Apache-2.0 Imports: 34 Imported by: 6

Documentation

Index

Constants

View Source
const (
	MilvusClientResourceTyp = "milvus_client"
	MilvusClientExpireTime  = 30 * time.Second
	DefaultDbName           = "default"
)

Variables

View Source
var (
	EtcdOpTimeout        = 10 * time.Second
	EtcdOpRetryTime uint = 5
)
View Source
var (
	DroppedDatabaseKey   = "database"
	DroppedCollectionKey = "collection"
	DroppedPartitionKey  = "partition"
)
View Source
var CtxTaskKey = ctxTaskType{}
View Source
var DefaultReplicateTTInterval = 500
View Source
var NotFoundDatabase = errors.New("database not found")
View Source
var SuffixSnapshotTombstone = []byte{0xE2, 0x9B, 0xBC} // base64 value: "4pu8"

Functions

func Base64DecodeMsgPosition

func Base64DecodeMsgPosition(position string) (*msgstream.MsgPosition, error)

func Base64Encode

func Base64Encode(obj []byte) string

func Base64JSON

func Base64JSON(obj any) string

func Base64Msg

func Base64Msg(msg msgstream.TsMsg) string

func Base64MsgPosition

func Base64MsgPosition(position *msgstream.MsgPosition) string

func Base64ProtoObj

func Base64ProtoObj(obj proto.Message) string

func ConvertKVPairToMap

func ConvertKVPairToMap(pair []*commonpb.KeyValuePair) map[string]string

func EtcdDelete

func EtcdDelete(etcdCli KVApi, key string, opts ...clientv3.OpOption) error

func EtcdGet deprecated

func EtcdGet(etcdCli KVApi, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)

Deprecated: use EtcdGetWithContext instead

func EtcdGetWithContext

func EtcdGetWithContext(ctx context.Context, etcdCli KVApi, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error)

func EtcdPut

func EtcdPut(etcdCli KVApi, key, val string, opts ...clientv3.OpOption) error

func EtcdStatus

func EtcdStatus(etcdCli KVApi) error

func EtcdTxn

func EtcdTxn(etcdCli KVApi, fun func(txn clientv3.Txn) error) error

func GetCollectionIDFromMsgPack

func GetCollectionIDFromMsgPack(msgPack *msgstream.MsgPack) int64

func GetCollectionIDFromRequest

func GetCollectionIDFromRequest(req any) (int64, bool)

func GetCollectionInfoKeys

func GetCollectionInfoKeys(collectionName, dbName string) (string, string)

func GetCollectionNameFromFull

func GetCollectionNameFromFull(fullName string) (string, string)

func GetCollectionNameFromMsgPack

func GetCollectionNameFromMsgPack(msgPack *msgstream.MsgPack) string

func GetCollectionNameFromRequest

func GetCollectionNameFromRequest(req any) (string, bool)

func GetCreateInfoKey

func GetCreateInfoKey(key string) string

func GetCtxWithTaskID

func GetCtxWithTaskID(ctx context.Context, taskID string) context.Context

func GetDBInfoKeys

func GetDBInfoKeys(dbName string) (string, string)

func GetDatabaseNameFromMsgPack

func GetDatabaseNameFromMsgPack(msgPack *msgstream.MsgPack) string

func GetDialOptions

func GetDialOptions(config DialConfig) ([]grpc.DialOption, error)

func GetDropInfoKey

func GetDropInfoKey(key string) string

func GetEtcdConfig

func GetEtcdConfig(etcdServerConfig config.EtcdServerConfig) (clientv3.Config, error)

func GetEtcdSSLCfg

func GetEtcdSSLCfg(endpoints []string, certFile string, keyFile string, caCertFile string, minVersion string, cfg *clientv3.Config) (*clientv3.Config, error)

func GetFullCollectionName

func GetFullCollectionName(collectionName string, databaseName string) string

func GetPartitionInfoKeys

func GetPartitionInfoKeys(partitionName, collectionName, dbName string) (string, string)

func GetRetryDefaultOptions

func GetRetryDefaultOptions() []retry.Option

GetRetryDefaultOptions 2 days retry

func GetRetryOptions

func GetRetryOptions(c config.RetrySettings) []retry.Option

func GetTaskIDFromCtx

func GetTaskIDFromCtx(ctx context.Context) string

func GetToken

func GetToken(username, password string) string

func GetURI

func GetURI(address string, port int, enableTLS bool) string

func GetUUID

func GetUUID() string

func GetVChannel

func GetVChannel(pchannel, mark string) string

func InitMilvusPkgParam

func InitMilvusPkgParam()

func IsTombstone

func IsTombstone(data []byte) bool

func IsUnRecoverable

func IsUnRecoverable(err error) bool

IsUnRecoverable is used to judge whether the error is wrapped by unrecoverableError.

func IsUserRoleMessage

func IsUserRoleMessage(msgPack *msgstream.MsgPack) bool

func MockEtcdClient

func MockEtcdClient(new func(cfg clientv3.Config) (KVApi, error), f func())

deprecated

func NewMsgStreamFactory

func NewMsgStreamFactory(factory msgstream.Factory, ttMsgStream bool) msgstream.Factory

func NoRetryOption

func NoRetryOption() []retry.Option

func OnceFuncWithContext

func OnceFuncWithContext(f func(context.Context)) func(context.Context)

OnceFuncWithContext copy from the oncefunc.go

func ToBytes

func ToBytes(s string) []byte

ToBytes performs unholy acts to avoid allocations

func ToPhysicalChannel

func ToPhysicalChannel(vchannel string) string

ToPhysicalChannel get physical channel name from virtual channel name

func ToString

func ToString(b []byte) string

ToString like ToBytes

func Unrecoverable

func Unrecoverable(err error) error

Unrecoverable method wrap an error to unrecoverableError. This will make retry quick return.

Types

type ChannelInfo

type ChannelInfo struct {
	PChannelName string
	CollectionID int64
	ShardIndex   int
}

func ParseVChannel

func ParseVChannel(virtualName string) (ChannelInfo, error)

type ChannelMapping

type ChannelMapping struct {
	// contains filtered or unexported fields
}

func NewChannelMapping

func NewChannelMapping(sourceCnt, targetCnt int) *ChannelMapping

nolint

func (*ChannelMapping) AddKeyValue

func (c *ChannelMapping) AddKeyValue(source, target string)

func (*ChannelMapping) AverageCnt

func (c *ChannelMapping) AverageCnt() int

func (*ChannelMapping) CheckKeyExist

func (c *ChannelMapping) CheckKeyExist(source, target string) bool

func (*ChannelMapping) CheckKeyNotExist

func (c *ChannelMapping) CheckKeyNotExist(source, target string) bool

CheckKeyNotExist which means the key isn't existed, it will return true if the value is no mapping key, else return false

func (*ChannelMapping) GetMapKey

func (c *ChannelMapping) GetMapKey(source, target string) string

func (*ChannelMapping) GetMapValue

func (c *ChannelMapping) GetMapValue(source, target string) string

func (*ChannelMapping) UsingSourceKey

func (c *ChannelMapping) UsingSourceKey() bool

type CollectionIDGetter

type CollectionIDGetter interface {
	GetCollectionID() int64
}

type CollectionNameGetter

type CollectionNameGetter interface {
	GetCollectionName() string
}

type DialConfig

type DialConfig struct {
	ServerName    string `json:"server_name,omitempty" mapstructure:"server_name,omitempty"`
	ServerPemPath string `json:"server_pem_path,omitempty" mapstructure:"server_pem_path,omitempty"`
	CaPemPath     string `json:"ca_pem_path,omitempty" mapstructure:"ca_pem_path,omitempty"`
	ClientPemPath string `json:"client_pem_path,omitempty" mapstructure:"client_pem_path,omitempty"`
	ClientKeyPath string `json:"client_key_path,omitempty" mapstructure:"client_key_path,omitempty"`
}

type ErrorList

type ErrorList []error

ErrorList for print error log

func (ErrorList) Error

func (el ErrorList) Error() string

Error method return an string representation of retry error list.

type KVApi

type KVApi interface {
	clientv3.KV
	clientv3.Watcher
	// Status From clientv3.Maintenance interface
	Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
	Endpoints() []string
}

func GetEtcdClient

func GetEtcdClient(etcdServerConfig config.EtcdServerConfig) (KVApi, error)

func GetEtcdClientWithAddress

func GetEtcdClientWithAddress(endpoints []string) (KVApi, error)

type Map

type Map[K comparable, V any] struct {
	// contains filtered or unexported fields
}

func (*Map[K, V]) Delete

func (m *Map[K, V]) Delete(key K)

func (*Map[K, V]) GetUnsafeMap

func (m *Map[K, V]) GetUnsafeMap() map[K]V

func (*Map[K, V]) Load

func (m *Map[K, V]) Load(key K) (value V, ok bool)

func (*Map[K, V]) LoadAndDelete

func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool)

func (*Map[K, V]) LoadOrStore

func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool)

func (*Map[K, V]) LoadWithDefault

func (m *Map[K, V]) LoadWithDefault(key K, value V) V

func (*Map[K, V]) Range

func (m *Map[K, V]) Range(f func(key K, value V) bool) bool

func (*Map[K, V]) Store

func (m *Map[K, V]) Store(key K, value V)

type MilvusClientResourceManager

type MilvusClientResourceManager struct {
	// contains filtered or unexported fields
}

func GetMilvusClientManager

func GetMilvusClientManager() *MilvusClientResourceManager

func (*MilvusClientResourceManager) DeleteMilvusClient

func (m *MilvusClientResourceManager) DeleteMilvusClient(address, database string)

func (*MilvusClientResourceManager) GetMilvusClient

func (m *MilvusClientResourceManager) GetMilvusClient(ctx context.Context, address, token, database string, dialConfig DialConfig) (client.Client, error)

type MsgStreamFactory

type MsgStreamFactory struct {
	// contains filtered or unexported fields
}

func (*MsgStreamFactory) NewMsgStream

func (m *MsgStreamFactory) NewMsgStream(ctx context.Context) (msgstream.MsgStream, error)

func (*MsgStreamFactory) NewMsgStreamDisposer

func (m *MsgStreamFactory) NewMsgStreamDisposer(ctx context.Context) func([]string, string) error

func (*MsgStreamFactory) NewTtMsgStream

func (m *MsgStreamFactory) NewTtMsgStream(ctx context.Context) (msgstream.MsgStream, error)

type SafeArray

type SafeArray[T any] struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

func (*SafeArray[T]) Append

func (sa *SafeArray[T]) Append(n T)

func (*SafeArray[T]) Get

func (sa *SafeArray[T]) Get(index int) T

func (*SafeArray[T]) Range

func (sa *SafeArray[T]) Range(f func(index int, value T) bool)

type Value

type Value[T any] struct {
	// contains filtered or unexported fields
}

func NewValue

func NewValue[T any](initValue T) *Value[T]

func (*Value[T]) CompareAndSwap

func (value *Value[T]) CompareAndSwap(old T, new T) bool

func (*Value[T]) CompareAndSwapWithFunc

func (value *Value[T]) CompareAndSwapWithFunc(f func(old T) T)

func (*Value[T]) Load

func (value *Value[T]) Load() T

func (*Value[T]) Store

func (value *Value[T]) Store(t T)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL