Documentation ¶
Overview ¶
Example (AlterTable) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { return s.AlterTable(ctx, path.Join(db.Name(), "series"), options.WithAddColumn("series_id", types.Optional(types.TypeUint64)), options.WithAddColumn("title", types.Optional(types.TypeText)), options.WithSetTimeToLiveSettings( options.NewTTLSettings().ColumnDateType("expire_at").ExpireAfter(time.Hour), ), options.WithDropTimeToLive(), options.WithAddIndex("idx_series_series_id", options.WithIndexColumns("series_id"), options.WithDataColumns("title"), options.WithIndexType(options.GlobalIndex()), ), options.WithDropIndex("idx_series_title"), options.WithAlterAttribute("hello", "world"), options.WithAddAttribute("foo", "bar"), options.WithDropAttribute("baz"), ) }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (BulkUpsert) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources type logMessage struct { App string Host string Timestamp time.Time HTTPCode uint32 Message string } // prepare native go data const batchSize = 10000 logs := make([]logMessage, 0, batchSize) for i := 0; i < batchSize; i++ { logs = append(logs, logMessage{ App: fmt.Sprintf("App_%d", i/256), Host: fmt.Sprintf("192.168.0.%d", i%256), Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)), HTTPCode: 200, Message: "GET / HTTP/1.1", }) } // execute bulk upsert with native ydb data err = db.Table().Do( // Do retry operation on errors with best effort ctx, // context manage exiting from Do func(ctx context.Context, s table.Session) (err error) { // retry operation rows := make([]types.Value, 0, len(logs)) for _, msg := range logs { rows = append(rows, types.StructValue( types.StructFieldValue("App", types.TextValue(msg.App)), types.StructFieldValue("Host", types.TextValue(msg.Host)), types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)), types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)), types.StructFieldValue("Message", types.TextValue(msg.Message)), )) } return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...)) }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (BulkUpsertWithCompression) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources type logMessage struct { App string Host string Timestamp time.Time HTTPCode uint32 Message string } // prepare native go data const batchSize = 10000 logs := make([]logMessage, 0, batchSize) for i := 0; i < batchSize; i++ { logs = append(logs, logMessage{ App: fmt.Sprintf("App_%d", i/256), Host: fmt.Sprintf("192.168.0.%d", i%256), Timestamp: time.Now().Add(time.Millisecond * time.Duration(i%1000)), HTTPCode: 200, Message: "GET /images/logo.png HTTP/1.1", }) } // execute bulk upsert with native ydb data err = db.Table().Do( // Do retry operation on errors with best effort ctx, // context manage exiting from Do func(ctx context.Context, s table.Session) (err error) { // retry operation rows := make([]types.Value, 0, len(logs)) for _, msg := range logs { rows = append(rows, types.StructValue( types.StructFieldValue("App", types.TextValue(msg.App)), types.StructFieldValue("Host", types.TextValue(msg.Host)), types.StructFieldValue("Timestamp", types.TimestampValueFromTime(msg.Timestamp)), types.StructFieldValue("HTTPCode", types.Uint32Value(msg.HTTPCode)), types.StructFieldValue("Message", types.TextValue(msg.Message)), )) } return s.BulkUpsert(ctx, "/local/bulk_upsert_example", types.ListValue(rows...), options.WithCallOptions(grpc.UseCompressor(gzip.Name)), ) }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (CopyTables) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { return s.CopyTables(ctx, options.CopyTablesItem( path.Join(db.Name(), "from", "series"), path.Join(db.Name(), "to", "series"), true, ), ) }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (CreateTable) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources err = db.Table().Do(ctx, func(ctx context.Context, s table.Session) (err error) { return s.CreateTable(ctx, path.Join(db.Name(), "series"), options.WithColumn("series_id", types.Optional(types.TypeUint64)), options.WithColumn("title", types.Optional(types.TypeText)), options.WithColumn("series_info", types.Optional(types.TypeText)), options.WithColumn("release_date", types.Optional(types.TypeDate)), options.WithColumn("expire_at", types.Optional(types.TypeDate)), options.WithColumn("comment", types.Optional(types.TypeText)), options.WithPrimaryKeyColumn("series_id"), options.WithTimeToLiveSettings( options.NewTTLSettings().ColumnDateType("expire_at").ExpireAfter(time.Hour), ), options.WithIndex("idx_series_title", options.WithIndexColumns("title"), options.WithIndexType(options.GlobalAsyncIndex()), ), ) }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (DataQueryWithCompression) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources var ( query = `SELECT 42 as id, "my string" as myStr` id int32 // required value myStr string // optional value ) err = db.Table().Do( // Do retry operation on errors with best effort ctx, // context manage exiting from Do func(ctx context.Context, s table.Session) (err error) { // retry operation _, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil, options.WithCallOptions( grpc.UseCompressor(gzip.Name), ), ) if err != nil { return err // for auto-retry with driver } defer res.Close() // cleanup resources if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it return err // for auto-retry with driver } for res.NextRow() { // iterate over rows err = res.ScanNamed( named.Required("id", &id), named.OptionalWithDefault("myStr", &myStr), ) if err != nil { return err // generally scan error not retryable, return it for driver check error } fmt.Printf("id=%v, myStr='%s'\n", id, myStr) } return res.Err() // return finally result error for auto-retry with driver }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (LazyTransaction) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) err = db.Table().Do(ctx, func(ctx context.Context, session table.Session) (err error) { // execute query with opening lazy transaction tx, result, err := session.Execute(ctx, table.SerializableReadWriteTxControl(), "DECLARE $id AS Uint64; "+ "SELECT `title`, `description` FROM `path/to/mytable` WHERE id = $id", table.NewQueryParameters( table.ValueParam("$id", types.Uint64Value(1)), ), ) if err != nil { return err } defer func() { _ = tx.Rollback(ctx) _ = result.Close() }() if !result.NextResultSet(ctx) { return retry.RetryableError(fmt.Errorf("no result sets")) } if !result.NextRow() { return retry.RetryableError(fmt.Errorf("no rows")) } var ( id uint64 title string description string ) if err = result.ScanNamed( named.OptionalWithDefault("id", &id), named.OptionalWithDefault("title", &title), named.OptionalWithDefault("description", &description), ); err != nil { return err } fmt.Println(id, title, description) // execute query with commit transaction _, err = tx.Execute(ctx, "DECLARE $id AS Uint64; "+ "DECLARE $description AS Text; "+ "UPSERT INTO `path/to/mytable` "+ "(id, description) "+ "VALUES ($id, $description);", table.NewQueryParameters( table.ValueParam("$id", types.Uint64Value(1)), table.ValueParam("$description", types.TextValue("changed description")), ), options.WithCommit(), ) if err != nil { return err } return result.Err() }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (ScanQueryWithCompression) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources var ( query = `SELECT 42 as id, "my string" as myStr` id int32 // required value myStr string // optional value ) err = db.Table().Do( // Do retry operation on errors with best effort ctx, // context manage exiting from Do func(ctx context.Context, s table.Session) (err error) { // retry operation res, err := s.StreamExecuteScanQuery(ctx, query, nil, options.WithCallOptions( grpc.UseCompressor(gzip.Name), ), ) if err != nil { return err // for auto-retry with driver } defer res.Close() // cleanup resources if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it return err // for auto-retry with driver } for res.NextRow() { // iterate over rows err = res.ScanNamed( named.Required("id", &id), named.OptionalWithDefault("myStr", &myStr), ) if err != nil { return err // generally scan error not retryable, return it for driver check error } fmt.Printf("id=%v, myStr='%s'\n", id, myStr) } return res.Err() // return finally result error for auto-retry with driver }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Example (Select) ¶
ctx := context.TODO() db, err := ydb.Open(ctx, "grpc://localhost:2136/local") if err != nil { fmt.Printf("failed connect: %v", err) return } defer db.Close(ctx) // cleanup resources var ( query = `SELECT 42 as id, "my string" as myStr` id int32 // required value myStr string // optional value ) err = db.Table().Do( // Do retry operation on errors with best effort ctx, // context manage exiting from Do func(ctx context.Context, s table.Session) (err error) { // retry operation _, res, err := s.Execute(ctx, table.DefaultTxControl(), query, nil) if err != nil { return err // for auto-retry with driver } defer res.Close() // cleanup resources if err = res.NextResultSetErr(ctx); err != nil { // check single result set and switch to it return err // for auto-retry with driver } for res.NextRow() { // iterate over rows err = res.ScanNamed( named.Required("id", &id), named.OptionalWithDefault("myStr", &myStr), ) if err != nil { return err // generally scan error not retryable, return it for driver check error } fmt.Printf("id=%v, myStr='%s'\n", id, myStr) } return res.Err() // return finally result error for auto-retry with driver }, table.WithIdempotent(), ) if err != nil { fmt.Printf("unexpected error: %v", err) }
Output:
Index ¶
- Constants
- func WithIdempotent() idempotentOption
- func WithLabel(label string) labelOption
- func WithRetryOptions(retryOptions []retry.Option) retryOptionsOption
- func WithTrace(t trace.Table) traceOption
- func WithTxCommitOptions(opts ...options.CommitTransactionOption) txCommitOptionsOption
- func WithTxSettings(txSettings *TransactionSettings) txSettingsOption
- type Client
- type ClosableSession
- type DataQuery
- type DataQueryExplanation
- type Explanation
- type Operation
- type Option
- type Options
- type ParameterOption
- type QueryParameters
- type ScriptingYQLExplanation
- type Session
- type SessionInfo
- type SessionStatus
- type Statement
- type Transaction
- type TransactionActor
- type TransactionControl
- func DefaultTxControl() *TransactionControl
- func OnlineReadOnlyTxControl(opts ...TxOnlineReadOnlyOption) *TransactionControl
- func SerializableReadWriteTxControl(opts ...TxControlOption) *TransactionControl
- func SnapshotReadOnlyTxControl() *TransactionControl
- func StaleReadOnlyTxControl() *TransactionControl
- func TxControl(opts ...TxControlOption) *TransactionControl
- type TransactionIdentifier
- type TransactionSettings
- type TxControlOption
- type TxOnlineReadOnlyOption
- type TxOperation
- type TxOption
Examples ¶
Constants ¶
const ( SessionStatusUnknown = SessionStatus("unknown") SessionReady = SessionStatus("ready") SessionBusy = SessionStatus("busy") SessionClosing = SessionStatus("closing") SessionClosed = SessionStatus("closed") )
Variables ¶
This section is empty.
Functions ¶
func WithIdempotent ¶ added in v3.2.1
func WithIdempotent() idempotentOption
func WithRetryOptions ¶ added in v3.53.0
func WithTxCommitOptions ¶ added in v3.5.0
func WithTxCommitOptions(opts ...options.CommitTransactionOption) txCommitOptionsOption
func WithTxSettings ¶ added in v3.5.0
func WithTxSettings(txSettings *TransactionSettings) txSettingsOption
Types ¶
type Client ¶
type Client interface { // CreateSession returns session or error for manually control of session lifecycle // // CreateSession implements internal busy loop until one of the following conditions is met: // - context was canceled or deadlined // - session was created // // Deprecated: don't use CreateSession explicitly. This method only for ORM's compatibility. // Use Do for queries with session CreateSession(ctx context.Context, opts ...Option) (s ClosableSession, err error) // Do provide the best effort for execute operation. // // Do implements internal busy loop until one of the following conditions is met: // - deadline was canceled or deadlined // - retry operation returned nil as error // // Warning: if context without deadline or cancellation func than Do can run indefinitely. Do(ctx context.Context, op Operation, opts ...Option) error // DoTx provide the best effort for execute transaction. // // DoTx implements internal busy loop until one of the following conditions is met: // - deadline was canceled or deadlined // - retry operation returned nil as error // // DoTx makes auto begin (with TxSettings, by default - SerializableReadWrite), commit and // rollback (on error) of transaction. // // If op TxOperation returns nil - transaction will be committed // If op TxOperation return non nil - transaction will be rollback // Warning: if context without deadline or cancellation func than DoTx can run indefinitely DoTx(ctx context.Context, op TxOperation, opts ...Option) error }
type ClosableSession ¶ added in v3.5.0
type DataQueryExplanation ¶
type DataQueryExplanation struct { Explanation AST string }
DataQueryExplanation is a result of ExplainDataQuery call.
type Explanation ¶ added in v3.7.0
type Explanation struct {
Plan string
}
Explanation is a result of Explain calls.
type Operation ¶ added in v3.2.1
Operation is the interface that holds an operation for retry. if Operation returns not nil - operation will retry if Operation returns nil - retry loop will break
type Options ¶ added in v3.2.1
type Options struct { Label string Idempotent bool TxSettings *TransactionSettings TxCommitOptions []options.CommitTransactionOption RetryOptions []retry.Option Trace *trace.Table }
type ParameterOption ¶
QueryParameters
func ValueParam ¶
func ValueParam(name string, v value.Value) ParameterOption
type QueryParameters ¶
type QueryParameters struct {
// contains filtered or unexported fields
}
QueryParameters
func NewQueryParameters ¶
func NewQueryParameters(opts ...ParameterOption) *QueryParameters
func (*QueryParameters) Add ¶
func (q *QueryParameters) Add(params ...ParameterOption)
func (*QueryParameters) Count ¶ added in v3.44.0
func (q *QueryParameters) Count() int
func (*QueryParameters) Params ¶
func (q *QueryParameters) Params() queryParams
func (*QueryParameters) String ¶
func (q *QueryParameters) String() string
type ScriptingYQLExplanation ¶ added in v3.7.0
type ScriptingYQLExplanation struct { Explanation ParameterTypes map[string]types.Type }
ScriptingYQLExplanation is a result of Explain calls.
type Session ¶
type Session interface { SessionInfo CreateTable( ctx context.Context, path string, opts ...options.CreateTableOption, ) (err error) DescribeTable( ctx context.Context, path string, opts ...options.DescribeTableOption, ) (desc options.Description, err error) DropTable( ctx context.Context, path string, opts ...options.DropTableOption, ) (err error) AlterTable( ctx context.Context, path string, opts ...options.AlterTableOption, ) (err error) CopyTable( ctx context.Context, dst, src string, opts ...options.CopyTableOption, ) (err error) CopyTables( ctx context.Context, opts ...options.CopyTablesOption, ) (err error) Explain( ctx context.Context, query string, ) (exp DataQueryExplanation, err error) // Prepare prepares query for executing in the future Prepare( ctx context.Context, query string, ) (stmt Statement, err error) // Execute executes query. // // By default, Execute have a flag options.WithKeepInCache(true) if params is not empty. For redefine behavior - // append option options.WithKeepInCache(false) Execute( ctx context.Context, tx *TransactionControl, query string, params *QueryParameters, opts ...options.ExecuteDataQueryOption, ) (txr Transaction, r result.Result, err error) ExecuteSchemeQuery( ctx context.Context, query string, opts ...options.ExecuteSchemeQueryOption, ) (err error) DescribeTableOptions( ctx context.Context, ) (desc options.TableOptionsDescription, err error) StreamReadTable( ctx context.Context, path string, opts ...options.ReadTableOption, ) (r result.StreamResult, err error) StreamExecuteScanQuery( ctx context.Context, query string, params *QueryParameters, opts ...options.ExecuteScanQueryOption, ) (_ result.StreamResult, err error) BulkUpsert( ctx context.Context, table string, rows value.Value, opts ...options.BulkUpsertOption, ) (err error) ReadRows( ctx context.Context, path string, keys value.Value, opts ...options.ReadRowsOption, ) (_ result.Result, err error) BeginTransaction( ctx context.Context, tx *TransactionSettings, ) (x Transaction, err error) KeepAlive( ctx context.Context, ) error }
type SessionInfo ¶
type SessionInfo interface { ID() string NodeID() uint32 Status() SessionStatus LastUsage() time.Time }
type SessionStatus ¶ added in v3.37.7
type SessionStatus = string
type Statement ¶
type Statement interface { Execute( ctx context.Context, tx *TransactionControl, params *QueryParameters, opts ...options.ExecuteDataQueryOption, ) (txr Transaction, r result.Result, err error) NumInput() int Text() string }
type Transaction ¶
type TransactionActor ¶ added in v3.5.2
type TransactionActor interface { TransactionIdentifier Execute( ctx context.Context, query string, params *QueryParameters, opts ...options.ExecuteDataQueryOption, ) (result.Result, error) ExecuteStatement( ctx context.Context, stmt Statement, params *QueryParameters, opts ...options.ExecuteDataQueryOption, ) (result.Result, error) }
type TransactionControl ¶
type TransactionControl struct {
// contains filtered or unexported fields
}
func DefaultTxControl ¶ added in v3.20.0
func DefaultTxControl() *TransactionControl
DefaultTxControl returns default transaction control with serializable read-write isolation mode and auto-commit
func OnlineReadOnlyTxControl ¶ added in v3.36.0
func OnlineReadOnlyTxControl(opts ...TxOnlineReadOnlyOption) *TransactionControl
OnlineReadOnlyTxControl returns online read-only transaction control
func SerializableReadWriteTxControl ¶ added in v3.37.0
func SerializableReadWriteTxControl(opts ...TxControlOption) *TransactionControl
SerializableReadWriteTxControl returns transaction control with serializable read-write isolation mode
func SnapshotReadOnlyTxControl ¶ added in v3.54.0
func SnapshotReadOnlyTxControl() *TransactionControl
SnapshotReadOnlyTxControl returns snapshot read-only transaction control
func StaleReadOnlyTxControl ¶ added in v3.36.0
func StaleReadOnlyTxControl() *TransactionControl
StaleReadOnlyTxControl returns stale read-only transaction control
func TxControl ¶
func TxControl(opts ...TxControlOption) *TransactionControl
TxControl makes transaction control from given options
func (*TransactionControl) Desc ¶
func (t *TransactionControl) Desc() *Ydb_Table.TransactionControl
type TransactionIdentifier ¶ added in v3.5.2
type TransactionIdentifier interface {
ID() string
}
type TransactionSettings ¶
type TransactionSettings struct {
// contains filtered or unexported fields
}
func TxSettings ¶
func TxSettings(opts ...TxOption) *TransactionSettings
TxSettings returns transaction settings
func (*TransactionSettings) Settings ¶
func (t *TransactionSettings) Settings() *Ydb_Table.TransactionSettings
type TxControlOption ¶
type TxControlOption func(*txControlDesc)
func BeginTx ¶
func BeginTx(opts ...TxOption) TxControlOption
BeginTx returns begin transaction control option
func WithTx ¶
func WithTx(t TransactionIdentifier) TxControlOption
func WithTxID ¶ added in v3.41.0
func WithTxID(txID string) TxControlOption
type TxOnlineReadOnlyOption ¶
type TxOnlineReadOnlyOption func(*txOnlineReadOnly)
func WithInconsistentReads ¶
func WithInconsistentReads() TxOnlineReadOnlyOption
type TxOperation ¶ added in v3.5.0
type TxOperation func(ctx context.Context, tx TransactionActor) error
TxOperation is the interface that holds an operation for retry. if TxOperation returns not nil - operation will retry if TxOperation returns nil - retry loop will break
type TxOption ¶
type TxOption func(*txDesc)
Transaction control options
func WithOnlineReadOnly ¶
func WithOnlineReadOnly(opts ...TxOnlineReadOnlyOption) TxOption
func WithSerializableReadWrite ¶
func WithSerializableReadWrite() TxOption
func WithSnapshotReadOnly ¶ added in v3.38.0
func WithSnapshotReadOnly() TxOption
func WithStaleReadOnly ¶
func WithStaleReadOnly() TxOption