tunnel

package
v1.7.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2022 License: Apache-2.0 Imports: 34 Imported by: 7

README

Tunnel service

Table Store tunnel service golang sdk.

Install

  • download tunnel client source code
go get github.com/aliyun/aliyun-tablestore-go-sdk/tunnel
  • use dep to install dependencies under tunnel directory
    • install dep
    • dep ensure -v
  • or use go get to install dependencies
go get -u go.uber.org/zap
go get github.com/cenkalti/backoff
go get github.com/golang/protobuf/proto
go get github.com/satori/go.uuid
go get github.com/stretchr/testify/assert
go get github.com/smartystreets/goconvey/convey
go get github.com/golang/mock/gomock
go get gopkg.in/natefinch/lumberjack.v2

Document

Quick Start

  • tunnel type

    • TunnelTypeStream:stream data(增量数据流)
    • TunnelTypeBaseData: full data(全量数据流)
    • TunnelTypeBaseStream: full and stream data(先全量后增量数据流)
  • init tunnel client

    tunnelClient := tunnel.NewTunnelClient(endpoint, instance,
       accessKeyId, accessKeySecret)
  • create new tunnel
    req := &tunnel.CreateTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
       Type:       tunnel.TunnelTypeBaseStream, //base and stream data tunnel
    }
    resp, err := tunnelClient.CreateTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.TunnelId)
  • get existing tunnel detail information
    req := &tunnel.DescribeTunnelRequest{
       TableName:  "testTable",
       TunnelName: "testTunnel",
    }
    resp, err := tunnelClient.DescribeTunnel(req)
    if err != nil {
       log.Fatal("create test tunnel failed", err)
    }
    log.Println("tunnel id is", resp.Tunnel.TunnelId)
  • consume tunnel data with callback function
//user-defined callback function
func exampleConsumeFunction(ctx *tunnel.ChannelContext, records []*tunnel.Record) error {
	fmt.Println("user-defined information", ctx.CustomValue)
	for _, rec := range records {
		fmt.Println("tunnel record detail:", rec.String())
	}
	fmt.Println("a round of records consumption finished")
	return nil
}

//set callback to SimpleProcessFactory
workConfig := &tunnel.TunnelWorkerConfig{
   ProcessorFactory: &tunnel.SimpleProcessFactory{
      CustomValue: "user custom interface{} value",
      ProcessFunc: exampleConsumeFunction,
   },
}

//use TunnelDaemon to consume tunnel with specified tunnelId
daemon := tunnel.NewTunnelDaemon(tunnelClient, tunnelId, workConfig)
log.Fatal(daemon.Run())
  • delete tunnel
req := &tunnel.DeleteTunnelRequest {
   TableName: "testTable",
   TunnelName: "testTunnel",
}
_, err := tunnelClient.DeleteTunnel(req)
if err != nil {
   log.Fatal("delete test tunnel failed", err)
}

See the sample directory for more details.

tunnel document

configuration

  • Default TunnelConfig definition
var DefaultTunnelConfig = &TunnelConfig{
      //Max backoff retry duration.
      MaxRetryElapsedTime: 75 * time.Second,
      //HTTP request timeout.
      RequestTimeout:      60 * time.Second,
      //http.DefaultTransport.
      Transport:           http.DefaultTransport,
}
  • TunnelWorkerConfig definition
type TunnelWorkerConfig struct {
   //The heartbeat timeout time of the worker. If nil, the default value is used.
   HeartbeatTimeout  time.Duration
   //The heartbeat interval time of the worker. If nil, the default value is used.
   HeartbeatInterval time.Duration
   //The channel connection dial interface. If nil, the default dialer is used.
   //Usually the default dialer is fine.
   ChannelDialer     ChannelDialer

   //The channel processor creation interface.
   //It's recomended to use the pre-defined SimpleChannelProcessorFactory.
   ProcessorFactory ChannelProcessorFactory

   //zap log config. If nil, the DefaultLogConfig is used.
   LogConfig      *zap.Config
   //zap log rotate config. If nil, the DefaultSyncer is used.
   LogWriteSyncer zapcore.WriteSyncer
}

