datahub

package
v0.1.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 17, 2020 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Overview

Package datahub is the official Aliyun DataHub SDK for the Go programming language.

The DataHub SDK for Go provides APIs and utilities that developers can use to build Go applications that use Aliyun DataHub and related services, such as Aliyun MaxCompute.

pack

Index

Constants

View Source
// Error code strings.
//
// NOTE: a few identifiers are spelled differently from the string they hold:
// NoSuchMeterInfo ("NoSuchMeteringInfo"), UnAuthorized ("Unauthorized"),
// OperatorDenied ("OperationDenied") and LimitExceed ("LimitExceeded").
// The identifiers are part of the exported API and are kept as they are.
const (
	InvalidParameter    = "InvalidParameter"
	InvalidSubscription = "InvalidSubscription"
	InvalidCursor       = "InvalidCursor"
	// TODO: the error codes below are still to be rearranged.
	ResourceNotFound   = "ResourceNotFound"
	NoSuchTopic        = "NoSuchTopic"
	NoSuchProject      = "NoSuchProject"
	NoSuchSubscription = "NoSuchSubscription"
	NoSuchShard        = "NoSuchShard"
	NoSuchConnector    = "NoSuchConnector"
	NoSuchMeterInfo    = "NoSuchMeteringInfo"
	// TODO: the error codes below are still to be rearranged.
	SeekOutOfRange        = "SeekOutOfRange"
	ResourceAlreadyExist  = "ResourceAlreadyExist"
	ProjectAlreadyExist   = "ProjectAlreadyExist"
	TopicAlreadyExist     = "TopicAlreadyExist"
	ConnectorAlreadyExist = "ConnectorAlreadyExist"
	UnAuthorized          = "Unauthorized"
	NoPermission          = "NoPermission"
	InvalidShardOperation = "InvalidShardOperation"
	OperatorDenied        = "OperationDenied"
	LimitExceed           = "LimitExceeded"
	// The following error codes are currently disabled (commented out).
	//ODPSServiceError       = "OdpsServiceError"
	//MysqlServiceError      = "MysqlServiceError"
	//InternalServerErrorS    = "InternalServerError"
	SubscriptionOffline    = "SubscriptionOffline"
	OffsetReseted          = "OffsetReseted"
	OffsetSessionClosed    = "OffsetSessionClosed"
	OffsetSessionChanged   = "OffsetSessionChanged"
	MalformedRecord        = "MalformedRecord"
	NoSuchConsumer         = "NoSuchConsumer"
	ConsumerGroupInProcess = "ConsumerGroupInProcess"
)

Error codes

View Source
// Version strings of the SDK and of the client.
const (
	// DATAHUB_SDK_VERSION is the SDK version string.
	// NOTE(review): the value is "0.1.3" although this module is published as
	// v0.1.4 — confirm whether the constant was left un-bumped at release time.
	DATAHUB_SDK_VERSION    = "0.1.3"
	// DATAHUB_CLIENT_VERSION is the client version string.
	DATAHUB_CLIENT_VERSION = "1.1"
)

Variables

This section is empty.

Functions

func DefaultHttpClient

func DefaultHttpClient() *http.Client

DefaultHttpClient returns a default HTTP client with sensible values.

func DefaultUserAgent

func DefaultUserAgent() string

DefaultUserAgent returns a default user agent

Types

type Account

type Account interface {
	fmt.Stringer
	GetAccountId() string
	GetAccountKey() string
	GetSecurityToken() string
}

type AliyunAccount

type AliyunAccount struct {
	// Aliyun Access key ID
	AccessId string

	// Aliyun Secret Access Key
	AccessKey string
}

func NewAliyunAccount

func NewAliyunAccount(accessId, accessKey string) *AliyunAccount

NewAliyunAccount creates a new AliyunAccount from the given access ID and access key.

func (AliyunAccount) GetAccountId

func (a AliyunAccount) GetAccountId() string

func (AliyunAccount) GetAccountKey

func (a AliyunAccount) GetAccountKey() string

func (AliyunAccount) GetSecurityToken

func (a AliyunAccount) GetSecurityToken() string

func (AliyunAccount) String

func (a AliyunAccount) String() string

type AppendConnectorFieldRequest

type AppendConnectorFieldRequest struct {
	Action    string `json:"Action"`
	FieldName string `json:"FieldName"`
}

type AppendFieldRequest

type AppendFieldRequest struct {
	Action    string    `json:"Action"`
	FieldName string    `json:"FieldName"`
	FieldType FieldType `json:"FieldType"`
}

type AuthMode

type AuthMode string
const (
	AK  AuthMode = "ak"
	STS AuthMode = "sts"
)

type AuthorizationFailedError

type AuthorizationFailedError struct {
	DatahubClientError
}

func NewAuthorizationFailedError

func NewAuthorizationFailedError(statusCode int, requestId string, code string, message string) *AuthorizationFailedError

type BaseRecord

type BaseRecord struct {
	ShardId      string                 `json:"ShardId,omitempty"`
	PartitionKey string                 `json:"PartitionKey,omitempty"`
	HashKey      string                 `json:"HashKey,omitempty"`
	SystemTime   int64                  `json:"SystemTime"`
	Sequence     int64                  `json:"Sequence,omitempty"`
	Cursor       string                 `json:"Cursor"`
	NextCursor   string                 `json:"NextCursor"`
	Serial       int64                  `json:"Serial"`
	Attributes   map[string]interface{} `json:"Attributes"`
}

BaseRecord

func (*BaseRecord) GetSequence

func (br *BaseRecord) GetSequence() int64

func (*BaseRecord) GetSystemTime

func (br *BaseRecord) GetSystemTime() int64

func (*BaseRecord) SetAttribute

func (br *BaseRecord) SetAttribute(key string, val interface{})

SetAttribute sets the attribute for the given key, or modifies it if it already exists.

type Bigint

type Bigint int64

Bigint

func (Bigint) String

func (bi Bigint) String() string

type BlobRecord

type BlobRecord struct {
	RawData   []byte
	StoreData string
	BaseRecord
}

BlobRecord blob type record

func NewBlobRecord

func NewBlobRecord(bytedata []byte, systemTime int64) *BlobRecord

NewBlobRecord creates a new blob type record from the given byte data and system time.

func (*BlobRecord) FillData

func (br *BlobRecord) FillData(data interface{}) error

FillData implement of IRecord interface

func (*BlobRecord) GetBaseRecord

func (br *BlobRecord) GetBaseRecord() BaseRecord

GetBaseRecord gets the base record entry.

func (*BlobRecord) GetData

func (br *BlobRecord) GetData() interface{}

GetData implement of IRecord interface

func (*BlobRecord) SetBaseRecord

func (br *BlobRecord) SetBaseRecord(baseRecord BaseRecord)

func (*BlobRecord) String

func (br *BlobRecord) String() string

type Boolean

type Boolean bool

Boolean

func (Boolean) String

func (bl Boolean) String() string

