Documentation ¶
Index ¶
- func ParseLogLevel(level string) (logrus.Level, error)
- type Axon
- type AxonConfig
- type Changeset
- type ChangesetColumn
- type ChangesetKind
- type Config
- type DBConfig
- type LROption
- type Listener
- type LogicalReplicationListener
- type NotifyListener
- type NotifyOption
- type Option
- type Pipeline
- type Stage
- type StageFunc
- type WarpPipe
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Axon ¶
type Axon struct {
	Config *AxonConfig
	Logger *logrus.Logger
	// contains filtered or unexported fields
}
Axon listens for Warp-Pipe changeset events, converts them into SQL statements, and executes them on the remote target.
func (*Axon) RunWithPipeline ¶
type AxonConfig ¶
type AxonConfig struct {
	// source db credentials
	SourceDBHost string `envconfig:"source_db_host"`
	SourceDBPort int    `envconfig:"source_db_port"`
	SourceDBName string `envconfig:"source_db_name"`
	SourceDBUser string `envconfig:"source_db_user"`
	SourceDBPass string `envconfig:"source_db_pass"`

	// target db credentials
	TargetDBHost   string `envconfig:"target_db_host"`
	TargetDBPort   int    `envconfig:"target_db_port"`
	TargetDBName   string `envconfig:"target_db_name"`
	TargetDBUser   string `envconfig:"target_db_user"`
	TargetDBPass   string `envconfig:"target_db_pass"`
	TargetDBSchema string `envconfig:"target_db_schema" default:"public"`

	// force Axon to shut down after processing the latest changeset
	ShutdownAfterLastChangeset bool `envconfig:"shutdown_after_last_changeset"`
}
AxonConfig stores the configuration for Axon.
func NewAxonConfigFromEnv ¶
func NewAxonConfigFromEnv() (*AxonConfig, error)
NewAxonConfigFromEnv loads the Axon configuration from environment variables.
type Changeset ¶
type Changeset struct {
	ID        int64              `json:"id"`
	Kind      ChangesetKind      `json:"kind"`
	Schema    string             `json:"schema"`
	Table     string             `json:"table"`
	Timestamp time.Time          `json:"timestamp"`
	NewValues []*ChangesetColumn `json:"new_values"`
	OldValues []*ChangesetColumn `json:"old_values"`
}
Changeset represents a changeset for a record on a Postgres table.
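The JSON tags on Changeset suggest how a serialized changeset could be decoded. The following is a minimal sketch, not the package's own code: the types are local copies of the definitions shown here so that the example compiles on its own, `decodeChangeset` is a hypothetical helper, and the sample payload is invented for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Local copies of the documented types (assumption: mirrored here so the
// example does not depend on the warppipe package).
type ChangesetKind string

type ChangesetColumn struct {
	Column string      `json:"column"`
	Value  interface{} `json:"value"`
	Type   string      `json:"type"`
}

type Changeset struct {
	ID        int64              `json:"id"`
	Kind      ChangesetKind      `json:"kind"`
	Schema    string             `json:"schema"`
	Table     string             `json:"table"`
	Timestamp time.Time          `json:"timestamp"`
	NewValues []*ChangesetColumn `json:"new_values"`
	OldValues []*ChangesetColumn `json:"old_values"`
}

// decodeChangeset is a hypothetical helper that unmarshals a JSON payload
// into a Changeset using the struct tags above.
func decodeChangeset(payload []byte) (*Changeset, error) {
	var c Changeset
	if err := json.Unmarshal(payload, &c); err != nil {
		return nil, err
	}
	return &c, nil
}

func main() {
	// Invented sample payload; the timestamp uses RFC 3339, which
	// time.Time decodes by default.
	payload := []byte(`{
		"id": 1,
		"kind": "update",
		"schema": "public",
		"table": "users",
		"timestamp": "2020-01-02T03:04:05Z",
		"new_values": [{"column": "email", "value": "new@example.com", "type": "text"}],
		"old_values": [{"column": "email", "value": "old@example.com", "type": "text"}]
	}`)

	c, err := decodeChangeset(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(c.ID, c.Kind, c.Schema+"."+c.Table, c.NewValues[0].Value)
}
```

Because Value is an `interface{}`, JSON numbers decode as `float64` and strings as `string`; callers that need a specific Go type would have to convert using the column's Type field.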
func (*Changeset) GetNewColumnValue ¶
GetNewColumnValue returns the current value for a column and a bool denoting whether a new value is present in the changeset.
func (*Changeset) GetPreviousColumnValue ¶
GetPreviousColumnValue returns the previous value for a column and a bool denoting whether an old value is present in the changeset.
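The signatures of these two accessors are not shown above. As a rough sketch of the "value plus presence flag" behavior they describe, the example below assumes a lookup by column name that returns `(interface{}, bool)`. The types are trimmed local stand-ins, and `lookupColumn` and `columnChanged` are hypothetical helpers rather than package functions.

```go
package main

import (
	"fmt"
	"reflect"
)

// Trimmed local stand-ins for the documented types.
type ChangesetColumn struct {
	Column string
	Value  interface{}
	Type   string
}

type Changeset struct {
	NewValues []*ChangesetColumn
	OldValues []*ChangesetColumn
}

// lookupColumn (hypothetical) scans a column list by name and reports
// whether the column was present.
func lookupColumn(cols []*ChangesetColumn, name string) (interface{}, bool) {
	for _, c := range cols {
		if c != nil && c.Column == name {
			return c.Value, true
		}
	}
	return nil, false
}

// columnChanged (hypothetical) reports whether a column's new value
// differs from its old value, or is present on only one side.
func columnChanged(c *Changeset, name string) bool {
	newVal, hasNew := lookupColumn(c.NewValues, name)
	oldVal, hasOld := lookupColumn(c.OldValues, name)
	if hasNew != hasOld {
		return true
	}
	// DeepEqual avoids a panic when values are slices or maps.
	return !reflect.DeepEqual(newVal, oldVal)
}

func main() {
	c := &Changeset{
		NewValues: []*ChangesetColumn{
			{Column: "email", Value: "new@example.com", Type: "text"},
			{Column: "name", Value: "Ada", Type: "text"},
		},
		OldValues: []*ChangesetColumn{
			{Column: "email", Value: "old@example.com", Type: "text"},
			{Column: "name", Value: "Ada", Type: "text"},
		},
	}
	fmt.Println(columnChanged(c, "email"), columnChanged(c, "name"))
}
```

The boolean matters because a column can legitimately hold a nil value; without it a caller could not distinguish "column set to NULL" from "column absent from the changeset".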
type ChangesetColumn ¶
type ChangesetColumn struct {
	Column string      `json:"column"`
	Value  interface{} `json:"value"`
	Type   string      `json:"type"`
}
ChangesetColumn represents a type and value for a column in a changeset.
type ChangesetKind ¶
type ChangesetKind string
ChangesetKind is the type for changeset kinds
const (
	ChangesetKindInsert ChangesetKind = "insert"
	ChangesetKindUpdate ChangesetKind = "update"
	ChangesetKindDelete ChangesetKind = "delete"
)
ChangesetKind constants
func ParseChangesetKind ¶
func ParseChangesetKind(kind string) ChangesetKind
ParseChangesetKind parses a changeset kind from a string.
type Config ¶
type Config struct {
	// Database connection settings.
	Database DBConfig

	// If defined, warppipe will only emit changes for the specified tables.
	WhitelistTables []string `envconfig:"WHITELIST_TABLES"`

	// If set, warppipe will suppress changes for any specified tables.
	// Note: This setting takes precedence over the whitelisted tables.
	IgnoreTables []string `envconfig:"IGNORE_TABLES"`

	// Replication mode may be either `lr` (logical replication) or `audit`.
	ReplicationMode string `envconfig:"REPLICATION_MODE" default:"lr"`

	// Specifies the replication slot name to be used. (LR mode only)
	ReplicationSlotName string `envconfig:"REPLICATION_SLOT_NAME"`

	// Start replication from the specified log sequence number (LSN). (LR mode only)
	StartFromLSN uint64 `envconfig:"START_FROM_LSN"`

	// Start replication from the specified changeset ID. (Audit mode only)
	StartFromID int64 `envconfig:"START_FROM_ID"`

	// Start replication from the specified changeset timestamp. (Audit mode only)
	StartFromTimestamp int64 `envconfig:"START_FROM_TIMESTAMP"`

	// Sets the log level
	LogLevel string `envconfig:"LOG_LEVEL" default:"info"`
}
Config represents the warp pipe configuration settings.
func NewConfigFromEnv ¶
NewConfigFromEnv returns a new Config initialized with values read from the environment.
type DBConfig ¶
type DBConfig struct {
	Host     string `envconfig:"DB_HOST"`
	Port     int    `envconfig:"DB_PORT"`
	User     string `envconfig:"DB_USER"`
	Password string `envconfig:"DB_PASS"`
	Database string `envconfig:"DB_NAME"`
	Schema   string `envconfig:"DB_SCHEMA"`
}
DBConfig represents the database configuration settings.
type LROption ¶
type LROption func(*LogicalReplicationListener)
LROption is a LogicalReplicationListener option function
func HeartbeatInterval ¶
HeartbeatInterval is an option for setting the connection heartbeat interval.
func ReplSlotName ¶
ReplSlotName is an option for setting the replication slot name.
func StartFromLSN ¶
StartFromLSN is an option for setting the log sequence number (LSN) to start from.
type Listener ¶
type Listener interface {
	Dial(*pgx.ConnConfig) error
	ListenForChanges(context.Context) (chan *Changeset, chan error)
	Close() error
}
Listener is an interface for implementing a changeset listener.
type LogicalReplicationListener ¶
type LogicalReplicationListener struct {
// contains filtered or unexported fields
}
LogicalReplicationListener is a Listener that uses logical replication slots to listen for changesets.
func NewLogicalReplicationListener ¶
func NewLogicalReplicationListener(opts ...LROption) *LogicalReplicationListener
NewLogicalReplicationListener returns a new LogicalReplicationListener.
func (*LogicalReplicationListener) Close ¶
func (l *LogicalReplicationListener) Close() error
Close closes the database connection.
func (*LogicalReplicationListener) Dial ¶
func (l *LogicalReplicationListener) Dial(connConfig *pgx.ConnConfig) error
Dial connects to the source database.
func (*LogicalReplicationListener) ListenForChanges ¶
func (l *LogicalReplicationListener) ListenForChanges(ctx context.Context) (chan *Changeset, chan error)
ListenForChanges returns a channel that emits database changesets.
type NotifyListener ¶
type NotifyListener struct {
// contains filtered or unexported fields
}
NotifyListener is a listener that uses Postgres' LISTEN/NOTIFY pattern for subscribing to changesets enqueued in a changesets table. For more details see `pkg/schema/changesets`.
func NewNotifyListener ¶
func NewNotifyListener(opts ...NotifyOption) *NotifyListener
NewNotifyListener returns a new NotifyListener.
func (*NotifyListener) Close ¶
func (l *NotifyListener) Close() error
Close closes the database connection.
func (*NotifyListener) Dial ¶
func (l *NotifyListener) Dial(connConfig *pgx.ConnConfig) error
Dial connects to the source database.
func (*NotifyListener) ListenForChanges ¶
func (l *NotifyListener) ListenForChanges(ctx context.Context) (chan *Changeset, chan error)
ListenForChanges returns a channel that emits database changesets.
type NotifyOption ¶
type NotifyOption func(*NotifyListener)
NotifyOption is a NotifyListener option function
func StartFromID ¶
func StartFromID(changesetID int64) NotifyOption
StartFromID is an option for setting the changeset ID to start replication from.
func StartFromTimestamp ¶
func StartFromTimestamp(t time.Time) NotifyOption
StartFromTimestamp is an option for setting the changeset timestamp to start replication from.
type Option ¶
type Option func(*WarpPipe)
Option is a WarpPipe option function
func IgnoreTables ¶
IgnoreTables is an option for setting the tables that WarpPipe should ignore. It accepts entries in any of the following formats:
<schema>.<table>
<schema>.*
<table>
Tables in this list are excluded even if they are whitelisted via WhitelistTables().
func WhitelistTables ¶
WhitelistTables is an option for setting the list of tables to listen for changes from. It accepts entries in any of the following formats:
<schema>.<table>
<schema>.*
<table>
Any tables set via IgnoreTables() will be excluded.
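The interaction between the two options can be sketched as a small matcher. This is an illustration of the documented rules, not the package's implementation: `matches`, `matchesAny`, and `shouldEmit` are hypothetical names, and the sketch assumes that a bare `<table>` entry matches that table in any schema and that an empty whitelist allows every table.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether a single pattern selects schema.table.
// Supported forms: <schema>.<table>, <schema>.*, and <table>.
func matches(pattern, schema, table string) bool {
	parts := strings.SplitN(pattern, ".", 2)
	if len(parts) == 1 {
		// Bare <table>: assumed to match the table in any schema.
		return parts[0] == table
	}
	if parts[0] != schema {
		return false
	}
	return parts[1] == "*" || parts[1] == table
}

// matchesAny reports whether any pattern in the list selects schema.table.
func matchesAny(patterns []string, schema, table string) bool {
	for _, p := range patterns {
		if matches(p, schema, table) {
			return true
		}
	}
	return false
}

// shouldEmit applies the ignore list first, so it takes precedence over
// the whitelist. An empty whitelist is assumed to allow all tables.
func shouldEmit(whitelist, ignore []string, schema, table string) bool {
	if matchesAny(ignore, schema, table) {
		return false
	}
	return len(whitelist) == 0 || matchesAny(whitelist, schema, table)
}

func main() {
	whitelist := []string{"public.*"}
	ignore := []string{"public.secrets"}

	fmt.Println(shouldEmit(whitelist, ignore, "public", "users"))
	fmt.Println(shouldEmit(whitelist, ignore, "public", "secrets"))
	fmt.Println(shouldEmit(whitelist, ignore, "audit", "users"))
}
```

Checking the ignore list before the whitelist is what makes a table such as `public.secrets` stay suppressed even when `public.*` is whitelisted.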
type Pipeline ¶
type Pipeline struct {
// contains filtered or unexported fields
}
Pipeline represents a sequence of stages for processing Changesets.
type StageFunc ¶
StageFunc is a function for processing changesets in a pipeline Stage. It accepts a single argument, a Changeset, and returns one of:
(Changeset, nil): If the stage was successful
(nil, nil): If the changeset should be dropped (useful for filtering)
(nil, error): If there was an error during the stage
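The three return cases can be illustrated with a small stage runner. The exact StageFunc signature is not shown here, so this sketch assumes `func(*Changeset) (*Changeset, error)`. The `stageFunc` type, the `runStages` helper, and the two example stages are hypothetical, and Changeset is a trimmed local stand-in.

```go
package main

import (
	"errors"
	"fmt"
)

// Changeset is a trimmed local stand-in for the documented type.
type Changeset struct {
	ID    int64
	Table string
}

// stageFunc is the assumed shape of StageFunc.
type stageFunc func(*Changeset) (*Changeset, error)

// runStages (hypothetical) passes a changeset through each stage in order.
// It stops early when a stage drops the changeset or fails.
func runStages(c *Changeset, stages ...stageFunc) (*Changeset, error) {
	for _, stage := range stages {
		next, err := stage(c)
		if err != nil {
			return nil, err // (nil, error): the stage failed
		}
		if next == nil {
			return nil, nil // (nil, nil): the changeset was dropped
		}
		c = next // (Changeset, nil): continue with the result
	}
	return c, nil
}

// requireID is an example validating stage.
func requireID(c *Changeset) (*Changeset, error) {
	if c.ID == 0 {
		return nil, errors.New("changeset has no ID")
	}
	return c, nil
}

// onlyTable returns an example filtering stage that drops other tables.
func onlyTable(table string) stageFunc {
	return func(c *Changeset) (*Changeset, error) {
		if c.Table != table {
			return nil, nil
		}
		return c, nil
	}
}

func main() {
	inputs := []*Changeset{
		{ID: 1, Table: "users"},
		{ID: 2, Table: "orders"},
		{ID: 0, Table: "users"},
	}
	for _, in := range inputs {
		out, err := runStages(in, requireID, onlyTable("users"))
		switch {
		case err != nil:
			fmt.Println("error:", err)
		case out == nil:
			fmt.Println("dropped changeset", in.ID)
		default:
			fmt.Println("kept changeset", out.ID)
		}
	}
}
```

Returning `(nil, nil)` for a drop lets a filter stage stay a plain function with no extra signalling type, at the cost that every caller must check for a nil changeset before using the result.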
type WarpPipe ¶
type WarpPipe struct {
// contains filtered or unexported fields
}
WarpPipe is a daemon that listens for database changes and transmits them somewhere else.
func NewWarpPipe ¶
NewWarpPipe initializes and returns a new WarpPipe.
func (*WarpPipe) IsLatestChangeSet ¶
IsLatestChangeSet returns true if the id argument matches that of the last record in the changeset table. TODO: This feature only supports the notify listener. It needs to support others.
func (*WarpPipe) ListenForChanges ¶
ListenForChanges starts the listener listening for database changesets. It returns two channels, one for Changesets, another for errors.