Documentation

Overview

Package tunnel is a generated GoMock package.

Index

Constants

View Source
const (
	ErrCodeParamInvalid      = "OTSParameterInvalid"
	ErrCodeResourceGone      = "OTSResourceGone"
	ErrCodeServerUnavailable = "OTSTunnelServerUnavailable"
	ErrCodeSequenceNotMatch  = "OTSSequenceNumberNotMatch"
	ErrCodeClientError       = "OTSClientError"
	ErrCodeTunnelExpired     = "OTSTunnelExpired"
	ErrCodePermissionDenied  = "OTSPermissionDenied"
	ErrCodeTunnelExist       = "OTSTunnelExist"
)
View Source
const (
	//for serialize binary record
	TAG_VERSION       = 0x1
	TAG_RECORD_COUNT  = 0x2
	TAG_ACTION_TYPE   = 0x3
	TAG_RECORD_LENGTH = 0x4
	TAG_RECORD        = 0x5
)

Variables

View Source
var (
	DefaultHeartbeatInterval = 30 * time.Second
	DefaultHeartbeatTimeout  = 300 * time.Second
)
View Source
var (
	ErrUnExpectBinaryRecordTag = errors.New("UnexpectBinaryRecordTag")
	ErrUnSupportRecordVersion  = errors.New("UnSupportRecordVersion")
)
View Source
var (
	DefaultChannelSize        = 10
	DefaultCheckpointInterval = 10 * time.Second
)
View Source
var DaemonRandomRetryMs = 10000
View Source
var DefaultBackoffConfig = ChannelBackoffConfig{
	MaxDelay: 5 * time.Second,
	// contains filtered or unexported fields
}

DefaultBackoffConfig is specified channel worker backoff parameters, decided by table store tunnel service developers.

View Source
var DefaultLogConfig = zap.Config{
	Level:       zap.NewAtomicLevelAt(zap.InfoLevel),
	Development: false,
	Sampling: &zap.SamplingConfig{
		Initial:    100,
		Thereafter: 100,
	},
	Encoding: "json",
	EncoderConfig: zapcore.EncoderConfig{
		TimeKey:        "ts",
		LevelKey:       "level",
		NameKey:        "logger",
		CallerKey:      "caller",
		MessageKey:     "msg",
		StacktraceKey:  "stacktrace",
		LineEnding:     zapcore.DefaultLineEnding,
		EncodeLevel:    zapcore.LowercaseLevelEncoder,
		EncodeTime:     zapcore.ISO8601TimeEncoder,
		EncodeDuration: zapcore.SecondsDurationEncoder,
		EncodeCaller:   zapcore.ShortCallerEncoder,
	},
}
View Source
var DefaultSyncer = zapcore.AddSync(&lumberjack.Logger{
	Filename:   "tunnelClient.log",
	MaxSize:    512,
	MaxBackups: 5,
	MaxAge:     30,
	Compress:   true,
})
View Source
var DefaultTunnelConfig = &TunnelConfig{
	MaxRetryElapsedTime: 75 * time.Second,
	RequestTimeout:      60 * time.Second,
	Transport:           http.DefaultTransport,
}
View Source
var (
	// PipeChannelSize is channelDialer's data pipeline channel size
	PipeChannelSize = 1
)

Functions

func ExponentialBackoff

func ExponentialBackoff(interval, maxInterval, maxElapsed time.Duration, multiplier, jitter float64) *backoff.ExponentialBackOff

func ParseRequestToken

func ParseRequestToken(token string) (*protocol.TokenContentV2, error)

func ReplaceLogCore

func ReplaceLogCore(ws zapcore.WriteSyncer, conf zap.Config) zap.Option

hack replace zap config build core with lumberjack logger

func StreamRecordSequenceLess

func StreamRecordSequenceLess(a, b *SequenceInfo) bool

Types

type ActionType

type ActionType int
const (
	AT_Put ActionType = iota
	AT_Update
	AT_Delete
)

func ParseActionType

func ParseActionType(pbType *protocol.ActionType) (ActionType, error)

