Documentation ¶
Index ¶
- Variables
- func AddressOf[T any](v T) *T
- func AreStringSlicesEquivalent(a, b []string) bool
- func CheckMemoryUsage(limit float64) (bool, error)
- func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool
- func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool
- func ConvertTimezone(timestamp string, location string) (string, error)
- func DefaultS3Retryer() request.Retryer
- func DeleteFilesInExtStorage(ctx context.Context, extStorage storage.ExternalStorage, ...) error
- func GetExternalStorage(ctx context.Context, uri string, opts *storage.BackendOptions, ...) (storage.ExternalStorage, error)
- func GetExternalStorageFromURI(ctx context.Context, uri string) (storage.ExternalStorage, error)
- func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error)
- func GetLocalTimezone() (*time.Location, error)
- func GetMemoryLimit() (uint64, error)
- func GetOrZero[T any](p *T) T
- func GetTestExtStorage(ctx context.Context, tmpDir string) (storage.ExternalStorage, *url.URL, error)
- func GetTimeZoneName(tz *time.Location) string
- func GetTimezone(name string) (tz *time.Location, err error)
- func GetTimezoneFromZonefile(zonefile string) (tz *time.Location, err error)
- func HandleErr(ctx context.Context, errCh <-chan error, errFn func(error))
- func HandleErrWithErrGroup(ctx context.Context, errCh <-chan error, errFn func(error)) *errgroup.Group
- func Hang(ctx context.Context, dur time.Duration) error
- func IsIPv6Address(hostname string) bool
- func IsNotExistInExtStorage(err error) bool
- func IsValidIPv6AddressFormatInURI(hostPort string) bool
- func MaskSensitiveDataInURI(uri string) string
- func MaskSinkURI(uri string) (string, error)
- func MonitorCancelLatency(ctx context.Context, identifier string) (func(), func())
- func Must[T any](v T, err error) T
- func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T)
- func NewS3Retryer(maxRetries int, minRetryDelay, minThrottleDelay time.Duration) request.Retryer
- func RemoveFilesIf(ctx context.Context, extStorage storage.ExternalStorage, ...) error
- func ReturnJSONWriter(w *JSONWriter)
- func WaitMemoryAvailable(ctx context.Context, limit float64, timeout time.Duration) error
- func WaitSomething(nRetry int, waitTime time.Duration, fn func() bool) bool
- type Flag
- type JSONWriter
- func (w *JSONWriter) Buffer() []byte
- func (w *JSONWriter) WriteAnyElement(value any)
- func (w *JSONWriter) WriteAnyField(fieldName string, value any)
- func (w *JSONWriter) WriteArray(arrayElementsWriteFn func())
- func (w *JSONWriter) WriteArrayElement(arrayElementsWriteFn func())
- func (w *JSONWriter) WriteArrayField(fieldName string, arrayElementsWriteFn func())
- func (w *JSONWriter) WriteBase64String(b []byte)
- func (w *JSONWriter) WriteBase64StringElement(b []byte)
- func (w *JSONWriter) WriteBase64StringField(fieldName string, b []byte)
- func (w *JSONWriter) WriteBoolElement(value bool)
- func (w *JSONWriter) WriteBoolField(fieldName string, value bool)
- func (w *JSONWriter) WriteFloat64Element(value float64)
- func (w *JSONWriter) WriteFloat64Field(fieldName string, value float64)
- func (w *JSONWriter) WriteInt64Element(value int64)
- func (w *JSONWriter) WriteInt64Field(fieldName string, value int64)
- func (w *JSONWriter) WriteIntElement(value int)
- func (w *JSONWriter) WriteIntField(fieldName string, value int)
- func (w *JSONWriter) WriteNullElement()
- func (w *JSONWriter) WriteNullField(fieldName string)
- func (w *JSONWriter) WriteObject(objectFieldsWriteFn func())
- func (w *JSONWriter) WriteObjectElement(objectFieldsWriteFn func())
- func (w *JSONWriter) WriteObjectField(fieldName string, objectFieldsWriteFn func())
- func (w *JSONWriter) WriteRaw(b string)
- func (w *JSONWriter) WriteStringElement(value string)
- func (w *JSONWriter) WriteStringField(fieldName string, value string)
- func (w *JSONWriter) WriteUint64Element(value uint64)
- func (w *JSONWriter) WriteUint64Field(fieldName string, value uint64)
- type Role
- type Runnable
Constants ¶
This section is empty.
Variables ¶
var FailpointBuild = isFailpointBuild()
FailpointBuild is true if this is a failpoint build
Functions ¶
func AddressOf ¶
func AddressOf[T any](v T) *T
AddressOf return the address of the given input variable.
func AreStringSlicesEquivalent ¶
AreStringSlicesEquivalent checks if two string slices are equivalent. If the slices are of the same length and contain the same elements (but possibly in different order), the function returns true. Note: This function does modify the slices. Please be caution of this if you are using it.
func CheckMemoryUsage ¶
CheckMemoryUsage checks if the memory usage is less than the limit.
func CompareAndIncrease ¶
func CompareAndIncrease[T numbers](target genericAtomic[T], new T) bool
CompareAndIncrease updates the target if the new value is larger than or equal to the old value. It returns false if the new value is smaller than the old value.
func CompareAndMonotonicIncrease ¶
func CompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T) bool
CompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It returns false if the new value is smaller than or equal to the old value.
func ConvertTimezone ¶
ConvertTimezone converts the timestamp to the specified timezone
func DefaultS3Retryer ¶
DefaultS3Retryer is the default s3 retryer, maybe this function should be extracted to another place.
func DeleteFilesInExtStorage ¶
func DeleteFilesInExtStorage( ctx context.Context, extStorage storage.ExternalStorage, toRemoveFiles []string, ) error
DeleteFilesInExtStorage deletes files in external storage concurrently. TODO: Add a test for this function to cover batch delete.
func GetExternalStorage ¶
func GetExternalStorage( ctx context.Context, uri string, opts *storage.BackendOptions, retryer request.Retryer, ) (storage.ExternalStorage, error)
GetExternalStorage creates a new storage.ExternalStorage based on the uri and options.
func GetExternalStorageFromURI ¶
GetExternalStorageFromURI creates a new storage.ExternalStorage from a uri.
func GetExternalStorageWithDefaultTimeout ¶
func GetExternalStorageWithDefaultTimeout(ctx context.Context, uri string) (storage.ExternalStorage, error)
GetExternalStorageWithDefaultTimeout creates a new storage.ExternalStorage from a uri without retry. It is the caller's responsibility to set timeout to the context.
func GetLocalTimezone ¶
GetLocalTimezone returns the timezone in local system
func GetMemoryLimit ¶
GetMemoryLimit gets the memory limit of current process based on cgroup. If the cgourp is not set or memory.max is set to max, returns the available memory of host.
func GetOrZero ¶
func GetOrZero[T any](p *T) T
GetOrZero returns the value pointed to by p, or a zero value of its type if p is nil.
func GetTestExtStorage ¶
func GetTestExtStorage( ctx context.Context, tmpDir string, ) (storage.ExternalStorage, *url.URL, error)
GetTestExtStorage creates a test storage.ExternalStorage from a uri.
func GetTimeZoneName ¶
GetTimeZoneName returns the timezone name in a time.Location.
func GetTimezone ¶
GetTimezone returns the timezone specified by the name
func GetTimezoneFromZonefile ¶
GetTimezoneFromZonefile read the timezone from file
func HandleErrWithErrGroup ¶
func HandleErrWithErrGroup(ctx context.Context, errCh <-chan error, errFn func(error)) *errgroup.Group
HandleErrWithErrGroup creates a `errgroup.Group` and calls `HandleErr` within the error group
func IsIPv6Address ¶
IsIPv6Address reports whether hostname is a IPv6 address. Notice: There is hostname not host(host+port).
func IsNotExistInExtStorage ¶
IsNotExistInExtStorage checks if the error is caused by the file not exist in external storage.
func IsValidIPv6AddressFormatInURI ¶
IsValidIPv6AddressFormatInURI reports whether hostPort is a valid IPv6 address in URI. See: https://www.ietf.org/rfc/rfc2732.txt.
func MaskSensitiveDataInURI ¶
MaskSensitiveDataInURI returns an uri that sensitive infos has been masked.
func MaskSinkURI ¶
MaskSinkURI returns a sink uri that sensitive infos has been masked.
func MonitorCancelLatency ¶
MonitorCancelLatency monitors the latency from ctx being cancelled the first returned function should be called when the cancellation is done the second returned function should be called to mark the cancellation is started, it will start a background go routine to monitor the latency util finish is called or cancellation is done
func MustCompareAndMonotonicIncrease ¶
func MustCompareAndMonotonicIncrease[T numbers](target genericAtomic[T], new T)
MustCompareAndMonotonicIncrease updates the target if the new value is larger than the old value. It do nothing if the new value is smaller than or equal to the old value.
func NewS3Retryer ¶
NewS3Retryer creates a new s3 retryer.
func RemoveFilesIf ¶
func RemoveFilesIf( ctx context.Context, extStorage storage.ExternalStorage, pred func(path string) bool, opt *storage.WalkOption, ) error
RemoveFilesIf removes files from external storage if the path matches the predicate.
func ReturnJSONWriter ¶
func ReturnJSONWriter(w *JSONWriter)
ReturnJSONWriter returns the borrowed JSONWriter instance to pool.
func WaitMemoryAvailable ¶
WaitMemoryAvailable waits until the memory usage is less than the limit.
Types ¶
type JSONWriter ¶
type JSONWriter struct {
// contains filtered or unexported fields
}
JSONWriter builds JSON in an efficient and structural way.
Example Usage
w := BorrowJSONWriter(out) w.WriteObject(func() { w.WriteObjectField("payload", func() { w.WriteObjectField("dml", func() { w.WriteStringField("statement", "INSERT") w.WriteUint64Field("ts", 100) }) }) w.WriteObjectField("source", func() { w.WriteStringField("source", "TiCDC") w.WriteInt64Field("version", 1) }) }) ReturnJSONWriter(w)
func BorrowJSONWriter ¶
func BorrowJSONWriter(out io.Writer) *JSONWriter
BorrowJSONWriter borrows a JSONWriter instance from pool. Remember to call ReturnJSONWriter to return the borrowed instance.
func (*JSONWriter) Buffer ¶
func (w *JSONWriter) Buffer() []byte
Buffer returns the buffer if out is nil. WARN: You may need to copy the result of the buffer. Otherwise the content of the buffer may be changed.
func (*JSONWriter) WriteAnyElement ¶
func (w *JSONWriter) WriteAnyElement(value any)
WriteAnyElement writes a array element like ,<value>.
func (*JSONWriter) WriteAnyField ¶
func (w *JSONWriter) WriteAnyField(fieldName string, value any)
WriteAnyField writes a field like "<fieldName>":<value>.
func (*JSONWriter) WriteArray ¶
func (w *JSONWriter) WriteArray(arrayElementsWriteFn func())
WriteArray writes [......].
func (*JSONWriter) WriteArrayElement ¶
func (w *JSONWriter) WriteArrayElement(arrayElementsWriteFn func())
WriteArrayElement writes a array array element like ,[......].
func (*JSONWriter) WriteArrayField ¶
func (w *JSONWriter) WriteArrayField(fieldName string, arrayElementsWriteFn func())
WriteArrayField writes a array field like "<fieldName>":[......].
func (*JSONWriter) WriteBase64String ¶
func (w *JSONWriter) WriteBase64String(b []byte)
WriteBase64String writes a base64 string like "<value>".
func (*JSONWriter) WriteBase64StringElement ¶
func (w *JSONWriter) WriteBase64StringElement(b []byte)
WriteBase64StringElement writes a base64 string array element like ,"<value>".
func (*JSONWriter) WriteBase64StringField ¶
func (w *JSONWriter) WriteBase64StringField(fieldName string, b []byte)
WriteBase64StringField writes a base64 string field like "<fieldName>":"<value>".
func (*JSONWriter) WriteBoolElement ¶
func (w *JSONWriter) WriteBoolElement(value bool)
WriteBoolElement writes a bool array element like ,<value>.
func (*JSONWriter) WriteBoolField ¶
func (w *JSONWriter) WriteBoolField(fieldName string, value bool)
WriteBoolField writes a bool field like "<fieldName>":<value>.
func (*JSONWriter) WriteFloat64Element ¶
func (w *JSONWriter) WriteFloat64Element(value float64)
WriteFloat64Element writes a float64 array element like ,<value>.
func (*JSONWriter) WriteFloat64Field ¶
func (w *JSONWriter) WriteFloat64Field(fieldName string, value float64)
WriteFloat64Field writes a float64 field like "<fieldName>":<value>.
func (*JSONWriter) WriteInt64Element ¶
func (w *JSONWriter) WriteInt64Element(value int64)
WriteInt64Element writes a int64 array element like ,<value>.
func (*JSONWriter) WriteInt64Field ¶
func (w *JSONWriter) WriteInt64Field(fieldName string, value int64)
WriteInt64Field writes a int64 field like "<fieldName>":<value>.
func (*JSONWriter) WriteIntElement ¶
func (w *JSONWriter) WriteIntElement(value int)
WriteIntElement writes a int array element like ,<value>.
func (*JSONWriter) WriteIntField ¶
func (w *JSONWriter) WriteIntField(fieldName string, value int)
WriteIntField writes a int field like "<fieldName>":<value>.
func (*JSONWriter) WriteNullElement ¶
func (w *JSONWriter) WriteNullElement()
WriteNullElement writes a null array element like ,null.
func (*JSONWriter) WriteNullField ¶
func (w *JSONWriter) WriteNullField(fieldName string)
WriteNullField writes a null field like "<fieldName>":null.
func (*JSONWriter) WriteObject ¶
func (w *JSONWriter) WriteObject(objectFieldsWriteFn func())
WriteObject writes {......}.
func (*JSONWriter) WriteObjectElement ¶
func (w *JSONWriter) WriteObjectElement(objectFieldsWriteFn func())
WriteObjectElement writes a object array element like ,{......}.
func (*JSONWriter) WriteObjectField ¶
func (w *JSONWriter) WriteObjectField(fieldName string, objectFieldsWriteFn func())
WriteObjectField writes a object field like "<fieldName>":{......}.
func (*JSONWriter) WriteRaw ¶
func (w *JSONWriter) WriteRaw(b string)
WriteRaw writes a raw string directly into the output.
func (*JSONWriter) WriteStringElement ¶
func (w *JSONWriter) WriteStringElement(value string)
WriteStringElement writes a string array element like ,"<value>".
func (*JSONWriter) WriteStringField ¶
func (w *JSONWriter) WriteStringField(fieldName string, value string)
WriteStringField writes a string field like "<fieldName>":"<value>".
func (*JSONWriter) WriteUint64Element ¶
func (w *JSONWriter) WriteUint64Element(value uint64)
WriteUint64Element writes a uint64 array element like ,<value>.
func (*JSONWriter) WriteUint64Field ¶
func (w *JSONWriter) WriteUint64Field(fieldName string, value uint64)
WriteUint64Field writes a uint64 field like "<fieldName>":<value>.
type Role ¶
type Role int
Role is the operator role, mainly used for logging at the moment.
const ( // RoleOwner is the owner of the cluster. RoleOwner Role = iota // RoleProcessor is the processor of the cluster. RoleProcessor // RoleClient is the client. RoleClient // RoleRedoLogApplier is the redo log applier. RoleRedoLogApplier // RoleKafkaConsumer is the kafka consumer. RoleKafkaConsumer // RoleTester for test. RoleTester // RoleUnknown is the unknown role. RoleUnknown )
type Runnable ¶
type Runnable interface { // Run all sub goroutines and block the current one. If an error occurs // in any sub goroutine, return it and cancel all others. // // Generally a Runnable object can have some internal resources, like file // descriptors, channels or memory buffers. Those resources may be still // necessary by other components after this Runnable is returned. We can use // Close to release them. // // `warnings` is used to retrieve internal warnings generated when running. Run(ctx context.Context, warnings ...chan<- error) error // WaitForReady blocks the current goroutine until `Run` initializes itself. WaitForReady(ctx context.Context) // Close all internal resources synchronously. Close() }
Runnable is for handling sub components in a unified way.
For example: r := createRunable() ctx, cancel := context.WithCancel(context.Background()) go func() { handleError(r.Run(ctx)) }() r.WaitForReady() cancel() r.Close()