odps

package
v0.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 30, 2024 License: Apache-2.0 Imports: 20 Imported by: 5

Documentation

Index

Examples

Constants

View Source
const (
	TaskWaiting
	TaskRunning
	TaskSuccess
	TaskFailed
	TaskSuspended
	TaskCancelled
	TaskStatusUnknown
)
View Source
const (
	ProjectStatusAvailable
	ProjectStatusReadOnly
	ProjectStatusDeleting
	ProjectStatusFrozen
	ProjectStatusUnKnown
)
View Source
const (
	// ProjectTypeManaged ordinary project
	ProjectTypeManaged = "managed"
	// ProjectExternalExternal external project,like hive
	ProjectExternalExternal = "external"
)
View Source
const DefaultJobPriority = 9
View Source
const (
	HostDefault = "https://logview.alibaba-inc.com"
)

Variables

View Source
var InstanceFilter = struct {
	// Only get instances with a given status
	Status func(InstanceStatus) InsFilterFunc
	// Only get instances that create by the current account
	OnlyOwner func() InsFilterFunc
	// Instance 运行所在 quota 组过滤条件
	QuotaIndex func(string) InsFilterFunc
	// Get instances running between start and end times
	TimeRange func(time.Time, time.Time) InsFilterFunc
}{
	Status: func(status InstanceStatus) InsFilterFunc {
		return func(values url.Values) {
			if status != 0 {
				values.Set("status", status.String())
			}
		}
	},

	OnlyOwner: func() InsFilterFunc {
		return func(values url.Values) {
			values.Set("onlyowner", "yes")
		}
	},

	QuotaIndex: func(s string) InsFilterFunc {
		return func(values url.Values) {
			values.Set("quotaindex", s)
		}
	},

	TimeRange: func(s time.Time, e time.Time) InsFilterFunc {
		return func(values url.Values) {
			startTime := strconv.FormatInt(s.Unix(), 10)
			endTime := strconv.FormatInt(e.Unix(), 10)

			dateRange := fmt.Sprintf("%s:%s", startTime, endTime)
			values.Set("daterange", dateRange)
		}
	},
}
View Source
var ProjectFilter = struct {
	// Filter out projects with a name prefix
	NamePrefix func(string) PFilterFunc
	// Filter out projects with project owner name, this filter cannot be used with `User` together
	Owner func(string) PFilterFunc
	// Filter out projects with a project member name
	User func(string) PFilterFunc
	// Filter out projects with the project group name
	Group func(string) PFilterFunc
}{
	NamePrefix: withProjectNamePrefix,
	Owner:      withProjectOwner,
	User:       withUserInProject,
	Group:      withProjectGroup,
}
View Source
var TableFilter = struct {
	// Weather get extended information or not
	Extended func() TFilterFunc
	// Filter out tables with name prefix
	NamePrefix func(string) TFilterFunc
	// Filter out tables with owner name
	Owner func(string) TFilterFunc
	// Filter out tables with table type
	Type func(TableType) TFilterFunc
}{
	Extended: func() TFilterFunc {
		return func(values url.Values) {
			values.Set("extended", "")
		}
	},
	NamePrefix: func(name string) TFilterFunc {
		return func(values url.Values) {
			values.Set("name", name)
		}
	},
	Owner: func(owner string) TFilterFunc {
		return func(values url.Values) {
			values.Set("owner", owner)
		}
	},
	Type: func(tableType TableType) TFilterFunc {
		return func(values url.Values) {
			values.Set("type", tableType.String())
		}
	},
}

Functions

This section is empty.

Types

type Cluster

type Cluster struct {
	Name    string          `xml:"Name"`
	QuotaId string          `xml:"QuotaId"`
	Quotas  []OptionalQuota `xml:"Quotas"`
}

type Config

type Config struct {
	AccessId             string
	AccessKey            string
	StsToken             string
	Endpoint             string
	ProjectName          string
	TcpConnectionTimeout time.Duration
	HttpTimeout          time.Duration
	TunnelEndpoint       string
	TunnelQuotaName      string
	Hints                map[string]string
	Others               map[string]string
}

Config is the basic config for odps. The NewConfig function should be used, which sets default values.

func NewConfig

func NewConfig() *Config

func NewConfigFromIni

func NewConfigFromIni(iniPath string) (*Config, error)

func (*Config) FormatDsn

func (c *Config) FormatDsn() string

func (*Config) GenAccount

func (c *Config) GenAccount() account2.Account

func (*Config) GenOdps

