Documentation ¶
Overview ¶
Package tunnel is a generated GoMock package.
Index ¶
- Constants
- Variables
- func ExponentialBackoff(interval, maxInterval, maxElapsed time.Duration, multiplier, jitter float64) *backoff.ExponentialBackOff
- func ParseRequestToken(token string) (*protocol.TokenContentV2, error)
- func ReplaceLogCore(ws zapcore.WriteSyncer, conf zap.Config) zap.Option
- func StreamRecordSequenceLess(a, b *SequenceInfo) bool
- type ActionType
- type AsyncProcessFactory
- type BatchGetStatusReq
- type ChannelBackoffConfig
- type ChannelConn
- type ChannelContext
- type ChannelDialer
- type ChannelInfo
- type ChannelProcessor
- type ChannelProcessorFactory
- type ChannelStatus
- type ChannelType
- type Checkpointer
- type ClientOption
- type CreateTunnelRequest
- type CreateTunnelResponse
- type DefaultTunnelClient
- func (c *DefaultTunnelClient) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
- func (c *DefaultTunnelClient) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
- func (c *DefaultTunnelClient) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
- func (c *DefaultTunnelClient) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
- func (c *DefaultTunnelClient) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
- func (c *DefaultTunnelClient) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
- func (c *DefaultTunnelClient) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
- func (c *DefaultTunnelClient) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
- type DeleteTunnelRequest
- type DeleteTunnelResponse
- type DescribeTunnelRequest
- type DescribeTunnelResponse
- type GetRpoRequest
- type GetRpoResponse
- type ListTunnelRequest
- type ListTunnelResponse
- type MockTunnelMetaApi
- func (m *MockTunnelMetaApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
- func (m *MockTunnelMetaApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
- func (m *MockTunnelMetaApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
- func (m *MockTunnelMetaApi) EXPECT() *MockTunnelMetaApiMockRecorder
- func (m *MockTunnelMetaApi) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
- func (m *MockTunnelMetaApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
- func (m *MockTunnelMetaApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
- func (m *MockTunnelMetaApi) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
- func (m *MockTunnelMetaApi) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
- type MockTunnelMetaApiMockRecorder
- func (mr *MockTunnelMetaApiMockRecorder) CreateTunnel(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) DeleteTunnel(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) DescribeTunnel(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) GetRpo(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) GetRpoByOffset(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) ListTunnel(req interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) NewTunnelWorker(tunnelId, workerConfig interface{}) *gomock.Call
- func (mr *MockTunnelMetaApiMockRecorder) Schedule(req interface{}) *gomock.Call
- type MockTunnelWorker
- type MockTunnelWorkerMockRecorder
- type MocktunnelDataApi
- type MocktunnelDataApiMockRecorder
- type ParallelReleaseManager
- type PrimaryKey
- type PrimaryKeyColumn
- type ReadRecordRequest
- type ReadRecordResponse
- type Record
- type RecordColumn
- type RecordColumnType
- type ResponseInfo
- type RpoLatency
- type ScheduleChannel
- type ScheduleRequest
- type ScheduleResponse
- type SequenceInfo
- type SimpleProcessFactory
- type StreamTunnelConfig
- type TunnelApi
- func NewTunnelApi(endpoint, instanceName, accessKeyId, accessKeySecret string, ...) *TunnelApi
- func NewTunnelApiWithExternalHeader(endpoint, instanceName, accessKeyId, accessKeySecret, token string, ...) *TunnelApi
- func NewTunnelApiWithToken(endpoint, instanceName, accessKeyId, accessKeySecret, token string, ...) *TunnelApi
- func (api *TunnelApi) Checkpoint(tunnelId, clientId string, channelId string, token string, ...) error
- func (api *TunnelApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
- func (api *TunnelApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
- func (api *TunnelApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
- func (api *TunnelApi) GetCheckpoint(tunnelId, clientId string, channelId string) (string, int64, error)
- func (api *TunnelApi) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
- func (api *TunnelApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
- func (api *TunnelApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
- func (api *TunnelApi) ReadRecords(req *ReadRecordRequest) (*ReadRecordResponse, error)
- func (api *TunnelApi) ReadRows(tunnelId, clientId string, channelId string, token string) ([]*protocol.Record, string, string, int, error)
- func (api *TunnelApi) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
- type TunnelClient
- func NewTunnelClient(endpoint, instanceName, accessId, accessKey string, options ...ClientOption) TunnelClient
- func NewTunnelClientWithConfig(endpoint, instanceName, accessId, accessKey string, conf *TunnelConfig, ...) TunnelClient
- func NewTunnelClientWithConfigAndExternalHeader(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, ...) TunnelClient
- func NewTunnelClientWithExternalHeader(endpoint, instanceName, accessId, accessKey, token string, ...) TunnelClient
- func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, ...) TunnelClient
- type TunnelConfig
- type TunnelError
- type TunnelInfo
- type TunnelMetaApi
- type TunnelRpoLatency
- type TunnelStateMachine
- type TunnelType
- type TunnelWorker
- type TunnelWorkerConfig
- type TunnelWorkerDaemon
Constants ¶
const ( ErrCodeParamInvalid = "OTSParameterInvalid" ErrCodeResourceGone = "OTSResourceGone" ErrCodeSequenceNotMatch = "OTSSequenceNumberNotMatch" ErrCodeClientError = "OTSClientError" ErrCodeTunnelExpired = "OTSTunnelExpired" ErrCodePermissionDenied = "OTSPermissionDenied" ErrCodeTunnelExist = "OTSTunnelExist" )
const ( //for serialize binary record TAG_VERSION = 0x1 TAG_RECORD_COUNT = 0x2 TAG_ACTION_TYPE = 0x3 TAG_RECORD_LENGTH = 0x4 TAG_RECORD = 0x5 )
Variables ¶
var ( DefaultHeartbeatInterval = 30 * time.Second DefaultHeartbeatTimeout = 300 * time.Second )
var ( ErrUnExpectBinaryRecordTag = errors.New("UnexpectBinaryRecordTag") ErrUnSupportRecordVersion = errors.New("UnSupportRecordVersion") )
var ( DefaultChannelSize = 10 DefaultCheckpointInterval = 10 * time.Second )
var DaemonRandomRetryMs = 10000
var DefaultBackoffConfig = ChannelBackoffConfig{ MaxDelay: 5 * time.Second, // contains filtered or unexported fields }
DefaultBackoffConfig is specified channel worker backoff parameters, decided by table store tunnel service developers.
var DefaultLogConfig = zap.Config{ Level: zap.NewAtomicLevelAt(zap.InfoLevel), Development: false, Sampling: &zap.SamplingConfig{ Initial: 100, Thereafter: 100, }, Encoding: "json", EncoderConfig: zapcore.EncoderConfig{ TimeKey: "ts", LevelKey: "level", NameKey: "logger", CallerKey: "caller", MessageKey: "msg", StacktraceKey: "stacktrace", LineEnding: zapcore.DefaultLineEnding, EncodeLevel: zapcore.LowercaseLevelEncoder, EncodeTime: zapcore.ISO8601TimeEncoder, EncodeDuration: zapcore.SecondsDurationEncoder, EncodeCaller: zapcore.ShortCallerEncoder, }, }
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{ Filename: "tunnelClient.log", MaxSize: 512, MaxBackups: 5, MaxAge: 30, Compress: true, })
var DefaultTunnelConfig = &TunnelConfig{ MaxRetryElapsedTime: 75 * time.Second, RequestTimeout: 60 * time.Second, Transport: http.DefaultTransport, }
var (
// PipeChannelSize is channelDialer's data pipeline channel size
PipeChannelSize = 1
)
Functions ¶
func ExponentialBackoff ¶
func ParseRequestToken ¶
func ParseRequestToken(token string) (*protocol.TokenContentV2, error)
func ReplaceLogCore ¶
hack replace zap config build core with lumberjack logger
func StreamRecordSequenceLess ¶
func StreamRecordSequenceLess(a, b *SequenceInfo) bool
Types ¶
type ActionType ¶
type ActionType int
const ( AT_Put ActionType = iota AT_Update AT_Delete )
func ParseActionType ¶
func ParseActionType(pbType *protocol.ActionType) (ActionType, error)
func (ActionType) String ¶
func (t ActionType) String() string
type AsyncProcessFactory ¶ added in v1.7.3
type AsyncProcessFactory struct { CustomValue interface{} CpInterval time.Duration ProcessFunc func(channelCtx *ChannelContext, records []*Record) error ChannelOpenFunc func(channelCtx *ChannelContext) error ShutdownFunc func(channelCtx *ChannelContext) Logger *zap.Logger NeedBinaryRecord bool SyncCloseResource bool // sync close tunnel resource before daemon exit AlwaysCallBack bool // when AlwaysCallBack is true, processFunc will be called even if the record is empty RecordPipePerChannel *int }
AsyncProcessFactory support async commit token
func (*AsyncProcessFactory) NewProcessor ¶ added in v1.7.3
func (s *AsyncProcessFactory) NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)
type BatchGetStatusReq ¶
type BatchGetStatusReq struct {
// contains filtered or unexported fields
}
func NewBatchGetStatusReq ¶
func NewBatchGetStatusReq() *BatchGetStatusReq
type ChannelBackoffConfig ¶
type ChannelBackoffConfig struct { //MaxDelay is the upper bound of backoff delay. MaxDelay time.Duration // contains filtered or unexported fields }
stream type channel worker backoff config
type ChannelConn ¶
type ChannelConn interface { NotifyStatus(channel *ChannelStatus) Closed() bool Close() }
type ChannelContext ¶
type ChannelContext struct { TunnelId string ClientId string ChannelId string IsStreamChannel bool TraceId string NextToken string RecordCount int BinaryRecords []byte Processor ChannelProcessor CustomValue interface{} ParallelReleaseManager ParallelReleaseManager }
func NewChannelContext ¶ added in v1.6.0
func NewChannelContext(tunnelId, clientId, channelId string, customValue interface{}) *ChannelContext
func (*ChannelContext) String ¶
func (c *ChannelContext) String() string
type ChannelDialer ¶
type ChannelDialer interface {
ChannelDial(tunnelId, clientId, channelId, token string, p ChannelProcessor, state *TunnelStateMachine) ChannelConn
}
type ChannelInfo ¶
type ChannelProcessor ¶
type ChannelProcessorFactory ¶
type ChannelProcessorFactory interface {
NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)
}
type ChannelStatus ¶
type ChannelStatus struct { ChannelId string Version int64 Status protocol.ChannelStatus }
func ToChannelStatus ¶
func ToChannelStatus(c *protocol.Channel) *ChannelStatus
func (*ChannelStatus) ToPbChannel ¶
func (c *ChannelStatus) ToPbChannel() *protocol.Channel
type ChannelType ¶
type ChannelType string
const ( ChannelType_BaseData ChannelType = "BaseData" ChannelType_Stream ChannelType = "Stream" )
type Checkpointer ¶
type ClientOption ¶ added in v1.7.3
type ClientOption func(api *TunnelApi)
func SetCredentialsProvider ¶ added in v1.7.3
func SetCredentialsProvider(provider common.CredentialsProvider) ClientOption
SetCredentialsProvider sets funciton for get the user's ak
type CreateTunnelRequest ¶
type CreateTunnelRequest struct { TableName string TunnelName string Type TunnelType StreamTunnelConfig *StreamTunnelConfig // NeedAllTimeSeriesColumns function is temporarily disabled. NeedAllTimeSeriesColumns bool }
type CreateTunnelResponse ¶
type CreateTunnelResponse struct { TunnelId string ResponseInfo }
type DefaultTunnelClient ¶
type DefaultTunnelClient struct {
// contains filtered or unexported fields
}
func (*DefaultTunnelClient) CreateTunnel ¶
func (c *DefaultTunnelClient) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
func (*DefaultTunnelClient) DeleteTunnel ¶
func (c *DefaultTunnelClient) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
func (*DefaultTunnelClient) DescribeTunnel ¶
func (c *DefaultTunnelClient) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
func (*DefaultTunnelClient) GetRpo ¶
func (c *DefaultTunnelClient) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
func (*DefaultTunnelClient) GetRpoByOffset ¶ added in v1.7.0
func (c *DefaultTunnelClient) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
func (*DefaultTunnelClient) ListTunnel ¶
func (c *DefaultTunnelClient) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
func (*DefaultTunnelClient) NewTunnelWorker ¶
func (c *DefaultTunnelClient) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
func (*DefaultTunnelClient) Schedule ¶
func (c *DefaultTunnelClient) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
type DeleteTunnelRequest ¶
type DeleteTunnelResponse ¶
type DeleteTunnelResponse struct {
ResponseInfo
}
type DescribeTunnelRequest ¶
type DescribeTunnelResponse ¶
type DescribeTunnelResponse struct { TunnelRPO int64 Tunnel *TunnelInfo Channels []*ChannelInfo ResponseInfo }
type GetRpoRequest ¶
type GetRpoRequest struct {
TunnelId string
}
type GetRpoResponse ¶
type GetRpoResponse struct { RpoInfos map[string]map[string]*RpoLatency TunnelRpoInfos map[string]*TunnelRpoLatency TunnelId string }
type ListTunnelRequest ¶
type ListTunnelRequest struct {
TableName string
}
type ListTunnelResponse ¶
type ListTunnelResponse struct { Tunnels []*TunnelInfo ResponseInfo }
type MockTunnelMetaApi ¶
type MockTunnelMetaApi struct {
// contains filtered or unexported fields
}
MockTunnelMetaApi is a mock of TunnelMetaApi interface
func NewMockTunnelMetaApi ¶
func NewMockTunnelMetaApi(ctrl *gomock.Controller) *MockTunnelMetaApi
NewMockTunnelMetaApi creates a new mock instance
func (*MockTunnelMetaApi) CreateTunnel ¶
func (m *MockTunnelMetaApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
CreateTunnel mocks base method
func (*MockTunnelMetaApi) DeleteTunnel ¶
func (m *MockTunnelMetaApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
DeleteTunnel mocks base method
func (*MockTunnelMetaApi) DescribeTunnel ¶
func (m *MockTunnelMetaApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
DescribeTunnel mocks base method
func (*MockTunnelMetaApi) EXPECT ¶
func (m *MockTunnelMetaApi) EXPECT() *MockTunnelMetaApiMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockTunnelMetaApi) GetRpo ¶
func (m *MockTunnelMetaApi) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
GetRpo mocks base method
func (*MockTunnelMetaApi) GetRpoByOffset ¶ added in v1.7.0
func (m *MockTunnelMetaApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
GetRpoByOffset mocks base method
func (*MockTunnelMetaApi) ListTunnel ¶
func (m *MockTunnelMetaApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
ListTunnel mocks base method
func (*MockTunnelMetaApi) NewTunnelWorker ¶
func (m *MockTunnelMetaApi) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
NewTunnelWorker mocks base method
func (*MockTunnelMetaApi) Schedule ¶
func (m *MockTunnelMetaApi) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
Schedule mocks base method
type MockTunnelMetaApiMockRecorder ¶
type MockTunnelMetaApiMockRecorder struct {
// contains filtered or unexported fields
}
MockTunnelMetaApiMockRecorder is the mock recorder for MockTunnelMetaApi
func (*MockTunnelMetaApiMockRecorder) CreateTunnel ¶
func (mr *MockTunnelMetaApiMockRecorder) CreateTunnel(req interface{}) *gomock.Call
CreateTunnel indicates an expected call of CreateTunnel
func (*MockTunnelMetaApiMockRecorder) DeleteTunnel ¶
func (mr *MockTunnelMetaApiMockRecorder) DeleteTunnel(req interface{}) *gomock.Call
DeleteTunnel indicates an expected call of DeleteTunnel
func (*MockTunnelMetaApiMockRecorder) DescribeTunnel ¶
func (mr *MockTunnelMetaApiMockRecorder) DescribeTunnel(req interface{}) *gomock.Call
DescribeTunnel indicates an expected call of DescribeTunnel
func (*MockTunnelMetaApiMockRecorder) GetRpo ¶
func (mr *MockTunnelMetaApiMockRecorder) GetRpo(req interface{}) *gomock.Call
GetRpo indicates an expected call of GetRpo
func (*MockTunnelMetaApiMockRecorder) GetRpoByOffset ¶ added in v1.7.0
func (mr *MockTunnelMetaApiMockRecorder) GetRpoByOffset(req interface{}) *gomock.Call
GetRpoByOffset indicates an expected call of GetRpoByOffset
func (*MockTunnelMetaApiMockRecorder) ListTunnel ¶
func (mr *MockTunnelMetaApiMockRecorder) ListTunnel(req interface{}) *gomock.Call
ListTunnel indicates an expected call of ListTunnel
func (*MockTunnelMetaApiMockRecorder) NewTunnelWorker ¶
func (mr *MockTunnelMetaApiMockRecorder) NewTunnelWorker(tunnelId, workerConfig interface{}) *gomock.Call
NewTunnelWorker indicates an expected call of NewTunnelWorker
func (*MockTunnelMetaApiMockRecorder) Schedule ¶
func (mr *MockTunnelMetaApiMockRecorder) Schedule(req interface{}) *gomock.Call
Schedule indicates an expected call of Schedule
type MockTunnelWorker ¶
type MockTunnelWorker struct {
// contains filtered or unexported fields
}
MockTunnelWorker is a mock of TunnelWorker interface
func NewMockTunnelWorker ¶
func NewMockTunnelWorker(ctrl *gomock.Controller) *MockTunnelWorker
NewMockTunnelWorker creates a new mock instance
func (*MockTunnelWorker) ConnectAndWorking ¶
func (m *MockTunnelWorker) ConnectAndWorking() error
ConnectAndWorking mocks base method
func (*MockTunnelWorker) EXPECT ¶
func (m *MockTunnelWorker) EXPECT() *MockTunnelWorkerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MockTunnelWorker) Shutdown ¶
func (m *MockTunnelWorker) Shutdown()
Shutdown mocks base method
type MockTunnelWorkerMockRecorder ¶
type MockTunnelWorkerMockRecorder struct {
// contains filtered or unexported fields
}
MockTunnelWorkerMockRecorder is the mock recorder for MockTunnelWorker
func (*MockTunnelWorkerMockRecorder) ConnectAndWorking ¶
func (mr *MockTunnelWorkerMockRecorder) ConnectAndWorking() *gomock.Call
ConnectAndWorking indicates an expected call of ConnectAndWorking
func (*MockTunnelWorkerMockRecorder) Shutdown ¶
func (mr *MockTunnelWorkerMockRecorder) Shutdown() *gomock.Call
Shutdown indicates an expected call of Shutdown
type MocktunnelDataApi ¶
type MocktunnelDataApi struct {
// contains filtered or unexported fields
}
MocktunnelDataApi is a mock of tunnelDataApi interface
func NewMocktunnelDataApi ¶
func NewMocktunnelDataApi(ctrl *gomock.Controller) *MocktunnelDataApi
NewMocktunnelDataApi creates a new mock instance
func (*MocktunnelDataApi) EXPECT ¶
func (m *MocktunnelDataApi) EXPECT() *MocktunnelDataApiMockRecorder
EXPECT returns an object that allows the caller to indicate expected use
func (*MocktunnelDataApi) ReadRecords ¶ added in v1.6.0
func (m *MocktunnelDataApi) ReadRecords(req *ReadRecordRequest) (*ReadRecordResponse, error)
ReadRecords mocks base method
type MocktunnelDataApiMockRecorder ¶
type MocktunnelDataApiMockRecorder struct {
// contains filtered or unexported fields
}
MocktunnelDataApiMockRecorder is the mock recorder for MocktunnelDataApi
func (*MocktunnelDataApiMockRecorder) ReadRecords ¶ added in v1.6.0
func (mr *MocktunnelDataApiMockRecorder) ReadRecords(req interface{}) *gomock.Call
ReadRecords indicates an expected call of ReadRecords
type ParallelReleaseManager ¶ added in v1.7.3
type ParallelReleaseManager interface {
Release() bool
}
type PrimaryKey ¶
type PrimaryKey struct {
PrimaryKeys []*PrimaryKeyColumn
}
type PrimaryKeyColumn ¶
type PrimaryKeyColumn struct { ColumnName string Value interface{} }
func (*PrimaryKeyColumn) String ¶ added in v1.7.5
func (p *PrimaryKeyColumn) String() string
type ReadRecordRequest ¶ added in v1.7.3
type ReadRecordResponse ¶ added in v1.7.3
type Record ¶
type Record struct { Type ActionType Timestamp int64 // SequenceInfo is nil when it is a base data record, // while SequenceInfo is not nil when it is a stream record. SequenceInfo *SequenceInfo PrimaryKey *PrimaryKey // required Columns []*RecordColumn OriginColumns []*RecordColumn }
func DeserializeRecordFromRawBytes ¶
func DeserializeRecordFromRawBytes(data []byte, originData []byte, actionType ActionType) (*Record, error)
func UnSerializeBatchBinaryRecordFromBytes ¶ added in v1.7.3
func UnSerializeBinaryRecordFromBytes ¶ added in v1.7.3
type RecordColumn ¶
type RecordColumn struct { Type RecordColumnType Name *string // required Value interface{} // optional. present when Type is RCT_Put Timestamp *int64 // optional, in msec. present when Type is RCT_Put or RCT_DeleteOneVersion }
func (*RecordColumn) String ¶
func (c *RecordColumn) String() string
type RecordColumnType ¶
type RecordColumnType int
const ( RCT_Put RecordColumnType = iota RCT_DeleteOneVersion RCT_DeleteAllVersions )
type ResponseInfo ¶
type ResponseInfo struct {
RequestId string
}
type RpoLatency ¶
type RpoLatency struct { ChannelTyp ChannelType Status string TotalCount int64 AccessTime int64 RpoTime int64 }
type ScheduleChannel ¶
type ScheduleChannel struct { ChannelId string ChannelStatus protocol.ChannelStatus }
func OpenChannel ¶
func OpenChannel(channelId string) *ScheduleChannel
func ResumeChannel ¶
func ResumeChannel(channelId string) *ScheduleChannel
func SuspendChannel ¶
func SuspendChannel(channelId string) *ScheduleChannel
func TerminateChannel ¶
func TerminateChannel(channelId string) *ScheduleChannel
type ScheduleRequest ¶
type ScheduleRequest struct { TunnelId string Channels []*ScheduleChannel }
type ScheduleResponse ¶
type ScheduleResponse struct {
ResponseInfo
}
type SequenceInfo ¶
type SequenceInfo struct { // Epoch of stream log partition Epoch int32 // stream log timestamp Timestamp int64 // row index of stream log with same log timestamp RowIndex int32 }
func ParseSerializedSeqInfo ¶ added in v1.6.0
func ParseSerializedSeqInfo(hexedSeqStr string) (*SequenceInfo, error)
func (*SequenceInfo) Serialization ¶ added in v1.6.0
func (si *SequenceInfo) Serialization() string
type SimpleProcessFactory ¶
type SimpleProcessFactory struct { CustomValue interface{} CpInterval time.Duration ProcessFunc func(channelCtx *ChannelContext, records []*Record) error ChannelOpenFunc func(channelCtx *ChannelContext) error ShutdownFunc func(channelCtx *ChannelContext) Logger *zap.Logger NeedBinaryRecord bool SyncCloseResource bool // sync close tunnel resource before daemon exit AlwaysCallBack bool // when AlwaysCallBack is true, processFunc will be called even if the record is empty RecordPipePerChannel *int }
func (*SimpleProcessFactory) NewProcessor ¶
func (s *SimpleProcessFactory) NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)
type StreamTunnelConfig ¶
type StreamTunnelConfig struct { Flag protocol.StartOffsetFlag StartOffset uint64 EndOffset uint64 }
type TunnelApi ¶
type TunnelApi struct {
// contains filtered or unexported fields
}
func NewTunnelApi ¶
func NewTunnelApi(endpoint, instanceName, accessKeyId, accessKeySecret string, conf *TunnelConfig, options ...ClientOption) *TunnelApi
func NewTunnelApiWithExternalHeader ¶ added in v1.7.0
func NewTunnelApiWithExternalHeader(endpoint, instanceName, accessKeyId, accessKeySecret, token string, conf *TunnelConfig, header map[string]string) *TunnelApi
func NewTunnelApiWithToken ¶
func NewTunnelApiWithToken(endpoint, instanceName, accessKeyId, accessKeySecret, token string, conf *TunnelConfig, options ...ClientOption) *TunnelApi
func (*TunnelApi) Checkpoint ¶ added in v1.6.0
func (*TunnelApi) CreateTunnel ¶
func (api *TunnelApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)
func (*TunnelApi) DeleteTunnel ¶
func (api *TunnelApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)
func (*TunnelApi) DescribeTunnel ¶
func (api *TunnelApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)
func (*TunnelApi) GetCheckpoint ¶ added in v1.6.0
func (*TunnelApi) GetRpo ¶
func (api *TunnelApi) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)
func (*TunnelApi) GetRpoByOffset ¶ added in v1.7.0
func (api *TunnelApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)
func (*TunnelApi) ListTunnel ¶
func (api *TunnelApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)
func (*TunnelApi) ReadRecords ¶ added in v1.6.0
func (api *TunnelApi) ReadRecords(req *ReadRecordRequest) (*ReadRecordResponse, error)
ReadRecords needBinaryRecord: whether to provide binaryRecord Only one of binaryRecord and record is not null
func (*TunnelApi) ReadRows ¶ added in v1.6.0
func (api *TunnelApi) ReadRows(tunnelId, clientId string, channelId string, token string) ([]*protocol.Record, string, string, int, error)
add for oss data lake sync part
func (*TunnelApi) Schedule ¶
func (api *TunnelApi) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)
type TunnelClient ¶
type TunnelClient interface { TunnelMetaApi NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error) }
func NewTunnelClient ¶
func NewTunnelClient(endpoint, instanceName, accessId, accessKey string, options ...ClientOption) TunnelClient
func NewTunnelClientWithConfig ¶
func NewTunnelClientWithConfig(endpoint, instanceName, accessId, accessKey string, conf *TunnelConfig, options ...ClientOption) TunnelClient
func NewTunnelClientWithConfigAndExternalHeader ¶ added in v1.7.0
func NewTunnelClientWithConfigAndExternalHeader(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, header map[string]string) TunnelClient
func NewTunnelClientWithExternalHeader ¶ added in v1.7.0
func NewTunnelClientWithExternalHeader(endpoint, instanceName, accessId, accessKey, token string, header map[string]string) TunnelClient
func NewTunnelClientWithToken ¶
func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, options ...ClientOption) TunnelClient
type TunnelConfig ¶
type TunnelError ¶
func (*TunnelError) Error ¶
func (te *TunnelError) Error() string
func (*TunnelError) Temporary ¶
func (te *TunnelError) Temporary() bool
type TunnelInfo ¶
type TunnelMetaApi ¶
type TunnelMetaApi interface { CreateTunnel(req *CreateTunnelRequest) (resp *CreateTunnelResponse, err error) ListTunnel(req *ListTunnelRequest) (resp *ListTunnelResponse, err error) DescribeTunnel(req *DescribeTunnelRequest) (resp *DescribeTunnelResponse, err error) DeleteTunnel(req *DeleteTunnelRequest) (resp *DeleteTunnelResponse, err error) GetRpo(req *GetRpoRequest) (resp *GetRpoResponse, err error) GetRpoByOffset(req *GetRpoRequest) (resp *GetRpoResponse, err error) Schedule(req *ScheduleRequest) (resp *ScheduleResponse, err error) }
type TunnelRpoLatency ¶
type TunnelStateMachine ¶
type TunnelStateMachine struct {
// contains filtered or unexported fields
}
func NewTunnelStateMachine ¶
func NewTunnelStateMachine(tunnelId, clientId string, dialer ChannelDialer, factory ChannelProcessorFactory, api *TunnelApi, lg *zap.Logger) *TunnelStateMachine
func (*TunnelStateMachine) BatchGetStatus ¶
func (s *TunnelStateMachine) BatchGetStatus(req *BatchGetStatusReq) ([]*protocol.Channel, error)
func (*TunnelStateMachine) BatchUpdateStatus ¶
func (s *TunnelStateMachine) BatchUpdateStatus(batchChannels []*protocol.Channel)
func (*TunnelStateMachine) Close ¶
func (s *TunnelStateMachine) Close()
func (*TunnelStateMachine) UpdateStatus ¶
func (s *TunnelStateMachine) UpdateStatus(channel *ChannelStatus)
type TunnelType ¶
type TunnelType string
const ( TunnelTypeBaseData TunnelType = "BaseData" TunnelTypeStream TunnelType = "Stream" TunnelTypeBaseStream TunnelType = "BaseAndStream" FinishTag = "finished" )
type TunnelWorker ¶
type TunnelWorker interface { ConnectAndWorking() error Shutdown() }
type TunnelWorkerConfig ¶
type TunnelWorkerConfig struct { HeartbeatTimeout time.Duration HeartbeatInterval time.Duration ChannelDialer ChannelDialer ProcessorFactory ChannelProcessorFactory LogConfig *zap.Config LogWriteSyncer zapcore.WriteSyncer BackoffConfig *ChannelBackoffConfig //Maximum concurrency of channels that read and process data, Concurrency is not limited by default MaxChannelParallel int //whether to manually manage concurrent channel switching NeedManualRelease bool //Whether to read data synchronously SyncReadRecords bool }
type TunnelWorkerDaemon ¶
type TunnelWorkerDaemon struct {
// contains filtered or unexported fields
}
func NewTunnelDaemon ¶
func NewTunnelDaemon(c TunnelClient, id string, conf *TunnelWorkerConfig) *TunnelWorkerDaemon
func (*TunnelWorkerDaemon) Close ¶
func (d *TunnelWorkerDaemon) Close()
func (*TunnelWorkerDaemon) Run ¶
func (d *TunnelWorkerDaemon) Run() error