Documentation ¶
Index ¶
- Constants
- Variables
- func LoadFileBatching(files []warehouseutils.LoadFile, batchSize int) [][]warehouseutils.LoadFile
- func UseGlue(w *model.Warehouse) bool
- type GlueSchemaRepository
- func (gl *GlueSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
- func (gl *GlueSchemaRepository) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error)
- func (gl *GlueSchemaRepository) CreateSchema(ctx context.Context) (err error)
- func (gl *GlueSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error)
- func (gl *GlueSchemaRepository) FetchSchema(ctx context.Context, warehouse model.Warehouse) (model.Schema, model.Schema, error)
- func (gl *GlueSchemaRepository) RefreshPartitions(ctx context.Context, tableName string, loadFiles []warehouseutils.LoadFile) error
- type LocalSchemaRepository
- func (ls *LocalSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
- func (ls *LocalSchemaRepository) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error)
- func (*LocalSchemaRepository) CreateSchema(context.Context) (err error)
- func (ls *LocalSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error)
- func (ls *LocalSchemaRepository) FetchSchema(ctx context.Context, _ model.Warehouse) (model.Schema, model.Schema, error)
- func (*LocalSchemaRepository) RefreshPartitions(context.Context, string, []warehouseutils.LoadFile) error
- type SchemaRepository
Constants ¶
View Source
const MAX_CHARACTER_LIMIT = 65535
Variables ¶
View Source
var ( PartitionFolderRegex = regexp.MustCompile(`.*/(?P<name>.*)=(?P<value>.*)$`) PartitionWindowRegex = regexp.MustCompile(`^(?P<name>.*)=(?P<value>.*)$`) )
View Source
var UseGlueConfig = "useGlue"
View Source
var (
VARCHAR_TYPE = fmt.Sprintf("varchar(%d)", MAX_CHARACTER_LIMIT)
)
Functions ¶
func LoadFileBatching ¶
func LoadFileBatching(files []warehouseutils.LoadFile, batchSize int) [][]warehouseutils.LoadFile
LoadFileBatching batches load files for refresh partitions
Types ¶
type GlueSchemaRepository ¶
type GlueSchemaRepository struct { GlueClient *glue.Glue Warehouse model.Warehouse Namespace string Logger logger.Logger // contains filtered or unexported fields }
func NewGlueSchemaRepository ¶
func NewGlueSchemaRepository(wh model.Warehouse) (*GlueSchemaRepository, error)
func (*GlueSchemaRepository) AddColumns ¶
func (gl *GlueSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
func (*GlueSchemaRepository) AlterColumn ¶
func (gl *GlueSchemaRepository) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error)
func (*GlueSchemaRepository) CreateSchema ¶
func (gl *GlueSchemaRepository) CreateSchema(ctx context.Context) (err error)
func (*GlueSchemaRepository) CreateTable ¶
func (gl *GlueSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error)
func (*GlueSchemaRepository) FetchSchema ¶
func (*GlueSchemaRepository) RefreshPartitions ¶
func (gl *GlueSchemaRepository) RefreshPartitions(ctx context.Context, tableName string, loadFiles []warehouseutils.LoadFile) error
RefreshPartitions takes a tableName and a list of loadFiles and refreshes all the partitions that are modified by the path in those loadFiles. It returns any error reported by Glue
type LocalSchemaRepository ¶
type LocalSchemaRepository struct {
// contains filtered or unexported fields
}
func NewLocalSchemaRepository ¶
func NewLocalSchemaRepository(wh model.Warehouse, uploader warehouseutils.Uploader) (*LocalSchemaRepository, error)
func (*LocalSchemaRepository) AddColumns ¶
func (ls *LocalSchemaRepository) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error)
func (*LocalSchemaRepository) AlterColumn ¶
func (ls *LocalSchemaRepository) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error)
func (*LocalSchemaRepository) CreateSchema ¶
func (*LocalSchemaRepository) CreateSchema(context.Context) (err error)
func (*LocalSchemaRepository) CreateTable ¶
func (ls *LocalSchemaRepository) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error)
func (*LocalSchemaRepository) FetchSchema ¶
func (*LocalSchemaRepository) RefreshPartitions ¶
func (*LocalSchemaRepository) RefreshPartitions(context.Context, string, []warehouseutils.LoadFile) error
type SchemaRepository ¶
type SchemaRepository interface { FetchSchema(ctx context.Context, warehouse model.Warehouse) (model.Schema, model.Schema, error) CreateSchema(ctx context.Context) (err error) CreateTable(ctx context.Context, tableName string, columnMap model.TableSchema) (err error) AddColumns(ctx context.Context, tableName string, columnsInfo []warehouseutils.ColumnInfo) (err error) AlterColumn(ctx context.Context, tableName, columnName, columnType string) (model.AlterTableResponse, error) RefreshPartitions(ctx context.Context, tableName string, loadFiles []warehouseutils.LoadFile) error }
func NewSchemaRepository ¶
func NewSchemaRepository(wh model.Warehouse, uploader warehouseutils.Uploader) (SchemaRepository, error)
Click to show internal directories.
Click to hide internal directories.