Documentation ¶
Overview ¶
Package delta contains the resources required to interact with a Delta table.
Index ¶
- Variables
- func BaseCommitURI() storage.Path
- func CommitOrCheckpointVersionFromURI(path storage.Path) (bool, int64)
- func CommitURIFromVersion(version int64) storage.Path
- func CommitVersionFromURI(path storage.Path) (bool, int64)
- func CreateCheckpoint(store storage.ObjectStore, checkpointLock lock.Locker, ...) (checkpointed bool, err error)
- func DoesCheckpointVersionExist(store storage.ObjectStore, version int64, validateAllPartsExist bool) (bool, error)
- func IsValidCommitOrCheckpointURI(path storage.Path) bool
- func IsValidCommitURI(path storage.Path) bool
- func LogEntryFromActions(actions []Action) ([]byte, error)
- func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T)
- type Action
- type ActionKey
- type Add
- type CDC
- type CheckPoint
- type CheckpointConfiguration
- type CheckpointEntry
- type CommitInfo
- type ConfigKey
- type Create
- type Format
- type GUID
- type MetaData
- type OnDiskTableState
- type Operation
- type OptimizeCheckpointConfiguration
- type OutputMode
- type PreparedCommit
- type Protocol
- type Remove
- type SaveMode
- type Schema
- type SchemaDataType
- type SchemaDataTypeName
- type SchemaField
- type SchemaTypeArray
- type SchemaTypeMap
- type SchemaTypeStruct
- type Stats
- type StatsDecimal
- type StatsFloat32
- type StatsFloat64
- type StreamingUpdate
- type Table
- func NewTable(store storage.ObjectStore, lock lock.Locker, stateStore state.Store) *Table
- func NewTableWithLogStore(store storage.ObjectStore, lock lock.Locker, logStore logstore.LogStore) *Table
- func OpenTable(store storage.ObjectStore, lock lock.Locker, stateStore state.Store) (*Table, error)
- func OpenTableWithConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, ...) (*Table, error)
- func OpenTableWithVersion(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, ...) (*Table, error)
- func OpenTableWithVersionAndConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, ...) (*Table, error)
- func (t *Table) Create(metadata TableMetaData, protocol Protocol, commitInfo CommitInfo, ...) error
- func (t *Table) CreateCheckpoint(checkpointLock lock.Locker, checkpointConfiguration *CheckpointConfiguration, ...) (bool, error)
- func (t *Table) CreateTransaction(opts TransactionOptions) *Transaction
- func (t *Table) Exists() (bool, error)
- func (t *Table) GetCheckpointDataPaths(checkpoint *CheckPoint) []storage.Path
- func (t *Table) LatestVersion() (int64, error)
- func (t *Table) Load(config *OptimizeCheckpointConfiguration) error
- func (t *Table) LoadVersion(version *int64) error
- func (t *Table) LoadVersionWithConfiguration(version *int64, config *OptimizeCheckpointConfiguration) error
- func (t *Table) ReadCommitVersion(version int64) ([]Action, error)
- type TableMetaData
- type TableState
- type Transaction
- func (t *Transaction) AddAction(action Action)
- func (t *Transaction) AddActions(actions []Action)
- func (t *Transaction) Commit() (int64, error)
- func (t *Transaction) CommitLogStore() (int64, error)
- func (t *Transaction) ReadActions(path storage.Path) ([]Action, error)
- func (t *Transaction) SetAppMetadata(appMetadata map[string]any)
- func (t *Transaction) SetOperation(operation Operation)
- type TransactionOptions
- type Txn
- type Write
Constants ¶
This section is empty.
Variables ¶
var (
    // ErrActionJSONFormat is returned when there is an error reading actions from a commit log
    ErrActionJSONFormat error = errors.New("invalid format for action JSON")
    // ErrActionUnknown is returned when there is an unknown action in a commit log
    ErrActionUnknown error = errors.New("unknown action")
    // ErrAddZeroSize is returned when an add action has zero size
    ErrAddZeroSize error = errors.New("add size must not be zero to prevent optimize failures")
    // ErrAddGenerateStats is returned when an add action cannot generate stats
    ErrAddGenerateStats error = errors.New("unable to generate stats for add action")
)
var (
    // ErrCheckpointAlreadyExists is returned when trying to create a checkpoint but it already exists
    ErrCheckpointAlreadyExists error = errors.New("checkpoint already exists")
    // ErrCheckpointRowCountMismatch is returned when the checkpoint is generated with a different row count
    // than expected from the table state. This indicates an internal error.
    ErrCheckpointRowCountMismatch error = errors.New("checkpoint generated with unexpected row count")
    // ErrCheckpointIncomplete is returned when trying to read a multi-part checkpoint but not all parts exist
    ErrCheckpointIncomplete error = errors.New("checkpoint is missing parts")
    // ErrCheckpointInvalidMultipartFileName is returned when a multi-part checkpoint file has the wrong number of parts in the filename
    ErrCheckpointInvalidMultipartFileName error = errors.New("checkpoint file name is invalid")
    // ErrCheckpointAddZeroSize is returned if there is an Add action with size 0,
    // because including it would cause subsequent Optimize operations to fail.
    ErrCheckpointAddZeroSize error = errors.New("zero size in add not allowed")
    // ErrCheckpointEntryMultipleActions is returned if a checkpoint entry has more than one non-null action
    ErrCheckpointEntryMultipleActions error = errors.New("checkpoint entry contains multiple actions")
    // ErrCheckpointOptimizationWorkingFolder is returned if there is a problem with the optimization working folder
    ErrCheckpointOptimizationWorkingFolder error = errors.New("error using checkpoint optimization working folder")
)
var (
    // ErrExceededCommitRetryAttempts is returned when the maximum number of commit retry attempts has been exceeded.
    ErrExceededCommitRetryAttempts error = errors.New("exceeded commit retry attempts")
    // ErrNotATable is returned when a Delta table is not valid.
    ErrNotATable error = errors.New("not a table")
    // ErrInvalidVersion is returned when a version is invalid.
    ErrInvalidVersion error = errors.New("invalid version")
    // ErrUnableToLoadVersion is returned when a version cannot be loaded.
    ErrUnableToLoadVersion error = errors.New("unable to load specified version")
    // ErrLockFailed is returned when a lock fails unexpectedly.
    ErrLockFailed error = errors.New("lock failed unexpectedly without an error")
    // ErrNotImplemented is returned when a feature has not been implemented.
    ErrNotImplemented error = errors.New("not implemented")
    // ErrUnsupportedReaderVersion is returned when a reader version is unsupported.
    ErrUnsupportedReaderVersion error = errors.New("reader version is unsupported")
    // ErrUnsupportedWriterVersion is returned when a writer version is unsupported.
    ErrUnsupportedWriterVersion error = errors.New("writer version is unsupported")
    // ErrFailedToCopyTempFile is returned when a temp file fails to be copied into a commit URI.
    ErrFailedToCopyTempFile error = errors.New("failed to copy temp file")
    // ErrFailedToAcknowledgeCommit is returned when a commit fails to be acknowledged.
    ErrFailedToAcknowledgeCommit error = errors.New("failed to acknowledge commit")
)
var (
    // ErrMissingMetadata is returned if trying to create a checkpoint with no metadata
    ErrMissingMetadata error = errors.New("missing metadata")
    // ErrConvertingCheckpointAdd is returned if there is an error converting an Add action to checkpoint format
    ErrConvertingCheckpointAdd error = errors.New("unable to generate checkpoint add")
    // ErrCDCNotSupported is returned if a CDC action is seen when generating a checkpoint
    ErrCDCNotSupported error = errors.New("cdc is not supported")
    // ErrReadingCheckpoint is returned if there is an error reading a checkpoint
    ErrReadingCheckpoint error = errors.New("unable to read checkpoint")
    // ErrVersionOutOfOrder is returned if the versions are out of order when loading the table state.
    // This would indicate an internal logic error.
    ErrVersionOutOfOrder error = errors.New("versions out of order during update")
)
var (
    // ErrConfigValidation is returned when a Delta configuration cannot be validated.
    ErrConfigValidation = errors.New("error validating delta configuration")
)
var (
    // ErrParseSchema is returned when parsing the schema from JSON fails
    ErrParseSchema error = errors.New("unable to parse schema")
)
Functions ¶
func BaseCommitURI ¶
BaseCommitURI returns the base path of a commit URI.
func CommitOrCheckpointVersionFromURI ¶
CommitOrCheckpointVersionFromURI returns true plus the version if the URI is a valid commit or checkpoint filename.
func CommitURIFromVersion ¶
CommitURIFromVersion returns the URI of the given commit version.
func CommitVersionFromURI ¶
CommitVersionFromURI returns true plus the version if the URI is a valid commit filename.
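For illustration, a small sketch of the round trip between versions and commit URIs. The github.com/rivian/delta-go import path is assumed from this module's repository, and the printed values are indicative only.

package main

import (
    "fmt"

    "github.com/rivian/delta-go"
)

func main() {
    // Commit files live under _delta_log/ and are named with the
    // zero-padded 20-digit version, e.g. 00000000000000000001.json.
    p := delta.CommitURIFromVersion(1)

    // Round-trip: recover the version number from the path.
    ok, version := delta.CommitVersionFromURI(p)
    fmt.Println(ok, version) // expected: true 1
}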
func CreateCheckpoint ¶
func CreateCheckpoint(store storage.ObjectStore, checkpointLock lock.Locker, checkpointConfiguration *CheckpointConfiguration, version int64) (checkpointed bool, err error)
CreateCheckpoint creates a checkpoint for a table located at the store for the given version. If expired log cleanup is enabled on this table, then after a successful checkpoint the cleanup runs to delete expired logs. Returns whether the checkpoint was created and any error. If the lock cannot be obtained, it does not retry; if other processes are checkpointing, there is no need to duplicate the effort.
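A minimal sketch of checkpointing version 10 of a table on local disk. The filestore.New, filelock.New, and storage.NewPath constructors are assumptions drawn from this module's subpackages and may differ in detail.

package main

import (
    "fmt"
    "log"

    "github.com/rivian/delta-go"
    "github.com/rivian/delta-go/lock/filelock"
    "github.com/rivian/delta-go/storage"
    "github.com/rivian/delta-go/storage/filestore"
)

func main() {
    tablePath := storage.NewPath("/tmp/mytable")
    store := filestore.New(tablePath)
    // A dedicated lock key for checkpointing, separate from the commit lock.
    checkpointLock := filelock.New(tablePath, "_delta_log/_checkpoint.lock", filelock.Options{})

    // Default configuration: multi-part checkpoints, no unsafe overrides.
    config := delta.NewCheckpointConfiguration()

    checkpointed, err := delta.CreateCheckpoint(store, checkpointLock, config, 10)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("checkpoint written:", checkpointed)
}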
func DoesCheckpointVersionExist ¶
func DoesCheckpointVersionExist(store storage.ObjectStore, version int64, validateAllPartsExist bool) (bool, error)
DoesCheckpointVersionExist returns true if the given checkpoint version exists, either as a single- or multi-part checkpoint
func IsValidCommitOrCheckpointURI ¶
IsValidCommitOrCheckpointURI returns true if a URI is a valid commit or checkpoint file name. Otherwise, it returns false.
func IsValidCommitURI ¶
IsValidCommitURI returns true if a URI is a valid commit filename (not a checkpoint file, and not a temp commit).
func LogEntryFromActions ¶
LogEntryFromActions retrieves a log entry from a list of actions.
func UpdateStats ¶
func UpdateStats[T constraints.Ordered](s *Stats, k string, vpt *T)
UpdateStats computes Stats.NullCount, Stats.MinValues, and Stats.MaxValues for a given k, v struct property. The struct property is passed in as a pointer so that it can be evaluated as nil (NULL). TODO: Handle struct types.
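A sketch of how UpdateStats might be used while building stats for an Add action; the column names and values are illustrative, and the map initialization is a precaution rather than a documented requirement.

package example

import "github.com/rivian/delta-go"

func exampleStats() delta.Stats {
    stats := delta.Stats{
        NumRecords: 2,
        MinValues:  map[string]any{},
        MaxValues:  map[string]any{},
        NullCount:  map[string]int64{},
    }

    temperature := 21.5
    delta.UpdateStats(&stats, "temperature", &temperature) // non-nil: min/max updated

    var humidity *float64
    delta.UpdateStats(&stats, "humidity", humidity) // nil pointer: counted as NULL

    return stats
}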
Types ¶
type Action ¶
type Action interface{}
Action represents a Delta log action that describes a parquet data file that is part of the table.
func ActionsFromLogEntries ¶
ActionsFromLogEntries retrieves all the actions from a log.
type ActionKey ¶
type ActionKey string
ActionKey represents a Delta action.
const (
    // AddActionKey represents an Add action.
    AddActionKey ActionKey = "add"
    // RemoveActionKey represents a Remove action.
    RemoveActionKey ActionKey = "remove"
    // CommitInfoActionKey represents a CommitInfo action.
    CommitInfoActionKey ActionKey = "commitInfo"
    // ProtocolActionKey represents a Protocol action.
    ProtocolActionKey ActionKey = "protocol"
    // MetaDataActionKey represents a metaData action.
    MetaDataActionKey ActionKey = "metaData"
    // FormatActionKey represents a Format action.
    FormatActionKey ActionKey = "format"
    // TransactionActionKey represents a Txn action.
    TransactionActionKey ActionKey = "txn"
    // CDCActionKey represents a CDC action.
    CDCActionKey ActionKey = "cdc"
)
type Add ¶
type Add struct {
    // A relative path, from the root of the table, to a file that should be added to the table
    Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
    // A map from partition column to value for this file.
    // This field is required even without a partition.
    PartitionValues map[string]string `json:"partitionValues" parquet:"name=partitionValues, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
    // The size of this file in bytes
    Size int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
    // The time this file was created, as milliseconds since the epoch
    ModificationTime int64 `json:"modificationTime" parquet:"name=modificationTime, repetition=OPTIONAL"`
    // When false the file must already be present in the table or the records in the added file
    // must be contained in one or more remove actions in the same version.
    //
    // Streaming queries that are tailing the transaction log can use this flag to skip actions
    // that would not affect the final results.
    DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
    // Map containing metadata about this file
    Tags map[string]string `json:"tags,omitempty" parquet:"name=tags, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
    // Contains statistics (e.g., count, min/max values for columns) about the data in this file
    Stats string `json:"stats" parquet:"name=stats, repetition=OPTIONAL, converted=UTF8"`
}
An Add action is typed to allow the stats_parsed and partitionValues_parsed fields to be written to checkpoints with the correct schema without using reflection. The Add variant is for a non-partitioned table; the PartitionValuesParsed field will be omitted.
func NewAdd ¶
func NewAdd(store storage.ObjectStore, location storage.Path, partitionValues map[string]string) (*Add, []string, error)
NewAdd returns a new Add action, using the given location and partition values. The modification time will be set to now. The size and stats will be retrieved from the parquet file at the given location. It also returns a list of columns that did not have stats set in the parquet file.
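A sketch of building an Add action for a partitioned file, assuming a store constructed elsewhere (for example with filestore.New); the path and partition values are illustrative.

package example

import (
    "log"

    "github.com/rivian/delta-go"
    "github.com/rivian/delta-go/storage"
)

// addForFile builds an Add action for a parquet file already written to the
// store; the partition values must match the file's directory layout.
func addForFile(store storage.ObjectStore) (*delta.Add, error) {
    add, missingStats, err := delta.NewAdd(
        store,
        storage.NewPath("date=2024-01-01/part-0000.snappy.parquet"),
        map[string]string{"date": "2024-01-01"},
    )
    if err != nil {
        return nil, err
    }
    if len(missingStats) > 0 {
        log.Printf("parquet file had no stats for columns: %v", missingStats)
    }
    return add, nil
}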
type CDC ¶
type CDC struct {
    /// A relative path, from the root of the table, or an
    /// absolute path to a CDC file
    Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
    /// The size of this file in bytes
    Size int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
    /// A map from partition column to value for this file
    PartitionValues map[string]string `json:"partitionValues"`
    /// Should always be set to false because CDC files do not change the underlying data of the table
    DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
    /// Map containing metadata about this file
    Tags *map[string]string `json:"tags,omitempty"`
}
CDC represents a CDC action.
type CheckPoint ¶
type CheckPoint struct {
    /// Delta table version
    Version int64 `json:"version"`
    // The number of actions in the checkpoint. -1 if not available.
    Size int64 `json:"size"`
    // The number of parts if the checkpoint has multiple parts. Omit if single part.
    Parts *int32 `json:"parts,omitempty"`
    // Size of the checkpoint in bytes
    SizeInBytes   int64 `json:"sizeInBytes"`
    NumOfAddFiles int64 `json:"numOfAddFiles"`
}
CheckPoint holds the metadata for a checkpoint file. This gets written out to _last_checkpoint.
type CheckpointConfiguration ¶
type CheckpointConfiguration struct {
    // Maximum number of rows to include in each multi-part checkpoint part.
    // Current default 50k.
    MaxRowsPerPart int
    // Allow checkpointing even if the table reader version or writer version is greater than supported
    // by this client. Defaults to false.
    // **WARNING** If you set this to true and the table being checkpointed uses features that are not supported by this
    // client, the resulting checkpoint might fail unpredictably and silently; this could cause data loss or corruption.
    UnsafeIgnoreUnsupportedReaderWriterVersionErrors bool
    // Disable any cleanup after checkpointing, even if it was enabled in the table configuration.
    // Defaults to false.
    DisableCleanup bool
    // Configure use of on-disk intermediate storage to reduce memory requirements
    ReadWriteConfiguration OptimizeCheckpointConfiguration
}
CheckpointConfiguration contains additional configuration for checkpointing
func NewCheckpointConfiguration ¶
func NewCheckpointConfiguration() *CheckpointConfiguration
NewCheckpointConfiguration returns the default configuration for creating checkpoints
type CheckpointEntry ¶
type CheckpointEntry struct {
    Txn      *Txn      `parquet:"name=txn"`
    Add      *Add      `parquet:"name=add"`
    Remove   *Remove   `parquet:"name=remove"`
    MetaData *MetaData `parquet:"name=metaData"`
    Protocol *Protocol `parquet:"name=protocol"`
    Cdc      *CDC      `parquet:"-"` // CDC not implemented yet
}
CheckpointEntry contains a single entry in the checkpoint Parquet file. All but one of the pointers should be nil.
type ConfigKey ¶
type ConfigKey string
ConfigKey represents a Delta configuration.
const (
    // AppendOnlyDeltaConfigKey represents the Delta configuration to specify whether a table is append-only.
    AppendOnlyDeltaConfigKey ConfigKey = "delta.appendOnly"
    // CheckpointIntervalDeltaConfigKey represents the Delta configuration to specify a checkpoint interval.
    CheckpointIntervalDeltaConfigKey ConfigKey = "delta.checkpointInterval"
    // AutoOptimizeAutoCompactDeltaConfigKey represents the Delta configuration to specify whether auto compaction needs to be enabled.
    AutoOptimizeAutoCompactDeltaConfigKey ConfigKey = "delta.autoOptimize.autoCompact"
    // AutoOptimizeOptimizeWriteDeltaConfigKey represents the Delta configuration to specify whether optimized writing needs to be enabled.
    AutoOptimizeOptimizeWriteDeltaConfigKey ConfigKey = "delta.autoOptimize.optimizeWrite"
    // CheckpointWriteStatsAsJSONDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a JSON object in a checkpoint.
    CheckpointWriteStatsAsJSONDeltaConfigKey ConfigKey = "delta.checkpoint.writeStatsAsJson"
    // CheckpointWriteStatsAsStructDeltaConfigKey represents the Delta configuration to specify whether stats need to be written as a struct in a checkpoint.
    CheckpointWriteStatsAsStructDeltaConfigKey ConfigKey = "delta.checkpoint.writeStatsAsStruct"
    // ColumnMappingModeDeltaConfigKey represents the Delta configuration to specify whether column mapping needs to be enabled.
    ColumnMappingModeDeltaConfigKey ConfigKey = "delta.columnMapping.mode"
    // DataSkippingNumIndexedColsDeltaConfigKey represents the Delta configuration to specify the number of columns for which to collect stats.
    DataSkippingNumIndexedColsDeltaConfigKey ConfigKey = "delta.dataSkippingNumIndexedCols"
    // DeletedFileRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a deleted file.
    DeletedFileRetentionDurationDeltaConfigKey ConfigKey = "delta.deletedFileRetentionDuration"
    // EnableChangeDataFeedDeltaConfigKey represents the Delta configuration to specify whether change data feed needs to be enabled.
    EnableChangeDataFeedDeltaConfigKey ConfigKey = "delta.enableChangeDataFeed"
    // IsolationLevelDeltaConfigKey represents the Delta configuration to specify what isolation level to use.
    IsolationLevelDeltaConfigKey ConfigKey = "delta.isolationLevel"
    // LogRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of commit logs.
    LogRetentionDurationDeltaConfigKey ConfigKey = "delta.logRetentionDuration"
    // EnableExpiredLogCleanupDeltaConfigKey represents the Delta configuration to specify whether expired commit logs need to be cleaned up.
    EnableExpiredLogCleanupDeltaConfigKey ConfigKey = "delta.enableExpiredLogCleanup"
    // MinReaderVersionDeltaConfigKey represents the Delta configuration to specify the minimum reader version.
    MinReaderVersionDeltaConfigKey ConfigKey = "delta.minReaderVersion"
    // MinWriterVersionDeltaConfigKey represents the Delta configuration to specify the minimum writer version.
    MinWriterVersionDeltaConfigKey ConfigKey = "delta.minWriterVersion"
    // RandomizeFilePrefixesDeltaConfigKey represents the Delta configuration to specify whether file prefixes should be randomized.
    RandomizeFilePrefixesDeltaConfigKey ConfigKey = "delta.randomizeFilePrefixes"
    // RandomPrefixLengthDeltaConfigKey represents the Delta configuration to specify the number of characters generated for random prefixes.
    RandomPrefixLengthDeltaConfigKey ConfigKey = "delta.randomPrefixLength"
    // SetTransactionRetentionDurationDeltaConfigKey represents the Delta configuration to specify the retention duration of a transaction.
    SetTransactionRetentionDurationDeltaConfigKey ConfigKey = "delta.setTransactionRetentionDuration"
    // TargetFileSizeDeltaConfigKey represents the Delta configuration to specify the target size of a file.
    TargetFileSizeDeltaConfigKey ConfigKey = "delta.targetFileSize"
    // TuneFileSizesForRewritesDeltaConfigKey represents the Delta configuration to specify whether file sizes need to be tuned for rewrites.
    TuneFileSizesForRewritesDeltaConfigKey ConfigKey = "delta.tuneFileSizesForRewrites"
)
type Create ¶
type Create struct {
    /// The save mode used during the create.
    Mode SaveMode `json:"mode"`
    /// The storage location of the new table
    Location string `json:"location"`
    /// The min reader and writer protocol versions of the table
    Protocol Protocol
    /// Metadata associated with the new table
    MetaData TableMetaData
}
Create represents a Delta `Create` operation. It would usually only create the table; if data is also written, a `Write` operation is more appropriate.
func (Create) GetCommitInfo ¶
func (op Create) GetCommitInfo() CommitInfo
GetCommitInfo retrieves commit info.
type Format ¶
type Format struct {
    /// Name of the encoding for files in this table.
    // Default: "parquet"
    Provider string `json:"provider" parquet:"name=provider, repetition=OPTIONAL, converted=UTF8"`
    /// A map containing configuration options for the format.
    // Default: {}
    Options map[string]string `json:"options" parquet:"name=options, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
}
Format describes the data format of files in the table. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#format-specification
func (*Format) Default ¶
Default provides the correct default format options as of Delta Lake 0.3.0 (https://github.com/delta-io/delta/blob/master/PROTOCOL.md#format-specification). As of Delta Lake 0.3.0, user-facing APIs only allow the creation of tables where format = 'parquet' and options = {}.
type GUID ¶
type GUID string
GUID is a type alias for a string expected to match a GUID/UUID format.
type MetaData ¶
type MetaData struct {
    /// Unique identifier for this table
    ID uuid.UUID `json:"id" parquet:"-"`
    /// The parquet library cannot import to UUID, so the ID is also stored as a string
    IDAsString string `json:"-" parquet:"name=id, repetition=OPTIONAL, converted=UTF8"`
    /// User-provided identifier for this table
    Name *string `json:"name" parquet:"name=name, repetition=OPTIONAL, converted=UTF8"`
    /// User-provided description for this table
    Description *string `json:"description" parquet:"name=description, repetition=OPTIONAL, converted=UTF8"`
    /// Specification of the encoding for the files stored in the table
    Format Format `json:"format" parquet:"name=format, repetition=OPTIONAL"`
    /// Schema of the table
    SchemaString string `json:"schemaString" parquet:"name=schemaString, repetition=OPTIONAL, converted=UTF8"`
    /// An array containing the names of columns by which the data should be partitioned
    PartitionColumns []string `json:"partitionColumns" parquet:"name=partitionColumns, repetition=OPTIONAL, valueconverted=UTF8"`
    /// A map containing configuration options for the table
    Configuration map[string]string `json:"configuration" parquet:"name=configuration, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
    /// The time when this metadata action is created, in milliseconds since the Unix epoch
    CreatedTime *int64 `json:"createdTime" parquet:"name=createdTime, repetition=OPTIONAL"`
}
MetaData represents the action that describes the metadata of the table. This is a top-level action in Delta log entries.
type OnDiskTableState ¶
type OnDiskTableState struct {
// contains filtered or unexported fields
}
OnDiskTableState contains information about the table state that is stored on disk instead of in memory
type Operation ¶
type Operation interface {
GetCommitInfo() CommitInfo
}
Operation represents the operation performed when creating a new log entry with one or more actions.
type OptimizeCheckpointConfiguration ¶
type OptimizeCheckpointConfiguration struct {
    // Use an intermediate on-disk storage location to reduce memory
    OnDiskOptimization bool
    WorkingStore       storage.ObjectStore
    WorkingFolder      storage.Path
    // If these are > 1, checkpoint read and write operations will use this many goroutines
    ConcurrentCheckpointRead  int
    ConcurrentCheckpointWrite int
}
OptimizeCheckpointConfiguration holds settings for optimizing checkpoint read and write operations
func NewOptimizeCheckpointConfiguration ¶
func NewOptimizeCheckpointConfiguration(store storage.ObjectStore, version int64) (*OptimizeCheckpointConfiguration, error)
NewOptimizeCheckpointConfiguration returns a default enabled optimization configuration with a working folder in the table store's _delta_log/.tmp/ folder but no concurrency enabled
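A sketch of loading a specific version with on-disk optimization enabled; it assumes the github.com/rivian/delta-go import path and leaves construction of the store, lock, and state store to the caller.

package example

import (
    "github.com/rivian/delta-go"
    "github.com/rivian/delta-go/lock"
    "github.com/rivian/delta-go/state"
    "github.com/rivian/delta-go/storage"
)

// loadLargeTable loads a table at a version using on-disk intermediate
// state to bound memory use; concurrency stays off unless opted into.
func loadLargeTable(store storage.ObjectStore, tableLock lock.Locker, stateStore state.Store, version int64) (*delta.Table, error) {
    config, err := delta.NewOptimizeCheckpointConfiguration(store, version)
    if err != nil {
        return nil, err
    }
    config.ConcurrentCheckpointRead = 4 // opt in to concurrent reads

    table := delta.NewTable(store, tableLock, stateStore)
    if err := table.LoadVersionWithConfiguration(&version, config); err != nil {
        return nil, err
    }
    return table, nil
}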
type OutputMode ¶
type OutputMode string
OutputMode represents the output mode used in streaming operations.
const (
    // AppendOutputMode causes only new rows to be written when new data is available.
    AppendOutputMode OutputMode = "Append"
    // Complete causes the full output (all rows) to be written whenever new data is available.
    Complete OutputMode = "Complete"
    // Update causes only rows with updates to be written when new or changed data is available.
    Update OutputMode = "Update"
)
type PreparedCommit ¶
PreparedCommit holds the URI of a temp commit.
type Protocol ¶
type Protocol struct {
    /// Minimum version of the Delta read protocol a client must implement to correctly read the
    /// table.
    MinReaderVersion int32 `json:"minReaderVersion" parquet:"name=minReaderVersion, repetition=OPTIONAL"`
    /// Minimum version of the Delta write protocol a client must implement to correctly write to
    /// the table.
    MinWriterVersion int32 `json:"minWriterVersion" parquet:"name=minWriterVersion, repetition=OPTIONAL"`
    /// A collection of features that a client must implement in order to correctly read this table
    /// (exists only when minReaderVersion is set to 3)
    ReaderFeatures []string `json:"readerFeatures,omitempty" parquet:"name=readerFeatures, repetition=OPTIONAL, valueconverted=UTF8"`
    /// A collection of features that a client must implement in order to correctly write to this table
    /// (exists only when minWriterVersion is set to 7)
    WriterFeatures []string `json:"writerFeatures,omitempty" parquet:"name=writerFeatures, repetition=OPTIONAL, valueconverted=UTF8"`
}
Protocol represents the action used to increase the version of the Delta protocol required to read or write to the table.
type Remove ¶
type Remove struct {
    /// The path of the file that is removed from the table.
    Path string `json:"path" parquet:"name=path, repetition=OPTIONAL, converted=UTF8"`
    /// The timestamp when the remove was added to table state.
    DeletionTimestamp *int64 `json:"deletionTimestamp" parquet:"name=deletionTimestamp, repetition=OPTIONAL"`
    /// Whether data is changed by the remove. A table optimize will report this as false, for
    /// example, since it adds and removes files by combining many files into one.
    DataChange bool `json:"dataChange" parquet:"name=dataChange, repetition=OPTIONAL"`
    /// When true, the fields partitionValues, size, and tags are present.
    ///
    /// NOTE: Although this is defined as required in the Scala Delta implementation, some writers
    /// still leave it nullable, so we keep the related fields optional for compatibility.
    ExtendedFileMetadata bool `json:"extendedFileMetadata" parquet:"name=extendedFileMetadata, repetition=OPTIONAL"`
    /// A map from partition column to value for this file.
    PartitionValues *map[string]string `json:"partitionValues" parquet:"name=partitionValues, repetition=OPTIONAL, keyconverted=UTF8, valueconverted=UTF8"`
    /// Size of this file in bytes
    Size *int64 `json:"size" parquet:"name=size, repetition=OPTIONAL"`
    /// Map containing metadata about this file
    Tags *map[string]string `json:"tags" parquet:"-"`
}
Remove represents a tombstone (deleted file) in the Delta log. This is a top-level action in Delta log entries.
type SaveMode ¶
type SaveMode string
SaveMode represents the save mode used when performing an Operation.
const (
    // Append causes files to be appended to the target location.
    Append SaveMode = "Append"
    // Overwrite causes a target location to be overwritten.
    Overwrite SaveMode = "Overwrite"
    // ErrorIfExists causes an operation to fail if files exist for the target.
    ErrorIfExists SaveMode = "ErrorIfExists"
    // Ignore causes an operation to not proceed or change any data if files exist for the target.
    Ignore SaveMode = "Ignore"
)
type SchemaDataType ¶
type SchemaDataType interface{}
SchemaDataType is one of: SchemaDataTypeName | SchemaTypeArray | SchemaTypeMap | SchemaTypeStruct. We can't use a union constraint because the type is recursive.
type SchemaDataTypeName ¶
type SchemaDataTypeName string
SchemaDataTypeName contains the string name of a schema data type.
const (
    // String is the schema data type representing a string (utf8).
    String SchemaDataTypeName = "string"
    // Long is the schema data type representing a long (i64).
    Long SchemaDataTypeName = "long"
    // Integer is the schema data type representing an integer (i32).
    Integer SchemaDataTypeName = "integer"
    // Short is the schema data type representing a short (i16).
    Short SchemaDataTypeName = "short"
    // Byte is the schema data type representing a byte (i8).
    Byte SchemaDataTypeName = "byte"
    // Float is the schema data type representing a float (f32).
    Float SchemaDataTypeName = "float"
    // Double is the schema data type representing a double (f64).
    Double SchemaDataTypeName = "double"
    // Boolean is the schema data type representing a boolean (bool).
    Boolean SchemaDataTypeName = "boolean"
    // Binary is the schema data type representing a sequence of binary data.
    Binary SchemaDataTypeName = "binary"
    // Date is the schema data type representing a calendar date: a year-month-day triple without a timezone.
    Date SchemaDataTypeName = "date"
    // Timestamp is the schema data type representing a microsecond-precision timestamp without a timezone.
    Timestamp SchemaDataTypeName = "timestamp"
    // Struct is the schema data type representing a struct.
    Struct SchemaDataTypeName = "struct"
    // Array is the schema data type representing an array.
    Array SchemaDataTypeName = "array"
    // Map is the schema data type representing a map.
    Map SchemaDataTypeName = "map"
    // Variant represents the preview variant type.
    Variant SchemaDataTypeName = "variant"
    // Unknown is the schema data type representing an unknown type.
    Unknown SchemaDataTypeName = "unknown"
)
type SchemaField ¶
type SchemaField struct {
    // Name of this (possibly nested) column
    Name string         `json:"name"`
    Type SchemaDataType `json:"type"`
    // Boolean denoting whether this field can be null
    Nullable bool `json:"nullable"`
    // A JSON map containing information about this column. Keys prefixed with Delta are reserved
    // for the implementation.
    Metadata map[string]any `json:"metadata"`
}
SchemaField describes a specific field of the Delta table schema.
func (*SchemaField) UnmarshalJSON ¶
func (s *SchemaField) UnmarshalJSON(b []byte) error
UnmarshalJSON unmarshals a JSON object into a schema field.
type SchemaTypeArray ¶
type SchemaTypeArray struct {
    Type         SchemaDataTypeName `json:"type"` // Has to be "array"
    ElementType  SchemaDataType     `json:"elementType"`
    ContainsNull bool               `json:"containsNull"`
}
SchemaTypeArray represents an array field
type SchemaTypeMap ¶
type SchemaTypeMap struct {
    Type              SchemaDataTypeName `json:"type"` // Has to be "map"
    KeyType           SchemaDataType     `json:"keyType"`
    ValueType         SchemaDataType     `json:"valueType"`
    ValueContainsNull bool               `json:"valueContainsNull"`
}
SchemaTypeMap represents a map field
type SchemaTypeStruct ¶
type SchemaTypeStruct struct {
    Type   SchemaDataTypeName `json:"type"` // Has to be "struct"
    Fields []SchemaField      `json:"fields"`
}
SchemaTypeStruct represents a struct in the schema
func GetSchema ¶
func GetSchema(i any) SchemaTypeStruct
GetSchema recursively walks over the given struct interface i and extracts SchemaTypeStruct StructFields using reflection.
This is not currently being used in production and results should be inspected before being used.
TODO: Handle error cases where types are not compatible with Spark types (https://github.com/delta-io/delta/blob/master/PROTOCOL.md#schema-serialization-format); e.g. a field such as `Value int` is not currently readable with spark.read.format("delta").load("...").
func (*SchemaTypeStruct) JSON ¶
func (s *SchemaTypeStruct) JSON() []byte
JSON marshals a struct type field in a schema into a JSON object.
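A sketch of assembling a two-column schema and marshaling it, e.g. for use as MetaData.SchemaString; the field names are illustrative and the metadata maps are left empty.

package main

import (
    "fmt"

    "github.com/rivian/delta-go"
)

func main() {
    // Build a minimal two-column schema.
    schema := delta.SchemaTypeStruct{
        Type: delta.Struct,
        Fields: []delta.SchemaField{
            {Name: "id", Type: delta.Long, Nullable: false, Metadata: map[string]any{}},
            {Name: "name", Type: delta.String, Nullable: true, Metadata: map[string]any{}},
        },
    }
    fmt.Println(string(schema.JSON()))
}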
type Stats ¶
type Stats struct {
    NumRecords  int64            `json:"numRecords" parquet:"name=numRecords, repetition=OPTIONAL"`
    TightBounds bool             `json:"tightBounds" parquet:"name=tightBounds, repetition=OPTIONAL"`
    MinValues   map[string]any   `json:"minValues" parquet:"name=minValues, repetition=OPTIONAL, keyconverted=UTF8"`
    MaxValues   map[string]any   `json:"maxValues" parquet:"name=maxValues, repetition=OPTIONAL, keyconverted=UTF8"`
    NullCount   map[string]int64 `json:"nullCount" parquet:"name=nullCount, repetition=OPTIONAL, keyconverted=UTF8, valuetype=INT64"`
}
Stats contains statistics about a Parquet file in an Add action
func StatsFromJSON ¶
StatsFromJSON parses JSON into a Stats object
func StatsFromParquet ¶
StatsFromParquet retrieves stats directly from the Parquet file in the Add action. It does not currently support nested types, or logical types that can't be generated in Spark (UUID, interval, JSON, BSON). It also will not return stats for timestamps stored in int96 columns because the Parquet file won't have those stats.
type StatsDecimal ¶
type StatsDecimal string
StatsDecimal allows us to store decimal stats as a string and write to JSON without quotes
func (StatsDecimal) MarshalJSON ¶
func (sd StatsDecimal) MarshalJSON() ([]byte, error)
MarshalJSON writes the decimal string without surrounding quotes
type StatsFloat32 ¶
type StatsFloat32 float32
StatsFloat32 allows us to marshal and unmarshal inf and -inf as strings
func (StatsFloat32) MarshalJSON ¶
func (sf StatsFloat32) MarshalJSON() ([]byte, error)
MarshalJSON writes the float as a string if it is NaN, inf or -inf
type StatsFloat64 ¶
type StatsFloat64 float64
StatsFloat64 allows us to marshal and unmarshal inf and -inf as strings
func (StatsFloat64) MarshalJSON ¶
func (sf StatsFloat64) MarshalJSON() ([]byte, error)
MarshalJSON writes the float as a string if it is NaN, inf or -inf
type StreamingUpdate ¶
type StreamingUpdate struct {
    /// The output mode the streaming writer is using.
    OutputMode OutputMode
    /// The query id of the streaming writer.
    QueryID string
    /// The epoch id of the written micro-batch.
    EpochID int64
}
StreamingUpdate represents a Delta `StreamingUpdate` operation.
type Table ¶
type Table struct {
    // The state of the table as of the most recent loaded Delta log entry.
    State TableState
    // The remote store of the state of the table as of the most recent loaded Delta log entry.
    StateStore state.Store
    // Object store to access log and data files
    Store storage.ObjectStore
    // Locking client to ensure optimistic locked commits from distributed workers
    LockClient lock.Locker
    // File metadata for the latest checkpoint
    LastCheckPoint *CheckPoint
    // Table versions associated with timestamps
    VersionTimestamp map[int64]time.Time
    // Log store which provides multi-cluster write support
    LogStore logstore.LogStore
}
Table represents a Delta table.
func NewTable ¶
NewTable creates a new Table struct without loading any data from backing storage.
NOTE: This is for advanced users. If you don't know why you need to use this method, please call one of the OpenTable helper functions instead.
func NewTableWithLogStore ¶
func NewTableWithLogStore(store storage.ObjectStore, lock lock.Locker, logStore logstore.LogStore) *Table
NewTableWithLogStore creates a new Table instance with a log store configured.
func OpenTable ¶
OpenTable loads the latest version of the table. If the table reader or writer version is greater than the client supports, the table will still be opened, but an error will also be returned.
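A minimal end-to-end sketch of opening a local table. The file-based store, lock, and state store constructors (filestore.New, filelock.New, filestate.New) and the module import path are assumptions drawn from this module's subpackages; adjust for your storage backend.

package main

import (
    "errors"
    "fmt"
    "log"

    "github.com/rivian/delta-go"
    "github.com/rivian/delta-go/lock/filelock"
    "github.com/rivian/delta-go/state/filestate"
    "github.com/rivian/delta-go/storage"
    "github.com/rivian/delta-go/storage/filestore"
)

func main() {
    path := storage.NewPath("/tmp/mytable")
    store := filestore.New(path)
    tableLock := filelock.New(path, "_delta_log/_commit.lock", filelock.Options{})
    stateStore := filestate.New(path, "_delta_log/_commit.state")

    table, err := delta.OpenTable(store, tableLock, stateStore)
    if err != nil {
        // The table may still open with an unsupported reader/writer version;
        // errors.Is returns false harmlessly if the sentinel is not wrapped.
        if errors.Is(err, delta.ErrUnsupportedReaderVersion) {
            log.Printf("opened with unsupported reader version: %v", err)
        } else {
            log.Fatal(err)
        }
    }
    fmt.Println("version:", table.State.Version)
}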
func OpenTableWithConfiguration ¶
func OpenTableWithConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, config *OptimizeCheckpointConfiguration) (*Table, error)
OpenTableWithConfiguration loads the latest version of the table, using the given configuration for optimization settings
func OpenTableWithVersion ¶
func OpenTableWithVersion(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, version int64) (*Table, error)
OpenTableWithVersion loads the table at this specific version. If the table reader or writer version is greater than the client supports, the table will still be opened, but an error will also be returned.
func OpenTableWithVersionAndConfiguration ¶
func OpenTableWithVersionAndConfiguration(store storage.ObjectStore, lock lock.Locker, stateStore state.Store, version int64, config *OptimizeCheckpointConfiguration) (*Table, error)
OpenTableWithVersionAndConfiguration loads the table at this specific version using the given configuration for optimization settings
func (*Table) Create ¶
func (t *Table) Create(metadata TableMetaData, protocol Protocol, commitInfo CommitInfo, addActions []Add) error
Create creates a Table with version 0 given the provided MetaData, Protocol, and CommitInfo. Note that if the protocol MinReaderVersion or MinWriterVersion is too high, the table will be created and then an error will be returned
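A sketch of creating version 0 of a new table; it assumes Schema aliases SchemaTypeStruct (both appear in the index) and uses illustrative protocol versions and table names.

package example

import "github.com/rivian/delta-go"

// createEventsTable writes version 0 of a new table with no initial files.
func createEventsTable(table *delta.Table) error {
    schema := delta.Schema{
        Type: delta.Struct,
        Fields: []delta.SchemaField{
            {Name: "id", Type: delta.Long, Nullable: false, Metadata: map[string]any{}},
            {Name: "date", Type: delta.String, Nullable: false, Metadata: map[string]any{}},
        },
    }
    metadata := delta.NewTableMetaData(
        "events", "example table",
        delta.Format{Provider: "parquet", Options: map[string]string{}},
        schema, []string{"date"}, map[string]string{},
    )
    protocol := delta.Protocol{MinReaderVersion: 1, MinWriterVersion: 2}
    return table.Create(*metadata, protocol, delta.CommitInfo{}, []delta.Add{})
}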
func (*Table) CreateCheckpoint ¶
func (t *Table) CreateCheckpoint(checkpointLock lock.Locker, checkpointConfiguration *CheckpointConfiguration, version int64) (bool, error)
CreateCheckpoint creates a checkpoint for this table at the given version. The existing table state will not be used or modified; a new table instance will be opened at the checkpoint version. Returns whether the checkpoint was created and any error. If the lock cannot be obtained, it does not retry.
func (*Table) CreateTransaction ¶
func (t *Table) CreateTransaction(opts TransactionOptions) *Transaction
CreateTransaction creates a new Transaction for the Table. The transaction holds a mutable reference to the Table, preventing other references until the transaction is dropped.
func (*Table) GetCheckpointDataPaths ¶
func (t *Table) GetCheckpointDataPaths(checkpoint *CheckPoint) []storage.Path
GetCheckpointDataPaths returns the expected file path(s) for the given checkpoint's Parquet files. If it is a multi-part checkpoint, there will be one path for each part.
func (*Table) LatestVersion ¶
LatestVersion gets the latest version of a table.
func (*Table) Load ¶
func (t *Table) Load(config *OptimizeCheckpointConfiguration) error
Load loads the table state using the given configuration
func (*Table) LoadVersion ¶
LoadVersion loads the table state at the specified version using default configuration options
func (*Table) LoadVersionWithConfiguration ¶
func (t *Table) LoadVersionWithConfiguration(version *int64, config *OptimizeCheckpointConfiguration) error
LoadVersionWithConfiguration loads the table state at the specified version using the given configuration
type TableMetaData ¶
type TableMetaData struct {
    // Unique identifier for this table
    ID uuid.UUID
    /// User-provided identifier for this table
    Name string
    /// User-provided description for this table
    Description string
    /// Specification of the encoding for the files stored in the table
    Format Format
    /// Schema of the table
    Schema Schema
    /// An array containing the names of columns by which the data should be partitioned
    PartitionColumns []string
    /// The time when this metadata action is created, in milliseconds since the Unix epoch
    CreatedTime time.Time
    /// Table properties
    Configuration map[string]string
}
TableMetaData represents the metadata of a Delta table.
func NewTableMetaData ¶
func NewTableMetaData(name string, description string, format Format, schema Schema, partitionColumns []string, configuration map[string]string) *TableMetaData
NewTableMetaData creates a new TableMetaData instance.
type TableState ¶
type TableState struct {
    // Current table version represented by this table state
    Version int64
    // A remove action should remain in the state of the table as a tombstone until it has expired.
    // A tombstone expires when the creation timestamp of the Delta file exceeds the expiration.
    // This is empty if on-disk optimization is enabled.
    Tombstones map[string]Remove
    // Active files for table state.
    // This is empty if on-disk optimization is enabled.
    Files map[string]Add
    // Information added to individual commits
    CommitInfos           []CommitInfo
    AppTransactionVersion map[string]int64
    MinReaderVersion      int32
    MinWriterVersion      int32
    ReaderFeatures        map[string]bool
    WriterFeatures        map[string]bool
    // Table metadata corresponding to current version
    CurrentMetadata *TableMetaData
    // Retention period for tombstones as time.Duration (nanoseconds)
    TombstoneRetention time.Duration
    // Retention period for log entries as time.Duration (nanoseconds)
    LogRetention time.Duration
    // Expired log cleanup has not been thoroughly tested, so it is marked as experimental
    ExperimentalEnableExpiredLogCleanup bool
    OnDiskTableState
    // contains filtered or unexported fields
}
TableState maintains the current known state of a table. This is used in reading and generating checkpoints. If on-disk optimization is enabled, some of the information here is empty as the state is offloaded to disk to reduce memory use.
func NewTableState ¶
func NewTableState(version int64) *TableState
NewTableState creates an empty table state for the given version
func NewTableStateFromActions ¶
func NewTableStateFromActions(actions []Action, version int64) (*TableState, error)
NewTableStateFromActions generates table state from a list of actions
func NewTableStateFromCommit ¶
func NewTableStateFromCommit(table *Table, version int64) (*TableState, error)
NewTableStateFromCommit reads a specific commit version and returns the contained TableState
func (*TableState) FileCount ¶
func (tableState *TableState) FileCount() int
FileCount returns the total number of Parquet files making up the table at the loaded version
func (*TableState) TombstoneCount ¶
func (tableState *TableState) TombstoneCount() int
TombstoneCount returns the total number of tombstones (logically but not physically deleted files) in the table at the loaded version
type Transaction ¶
type Transaction struct {
    Table       *Table
    Actions     []Action
    Operation   Operation
    AppMetadata map[string]any
    // contains filtered or unexported fields
}
Transaction represents a Delta transaction. Clients that do not need to mutate action content when a transaction conflict is encountered may use the Commit method and rely on optimistic concurrency to determine the appropriate Delta version number for a commit. A good example of this type of client is an append-only client that does not need to maintain transaction state with external systems. Clients that may need to do conflict resolution if the Delta version changes should use the `prepare_commit` and `try_commit_transaction` methods and manage the Delta version themselves, so that they can resolve data conflicts that may occur between Delta versions.
Please note that in case of a non-retryable error, a temporary commit file such as `_delta_log/_commit_<uuid>.json` will be orphaned in storage.
func (*Transaction) AddAction ¶
func (t *Transaction) AddAction(action Action)
AddAction adds an arbitrary "action" to the actions associated with this transaction.
func (*Transaction) AddActions ¶
func (t *Transaction) AddActions(actions []Action)
AddActions adds an arbitrary number of actions to the actions associated with this transaction.
func (*Transaction) Commit ¶
func (t *Transaction) Commit() (int64, error)
Commit commits the given actions to the Delta log. This method will retry the transaction commit based on the value of MaxRetryCommitAttempts set in TransactionOptions.
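A sketch of an append-style commit: Add values are attached as actions and the operation is recorded as a Write with Append mode. The import path is an assumption; everything else uses signatures documented on this page.

package example

import "github.com/rivian/delta-go"

// appendFiles commits Add actions using optimistic concurrency; retries
// are governed by MaxRetryCommitAttempts in the transaction options.
func appendFiles(table *delta.Table, adds []delta.Add) (int64, error) {
    transaction := table.CreateTransaction(delta.NewTransactionOptions())
    for _, add := range adds {
        transaction.AddAction(add)
    }
    transaction.SetOperation(delta.Write{Mode: delta.Append})
    return transaction.Commit()
}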
func (*Transaction) CommitLogStore ¶
func (t *Transaction) CommitLogStore() (int64, error)
CommitLogStore writes actions to a file.
To commit for Delta version N:
- Step 0: Fail if N.json already exists in the file system.
- Step 1: Ensure that N-1.json exists. If not, perform a recovery.
- Step 2: PREPARE the commit.
  - Write the actions into temp file T(N).
  - Write an uncompleted commit entry E(N, T(N)) with mutual exclusion to the log store.
- Step 3: COMMIT the commit to the Delta log.
  - Copy T(N) into N.json.
- Step 4: ACKNOWLEDGE the commit.
  - Overwrite and complete commit entry E in the log store.
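A hedged sketch of committing through a log store for multi-cluster writes, following the steps above; constructing the concrete log store (e.g. from the logstore/dynamodblogstore subpackage) is omitted, and the import paths are assumptions.

package example

import (
    "github.com/rivian/delta-go"
    "github.com/rivian/delta-go/lock"
    "github.com/rivian/delta-go/logstore"
    "github.com/rivian/delta-go/storage"
)

// commitViaLogStore performs a multi-cluster-safe commit.
func commitViaLogStore(store storage.ObjectStore, tableLock lock.Locker, logStore logstore.LogStore, actions []delta.Action) (int64, error) {
    table := delta.NewTableWithLogStore(store, tableLock, logStore)
    transaction := table.CreateTransaction(delta.NewTransactionOptions())
    transaction.AddActions(actions)
    // CommitLogStore follows the prepare/commit/acknowledge steps above,
    // recovering from incomplete commits left by other writers when needed.
    return transaction.CommitLogStore()
}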
func (*Transaction) ReadActions ¶
func (t *Transaction) ReadActions(path storage.Path) ([]Action, error)
ReadActions gets actions from a file.
With many concurrent readers/writers, there's a chance that concurrent recovery operations occur on the same file, i.e. the same temp file T(N) is copied into the target N.json file more than once. Though data loss will *NOT* occur, readers of N.json may receive an error from S3 as the ETag of N.json was changed. This is safe to retry, so we do so here.
func (*Transaction) SetAppMetadata ¶
func (t *Transaction) SetAppMetadata(appMetadata map[string]any)
SetAppMetadata sets the app metadata for this transaction.
func (*Transaction) SetOperation ¶
func (t *Transaction) SetOperation(operation Operation)
SetOperation sets the Delta operation for this transaction.
type TransactionOptions ¶
type TransactionOptions struct {
    // Number of retry attempts allowed when committing a transaction
    MaxRetryCommitAttempts uint32
    // RetryWaitDuration sets the amount of time to wait between retries of the transaction
    RetryWaitDuration time.Duration
    // Number of retry attempts allowed when reading actions from a log entry
    MaxRetryReadAttempts uint16
    // Number of retry attempts allowed when writing actions to a log entry
    MaxRetryWriteAttempts uint32
    // Number of retry commit attempts before loading the latest version from the table rather
    // than using the state store
    RetryCommitAttemptsBeforeLoadingTable uint32
    // Number of retry attempts allowed when fixing the Delta log
    MaxRetryLogFixAttempts uint16
}
TransactionOptions customizes the behavior of a transaction.
func NewTransactionOptions ¶
func NewTransactionOptions() TransactionOptions
NewTransactionOptions returns the default transaction options.
type Txn ¶
type Txn struct {
    /// A unique identifier for the application performing the transaction.
    AppID string `json:"appId" parquet:"name=appId, repetition=OPTIONAL, converted=UTF8"`
    /// An application-specific numeric identifier for this transaction.
    Version int64 `json:"version" parquet:"name=version, repetition=OPTIONAL"`
    /// The time when this transaction action was created, in milliseconds since the Unix epoch.
    LastUpdated *int64 `json:"-" parquet:"name=lastUpdated, repetition=OPTIONAL"`
}
Txn represents the action used by streaming systems to track progress using application-specific versions to enable idempotency.
type Write ¶
type Write struct {
    /// The save mode used during the write.
    Mode SaveMode `json:"mode"`
    /// The columns the write is partitioned by.
    PartitionBy []string `json:"partitionBy"`
    /// The predicate used during the write.
    Predicate []string `json:"predicate"`
}
Write represents a Delta `Write` operation. Write operations will typically only include `Add` actions.
func (Write) GetCommitInfo ¶
func (op Write) GetCommitInfo() CommitInfo
GetCommitInfo retrieves commit info.
Source Files ¶
Directories ¶
Path | Synopsis
---|---
examples |
internal |
internal/dynamodbutils | Package dynamodbutils implements utilities used to interact with DynamoDB.
internal/s3utils | Package s3utils implements utilities used to interact with S3.
lock | Package lock contains the resources required to create a lock.
lock/dynamolock | Package dynamolock contains the resources required to create a DynamoDB lock.
lock/filelock | Package filelock provides the resources required to create a file lock.
lock/nillock | Package nillock contains the resources required to create a nil lock.
lock/redislock | Package redislock contains the resources required to create a Redis lock.
logstore | Package logstore contains the resources required to create a log store.
logstore/dynamodblogstore | Package dynamodblogstore contains the resources required to create a DynamoDB log store.
state | Package state contains the resources required to create a state store.
state/dynamostate | Package dynamostate contains the resources required to create a DynamoDB state store.
state/filestate | Package filestate contains the resources required to create a file state store.
state/localstate | Package localstate contains the resources required to create a local state store.
state/redisstate | Package redisstate contains the resources required to create a Redis state store.
storage | Package storage contains the resources required to interact with an object store.
storage/filestore | Package filestore contains the resources required to interact with a file store.
storage/s3store | Package s3store contains the resources required to interact with an S3 store.