func (ActionType) String

func (t ActionType) String() string

type AsyncProcessFactory added in v1.7.3

type AsyncProcessFactory struct {
	CustomValue interface{}

	CpInterval time.Duration

	ProcessFunc     func(channelCtx *ChannelContext, records []*Record) error
	ChannelOpenFunc func(channelCtx *ChannelContext) error
	ShutdownFunc    func(channelCtx *ChannelContext)

	Logger               *zap.Logger
	NeedBinaryRecord     bool
	SyncCloseResource    bool // sync close tunnel resource before daemon exit
	AlwaysCallBack       bool // when AlwaysCallBack is true, processFunc will be called even if the record is empty
	RecordPipePerChannel *int
}

AsyncProcessFactory support async commit token

func (*AsyncProcessFactory) NewProcessor added in v1.7.3

func (s *AsyncProcessFactory) NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)

type BatchGetStatusReq

type BatchGetStatusReq struct {
	// contains filtered or unexported fields
}

func NewBatchGetStatusReq

func NewBatchGetStatusReq() *BatchGetStatusReq

type ChannelBackoffConfig

type ChannelBackoffConfig struct {
	//MaxDelay is the upper bound of backoff delay.
	MaxDelay time.Duration
	// contains filtered or unexported fields
}

stream type channel worker backoff config

type ChannelConn

type ChannelConn interface {
	NotifyStatus(channel *ChannelStatus)
	Closed() bool
	Close()
}

type ChannelContext

type ChannelContext struct {
	TunnelId        string
	ClientId        string
	ChannelId       string
	IsStreamChannel bool
	TraceId         string

	NextToken string

	RecordCount            int
	BinaryRecords          []byte
	Processor              ChannelProcessor
	CustomValue            interface{}
	ParallelReleaseManager ParallelReleaseManager
}

func NewChannelContext added in v1.6.0

func NewChannelContext(tunnelId, clientId, channelId string, customValue interface{}) *ChannelContext

func (*ChannelContext) String

func (c *ChannelContext) String() string

type ChannelDialer

type ChannelDialer interface {
	ChannelDial(tunnelId, clientId, channelId, token string, p ChannelProcessor, state *TunnelStateMachine) ChannelConn
}

type ChannelInfo

type ChannelInfo struct {
	ChannelId     string
	ChannelType   string
	ChannelStatus string
	ClientId      string
	ChannelRPO    int64
}

type ChannelProcessor

type ChannelProcessor interface {
	Process(records []*Record, binaryRecords []byte, recordCount int, nextToken, traceId string, streamChannel bool, manager ParallelReleaseManager) error
	Shutdown()
	Error() bool
	Finished() bool
	CommitToken(token string) error
}

type ChannelProcessorFactory

type ChannelProcessorFactory interface {
	NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)
}

type ChannelStatus

type ChannelStatus struct {
	ChannelId string
	Version   int64
	Status    protocol.ChannelStatus
}

func ToChannelStatus

func ToChannelStatus(c *protocol.Channel) *ChannelStatus

func (*ChannelStatus) ToPbChannel

func (c *ChannelStatus) ToPbChannel() *protocol.Channel

type ChannelType

type ChannelType string
const (
	ChannelType_BaseData ChannelType = "BaseData"
	ChannelType_Stream   ChannelType = "Stream"
)

type Checkpointer

type Checkpointer interface {
	Checkpoint(token string) error
}

type ClientOption added in v1.7.3

type ClientOption func(api *TunnelApi)

func SetCredentialsProvider added in v1.7.3

func SetCredentialsProvider(provider common.CredentialsProvider) ClientOption

SetCredentialsProvider sets funciton for get the user's ak

type CreateTunnelRequest

type CreateTunnelRequest struct {
	TableName          string
	TunnelName         string
	Type               TunnelType
	StreamTunnelConfig *StreamTunnelConfig
	// NeedAllTimeSeriesColumns function is temporarily disabled.
	NeedAllTimeSeriesColumns bool
}

type CreateTunnelResponse

type CreateTunnelResponse struct {
	TunnelId string
	ResponseInfo
}

