goclickzetta

Version: v0.0.5
Warning

This package is not in the latest version of its module.

Published: Feb 17, 2025 | License: Apache-2.0 | Imports: 45 | Imported by: 0

README

Go Clickzetta Driver

This topic provides instructions for installing, running, and modifying the Go Clickzetta Driver. The driver supports Go's database/sql package.

Prerequisites

The following software packages are required to use the Go Clickzetta Driver.

Go

The latest driver requires Go 1.19 or higher. The supported operating systems are Linux, macOS, and Windows, but the driver may also run on other platforms where Go works correctly.

Installation

If the driver is not already installed, fetch the goclickzetta source code:

go get -u github.com/clickzetta/goclickzetta

Docs

For detailed documentation and basic usage examples, please see the documentation at goclickzetta-doc.

Development

The developer notes are hosted with the source code on GitHub.

Example code

  • The following example code demonstrates how to use the Go Clickzetta Driver to connect to a Clickzetta account and run a simple query.

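package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/clickzetta/goclickzetta" // registers the "clickzetta" driver
)

// CountResult holds the single value returned by the count query.
type CountResult struct {
	Count int64
}

func main() {
	// Replace each ${...} placeholder with your own connection values.
	db, err := sql.Open("clickzetta", "${username}:${pwd}@${protocol}(${service}/${schema}?virtualCluster=${vc}&workspace=${workspace}&instance=${instanceName}")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	res, err := db.Query("select count(1) from table;")
	if err != nil {
		log.Fatal(err)
	}
	defer res.Close()

	for res.Next() {
		var result CountResult
		if err := res.Scan(&result.Count); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("result is: %v\n", result)
	}
	// Report any error encountered while iterating over the rows.
	if err := res.Err(); err != nil {
		log.Fatal(err)
	}
}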



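  • The following example code demonstrates how to use the Go Clickzetta connection to write batch data to a Clickzetta table.

// This fragment is written as it would appear inside a test function
// (t is a *testing.T). connect is not part of the exported API listed below.
dsn := "${username}:${pwd}@${protocol}(${service}/${schema}?virtualCluster=${vc}&workspace=${workspace}&instance=${instanceName}"
conn, err := connect(dsn)
if err != nil {
	t.Fatal(err)
}
options := BulkloadOptions{
	Table:     "table",
	Operation: APPEND,
}
stream, err := conn.CreateBulkloadStream(options)
if err != nil {
	t.Fatal(err)
}
// Open a writer for partition 0.
writer, err := stream.OpenWriter(0)
if err != nil {
	t.Fatal(err)
}
// Fill one row; each setter takes a column name and a value.
row := writer.CreateRow()
row.SetBigint("id", int64(1))
row.SetString("month", "January")
row.SetBigint("amount", int64(2))
row.SetDecimal("cost", decimal.NewFromFloat(1.1))
if err := writer.WriteRow(row); err != nil {
	t.Error(err)
}
// Close the writer before closing the stream.
writer.Close()
stream.Close()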

More examples can be found in the examples.

DSN (Data Source Name)

The Data Source Name has a common format, like the following:

${username}:${pwd}@${protocol}(${service}/${schema}?virtualCluster=${vc}&workspace=${workspace}&instance=${instanceName}
  • username: The username of the Clickzetta account.
  • pwd: The password of the Clickzetta account.
  • protocol: The protocol of the Clickzetta service, for example http or tcp. The default value is https.
  • service: The Clickzetta service name.
  • schema: The Clickzetta schema name.
  • vc: The Clickzetta virtual cluster name.
  • workspace: The Clickzetta workspace name.
  • instanceName: The Clickzetta instance name.

A DSN must be constructed before using the Clickzetta driver to execute SQL or to write batch data.
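As an illustration of how the placeholders are filled in, here is a minimal sketch that uses only the standard library. The template string is copied verbatim from the format shown above; fillDSN and all sample values are hypothetical and are not part of the driver. The sketch does no escaping, so values containing special characters would need extra care.

```go
package main

import (
	"fmt"
	"os"
)

// dsnTemplate is copied verbatim from the DSN format in this document.
const dsnTemplate = "${username}:${pwd}@${protocol}(${service}/${schema}?virtualCluster=${vc}&workspace=${workspace}&instance=${instanceName}"

// fillDSN is a hypothetical helper (not a driver function). It replaces each
// ${name} placeholder with values[name] and fails if any value is missing.
func fillDSN(values map[string]string) (string, error) {
	var missing []string
	dsn := os.Expand(dsnTemplate, func(name string) string {
		v, ok := values[name]
		if !ok {
			missing = append(missing, name)
		}
		return v
	})
	if len(missing) > 0 {
		return "", fmt.Errorf("missing DSN values: %v", missing)
	}
	return dsn, nil
}

func main() {
	// All values below are made up for the example.
	dsn, err := fillDSN(map[string]string{
		"username":     "alice",
		"pwd":          "secret",
		"protocol":     "https",
		"service":      "api.example.com",
		"schema":       "public",
		"vc":           "default",
		"workspace":    "demo",
		"instanceName": "inst1",
	})
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(dsn)
}
```

The resulting string is what would be passed as the second argument to sql.Open. The package also lists DSN(cfg *Config) and ParseDSN, which are probably the better choice in real code.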

BulkLoad

Users can use BulkLoad to write data to Clickzetta. BulkLoad has three modes: APPEND, OVERWRITE, and UPSERT. The default mode is APPEND.

  • APPEND: The APPEND mode appends data to the table.
  • OVERWRITE: The OVERWRITE mode overwrites the table. If the table has data, the data is deleted.
  • UPSERT: The UPSERT mode updates the table. Users must specify the primary key when using this mode. If the primary key exists, the data is updated. If the primary key does not exist, the data is inserted.
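The difference between the three modes can be sketched with a toy in-memory table. This is only a model of the semantics described above, not the driver's implementation. Mode, Record, and apply are hypothetical names, and the ID field stands in for the primary key.

```go
package main

import "fmt"

// Mode is a local stand-in for the three BulkLoad operation names.
type Mode string

const (
	Append    Mode = "APPEND"
	Overwrite Mode = "OVERWRITE"
	Upsert    Mode = "UPSERT"
)

// Record is one row of the toy table; ID plays the role of the primary key.
type Record struct {
	ID    int64
	Month string
}

// apply returns the table contents after loading batch with the given mode.
// The input slices are never modified.
func apply(table, batch []Record, mode Mode) []Record {
	switch mode {
	case Overwrite:
		// Existing data is discarded; only the batch remains.
		return append([]Record(nil), batch...)
	case Upsert:
		out := append([]Record(nil), table...)
		for _, r := range batch {
			found := false
			for i := range out {
				if out[i].ID == r.ID {
					out[i] = r // key exists: update in place
					found = true
					break
				}
			}
			if !found {
				out = append(out, r) // key is new: insert
			}
		}
		return out
	default:
		// Append: keep existing rows and add the batch after them.
		out := append([]Record(nil), table...)
		return append(out, batch...)
	}
}

func main() {
	table := []Record{{1, "January"}, {2, "February"}}
	batch := []Record{{2, "March"}, {3, "April"}}
	for _, m := range []Mode{Append, Overwrite, Upsert} {
		fmt.Println(m, apply(table, batch, m))
	}
}
```

With this input, APPEND yields four rows (ID 2 appears twice), OVERWRITE yields only the two batch rows, and UPSERT yields three rows with ID 2 updated to "March".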

Row

Users can use Row to write data to Clickzetta. Row has the following methods:

  • SetBigint: Sets the value of an int64 column.
  • SetBoolean: Sets the value of a boolean column.
  • SetDate: Sets the value of a date column. The value should be a string, e.g. "2023-01-01".
  • SetDecimal: Sets the value of a decimal.Decimal column.
  • SetDouble: Sets the value of a float64 column.
  • SetFloat: Sets the value of a float32 column.
  • SetInt: Sets the value of an int32 column.
  • SetSmallInt: Sets the value of an int16 column.
  • SetString: Sets the value of a string column.
  • SetTimestamp: Sets the value of a timestamp column. The value should be a string, e.g. "2023-01-01 00:00:00".
  • SetTinyInt: Sets the value of an int8 column.

Support

For official support, contact Clickzetta support at: https://www.yunqi.tech.

Documentation

Index

Constants

const (
	SubmitJobRequestPath requestPath = "/lh/submitJob"
	GetJobResultPath     requestPath = "/lh/getJob"
	CancelJobPath        requestPath = "/lh/cancelJob"
	GetTokenPath         requestPath = "/clickzetta-portal/user/loginSingle"
	GETWAYPATH           requestPath = "/igs/gatewayEndpoint"
)
const (
	// TimestampLTZType denotes a LTZ timezoneType for array binds
	TimestampLTZType timezoneType = iota
	// DateType denotes a date type for array binds
	DateType
)
const (
	BIGINT clickzettaType = iota
	BOOLEAN
	CHAR
	DATE
	DECIMAL
	DOUBLE
	FLOAT
	INT
	INTERVAL
	SMALLINT
	STRING
	TIMESTAMP_LTZ
	TINYINT
	ARRAY
	MAP
	STRUCT
	VARCHAR
	NOT_SUPPORTED
	JSON
)
const (
	Memory queryDataType = iota
	File   queryDataType = iota
)
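The Memory and File constants both use "= iota" explicitly, which can look as if both were zero. In Go, iota increases with each line of a const block, so the two values differ. The sketch below reproduces the pattern with a local stand-in type. The underlying type int and the String method are assumptions added for illustration, since the real type is unexported.

```go
package main

import "fmt"

// queryDataType is a local stand-in; the real type is unexported and its
// underlying type is assumed to be an integer here.
type queryDataType int

const (
	memory queryDataType = iota // iota is 0 on the first line of the block
	file   queryDataType = iota // iota is 1 on the second line
)

// String gives each value a readable name, e.g. for log output.
// This method is an addition for the example only.
func (t queryDataType) String() string {
	switch t {
	case memory:
		return "Memory"
	case file:
		return "File"
	default:
		return fmt.Sprintf("queryDataType(%d)", int(t))
	}
}

func main() {
	fmt.Println(int(memory), int(file)) // 0 1
	fmt.Println(memory, file)           // Memory File
}
```

The same rule applies to the OSS and COS constants and to the longer clickzettaType list, where BIGINT is 0 and each following name is one higher.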
const (
	OSS objectStorageType = iota
	COS objectStorageType = iota
)
const (
	// QueryStatusInProgress denotes a query execution in progress
	QueryStatusInProgress queryStatus = "queryStatusInProgress"
	// QueryStatusComplete denotes a completed query execution
	QueryStatusComplete queryStatus = "queryStatusComplete"
	// QueryFailed denotes a failed query
	QueryFailed queryStatus = "queryFailed"
)
const ClickzettaGoDriverVersion = "0.0.5"

ClickzettaGoDriverVersion is the version of Go Clickzetta Driver.

const SFSessionIDKey contextKey = "LOG_SESSION_ID"

SFSessionIDKey is the context key for the session ID.

const SFSessionUserKey contextKey = "LOG_USER"

SFSessionUserKey is the context key for the user ID of a session.

Variables

var (
	SQL_JOB        jobType = "SQL_JOB"
	COMPACTION_JOB jobType = "COMPACTION_JOB"
)
var (
	UNKNOWN jobRequestMode = "UNKNOWN"
	HYBRID  jobRequestMode = "HYBRID"
	ASYNC   jobRequestMode = "ASYNC"
	SYNC    jobRequestMode = "SYNC"
)
var HTTPTransport = &http.Transport{
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 60 * time.Second,
	}).DialContext,
	MaxIdleConns:          500,
	IdleConnTimeout:       60 * time.Second,
	ExpectContinueTimeout: 30 * time.Second,
	MaxIdleConnsPerHost:   100,
}
var LogKeys = [...]contextKey{SFSessionIDKey, SFSessionUserKey}

LogKeys lists the context keys whose values should be included in log messages when using logger.WithContext.

Functions

func AppendValueToArrowField

func AppendValueToArrowField(field array.Builder, value interface{}, tpe *util.DataType) error

func ConvertToArrowDataType

func ConvertToArrowDataType(tpe *util.DataType) (arrow.DataType, error)

func ConvertToArrowValue

func ConvertToArrowValue(value interface{}, tpe *util.DataType) (string, error)

func DSN

func DSN(cfg *Config) (dsn string)

DSN constructs a DSN for Clickzetta db.

func GetHttpResponseMsgToJson

func GetHttpResponseMsgToJson(headers map[string]string, path string, connection *ClickzettaConn, jsonData []byte) (*fastjson.Value, []byte, error)

func SFCallerPrettyfier

func SFCallerPrettyfier(frame *runtime.Frame) (string, string)

SFCallerPrettyfier provides the base file name and function name of the calling frame. It is used in SFLogger.

func SetLogger

func SetLogger(inLogger *SFLogger)

Types

type BulkLoadCommitMode

type BulkLoadCommitMode string
var (
	COMMIT_STREAM BulkLoadCommitMode
	ABORT_STREAM  BulkLoadCommitMode
)

type BulkLoadConfig

type BulkLoadConfig struct {
	BLConfig *ingestion.BulkLoadStreamWriterConfig
}

func (*BulkLoadConfig) GetBulkLoadConfig

func (bc *BulkLoadConfig) GetBulkLoadConfig() (StagingConfig, error)

func (*BulkLoadConfig) GetFileFormat

func (bc *BulkLoadConfig) GetFileFormat() FileFormat

func (*BulkLoadConfig) GetMaxBytesPerFile

func (bc *BulkLoadConfig) GetMaxBytesPerFile() int64

func (*BulkLoadConfig) GetMaxRowsPerFile

func (bc *BulkLoadConfig) GetMaxRowsPerFile() int64

type BulkLoadOperation

type BulkLoadOperation string
var (
	APPEND    BulkLoadOperation = "APPEND"
	UPSERT    BulkLoadOperation = "UPSERT"
	OVERWRITE BulkLoadOperation = "OVERWRITE"
)

type BulkLoadState

type BulkLoadState string
var (
	CREATED          BulkLoadState = "CREATED"
	SEALED           BulkLoadState = "SEALED"
	COMMIT_SUBMITTED BulkLoadState = "COMMIT_SUBMITTED"
	COMMIT_SUCCESS   BulkLoadState = "COMMIT_SUCCESS"
	COMMIT_FAILED    BulkLoadState = "COMMIT_FAILED"
	ABORTED          BulkLoadState = "ABORTED"
)

type BulkloadCommitOptions

type BulkloadCommitOptions struct {
	Workspace      string
	VirtualCluster string
}

type BulkloadMetadata

type BulkloadMetadata struct {
	InstanceId int64
	StreamInfo *ingestion.BulkLoadStreamInfo
	Table      CZTable
}

func (*BulkloadMetadata) GetOperation

func (bm *BulkloadMetadata) GetOperation() BulkLoadOperation

func (*BulkloadMetadata) GetPartitionSpec

func (bm *BulkloadMetadata) GetPartitionSpec() string

func (*BulkloadMetadata) GetRecordKeys

func (bm *BulkloadMetadata) GetRecordKeys() []string

func (*BulkloadMetadata) GetSQLErrorMsg

func (bm *BulkloadMetadata) GetSQLErrorMsg() string

func (*BulkloadMetadata) GetState

func (bm *BulkloadMetadata) GetState() BulkLoadState

type BulkloadOptions

type BulkloadOptions struct {
	Table         string
	Operation     BulkLoadOperation
	PartitionSpec string
	RecordKeys    []string
}

type BulkloadStream

type BulkloadStream struct {
	MetaData      *BulkloadMetadata
	Connection    *ClickzettaConn
	CommitOptions *BulkloadCommitOptions
	StreamOptions *BulkloadOptions
	Closed        bool
}

func (*BulkloadStream) Abort

func (stream *BulkloadStream) Abort() error

func (*BulkloadStream) Close

func (stream *BulkloadStream) Close() error

func (*BulkloadStream) Commit

func (stream *BulkloadStream) Commit() error

func (*BulkloadStream) GetStreamId

func (stream *BulkloadStream) GetStreamId() string

func (*BulkloadStream) OpenWriter

func (stream *BulkloadStream) OpenWriter(partitionId int64) (*BulkloadWriter, error)

type BulkloadWriter

type BulkloadWriter struct {
	Connection             *ClickzettaConn
	MetaData               *BulkloadMetadata
	BLConfig               *BulkLoadConfig
	PartitionId            int64
	StreamOptions          *BulkloadOptions
	StageConfig            *StagingConfig
	PartitionSpec          map[string]string
	FileSystem             fs.FS
	FinishedFiles          []string
	FinishedFileSizes      []int
	FileNameUUID           string
	FileId                 int
	Closed                 bool
	CurrentTotalRows       int
	CurrentTotalBytes      int
	CurrentRecordBatch     map[string][]interface{}
	CurrentRecordBatchSize int
	CurrentRecordBatchRows int
	EstimateRowStaticSize  int
	ArrowSchema            *arrow.Schema
	Writer                 *pqarrow.FileWriter
	OSSBucket              *oss.Bucket
	COSClient              *cos.Client
	LocalLocation          string
}

func (*BulkloadWriter) Abort

func (bw *BulkloadWriter) Abort() error

func (*BulkloadWriter) CheckFileStatus

func (bw *BulkloadWriter) CheckFileStatus() error

func (*BulkloadWriter) Close

func (bw *BulkloadWriter) Close() error

func (*BulkloadWriter) CloseCurrentFile

func (bw *BulkloadWriter) CloseCurrentFile() error

func (*BulkloadWriter) ConstructArrowSchema

func (bw *BulkloadWriter) ConstructArrowSchema() error

func (*BulkloadWriter) CreateNextFileWriter

func (bw *BulkloadWriter) CreateNextFileWriter() (*pqarrow.FileWriter, error)

func (*BulkloadWriter) CreateRow

func (bw *BulkloadWriter) CreateRow() *Row

func (*BulkloadWriter) CurrentFileName

func (bw *BulkloadWriter) CurrentFileName() string

func (*BulkloadWriter) EstimateRowSize

func (bw *BulkloadWriter) EstimateRowSize() int

func (*BulkloadWriter) Finish

func (bw *BulkloadWriter) Finish() error

func (*BulkloadWriter) FlushRecordBatch

func (bw *BulkloadWriter) FlushRecordBatch() (int, error)

func (*BulkloadWriter) Init

func (bw *BulkloadWriter) Init() error

func (*BulkloadWriter) ParsePartitionSpec

func (bw *BulkloadWriter) ParsePartitionSpec() (map[string]string, error)

func (*BulkloadWriter) ProcessStagingType

func (bw *BulkloadWriter) ProcessStagingType() error

func (*BulkloadWriter) UploadLocalFile

func (bw *BulkloadWriter) UploadLocalFile() (string, error)

func (*BulkloadWriter) WriteRow

func (bw *BulkloadWriter) WriteRow(row *Row) error

type CZTable

type CZTable struct {
	SchemaName string
	TableName  string
	TableMeta  *ingestion.StreamSchema
	Schema     map[string]*util.DataType
}

type ClickzettaConn

type ClickzettaConn struct {
	// contains filtered or unexported fields
}

func (*ClickzettaConn) Begin

func (conn *ClickzettaConn) Begin() (driver.Tx, error)

func (*ClickzettaConn) BeginTx

func (conn *ClickzettaConn) BeginTx(
	ctx context.Context,
	opts driver.TxOptions) (
	driver.Tx, error)

func (*ClickzettaConn) Close

func (conn *ClickzettaConn) Close() (err error)

func (*ClickzettaConn) CommitBulkloadStream

func (conn *ClickzettaConn) CommitBulkloadStream(streamId string, commitMode BulkLoadCommitMode, option BulkloadOptions) (*BulkloadMetadata, error)

func (*ClickzettaConn) CreateBulkloadStream

func (conn *ClickzettaConn) CreateBulkloadStream(option BulkloadOptions) (*BulkloadStream, error)

func (*ClickzettaConn) Exec

func (conn *ClickzettaConn) Exec(
	query string,
	args []driver.Value) (
	driver.Result, error)

func (*ClickzettaConn) ExecContext

func (conn *ClickzettaConn) ExecContext(
	ctx context.Context,
	query string,
	args []driver.NamedValue) (
	driver.Result, error)

func (*ClickzettaConn) FinishBulkloadStreamWriter

func (conn *ClickzettaConn) FinishBulkloadStreamWriter(streamId string, option BulkloadOptions, partitionId uint32, writtenFileList []string, writtenLengths []uint64) (*ingestion.ResponseStatus, error)

func (*ClickzettaConn) GateWayCall

func (conn *ClickzettaConn) GateWayCall(message proto.Message, method ingestion.MethodEnum) (*fastjson.Value, error)

func (*ClickzettaConn) GetBulkloadStream

func (conn *ClickzettaConn) GetBulkloadStream(streamId string, option BulkloadOptions) (*BulkloadMetadata, error)

func (*ClickzettaConn) GetDistributeBulkloadStream

func (conn *ClickzettaConn) GetDistributeBulkloadStream(streamId string, option BulkloadOptions) (*BulkloadStream, error)

func (*ClickzettaConn) OpenBulkloadStreamWriter

func (conn *ClickzettaConn) OpenBulkloadStreamWriter(streamId string, option BulkloadOptions, partitionId uint32) (*BulkLoadConfig, error)

func (*ClickzettaConn) Ping

func (conn *ClickzettaConn) Ping(ctx context.Context) error

func (*ClickzettaConn) Prepare

func (conn *ClickzettaConn) Prepare(query string) (driver.Stmt, error)

func (*ClickzettaConn) PrepareContext

func (conn *ClickzettaConn) PrepareContext(
	ctx context.Context,
	query string) (
	driver.Stmt, error)

func (*ClickzettaConn) Query

func (conn *ClickzettaConn) Query(
	query string,
	args []driver.Value) (
	driver.Rows, error)

func (*ClickzettaConn) QueryContext

func (conn *ClickzettaConn) QueryContext(
	ctx context.Context,
	query string,
	args []driver.NamedValue) (
	driver.Rows, error)

type ClickzettaDriver

type ClickzettaDriver struct{}

ClickzettaDriver is a context of Go Driver

func (ClickzettaDriver) Open

func (d ClickzettaDriver) Open(dsn string) (driver.Conn, error)

Open creates a new connection.

func (ClickzettaDriver) OpenWithConfig

func (d ClickzettaDriver) OpenWithConfig(ctx context.Context, config Config) (driver.Conn, error)

OpenWithConfig creates a new connection with the given Config.

type ClickzettaError

type ClickzettaError struct {
	Number         int
	SQLState       string
	QueryID        string
	Message        string
	MessageArgs    []interface{}
	IncludeQueryID bool // TODO: populate this in connection
}

func (*ClickzettaError) Error

func (ce *ClickzettaError) Error() string

type ClickzettaResult

type ClickzettaResult interface {
	GetQueryID() string
	GetStatus() queryStatus
	GetError() error
}

ClickzettaResult provides an API for methods exposed to the clients

type ClickzettaRows

type ClickzettaRows interface {
	GetQueryID() string
	GetStatus() queryStatus
	GetResultRows() ([]interface{}, error)
}

type ClickzettaStmt

type ClickzettaStmt struct {
	// contains filtered or unexported fields
}

func (*ClickzettaStmt) Close

func (stmt *ClickzettaStmt) Close() error

func (*ClickzettaStmt) Exec

func (stmt *ClickzettaStmt) Exec(args []driver.Value) (driver.Result, error)

func (*ClickzettaStmt) ExecContext

func (stmt *ClickzettaStmt) ExecContext(ctx context.Context, args []driver.NamedValue) (driver.Result, error)

func (*ClickzettaStmt) NumInput

func (stmt *ClickzettaStmt) NumInput() int

func (*ClickzettaStmt) Query

func (stmt *ClickzettaStmt) Query(args []driver.Value) (driver.Rows, error)

func (*ClickzettaStmt) QueryContext

func (stmt *ClickzettaStmt) QueryContext(ctx context.Context, args []driver.NamedValue) (driver.Rows, error)

type Config

type Config struct {
	UserName       string // Username
	Password       string // Password (requires User)
	Schema         string // Schema
	Workspace      string // Workspace
	VirtualCluster string // VirtualCluster
	Service        string // Service
	Instance       string // Instance
	Protocol       string // Protocol
	Token          string
	InstanceId     int64

	Params map[string]*string // other connection parameters
}

Config is a set of configuration parameters

func ParseDSN

func ParseDSN(dsn string) (cfg *Config, err error)

ParseDSN parses the DSN string to a Config.

type ConfigBool

type ConfigBool uint8

ConfigBool is a type to represent true or false in the Config

type Connector

type Connector struct {
	// contains filtered or unexported fields
}

Connector creates Driver with the specified Config

func NewConnector

func NewConnector(driver InternalClickzettaDriver, config Config) Connector

NewConnector creates a new connector with the given ClickzettaDriver and Config.

func (Connector) Connect

func (t Connector) Connect(ctx context.Context) (driver.Conn, error)

Connect creates a new connection.

func (Connector) Driver

func (t Connector) Driver() driver.Driver

Driver creates a new driver.

type FileFormat

type FileFormat string
var (
	TEXT        FileFormat = "text"
	PARQUET     FileFormat = "parquet"
	ORC         FileFormat = "orc"
	AVRO        FileFormat = "avro"
	CSV         FileFormat = "csv"
	ARROW       FileFormat = "arrow"
	HIVE_RESULT FileFormat = "hive_result"
	DUMMY       FileFormat = "dummy"
	MEMORY      FileFormat = "memory"
	ICEBERG     FileFormat = "iceberg"
)

type InternalClickzettaDriver

type InternalClickzettaDriver interface {
	Open(dsn string) (driver.Conn, error)
	OpenWithConfig(ctx context.Context, config Config) (driver.Conn, error)
}

InternalClickzettaDriver is the interface for an internal Clickzetta driver

type InternalClient

type InternalClient interface {
	Get(context.Context, *url.URL, map[string]string, time.Duration) (*http.Response, error)
	Post(context.Context, *url.URL, map[string]string, []byte, time.Duration) (*http.Response, error)
	Close() error
}

InternalClient is implemented by HTTPClient

type Row

type Row struct {
	Columns          map[string]*util.DataType
	TableName        string
	ColumnNameValues map[string]interface{}
}

func (*Row) SetBigint

func (row *Row) SetBigint(columnName string, value interface{}) error

func (*Row) SetBoolean

func (row *Row) SetBoolean(columnName string, value interface{}) error

func (*Row) SetDate

func (row *Row) SetDate(columnName string, value interface{}) error

func (*Row) SetDecimal

func (row *Row) SetDecimal(columnName string, value interface{}) error

func (*Row) SetDouble

func (row *Row) SetDouble(columnName string, value interface{}) error

func (*Row) SetFloat

func (row *Row) SetFloat(columnName string, value interface{}) error

func (*Row) SetInt

func (row *Row) SetInt(columnName string, value interface{}) error

func (*Row) SetSmallInt

func (row *Row) SetSmallInt(columnName string, value interface{}) error

func (*Row) SetString

func (row *Row) SetString(columnName string, value interface{}) error

func (*Row) SetTimestamp

func (row *Row) SetTimestamp(columnName string, value interface{}) error

func (*Row) SetTinyInt

func (row *Row) SetTinyInt(columnName string, value interface{}) error

type SFLogger

type SFLogger interface {
	rlog.Ext1FieldLogger
	SetLogLevel(level string) error
	WithContext(ctx context.Context) *rlog.Entry
	SetOutput(output io.Writer)
}

func CreateDefaultLogger

func CreateDefaultLogger() SFLogger

CreateDefaultLogger returns a new instance of SFLogger with the default configuration.

func GetLogger

func GetLogger() SFLogger

GetLogger returns the package's logger, which is not otherwise public.

type StagingConfig

type StagingConfig struct {
	Path     string
	ID       string
	Secret   string
	Token    string
	Endpoint string
	Type     string
}

type TypedNullTime

type TypedNullTime struct {
	Time   sql.NullTime
	TzType timezoneType
}

type UUID

type UUID [16]byte

UUID is an RFC 4122-compliant UUID type.

func NewUUID

func NewUUID() UUID

NewUUID creates a new clickzetta UUID

func ParseUUID

func ParseUUID(str string) UUID

ParseUUID parses a string of xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx into its UUID form

func (UUID) String

func (u UUID) String() string
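To make the 16-byte layout and the xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx form concrete, here is a minimal sketch of a random (version 4) UUID using only the standard library. It is a stand-in, not the package's code: whether NewUUID generates version 4 values is an assumption, and unlike ParseUUID above, parseUUID here also returns an error.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"strings"
)

// uuid is a local stand-in with the same underlying type as UUID.
type uuid [16]byte

// newUUID fills 16 random bytes and sets the RFC 4122 version and variant bits.
func newUUID() (uuid, error) {
	var u uuid
	if _, err := rand.Read(u[:]); err != nil {
		return u, err
	}
	u[6] = (u[6] & 0x0f) | 0x40 // version 4 (random)
	u[8] = (u[8] & 0x3f) | 0x80 // RFC 4122 variant
	return u, nil
}

// String renders the UUID in the 8-4-4-4-12 lowercase hex form.
func (u uuid) String() string {
	return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:16])
}

// parseUUID converts the textual form back into bytes. It is lenient about
// where the dashes are placed and only checks the hex digits and length.
func parseUUID(s string) (uuid, error) {
	var u uuid
	b, err := hex.DecodeString(strings.ReplaceAll(s, "-", ""))
	if err != nil {
		return u, err
	}
	if len(b) != len(u) {
		return u, fmt.Errorf("invalid UUID length: %d bytes", len(b))
	}
	copy(u[:], b)
	return u, nil
}

func main() {
	u, err := newUUID()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	back, err := parseUUID(u.String())
	fmt.Println(u, back == u, err)
}
```

Because the type is a fixed-size array, two values can be compared directly with ==, as the round trip in main does.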

Directories

Path Synopsis
protos