func (c *Config) GenOdps() *Odps

func (*Config) GenRestClient

func (c *Config) GenRestClient() restclient.RestClient

type InsFilterFunc

type InsFilterFunc func(values url.Values)

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance(odpsIns *Odps, projectName, instanceId string) Instance

func (*Instance) EndTime

func (instance *Instance) EndTime() time.Time

func (*Instance) GetCachedInfo

func (instance *Instance) GetCachedInfo() (string, error)

GetCachedInfo 获取instance cached信息,返回的是json字符串,需要自己进行解析

Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello1", "select * from user;", "", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)

if err != nil {
	log.Fatalf("%+v", err)
}
println(instance.Id())

i, err := instance.GetCachedInfo()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%+v", i))
Output:

func (*Instance) GetResult

func (instance *Instance) GetResult() ([]TaskResult, error)

func (*Instance) GetTaskDetail

func (instance *Instance) GetTaskDetail(taskName string) ([]byte, error)

func (*Instance) GetTaskProgress

func (instance *Instance) GetTaskProgress(taskName string) ([]TaskProgressStage, error)
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello", "select count(*) from sale_detail;", "", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)

if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

for i := 0; i < 5; i++ {
	progress, err := instance.GetTaskProgress("hello")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, stage := range progress {
		println(fmt.Sprintf("%+v", stage))

	}

	time.Sleep(time.Second * 1)
}

body, err := instance.GetTaskDetail("hello")
if err != nil {
	log.Fatalf("%+v", err)
}

println(string(body))
Output:

func (*Instance) GetTaskQuotaJson

func (instance *Instance) GetTaskQuotaJson(taskName string) (string, error)

func (*Instance) GetTaskSummary

func (instance *Instance) GetTaskSummary(taskName string) (*TaskSummary, error)
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello1", "select count(*) from sale_detail;", "", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)

if err != nil {
	log.Fatalf("%+v", err)
}
println(instance.Id())
_ = instance.WaitForSuccess()

taskSummary, err := instance.GetTaskSummary("hello1")
if err != nil {
	log.Fatalf("%+v", err)
}
println(fmt.Sprintf("%s\n%s\n", taskSummary.JsonSummary, taskSummary.Summary))
Output:

func (*Instance) GetTasks

func (instance *Instance) GetTasks() ([]TaskInInstance, error)

GetTasks 绝大部分时候返回一个Task(名字与提交的task名字相同),返回多个task的情况我还没有遇到过

func (*Instance) Id

func (instance *Instance) Id() string

func (*Instance) IsAsync

func (instance *Instance) IsAsync() bool

func (*Instance) IsLoaded

func (instance *Instance) IsLoaded() bool

func (*Instance) IsSync

func (instance *Instance) IsSync() bool

func (*Instance) Load

func (instance *Instance) Load() error

func (*Instance) Owner

func (instance *Instance) Owner() string

func (*Instance) ProjectName

func (instance *Instance) ProjectName() string

func (*Instance) StartTime

func (instance *Instance) StartTime() time.Time

func (*Instance) Status

func (instance *Instance) Status() InstanceStatus

func (*Instance) TaskNameCommitted

func (instance *Instance) TaskNameCommitted() string

func (*Instance) TaskResults

func (instance *Instance) TaskResults() []TaskResult

func (*Instance) Terminate

func (instance *Instance) Terminate() error
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello", "select count(*) from user;", "", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)

if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

err = instance.Terminate()
if err != nil {
	log.Fatalf("%+v", err)
}

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s, %s, %s", instance.StartTime(), instance.EndTime(), instance.Status()))
Output:

func (*Instance) WaitForSuccess

func (instance *Instance) WaitForSuccess() error

type InstanceOrErr

type InstanceOrErr struct {
	Ins *Instance
	Err error
}

InstanceOrErr is used for the return value of Instances.List

type InstanceStatus

type InstanceStatus int
const (
	InstanceRunning InstanceStatus
	InstanceSuspended
	InstanceTerminated
	InstanceStatusUnknown
)

func InstancesStatusFromStr

func InstancesStatusFromStr(s string) InstanceStatus

func (*InstanceStatus) MarshalXML

func (status *InstanceStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (InstanceStatus) String

func (status InstanceStatus) String() string

func (*InstanceStatus) UnmarshalXML

func (status *InstanceStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Instances

type Instances struct {
	// contains filtered or unexported fields
}

Instances is used to get or create instance(s)

func NewInstances

func NewInstances(odpsIns *Odps, projectName ...string) Instances

NewInstances create Instances object, if the projectName is not set, the default project name of odpsIns will be used

func (Instances) CreateTask

func (instances Instances) CreateTask(projectName string, task Task) (*Instance, error)
Example
instances := odpsIns.Instances()
sqlTask := odps.NewSqlTask("hello", "select count(*) from sale_detail;", "", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)

if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s, %s, %s", instance.StartTime(), instance.EndTime(), instance.Status()))

timeFormat := "2006-01-02 15:04:05"

Loop:
for {
	tasks, err := instance.GetTasks()
	task := tasks[0]
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(
		fmt.Sprintf(
			"%s, %s, %s, %s",
			task.StartTime.Format(timeFormat), task.EndTime.Format(timeFormat), task.Status, task.Name,
		))

	switch task.Status {
	case odps.TaskCancelled, odps.TaskFailed, odps.TaskSuccess:
		break Loop
	}

	time.Sleep(time.Second * 2)
}

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

body, err := instance.GetTaskDetail("hello")
if err != nil {
	log.Fatalf("%+v", err)
}

println(string(body))

println(
	fmt.Sprintf(
		"%s, %s, %s",
		instance.StartTime().Format(timeFormat), instance.EndTime().Format(timeFormat), instance.Status(),
	))
Output:

func (Instances) CreateTaskWithPriority

func (instances Instances) CreateTaskWithPriority(projectName string, task Task, jobPriority int) (*Instance, error)

func (Instances) List

func (instances Instances) List(f func(*Instance, error), filters ...InsFilterFunc)

List Get all instances, the filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, InstanceFilter.QuotaIndex, InstanceFilter.TimeRange

Example
ins := odpsIns.Instances()
timeFormat := "2006-01-02 15:04:05"
startTime, _ := time.Parse(timeFormat, "2021-11-15 02:15:30")
endTime, _ := time.Parse(timeFormat, "2021-11-18 06:22:02")

var f = func(i *odps.Instance, err error) {
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(
		fmt.Sprintf(
			"%s, %s, %s, %s, %s",
			i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
		))
}
ins.List(f, odps.InstanceFilter.TimeRange(startTime, endTime))
Output:

func (Instances) ListInstancesQueued

func (instances Instances) ListInstancesQueued(filters ...InsFilterFunc) ([]string, error)

ListInstancesQueued Get all instance Queued information, the information is in json string,you need parse it yourself。 The filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, InstanceFilter.QuotaIndex, InstanceFilter.TimeRange

Example
ins := odpsIns.Instances()

instances, err := ins.ListInstancesQueued()
if err != nil {
	log.Fatalf("%+v", err)
}

for _, i := range instances {
	println(fmt.Sprintf("%+v", i))
}
Output:

type LogView

type LogView struct {
	// contains filtered or unexported fields
}

func NewLogView

func NewLogView(odpsIns *Odps) *LogView

func (*LogView) GenerateLogView

func (lv *LogView) GenerateLogView(instance *Instance, hours int) (string, error)

func (*LogView) LogViewHost

func (lv *LogView) LogViewHost() string

func (*LogView) SetLogViewHost

func (lv *LogView) SetLogViewHost(logViewHost string)

type MergeTask

type MergeTask struct {
	XMLName  xml.Name `xml:"Merge"`
	TaskName `xml:"Name"`
	Comment  string
	Tables   []string `xml:"Tables>TableName"`
	TaskConfig
}

func (*MergeTask) AddTask

func (t *MergeTask) AddTask(taskName string)

func (*MergeTask) TaskType

func (t *MergeTask) TaskType() string

type Odps

type Odps struct {
	// contains filtered or unexported fields
}

func NewOdps

func NewOdps(account account2.Account, endpoint string) *Odps

func (*Odps) Account

func (odps *Odps) Account() account2.Account

func (*Odps) DefaultProject

func (odps *Odps) DefaultProject() Project

func (*Odps) DefaultProjectName

func (odps *Odps) DefaultProjectName() string

func (*Odps) ExecSQl

func (odps *Odps) ExecSQl(sql string) (*Instance, error)

func (*Odps) ExecSQlWithHints

func (odps *Odps) ExecSQlWithHints(sql string, hints map[string]string) (*Instance, error)

func (*Odps) Instance

func (odps *Odps) Instance(instanceId string) Instance

func (*Odps) Instances

func (odps *Odps) Instances() Instances

func (*Odps) LogView

func (odps *Odps) LogView() LogView

func (*Odps) Project

func (odps *Odps) Project(name string) Project

func (*Odps) Projects

func (odps *Odps) Projects() Projects

func (*Odps) RestClient

func (odps *Odps) RestClient() restclient.RestClient

func (*Odps) SetDefaultProjectName

func (odps *Odps) SetDefaultProjectName(projectName string)

func (*Odps) SetHttpTimeout

func (odps *Odps) SetHttpTimeout(t time.Duration)

func (*Odps) SetTcpConnectTimeout

func (odps *Odps) SetTcpConnectTimeout(t time.Duration)

func (*Odps) SetUserAgent

func (odps *Odps) SetUserAgent(userAgent string)

func (*Odps) Table

func (odps *Odps) Table(name string) Table

func (*Odps) Tables

func (odps *Odps) Tables() Tables

type OptionalQuota

type OptionalQuota struct {
	XMLName    xml.Name          `xml:"OptionalQuota"`
	QuotaId    string            `xml:"QuotaID"`
	Properties common.Properties `xml:"Properties"`
}

type PFilterFunc

type PFilterFunc func(url.Values)

type Partition

type Partition struct {
	// contains filtered or unexported fields
}

Partition ODPS分区表中一个特定的分区

func NewPartition

func NewPartition(odpsIns *Odps, projectName, tableName string, kv map[string]string) Partition

func (*Partition) CreatedTime

func (p *Partition) CreatedTime() time.Time

func (*Partition) FileNumEx

func (p *Partition) FileNumEx() int

func (*Partition) IsArchivedEx

func (p *Partition) IsArchivedEx() bool

func (*Partition) LastDDLTime

func (p *Partition) LastDDLTime() time.Time

LastDDLTime 分区Meta修改时间

func (*Partition) LastModifiedTime

func (p *Partition) LastModifiedTime() time.Time

func (*Partition) LifeCycleEx

func (p *Partition) LifeCycleEx() int

func (*Partition) Load

func (p *Partition) Load() error
Example
kv := make(map[string]string, 2)
kv["sale_date"] = "201910"
kv["region"] = "shanghai"

partition := odps.NewPartition(odpsIns, "project_1", "sale_detail", kv)
err := partition.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("Name: %s", partition.Name()))
println(fmt.Sprintf("Record number: %d", partition.RecordNum()))
println(fmt.Sprintf("Create Time: %s", partition.CreatedTime()))
Output:

func (*Partition) LoadExtended

func (p *Partition) LoadExtended() error
Example
kv := make(map[string]string, 2)
kv["sale_date"] = "201910"
kv["region"] = "shanghai"

partition := odps.NewPartition(odpsIns, "project_1", "sale_detail", kv)
err := partition.LoadExtended()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("Name: %s", partition.Name()))
println(fmt.Sprintf("File number: %d", partition.FileNumEx()))
println(fmt.Sprintf("PhysicalSizefd: %d", partition.PhysicalSizeEx()))
println(fmt.Sprintf("Reserved: %s", partition.ReservedEx()))
Output:

func (*Partition) Name

func (p *Partition) Name() string

Name return string with format like "a=xx,b=yy"

func (*Partition) PhysicalSizeEx

func (p *Partition) PhysicalSizeEx() int

func (*Partition) RecordNum

func (p *Partition) RecordNum() int

RecordNum 获取分区数据的Record数,若无准确数据,则返回-1

func (*Partition) ReservedEx

func (p *Partition) ReservedEx() string

ReservedEx 返回扩展信息的保留字段 json 字符串

func (*Partition) Size

func (p *Partition) Size() int

type Project

type Project struct {
	// contains filtered or unexported fields
}
Example
projects := odpsIns.Projects()
project := projects.Get(defaultProjectName)

if err := project.Load(); err != nil {
	panic(err)
}

println(project.Status())
println(project.Owner())

creationTime := project.CreationTime()
println(creationTime.Format(time.RFC1123))

lastModifiedTime := project.LastModifiedTime()
println(lastModifiedTime.Format(time.RFC1123))

defaultCluster, _ := project.GetDefaultCluster()
println(defaultCluster)

println("************all properties")
allProperties, _ := project.GetAllProperties()
for _, p := range allProperties {
	println(p.Name, p.Value)
}
println("************extended properties")
extendedProperties, _ := project.GetExtendedProperties()
for _, p := range extendedProperties {
	println(p.Name, p.Value)
}

isExisted := project.Existed()
println(isExisted)
Output:

func NewProject

func NewProject(name string, odpsIns *Odps) Project

func (*Project) Comment

func (p *Project) Comment() string

func (*Project) CreationTime

func (p *Project) CreationTime() time.Time

func (*Project) Existed

func (p *Project) Existed() bool

func (*Project) GetAllProperties

func (p *Project) GetAllProperties() (common.Properties, error)

GetAllProperties get all the configurable properties of the project, including the properties inherit from group. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetClusters

func (p *Project) GetClusters() ([]Cluster, error)

GetClusters Get information of clusters owned by this project. This is an internal method for group-api. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetDefaultCluster

func (p *Project) GetDefaultCluster() (string, error)

GetDefaultCluster Get default cluster. This is an internal method for group-api. Returns efault cluster when called by group owner, otherwise ,null. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetExtendedProperties

func (p *Project) GetExtendedProperties() (common.Properties, error)

GetExtendedProperties get the extended properties of the project **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetTunnelEndpoint

func (p *Project) GetTunnelEndpoint(quotaNames ...string) (string, error)
Example
project := odpsIns.DefaultProject()
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
	log.Fatalf("%+v", err)
} else {
	println(tunnelEndpoint)
}
Output:

func (*Project) IsLoaded

func (p *Project) IsLoaded() bool

IsLoaded whether `Load()` has been called

func (*Project) LastModifiedTime

func (p *Project) LastModifiedTime() time.Time

func (*Project) Load

func (p *Project) Load() error

Load should be called before get properties of project

func (*Project) Name

func (p *Project) Name() string

func (*Project) OdpsIns

func (p *Project) OdpsIns() *Odps

func (*Project) Owner

func (p *Project) Owner() string

func (*Project) ProjectGroupName

func (p *Project) ProjectGroupName() string

func (*Project) PropertiesHasBeSet

func (p *Project) PropertiesHasBeSet() common.Properties

PropertiesHasBeSet Properties get the properties those have be set for the project

func (*Project) RestClient

func (p *Project) RestClient() restclient.RestClient

func (*Project) SecurityManager

func (p *Project) SecurityManager() security.Manager

func (*Project) Status

func (p *Project) Status() ProjectStatus

func (*Project) Type

func (p *Project) Type() string

func (*Project) Update

func (p *Project) Update(properties map[string]string) error

Update the project properties, the properties are different in different versioned odps. When the "properties" is nil, the system will give the project all the default properties. You'd better ask technique support help when using this method.

type ProjectStatus

type ProjectStatus int

func (*ProjectStatus) FromStr

func (status *ProjectStatus) FromStr(s string)

func (ProjectStatus) MarshalXML

func (status ProjectStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (ProjectStatus) String

func (status ProjectStatus) String() string

func (*ProjectStatus) UnmarshalXML

func (status *ProjectStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Projects

type Projects struct {
	// contains filtered or unexported fields
}

func NewProjects

func NewProjects(odps *Odps) Projects

func (*Projects) CreateExternalProject

func (p *Projects) CreateExternalProject(projectName string) error

CreateExternalProject unimplemented!

func (*Projects) DeleteExternalProject

func (p *Projects) DeleteExternalProject(projectName string) error

DeleteExternalProject unimplemented!

func (*Projects) Exists

func (p *Projects) Exists(projectName string) (bool, error)
Example
projects := odpsIns.Projects()
projectName := "project_1"

existed, err := projects.Exists(projectName)
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s existed: %t", projectName, existed))
Output:

func (*Projects) Get

func (p *Projects) Get(projectName string) Project

func (*Projects) GetDefaultProject

func (p *Projects) GetDefaultProject() Project

func (*Projects) List

func (p *Projects) List(filters ...PFilterFunc) ([]Project, error)

List get all the projects thant current account can access in the specific endpoint filters can be specified with ProjectFilter.NamePrefix, ProjectFilter.Owner, ProjectFilter.User, ProjectFilter.Group

Example
projectsIns := odpsIns.Projects()
projects, err := projectsIns.List(odps.ProjectFilter.NamePrefix("p"))

if err != nil {
	log.Fatalf("%+v", err)
}

for _, project := range projects {
	println(fmt.Sprintf("%+v", project))
}
Output:

func (*Projects) UpdateProject

func (p *Projects) UpdateProject(projectName string) error

type SQLCostTask

type SQLCostTask struct {
	XMLName xml.Name `xml:"SQLCost"`
	SQLTask
}

func NewSQLCostTask

func NewSQLCostTask(name string, query string, comment string, hints map[string]string) SQLCostTask

func (*SQLCostTask) TaskType

func (t *SQLCostTask) TaskType() string

type SQLPlanTask

type SQLPlanTask struct {
	XMLName xml.Name `xml:"SQLPlan"`
	SQLTask
}

func (*SQLPlanTask) TaskType

func (t *SQLPlanTask) TaskType() string

type SQLRTTask

type SQLRTTask struct {
	XMLName xml.Name `xml:"SQLRT"`
	SQLTask
}

func (*SQLRTTask) TaskType

func (t *SQLRTTask) TaskType() string

type SQLTask

type SQLTask struct {
	XMLName  xml.Name `xml:"SQL"`
	TaskName `xml:"Name"`
	Comment  string
	TaskConfig
	Query string
}

func NewAnonymousSQLTask

func NewAnonymousSQLTask(query string, comment string, hints map[string]string) SQLTask

func NewSqlTask

func NewSqlTask(name string, query string, comment string, hints map[string]string) SQLTask

func (*SQLTask) GetSelectResultAsCsv

func (t *SQLTask) GetSelectResultAsCsv(i *Instance, withColumnName bool) (*csv.Reader, error)

GetSelectResultAsCsv 最多返回1W条数据

func (*SQLTask) RunInOdps

func (t *SQLTask) RunInOdps(odpsIns *Odps, projectName string) (*Instance, error)

func (*SQLTask) TaskType

func (t *SQLTask) TaskType() string

type TFilterFunc

type TFilterFunc func(url.Values)

type Table

type Table struct {
	// contains filtered or unexported fields
}

Table represent the table in odps projects

func NewTable

func NewTable(odpsIns *Odps, projectName string, tableName string) Table

func (*Table) AddPartition

func (t *Table) AddPartition(ifNotExists bool, partitionKey string) error

AddPartition Example: AddPartition(true, "region='10026, name='abc'")

Example
table := odpsIns.Table("sale_detail")
err := table.AddPartition(true, "sale_date='202111',region='hangzhou'")
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Table) Comment

func (t *Table) Comment() string

func (*Table) CreateShards

func (t *Table) CreateShards(shardCount int) error

func (*Table) CreatedTime

func (t *Table) CreatedTime() time.Time

func (*Table) CryptoAlgo

func (t *Table) CryptoAlgo() string

func (*Table) DeletePartition

func (t *Table) DeletePartition(ifExists bool, partitionKey string) error

DeletePartition Example: DeletePartition(true, "region='10026, name='abc'")

func (*Table) ExecSql

func (t *Table) ExecSql(taskName, sql string) (*Instance, error)
Example
//table := odps.NewTable(odpsIns, "project_1", "sale_detail")
table := odpsIns.Table("has_struct")
//instance, err := table.ExecSql("SelectSale_detail", "select * from sale_detail;")
instance, err := table.ExecSql("Select_has_struct", "select * from has_struct;")
if err != nil {
	log.Fatalf("%+v", err)
}

err = instance.WaitForSuccess()
if err != nil {
	log.Fatalf("%+v", err)
}

results, err := instance.GetResult()
if err != nil {
	log.Fatalf("%+v", err)
} else if len(results) == 0 {
	log.Fatalf("should get at least one result")
}

println(fmt.Sprintf("%+v", results[0].Result))
Output:

func (*Table) ExecSqlWithHints

func (t *Table) ExecSqlWithHints(taskName, sql string, hints map[string]string) (*Instance, error)

func (*Table) GetPartitions

func (t *Table) GetPartitions(partitionKey string) ([]Partition, error)

GetPartitions get partitions with partitionKey like "region='10026, name='abc'"

Example
table := odpsIns.Table("sale_detail")
partitions, err := table.GetPartitions("")
if err != nil {
	log.Fatalf("%+v", err)
}

for _, p := range partitions {
	println(fmt.Sprintf("Name: %s", p.Name()))
	println(fmt.Sprintf("Create time: %s", p.CreatedTime()))
	println(fmt.Sprintf("Last DDL time: %s", p.LastDDLTime()))
	println(fmt.Sprintf("Last Modified time: %s", p.LastModifiedTime()))
	println("")
}
Output:

func (*Table) GetSchema

func (t *Table) GetSchema() (*tableschema.TableSchema, error)

func (*Table) HubLifeCycle

func (t *Table) HubLifeCycle() int

func (*Table) IsLoaded

func (t *Table) IsLoaded() bool

func (*Table) LastDDLTime

func (t *Table) LastDDLTime() time.Time

func (*Table) LastModifiedTime

func (t *Table) LastModifiedTime() time.Time

func (*Table) LifeCycle

func (t *Table) LifeCycle() int

func (*Table) Load

func (t *Table) Load() error
Example
table := odpsIns.Table("has_struct")
err := table.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

schema := table.Schema()
println(fmt.Sprintf("%+v", schema.Columns))
Output:

func (*Table) LoadExtendedInfo

func (t *Table) LoadExtendedInfo() error

func (*Table) MaxExtendedLabel

func (t *Table) MaxExtendedLabel() string

func (*Table) MaxLabel

func (t *Table) MaxLabel() string

MaxLabel 获取最高的label级别 Label的定义分两部分: 1. 业务分类:C,S,B 2. 数据等级:1,2,3,4

二者是正交关系,即C1,C2,C3,C4,S1,S2,S3,S4,B1,B2,B3,B4。

MaxLabel的语意: 1. MaxLabel=max(TableLabel, ColumnLabel), max(...)函数的语意由Label中的数据等级决定:4>3>2>1 2. MaxLabel显示: 当最高等级Label只出现一次时,MaxLabel=业务分类+数据等级,例如:B4, C3,S2 当最高等级LabeL出现多次,但业务分类也唯一,MaxLabel=业务分类+数据等级,例如:B4, C3,S2 当最高等级Label出现多次,且业务不唯一,MaxLabel=L+数据等级,例如:L4, L3

func (*Table) Name

func (t *Table) Name() string

func (*Table) Owner

func (t *Table) Owner() string

func (*Table) PartitionColumns

func (t *Table) PartitionColumns() []tableschema.Column

func (*Table) ProjectName

func (t *Table) ProjectName() string

func (*Table) RecordNum

func (t *Table) RecordNum() int

func (*Table) ResourceUrl

func (t *Table) ResourceUrl() string

func (*Table) Schema

func (t *Table) Schema() tableschema.TableSchema

func (*Table) SchemaJson

func (t *Table) SchemaJson() string

func (*Table) ShardInfoJson

func (t *Table) ShardInfoJson() string

func (*Table) Size

func (t *Table) Size() int

func (*Table) TableExtendedLabels

func (t *Table) TableExtendedLabels() []string

func (*Table) TableID

func (t *Table) TableID() string

func (*Table) TableLabel

func (t *Table) TableLabel() string

func (*Table) Type

func (t *Table) Type() TableType

func (*Table) ViewText

func (t *Table) ViewText() string

type TableOrErr

type TableOrErr struct {
	Table *Table
	Err   error
}

TableOrErr is used for the return value of Tables.List

type TableType

type TableType int
const (
	ManagedTable TableType
	VirtualView
	ExternalTable
	TableTypeUnknown
)

func TableTypeFromStr

func TableTypeFromStr(s string) TableType

func (TableType) MarshalXML

func (t TableType) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (TableType) String

func (t TableType) String() string

func (*TableType) UnmarshalXML

func (t *TableType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Tables

type Tables struct {
	// contains filtered or unexported fields
}

Tables used for get all the tables in an odps project

func NewTables

func NewTables(odpsIns *Odps, projectName ...string) Tables

NewTables if projectName is not set,the default projectName of odps will be used

func (*Tables) BatchLoadTables

func (ts *Tables) BatchLoadTables(tableNames []string) ([]Table, error)

BatchLoadTables can get at most 100 tables, and the information of table is according to the permission

Example
tablesIns := odps.NewTables(odpsIns)
tableNames := []string{
	"jet_mr_input",
	"jet_smode_test",
	"odps_smoke_table",
	"user",
}

tables, err := tablesIns.BatchLoadTables(tableNames)
if err != nil {
	log.Fatalf("%+v", err)
}

for _, table := range tables {
	println(fmt.Sprintf("%s, %s, %s", table.Name(), table.TableID(), table.Type()))
}

schema, err := tables[len(tables)-1].GetSchema()
if err != nil {
	log.Fatalf("%+v", err)
}

for _, c := range schema.Columns {
	println(fmt.Sprintf("%s, %s, %t, %s", c.Name, c.Type, c.IsNullable, c.Comment))
}
Output:

func (*Tables) Create

func (ts *Tables) Create(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	hints, alias map[string]string) error

Create table with schema, the schema can be build with tableschema.SchemaBuilder parameter hints can affect the `Set` sql execution, like odps.mapred.map.split.size you can get introduce about alias from the reference of alias command

Example
c1 := tableschema.Column{
	Name:    "name",
	Type:    datatype.StringType,
	Comment: "name of user",
}

c2 := tableschema.Column{
	Name:    "age",
	Type:    datatype.IntType,
	Comment: "how old is the user",
}

p1 := tableschema.Column{
	Name:    "region",
	Type:    datatype.StringType,
	Comment: "居住区域",
}

p2 := tableschema.Column{
	Name: "code",
	Type: datatype.IntType,
}

hints := make(map[string]string)
hints["odps.sql.preparse.odps"] = "lot"
hints["odps.sql.planner.mode"] = "lot"
hints["odps.sql.planner.parser.odps"] = "true"
hints["odps.sql.ddl.odps"] = "true"
hints["odps.compiler.output.format"] = "lot,pot"

builder := tableschema.NewSchemaBuilder()
builder.Name("user_temp").
	Comment("这就是一条注释").
	Columns(c1, c2).
	PartitionColumns(p1, p2).
	Lifecycle(2)

schema := builder.Build()
sql, _ := schema.ToSQLString(defaultProjectName, false)
println(sql)

tables := odps.NewTables(odpsIns)
err := tables.Create(schema, true, hints, nil)
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Tables) CreateExternal

func (ts *Tables) CreateExternal(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	serdeProperties map[string]string,
	jars []string,
	hints, alias map[string]string) error

CreateExternal create external table, the schema can be build with tableschema.SchemaBuilder

func (*Tables) CreateWithDataHub

func (ts *Tables) CreateWithDataHub(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	shardNum,
	hubLifecycle int,
) error

func (*Tables) Delete

func (ts *Tables) Delete(tableName string, ifExists bool) error

Delete delete table

Example
tables := odps.NewTables(odpsIns, odpsIns.DefaultProjectName())
err := tables.Delete("user_temp", false)
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Tables) List

func (ts *Tables) List(f func(*Table, error), filters ...TFilterFunc)

List get all the tables, filters can be specified with TableFilter.NamePrefix, TableFilter.Extended, TableFilter.Owner

Example
ts := odps.NewTables(odpsIns)
var f = func(t *odps.Table, err error) {
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("%s, %s, %s", t.Name(), t.Owner(), t.Type()))
}
ts.List(f, odps.TableFilter.Extended())
Output:

type Task

type Task interface {
	GetName() string
	TaskType() string
	AddProperty(key, value string)
}

type TaskConfig

type TaskConfig struct {
	Config []common.Property `xml:"Config>Property"`
}

TaskConfig 作为embedding filed使用时,使用者自动实现Task接口的AddProperty方法

func (*TaskConfig) AddProperty

func (t *TaskConfig) AddProperty(key, value string)

type TaskInInstance

type TaskInInstance struct {
	Type      string `xml:"Type,attr"`
	Name      string
	StartTime common.GMTTime
	EndTime   common.GMTTime `xml:"EndTime"`
	Status    TaskStatus
}

TaskInInstance 通过Instance创建的Task

type TaskName

type TaskName string

TaskName 作为embedding filed使用时,使用者自动实现Task接口的GetName方法

func (TaskName) GetName

func (n TaskName) GetName() string

type TaskProgressStage

type TaskProgressStage struct {
	ID                 string `xml:"ID,attr"`
	Status             string
	BackupWorkers      string
	TerminatedWorkers  string
	RunningWorkers     string
	TotalWorkers       string
	InputRecords       int
	OutRecords         int
	FinishedPercentage int
}

type TaskResult

type TaskResult struct {
	Type   string `xml:"Type,attr"`
	Name   string
	Status TaskStatus
	Result struct {
		TransForm string `xml:"Transform,attr"`
		Format    string `xml:"Format,attr"` // 这个字段没有用到
		Content   string `xml:",cdata"`
	} `xml:"Result"`
}

func (*TaskResult) Content added in v0.0.5

func (tr *TaskResult) Content() string

type TaskStatus

type TaskStatus int

func TaskStatusFromStr

func TaskStatusFromStr(s string) TaskStatus

func (*TaskStatus) MarshalXML

func (status *TaskStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (TaskStatus) String

func (status TaskStatus) String() string

func (*TaskStatus) UnmarshalXML

func (status *TaskStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type TaskSummary

type TaskSummary struct {
	JsonSummary string
	Summary     string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL