internal

package
v0.0.0-...-3223366 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2024 License: MIT Imports: 33 Imported by: 0

Documentation

Index

Constants

View Source
// Values of StoreResult. Per the SetSchemaHash documentation, StoreAdded means
// the keys were not yet in the store, StoreChanged means they were present with
// a different value, and StoreDeleted means the supplied values were nil.
const (
	// NOTE(review): presumably reported when nothing changed — confirm.
	StoreNoop    = StoreResult(0)
	StoreAdded   = StoreResult(1)
	StoreChanged = StoreResult(2)
	StoreDeleted = StoreResult(3)
)
View Source
// Untyped string constants whose values share the NAVEEGO_ prefix.
// NOTE(review): the names suggest a key-name prefix and two column names used
// for change tracking; their call sites are not visible here — confirm.
const (
	ChangeKeyPrefix           = "NAVEEGO_CHANGE_KEY_"
	DeletedFlagColumnName     = "NAVEEGO_DELETED_COLUMN_FLAG"
	ChangeOperationColumnName = "NAVEEGO_CHANGE_OPERATION"
)
View Source
// Declared AuthType values. Settings.Auth has type AuthType and is
// (de)serialized under the JSON key "auth".
const (
	AuthTypeSQL     = AuthType("sql")
	AuthTypeWindows = AuthType("windows")
)

Authentication types

View Source
// CustomTargetOption is the literal text "Custom Target", the same text used
// as the title of RealTimeTableSettings.CustomTarget.
// NOTE(review): presumably the option value that selects a custom target — confirm.
const CustomTargetOption string = "Custom Target"
View Source
// ErrorMapKey is the string "__errors".
// NOTE(review): presumably the key under which an ErrorMap stores its own
// error messages — confirm against the ErrorMap methods.
const ErrorMapKey = "__errors"
View Source
const (
	// ReplicationAPIVersion is the untyped constant 1.
	// NOTE(review): presumably the value recorded in
	// NaveegoReplicationVersioning.APIVersion — confirm.
	ReplicationAPIVersion = 1
)

Variables

This section is empty.

Functions

func BuildHandlerAndRequest

func BuildHandlerAndRequest(session *OpSession, externalRequest *pub.ReadRequest, handler PublishHandler) (PublishHandler, PublishRequest, error)

func DecomposeSafeName

func DecomposeSafeName(safeName string) (dbName, schema, name string)

func DescribeMiddleware

func DescribeMiddleware(middleware PublishMiddleware) string

func DescribeMiddlewares

func DescribeMiddlewares(middlewares ...PublishMiddleware) []string

func DiscoverSchemasSync

func DiscoverSchemasSync(session *OpSession, schemaDiscoverer SchemaDiscoverer, req *pub.DiscoverSchemasRequest) ([]*pub.Schema, error)

DiscoverSchemasSync discovers all the schemas synchronously, rather than streaming them.

func ExtractIPFromWsarecvErr

func ExtractIPFromWsarecvErr(input string) string

func GetSchemaID

func GetSchemaID(schemaName, tableName string) string

func MetaSchemaFromPubSchema

func MetaSchemaFromPubSchema(sourceSchema *pub.Schema) *meta.Schema

func NewServer

func NewServer(logger hclog.Logger) pub.PublisherServer

NewServer creates a new publisher Server.

Types

type AuthType

// AuthType is a string-based type naming an authentication type. The declared
// values are AuthTypeSQL and AuthTypeWindows; it is the type of Settings.Auth.
type AuthType string

AuthType underlying type

type Cause

// Cause groups an action, a table name and a set of dependency keys. All three
// fields are omitted from JSON output when empty. ParseCause builds a Cause from
// a string and (*Cause).String renders one; PublishRequest carries a *Cause.
type Cause struct {
	Action string `json:"action,omitempty"`
	Table  string `json:"table,omitempty"`
	// Serialized under the shortened JSON key "dep".
	DependencyKeys meta.RowKeys `json:"dep,omitempty"`
}

func ParseCause

func ParseCause(cause string) Cause

func (*Cause) String

func (c *Cause) String() string

type Changes

// Changes pairs a dependency ID with a slice of row maps. PublishRequest.Changes
// is a []Changes.
type Changes struct {
	// NOTE(review): presumably identifies the dependency table the rows came from — confirm.
	DependencyID string
	// Each item in this slice is a map containing
	// - The key values for the schema, keyed by opaque name
	// - The key values for the dependency table, keyed by opaque name
	Data []meta.RowMap
}

type Config

// Config carries a log level and the directories the plugin may use.
// Session.Config has this type.
type Config struct {
	// Log level, expressed as an hclog.Level.
	LogLevel hclog.Level
	// Directory where log files should be stored.
	LogDirectory string
	// Directory where the plugin can store data permanently.
	PermanentDirectory string
	// Directory where the plugin can store temporary information which may be deleted.
	TemporaryDirectory string
}

type ConfigurationFormPreResponse

// ConfigurationFormPreResponse collects everything needed to render a
// configuration form: the JSON schema, UI hints, session state, current data
// and errors. It has a ToResponse method whose signature is not shown here.
type ConfigurationFormPreResponse struct {
	// The JSONSchema which should be used to build the form.
	Schema *jsonschema.JSONSchema
	// The UI hints which should be provided to the form.
	Ui SchemaMap
	// The state object which should be passed in any future Configure*Request as part of this configuration session.
	StateJson string
	// Current values from the form.
	DataJson string
	// Errors which should be displayed attached to fields in the form,
	// in the form of a JSON object with the same shape as the data object,
	// but the values are arrays of strings containing the error messages.
	DataErrors ErrorMap
	// Generic errors which should be displayed at the bottom of the form,
	// not associated with any specific fields.
	Errors []string
}

func (ConfigurationFormPreResponse) ToResponse

type ConfigureWriteFormData

// ConfigureWriteFormData holds a single stored-procedure name.
// NOTE(review): presumably the form payload handled by Server.ConfigureWrite — confirm.
type ConfigureWriteFormData struct {
	// Serialized as "storedProcedure"; omitted from JSON output when empty.
	StoredProcedure string `json:"storedProcedure,omitempty"`
}

type DefaultWriteHandler

// DefaultWriteHandler holds the WriteSettings for a write operation. Its
// pointer type declares a Write method with the signature required by Writer,
// and NewDefaultWriteHandler returns a Writer.
type DefaultWriteHandler struct {
	WriteSettings *WriteSettings
}

func (*DefaultWriteHandler) Write

func (d *DefaultWriteHandler) Write(session *OpSession, record *pub.UnmarshalledRecord) error

type ErrorMap

// ErrorMap is a string-keyed map of arbitrary values used to collect error
// messages. It offers AddError, GetErrors (which takes a path), GetOrAddChild
// (which returns another ErrorMap) and String. ConfigurationFormPreResponse
// uses it for DataErrors, documented there as a JSON object shaped like the
// form data whose values are arrays of error strings.
type ErrorMap map[string]interface{}

func (ErrorMap) AddError

func (e ErrorMap) AddError(err string, args ...interface{})

func (ErrorMap) GetErrors

func (e ErrorMap) GetErrors(path ...string) []string

func (ErrorMap) GetOrAddChild

func (e ErrorMap) GetOrAddChild(key interface{}) ErrorMap

func (ErrorMap) String

func (e ErrorMap) String() string
// NaveegoReplicationVersioning is a struct whose fields each carry an `sql`
// tag naming a column; ID is additionally tagged sqlkey:"true".
// NOTE(review): presumably one row of a replication versioning table, with
// Settings holding a serialized NaveegoReplicationVersioningSettings — confirm.
type NaveegoReplicationVersioning struct {
	ID                  int       `sql:"ID" sqlkey:"true"`
	Timestamp           time.Time `sql:"Timestamp"`
	APIVersion          int       `sql:"APIVersion"`
	ReplicatedShapeID   string    `sql:"ReplicatedShapeID"`
	ReplicatedShapeName string    `sql:"ReplicatedShapeName"`
	Settings            string    `sql:"Settings"`
}
// NaveegoReplicationVersioningSettings bundles a list of change strings, the
// ReplicationSettings, the golden and version schemas, and the originating
// PrepareWriteRequest.
// NOTE(review): presumably what is stored in NaveegoReplicationVersioning.Settings — confirm.
type NaveegoReplicationVersioningSettings struct {
	Changes       []string
	Settings      ReplicationSettings
	GoldenSchema  *pub.Schema
	VersionSchema *pub.Schema
	Request       *pub.PrepareWriteRequest
}