type DefaultTunnelClient

type DefaultTunnelClient struct {
	// contains filtered or unexported fields
}

func (*DefaultTunnelClient) CreateTunnel

func (*DefaultTunnelClient) DeleteTunnel

func (*DefaultTunnelClient) DescribeTunnel

func (*DefaultTunnelClient) GetRpo

func (*DefaultTunnelClient) GetRpoByOffset added in v1.7.0

func (c *DefaultTunnelClient) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)

func (*DefaultTunnelClient) ListTunnel

func (*DefaultTunnelClient) NewTunnelWorker

func (c *DefaultTunnelClient) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)

func (*DefaultTunnelClient) Schedule

type DeleteTunnelRequest

type DeleteTunnelRequest struct {
	TableName  string
	TunnelName string
}

type DeleteTunnelResponse

type DeleteTunnelResponse struct {
	ResponseInfo
}

type DescribeTunnelRequest

type DescribeTunnelRequest struct {
	TableName  string
	TunnelName string
}

type DescribeTunnelResponse

type DescribeTunnelResponse struct {
	TunnelRPO int64
	Tunnel    *TunnelInfo
	Channels  []*ChannelInfo
	ResponseInfo
}

type GetRpoRequest

type GetRpoRequest struct {
	TunnelId string
}

type GetRpoResponse

type GetRpoResponse struct {
	RpoInfos       map[string]map[string]*RpoLatency
	TunnelRpoInfos map[string]*TunnelRpoLatency
	TunnelId       string
}

type ListTunnelRequest

type ListTunnelRequest struct {
	TableName string
}

type ListTunnelResponse

type ListTunnelResponse struct {
	Tunnels []*TunnelInfo
	ResponseInfo
}

type MockTunnelMetaApi

type MockTunnelMetaApi struct {
	// contains filtered or unexported fields
}

MockTunnelMetaApi is a mock of TunnelMetaApi interface

func NewMockTunnelMetaApi

func NewMockTunnelMetaApi(ctrl *gomock.Controller) *MockTunnelMetaApi

NewMockTunnelMetaApi creates a new mock instance

func (*MockTunnelMetaApi) CreateTunnel

CreateTunnel mocks base method

func (*MockTunnelMetaApi) DeleteTunnel

DeleteTunnel mocks base method

func (*MockTunnelMetaApi) DescribeTunnel

DescribeTunnel mocks base method

func (*MockTunnelMetaApi) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTunnelMetaApi) GetRpo

GetRpo mocks base method

func (*MockTunnelMetaApi) GetRpoByOffset added in v1.7.0

func (m *MockTunnelMetaApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)

GetRpoByOffset mocks base method

func (*MockTunnelMetaApi) ListTunnel

ListTunnel mocks base method

func (*MockTunnelMetaApi) NewTunnelWorker

func (m *MockTunnelMetaApi) NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)

NewTunnelWorker mocks base method

func (*MockTunnelMetaApi) Schedule

Schedule mocks base method

type MockTunnelMetaApiMockRecorder

type MockTunnelMetaApiMockRecorder struct {
	// contains filtered or unexported fields
}

MockTunnelMetaApiMockRecorder is the mock recorder for MockTunnelMetaApi

func (*MockTunnelMetaApiMockRecorder) CreateTunnel

func (mr *MockTunnelMetaApiMockRecorder) CreateTunnel(req interface{}) *gomock.Call

CreateTunnel indicates an expected call of CreateTunnel

func (*MockTunnelMetaApiMockRecorder) DeleteTunnel

func (mr *MockTunnelMetaApiMockRecorder) DeleteTunnel(req interface{}) *gomock.Call

DeleteTunnel indicates an expected call of DeleteTunnel

func (*MockTunnelMetaApiMockRecorder) DescribeTunnel

func (mr *MockTunnelMetaApiMockRecorder) DescribeTunnel(req interface{}) *gomock.Call

DescribeTunnel indicates an expected call of DescribeTunnel

func (*MockTunnelMetaApiMockRecorder) GetRpo

func (mr *MockTunnelMetaApiMockRecorder) GetRpo(req interface{}) *gomock.Call

GetRpo indicates an expected call of GetRpo

func (*MockTunnelMetaApiMockRecorder) GetRpoByOffset added in v1.7.0

func (mr *MockTunnelMetaApiMockRecorder) GetRpoByOffset(req interface{}) *gomock.Call

GetRpoByOffset indicates an expected call of GetRpoByOffset

func (*MockTunnelMetaApiMockRecorder) ListTunnel

func (mr *MockTunnelMetaApiMockRecorder) ListTunnel(req interface{}) *gomock.Call

ListTunnel indicates an expected call of ListTunnel

func (*MockTunnelMetaApiMockRecorder) NewTunnelWorker

func (mr *MockTunnelMetaApiMockRecorder) NewTunnelWorker(tunnelId, workerConfig interface{}) *gomock.Call

NewTunnelWorker indicates an expected call of NewTunnelWorker

func (*MockTunnelMetaApiMockRecorder) Schedule

func (mr *MockTunnelMetaApiMockRecorder) Schedule(req interface{}) *gomock.Call

Schedule indicates an expected call of Schedule

type MockTunnelWorker

type MockTunnelWorker struct {
	// contains filtered or unexported fields
}

MockTunnelWorker is a mock of TunnelWorker interface

func NewMockTunnelWorker

func NewMockTunnelWorker(ctrl *gomock.Controller) *MockTunnelWorker

NewMockTunnelWorker creates a new mock instance

func (*MockTunnelWorker) ConnectAndWorking

func (m *MockTunnelWorker) ConnectAndWorking() error

ConnectAndWorking mocks base method

func (*MockTunnelWorker) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MockTunnelWorker) Shutdown

func (m *MockTunnelWorker) Shutdown()

Shutdown mocks base method

type MockTunnelWorkerMockRecorder

type MockTunnelWorkerMockRecorder struct {
	// contains filtered or unexported fields
}

MockTunnelWorkerMockRecorder is the mock recorder for MockTunnelWorker

func (*MockTunnelWorkerMockRecorder) ConnectAndWorking

func (mr *MockTunnelWorkerMockRecorder) ConnectAndWorking() *gomock.Call

ConnectAndWorking indicates an expected call of ConnectAndWorking

func (*MockTunnelWorkerMockRecorder) Shutdown

func (mr *MockTunnelWorkerMockRecorder) Shutdown() *gomock.Call

Shutdown indicates an expected call of Shutdown

type MocktunnelDataApi

type MocktunnelDataApi struct {
	// contains filtered or unexported fields
}

MocktunnelDataApi is a mock of tunnelDataApi interface

func NewMocktunnelDataApi

func NewMocktunnelDataApi(ctrl *gomock.Controller) *MocktunnelDataApi

NewMocktunnelDataApi creates a new mock instance

func (*MocktunnelDataApi) EXPECT

EXPECT returns an object that allows the caller to indicate expected use

func (*MocktunnelDataApi) ReadRecords added in v1.6.0

ReadRecords mocks base method

type MocktunnelDataApiMockRecorder

type MocktunnelDataApiMockRecorder struct {
	// contains filtered or unexported fields
}

MocktunnelDataApiMockRecorder is the mock recorder for MocktunnelDataApi

func (*MocktunnelDataApiMockRecorder) ReadRecords added in v1.6.0

func (mr *MocktunnelDataApiMockRecorder) ReadRecords(req interface{}) *gomock.Call

ReadRecords indicates an expected call of ReadRecords

type ParallelReleaseManager added in v1.7.3

type ParallelReleaseManager interface {
	Release() bool
}

type PrimaryKey

type PrimaryKey struct {
	PrimaryKeys []*PrimaryKeyColumn
}

type PrimaryKeyColumn

type PrimaryKeyColumn struct {
	ColumnName string
	Value      interface{}
}

func (*PrimaryKeyColumn) String added in v1.7.5

func (p *PrimaryKeyColumn) String() string

type ReadRecordRequest added in v1.7.3

type ReadRecordRequest struct {
	TunnelId         string
	ChannelId        string
	ClientId         string
	Token            string
	NeedBinaryRecord bool
}

type ReadRecordResponse added in v1.7.3

type ReadRecordResponse struct {
	Records       []*Record
	BinaryRecords []byte
	NextToken     string
	Size          int
	RecordCount   int
	ResponseInfo
}

type Record

type Record struct {
	Type      ActionType
	Timestamp int64
	// SequenceInfo is nil when it is a base data record,
	// while SequenceInfo is not nil when it is a stream record.
	SequenceInfo  *SequenceInfo
	PrimaryKey    *PrimaryKey // required
	Columns       []*RecordColumn
	OriginColumns []*RecordColumn
}

func DeserializeRecordFromRawBytes

func DeserializeRecordFromRawBytes(data []byte, originData []byte, actionType ActionType) (*Record, error)

func UnSerializeBatchBinaryRecordFromBytes added in v1.7.3

func UnSerializeBatchBinaryRecordFromBytes(data []byte) (record []*Record, err error)

func UnSerializeBinaryRecordFromBytes added in v1.7.3

func UnSerializeBinaryRecordFromBytes(r *bytes.Reader) (*Record, error)

func (*Record) String

func (r *Record) String() string

type RecordColumn

type RecordColumn struct {
	Type      RecordColumnType
	Name      *string     // required
	Value     interface{} // optional. present when Type is RCT_Put
	Timestamp *int64      // optional, in msec. present when Type is RCT_Put or RCT_DeleteOneVersion
}

func (*RecordColumn) String

func (c *RecordColumn) String() string

type RecordColumnType

type RecordColumnType int
const (
	RCT_Put RecordColumnType = iota
	RCT_DeleteOneVersion
	RCT_DeleteAllVersions
)

type ResponseInfo

type ResponseInfo struct {
	RequestId string
}

type RpoLatency

type RpoLatency struct {
	ChannelTyp ChannelType
	Status     string
	TotalCount int64
	AccessTime int64
	RpoTime    int64
}

type ScheduleChannel

type ScheduleChannel struct {
	ChannelId     string
	ChannelStatus protocol.ChannelStatus
}

func OpenChannel

func OpenChannel(channelId string) *ScheduleChannel

func ResumeChannel

func ResumeChannel(channelId string) *ScheduleChannel

func SuspendChannel

func SuspendChannel(channelId string) *ScheduleChannel

func TerminateChannel

func TerminateChannel(channelId string) *ScheduleChannel

type ScheduleRequest

type ScheduleRequest struct {
	TunnelId string
	Channels []*ScheduleChannel
}

type ScheduleResponse

type ScheduleResponse struct {
	ResponseInfo
}

type SequenceInfo

type SequenceInfo struct {
	// Epoch of stream log partition
	Epoch int32
	// stream log timestamp
	Timestamp int64
	// row index of stream log with same log timestamp
	RowIndex int32
}

func ParseSerializedSeqInfo added in v1.6.0

func ParseSerializedSeqInfo(hexedSeqStr string) (*SequenceInfo, error)

func (*SequenceInfo) Serialization added in v1.6.0

func (si *SequenceInfo) Serialization() string

type SimpleProcessFactory

type SimpleProcessFactory struct {
	CustomValue interface{}

	CpInterval time.Duration

	ProcessFunc     func(channelCtx *ChannelContext, records []*Record) error
	ChannelOpenFunc func(channelCtx *ChannelContext) error
	ShutdownFunc    func(channelCtx *ChannelContext)

	Logger               *zap.Logger
	NeedBinaryRecord     bool
	SyncCloseResource    bool // sync close tunnel resource before daemon exit
	AlwaysCallBack       bool // when AlwaysCallBack is true, processFunc will be called even if the record is empty
	RecordPipePerChannel *int
}

func (*SimpleProcessFactory) NewProcessor

func (s *SimpleProcessFactory) NewProcessor(tunnelId, clientId, channelId string, checkpointer Checkpointer) (ChannelProcessor, error)

type StreamTunnelConfig

type StreamTunnelConfig struct {
	Flag        protocol.StartOffsetFlag
	StartOffset uint64
	EndOffset   uint64
}

type TunnelApi

type TunnelApi struct {
	// contains filtered or unexported fields
}

func NewTunnelApi

func NewTunnelApi(endpoint, instanceName, accessKeyId, accessKeySecret string, conf *TunnelConfig, options ...ClientOption) *TunnelApi

func NewTunnelApiWithExternalHeader added in v1.7.0

func NewTunnelApiWithExternalHeader(endpoint, instanceName, accessKeyId, accessKeySecret, token string, conf *TunnelConfig, header map[string]string) *TunnelApi

func NewTunnelApiWithToken

func NewTunnelApiWithToken(endpoint, instanceName, accessKeyId, accessKeySecret, token string, conf *TunnelConfig, options ...ClientOption) *TunnelApi

func (*TunnelApi) Checkpoint added in v1.6.0

func (api *TunnelApi) Checkpoint(tunnelId, clientId string, channelId string, token string, sequenceNumber int64) error

func (*TunnelApi) CreateTunnel

func (api *TunnelApi) CreateTunnel(req *CreateTunnelRequest) (*CreateTunnelResponse, error)

func (*TunnelApi) DeleteTunnel

func (api *TunnelApi) DeleteTunnel(req *DeleteTunnelRequest) (*DeleteTunnelResponse, error)

func (*TunnelApi) DescribeTunnel

func (api *TunnelApi) DescribeTunnel(req *DescribeTunnelRequest) (*DescribeTunnelResponse, error)

func (*TunnelApi) GetCheckpoint added in v1.6.0

func (api *TunnelApi) GetCheckpoint(tunnelId, clientId string, channelId string) (string, int64, error)

func (*TunnelApi) GetRpo

func (api *TunnelApi) GetRpo(req *GetRpoRequest) (*GetRpoResponse, error)

func (*TunnelApi) GetRpoByOffset added in v1.7.0

func (api *TunnelApi) GetRpoByOffset(req *GetRpoRequest) (*GetRpoResponse, error)

func (*TunnelApi) ListTunnel

func (api *TunnelApi) ListTunnel(req *ListTunnelRequest) (*ListTunnelResponse, error)

func (*TunnelApi) ReadRecords added in v1.6.0

func (api *TunnelApi) ReadRecords(req *ReadRecordRequest) (*ReadRecordResponse, error)

ReadRecords needBinaryRecord: whether to provide binaryRecord Only one of binaryRecord and record is not null

func (*TunnelApi) ReadRows added in v1.6.0

func (api *TunnelApi) ReadRows(tunnelId, clientId string, channelId string, token string) ([]*protocol.Record, string, string, int, error)

add for oss data lake sync part

func (*TunnelApi) Schedule

func (api *TunnelApi) Schedule(req *ScheduleRequest) (*ScheduleResponse, error)

type TunnelClient

type TunnelClient interface {
	TunnelMetaApi
	NewTunnelWorker(tunnelId string, workerConfig *TunnelWorkerConfig) (TunnelWorker, error)
}

func NewTunnelClient

func NewTunnelClient(endpoint, instanceName, accessId, accessKey string, options ...ClientOption) TunnelClient

func NewTunnelClientWithConfig

func NewTunnelClientWithConfig(endpoint, instanceName, accessId, accessKey string, conf *TunnelConfig, options ...ClientOption) TunnelClient

func NewTunnelClientWithConfigAndExternalHeader added in v1.7.0

func NewTunnelClientWithConfigAndExternalHeader(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, header map[string]string) TunnelClient

func NewTunnelClientWithExternalHeader added in v1.7.0

func NewTunnelClientWithExternalHeader(endpoint, instanceName, accessId, accessKey, token string, header map[string]string) TunnelClient

func NewTunnelClientWithToken

func NewTunnelClientWithToken(endpoint, instanceName, accessId, accessKey, token string, conf *TunnelConfig, options ...ClientOption) TunnelClient

type TunnelConfig

type TunnelConfig struct {
	MaxRetryElapsedTime time.Duration
	RequestTimeout      time.Duration
	Transport           http.RoundTripper
}

type TunnelError

type TunnelError struct {
	Code      string
	Message   string
	RequestId string
	TunnelId  string
}

func (*TunnelError) Error

func (te *TunnelError) Error() string

func (*TunnelError) Temporary

func (te *TunnelError) Temporary() bool

type TunnelInfo

type TunnelInfo struct {
	TunnelId           string
	TunnelName         string
	TunnelType         string
	TableName          string
	InstanceName       string
	StreamId           string
	Stage              string
	Expired            bool
	CreateTime         time.Time
	StreamTunnelConfig *StreamTunnelConfig
}

type TunnelMetaApi

type TunnelMetaApi interface {
	CreateTunnel(req *CreateTunnelRequest) (resp *CreateTunnelResponse, err error)
	ListTunnel(req *ListTunnelRequest) (resp *ListTunnelResponse, err error)
	DescribeTunnel(req *DescribeTunnelRequest) (resp *DescribeTunnelResponse, err error)
	DeleteTunnel(req *DeleteTunnelRequest) (resp *DeleteTunnelResponse, err error)
	GetRpo(req *GetRpoRequest) (resp *GetRpoResponse, err error)
	GetRpoByOffset(req *GetRpoRequest) (resp *GetRpoResponse, err error)
	Schedule(req *ScheduleRequest) (resp *ScheduleResponse, err error)
}

type TunnelRpoLatency

type TunnelRpoLatency struct {
	Status     string
	TotalCount int64
	AccessTime int64
	RpoTime    int64
}

type TunnelStateMachine

type TunnelStateMachine struct {
	// contains filtered or unexported fields
}

func NewTunnelStateMachine

func NewTunnelStateMachine(tunnelId, clientId string, dialer ChannelDialer, factory ChannelProcessorFactory, api *TunnelApi, lg *zap.Logger) *TunnelStateMachine

func (*TunnelStateMachine) BatchGetStatus

func (s *TunnelStateMachine) BatchGetStatus(req *BatchGetStatusReq) ([]*protocol.Channel, error)

func (*TunnelStateMachine) BatchUpdateStatus

func (s *TunnelStateMachine) BatchUpdateStatus(batchChannels []*protocol.Channel)

func (*TunnelStateMachine) Close

func (s *TunnelStateMachine) Close()

func (*TunnelStateMachine) UpdateStatus

func (s *TunnelStateMachine) UpdateStatus(channel *ChannelStatus)

type TunnelType

type TunnelType string
const (
	TunnelTypeBaseData   TunnelType = "BaseData"
	TunnelTypeStream     TunnelType = "Stream"
	TunnelTypeBaseStream TunnelType = "BaseAndStream"

	FinishTag = "finished"
)

type TunnelWorker

type TunnelWorker interface {
	ConnectAndWorking() error
	Shutdown()
}

type TunnelWorkerConfig

type TunnelWorkerConfig struct {
	HeartbeatTimeout  time.Duration
	HeartbeatInterval time.Duration
	ChannelDialer     ChannelDialer

	ProcessorFactory ChannelProcessorFactory

	LogConfig      *zap.Config
	LogWriteSyncer zapcore.WriteSyncer
	BackoffConfig  *ChannelBackoffConfig

	//Maximum concurrency of channels that read and process data, Concurrency is not limited by default
	MaxChannelParallel int
	//whether to manually manage concurrent channel switching
	NeedManualRelease bool
	//Whether to read data synchronously
	SyncReadRecords bool
}

type TunnelWorkerDaemon

type TunnelWorkerDaemon struct {
	// contains filtered or unexported fields
}

func NewTunnelDaemon

func NewTunnelDaemon(c TunnelClient, id string, conf *TunnelWorkerConfig) *TunnelWorkerDaemon

func (*TunnelWorkerDaemon) Close

func (d *TunnelWorkerDaemon) Close()

func (*TunnelWorkerDaemon) Run

func (d *TunnelWorkerDaemon) Run() error

Directories

Path Synopsis
sample

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL