odps

package
v0.3.16 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 22, 2024 License: Apache-2.0 Imports: 21 Imported by: 1

Documentation

Index

Examples

Constants

View Source
const (
	TaskWaiting
	TaskRunning
	TaskSuccess
	TaskFailed
	TaskSuspended
	TaskCancelled
	TaskStatusUnknown
)
View Source
const (
	ProjectStatusAvailable
	ProjectStatusReadOnly
	ProjectStatusDeleting
	ProjectStatusFrozen
	ProjectStatusUnKnown
)
View Source
const (
	// ProjectTypeManaged ordinary project
	ProjectTypeManaged = "managed"
	// ProjectExternalExternal external project,like hive
	ProjectExternalExternal = "external"
)
View Source
const DefaultJobPriority = 9
View Source
const (
	HostDefault = "https://logview.alibaba-inc.com"
)

Variables

View Source
var InstanceFilter = struct {
	// Only get instances with a given status
	Status func(InstanceStatus) InsFilterFunc
	// Only get instances that create by the current account
	OnlyOwner func() InsFilterFunc
	// Instance 运行所在 quota 组过滤条件
	QuotaIndex func(string) InsFilterFunc
	// Get instances running between start and end times
	TimeRange func(time.Time, time.Time) InsFilterFunc
}{
	Status: func(status InstanceStatus) InsFilterFunc {
		return func(values url.Values) {
			if status != 0 {
				values.Set("status", status.String())
			}
		}
	},

	OnlyOwner: func() InsFilterFunc {
		return func(values url.Values) {
			values.Set("onlyowner", "yes")
		}
	},

	QuotaIndex: func(s string) InsFilterFunc {
		return func(values url.Values) {
			values.Set("quotaindex", s)
		}
	},

	TimeRange: func(s time.Time, e time.Time) InsFilterFunc {
		return func(values url.Values) {
			startTime := strconv.FormatInt(s.Unix(), 10)
			endTime := strconv.FormatInt(e.Unix(), 10)

			dateRange := fmt.Sprintf("%s:%s", startTime, endTime)
			values.Set("daterange", dateRange)
		}
	},
}
View Source
var ProjectFilter = struct {
	// Filter out projects with a name prefix
	NamePrefix func(string) PFilterFunc
	// Filter out projects with project owner name, this filter cannot be used with `User` together
	Owner func(string) PFilterFunc
	// Filter out projects with a project member name
	User func(string) PFilterFunc
	// Filter out projects with the project group name
	Group func(string) PFilterFunc
}{
	NamePrefix: withProjectNamePrefix,
	Owner:      withProjectOwner,
	User:       withUserInProject,
	Group:      withProjectGroup,
}
View Source
var TableFilter = struct {
	// Weather get extended information or not
	Extended func() TFilterFunc
	// Filter out tables with name prefix
	NamePrefix func(string) TFilterFunc
	// Filter out tables with owner name
	Owner func(string) TFilterFunc
	// Filter out tables with table type
	Type func(TableType) TFilterFunc
}{
	Extended: func() TFilterFunc {
		return func(values url.Values) {
			values.Set("extended", "")
		}
	},
	NamePrefix: func(name string) TFilterFunc {
		return func(values url.Values) {
			values.Set("name", name)
		}
	},
	Owner: func(owner string) TFilterFunc {
		return func(values url.Values) {
			values.Set("owner", owner)
		}
	},
	Type: func(tableType TableType) TFilterFunc {
		return func(values url.Values) {
			values.Set("type", tableType.String())
		}
	},
}

Functions

This section is empty.

Types

type AdminTask added in v0.3.3

type AdminTask struct {
	XMLName  xml.Name `xml:"Admin"`
	TaskName `xml:"Name"`
	// 注意: TaskConfig和Command的顺序不能更改
	TaskConfig
	Command string `xml:"Command"`
}

func NewAdminTask added in v0.3.3

func NewAdminTask(name string, command string) *AdminTask

func (*AdminTask) TaskType added in v0.3.3

func (t *AdminTask) TaskType() string

type Cluster

type Cluster struct {
	Name    string          `xml:"Name"`
	QuotaId string          `xml:"QuotaId"`
	Quotas  []OptionalQuota `xml:"Quotas"`
}

type Config

type Config struct {
	AccessId             string
	AccessKey            string
	StsToken             string
	Endpoint             string
	ProjectName          string
	TcpConnectionTimeout time.Duration
	HttpTimeout          time.Duration
	TunnelEndpoint       string
	TunnelQuotaName      string
	Hints                map[string]string
	Others               map[string]string
}

Config is the basic config for odps. The NewConfig function should be used, which sets default values.

func NewConfig

func NewConfig() *Config

func NewConfigFromIni

func NewConfigFromIni(iniPath string) (*Config, error)

func (*Config) FormatDsn

func (c *Config) FormatDsn() string

func (*Config) GenAccount

func (c *Config) GenAccount() account2.Account

func (*Config) GenOdps

func (c *Config) GenOdps() *Odps

func (*Config) GenRestClient

func (c *Config) GenRestClient() restclient.RestClient

type InsFilterFunc

type InsFilterFunc func(values url.Values)

type Instance

type Instance struct {
	// contains filtered or unexported fields
}

func NewInstance

func NewInstance(odpsIns *Odps, projectName, instanceId string) *Instance

func (*Instance) EndTime

func (instance *Instance) EndTime() time.Time

func (*Instance) GetCachedInfo

func (instance *Instance) GetCachedInfo() (string, error)

GetCachedInfo 获取instance cached信息,返回的是json字符串,需要自己进行解析

Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello1", "select * from user;", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)
if err != nil {
	log.Fatalf("%+v", err)
}
println(instance.Id())

i, err := instance.GetCachedInfo()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%+v", i))
Output:

func (*Instance) GetResult

func (instance *Instance) GetResult() ([]TaskResult, error)

func (*Instance) GetTaskDetail

func (instance *Instance) GetTaskDetail(taskName string) ([]byte, error)

func (*Instance) GetTaskProgress

func (instance *Instance) GetTaskProgress(taskName string) ([]TaskProgressStage, error)
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello", "select count(*) from sale_detail;", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)
if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

for i := 0; i < 5; i++ {
	progress, err := instance.GetTaskProgress("hello")
	if err != nil {
		log.Fatalf("%+v", err)
	}

	for _, stage := range progress {
		println(fmt.Sprintf("%+v", stage))
	}

	time.Sleep(time.Second * 1)
}

body, err := instance.GetTaskDetail("hello")
if err != nil {
	log.Fatalf("%+v", err)
}

println(string(body))
Output:

func (*Instance) GetTaskQuotaJson

func (instance *Instance) GetTaskQuotaJson(taskName string) (string, error)

func (*Instance) GetTaskSummary

func (instance *Instance) GetTaskSummary(taskName string) (*TaskSummary, error)
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello1", "select count(*) from sale_detail;", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)
if err != nil {
	log.Fatalf("%+v", err)
}
println(instance.Id())
_ = instance.WaitForSuccess()

taskSummary, err := instance.GetTaskSummary("hello1")
if err != nil {
	log.Fatalf("%+v", err)
}
println(fmt.Sprintf("%s\n%s\n", taskSummary.JsonSummary, taskSummary.Summary))
Output:

func (*Instance) GetTasks

func (instance *Instance) GetTasks() ([]TaskInInstance, error)

GetTasks 绝大部分时候返回一个Task(名字与提交的task名字相同),返回多个task的情况我还没有遇到过

func (*Instance) Id

func (instance *Instance) Id() string

func (*Instance) IsAsync

func (instance *Instance) IsAsync() bool

func (*Instance) IsLoaded

func (instance *Instance) IsLoaded() bool

func (*Instance) IsSync

func (instance *Instance) IsSync() bool

func (*Instance) Load

func (instance *Instance) Load() error

func (*Instance) Owner

func (instance *Instance) Owner() string

func (*Instance) ProjectName

func (instance *Instance) ProjectName() string

func (*Instance) StartTime

func (instance *Instance) StartTime() time.Time

func (*Instance) Status

func (instance *Instance) Status() InstanceStatus

func (*Instance) TaskNameCommitted

func (instance *Instance) TaskNameCommitted() string

func (*Instance) TaskResults

func (instance *Instance) TaskResults() []TaskResult

func (*Instance) Terminate

func (instance *Instance) Terminate() error
Example
instances := odps.NewInstances(odpsIns)
sqlTask := odps.NewSqlTask("hello", "select count(*) from user;", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)
if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

err = instance.Terminate()
if err != nil {
	log.Fatalf("%+v", err)
}

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s, %s, %s", instance.StartTime(), instance.EndTime(), instance.Status()))
Output:

func (*Instance) WaitForSuccess

func (instance *Instance) WaitForSuccess() error

type InstanceOrErr

type InstanceOrErr struct {
	Ins *Instance
	Err error
}

InstanceOrErr is used for the return value of Instances.List

type InstanceStatus

type InstanceStatus int
const (
	InstanceRunning InstanceStatus
	InstanceSuspended
	InstanceTerminated
	InstanceStatusUnknown
)

func InstancesStatusFromStr

func InstancesStatusFromStr(s string) InstanceStatus

func (*InstanceStatus) MarshalXML

func (status *InstanceStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (InstanceStatus) String

func (status InstanceStatus) String() string

func (*InstanceStatus) UnmarshalXML

func (status *InstanceStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Instances

type Instances struct {
	// contains filtered or unexported fields
}

Instances is used to get or create instance(s)

func NewInstances

func NewInstances(odpsIns *Odps, projectName ...string) *Instances

NewInstances create Instances object, if the projectName is not set, the default project name of odpsIns will be used

func (*Instances) CreateTask

func (instances *Instances) CreateTask(projectName string, task Task) (*Instance, error)
Example
instances := odpsIns.Instances()
sqlTask := odps.NewSqlTask("hello", "select count(*) from sale_detail;", nil)
instance, err := instances.CreateTask(defaultProjectName, &sqlTask)
if err != nil {
	log.Fatalf("%+v", err)
}

println(instance.Id())

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s, %s, %s", instance.StartTime(), instance.EndTime(), instance.Status()))

timeFormat := "2006-01-02 15:04:05"

Loop:
for {
	tasks, err := instance.GetTasks()
	task := tasks[0]
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(
		fmt.Sprintf(
			"%s, %s, %s, %s",
			task.StartTime.Format(timeFormat), task.EndTime.Format(timeFormat), task.Status, task.Name,
		))

	switch task.Status {
	case odps.TaskCancelled, odps.TaskFailed, odps.TaskSuccess:
		break Loop
	}

	time.Sleep(time.Second * 2)
}

err = instance.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

body, err := instance.GetTaskDetail("hello")
if err != nil {
	log.Fatalf("%+v", err)
}

println(string(body))

println(
	fmt.Sprintf(
		"%s, %s, %s",
		instance.StartTime().Format(timeFormat), instance.EndTime().Format(timeFormat), instance.Status(),
	))
Output:

func (*Instances) CreateTaskWithPriority

func (instances *Instances) CreateTaskWithPriority(projectName string, task Task, jobPriority int) (*Instance, error)

func (*Instances) Get added in v0.3.8

func (instances *Instances) Get(instanceId string) *Instance

func (*Instances) List

func (instances *Instances) List(f func(*Instance), filters ...InsFilterFunc) error

List Get all instances, the filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, InstanceFilter.QuotaIndex, InstanceFilter.TimeRange

Example
ins := odpsIns.Instances()
timeFormat := "2006-01-02 15:04:05"
startTime, _ := time.Parse(timeFormat, "2021-11-15 02:15:30")
endTime, _ := time.Parse(timeFormat, "2021-11-18 06:22:02")

f := func(i *odps.Instance) {
	println(
		fmt.Sprintf(
			"%s, %s, %s, %s, %s",
			i.Id(), i.Owner(), i.StartTime().Format(timeFormat), i.EndTime().Format(timeFormat), i.Status(),
		))
}
ins.List(f, odps.InstanceFilter.TimeRange(startTime, endTime))
Output:

func (*Instances) ListInstancesQueued

func (instances *Instances) ListInstancesQueued(filters ...InsFilterFunc) ([]string, error)

ListInstancesQueued Get all instance Queued information, the information is in json string,you need parse it yourself。 The filters can be given with InstanceFilter.Status, InstanceFilter.OnlyOwner, InstanceFilter.QuotaIndex, InstanceFilter.TimeRange

Example
ins := odpsIns.Instances()

instances, err := ins.ListInstancesQueued()
if err != nil {
	log.Fatalf("%+v", err)
}

for _, i := range instances {
	println(fmt.Sprintf("%+v", i))
}
Output:

type LogView

type LogView struct {
	// contains filtered or unexported fields
}

func NewLogView

func NewLogView(odpsIns *Odps) *LogView

func (*LogView) GenerateLogView

func (lv *LogView) GenerateLogView(instance *Instance, hours int) (string, error)

func (*LogView) LogViewHost

func (lv *LogView) LogViewHost() string

func (*LogView) SetLogViewHost

func (lv *LogView) SetLogViewHost(logViewHost string)

type MergeTask

type MergeTask struct {
	XMLName  xml.Name `xml:"Merge"`
	TaskName `xml:"Name"`
	Comment  string
	Tables   []string `xml:"Tables>TableName"`
	TaskConfig
}

func (*MergeTask) AddTask

func (t *MergeTask) AddTask(taskName string)

func (*MergeTask) TaskType

func (t *MergeTask) TaskType() string

type Odps

type Odps struct {
	// contains filtered or unexported fields
}

func NewOdps

func NewOdps(account account2.Account, endpoint string) *Odps

func (*Odps) Account

func (odps *Odps) Account() account2.Account

func (*Odps) CurrentSchemaName added in v0.3.8

func (odps *Odps) CurrentSchemaName() string

func (*Odps) DefaultProject

func (odps *Odps) DefaultProject() *Project

func (*Odps) DefaultProjectName

func (odps *Odps) DefaultProjectName() string

func (*Odps) ExecSQl

func (odps *Odps) ExecSQl(sql string, hints ...map[string]string) (*Instance, error)

func (*Odps) ExecSQlWithHints

func (odps *Odps) ExecSQlWithHints(sql string, hints map[string]string) (*Instance, error)

func (*Odps) Instance

func (odps *Odps) Instance(instanceId string) *Instance

func (*Odps) Instances

func (odps *Odps) Instances() *Instances

func (*Odps) LogView

func (odps *Odps) LogView() *LogView

func (*Odps) Project

func (odps *Odps) Project(name string) *Project

func (*Odps) Projects

func (odps *Odps) Projects() *Projects

func (*Odps) RestClient

func (odps *Odps) RestClient() restclient.RestClient

func (*Odps) Schema added in v0.3.8

func (odps *Odps) Schema(name string) *Schema

func (*Odps) Schemas added in v0.3.8

func (odps *Odps) Schemas() *Schemas

func (*Odps) SetCurrentSchemaName added in v0.3.8

func (odps *Odps) SetCurrentSchemaName(schemaName string)

func (*Odps) SetDefaultProjectName

func (odps *Odps) SetDefaultProjectName(projectName string)

func (*Odps) SetHttpTimeout

func (odps *Odps) SetHttpTimeout(t time.Duration)

func (*Odps) SetTcpConnectTimeout

func (odps *Odps) SetTcpConnectTimeout(t time.Duration)

func (*Odps) SetUserAgent

func (odps *Odps) SetUserAgent(userAgent string)

func (*Odps) Table

func (odps *Odps) Table(name string) *Table

func (*Odps) Tables

func (odps *Odps) Tables() *Tables

type OptionalQuota

type OptionalQuota struct {
	XMLName    xml.Name          `xml:"OptionalQuota"`
	QuotaId    string            `xml:"QuotaID"`
	Properties common.Properties `xml:"Properties"`
}

type PFilterFunc

type PFilterFunc func(url.Values)

type Partition

type Partition struct {
	// contains filtered or unexported fields
}

Partition ODPS分区表中一个特定的分区

func NewPartition

func NewPartition(odpsIns *Odps, projectName, tableName string, value string) *Partition

func (*Partition) CreatedTime

func (p *Partition) CreatedTime() time.Time

func (*Partition) FileNumEx

func (p *Partition) FileNumEx() int

func (*Partition) IsArchivedEx

func (p *Partition) IsArchivedEx() bool

func (*Partition) LastDDLTime

func (p *Partition) LastDDLTime() time.Time

LastDDLTime 分区Meta修改时间

func (*Partition) LastModifiedTime

func (p *Partition) LastModifiedTime() time.Time

func (*Partition) LifeCycleEx

func (p *Partition) LifeCycleEx() int

func (*Partition) Load

func (p *Partition) Load() error
Example
err := odpsIns.Table("sale_detail").AddPartitions(true, []string{"sale_date=201910/region=shanghai"})
if err != nil {
	log.Fatalf("%+v", err)
}

partition := odps.NewPartition(odpsIns, defaultProjectName, "sale_detail", "sale_date=201910/region=shanghai")
err = partition.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("Value: %s", partition.Value()))
println(fmt.Sprintf("Record number: %d", partition.RecordNum()))
println(fmt.Sprintf("Create Time: %s", partition.CreatedTime()))
Output:

func (*Partition) LoadExtended

func (p *Partition) LoadExtended() error
Example
err := odpsIns.Table("sale_detail").AddPartitions(true, []string{"sale_date=201910/region=shanghai"})
if err != nil {
	log.Fatalf("%+v", err)
}

partition := odps.NewPartition(odpsIns, defaultProjectName, "sale_detail", "sale_date=201910/region=shanghai")
err = partition.LoadExtended()
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("Value: %s", partition.Value()))
println(fmt.Sprintf("File number: %d", partition.FileNumEx()))
println(fmt.Sprintf("PhysicalSizefd: %d", partition.PhysicalSizeEx()))
println(fmt.Sprintf("Reserved: %s", partition.ReservedEx()))
Output:

func (*Partition) Name deprecated

func (p *Partition) Name() string

Deprecated: Do not use this function. Use Value instead Name return string with format like "a=xx/b=yy"

func (*Partition) PhysicalSizeEx

func (p *Partition) PhysicalSizeEx() int

func (*Partition) RecordNum

func (p *Partition) RecordNum() int

RecordNum 获取分区数据的Record数,若无准确数据,则返回-1

func (*Partition) ReservedEx

func (p *Partition) ReservedEx() string

ReservedEx 返回扩展信息的保留字段 json 字符串

func (*Partition) Size

func (p *Partition) Size() int

func (*Partition) Spec added in v0.3.8

func (p *Partition) Spec() string

Spec return partition value with format like "a='xx',b='yy'"

func (*Partition) Value added in v0.3.8

func (p *Partition) Value() string

Value return partition value with format like "a=xx/b=yy"

type PartitionColumn added in v0.3.8

type PartitionColumn struct {
	Name  string
	Value string
}

type Project

type Project struct {
	// contains filtered or unexported fields
}
Example
projects := odpsIns.Projects()
project := projects.Get(defaultProjectName)

if err := project.Load(); err != nil {
	panic(err)
}

println(project.Status())
println(project.Owner())

creationTime := project.CreationTime()
println(creationTime.Format(time.RFC1123))

lastModifiedTime := project.LastModifiedTime()
println(lastModifiedTime.Format(time.RFC1123))

defaultCluster, _ := project.GetDefaultCluster()
println(defaultCluster)

println("************all properties")
allProperties, _ := project.GetAllProperties()
for _, p := range allProperties {
	println(p.Name, p.Value)
}
println("************extended properties")
extendedProperties, _ := project.GetExtendedProperties()
for _, p := range extendedProperties {
	println(p.Name, p.Value)
}

isExisted := project.Existed()
println(isExisted)
Output:

func NewProject

func NewProject(name string, odpsIns *Odps) *Project

func (*Project) Comment

func (p *Project) Comment() string

func (*Project) CreationTime

func (p *Project) CreationTime() time.Time

func (*Project) Existed

func (p *Project) Existed() bool

func (*Project) GetAllProperties

func (p *Project) GetAllProperties() (common.Properties, error)

GetAllProperties get all the configurable properties of the project, including the properties inherit from group. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetClusters

func (p *Project) GetClusters() ([]Cluster, error)

GetClusters Get information of clusters owned by this project. This is an internal method for group-api. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetDefaultCluster

func (p *Project) GetDefaultCluster() (string, error)

GetDefaultCluster Get default cluster. This is an internal method for group-api. Returns efault cluster when called by group owner, otherwise ,null. **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetExtendedProperties

func (p *Project) GetExtendedProperties() (common.Properties, error)

GetExtendedProperties get the extended properties of the project **note**, this method may return error when something wrong during loading data from the api serer

func (*Project) GetTunnelEndpoint

func (p *Project) GetTunnelEndpoint(quotaNames ...string) (string, error)
Example
project := odpsIns.DefaultProject()
tunnelEndpoint, err := project.GetTunnelEndpoint()
if err != nil {
	log.Fatalf("%+v", err)
} else {
	println(tunnelEndpoint)
}
Output:

func (*Project) IsLoaded

func (p *Project) IsLoaded() bool

IsLoaded whether `Load()` has been called

func (*Project) LastModifiedTime

func (p *Project) LastModifiedTime() time.Time

func (*Project) Load

func (p *Project) Load() error

Load should be called before get properties of project

func (*Project) Name

func (p *Project) Name() string

func (*Project) OdpsIns

func (p *Project) OdpsIns() *Odps

func (*Project) Owner

func (p *Project) Owner() string

func (*Project) ProjectGroupName

func (p *Project) ProjectGroupName() string

func (*Project) PropertiesHasBeSet

func (p *Project) PropertiesHasBeSet() common.Properties

PropertiesHasBeSet Properties get the properties those have be set for the project

func (*Project) RestClient

func (p *Project) RestClient() restclient.RestClient

func (*Project) Schemas added in v0.3.8

func (p *Project) Schemas() *Schemas

func (*Project) SecurityManager

func (p *Project) SecurityManager() security.Manager

func (*Project) Status

func (p *Project) Status() ProjectStatus

func (*Project) Tables added in v0.3.8

func (p *Project) Tables() *Tables

func (*Project) Type

func (p *Project) Type() string

func (*Project) Update

func (p *Project) Update(properties map[string]string) error

Update the project properties, the properties are different in different versioned odps. When the "properties" is nil, the system will give the project all the default properties. You'd better ask technique support help when using this method.

type ProjectStatus

type ProjectStatus int

func (*ProjectStatus) FromStr

func (status *ProjectStatus) FromStr(s string)

func (ProjectStatus) MarshalXML

func (status ProjectStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (ProjectStatus) String

func (status ProjectStatus) String() string

func (*ProjectStatus) UnmarshalXML

func (status *ProjectStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Projects

type Projects struct {
	// contains filtered or unexported fields
}

func NewProjects

func NewProjects(odps *Odps) Projects

func (*Projects) CreateExternalProject

func (p *Projects) CreateExternalProject(projectName string) error

CreateExternalProject unimplemented!

func (*Projects) DeleteExternalProject

func (p *Projects) DeleteExternalProject(projectName string) error

DeleteExternalProject unimplemented!

func (*Projects) Exists

func (p *Projects) Exists(projectName string) (bool, error)
Example
projects := odpsIns.Projects()
projectName := defaultProjectName

existed, err := projects.Exists(projectName)
if err != nil {
	log.Fatalf("%+v", err)
}

println(fmt.Sprintf("%s existed: %t", projectName, existed))
Output:

func (*Projects) Get

func (p *Projects) Get(projectName string) *Project

func (*Projects) GetDefaultProject

func (p *Projects) GetDefaultProject() *Project

func (*Projects) List

func (p *Projects) List(filters ...PFilterFunc) ([]*Project, error)

List get all the projects thant current account can access in the specific endpoint filters can be specified with ProjectFilter.NamePrefix, ProjectFilter.Owner, ProjectFilter.User, ProjectFilter.Group

Example
projectsIns := odpsIns.Projects()
projects, err := projectsIns.List(odps.ProjectFilter.NamePrefix("p"))
if err != nil {
	log.Fatalf("%+v", err)
}

for _, project := range projects {
	println(fmt.Sprintf("%+v", project))
}
Output:

func (*Projects) UpdateProject

func (p *Projects) UpdateProject(projectName string) error

type SQLCostTask

type SQLCostTask struct {
	XMLName xml.Name `xml:"SQLCost"`
	SQLTask
}

func NewSQLCostTask

func NewSQLCostTask(name string, query string, hints map[string]string) SQLCostTask

func (*SQLCostTask) TaskType

func (t *SQLCostTask) TaskType() string

type SQLPlanTask

type SQLPlanTask struct {
	XMLName xml.Name `xml:"SQLPlan"`
	SQLTask
}

func (*SQLPlanTask) TaskType

func (t *SQLPlanTask) TaskType() string

type SQLRTTask

type SQLRTTask struct {
	XMLName xml.Name `xml:"SQLRT"`
	SQLTask
}

func (*SQLRTTask) TaskType

func (t *SQLRTTask) TaskType() string

type SQLTask

type SQLTask struct {
	XMLName  xml.Name `xml:"SQL"`
	TaskName `xml:"Name"`
	TaskConfig
	Query string
}

func NewAnonymousSQLTask

func NewAnonymousSQLTask(query string, hints map[string]string) SQLTask

func NewSqlTask

func NewSqlTask(name string, query string, hints map[string]string) SQLTask

func (*SQLTask) GetSelectResultAsCsv

func (t *SQLTask) GetSelectResultAsCsv(i *Instance, withColumnName bool) (*csv.Reader, error)

GetSelectResultAsCsv 最多返回1W条数据

func (*SQLTask) Run added in v0.3.8

func (t *SQLTask) Run(odpsIns *Odps, projectName string) (*Instance, error)

func (*SQLTask) RunInOdps

func (t *SQLTask) RunInOdps(odpsIns *Odps, projectName string) (*Instance, error)

func (*SQLTask) TaskType

func (t *SQLTask) TaskType() string

type Schema added in v0.3.8

type Schema struct {
	// contains filtered or unexported fields
}

Schema represent the namespace schema in odps projects

func NewSchema added in v0.3.8

func NewSchema(odpsIns *Odps, projectName string, schemaName string) *Schema

NewSchema get specific schema

func (*Schema) Comment added in v0.3.8

func (s *Schema) Comment() string

Comment return the schema comment

func (*Schema) CreateTime added in v0.3.8

func (s *Schema) CreateTime() time.Time

CreateTime return the schema create time

func (*Schema) Exists added in v0.3.8

func (s *Schema) Exists() (bool, error)

Exists check if the schema exists

func (*Schema) IsLoaded added in v0.3.8

func (s *Schema) IsLoaded() bool

IsLoaded check if the schema is loaded

func (*Schema) Load added in v0.3.8

func (s *Schema) Load() error

Load load the schema information

func (*Schema) ModifiedTime added in v0.3.8

func (s *Schema) ModifiedTime() time.Time

ModifiedTime return the schema modified time

func (*Schema) Name added in v0.3.8

func (s *Schema) Name() string

Name return the schema name

func (*Schema) Owner added in v0.3.8

func (s *Schema) Owner() string

Owner return the schema owner

func (*Schema) ProjectName added in v0.3.8

func (s *Schema) ProjectName() string

ProjectName return the project name

func (*Schema) ResourceUrl added in v0.3.8

func (s *Schema) ResourceUrl() string

func (*Schema) Tables added in v0.3.8

func (s *Schema) Tables() *Tables

Tables return the tables in the schema

func (*Schema) Type added in v0.3.8

func (s *Schema) Type() string

Type return the schema type

type Schemas added in v0.3.8

type Schemas struct {
	// contains filtered or unexported fields
}

func NewSchemas added in v0.3.8

func NewSchemas(odpsIns *Odps, projectName string) *Schemas

NewSchemas if projectName is not set,the default projectName of odps will be used

func (*Schemas) Create added in v0.3.8

func (ss *Schemas) Create(schemaName string, createIfNotExists bool, comment string) error

Create the schema

func (*Schemas) Delete added in v0.3.8

func (ss *Schemas) Delete(schemaName string) error

Delete the schema

func (*Schemas) Get added in v0.3.8

func (ss *Schemas) Get(schemaName string) *Schema

Get the schema

func (*Schemas) List added in v0.3.8

func (ss *Schemas) List(f func(*Schema, error)) error

List get all the schemas

type TFilterFunc

type TFilterFunc func(url.Values)

type Table

type Table struct {
	// contains filtered or unexported fields
}

Table represent the table in odps projects

func NewTable

func NewTable(odpsIns *Odps, projectName string, schemaName string, tableName string) *Table

func (*Table) AddColumns added in v0.3.14

func (t *Table) AddColumns(columns []tableschema.Column, ifNotExists bool) error

func (*Table) AddPartition

func (t *Table) AddPartition(ifNotExists bool, partitionValue string) error

AddPartition Example: AddPartition(true, "region=10026/name=abc")

Example
table := odpsIns.Table("sale_detail")
err := table.AddPartition(true, "sale_date=202111/region=hangzhou")
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Table) AddPartitions added in v0.3.8

func (t *Table) AddPartitions(ifNotExists bool, partitionValues []string) error

AddPartitions Example: AddPartitions(true, []string{"region=10026/name=abc", "region=10027/name=mhn"})

func (*Table) AlterColumnType added in v0.3.14

func (t *Table) AlterColumnType(columnName string, columnType datatype.DataType) error

func (*Table) ChangeClusterInfo added in v0.3.14

func (t *Table) ChangeClusterInfo(clusterInfo tableschema.ClusterInfo) error

func (*Table) ChangeColumnName added in v0.3.14

func (t *Table) ChangeColumnName(oldColumnName string, newColumnName string) error

func (*Table) ChangeComment added in v0.3.14

func (t *Table) ChangeComment(newComment string) error

ChangeComment Modify the comment content of the table.

func (*Table) ChangeOwner added in v0.3.14

func (t *Table) ChangeOwner(newOwner string) error

ChangeOwner Only the Project Owner or users with the Super_Administrator role can execute commands that modify the table Owner.

func (*Table) Comment

func (t *Table) Comment() string

func (*Table) CreatedTime

func (t *Table) CreatedTime() time.Time

func (*Table) CryptoAlgo

func (t *Table) CryptoAlgo() string

func (*Table) Delete added in v0.3.8

func (t *Table) Delete() error

func (*Table) DeletePartition

func (t *Table) DeletePartition(ifExists bool, partitionValue string) error

DeletePartition Example: DeletePartition(true, "region=10026/name=abc")

func (*Table) DeletePartitions added in v0.3.8

func (t *Table) DeletePartitions(ifExists bool, partitionValues []string) error

DeletePartitions Example: DeletePartitions(true, []string{"region=10026/name=abc", "region=10027/name=mhn"})

func (*Table) DropColumns added in v0.3.14

func (t *Table) DropColumns(columnNames []string) error

func (*Table) ExecSql

func (t *Table) ExecSql(taskName, sql string) (*Instance, error)
Example
// table := odps.NewTable(odpsIns, "go_sdk_regression_testing", "sale_detail")
table := odpsIns.Table("has_struct")
// instance, err := table.ExecSql("SelectSale_detail", "select * from sale_detail;")
instance, err := table.ExecSql("Select_has_struct", "select * from has_struct;")
if err != nil {
	log.Fatalf("%+v", err)
}

err = instance.WaitForSuccess()
if err != nil {
	log.Fatalf("%+v", err)
}

results, err := instance.GetResult()
if err != nil {
	log.Fatalf("%+v", err)
} else if len(results) == 0 {
	log.Fatalf("should get at least one result")
}

println(fmt.Sprintf("%+v", results[0].Result))
Output:

func (*Table) ExecSqlWithHints

func (t *Table) ExecSqlWithHints(taskName, sql string, hints map[string]string) (*Instance, error)

func (*Table) Exists added in v0.3.8

func (t *Table) Exists() (bool, error)

func (*Table) GetPartition added in v0.3.8

func (t *Table) GetPartition(partitionValue string) (Partition, error)

GetPartition get partitions with partitionKey like "region=10026/name=abc", error if not found

func (*Table) GetPartitionValues added in v0.3.8

func (t *Table) GetPartitionValues() ([]string, error)

func (*Table) GetPartitions

func (t *Table) GetPartitions() ([]Partition, error)

GetPartitions get partitions

Example
table := odpsIns.Table("sale_detail")
partitions, err := table.GetPartitions()
if err != nil {
	log.Fatalf("%+v", err)
}

for _, p := range partitions {
	println(fmt.Sprintf("Value: %s", p.Value()))
	println(fmt.Sprintf("Create time: %s", p.CreatedTime()))
	println(fmt.Sprintf("Last DDL time: %s", p.LastDDLTime()))
	println(fmt.Sprintf("Last Modified time: %s", p.LastModifiedTime()))
	println("")
}
Output:

func (*Table) HubLifeCycle

func (t *Table) HubLifeCycle() int

func (*Table) IsLoaded

func (t *Table) IsLoaded() bool

func (*Table) IsLoadedExtended added in v0.3.10

func (t *Table) IsLoadedExtended() bool

func (*Table) LastDDLTime

func (t *Table) LastDDLTime() time.Time

func (*Table) LastModifiedTime

func (t *Table) LastModifiedTime() time.Time

func (*Table) LifeCycle

func (t *Table) LifeCycle() int

func (*Table) Load

func (t *Table) Load() error
Example
table := odpsIns.Table("has_struct")
err := table.Load()
if err != nil {
	log.Fatalf("%+v", err)
}

schema := table.Schema()
println(fmt.Sprintf("%+v", schema.Columns))
Output:

func (*Table) LoadExtendedInfo

func (t *Table) LoadExtendedInfo() error

func (*Table) MaxExtendedLabel

func (t *Table) MaxExtendedLabel() string

func (*Table) MaxLabel

func (t *Table) MaxLabel() string

MaxLabel 获取最高的label级别 Label的定义分两部分: 1. 业务分类:C,S,B 2. 数据等级:1,2,3,4

二者是正交关系,即C1,C2,C3,C4,S1,S2,S3,S4,B1,B2,B3,B4。

MaxLabel的语意: 1. MaxLabel=max(TableLabel, ColumnLabel), max(...)函数的语意由Label中的数据等级决定:4>3>2>1 2. MaxLabel显示: 当最高等级Label只出现一次时,MaxLabel=业务分类+数据等级,例如:B4, C3,S2 当最高等级LabeL出现多次,但业务分类也唯一,MaxLabel=业务分类+数据等级,例如:B4, C3,S2 当最高等级Label出现多次,且业务不唯一,MaxLabel=L+数据等级,例如:L4, L3

func (*Table) Name

func (t *Table) Name() string

func (*Table) Owner

func (t *Table) Owner() string

func (*Table) PartitionColumns

func (t *Table) PartitionColumns() []tableschema.Column

func (*Table) ProjectName

func (t *Table) ProjectName() string

func (*Table) RecordNum

func (t *Table) RecordNum() int

func (*Table) Rename added in v0.3.14

func (t *Table) Rename(newName string) error

func (*Table) ResourceUrl

func (t *Table) ResourceUrl() string

func (*Table) Schema

func (t *Table) Schema() tableschema.TableSchema

func (*Table) SchemaJson

func (t *Table) SchemaJson() string

func (*Table) SchemaName added in v0.3.8

func (t *Table) SchemaName() string

func (*Table) SetLifeCycle added in v0.3.16

func (t *Table) SetLifeCycle(days int) error

SetLifeCycle Modify the life cycle of an existing partitioned table or non-partitioned table.

func (*Table) ShardInfoJson

func (t *Table) ShardInfoJson() string

func (*Table) Size

func (t *Table) Size() int64

func (*Table) TableExtendedLabels

func (t *Table) TableExtendedLabels() []string

func (*Table) TableID

func (t *Table) TableID() string

func (*Table) TableLabel

func (t *Table) TableLabel() string

func (*Table) Touch added in v0.3.14

func (t *Table) Touch() error

Touch can modify the LastModifiedTime of the table, make LastModifiedTime change to the current time

func (*Table) Truncate added in v0.3.14

func (t *Table) Truncate() error

func (*Table) Type

func (t *Table) Type() TableType

func (*Table) ViewText

func (t *Table) ViewText() string

type TableOrErr

type TableOrErr struct {
	Table *Table
	Err   error
}

TableOrErr is used for the return value of Tables.List

type TableType

type TableType int
const (
	ManagedTable TableType = iota
	VirtualView
	ExternalTable
	TableTypeUnknown
)

func TableTypeFromStr

func TableTypeFromStr(s string) TableType

func (TableType) MarshalXML

func (t TableType) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (TableType) String

func (t TableType) String() string

func (*TableType) UnmarshalXML

func (t *TableType) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type Tables

type Tables struct {
	// contains filtered or unexported fields
}

Tables used for get all the tables in an odps project

func NewTables

func NewTables(odpsIns *Odps, projectName, schemaName string) *Tables

NewTables if projectName is not set,the default projectName of odps will be used

func (*Tables) BatchLoadTables

func (ts *Tables) BatchLoadTables(tableNames []string) ([]*Table, error)

BatchLoadTables can get at most 100 tables, and the information of table is according to the permission

Example
tablesIns := odps.NewTables(odpsIns, "", "")
tableNames := []string{
	"has_struct",
	"sale_detail",
	"testtable",
	"user",
}

tables, err := tablesIns.BatchLoadTables(tableNames)
if err != nil {
	log.Fatalf("%+v", err)
}

for _, table := range tables {
	println(fmt.Sprintf("%s, %s, %s", table.Name(), table.TableID(), table.Type()))
}

schema := tables[len(tables)-1].Schema()

for _, c := range schema.Columns {
	println(fmt.Sprintf("%s, %s, %t, %s", c.Name, c.Type, c.IsNullable, c.Comment))
}
Output:

func (*Tables) Create

func (ts *Tables) Create(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	hints, alias map[string]string,
) error

Create table with schema, the schema can be build with tableschema.SchemaBuilder parameter hints can affect the `Set` sql execution, like odps.mapred.map.split.size you can get introduce about alias from the reference of alias command

Example
c1 := tableschema.Column{
	Name:    "name",
	Type:    datatype.StringType,
	Comment: "name of user",
}

c2 := tableschema.Column{
	Name:    "age",
	Type:    datatype.IntType,
	Comment: "how old is the user",
}

p1 := tableschema.Column{
	Name:    "region",
	Type:    datatype.StringType,
	Comment: "居住区域",
}

p2 := tableschema.Column{
	Name: "code",
	Type: datatype.IntType,
}

hints := make(map[string]string)
hints["odps.sql.preparse.odps"] = "lot"
hints["odps.sql.planner.mode"] = "lot"
hints["odps.sql.planner.parser.odps"] = "true"
hints["odps.sql.ddl.odps"] = "true"
hints["odps.compiler.output.format"] = "lot,pot"
hints["odps.namespace.schema"] = "false"

builder := tableschema.NewSchemaBuilder()
builder.Name("user_temp").
	Comment("这就是一条注释").
	Columns(c1, c2).
	PartitionColumns(p1, p2).
	Lifecycle(2)

schema := builder.Build()
sql, _ := schema.ToSQLString(defaultProjectName, "", false)
println(sql)

tables := odps.NewTables(odpsIns, odpsIns.DefaultProjectName(), "")
err := tables.Create(schema, true, hints, nil)
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Tables) CreateExternal

func (ts *Tables) CreateExternal(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	serdeProperties map[string]string,
	jars []string,
	hints, alias map[string]string,
) error

CreateExternal create external table, the schema can be build with tableschema.SchemaBuilder

func (*Tables) CreateView added in v0.3.14

func (ts *Tables) CreateView(schema tableschema.TableSchema,
	orReplace,
	createIfNotExists,
	buildDeferred bool,
) error

func (*Tables) CreateWithDataHub

func (ts *Tables) CreateWithDataHub(
	schema tableschema.TableSchema,
	createIfNotExists bool,
	shardNum,
	hubLifecycle int,
) error

func (*Tables) Delete

func (ts *Tables) Delete(tableName string, ifExists bool) error

Delete delete table

Example
tables := odps.NewTables(odpsIns, odpsIns.DefaultProjectName(), "")
err := tables.Delete("user_temp", false)
if err != nil {
	log.Fatalf("%+v", err)
}
Output:

func (*Tables) Get added in v0.3.8

func (ts *Tables) Get(tableName string) *Table

func (*Tables) List

func (ts *Tables) List(f func(*Table, error), filters ...TFilterFunc)

List get all the tables, filters can be specified with TableFilter.NamePrefix, TableFilter.Extended, TableFilter.Owner

Example
ts := odps.NewTables(odpsIns, "", "")
f := func(t *odps.Table, err error) {
	if err != nil {
		log.Fatalf("%+v", err)
	}

	println(fmt.Sprintf("%s, %s, %s", t.Name(), t.Owner(), t.Type()))
}
ts.List(f, odps.TableFilter.Extended())
Output:

type Task

type Task interface {
	GetName() string
	TaskType() string
	AddProperty(key, value string)
}

type TaskConfig

type TaskConfig struct {
	Config []common.Property `xml:"Config>Property"`
}

TaskConfig 作为embedding filed使用时,使用者自动实现Task接口的AddProperty方法

func (*TaskConfig) AddProperty

func (t *TaskConfig) AddProperty(key, value string)

type TaskInInstance

type TaskInInstance struct {
	Type      string `xml:"Type,attr"`
	Name      string
	StartTime common.GMTTime
	EndTime   common.GMTTime `xml:"EndTime"`
	Status    TaskStatus
}

TaskInInstance 通过Instance创建的Task

type TaskName

type TaskName string

TaskName 作为embedding filed使用时,使用者自动实现Task接口的GetName方法

func (TaskName) GetName

func (n TaskName) GetName() string

type TaskProgressStage

type TaskProgressStage struct {
	ID                 string `xml:"ID,attr"`
	Status             string
	BackupWorkers      string
	TerminatedWorkers  string
	RunningWorkers     string
	TotalWorkers       string
	InputRecords       int
	OutRecords         int
	FinishedPercentage int
}

type TaskResult

type TaskResult struct {
	Type   string `xml:"Type,attr"`
	Name   string
	Status TaskStatus
	Result struct {
		TransForm string `xml:"Transform,attr"`
		Format    string `xml:"Format,attr"` // 这个字段没有用到
		Content   string `xml:",cdata"`
	} `xml:"Result"`
}

func (*TaskResult) Content added in v0.0.5

func (tr *TaskResult) Content() string

type TaskStatus

type TaskStatus int

func TaskStatusFromStr

func TaskStatusFromStr(s string) TaskStatus

func (*TaskStatus) MarshalXML

func (status *TaskStatus) MarshalXML(d *xml.Encoder, start xml.StartElement) error

func (TaskStatus) String

func (status TaskStatus) String() string

func (*TaskStatus) UnmarshalXML

func (status *TaskStatus) UnmarshalXML(d *xml.Decoder, start xml.StartElement) error

type TaskSummary

type TaskSummary struct {
	JsonSummary string
	Summary     string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL