Documentation ¶
Overview ¶
Example (Tunnel_download_arrow_simple) ¶
session, err := tunnelIns.CreateDownloadSession( "test_new_console_gcc", // "upload_sample_arrow", "has_struct", ) if err != nil { log.Fatalf("%+v", err) } recordCount := session.RecordCount() println(fmt.Sprintf("record count is %d", recordCount)) reader, err := session.OpenRecordArrowReader(0, 2, nil) if err != nil { log.Fatalf("%+v", err) } n := 0 reader.Iterator(func(rec array.Record, err error) { if err != nil { log.Fatalf("%+v", err) } for i, col := range rec.Columns() { println(fmt.Sprintf("rec[%d][%d]: %v", n, i, col)) } rec.Release() n++ }) err = reader.Close() if err != nil { log.Fatalf("%+v", err) }
Output:
Example (Tunnel_download_arrow_with_partition) ¶
session, err := tunnelIns.CreateDownloadSession( "test_new_console_gcc", "sale_detail", tunnel2.SessionCfg.WithPartitionKey("sale_date='202111',region='hangzhou'"), ) if err != nil { log.Fatalf("%+v", err) } recordCount := session.RecordCount() println(fmt.Sprintf("record count is %d", recordCount)) reader, err := session.OpenRecordArrowReader( 0, 1000, []string{"shop_name", "total_price"}) if err != nil { log.Fatalf("%+v", err) } n := 0 reader.Iterator(func(rec array.Record, err error) { if err != nil { log.Fatalf("%+v", err) } for i, col := range rec.Columns() { println(fmt.Sprintf("rec[%d][%d]: %v", n, i, col)) } rec.Release() n++ }) err = reader.Close() if err != nil { log.Fatalf("%+v", err) }
Output:
Example (Tunnel_download_instance_result) ¶
package main

import (
    "log"

    "github.com/aliyun/aliyun-odps-go-sdk/odps"
    account2 "github.com/aliyun/aliyun-odps-go-sdk/odps/account"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/data"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/restclient"
    "github.com/aliyun/aliyun-odps-go-sdk/odps/tunnel"
)

func main() {
    var account = account2.AliyunAccountFromEnv()
    var endpoint = restclient.LoadEndpointFromEnv()
    var odpsIns = odps.NewOdps(account, endpoint)

    projectName := "project_1"
    odpsIns.SetDefaultProjectName(projectName)
    project := odpsIns.DefaultProject()

    tunnelIns, err := tunnel.NewTunnelFromProject(project)
    if err != nil {
        log.Fatalf("%+v", err)
    }

    ins, err := odpsIns.ExecSQl("select * from data_type_demo;")
    if err != nil {
        log.Fatalf("%+v", err)
    }

    err = ins.WaitForSuccess()
    if err != nil {
        log.Fatalf("%+v", err)
    }

    session, err := tunnelIns.CreateInstanceResultDownloadSession(projectName, ins.Id())
    if err != nil {
        log.Fatalf("%+v", err)
    }

    // columnNames := []string{
    //     "ti", "si", "i", "bi", "b", "f", "d", "dc", "vc", "c", "s", "da", "dat", "t", "bl",
    // }
    // Set columnNames to nil to get all the columns.
    reader, err := session.OpenRecordReader(0, 100, 0, nil)
    if err != nil {
        log.Fatalf("%+v", err)
    }

    // Read records one by one with Read():
    // record, err := reader.Read()
    // if err != nil && err != io.EOF {
    //     println(err.Error())
    // } else {
    //     for i, n := 0, record.Len(); i < n; i++ {
    //         f := record.Get(i)
    //         println(f.String())
    //     }
    // }

    // Or iterate over all records with Iterator():
    reader.Iterator(func(record data.Record, err error) {
        if err != nil {
            log.Fatalf("%+v", err)
        }

        for i, n := 0, record.Len(); i < n; i++ {
            f := record.Get(i)
            println(f.String())
        }
    })
}
Output:
Example (Tunnel_upload_arrow) ¶
tunnelIns.SetHttpTimeout(10 * time.Second)

session, err := tunnelIns.CreateUploadSession(
    ProjectName,
    "sale_detail",
    tunnel2.SessionCfg.WithPartitionKey("sale_date='202111',region='hangzhou'"),
    // tunnel.SessionCfg.WithSnappyFramedCompressor(),
    // tunnel.SessionCfg.WithDeflateCompressor(tunnel.DeflateLevel.DefaultCompression),
    tunnel2.SessionCfg.WithDefaultDeflateCompressor(),
)
if err != nil {
    log.Fatalf("%+v", err)
}

schema := session.ArrowSchema()

type SaleDetailData struct {
    ShopNames  []string
    CustomIDs  []string
    totalPrice []float64
}

rawData := []SaleDetailData{
    {
        []string{"sun", "moon", "earth"},
        []string{"fixed_start1", "satellite1", "planet3"},
        []float64{10000.032, 200.00, 1500.232},
    },
    {
        []string{"mars", "venus"},
        []string{"planet4", "planet2"},
        []float64{1000.1, 1232.2},
    },
    {
        []string{"songjiang", "wusong"},
        []string{"liangshan1", "liangshan2"},
        []float64{100.13, 232.2},
    },
}

blockIds := make([]int, len(rawData))

writeBlock := func(blockId int, data SaleDetailData) error {
    recordWriter, err := session.OpenRecordArrowWriter(blockId)
    if err != nil {
        return errors.WithStack(err)
    }

    pool := memory.NewGoAllocator()
    recordBuilder := array.NewRecordBuilder(pool, schema)
    defer recordBuilder.Release()

    for i, field := range schema.Fields() {
        fieldBuilder := recordBuilder.Field(i)

        switch field.Name {
        case "shop_name":
            builder := fieldBuilder.(*array.StringBuilder)
            builder.AppendValues(data.ShopNames, nil)
        case "customer_id":
            builder := fieldBuilder.(*array.StringBuilder)
            builder.AppendValues(data.CustomIDs, nil)
        case "total_price":
            builder := fieldBuilder.(*array.Float64Builder)
            builder.AppendValues(data.totalPrice, nil)
        }
    }

    record := recordBuilder.NewRecord()
    defer record.Release()

    err = recordWriter.WriteArrowRecord(record)
    if err != nil {
        return errors.WithStack(err)
    }

    return errors.WithStack(recordWriter.Close())
}

wait := make(chan error, len(rawData))

for i, n := 0, len(rawData); i < n; i++ {
    i := i
    blockIds[i] = i

    go func() {
        err := writeBlock(i, rawData[i])
        wait <- err
    }()
}

for i, n := 0, len(rawData); i < n; i++ {
    e := <-wait
    if e != nil {
        log.Fatalf("%+v", e)
    }
}

err = session.Commit(blockIds)
if err != nil {
    log.Fatalf("%+v", err)
}
Output:
Index ¶
- Constants
- Variables
- func Retry(f func() error)
- func WrapByCompressor(rc io.ReadCloser, contentEncoding string) io.ReadCloser
- type ArrowStreamReader
- type ArrowStreamWriter
- type Compressor
- type Crc32CheckSum
- type Deflate
- type DownLoadStatus
- type DownloadSession
- func (ds *DownloadSession) ArrowSchema() *arrow.Schema
- func (ds *DownloadSession) OpenRecordArrowReader(start, count int, columnNames []string) (*RecordArrowReader, error)
- func (ds *DownloadSession) OpenRecordReader(start, count int, columnNames []string) (*RecordProtocReader, error)
- func (ds *DownloadSession) PartitionKey() string
- func (ds *DownloadSession) RecordCount() int
- func (ds *DownloadSession) ResourceUrl() string
- func (ds *DownloadSession) Schema() tableschema.TableSchema
- func (ds *DownloadSession) SetPartitionKey(partitionKey string)
- func (ds *DownloadSession) ShouldTransformDate() bool
- func (ds *DownloadSession) Status() DownLoadStatus
- type InstanceOption
- type InstanceResultDownloadSession
- func AttachToExistedIRDownloadSession(downloadId, projectName, instanceId string, restClient restclient.RestClient, ...) (*InstanceResultDownloadSession, error)
- func CreateInstanceResultDownloadSession(projectName, instanceId string, restClient restclient.RestClient, ...) (*InstanceResultDownloadSession, error)
- func (is *InstanceResultDownloadSession) OpenRecordReader(start, count, sizeLimit int, columnNames []string) (*RecordProtocReader, error)
- func (is *InstanceResultDownloadSession) RecordCount() int
- func (is *InstanceResultDownloadSession) ResourceUrl() string
- func (is *InstanceResultDownloadSession) Schema() tableschema.TableSchema
- func (is *InstanceResultDownloadSession) ShouldTransformDate() bool
- func (is *InstanceResultDownloadSession) Status() DownLoadStatus
- type Option
- type ProtocStreamReader
- func (r *ProtocStreamReader) ReadBool() (bool, error)
- func (r *ProtocStreamReader) ReadBytes() ([]byte, error)
- func (r *ProtocStreamReader) ReadFixed32() (uint32, error)
- func (r *ProtocStreamReader) ReadFixed64() (uint64, error)
- func (r *ProtocStreamReader) ReadFloat32() (float32, error)
- func (r *ProtocStreamReader) ReadFloat64() (float64, error)
- func (r *ProtocStreamReader) ReadInt32() (int32, error)
- func (r *ProtocStreamReader) ReadInt64() (int64, error)
- func (r *ProtocStreamReader) ReadSFixed32() (int32, error)
- func (r *ProtocStreamReader) ReadSFixed64() (int64, error)
- func (r *ProtocStreamReader) ReadSInt32() (int32, error)
- func (r *ProtocStreamReader) ReadSInt64() (int64, error)
- func (r *ProtocStreamReader) ReadString() (string, error)
- func (r *ProtocStreamReader) ReadTag() (protowire.Number, protowire.Type, error)
- func (r *ProtocStreamReader) ReadUInt32() (uint32, error)
- func (r *ProtocStreamReader) ReadUInt64() (uint64, error)
- func (r *ProtocStreamReader) ReadVarint() (uint64, error)
- type ProtocStreamWriter
- func (r *ProtocStreamWriter) WriteBool(val bool) error
- func (r *ProtocStreamWriter) WriteBytes(b []byte) error
- func (r *ProtocStreamWriter) WriteFixed32(val uint32) error
- func (r *ProtocStreamWriter) WriteFixed64(val uint64) error
- func (r *ProtocStreamWriter) WriteFloat32(val float32) error
- func (r *ProtocStreamWriter) WriteFloat64(val float64) error
- func (r *ProtocStreamWriter) WriteInt32(val int32) error
- func (r *ProtocStreamWriter) WriteInt64(val int64) error
- func (r *ProtocStreamWriter) WriteSInt32(val int32) error
- func (r *ProtocStreamWriter) WriteSInt64(val int64) error
- func (r *ProtocStreamWriter) WriteTag(num protowire.Number, typ protowire.Type) error
- func (r *ProtocStreamWriter) WriteUInt32(val uint32) error
- func (r *ProtocStreamWriter) WriteUInt64(val uint64) error
- func (r *ProtocStreamWriter) WriteVarint(v uint64) error
- type RecordArrowReader
- type RecordArrowWriter
- type RecordPackStreamWriter
- type RecordProtocReader
- type RecordProtocWriter
- type SnappyFramed
- type StreamUploadSession
- type Tunnel
- func (t *Tunnel) AttachToExistedDownloadSession(projectName, tableName, sessionId string, opts ...Option) (*DownloadSession, error)
- func (t *Tunnel) AttachToExistedUploadSession(projectName, tableName, sessionId string, opts ...Option) (*UploadSession, error)
- func (t *Tunnel) CreateDownloadSession(projectName, tableName string, opts ...Option) (*DownloadSession, error)
- func (t *Tunnel) CreateInstanceResultDownloadSession(projectName, instanceId string, opts ...InstanceOption) (*InstanceResultDownloadSession, error)
- func (t *Tunnel) CreateStreamUploadSession(projectName, tableName string, opts ...Option) (*StreamUploadSession, error)
- func (t *Tunnel) CreateUploadSession(projectName, tableName string, opts ...Option) (*UploadSession, error)
- func (t *Tunnel) HttpTimeout() time.Duration
- func (t *Tunnel) SetHttpTimeout(httpTimeout time.Duration)
- func (t *Tunnel) SetTcpConnectionTimeout(tcpConnectionTimeout time.Duration)
- func (t *Tunnel) TcpConnectionTimeout() time.Duration
- type UploadSession
- func (u *UploadSession) ArrowSchema() *arrow.Schema
- func (u *UploadSession) BlockIds() []int
- func (u *UploadSession) Commit(blockIds []int) error
- func (u *UploadSession) Load() error
- func (u *UploadSession) OpenRecordArrowWriter(blockId int) (*RecordArrowWriter, error)
- func (u *UploadSession) OpenRecordWriter(blockId int) (*RecordProtocWriter, error)
- func (u *UploadSession) PartitionKey() string
- func (u *UploadSession) ResourceUrl() string
- func (u *UploadSession) Schema() tableschema.TableSchema
- func (u *UploadSession) SetPartitionKey(partitionKey string)
- func (u *UploadSession) ShouldTransform() bool
- func (u *UploadSession) Status() UploadStatus
- type UploadStatus
Examples ¶

- Tunnel_download_arrow_simple
- Tunnel_download_arrow_with_partition
- Tunnel_download_instance_result
- Tunnel_upload_arrow
Constants ¶
const (
    DateTransformVersion        = "v1"
    Version                     = "5"
    DefaultTcpConnectionTimeout = 10 * time.Second
)
const (
    MetaCount    = protowire.Number(33554430) // magic num 2^25-2
    MetaChecksum = protowire.Number(33554431) // magic num 2^25-1
    EndRecord    = protowire.Number(33553408) // magic num 2^25-1024
)
const DefaultChunkSize = 65536
const DeflateName = "deflate"
const SnappyFramedName = "x-snappy-framed"
Variables ¶
var ArrowCrcErr = errors.New("crc value error when get a tunnel arrow stream")
var DeflateLevel = struct {
    NoCompression      int
    BestSpeed          int
    BestCompression    int
    DefaultCompression int
    HuffmanOnly        int
}{
    NoCompression:      flate.NoCompression,
    BestSpeed:          flate.BestSpeed,
    BestCompression:    flate.BestCompression,
    DefaultCompression: flate.DefaultCompression,
    HuffmanOnly:        flate.HuffmanOnly,
}
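The levels plug into the deflate compressor option. A small sketch (projectName and tableName are placeholders; this mirrors the commented-out options in the upload example above):

session, err := tunnelIns.CreateUploadSession(
    projectName, tableName,
    tunnel.SessionCfg.WithDeflateCompressor(tunnel.DeflateLevel.BestSpeed),
)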
var InstanceSessionCfg = struct {
    WithTaskName                 func(string) InstanceOption
    WithQueryId                  func(int) InstanceOption
    WithDefaultDeflateCompressor func() InstanceOption
    WithDeflateCompressor        func(int) InstanceOption
    WithSnappyFramedCompressor   func() InstanceOption
    EnableLimit                  func() InstanceOption
}{
    WithTaskName:                 withTaskName,
    WithQueryId:                  withQueryId,
    WithDefaultDeflateCompressor: _withDefaultDeflateCompressor,
    WithDeflateCompressor:        _withDeflateCompressor,
    WithSnappyFramedCompressor:   _withSnappyFramedCompressor,
    EnableLimit:                  enableLimit,
}
var SessionCfg = struct {
    WithPartitionKey             func(string) Option
    WithSchemaName               func(string) Option
    WithDefaultDeflateCompressor func() Option
    WithDeflateCompressor        func(int) Option
    WithSnappyFramedCompressor   func() Option
    Overwrite                    func() Option
    WithShardId                  func(int) Option
    Async                        func() Option
    WithSlotNum                  func(int) Option
    WithCreatePartition          func() Option
    WithColumns                  func([]string) Option
}{
    WithPartitionKey:             withPartitionKey,
    WithSchemaName:               withSchemaName,
    WithDefaultDeflateCompressor: withDefaultDeflateCompressor,
    WithDeflateCompressor:        withDeflateCompressor,
    WithSnappyFramedCompressor:   withSnappyFramedCompressor,
    Overwrite:                    overWrite,
    WithShardId:                  withShardId,
    Async:                        async,
    WithSlotNum:                  withSlotNum,
    WithCreatePartition:          withCreatePartition,
    WithColumns:                  withColumns,
}
Functions ¶
func WrapByCompressor ¶
func WrapByCompressor(rc io.ReadCloser, contentEncoding string) io.ReadCloser
Types ¶
type ArrowStreamReader ¶
type ArrowStreamReader struct {
// contains filtered or unexported fields
}
func NewArrowStreamReader ¶
func NewArrowStreamReader(rc io.ReadCloser) *ArrowStreamReader
func (*ArrowStreamReader) Close ¶
func (ar *ArrowStreamReader) Close() error
func (*ArrowStreamReader) ReadChunk ¶
func (ar *ArrowStreamReader) ReadChunk() error
type ArrowStreamWriter ¶
type ArrowStreamWriter struct {
// contains filtered or unexported fields
}
ArrowStreamWriter calculates the crc value chunk by chunk.
func NewArrowStreamWriter ¶
func NewArrowStreamWriter(w io.WriteCloser) *ArrowStreamWriter
func (*ArrowStreamWriter) Close ¶
func (aw *ArrowStreamWriter) Close() error
type Compressor ¶
type Compressor interface {
    Name() string
    NewReader(readCloser io.ReadCloser) io.ReadCloser
    NewWriter(writeCloser io.WriteCloser) io.WriteCloser
}
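As an illustration of the interface only (not part of this package, and whether the tunnel server accepts a "gzip" content encoding is not documented here), a hypothetical gzip-backed implementation could look like this:

import (
    "compress/gzip"
    "io"
)

// gzipCompressor is a hypothetical Compressor backed by compress/gzip.
type gzipCompressor struct{}

func (gzipCompressor) Name() string { return "gzip" }

func (gzipCompressor) NewReader(rc io.ReadCloser) io.ReadCloser {
    r, err := gzip.NewReader(rc)
    if err != nil {
        // The interface offers no way to return the error,
        // so this sketch falls back to the raw stream.
        return rc
    }
    return r // *gzip.Reader implements io.ReadCloser
}

func (gzipCompressor) NewWriter(wc io.WriteCloser) io.WriteCloser {
    return gzip.NewWriter(wc) // *gzip.Writer implements io.WriteCloser
}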
type Crc32CheckSum ¶
type Crc32CheckSum struct {
// contains filtered or unexported fields
}
func NewCrc32CheckSum ¶
func NewCrc32CheckSum() Crc32CheckSum
func (*Crc32CheckSum) Reset ¶
func (crc *Crc32CheckSum) Reset()
func (*Crc32CheckSum) Update ¶
func (crc *Crc32CheckSum) Update(data interface{})
Update must not be given data of plain int type, as the size of int differs between 32-bit and 64-bit platforms. In Java the size of int is always 32 bits, so the same int data could produce different crc values in Java and Go.
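A minimal sketch of the point above: pass explicitly sized integers so the checksum matches across platforms (the values are arbitrary).

crc := tunnel.NewCrc32CheckSum()

// Fixed-width types keep the checksum stable; a plain int is
// 32 or 64 bits depending on the platform.
crc.Update(int32(42))
crc.Update(int64(42))

println(crc.Value())
crc.Reset() // start over for the next piece of data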
func (*Crc32CheckSum) Value ¶
func (crc *Crc32CheckSum) Value() uint32
type Deflate ¶
type Deflate struct {
// contains filtered or unexported fields
}
func (Deflate) NewReader ¶
func (d Deflate) NewReader(rc io.ReadCloser) io.ReadCloser
func (Deflate) NewWriter ¶
func (d Deflate) NewWriter(wc io.WriteCloser) io.WriteCloser
type DownLoadStatus ¶
type DownLoadStatus int
const (
    DownloadStatusUnknown DownLoadStatus
    DownloadStatusNormal
    DownloadStatusClosed
    DownloadStatusExpired
    DownloadStatusInitiating
)
func DownloadStatusFromStr ¶
func DownloadStatusFromStr(s string) DownLoadStatus
func (DownLoadStatus) String ¶
func (status DownLoadStatus) String() string
type DownloadSession ¶
type DownloadSession struct {
    Id          string
    ProjectName string
    // TODO use schema to get the resource url of a table
    SchemaName string
    TableName  string
    Async      bool
    ShardId    int
    Compressor Compressor
    RestClient restclient.RestClient
    // contains filtered or unexported fields
}
DownloadSession is used to download table data; it can be created by Tunnel. Use RecordCount to get the total number of records, and create multiple RecordReaders in parallel according to that count to download the data in less time. RecordArrowReader is the only RecordReader for now.

Underneath a RecordReader is an http connection; when no data flows through it for 300s, the tunnel server will close it.
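For example, the total record range can be split across several readers running concurrently. A minimal sketch (the number of parts is arbitrary, and the session is created as in the examples above):

total := session.RecordCount()
parts := 4
chunk := (total + parts - 1) / parts

wait := make(chan error, parts)
for p := 0; p < parts; p++ {
    start, count := p*chunk, chunk
    if start+count > total {
        count = total - start
    }
    go func(start, count int) {
        reader, err := session.OpenRecordArrowReader(start, count, nil)
        if err != nil {
            wait <- err
            return
        }
        reader.Iterator(func(rec array.Record, err error) {
            if err == nil {
                // consume rec here
                rec.Release()
            }
        })
        wait <- reader.Close()
    }(start, count)
}
for p := 0; p < parts; p++ {
    if err := <-wait; err != nil {
        log.Fatalf("%+v", err)
    }
}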
func AttachToExistedDownloadSession ¶
func AttachToExistedDownloadSession(
    sessionId, projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*DownloadSession, error)
AttachToExistedDownloadSession retrieves an existing session by its session id. The opts can be one or more of:

- SessionCfg.WithPartitionKey
- SessionCfg.WithSchemaName, which does not work yet
- SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor at the default level
- SessionCfg.WithDeflateCompressor, use the deflate compressor at a specific level
- SessionCfg.WithSnappyFramedCompressor
- SessionCfg.Overwrite, overwrite data
- SessionCfg.DisableArrow, disable the arrow reader and use the protoc reader instead
- SessionCfg.WithShardId, set the shard id of the table
- SessionCfg.Async, enable async mode, which can avoid timeouts when there are many small files
func CreateDownloadSession ¶
func CreateDownloadSession(
    projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*DownloadSession, error)
CreateDownloadSession creates a new download session before downloading data. The opts can be one or more of:

- SessionCfg.WithPartitionKey
- SessionCfg.WithSchemaName, which does not work yet
- SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor at the default level
- SessionCfg.WithDeflateCompressor, use the deflate compressor at a specific level
- SessionCfg.WithSnappyFramedCompressor
- SessionCfg.Overwrite, overwrite data
- SessionCfg.DisableArrow, disable the arrow reader and use the protoc reader instead
- SessionCfg.WithShardId, set the shard id of the table
- SessionCfg.Async, enable async mode, which can avoid timeouts when there are many small files
func (*DownloadSession) ArrowSchema ¶
func (ds *DownloadSession) ArrowSchema() *arrow.Schema
func (*DownloadSession) OpenRecordArrowReader ¶
func (ds *DownloadSession) OpenRecordArrowReader(start, count int, columnNames []string) (*RecordArrowReader, error)
func (*DownloadSession) OpenRecordReader ¶
func (ds *DownloadSession) OpenRecordReader(start, count int, columnNames []string) (*RecordProtocReader, error)
func (*DownloadSession) PartitionKey ¶
func (ds *DownloadSession) PartitionKey() string
func (*DownloadSession) RecordCount ¶
func (ds *DownloadSession) RecordCount() int
func (*DownloadSession) ResourceUrl ¶
func (ds *DownloadSession) ResourceUrl() string
func (*DownloadSession) Schema ¶
func (ds *DownloadSession) Schema() tableschema.TableSchema
func (*DownloadSession) SetPartitionKey ¶
func (ds *DownloadSession) SetPartitionKey(partitionKey string)
func (*DownloadSession) ShouldTransformDate ¶
func (ds *DownloadSession) ShouldTransformDate() bool
func (*DownloadSession) Status ¶
func (ds *DownloadSession) Status() DownLoadStatus
type InstanceOption ¶
type InstanceOption func(cfg *instanceSessionConfig)
InstanceOption must be created by InstanceSessionCfg.XXX
type InstanceResultDownloadSession ¶
type InstanceResultDownloadSession struct {
    Id            string
    InstanceId    string
    ProjectName   string
    TaskName      string
    QueryId       int
    LimitEnabled  bool
    IsLongPolling bool
    Compressor    Compressor
    RestClient    restclient.RestClient
    // contains filtered or unexported fields
}
func AttachToExistedIRDownloadSession ¶
func AttachToExistedIRDownloadSession(
    downloadId, projectName, instanceId string,
    restClient restclient.RestClient,
    opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)
func CreateInstanceResultDownloadSession ¶
func CreateInstanceResultDownloadSession(
    projectName, instanceId string,
    restClient restclient.RestClient,
    opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)
func (*InstanceResultDownloadSession) OpenRecordReader ¶
func (is *InstanceResultDownloadSession) OpenRecordReader(
    start, count, sizeLimit int,
    columnNames []string,
) (*RecordProtocReader, error)
OpenRecordReader opens a reader to read the result of a select. The parameter start is the position to start reading from, count is the maximum number of records to read, and sizeLimit is the maximum number of bytes of the result.
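A paging sketch built on these parameters (pageSize is an arbitrary choice; sizeLimit is left at 0 as in the instance-result example above):

pageSize := 100
for start := 0; start < session.RecordCount(); start += pageSize {
    reader, err := session.OpenRecordReader(start, pageSize, 0, nil)
    if err != nil {
        log.Fatalf("%+v", err)
    }
    reader.Iterator(func(record data.Record, err error) {
        if err != nil {
            log.Fatalf("%+v", err)
        }
        for i, n := 0, record.Len(); i < n; i++ {
            println(record.Get(i).String())
        }
    })
    if err := reader.Close(); err != nil {
        log.Fatalf("%+v", err)
    }
}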
func (*InstanceResultDownloadSession) RecordCount ¶
func (is *InstanceResultDownloadSession) RecordCount() int
func (*InstanceResultDownloadSession) ResourceUrl ¶
func (is *InstanceResultDownloadSession) ResourceUrl() string
func (*InstanceResultDownloadSession) Schema ¶
func (is *InstanceResultDownloadSession) Schema() tableschema.TableSchema
func (*InstanceResultDownloadSession) ShouldTransformDate ¶
func (is *InstanceResultDownloadSession) ShouldTransformDate() bool
func (*InstanceResultDownloadSession) Status ¶
func (is *InstanceResultDownloadSession) Status() DownLoadStatus
type Option ¶
type Option func(cfg *sessionConfig)
Option cannot be constructed directly; create one with the SessionCfg.XXX functions.
type ProtocStreamReader ¶
type ProtocStreamReader struct {
// contains filtered or unexported fields
}
func NewProtocStreamReader ¶
func NewProtocStreamReader(r io.Reader) *ProtocStreamReader
func (*ProtocStreamReader) ReadBool ¶
func (r *ProtocStreamReader) ReadBool() (bool, error)
func (*ProtocStreamReader) ReadBytes ¶
func (r *ProtocStreamReader) ReadBytes() ([]byte, error)
func (*ProtocStreamReader) ReadFixed32 ¶
func (r *ProtocStreamReader) ReadFixed32() (uint32, error)
func (*ProtocStreamReader) ReadFixed64 ¶
func (r *ProtocStreamReader) ReadFixed64() (uint64, error)
func (*ProtocStreamReader) ReadFloat32 ¶
func (r *ProtocStreamReader) ReadFloat32() (float32, error)
func (*ProtocStreamReader) ReadFloat64 ¶
func (r *ProtocStreamReader) ReadFloat64() (float64, error)
func (*ProtocStreamReader) ReadInt32 ¶
func (r *ProtocStreamReader) ReadInt32() (int32, error)
func (*ProtocStreamReader) ReadInt64 ¶
func (r *ProtocStreamReader) ReadInt64() (int64, error)
func (*ProtocStreamReader) ReadSFixed32 ¶
func (r *ProtocStreamReader) ReadSFixed32() (int32, error)
func (*ProtocStreamReader) ReadSFixed64 ¶
func (r *ProtocStreamReader) ReadSFixed64() (int64, error)
func (*ProtocStreamReader) ReadSInt32 ¶
func (r *ProtocStreamReader) ReadSInt32() (int32, error)
func (*ProtocStreamReader) ReadSInt64 ¶
func (r *ProtocStreamReader) ReadSInt64() (int64, error)
func (*ProtocStreamReader) ReadString ¶
func (r *ProtocStreamReader) ReadString() (string, error)
func (*ProtocStreamReader) ReadUInt32 ¶
func (r *ProtocStreamReader) ReadUInt32() (uint32, error)
func (*ProtocStreamReader) ReadUInt64 ¶
func (r *ProtocStreamReader) ReadUInt64() (uint64, error)
func (*ProtocStreamReader) ReadVarint ¶
func (r *ProtocStreamReader) ReadVarint() (uint64, error)
type ProtocStreamWriter ¶
type ProtocStreamWriter struct {
// contains filtered or unexported fields
}
func NewProtocStreamWriter ¶
func NewProtocStreamWriter(w io.Writer) *ProtocStreamWriter
func (*ProtocStreamWriter) WriteBool ¶
func (r *ProtocStreamWriter) WriteBool(val bool) error
func (*ProtocStreamWriter) WriteBytes ¶
func (r *ProtocStreamWriter) WriteBytes(b []byte) error
func (*ProtocStreamWriter) WriteFixed32 ¶
func (r *ProtocStreamWriter) WriteFixed32(val uint32) error
func (*ProtocStreamWriter) WriteFixed64 ¶
func (r *ProtocStreamWriter) WriteFixed64(val uint64) error
func (*ProtocStreamWriter) WriteFloat32 ¶
func (r *ProtocStreamWriter) WriteFloat32(val float32) error
func (*ProtocStreamWriter) WriteFloat64 ¶
func (r *ProtocStreamWriter) WriteFloat64(val float64) error
func (*ProtocStreamWriter) WriteInt32 ¶
func (r *ProtocStreamWriter) WriteInt32(val int32) error
func (*ProtocStreamWriter) WriteInt64 ¶
func (r *ProtocStreamWriter) WriteInt64(val int64) error
func (*ProtocStreamWriter) WriteSInt32 ¶
func (r *ProtocStreamWriter) WriteSInt32(val int32) error
func (*ProtocStreamWriter) WriteSInt64 ¶
func (r *ProtocStreamWriter) WriteSInt64(val int64) error
func (*ProtocStreamWriter) WriteUInt32 ¶
func (r *ProtocStreamWriter) WriteUInt32(val uint32) error
func (*ProtocStreamWriter) WriteUInt64 ¶
func (r *ProtocStreamWriter) WriteUInt64(val uint64) error
func (*ProtocStreamWriter) WriteVarint ¶
func (r *ProtocStreamWriter) WriteVarint(v uint64) error
type RecordArrowReader ¶
type RecordArrowReader struct {
// contains filtered or unexported fields
}
func (*RecordArrowReader) Close ¶
func (r *RecordArrowReader) Close() error
func (*RecordArrowReader) HttpRes ¶
func (r *RecordArrowReader) HttpRes() *http.Response
func (*RecordArrowReader) Iterator ¶
func (r *RecordArrowReader) Iterator(f func(array.Record, error))
func (*RecordArrowReader) RecordBatchReader ¶
func (r *RecordArrowReader) RecordBatchReader() *ipc.RecordBatchReader
type RecordArrowWriter ¶
type RecordArrowWriter struct {
// contains filtered or unexported fields
}
func (*RecordArrowWriter) Close ¶
func (writer *RecordArrowWriter) Close() error
func (*RecordArrowWriter) WriteArrowRecord ¶
func (writer *RecordArrowWriter) WriteArrowRecord(record array.Record) error
type RecordPackStreamWriter ¶
type RecordPackStreamWriter struct {
// contains filtered or unexported fields
}
func (*RecordPackStreamWriter) Append ¶
func (rsw *RecordPackStreamWriter) Append(record data.Record) error
func (*RecordPackStreamWriter) DataSize ¶
func (rsw *RecordPackStreamWriter) DataSize() int64
func (*RecordPackStreamWriter) Flush ¶
func (rsw *RecordPackStreamWriter) Flush(timeout_ ...time.Duration) (string, error)
func (*RecordPackStreamWriter) RecordCount ¶
func (rsw *RecordPackStreamWriter) RecordCount() int64
type RecordProtocReader ¶
type RecordProtocReader struct {
// contains filtered or unexported fields
}
func (*RecordProtocReader) Close ¶
func (r *RecordProtocReader) Close() error
func (*RecordProtocReader) HttpRes ¶
func (r *RecordProtocReader) HttpRes() *http.Response
type RecordProtocWriter ¶
type RecordProtocWriter struct {
// contains filtered or unexported fields
}
func (*RecordProtocWriter) Close ¶
func (r *RecordProtocWriter) Close() error
type SnappyFramed ¶
type SnappyFramed int
func (SnappyFramed) Name ¶
func (s SnappyFramed) Name() string
func (SnappyFramed) NewReader ¶
func (s SnappyFramed) NewReader(rc io.ReadCloser) io.ReadCloser
func (SnappyFramed) NewWriter ¶
func (s SnappyFramed) NewWriter(wc io.WriteCloser) io.WriteCloser
type StreamUploadSession ¶
type StreamUploadSession struct {
    ProjectName string
    // TODO use schema to get the resource url of a table
    SchemaName      string
    TableName       string
    Compressor      Compressor
    RestClient      restclient.RestClient
    Columns         []string
    P2PMode         bool
    CreatePartition bool
    SlotNum         int
    // contains filtered or unexported fields
}
func CreateStreamUploadSession ¶
func CreateStreamUploadSession(
    projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*StreamUploadSession, error)
CreateStreamUploadSession creates a new stream upload session before uploading data. The opts can be one or more of:

- SessionCfg.WithPartitionKey
- SessionCfg.WithSchemaName, which does not work yet
- SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor at the default level
- SessionCfg.WithDeflateCompressor, use the deflate compressor at a specific level
- SessionCfg.WithSnappyFramedCompressor
- SessionCfg.WithSlotNum, not yet open for external use
- SessionCfg.WithCreatePartition, create the partition specified by WithPartitionKey if it does not exist
- SessionCfg.WithColumns, TODO: purpose still to be clarified
func (*StreamUploadSession) OpenRecordPackWriter ¶
func (su *StreamUploadSession) OpenRecordPackWriter() *RecordPackStreamWriter
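A sketch of the stream upload flow under two assumptions: records is a []data.Record built with the odps/data package (construction not shown), and Flush sends the currently buffered pack to the server.

packWriter := session.OpenRecordPackWriter()

for _, rec := range records {
    // Append buffers the record locally.
    if err := packWriter.Append(rec); err != nil {
        log.Fatalf("%+v", err)
    }
    // Flush in roughly 64 KiB packs (an arbitrary threshold).
    if packWriter.DataSize() >= 64*1024 {
        if _, err := packWriter.Flush(); err != nil {
            log.Fatalf("%+v", err)
        }
    }
}

// Send whatever is still buffered.
if _, err := packWriter.Flush(); err != nil {
    log.Fatalf("%+v", err)
}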
func (*StreamUploadSession) ResourceUrl ¶
func (su *StreamUploadSession) ResourceUrl() string
func (*StreamUploadSession) Schema ¶
func (su *StreamUploadSession) Schema() tableschema.TableSchema
type Tunnel ¶
type Tunnel struct {
// contains filtered or unexported fields
}
Tunnel is used to upload or download data in odps; it can also be used to download the result of a sql query. The span from the beginning of an upload or download to its end is called a session. As some tables are very big, more than one http connection may be used for the upload or download; all of the http connections are created by the session. The timeout of a session is 24 hours.

The typical table upload process is:
1. create a Tunnel
2. create an UploadSession
3. create a RecordWriter, use the writer to write Record data
4. commit the data

The typical table download process is:
1. create a Tunnel
2. create a DownloadSession
3. create a RecordReader, use the reader to read out Records
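A condensed sketch of the download flow (project and table names are placeholders; odpsIns is built as in the instance-result example above):

// 1. create a tunnel
tunnelIns, err := tunnel.NewTunnelFromProject(odpsIns.DefaultProject())
if err != nil {
    log.Fatalf("%+v", err)
}

// 2. create a DownloadSession
session, err := tunnelIns.CreateDownloadSession("my_project", "my_table")
if err != nil {
    log.Fatalf("%+v", err)
}

// 3. create a RecordReader and read the records out
reader, err := session.OpenRecordReader(0, session.RecordCount(), nil)
if err != nil {
    log.Fatalf("%+v", err)
}
reader.Iterator(func(record data.Record, err error) {
    if err != nil {
        log.Fatalf("%+v", err)
    }
    for i, n := 0, record.Len(); i < n; i++ {
        println(record.Get(i).String())
    }
})
if err := reader.Close(); err != nil {
    log.Fatalf("%+v", err)
}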
func (*Tunnel) AttachToExistedDownloadSession ¶
func (t *Tunnel) AttachToExistedDownloadSession(
    projectName, tableName, sessionId string,
    opts ...Option,
) (*DownloadSession, error)
func (*Tunnel) AttachToExistedUploadSession ¶
func (t *Tunnel) AttachToExistedUploadSession(
    projectName, tableName, sessionId string,
    opts ...Option,
) (*UploadSession, error)
func (*Tunnel) CreateDownloadSession ¶
func (t *Tunnel) CreateDownloadSession(projectName, tableName string, opts ...Option) (*DownloadSession, error)
func (*Tunnel) CreateInstanceResultDownloadSession ¶
func (t *Tunnel) CreateInstanceResultDownloadSession(
    projectName, instanceId string,
    opts ...InstanceOption,
) (*InstanceResultDownloadSession, error)
func (*Tunnel) CreateStreamUploadSession ¶
func (t *Tunnel) CreateStreamUploadSession(projectName, tableName string, opts ...Option) (*StreamUploadSession, error)
func (*Tunnel) CreateUploadSession ¶
func (t *Tunnel) CreateUploadSession(projectName, tableName string, opts ...Option) (*UploadSession, error)
func (*Tunnel) HttpTimeout ¶
func (*Tunnel) SetHttpTimeout ¶
func (*Tunnel) SetTcpConnectionTimeout ¶
func (*Tunnel) TcpConnectionTimeout ¶
type UploadSession ¶
type UploadSession struct {
    Id          string
    ProjectName string
    // TODO use schema to get the resource url of a table
    SchemaName string
    TableName  string
    Overwrite  bool
    Compressor Compressor
    RestClient restclient.RestClient
    // contains filtered or unexported fields
}
UploadSession works like "insert into"; multiple sessions writing to the same table or partition do not affect each other. The session id is the unique identifier of a session.

UploadSession uses OpenRecordArrowWriter to create a RecordArrowWriter, or OpenRecordWriter to create a RecordProtocWriter, for writing data into a table. Each RecordWriter uses an http connection to transfer data with the tunnel server, and each UploadSession can create multiple RecordWriters, so multiple http connections can be used to upload data in parallel.

A block id must be given when creating a RecordWriter; it is the unique identifier of the writer. The block id can be any number in [0, 20000). A single RecordWriter can write at most 100G of data. If multiple RecordWriters are created with the same block id, the data is overwritten, and only the data from the writer that calls Close last is kept.

The timeout of the http connection used by a RecordWriter is 120s; the server will close the connection when no data flows through it for 120 seconds.

The Commit method must be called to notify the server that all data has been uploaded and can be written into the table.

In particular, the partition key used by the server cannot contain "'": "region=hangzhou" is valid, while "region='hangzhou'" is not. But partition keys like "region='hangzhou'" are the more common spelling, so to keep users from sending the wrong format, the partitionKey of UploadSession is private; it can be set when creating a session or with SetPartitionKey.
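For illustration, the examples in this document pass the quoted form through SessionCfg.WithPartitionKey; SetPartitionKey is the after-the-fact equivalent (the assumption here, implied by the note above, is that the session stores the key in a server-acceptable format):

// Set or change the partition after the session is created.
session.SetPartitionKey("sale_date='202111',region='hangzhou'")
println(session.PartitionKey())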
func AttachToExistedUploadSession ¶
func AttachToExistedUploadSession(
    sessionId, projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*UploadSession, error)
AttachToExistedUploadSession retrieves an existing session by its session id. The opts can be one or more of:

- SessionCfg.WithPartitionKey
- SessionCfg.WithSchemaName, which does not work yet
- SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor at the default level
- SessionCfg.WithDeflateCompressor, use the deflate compressor at a specific level
- SessionCfg.WithSnappyFramedCompressor
- SessionCfg.Overwrite, overwrite data
- SessionCfg.UseArrow, the default config
func CreateUploadSession ¶
func CreateUploadSession(
    projectName, tableName string,
    restClient restclient.RestClient,
    opts ...Option,
) (*UploadSession, error)
CreateUploadSession creates a new upload session before uploading data. The opts can be one or more of:

- SessionCfg.WithPartitionKey
- SessionCfg.WithSchemaName, which does not work yet
- SessionCfg.WithDefaultDeflateCompressor, use the deflate compressor at the default level
- SessionCfg.WithDeflateCompressor, use the deflate compressor at a specific level
- SessionCfg.WithSnappyFramedCompressor
- SessionCfg.Overwrite, overwrite data
func (*UploadSession) ArrowSchema ¶
func (u *UploadSession) ArrowSchema() *arrow.Schema
func (*UploadSession) BlockIds ¶
func (u *UploadSession) BlockIds() []int
func (*UploadSession) Commit ¶
func (u *UploadSession) Commit(blockIds []int) error
func (*UploadSession) Load ¶
func (u *UploadSession) Load() error
func (*UploadSession) OpenRecordArrowWriter ¶
func (u *UploadSession) OpenRecordArrowWriter(blockId int) (*RecordArrowWriter, error)
func (*UploadSession) OpenRecordWriter ¶
func (u *UploadSession) OpenRecordWriter(blockId int) (*RecordProtocWriter, error)
func (*UploadSession) PartitionKey ¶
func (u *UploadSession) PartitionKey() string
func (*UploadSession) ResourceUrl ¶
func (u *UploadSession) ResourceUrl() string
func (*UploadSession) Schema ¶
func (u *UploadSession) Schema() tableschema.TableSchema
func (*UploadSession) SetPartitionKey ¶
func (u *UploadSession) SetPartitionKey(partitionKey string)
func (*UploadSession) ShouldTransform ¶
func (u *UploadSession) ShouldTransform() bool
func (*UploadSession) Status ¶
func (u *UploadSession) Status() UploadStatus
type UploadStatus ¶
type UploadStatus int
const (
    UploadStatusUnknown UploadStatus
    UploadStatusNormal
    UploadStatusClosing
    UploadStatusClosed
    UploadStatusCanceled
    UploadStatusExpired
    UploadStatusCritical
    UploadStatusCommitting
)
func UploadStatusFromStr ¶
func UploadStatusFromStr(s string) UploadStatus
func (UploadStatus) String ¶
func (status UploadStatus) String() string
Source Files ¶
- arrow_stream_reader.go
- arrow_stream_writer.go
- common.go
- compressor.go
- const.go
- crc32_checksum.go
- download_session.go
- http_conn.go
- instance_result_download_session.go
- instance_session_config.go
- protoc_common.go
- protoc_stream_reader.go
- protoc_stream_writer.go
- record_arrow_reader.go
- record_arrow_writer.go
- record_pack_stream_writer.go
- record_protoc_reader.go
- record_protoc_writer.go
- session_config.go
- slot.go
- stream_upload_session.go
- tunnel.go
- upload_session.go