type CommitSubscriptionOffsetRequest

type CommitSubscriptionOffsetRequest struct {
	Action  string                        `json:"Action"`
	Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

type CompressorType

type CompressorType string

compress type

const (
	NOCOMPRESS CompressorType = ""
	LZ4        CompressorType = "lz4"
	DEFLATE    CompressorType = "deflate"
	ZLIB       CompressorType = "zlib"
)

func (*CompressorType) String

func (ct *CompressorType) String() string

type Config

type Config struct {
	UserAgent      string
	CompressorType CompressorType
	EnableBinary   bool
	HttpClient     *http.Client
}

func NewDefaultConfig

func NewDefaultConfig() *Config

type ConnectorOffset

type ConnectorOffset struct {
	Timestamp int64 `json:"Timestamp"`
	Sequence  int64 `json:"Sequence"`
}

type ConnectorShardState

// ConnectorShardState is the string-typed state used by
// ConnectorShardStatusEntry.State.
type ConnectorShardState string
const (
	// Created holds the value "CONTEXT_PLANNED".
	Created   ConnectorShardState = "CONTEXT_PLANNED"
	// Eexcuting holds the value "CONTEXT_EXECUTING".
	// NOTE(review): the identifier is a misspelling of "Executing"; it is part
	// of the exported API, so it cannot be renamed without breaking callers.
	Eexcuting ConnectorShardState = "CONTEXT_EXECUTING"
	// Stopped holds the value "CONTEXT_PAUSED".
	Stopped   ConnectorShardState = "CONTEXT_PAUSED"
	// Finished holds the value "CONTEXT_FINISHED".
	Finished  ConnectorShardState = "CONTEXT_FINISHED"
)

type ConnectorShardStatusEntry

type ConnectorShardStatusEntry struct {
	StartSequence    int64               `json:"StartSequence"`
	EndSequence      int64               `json:"EndSequence"`
	CurrentSequence  int64               `json:"CurrentSequence"`
	CurrentTimestamp int64               `json:"CurrentTimestamp"`
	UpdateTime       int64               `json:"UpdateTime"`
	State            ConnectorShardState `json:"State"`
	LastErrorMessage string              `json:"LastErrorMessage"`
	DiscardCount     int64               `json:"DiscardCount"`
	DoneTime         int64               `json:"DoneTime"`
	WorkerAddress    string              `json:"WorkerAddress"`
}

type ConnectorState

type ConnectorState string
const (
	ConnectorStopped ConnectorState = "CONNECTOR_PAUSED"
	ConnectorRunning ConnectorState = "CONNECTOR_RUNNING"
)

type ConnectorType

type ConnectorType string
const (
	SinkOdps    ConnectorType = "sink_odps"
	SinkOss     ConnectorType = "sink_oss"
	SinkEs      ConnectorType = "sink_es"
	SinkAds     ConnectorType = "sink_ads"
	SinkMysql   ConnectorType = "sink_mysql"
	SinkFc      ConnectorType = "sink_fc"
	SinkOts     ConnectorType = "sink_ots"
	SinkDatahub ConnectorType = "sink_datahub"
)

func (*ConnectorType) String

func (ct *ConnectorType) String() string

type CreateConnectorRequest

type CreateConnectorRequest struct {
	Action        string        `json:"Action"`
	Type          ConnectorType `json:"Type"`
	SinkStartTime int64         `json:"SinkStartTime"`
	ColumnFields  []string      `json:"ColumnFields"`
	Config        interface{}   `json:"Config"`
}

type CreateConnectorResult

type CreateConnectorResult struct {
	ConnectorId string `json:"ConnectorId"`
}

func NewCreateConnectorResult

func NewCreateConnectorResult(data []byte) (*CreateConnectorResult, error)

type CreateProjectRequest

type CreateProjectRequest struct {
	Comment string `json:"Comment"`
}

type CreateSubscriptionRequest

type CreateSubscriptionRequest struct {
	Action  string `json:"Action"`
	Comment string `json:"Comment"`
}

type CreateSubscriptionResult

type CreateSubscriptionResult struct {
	SubId string `json:"SubId"`
}

func NewCreateSubscriptionResult

func NewCreateSubscriptionResult(data []byte) (*CreateSubscriptionResult, error)

type CreateTopicRequest

type CreateTopicRequest struct {
	Action       string        `json:"Action"`
	ProjectName  string        `json:"ProjectName"`
	TopicName    string        `json:"TopicName"`
	ShardCount   int           `json:"ShardCount"`
	Lifecycle    int           `json:"Lifecycle"`
	RecordType   RecordType    `json:"RecordType"`
	RecordSchema *RecordSchema `json:"RecordSchema,omitempty"`
	Comment      string        `json:"Comment"`
}

func (*CreateTopicRequest) MarshalJSON

func (ctr *CreateTopicRequest) MarshalJSON() ([]byte, error)

type CursorType

type CursorType string

CursorType

// Cursor types accepted by GetCursor.
const (
	// OLDEST needs no extra parameter when passed to GetCursor.
	// Presumably it points to the oldest available record — confirm against the service docs.
	OLDEST CursorType = "OLDEST"

	// LATEST needs no extra parameter when passed to GetCursor.
	// Presumably it points to the latest record — confirm against the service docs.
	LATEST CursorType = "LATEST"

	// SYSTEM_TIME points to the first record after the given system time.
	// GetCursor needs the timestamp as its extra parameter.
	SYSTEM_TIME CursorType = "SYSTEM_TIME"

	// SEQUENCE points to the record with the specified sequence.
	// GetCursor needs the sequence number as its extra parameter.
	SEQUENCE CursorType = "SEQUENCE"
)

func (CursorType) String

func (ct CursorType) String() string

type DataHub

type DataHub struct {
	Client *RestClient
}

func (*DataHub) AppendConnectorField

func (datahub *DataHub) AppendConnectorField(projectName, topicName, connectorId, fieldName string) error

func (*DataHub) AppendField

func (datahub *DataHub) AppendField(projectName, topicName string, field Field) error

func (*DataHub) CommitSubscriptionOffset

func (datahub *DataHub) CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

func (*DataHub) CreateBlobTopic

func (datahub *DataHub) CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error

func (*DataHub) CreateConnector

func (datahub *DataHub) CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)

func (*DataHub) CreateConnectorWithStartTime

func (datahub *DataHub) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
	columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)

func (*DataHub) CreateProject

func (datahub *DataHub) CreateProject(projectName, comment string) error

CreateProject create new project

func (*DataHub) CreateSubscription

func (datahub *DataHub) CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error)

func (*DataHub) CreateTupleTopic

func (datahub *DataHub) CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

func (*DataHub) DeleteConnector

func (datahub *DataHub) DeleteConnector(projectName, topicName, connectorId string) error

func (*DataHub) DeleteProject

func (datahub *DataHub) DeleteProject(projectName string) error

DeleteProject delete project

func (*DataHub) DeleteSubscription

func (datahub *DataHub) DeleteSubscription(projectName, topicName, subId string) error

func (*DataHub) DeleteTopic

func (datahub *DataHub) DeleteTopic(projectName, topicName string) error

func (*DataHub) GetBlobRecords

func (datahub *DataHub) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

func (*DataHub) GetConnector

func (datahub *DataHub) GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

func (*DataHub) GetConnectorDoneTime

func (datahub *DataHub) GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

func (*DataHub) GetConnectorShardStatus

func (datahub *DataHub) GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)

func (*DataHub) GetConnectorShardStatusByShard

func (datahub *DataHub) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*ConnectorShardStatusEntry, error)

func (*DataHub) GetCursor

func (datahub *DataHub) GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

func (*DataHub) GetMeterInfo

func (datahub *DataHub) GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

func (*DataHub) GetProject

func (datahub *DataHub) GetProject(projectName string) (*GetProjectResult, error)

GetProject gets the details of the project with the given name.

func (*DataHub) GetSubscription

func (datahub *DataHub) GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

func (*DataHub) GetSubscriptionOffset

func (datahub *DataHub) GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

func (*DataHub) GetTopic

func (datahub *DataHub) GetTopic(projectName, topicName string) (*GetTopicResult, error)

func (*DataHub) GetTupleRecords

func (datahub *DataHub) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

func (*DataHub) Heartbeat

func (datahub *DataHub) Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error)

func (*DataHub) JoinGroup

func (datahub *DataHub) JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error)

func (*DataHub) LeaveGroup

func (datahub *DataHub) LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) error

func (*DataHub) ListConnector

func (datahub *DataHub) ListConnector(projectName, topicName string) (*ListConnectorResult, error)

func (*DataHub) ListProject

func (datahub *DataHub) ListProject() (*ListProjectResult, error)

ListProject lists all projects.

func (*DataHub) ListShard

func (datahub *DataHub) ListShard(projectName, topicName string) (*ListShardResult, error)

func (*DataHub) ListSubscription

func (datahub *DataHub) ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

func (*DataHub) ListTopic

func (datahub *DataHub) ListTopic(projectName string) (*ListTopicResult, error)

func (*DataHub) MergeShard

func (datahub *DataHub) MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

func (*DataHub) OpenSubscriptionSession

func (datahub *DataHub) OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

func (*DataHub) PutRecords

func (datahub *DataHub) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

func (*DataHub) PutRecordsByShard

func (datahub *DataHub) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

func (*DataHub) ReloadConnector

func (datahub *DataHub) ReloadConnector(projectName, topicName, connectorId string) error

func (*DataHub) ReloadConnectorByShard

func (datahub *DataHub) ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error

func (*DataHub) ResetSubscriptionOffset

func (datahub *DataHub) ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

func (*DataHub) SplitShard

func (datahub *DataHub) SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error)

func (*DataHub) SplitShardBySplitKey

func (datahub *DataHub) SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)

func (*DataHub) SyncGroup

func (datahub *DataHub) SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) error

func (*DataHub) UpdateConnector

func (datahub *DataHub) UpdateConnector(projectName, topicName, connectorId string, config interface{}) error

func (*DataHub) UpdateConnectorOffset

func (datahub *DataHub) UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error

func (*DataHub) UpdateConnectorState

func (datahub *DataHub) UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error

func (*DataHub) UpdateProject

func (datahub *DataHub) UpdateProject(projectName, comment string) error

UpdateProject update project

func (*DataHub) UpdateSubscription

func (datahub *DataHub) UpdateSubscription(projectName, topicName, subId, comment string) error

func (*DataHub) UpdateSubscriptionState

func (datahub *DataHub) UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error

func (*DataHub) UpdateTopic

func (datahub *DataHub) UpdateTopic(projectName, topicName, comment string) error

func (*DataHub) WaitAllShardsReady

func (datahub *DataHub) WaitAllShardsReady(projectName, topicName string) bool

func (*DataHub) WaitAllShardsReadyWithTime

func (datahub *DataHub) WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool

type DataHubApi

// DataHubApi lists the operations of the DataHub client: project, topic,
// shard, record, connector, subscription and consumer-group calls.
type DataHubApi interface {
	// Get the information of the specified project.
	GetProject(projectName string) (*GetProjectResult, error)

	// List all projects the user owns.
	ListProject() (*ListProjectResult, error)

	// Create a DataHub project.
	CreateProject(projectName, comment string) error

	// Update project information. Only the comment can be updated.
	UpdateProject(projectName, comment string) error

	// Delete the specified project. If any topics exist in the project, the delete operation will fail.
	DeleteProject(projectName string) error

	// Wait until the status of all shards of this topic is ACTIVE. Default timeout is 60s.
	WaitAllShardsReady(projectName, topicName string) bool

	// Wait until the status of all shards of this topic is ACTIVE.
	// The timeout unit is seconds.
	// If timeout < 0, it will block until all shards are ready.
	WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool

	// Create a DataHub topic with type: BLOB
	CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) error

	// Create a DataHub topic with type: TUPLE
	CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) error

	// Update topic meta information. Currently only modifying the comment is supported.
	UpdateTopic(projectName, topicName, comment string) error

	// Get the information of the specified topic.
	GetTopic(projectName, topicName string) (*GetTopicResult, error)

	// Delete a specified topic.
	DeleteTopic(projectName, topicName string) error

	// List all topics in the project.
	ListTopic(projectName string) (*ListTopicResult, error)

	// List shard information {ShardEntry} of a topic.
	ListShard(projectName, topicName string) (*ListShardResult, error)

	// Split a shard. The SDK automatically computes the split key that is used to split the shard.
	SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error)

	// Split a shard by the specified splitKey.
	SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)

	// Merge the specified shard and its adjacent shard. Only adjacent shards can be merged.
	MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

	// Get the data cursor of a shard. This function supports OLDEST, LATEST, SYSTEM_TIME and SEQUENCE.
	// If OLDEST or LATEST is chosen, the last parameter is not needed.
	// If SYSTEM_TIME or SEQUENCE is chosen, it needs one parameter: the timestamp or the sequence number.
	GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

	// Write data records into a DataHub topic.
	// The PutRecordsResult includes unsuccessfully processed records.
	// DataHub attempts to process all records in the request.
	// A single record failure does not stop the processing of subsequent records.
	PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

	// Write data records into the specified shard of a DataHub topic.
	PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

	// Get the TUPLE records of a shard.
	GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

	// Get the BLOB records of a shard.
	GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

	// Append a field to a TUPLE topic.
	// Field AllowNull should be true.
	AppendField(projectName, topicName string, field Field) error

	// Get metering info of the specified shard.
	GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

	// Create a data connector.
	CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)

	// Create a data connector with a start time (unit: ms).
	CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
		columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)

	// Get information of the specified data connector.
	GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

	// Update connector config of the specified data connector.
	// Config should be SinkOdpsConfig, SinkOssConfig ...
	UpdateConnector(projectName, topicName, connectorId string, config interface{}) error

	// List the names of connectors.
	ListConnector(projectName, topicName string) (*ListConnectorResult, error)

	// Delete a data connector.
	DeleteConnector(projectName, topicName, connectorId string) error

	// Get the done time of a data connector. This method is mainly used to get the MaxCompute synchronization point.
	GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

	// Reload a data connector.
	ReloadConnector(projectName, topicName, connectorId string) error

	// Reload the specified shard of the data connector.
	ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) error

	// Update the state of the data connector.
	UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) error

	// Update connector sink offset. The operation must be performed after the connector has stopped.
	UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) error

	// Get the detail information of the shard tasks which belong to the specified data connector.
	GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)

	// Get the detail information of the task of the specified shard which belongs to the specified data connector.
	GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*ConnectorShardStatusEntry, error)

	// Append data connector field.
	// Before running this method, you should ensure that this field is in both the topic and the connector.
	AppendConnectorField(projectName, topicName, connectorId, fieldName string) error

	// Create a subscription, and then you should commit offsets with this subscription.
	CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error)

	// Get the detail information of a subscription.
	GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

	// Delete a subscription.
	DeleteSubscription(projectName, topicName, subId string) error

	// List subscriptions in the topic.
	ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

	// Update a subscription. Currently only updating the comment is supported.
	UpdateSubscription(projectName, topicName, subId, comment string) error

	// Update a subscription's state. You can change the state of a subscription to SUB_ONLINE or SUB_OFFLINE.
	// When offline, you cannot commit offsets of the subscription.
	UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) error

	// Init and get a subscription session, and return the offset if any offset was stored before.
	// Subscription should be initialized before use. This operation makes sure that only one client uses this subscription.
	// If this function is called elsewhere, the session will become invalid and cannot commit offsets of the subscription.
	OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

	// Get offsets of a subscription. This method does not return sessionId in SubscriptionOffset.
	// Only a SubscriptionOffset containing sessionId can commit offsets.
	GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

	// Update offsets of shards to server. This operation allows you to store offsets on the server side.
	CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

	// Reset offsets of shards to server. This operation allows you to reset offsets on the server side.
	ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) error

	// Heartbeat request to let server know consumer status.
	Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error)

	// Join a consumer group.
	JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error)

	// Sync consumer group info.
	SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) error

	// Leave a consumer group.
	LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) error
}

DataHubApi provides RESTful APIs for accessing the DataHub service.

func New

func New(accessId, accessKey, endpoint string) DataHubApi

func NewClientWithConfig

func NewClientWithConfig(endpoint string, config *Config, account Account) DataHubApi

type DataHubPB

type DataHubPB struct {
	DataHub
}

func (*DataHubPB) GetBlobRecords

func (datahub *DataHubPB) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

func (*DataHubPB) GetTupleRecords

func (datahub *DataHubPB) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

func (*DataHubPB) PutRecords

func (datahub *DataHubPB) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

func (*DataHubPB) PutRecordsByShard

func (datahub *DataHubPB) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) error

type DataType

type DataType interface {
	fmt.Stringer
}

type DatahubClientError

type DatahubClientError struct {
	StatusCode int    `json:"StatusCode"`   // Http status code
	RequestId  string `json:"RequestId"`    // Request-id to trace the request
	Code       string `json:"ErrorCode"`    // Datahub error code
	Message    string `json:"ErrorMessage"` // Error msg of the error code
}

DatahubClientError is the error struct returned by the DataHub client.

func NewDatahubClientError

func NewDatahubClientError(statusCode int, requestId string, code string, message string) *DatahubClientError

create a new DatahubClientError

func (*DatahubClientError) Error

func (err *DatahubClientError) Error() string

type Decimal

type Decimal decimal.Decimal

DECIMAL

func (Decimal) String

func (d Decimal) String() string

type DialContextFn

type DialContextFn func(ctx context.Context, network, address string) (net.Conn, error)

DialContextFn was defined to make code more readable.

func TraceDialContext

func TraceDialContext(ctimeout time.Duration) DialContextFn

TraceDialContext implements our own dialer in order to trace conn info.

type Double

type Double float64

Double

func (Double) String

func (d Double) String() string

type EmptyRequest

type EmptyRequest struct {
}

empty request

type FailedRecord

type FailedRecord struct {
	Index        int    `json:"Index"`
	ErrorCode    string `json:"ErrorCode"`
	ErrorMessage string `json:"ErrorMessage"`
}

type Field

// Field describes one field: its name, its type, and whether it allows null.
type Field struct {
	// Name is serialized under the JSON key "name".
	Name      string    `json:"name"`
	// Type is serialized under the JSON key "type".
	Type      FieldType `json:"type"`
	// AllowNull is serialized under the JSON key "notnull".
	// NOTE(review): the field name and the JSON key have opposite meanings;
	// presumably the value is inverted elsewhere during (un)marshalling —
	// confirm before encoding a Field directly with encoding/json.
	AllowNull bool      `json:"notnull"`
}

type FieldType

type FieldType string

FieldType

// Supported field types.
const (
	// BIGINT is an 8-byte (64-bit) signed integer, not including -9223372036854775808.
	// Range: -9223372036854775807 ~ 9223372036854775807
	BIGINT FieldType = "BIGINT"

	// STRING only supports UTF-8.
	// The maximum size is 1 MB.
	STRING FieldType = "STRING"

	// BOOLEAN
	// Accepted forms: True/False, true/false, 0/1
	BOOLEAN FieldType = "BOOLEAN"

	// DOUBLE is an 8-byte (64-bit) double.
	// Range: -1.0 * 10^308 ~ 1.0 * 10^308
	DOUBLE FieldType = "DOUBLE"

	// TIMESTAMP
	// Unit: microseconds (us)
	TIMESTAMP FieldType = "TIMESTAMP"

	// DECIMAL
	// Can "only" represent numbers with a maximum of 2^31 digits after the decimal point.
	DECIMAL FieldType = "DECIMAL"
)

func (FieldType) String

func (ft FieldType) String() string

type GetConnectorDoneTimeResult

type GetConnectorDoneTimeResult struct {
	DoneTime int64  `json:"DoneTime"`
	TimeZone string `json:"TimeZone"`
}

func NewGetConnectorDoneTimeResult

func NewGetConnectorDoneTimeResult(data []byte) (*GetConnectorDoneTimeResult, error)

type GetConnectorResult

type GetConnectorResult struct {
	CreateTime     int64             `json:"CreateTime"`
	LastModifyTime int64             `json:"LastModifyTime"`
	ConnectorId    string            `json:"ConnectorId"`
	ClusterAddress string            `json:"ClusterAddress"`
	Type           ConnectorType     `json:"Type"`
	State          ConnectorState    `json:"State"`
	ColumnFields   []string          `json:"ColumnFields"`
	ExtraConfig    map[string]string `json:"ExtraInfo"`
	Creator        string            `json:"Creator"`
	Owner          string            `json:"Owner"`
	Config         interface{}       `json:"Config"`
}

func NewGetConnectorResult

func NewGetConnectorResult(data []byte) (*GetConnectorResult, error)

type GetConnectorShardStatusRequest

type GetConnectorShardStatusRequest struct {
	Action  string `json:"Action"`
	ShardId string `json:"ShardId,omitempty"`
}

type GetConnectorShardStatusResult

type GetConnectorShardStatusResult struct {
	ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
}

func NewGetConnectorShardStatusResult

func NewGetConnectorShardStatusResult(data []byte) (*GetConnectorShardStatusResult, error)

type GetCursorRequest

type GetCursorRequest struct {
	Action     string     `json:"Action"`
	CursorType CursorType `json:"Type"`
	SystemTime int64      `json:"SystemTime"`
	Sequence   int64      `json:"Sequence"`
}

type GetCursorResult

type GetCursorResult struct {
	Cursor     string `json:"Cursor"`
	RecordTime int64  `json:"RecordTime"`
	Sequence   int64  `json:"Sequence"`
}

func NewGetCursorResult

func NewGetCursorResult(data []byte) (*GetCursorResult, error)

type GetMeterInfoRequest

type GetMeterInfoRequest struct {
	Action string `json:"Action"`
}

type GetMeterInfoResult

type GetMeterInfoResult struct {
	ActiveTime int64 `json:"ActiveTime"`
	Storage    int64 `json:"Storage"`
}

func NewGetMeterInfoResult

func NewGetMeterInfoResult(data []byte) (*GetMeterInfoResult, error)

type GetPBRecordRequest

type GetPBRecordRequest struct {
	Cursor string `json:"Cursor"`
	Limit  int    `json:"Limit"`
}

type GetProjectResult

// GetProjectResult is the result of GetProject.
//
// ProjectName carries no JSON tag, so encoding/json falls back to the field
// name for it. The remaining fields are mapped through explicit `json` tags.
// The Comment tag must be spelled `json:"Comment"`: without the colon the tag
// is not a valid key:"value" pair, reflect.StructTag.Lookup cannot find it and
// `go vet` reports it as malformed.
type GetProjectResult struct {
	ProjectName    string
	CreateTime     int64  `json:"CreateTime"`
	LastModifyTime int64  `json:"LastModifyTime"`
	Comment        string `json:"Comment"`
}

the result of GetProject

func NewGetProjectResult

func NewGetProjectResult(data []byte) (*GetProjectResult, error)

convert the response body to GetProjectResult

type GetRecordRequest

type GetRecordRequest struct {
	Action string `json:"Action"`
	Cursor string `json:"Cursor"`
	Limit  int    `json:"Limit"`
}

type GetRecordsResult

type GetRecordsResult struct {
	NextCursor    string        `json:"NextCursor"`
	RecordCount   int           `json:"RecordCount"`
	StartSequence int64         `json:"StartSeq"`
	Records       []IRecord     `json:"Records"`
	RecordSchema  *RecordSchema `json:"-"`
}

func NewGetPBRecordsResult

func NewGetPBRecordsResult(data []byte, schema *RecordSchema) (*GetRecordsResult, error)

func NewGetRecordsResult

func NewGetRecordsResult(data []byte, schema *RecordSchema) (*GetRecordsResult, error)

func (*GetRecordsResult) UnmarshalJSON

func (grr *GetRecordsResult) UnmarshalJSON(data []byte) error

type GetSubscriptionOffsetRequest

type GetSubscriptionOffsetRequest struct {
	Action   string   `json:"Action"`
	ShardIds []string `json:"ShardIds"`
}

type GetSubscriptionOffsetResult

type GetSubscriptionOffsetResult struct {
	Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

func NewGetSubscriptionOffsetResult

func NewGetSubscriptionOffsetResult(data []byte) (*GetSubscriptionOffsetResult, error)

type GetSubscriptionResult

type GetSubscriptionResult struct {
	SubscriptionEntry
}

func NewGetSubscriptionResult

func NewGetSubscriptionResult(data []byte) (*GetSubscriptionResult, error)

type GetTopicResult

type GetTopicResult struct {
	ProjectName    string
	TopicName      string
	ShardCount     int           `json:"ShardCount"`
	LifeCycle      int           `json:"LifeCycle"`
	RecordType     RecordType    `json:"RecordType"`
	RecordSchema   *RecordSchema `json:"RecordSchema"`
	Comment        string        `json:"Comment"`
	CreateTime     int64         `json:"CreateTime"`
	LastModifyTime int64         `json:"LastModifyTime"`
}

func NewGetTopicResult

func NewGetTopicResult(data []byte) (*GetTopicResult, error)

func (*GetTopicResult) UnmarshalJSON

func (gtr *GetTopicResult) UnmarshalJSON(data []byte) error

UnmarshalJSON is used to deserialize the RecordSchema.

type HeartbeatRequest

type HeartbeatRequest struct {
	Action           string   `json:"Action"`
	ConsumerId       string   `json:"ConsumerId"`
	VersionId        int64    `json:"VersionId"`
	HoldShardList    []string `json:"HoldShardList"`
	ReadEndShardList []string `json:"ReadEndShardList"`
}

type HeartbeatResult

type HeartbeatResult struct {
	PlanVersion int64    `json:"PlanVersion"`
	ShardList   []string `json:"ShardList"`
	TotalPlan   string   `json:"TotalPlan"`
}

func NewHeartbeatResult

func NewHeartbeatResult(data []byte) (*HeartbeatResult, error)

type IRecord

type IRecord interface {
	fmt.Stringer
	GetSystemTime() int64
	GetSequence() int64
	GetData() interface{}
	FillData(data interface{}) error
	GetBaseRecord() BaseRecord
	SetBaseRecord(baseRecord BaseRecord)
	SetAttribute(key string, val interface{})
}

IRecord record interface

type InsertMode

// InsertMode is a string-typed setting; within this package it is the type of
// the SinkMysqlConfig.Ignore field.
type InsertMode string
// Insert modes. Note the string values: IGNORE is "true" and OVERWRITE is
// "false", i.e. the constant names describe the mode while the values read as
// booleans encoded as strings.
const (
	IGNORE    InsertMode = "true"
	OVERWRITE InsertMode = "false"
)

type InvalidOperationError

type InvalidOperationError struct {
	DatahubClientError
}

func NewInvalidOperationError

func NewInvalidOperationError(statusCode int, requestId string, code string, message string) *InvalidOperationError

type InvalidParameterError

type InvalidParameterError struct {
	DatahubClientError
}

InvalidParameterError represents a parameter error.

func NewInvalidParameterError

func NewInvalidParameterError(statusCode int, requestId string, code string, message string) *InvalidParameterError

func NewInvalidParameterErrorWithMessage

func NewInvalidParameterErrorWithMessage(message string) *InvalidParameterError

type JoinGroupRequest

type JoinGroupRequest struct {
	Action         String `json:"Action"`
	SessionTimeout int64  `json:"SessionTimeout"`
}

type JoinGroupResult

type JoinGroupResult struct {
	ConsumerId     string `json:"ConsumerId"`
	VersionId      int64  `json:"VersionId"`
	SessionTimeout int64  `json:"SessionTimeout"`
}

func NewJoinGroupResult

func NewJoinGroupResult(data []byte) (*JoinGroupResult, error)

type LeaveGroupRequest

type LeaveGroupRequest struct {
	Action     string `json:"Action"`
	ConsumerId string `json:"ConsumerId"`
	VersionId  int64  `json:"VersionId"`
}

type LimitExceededError

type LimitExceededError struct {
	DatahubClientError
}

func NewLimitExceededError

func NewLimitExceededError(statusCode int, requestId string, code string, message string) *LimitExceededError

type ListConnectorResult

type ListConnectorResult struct {
	ConnectorIds []string `json:"Connectors"`
}

func NewListConnectorResult

func NewListConnectorResult(data []byte) (*ListConnectorResult, error)

type ListProjectResult

type ListProjectResult struct {
	ProjectNames []string `json:"ProjectNames"`
}

the result of ListProject

func NewListProjectResult

func NewListProjectResult(data []byte) (*ListProjectResult, error)

convert the response body to ListProjectResult

type ListShardResult

type ListShardResult struct {
	Shards []ShardEntry `json:"Shards"`
}

func NewListShardResult

func NewListShardResult(data []byte) (*ListShardResult, error)

type ListSubscriptionRequest

type ListSubscriptionRequest struct {
	Action    string `json:"Action"`
	PageIndex int    `json:"PageIndex"`
	PageSize  int    `json:"PageSize"`
}

type ListSubscriptionResult

type ListSubscriptionResult struct {
	TotalCount    int64               `json:"TotalCount"`
	Subscriptions []SubscriptionEntry `json:"Subscriptions"`
}

func NewListSubscriptionResult

func NewListSubscriptionResult(data []byte) (*ListSubscriptionResult, error)

type ListTopicResult

type ListTopicResult struct {
	TopicNames []string `json:"TopicNames"`
}

func NewListTopicResult

func NewListTopicResult(data []byte) (*ListTopicResult, error)

type MalformedRecordError

type MalformedRecordError struct {
	DatahubClientError
}

func NewMalformedRecordError

func NewMalformedRecordError(statusCode int, requestId string, code string, message string) *MalformedRecordError

type MergeShardRequest

type MergeShardRequest struct {
	Action          string `json:"Action"`
	ShardId         string `json:"ShardId"`
	AdjacentShardId string `json:"AdjacentShardId"`
}

type MergeShardResult

type MergeShardResult struct {
	ShardId      string `json:"ShardId"`
	BeginHashKey string `json:"BeginHashKey"`
	EndHashKey   string `json:"EndHashKey"`
}

func NewMergeShardResult

func NewMergeShardResult(data []byte) (*MergeShardResult, error)

type NoPermissionError

type NoPermissionError struct {
	DatahubClientError
}

func NewNoPermissionError

func NewNoPermissionError(statusCode int, requestId string, code string, message string) *NoPermissionError

type OffsetAction

type OffsetAction string

func (OffsetAction) String

func (oa OffsetAction) String() string

type OpenSubscriptionSessionRequest

type OpenSubscriptionSessionRequest struct {
	Action   string   `json:"Action"`
	ShardIds []string `json:"ShardIds"`
}

type OpenSubscriptionSessionResult

type OpenSubscriptionSessionResult struct {
	Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

func NewOpenSubscriptionSessionResult

func NewOpenSubscriptionSessionResult(data []byte) (*OpenSubscriptionSessionResult, error)

type PartitionConfig

type PartitionConfig struct {
	ConfigMap []map[string]string
}

func NewPartitionConfig

func NewPartitionConfig() *PartitionConfig

func (*PartitionConfig) AddConfig

func (pc *PartitionConfig) AddConfig(key, value string)

func (*PartitionConfig) MarshalJSON

func (pc *PartitionConfig) MarshalJSON() ([]byte, error)

func (*PartitionConfig) UnmarshalJSON

func (pc *PartitionConfig) UnmarshalJSON(data []byte) error

type PartitionMode

type PartitionMode string
const (
	UserDefineMode PartitionMode = "USER_DEFINE"
	SystemTimeMode PartitionMode = "SYSTEM_TIME"
	EventTimeMode  PartitionMode = "EVENT_TIME"
)

func (*PartitionMode) String

func (pm *PartitionMode) String() string

type PutPBRecordsRequest

type PutPBRecordsRequest struct {
	Records []IRecord `json:"Records"`
}

type PutRecordsRequest

type PutRecordsRequest struct {
	Action  string    `json:"Action"`
	Records []IRecord `json:"Records"`
}

func (*PutRecordsRequest) MarshalJSON

func (ptr *PutRecordsRequest) MarshalJSON() ([]byte, error)

type PutRecordsResult

type PutRecordsResult struct {
	FailedRecordCount int            `json:"FailedRecordCount"`
	FailedRecords     []FailedRecord `json:"FailedRecords"`
}

func NewPutPBRecordsResult

func NewPutPBRecordsResult(data []byte) (*PutRecordsResult, error)

func NewPutRecordsResult

func NewPutRecordsResult(data []byte) (*PutRecordsResult, error)

type RecordEntry

type RecordEntry struct {
	Data interface{} `json:"Data"`
	BaseRecord
}

RecordEntry

type RecordSchema

type RecordSchema struct {
	Fields []Field `json:"fields"`
}

RecordSchema

func NewRecordSchema

func NewRecordSchema() *RecordSchema

NewRecordSchema creates a new record schema for tuple records.

func NewRecordSchemaFromJson

func NewRecordSchemaFromJson(SchemaJson string) (recordSchema *RecordSchema, err error)

func (*RecordSchema) AddField

func (rs *RecordSchema) AddField(f Field) *RecordSchema

AddField adds a field.

func (*RecordSchema) GetFieldIndex

func (rs *RecordSchema) GetFieldIndex(fname string) int

GetFieldIndex gets the index of the given field.

func (*RecordSchema) Size

func (rs *RecordSchema) Size() int

Size gets the size of the record schema's fields.

func (*RecordSchema) String

func (rs *RecordSchema) String() string

type RecordType

type RecordType string

RecordType

const (
	// BLOB record
	BLOB RecordType = "BLOB"

	// TUPLE record
	TUPLE RecordType = "TUPLE"
)

func (RecordType) String

func (rt RecordType) String() string

type ReloadConnectorRequest

type ReloadConnectorRequest struct {
	Action  string `json:"Action"`
	ShardId string `json:"ShardId,omitempty"`
}

type RequestModel

type RequestModel interface {
	// contains filtered or unexported methods
}

handle the HTTP request

type ResetSubscriptionOffsetRequest

type ResetSubscriptionOffsetRequest struct {
	Action  string                        `json:"Action"`
	Offsets map[string]SubscriptionOffset `json:"Offsets"`
}

type ResourceExistError

type ResourceExistError struct {
	DatahubClientError
}

func NewResourceExistError

func NewResourceExistError(statusCode int, requestId string, code string, message string) *ResourceExistError

type ResourceNotFoundError

type ResourceNotFoundError struct {
	DatahubClientError
}

func NewResourceNotFoundError

func NewResourceNotFoundError(statusCode int, requestId string, code string, message string) *ResourceNotFoundError

type RestClient

// RestClient is the REST client. It bundles the service endpoint, the user
// agent string, the underlying HTTP client, the account, and the compressor
// type.
type RestClient struct {
	// Endpoint is the endpoint of the DataHub service
	Endpoint string
	// Useragent is the user agent
	Useragent string
	// HttpClient is the HTTP client
	HttpClient *http.Client
	// Account
	Account        Account
	CompressorType CompressorType
}

RestClient is the REST client.

func NewRestClient

func NewRestClient(endpoint string, useragent string, httpclient *http.Client, account Account, ctype CompressorType) *RestClient

NewRestClient create a new rest client

func (*RestClient) Delete

func (client *RestClient) Delete(resource string) (interface{}, error)

Delete send HTTP Delete method request

func (*RestClient) Get

func (client *RestClient) Get(resource string) ([]byte, error)

Get send HTTP Get method request

func (*RestClient) Post

func (client *RestClient) Post(resource string, model RequestModel) ([]byte, error)

Post send HTTP Post method request

func (*RestClient) Put

func (client *RestClient) Put(resource string, model RequestModel) (interface{}, error)

Put send HTTP Put method request

type SeekOutOfRangeError

type SeekOutOfRangeError struct {
	DatahubClientError
}

func NewSeekOutOfRangeError

func NewSeekOutOfRangeError(statusCode int, requestId string, code string, message string) *SeekOutOfRangeError

type ServiceInProcessError

type ServiceInProcessError struct {
	DatahubClientError
}

func NewServiceInProcessError

func NewServiceInProcessError(statusCode int, requestId string, code string, message string) *ServiceInProcessError

type ServiceTemporaryUnavailableError added in v0.1.4

type ServiceTemporaryUnavailableError struct {
	DatahubClientError
}

func NewServiceTemporaryUnavailableError added in v0.1.4

func NewServiceTemporaryUnavailableError(message string) *ServiceTemporaryUnavailableError

func NewServiceTemporaryUnavailableErrorWithCode added in v0.1.4

func NewServiceTemporaryUnavailableErrorWithCode(statusCode int, requestId string, code string, message string) *ServiceTemporaryUnavailableError

type ShardEntry

type ShardEntry struct {
	ShardId        string     `json:"ShardId"`
	State          ShardState `json:"State"`
	BeginHashKey   string     `json:"BeginHashKey"`
	EndHashKey     string     `json:"EndHashKey"`
	ClosedTime     int64      `json:"ClosedTime"`
	ParentShardIds []string   `json:"ParentShardIds"`
	LeftShardId    string     `json:"LeftShardId"`
	RightShardId   string     `json:"RightShardId"`
	Address        string     `json:"Address"`
}

type ShardSealedError

type ShardSealedError struct {
	DatahubClientError
}

func NewShardSealedError

func NewShardSealedError(statusCode int, requestId string, code string, message string) *ShardSealedError

type ShardState

type ShardState string

ShardState

const (
	// OPENING shard is creating or fail over, not available
	OPENING ShardState = "OPENING"

	// ACTIVE is available
	ACTIVE ShardState = "ACTIVE"

	// CLOSED read-only
	CLOSED ShardState = "CLOSED"

	// CLOSING shard is closing, not available
	CLOSING ShardState = "CLOSING"
)

func (ShardState) String

func (state ShardState) String() string

type SinkAdsConfig

type SinkAdsConfig struct {
	SinkMysqlConfig
}

Ads Config

type SinkDatahubConfig

type SinkDatahubConfig struct {
	Endpoint  string   `json:"Endpoint"`
	Project   string   `json:"Project"`
	Topic     string   `json:"Topic"`
	AuthMode  AuthMode `json:"AuthMode"`
	AccessId  string   `json:"AccessId"`
	AccessKey string   `json:"AccessKey"`
}

datahub Config

type SinkEsConfig

type SinkEsConfig struct {
	Index      string   `json:"Index"`
	Endpoint   string   `json:"Endpoint"`
	User       string   `json:"User"`
	Password   string   `json:"Password"`
	IDFields   []string `json:"IDFields"`
	TypeFields []string `json:"TypeFields"`
	ProxyMode  bool     `json:"ProxyMode"`
}

ES Config

type SinkFcConfig

type SinkFcConfig struct {
	Endpoint  string   `json:"Endpoint"`
	Service   string   `json:"Service"`
	Function  string   `json:"Function"`
	AuthMode  AuthMode `json:"AuthMode"`
	AccessId  string   `json:"AccessId"`
	AccessKey string   `json:"AccessKey"`
}

FC Config

type SinkMysqlConfig

type SinkMysqlConfig struct {
	Host     string     `json:"Host"`
	Port     string     `json:"Port"`
	Database string     `json:"Database"`
	Table    string     `json:"Table"`
	User     string     `json:"User"`
	Password string     `json:"Password"`
	Ignore   InsertMode `json:"Ignore"`
}

mysql Config

type SinkOdpsConfig

type SinkOdpsConfig struct {
	Endpoint        string          `json:"OdpsEndpoint"`
	Project         string          `json:"Project"`
	Table           string          `json:"Table"`
	AccessId        string          `json:"AccessId"`
	AccessKey       string          `json:"AccessKey"`
	TimeRange       int             `json:"TimeRange"`
	TimeZone        string          `json:"TimeZone"`
	PartitionMode   PartitionMode   `json:"PartitionMode"`
	PartitionConfig PartitionConfig `json:"PartitionConfig"`
	TunnelEndpoint  string          `json:"TunnelEndpoint,omitempty"`
}

ODPS Config

type SinkOssConfig

type SinkOssConfig struct {
	Endpoint   string   `json:"Endpoint"`
	Bucket     string   `json:"Bucket"`
	Prefix     string   `json:"Prefix"`
	TimeFormat string   `json:"TimeFormat"`
	TimeRange  int      `json:"TimeRange"`
	AuthMode   AuthMode `json:"AuthMode"`
	AccessId   string   `json:"AccessId"`
	AccessKey  string   `json:"AccessKey"`
}

Oss Config

type SinkOtsConfig

type SinkOtsConfig struct {
	Endpoint     string   `json:"Endpoint"`
	InstanceName string   `json:"InstanceName"`
	TableName    string   `json:"TableName"`
	AuthMode     AuthMode `json:"AuthMode"`
	AccessId     string   `json:"AccessId"`
	AccessKey    string   `json:"AccessKey"`
}

Ots Config

type SplitShardRequest

type SplitShardRequest struct {
	Action   string `json:"Action"`
	ShardId  string `json:"ShardId"`
	SplitKey string `json:"SplitKey"`
}

type SplitShardResult

type SplitShardResult struct {
	NewShards []ShardEntry `json:"NewShards"`
}

func NewSplitShardResult

func NewSplitShardResult(data []byte) (*SplitShardResult, error)

type String

type String string

String

func (String) String

func (str String) String() string

type StsCredential

type StsCredential struct {
	// Access key ID
	AccessId string

	// Secret Access Key
	AccessKey string

	// Security Token
	SecurityToken string
}

func NewStsCredential

func NewStsCredential(accessId, accessKey, securityToken string) *StsCredential

create new instance

func (StsCredential) GetAccountId

func (a StsCredential) GetAccountId() string

func (StsCredential) GetAccountKey

func (a StsCredential) GetAccountKey() string

func (StsCredential) GetSecurityToken

func (a StsCredential) GetSecurityToken() string

func (StsCredential) String

func (a StsCredential) String() string

type SubscriptionEntry

type SubscriptionEntry struct {
	SubId          string            `json:"SubId"`
	TopicName      string            `json:"TopicName"`
	IsOwner        bool              `json:"IsOwner"`
	Type           SubscriptionType  `json:"Type"`
	State          SubscriptionState `json:"State,omitempty"`
	Comment        string            `json:"Comment,omitempty"`
	CreateTime     int64             `json:"CreateTime"`
	LastModifyTime int64             `json:"LastModifyTime"`
}

type SubscriptionOfflineError

type SubscriptionOfflineError struct {
	DatahubClientError
}

func NewSubscriptionOfflineError

func NewSubscriptionOfflineError(statusCode int, requestId string, code string, message string) *SubscriptionOfflineError

type SubscriptionOffset

type SubscriptionOffset struct {
	Timestamp int64  `json:"Timestamp"`
	Sequence  int64  `json:"Sequence"`
	VersionId int64  `json:"Version"`
	SessionId *int64 `json:"SessionId"`
}

type SubscriptionOffsetResetError

type SubscriptionOffsetResetError struct {
	DatahubClientError
}

func NewSubscriptionOffsetResetError

func NewSubscriptionOffsetResetError(statusCode int, requestId string, code string, message string) *SubscriptionOffsetResetError

type SubscriptionSessionInvalidError

type SubscriptionSessionInvalidError struct {
	DatahubClientError
}

func NewSubscriptionSessionInvalidError

func NewSubscriptionSessionInvalidError(statusCode int, requestId string, code string, message string) *SubscriptionSessionInvalidError

type SubscriptionState

type SubscriptionState int

SubscriptionState

// Values of SubscriptionState, numbered by iota starting at 0.
const (
	// SUB_OFFLINE is the first value (0).
	SUB_OFFLINE SubscriptionState = iota

	// SUB_ONLINE is the second value (1).
	SUB_ONLINE
)

func (SubscriptionState) Value

func (subState SubscriptionState) Value() int

type SubscriptionType

type SubscriptionType int

SubscriptionType

// Values of SubscriptionType, numbered by iota starting at 0.
const (
	// SUBTYPE_USER is the first value (0).
	SUBTYPE_USER SubscriptionType = iota

	// SUBTYPE_SYSTEM is the second value (1).
	SUBTYPE_SYSTEM

	// SUBTYPE_TT is the third value (2).
	SUBTYPE_TT
)

func (SubscriptionType) Value

func (subType SubscriptionType) Value() int

type SyncGroupRequest

type SyncGroupRequest struct {
	Action           string   `json:"Action"`
	ConsumerId       string   `json:"ConsumerId"`
	VersionId        int64    `json:"VersionId"`
	ReleaseShardList []string `json:"ReleaseShardList"`
	ReadEndShardList []string `json:"ReadEndShardList"`
}

type Timestamp

type Timestamp uint64

Timestamp

func (Timestamp) String

func (t Timestamp) String() string

type TupleRecord

type TupleRecord struct {
	RecordSchema *RecordSchema
	Values       []DataType
	BaseRecord
}

TupleRecord tuple type record

func NewTupleRecord

func NewTupleRecord(schema *RecordSchema, systemTime int64) *TupleRecord

NewTupleRecord creates a new tuple-type record from the given record schema.

func (*TupleRecord) FillData

func (tr *TupleRecord) FillData(data interface{}) error

FillData implements the IRecord interface.

func (*TupleRecord) GetBaseRecord

func (tr *TupleRecord) GetBaseRecord() BaseRecord

GetBaseRecord get base record entry

func (*TupleRecord) GetData

func (tr *TupleRecord) GetData() interface{}

GetData implements the IRecord interface.

func (*TupleRecord) GetValueByIdx

func (tr *TupleRecord) GetValueByIdx(idx int) DataType

func (*TupleRecord) GetValueByName

func (tr *TupleRecord) GetValueByName(name string) DataType

func (*TupleRecord) GetValues

func (tr *TupleRecord) GetValues() map[string]DataType

func (*TupleRecord) SetBaseRecord

func (tr *TupleRecord) SetBaseRecord(baseRecord BaseRecord)

func (*TupleRecord) SetValueByIdx

func (tr *TupleRecord) SetValueByIdx(idx int, val interface{}) *TupleRecord

SetValueByIdx set a value by idx

func (*TupleRecord) SetValueByName

func (tr *TupleRecord) SetValueByName(name string, val interface{}) *TupleRecord

SetValueByName set a value by name

func (*TupleRecord) SetValues

func (tr *TupleRecord) SetValues(values []DataType) *TupleRecord

SetValues batch set values

func (*TupleRecord) String

func (tr *TupleRecord) String() string

type UpdateConnectorOffsetRequest

type UpdateConnectorOffsetRequest struct {
	Action    string `json:"Action"`
	ShardId   string `json:"ShardId"`
	Timestamp int64  `json:"CurrentTime"`
	Sequence  int64  `json:"CurrentSequence"`
}

type UpdateConnectorRequest

type UpdateConnectorRequest struct {
	Action string `json:"Action"`
	//Type   ConnectorType `json:"-"`
	Config interface{} `json:"Config"`
}

type UpdateConnectorStateRequest

type UpdateConnectorStateRequest struct {
	Action string         `json:"Action"`
	State  ConnectorState `json:"State"`
}

type UpdateProjectRequest

type UpdateProjectRequest struct {
	Comment string `json:"Comment"`
}

type UpdateSubscriptionRequest

type UpdateSubscriptionRequest struct {
	//Action  string            `json:"Action"`
	Comment string `json:"Comment"`
}

type UpdateSubscriptionStateRequest

type UpdateSubscriptionStateRequest struct {
	State SubscriptionState `json:"State"`
}

type UpdateTopicRequest

type UpdateTopicRequest struct {
	Comment string `json:"Comment"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL