Documentation ¶
Index ¶
- Constants
- func NewDestination() sdk.Destination
- type Config
- type Destination
- func (d *Destination) Configure(ctx context.Context, cfg config.Config) error
- func (d *Destination) Open(ctx context.Context) error
- func (d *Destination) Parameters() config.Parameters
- func (d *Destination) Teardown(ctx context.Context) error
- func (d *Destination) Write(ctx context.Context, records []opencdc.Record) (int, error)
Constants ¶
View Source
// Parameter keys of the Snowflake destination. Every value here is also
// available under the corresponding ConfigSnowflake* constant.
const (
	// SnowflakeStage is the "snowflake.stage" parameter key.
	SnowflakeStage = "snowflake.stage"
	// SnowflakePrimaryKey is the "snowflake.primaryKey" parameter key.
	SnowflakePrimaryKey = "snowflake.primaryKey"
	// SnowflakeNamingPrefix is the "snowflake.namingPrefix" parameter key.
	SnowflakeNamingPrefix = "snowflake.namingPrefix"
	// SnowflakeFormat is the "snowflake.format" parameter key.
	SnowflakeFormat = "snowflake.format"
	// SnowflakeCSVGoRoutines is the "snowflake.processingWorkers" parameter
	// key. Note that the constant name and the key string differ.
	SnowflakeCSVGoRoutines = "snowflake.processingWorkers"
	// SnowflakeFileUploadThreads is the "snowflake.fileUploadThreads"
	// parameter key.
	SnowflakeFileUploadThreads = "snowflake.fileUploadThreads"
)
View Source
// Parameter keys of the Snowflake destination configuration, listed in
// alphabetical order. The strings match the json tags on the Config struct.
const (
	// ConfigSnowflakeAutoCleanupStage: automatically clean uploaded files from
	// the stage after processing.
	ConfigSnowflakeAutoCleanupStage = "snowflake.autoCleanupStage"
	// ConfigSnowflakeCompression: compression used when staging files.
	ConfigSnowflakeCompression = "snowflake.compression"
	// ConfigSnowflakeDatabase: database for the connection.
	ConfigSnowflakeDatabase = "snowflake.database"
	// ConfigSnowflakeFileUploadThreads: number of threads for PUT file uploads.
	ConfigSnowflakeFileUploadThreads = "snowflake.fileUploadThreads"
	// ConfigSnowflakeFormat: data type of the uploaded file.
	ConfigSnowflakeFormat = "snowflake.format"
	// ConfigSnowflakeHost: host for the connection.
	ConfigSnowflakeHost = "snowflake.host"
	// ConfigSnowflakeKeepAlive: keep the session alive while idle.
	ConfigSnowflakeKeepAlive = "snowflake.keepAlive"
	// ConfigSnowflakeNamingPrefix: prefix used for update_at, deleted_at and
	// create_at in the destination table.
	ConfigSnowflakeNamingPrefix = "snowflake.namingPrefix"
	// ConfigSnowflakePassword: password for the connection.
	ConfigSnowflakePassword = "snowflake.password"
	// ConfigSnowflakePort: port for the connection.
	ConfigSnowflakePort = "snowflake.port"
	// ConfigSnowflakePrimaryKey: primary key of the source table.
	ConfigSnowflakePrimaryKey = "snowflake.primaryKey"
	// ConfigSnowflakeProcessingWorkers: number of goroutines processing CSV rows.
	ConfigSnowflakeProcessingWorkers = "snowflake.processingWorkers"
	// ConfigSnowflakeSchema: schema for the connection.
	ConfigSnowflakeSchema = "snowflake.schema"
	// ConfigSnowflakeStage: stage used for uploading files before merging.
	ConfigSnowflakeStage = "snowflake.stage"
	// ConfigSnowflakeTable is the "snowflake.table" key.
	// NOTE(review): no matching Config field is visible here; confirm where it is consumed.
	ConfigSnowflakeTable = "snowflake.table"
	// ConfigSnowflakeUsername: username for the connection.
	ConfigSnowflakeUsername = "snowflake.username"
	// ConfigSnowflakeWarehouse: warehouse for the connection.
	ConfigSnowflakeWarehouse = "snowflake.warehouse"
)
Variables ¶
This section is empty.
Functions ¶
func NewDestination ¶
func NewDestination() sdk.Destination
NewDestination creates the Destination and wraps it in the default middleware.
Types ¶
type Config ¶
// Config holds the settings of the Snowflake destination. It embeds
// config.Config, and the json tag of each field carries the same string as
// the matching ConfigSnowflake* constant.
type Config struct {
	config.Config

	// Username for the Snowflake connection.
	Username string `json:"snowflake.username" validate:"required"`
	// Password for the Snowflake connection.
	Password string `json:"snowflake.password" validate:"required"`
	// Host for the Snowflake connection.
	Host string `json:"snowflake.host" validate:"required"`
	// Port for the Snowflake connection.
	Port int `json:"snowflake.port" validate:"required"`
	// Database for the Snowflake connection.
	Database string `json:"snowflake.database" validate:"required"`
	// Schema for the Snowflake connection.
	Schema string `json:"snowflake.schema" validate:"required"`
	// Warehouse for the Snowflake connection.
	Warehouse string `json:"snowflake.warehouse" validate:"required"`
	// Whether to keep the session alive even when the connection is idle.
	// Defaults to true.
	KeepAlive bool `json:"snowflake.keepAlive" default:"true"`
	// Snowflake stage to use for uploading files before merging them into
	// the destination table.
	Stage string `json:"snowflake.stage" validate:"required"`
	// Primary key of the source table.
	PrimaryKey string `json:"snowflake.primaryKey" validate:"required"`
	// Prefix to append to update_at, deleted_at, and create_at in the
	// destination table. Defaults to "meroxa".
	NamingPrefix string `json:"snowflake.namingPrefix" default:"meroxa" validate:"required"`
	// Data type of the file that is uploaded and from which data is copied
	// into Snowflake. Defaults to "csv", the only value the validate tag allows.
	Format string `json:"snowflake.format" default:"csv" validate:"required,inclusion=csv"`
	// For CSV processing, the number of goroutines that concurrently process
	// CSV rows. Defaults to 1.
	ProcessingWorkers int `json:"snowflake.processingWorkers" default:"1"`
	// Number of threads to run for PUT file uploads. Defaults to 30.
	FileUploadThreads int `json:"snowflake.fileUploadThreads" default:"30"`
	// Compression to use when staging files in Snowflake. One of gzip, zstd,
	// or copy; defaults to zstd.
	Compression string `json:"snowflake.compression" default:"zstd" validate:"required,inclusion=gzip|zstd|copy"`
	// Automatically clean up files uploaded to the stage after processing,
	// except when they fail. Defaults to true.
	AutoCleanupStage bool `json:"snowflake.autoCleanupStage" default:"true"`
}
type Destination ¶
// Destination is the Snowflake destination type. It embeds
// sdk.UnimplementedDestination and exposes its configuration and writer as
// exported fields.
type Destination struct {
	sdk.UnimplementedDestination

	// Config is the destination's configuration.
	Config Config
	// Writer is a writer.Writer.
	// NOTE(review): presumably the component Write delegates to — confirm against the implementation.
	Writer writer.Writer
	// contains filtered or unexported fields
}
func (*Destination) Open ¶
func (d *Destination) Open(ctx context.Context) error
Open prepares the plugin to receive data from the given position by initializing the database connection and creating the file stage if it does not exist.
func (*Destination) Parameters ¶
func (d *Destination) Parameters() config.Parameters
Click to show internal directories.
Click to hide internal directories.