type OpSession

// OpSession embeds Session and adds a context and cancel function scoped to a
// single operation. Session.OpSession(ctx) returns an *OpSession.
type OpSession struct {
	// Embedded by value, so Session's fields and methods are promoted.
	// Session also declares Ctx and Cancel; the fields below shadow them.
	Session
	// Cancel cancels the context in this operation.
	Cancel func()
	// Ctx is the context for this operation. It will
	// be done when the context from gRPC call is done,
	// the overall session context is cancelled (by a disconnect)
	// or Cancel is called on this OpSession instance.
	Ctx context.Context
}

type PropertyConfig

// PropertyConfig pairs a property name with a SQL type. It is the element type
// of ReplicationSettings.PropertyConfiguration, described there as custom
// configuration of SQL types for properties.
type PropertyConfig struct {
	// Titled "Property Name" in the form.
	Name string `json:"name" title:"Property Name"`
	// Titled "SQL Type" in the form.
	Type string `json:"type" title:"SQL Type"`
}

type PublishHandler

// PublishHandler processes a single PublishRequest. PublishHandlerFunc and
// *RecordCollector declare a Handle method with this signature, and
// PublishMiddleware wraps one PublishHandler in another.
type PublishHandler interface {
	// Handle processes req and reports any failure as an error.
	Handle(req PublishRequest) error
}

func ApplyMiddleware

func ApplyMiddleware(handler PublishHandler, middlewares ...PublishMiddleware) PublishHandler

func PublishToChannelHandler

func PublishToChannelHandler(out chan<- *pub.Record) PublishHandler

type PublishHandlerFunc

// PublishHandlerFunc is a function type with the same shape as
// PublishHandler.Handle; it declares a Handle method of its own.
// NOTE(review): presumably Handle simply calls the function — confirm.
type PublishHandlerFunc func(item PublishRequest) error

func (PublishHandlerFunc) Handle

func (f PublishHandlerFunc) Handle(item PublishRequest) error

type PublishMiddleware

// PublishMiddleware takes a PublishHandler and returns a PublishHandler.
// ApplyMiddleware accepts a handler plus any number of middlewares and returns
// the resulting handler.
type PublishMiddleware func(handler PublishHandler) PublishHandler

func GetRecordsRealTimeMiddleware

func GetRecordsRealTimeMiddleware() PublishMiddleware

func GetRecordsStaticMiddleware

func GetRecordsStaticMiddleware() PublishMiddleware

func InitializeRealTimeComponentsMiddleware

func InitializeRealTimeComponentsMiddleware() PublishMiddleware

func ProcessChangesMiddleware

func ProcessChangesMiddleware() PublishMiddleware

func SetRecordMiddleware

func SetRecordMiddleware() PublishMiddleware

func StoreChangeTrackingDataMiddleware

func StoreChangeTrackingDataMiddleware() PublishMiddleware

type PublishRequest

// PublishRequest is the value handed to PublishHandler.Handle. It is passed by
// value, and WithChanges returns a PublishRequest.
type PublishRequest struct {
	// The read request received from the caller (see BuildHandlerAndRequest).
	PluginRequest    *pub.ReadRequest
	OpSession        *OpSession
	RealTimeSettings *RealTimeSettings
	Store            store.BoltStore
	Data             meta.RowMap
	Meta             meta.RowMap
	// The schema info for the shape in the request
	Schema  *meta.Schema
	Cause   *Cause
	Action  pub.Record_Action
	Record  *pub.Record
	Changes []Changes
	// Map from SchemaID of the dependency table to a RecordData containing the keys/values
	// of the row the current data depends on.
	Dependencies  map[string]meta.RowMap
	RealTimeState *RealTimeState
}

func (PublishRequest) WithChanges

func (p PublishRequest) WithChanges(c []Changes) PublishRequest

type RealTimeHelper

type RealTimeHelper struct {
	// contains filtered or unexported fields
}

func (*RealTimeHelper) ConfigureRealTime

func (r *RealTimeHelper) ConfigureRealTime(session *OpSession, req *pub.ConfigureRealTimeRequest) (*pub.ConfigureRealTimeResponse, error)

type RealTimeSettings

type RealTimeSettings struct {
	Tables          []RealTimeTableSettings `json:"tables" title:"Tables" description:"Add tables which will be checked for changes." minLength:"1"`
	PollingInterval string                  `` /* 171-byte string literal not displayed */
	// contains filtered or unexported fields
}

func (RealTimeSettings) String

func (r RealTimeSettings) String() string

type RealTimeState

// RealTimeState holds one integer version plus a string-keyed map of integer
// versions, serialized as "version" and "versions". PublishRequest carries a
// *RealTimeState.
// NOTE(review): the meaning of the Versions keys is not visible here — confirm.
type RealTimeState struct {
	Version  int            `json:"version"`
	Versions map[string]int `json:"versions"`
}

func (RealTimeState) String

func (r RealTimeState) String() string

type RealTimeTableSettings

type RealTimeTableSettings struct {
	CustomTarget string `json:"customTarget" title:"Custom Target" description:"Custom target for change tracking."`
	SchemaID     string `json:"schemaID" title:"Table" description:"The table to monitor for changes." required:"true"`
	Query        string `` /* 210-byte string literal not displayed */
}

type RealTimeTableState

// RealTimeTableState holds a single integer version, serialized as "version".
// NOTE(review): presumably tracked per table — confirm.
type RealTimeTableState struct {
	Version int `json:"version"`
}

type RecordCollector

// RecordCollector holds a slice of records and a slice of publish requests.
// Its pointer type declares a Handle method with the signature required by
// PublishHandler.
// NOTE(review): presumably Handle appends each request and its record — confirm.
type RecordCollector struct {
	Records []*pub.Record
	Items   []PublishRequest
}

func (*RecordCollector) Handle

func (r *RecordCollector) Handle(item PublishRequest) error

type ReplicationSettings

type ReplicationSettings struct {
	SQLSchema             string           `` /* 154-byte string literal not displayed */
	GoldenRecordTable     string           `` /* 159-byte string literal not displayed */
	VersionRecordTable    string           `` /* 146-byte string literal not displayed */
	PropertyConfiguration []PropertyConfig `json:"propertyConfig" title:"Custom Property Configuration" description:"Custom configuration of SQL types for properties."`
}

func (ReplicationSettings) GetNamespacedGoldenRecordTable

func (r ReplicationSettings) GetNamespacedGoldenRecordTable() string

func (ReplicationSettings) GetNamespacedVersionRecordTable

func (r ReplicationSettings) GetNamespacedVersionRecordTable() string

func (ReplicationSettings) JSON

func (r ReplicationSettings) JSON() string

func (ReplicationSettings) VersionRecordTableExists

func (r ReplicationSettings) VersionRecordTableExists() bool

type ReplicationWriter

type ReplicationWriter struct {
	GoldenIDMap       map[string]string
	VersionIDMap      map[string]string
	GoldenMetaSchema  *meta.Schema
	VersionMetaSchema *meta.Schema

	Settings ReplicationSettings
	// contains filtered or unexported fields
}

func (*ReplicationWriter) Write

func (r *ReplicationWriter) Write(session *OpSession, record *pub.UnmarshalledRecord) error

type SchemaDiscoverer

// SchemaDiscoverer carries the logger for schema discovery. Its DiscoverSchemas
// method returns a receive-only channel of *pub.Schema, and DiscoverSchemasSync
// takes a SchemaDiscoverer to discover all schemas synchronously.
type SchemaDiscoverer struct {
	Log hclog.Logger
}

func (*SchemaDiscoverer) DiscoverSchemas

func (s *SchemaDiscoverer) DiscoverSchemas(session *OpSession, req *pub.DiscoverSchemasRequest) (<-chan *pub.Schema, error)

type SchemaMap

// SchemaMap is a string-keyed map of arbitrary values. GetMapFromJSONSchema
// returns one, GetRealTimeSchemas returns one as its ui result, and
// ConfigurationFormPreResponse.Ui uses it for UI hints.
type SchemaMap map[string]interface{}

func GetMapFromJSONSchema

func GetMapFromJSONSchema(js *jsonschema.JSONSchema) SchemaMap

func GetRealTimeSchemas

func GetRealTimeSchemas() (form *jsonschema.JSONSchema, ui SchemaMap)

func (SchemaMap) String

func (s SchemaMap) String() string

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server type to describe a server

func (*Server) BeginOAuthFlow

func (*Server) CompleteOAuthFlow

func (*Server) Configure

func (s *Server) Configure(ctx context.Context, req *pub.ConfigureRequest) (*pub.ConfigureResponse, error)

func (*Server) ConfigureConnection

func (*Server) ConfigureQuery

func (*Server) ConfigureRealTime

func (*Server) ConfigureReplication

func (s *Server) ConfigureReplication(ctx context.Context, req *pub.ConfigureReplicationRequest) (resp *pub.ConfigureReplicationResponse, err error)

func (*Server) ConfigureWrite

ConfigureWrite

func (*Server) Connect

func (s *Server) Connect(ctx context.Context, req *pub.ConnectRequest) (*pub.ConnectResponse, error)

Connect connects to the database and validates the connection.

func (*Server) ConnectSession

func (*Server) Disconnect

func (s *Server) Disconnect(ctx context.Context, req *pub.DisconnectRequest) (*pub.DisconnectResponse, error)

Disconnect disconnects from the server

func (*Server) DiscoverSchemas

func (*Server) DiscoverSchemasStream

func (s *Server) DiscoverSchemasStream(req *pub.DiscoverSchemasRequest, stream pub.Publisher_DiscoverSchemasStreamServer) error

func (*Server) DiscoverShapes

DiscoverShapes discovers shapes present in the database

func (*Server) GetStoredProcedures

func (s *Server) GetStoredProcedures(session *Session) error

func (*Server) GetTablesAndViews

func (s *Server) GetTablesAndViews(session *Session) error

func (*Server) PrepareWrite

func (s *Server) PrepareWrite(ctx context.Context, req *pub.PrepareWriteRequest) (*pub.PrepareWriteResponse, error)

PrepareWrite sets up the plugin to be able to write back

func (*Server) PublishStream

func (s *Server) PublishStream(req *pub.ReadRequest, stream pub.Publisher_PublishStreamServer) error

PublishStream sends records read in request to the agent

func (*Server) ReadStream

func (s *Server) ReadStream(req *pub.ReadRequest, stream pub.Publisher_ReadStreamServer) error

func (*Server) WriteStream

func (s *Server) WriteStream(stream pub.Publisher_WriteStreamServer) error

WriteStream writes a stream of records back to the source system

type Session

// Session holds the state of a session: its context and cancel function,
// logger, settings, writer, discovered schema information and database
// handles. OpSession embeds it, and Session.OpSession(ctx) returns an *OpSession.
type Session struct {
	Ctx        context.Context
	Cancel     func()
	Publishing bool
	Log        hclog.Logger
	Settings   *Settings
	Writer     Writer
	// tables that were discovered during connect
	SchemaInfo       map[string]*meta.Schema
	StoredProcedures []string
	RealTimeHelper   *RealTimeHelper
	Config           Config
	DB               *sql.DB
	// NOTE(review): presumably additional handles keyed by database name — confirm.
	DbHandles        map[string]*sql.DB
	SchemaDiscoverer SchemaDiscoverer
}

func (*Session) OpSession

func (s *Session) OpSession(ctx context.Context) *OpSession

type Settings

// Settings is the plugin's settings object. It contains connection information
// and the pre/post publish queries; GetConnectionString builds a connection
// string from it and Validate reports whether it is valid.
type Settings struct {
	// Connection information.
	Host                 string   `json:"host"`
	Port                 int      `json:"port"`
	Instance             string   `json:"instance"`
	Database             string   `json:"database"`
	Auth                 AuthType `json:"auth"`
	Username             string   `json:"username"`
	Password             string   `json:"password"`
	AdvancedSettings     string   `json:"advancedSettings"`
	// Pre/post publish queries.
	PrePublishQuery      string   `json:"prePublishQuery"`
	PostPublishQuery     string   `json:"postPublishQuery"`
	// Boolean switches. NOTE(review): their exact effect is not visible here — confirm.
	SkipCustomQueryCount bool     `json:"skipCustomQueryCount"`
	SkipConnectDiscovery bool     `json:"skipConnectDiscovery"`
	DisableDiscovery     bool     `json:"disableDiscovery"`
}

Settings object for the plugin. Contains connection information and pre/post queries.

func (*Settings) GetConnectionString

func (s *Settings) GetConnectionString() (string, error)

GetConnectionString builds a connection string from a settings object

func (*Settings) Validate

func (s *Settings) Validate() error

Validate returns an error if the Settings are not valid. It also populates the internal fields of settings.

type StoreResult

// StoreResult is the int-based result type returned by TxHelper.SetSchemaHash.
// Its declared values are StoreNoop, StoreAdded, StoreChanged and StoreDeleted.
type StoreResult int

type TxHelper

type TxHelper struct {
	// contains filtered or unexported fields
}

func NewTxHelper

func NewTxHelper(tx *bolt.Tx) TxHelper

func (TxHelper) GetDependentSchemaKeys

func (t TxHelper) GetDependentSchemaKeys(dependencyID string, schemaID string, dependencyKeys meta.RowKeys) ([]meta.RowKeys, error)

func (TxHelper) RegisterDependency

func (t TxHelper) RegisterDependency(dependencyID string, schemaID string, dependencyKeys meta.RowKeys, schemaKeys meta.RowKeys) error

func (TxHelper) SetSchemaHash

func (t TxHelper) SetSchemaHash(schemaID string, keys meta.RowKeys, values meta.RowValues) (result StoreResult, err error)

SetSchemaHash updates the store to link the keys to the values, to allow insert/update detection.

The return value will be StoreAdded if the keys were not present in the store. The return value will be StoreChanged if the keys were present in the store, but with a different value. The return value will be StoreDeleted if the values were nil, whether or not they were present.

If the `values` parameter is nil, the keys will be deleted from the store and the method will return StoreDeleted, nil.

func (TxHelper) UnregisterDependency

func (t TxHelper) UnregisterDependency(dependencyID string, schemaID string, dependencyKeys meta.RowKeys, schemaKeys meta.RowKeys) error

UnregisterDependency removes the dependency linkage between the dependencyKeys and the schemaKeys. If schemaKeys is nil, all dependencies for the dependencyKeys are removed.

type VersionSet

// VersionSet maps string keys to integer versions and provides Equals and
// String methods.
// NOTE(review): the meaning of the keys is not visible here — confirm.
type VersionSet map[string]int

func (VersionSet) Equals

func (v VersionSet) Equals(other VersionSet) bool

func (VersionSet) String

func (v VersionSet) String() string

type WriteSettings

// WriteSettings is the settings object for write requests.
// DefaultWriteHandler holds a *WriteSettings.
type WriteSettings struct {
	// Target schema for the write.
	Schema *pub.Schema `json:"schema"`
	// Commit SLA timeout. NOTE(review): the unit is not visible here — confirm.
	CommitSLA int32 `json:"commitSla"`
}

Settings object for write requests. Contains the target schema and the commit SLA timeout.

type Writer

// Writer writes a single unmarshalled record within an operation session.
// *DefaultWriteHandler and *ReplicationWriter declare a Write method with this
// signature; NewDefaultWriteHandler, NewReplicationWriteHandler and
// PrepareWriteHandler each return a Writer. Session.Writer has this type.
type Writer interface {
	// Write writes record using session and reports any failure as an error.
	Write(session *OpSession, record *pub.UnmarshalledRecord) error
}

func NewDefaultWriteHandler

func NewDefaultWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)

func NewReplicationWriteHandler

func NewReplicationWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)

func PrepareWriteHandler

func PrepareWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL