Documentation
¶
Index ¶
- Constants
- func BuildHandlerAndRequest(session *OpSession, externalRequest *pub.ReadRequest, handler PublishHandler) (PublishHandler, PublishRequest, error)
- func DecomposeSafeName(safeName string) (dbName, schema, name string)
- func DescribeMiddleware(middleware PublishMiddleware) string
- func DescribeMiddlewares(middlewares ...PublishMiddleware) []string
- func DiscoverSchemasSync(session *OpSession, schemaDiscoverer SchemaDiscoverer, ...) ([]*pub.Schema, error)
- func ExtractIPFromWsarecvErr(input string) string
- func GetSchemaID(schemaName, tableName string) string
- func MetaSchemaFromPubSchema(sourceSchema *pub.Schema) *meta.Schema
- func NewServer(logger hclog.Logger) pub.PublisherServer
- type AuthType
- type Cause
- type Changes
- type Config
- type ConfigurationFormPreResponse
- type ConfigureWriteFormData
- type DefaultWriteHandler
- type ErrorMap
- type NaveegoReplicationVersioning
- type NaveegoReplicationVersioningSettings
- type OpSession
- type PropertyConfig
- type PublishHandler
- type PublishHandlerFunc
- type PublishMiddleware
- func GetRecordsRealTimeMiddleware() PublishMiddleware
- func GetRecordsStaticMiddleware() PublishMiddleware
- func InitializeRealTimeComponentsMiddleware() PublishMiddleware
- func ProcessChangesMiddleware() PublishMiddleware
- func SetRecordMiddleware() PublishMiddleware
- func StoreChangeTrackingDataMiddleware() PublishMiddleware
- type PublishRequest
- type RealTimeHelper
- type RealTimeSettings
- type RealTimeState
- type RealTimeTableSettings
- type RealTimeTableState
- type RecordCollector
- type ReplicationSettings
- type ReplicationWriter
- type SchemaDiscoverer
- type SchemaMap
- type Server
- func (s *Server) BeginOAuthFlow(ctx context.Context, req *pub.BeginOAuthFlowRequest) (*pub.BeginOAuthFlowResponse, error)
- func (s *Server) CompleteOAuthFlow(ctx context.Context, req *pub.CompleteOAuthFlowRequest) (*pub.CompleteOAuthFlowResponse, error)
- func (s *Server) Configure(ctx context.Context, req *pub.ConfigureRequest) (*pub.ConfigureResponse, error)
- func (s *Server) ConfigureConnection(ctx context.Context, req *pub.ConfigureConnectionRequest) (*pub.ConfigureConnectionResponse, error)
- func (s *Server) ConfigureQuery(ctx context.Context, req *pub.ConfigureQueryRequest) (*pub.ConfigureQueryResponse, error)
- func (s *Server) ConfigureRealTime(ctx context.Context, req *pub.ConfigureRealTimeRequest) (*pub.ConfigureRealTimeResponse, error)
- func (s *Server) ConfigureReplication(ctx context.Context, req *pub.ConfigureReplicationRequest) (resp *pub.ConfigureReplicationResponse, err error)
- func (s *Server) ConfigureWrite(ctx context.Context, req *pub.ConfigureWriteRequest) (*pub.ConfigureWriteResponse, error)
- func (s *Server) Connect(ctx context.Context, req *pub.ConnectRequest) (*pub.ConnectResponse, error)
- func (s *Server) ConnectSession(*pub.ConnectRequest, pub.Publisher_ConnectSessionServer) error
- func (s *Server) Disconnect(ctx context.Context, req *pub.DisconnectRequest) (*pub.DisconnectResponse, error)
- func (s *Server) DiscoverRelatedEntities(ctx context.Context, req *pub.DiscoverRelatedEntitiesRequest) (*pub.DiscoverRelatedEntitiesResponse, error)
- func (s *Server) DiscoverSchemas(ctx context.Context, req *pub.DiscoverSchemasRequest) (*pub.DiscoverSchemasResponse, error)
- func (s *Server) DiscoverSchemasStream(req *pub.DiscoverSchemasRequest, ...) error
- func (s *Server) DiscoverShapes(ctx context.Context, req *pub.DiscoverSchemasRequest) (*pub.DiscoverSchemasResponse, error)
- func (s *Server) GetStoredProcedures(session *Session) error
- func (s *Server) GetTablesAndViews(session *Session) error
- func (s *Server) PrepareWrite(ctx context.Context, req *pub.PrepareWriteRequest) (*pub.PrepareWriteResponse, error)
- func (s *Server) PublishStream(req *pub.ReadRequest, stream pub.Publisher_PublishStreamServer) error
- func (s *Server) ReadStream(req *pub.ReadRequest, stream pub.Publisher_ReadStreamServer) error
- func (s *Server) WriteStream(stream pub.Publisher_WriteStreamServer) error
- type Session
- type Settings
- type StoreResult
- type TxHelper
- func (t TxHelper) GetDependentSchemaKeys(dependencyID string, schemaID string, dependencyKeys meta.RowKeys) ([]meta.RowKeys, error)
- func (t TxHelper) RegisterDependency(dependencyID string, schemaID string, dependencyKeys meta.RowKeys, ...) error
- func (t TxHelper) SetSchemaHash(schemaID string, keys meta.RowKeys, values meta.RowValues) (result StoreResult, err error)
- func (t TxHelper) UnregisterDependency(dependencyID string, schemaID string, dependencyKeys meta.RowKeys, ...) error
- type VersionSet
- type WriteSettings
- type Writer
Constants ¶
const ( StoreNoop = StoreResult(0) StoreAdded = StoreResult(1) StoreChanged = StoreResult(2) StoreDeleted = StoreResult(3) )
const ( ChangeKeyPrefix = "NAVEEGO_CHANGE_KEY_" DeletedFlagColumnName = "NAVEEGO_DELETED_COLUMN_FLAG" ChangeOperationColumnName = "NAVEEGO_CHANGE_OPERATION" )
const ( AuthTypeSQL = AuthType("sql") AuthTypeWindows = AuthType("windows") )
Authentication types
const CustomTargetOption string = "Custom Target"
const ErrorMapKey = "__errors"
const (
ReplicationAPIVersion = 1
)
Variables ¶
This section is empty.
Functions ¶
func BuildHandlerAndRequest ¶
func BuildHandlerAndRequest(session *OpSession, externalRequest *pub.ReadRequest, handler PublishHandler) (PublishHandler, PublishRequest, error)
func DecomposeSafeName ¶
func DescribeMiddleware ¶
func DescribeMiddleware(middleware PublishMiddleware) string
func DescribeMiddlewares ¶
func DescribeMiddlewares(middlewares ...PublishMiddleware) []string
func DiscoverSchemasSync ¶
func DiscoverSchemasSync(session *OpSession, schemaDiscoverer SchemaDiscoverer, req *pub.DiscoverSchemasRequest) ([]*pub.Schema, error)
DiscoverSchemasSync discovers all the schemas synchronously, rather than streaming them.
func ExtractIPFromWsarecvErr ¶
func GetSchemaID ¶
func NewServer ¶
func NewServer(logger hclog.Logger) pub.PublisherServer
NewServer creates a new publisher Server.
Types ¶
type Cause ¶
type Cause struct { Action string `json:"action,omitempty"` Table string `json:"table,omitempty"` DependencyKeys meta.RowKeys `json:"dep,omitempty"` }
func ParseCause ¶
type Config ¶
type Config struct { LogLevel hclog.Level // Directory where log files should be stored. LogDirectory string // Directory where the plugin can store data permanently. PermanentDirectory string // Directory where the plugin can store temporary information which may be deleted. TemporaryDirectory string }
type ConfigurationFormPreResponse ¶
type ConfigurationFormPreResponse struct { // The JSONSchema which should be used to build the form. Schema *jsonschema.JSONSchema // The UI hints which should be provided to the form. Ui SchemaMap // The state object which should be passed in any future Configure*Request as part of this configuration session. StateJson string // Current values from the form. DataJson string // Errors which should be displayed attached to fields in the form, // in the form of a JSON object with the same shape as the data object, // but the values are arrays of strings containing the error messages. DataErrors ErrorMap // Generic errors which should be displayed at the bottom of the form, // not associated with any specific fields. Errors []string }
func (ConfigurationFormPreResponse) ToResponse ¶
func (c ConfigurationFormPreResponse) ToResponse() *pub.ConfigureRealTimeResponse
type ConfigureWriteFormData ¶
type ConfigureWriteFormData struct {
StoredProcedure string `json:"storedProcedure,omitempty"`
}
type DefaultWriteHandler ¶
type DefaultWriteHandler struct {
WriteSettings *WriteSettings
}
func (*DefaultWriteHandler) Write ¶
func (d *DefaultWriteHandler) Write(session *OpSession, record *pub.UnmarshalledRecord) error
type NaveegoReplicationVersioning ¶
type NaveegoReplicationVersioning struct {}
func (NaveegoReplicationVersioning) GetSettings ¶
func (n NaveegoReplicationVersioning) GetSettings() NaveegoReplicationVersioningSettings
type NaveegoReplicationVersioningSettings ¶
type NaveegoReplicationVersioningSettings struct {}
func (NaveegoReplicationVersioningSettings) JSON ¶
func (s NaveegoReplicationVersioningSettings) JSON() string
type OpSession ¶
type OpSession struct { Session // Cancel cancels the context in this operation. Cancel func() // Ctx is the context for this operation. It will // be done when the context from gRPC call is done, // the overall session context is cancelled (by a disconnect) // or Cancel is called on this OpSession instance. Ctx context.Context }
type PropertyConfig ¶
type PublishHandler ¶
type PublishHandler interface {
Handle(req PublishRequest) error
}
func ApplyMiddleware ¶
func ApplyMiddleware(handler PublishHandler, middlewares ...PublishMiddleware) PublishHandler
func PublishToChannelHandler ¶
func PublishToChannelHandler(out chan<- *pub.Record) PublishHandler
func PublishToStreamHandler ¶
func PublishToStreamHandler(ctx context.Context, stream pub.Publisher_PublishStreamServer) PublishHandler
type PublishHandlerFunc ¶
type PublishHandlerFunc func(item PublishRequest) error
func (PublishHandlerFunc) Handle ¶
func (f PublishHandlerFunc) Handle(item PublishRequest) error
type PublishMiddleware ¶
type PublishMiddleware func(handler PublishHandler) PublishHandler
func GetRecordsRealTimeMiddleware ¶
func GetRecordsRealTimeMiddleware() PublishMiddleware
func GetRecordsStaticMiddleware ¶
func GetRecordsStaticMiddleware() PublishMiddleware
func InitializeRealTimeComponentsMiddleware ¶
func InitializeRealTimeComponentsMiddleware() PublishMiddleware
func ProcessChangesMiddleware ¶
func ProcessChangesMiddleware() PublishMiddleware
func SetRecordMiddleware ¶
func SetRecordMiddleware() PublishMiddleware
func StoreChangeTrackingDataMiddleware ¶
func StoreChangeTrackingDataMiddleware() PublishMiddleware
type PublishRequest ¶
type PublishRequest struct { PluginRequest *pub.ReadRequest OpSession *OpSession RealTimeSettings *RealTimeSettings Store store.BoltStore Data meta.RowMap Meta meta.RowMap // The schema info for the shape in the request Schema *meta.Schema Cause *Cause Action pub.Record_Action Record *pub.Record Changes []Changes // Map from SchemaID of the dependency table to a RecordData containing the keys/values // of the row the current data depends on. Dependencies map[string]meta.RowMap RealTimeState *RealTimeState }
func (PublishRequest) WithChanges ¶
func (p PublishRequest) WithChanges(c []Changes) PublishRequest
type RealTimeHelper ¶
type RealTimeHelper struct {
// contains filtered or unexported fields
}
func (*RealTimeHelper) ConfigureRealTime ¶
func (r *RealTimeHelper) ConfigureRealTime(session *OpSession, req *pub.ConfigureRealTimeRequest) (*pub.ConfigureRealTimeResponse, error)
type RealTimeSettings ¶
type RealTimeSettings struct { Tables []RealTimeTableSettings `json:"tables" title:"Tables" description:"Add tables which will be checked for changes." minLength:"1"` PollingInterval string `` /* 171-byte string literal not displayed */ // contains filtered or unexported fields }
func (RealTimeSettings) String ¶
func (r RealTimeSettings) String() string
type RealTimeState ¶
type RealTimeState struct { Version int `json:"version"` Versions map[string]int `json:"versions"` }
func (RealTimeState) String ¶
func (r RealTimeState) String() string
type RealTimeTableSettings ¶
type RealTimeTableSettings struct { CustomTarget string `json:"customTarget" title:"Custom Target" description:"Custom target for change tracking."` SchemaID string `json:"schemaID" title:"Table" description:"The table to monitor for changes." required:"true"` Query string `` /* 210-byte string literal not displayed */ }
type RealTimeTableState ¶
type RealTimeTableState struct {
Version int `json:"version"`
}
type RecordCollector ¶
type RecordCollector struct { Records []*pub.Record Items []PublishRequest }
func (*RecordCollector) Handle ¶
func (r *RecordCollector) Handle(item PublishRequest) error
type ReplicationSettings ¶
type ReplicationSettings struct { SQLSchema string `` /* 154-byte string literal not displayed */ GoldenRecordTable string `` /* 159-byte string literal not displayed */ VersionRecordTable string `` /* 146-byte string literal not displayed */ PropertyConfiguration []PropertyConfig `json:"propertyConfig" title:"Custom Property Configuration" description:"Custom configuration of SQL types for properties."` }
func (ReplicationSettings) GetNamespacedGoldenRecordTable ¶
func (r ReplicationSettings) GetNamespacedGoldenRecordTable() string
func (ReplicationSettings) GetNamespacedVersionRecordTable ¶
func (r ReplicationSettings) GetNamespacedVersionRecordTable() string
func (ReplicationSettings) JSON ¶
func (r ReplicationSettings) JSON() string
func (ReplicationSettings) VersionRecordTableExists ¶
func (r ReplicationSettings) VersionRecordTableExists() bool
type ReplicationWriter ¶
type ReplicationWriter struct { GoldenIDMap map[string]string VersionIDMap map[string]string GoldenMetaSchema *meta.Schema VersionMetaSchema *meta.Schema Settings ReplicationSettings // contains filtered or unexported fields }
func (*ReplicationWriter) Write ¶
func (r *ReplicationWriter) Write(session *OpSession, record *pub.UnmarshalledRecord) error
type SchemaDiscoverer ¶
type SchemaDiscoverer struct {
Log hclog.Logger
}
func (*SchemaDiscoverer) DiscoverSchemas ¶
func (s *SchemaDiscoverer) DiscoverSchemas(session *OpSession, req *pub.DiscoverSchemasRequest) (<-chan *pub.Schema, error)
type SchemaMap ¶
type SchemaMap map[string]interface{}
func GetMapFromJSONSchema ¶
func GetMapFromJSONSchema(js *jsonschema.JSONSchema) SchemaMap
func GetRealTimeSchemas ¶
func GetRealTimeSchemas() (form *jsonschema.JSONSchema, ui SchemaMap)
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the publisher server type; NewServer creates one.
func (*Server) BeginOAuthFlow ¶
func (s *Server) BeginOAuthFlow(ctx context.Context, req *pub.BeginOAuthFlowRequest) (*pub.BeginOAuthFlowResponse, error)
func (*Server) CompleteOAuthFlow ¶
func (s *Server) CompleteOAuthFlow(ctx context.Context, req *pub.CompleteOAuthFlowRequest) (*pub.CompleteOAuthFlowResponse, error)
func (*Server) Configure ¶
func (s *Server) Configure(ctx context.Context, req *pub.ConfigureRequest) (*pub.ConfigureResponse, error)
func (*Server) ConfigureConnection ¶
func (s *Server) ConfigureConnection(ctx context.Context, req *pub.ConfigureConnectionRequest) (*pub.ConfigureConnectionResponse, error)
func (*Server) ConfigureQuery ¶
func (s *Server) ConfigureQuery(ctx context.Context, req *pub.ConfigureQueryRequest) (*pub.ConfigureQueryResponse, error)
func (*Server) ConfigureRealTime ¶
func (s *Server) ConfigureRealTime(ctx context.Context, req *pub.ConfigureRealTimeRequest) (*pub.ConfigureRealTimeResponse, error)
func (*Server) ConfigureReplication ¶
func (s *Server) ConfigureReplication(ctx context.Context, req *pub.ConfigureReplicationRequest) (resp *pub.ConfigureReplicationResponse, err error)
func (*Server) ConfigureWrite ¶
func (s *Server) ConfigureWrite(ctx context.Context, req *pub.ConfigureWriteRequest) (*pub.ConfigureWriteResponse, error)
ConfigureWrite
func (*Server) Connect ¶
func (s *Server) Connect(ctx context.Context, req *pub.ConnectRequest) (*pub.ConnectResponse, error)
Connect connects to the database and validates the connection
func (*Server) ConnectSession ¶
func (s *Server) ConnectSession(*pub.ConnectRequest, pub.Publisher_ConnectSessionServer) error
func (*Server) Disconnect ¶
func (s *Server) Disconnect(ctx context.Context, req *pub.DisconnectRequest) (*pub.DisconnectResponse, error)
Disconnect disconnects from the server
func (*Server) DiscoverRelatedEntities ¶
func (s *Server) DiscoverRelatedEntities(ctx context.Context, req *pub.DiscoverRelatedEntitiesRequest) (*pub.DiscoverRelatedEntitiesResponse, error)
func (*Server) DiscoverSchemas ¶
func (s *Server) DiscoverSchemas(ctx context.Context, req *pub.DiscoverSchemasRequest) (*pub.DiscoverSchemasResponse, error)
func (*Server) DiscoverSchemasStream ¶
func (s *Server) DiscoverSchemasStream(req *pub.DiscoverSchemasRequest, stream pub.Publisher_DiscoverSchemasStreamServer) error
func (*Server) DiscoverShapes ¶
func (s *Server) DiscoverShapes(ctx context.Context, req *pub.DiscoverSchemasRequest) (*pub.DiscoverSchemasResponse, error)
DiscoverShapes discovers shapes present in the database
func (*Server) GetStoredProcedures ¶
func (*Server) GetTablesAndViews ¶
func (*Server) PrepareWrite ¶
func (s *Server) PrepareWrite(ctx context.Context, req *pub.PrepareWriteRequest) (*pub.PrepareWriteResponse, error)
PrepareWrite sets up the plugin to be able to write back
func (*Server) PublishStream ¶
func (s *Server) PublishStream(req *pub.ReadRequest, stream pub.Publisher_PublishStreamServer) error
PublishStream sends records read in request to the agent
func (*Server) ReadStream ¶
func (s *Server) ReadStream(req *pub.ReadRequest, stream pub.Publisher_ReadStreamServer) error
func (*Server) WriteStream ¶
func (s *Server) WriteStream(stream pub.Publisher_WriteStreamServer) error
WriteStream writes a stream of records back to the source system
type Session ¶
type Session struct { Ctx context.Context Cancel func() Publishing bool Log hclog.Logger Settings *Settings Writer Writer // tables that were discovered during connect SchemaInfo map[string]*meta.Schema StoredProcedures []string RealTimeHelper *RealTimeHelper Config Config DB *sql.DB DbHandles map[string]*sql.DB SchemaDiscoverer SchemaDiscoverer }
type Settings ¶
type Settings struct { Host string `json:"host"` Port int `json:"port"` Instance string `json:"instance"` Database string `json:"database"` Auth AuthType `json:"auth"` Username string `json:"username"` Password string `json:"password"` AdvancedSettings string `json:"advancedSettings"` PrePublishQuery string `json:"prePublishQuery"` PostPublishQuery string `json:"postPublishQuery"` SkipCustomQueryCount bool `json:"skipCustomQueryCount"` SkipConnectDiscovery bool `json:"skipConnectDiscovery"` DisableDiscovery bool `json:"disableDiscovery"` }
Settings object for plugin. Contains connection information and pre/post queries.
func (*Settings) GetConnectionString ¶
GetConnectionString builds a connection string from a settings object
type StoreResult ¶
type StoreResult int
type TxHelper ¶
type TxHelper struct {
// contains filtered or unexported fields
}
func NewTxHelper ¶
func NewTxHelper(tx *bolt.Tx) TxHelper
func (TxHelper) GetDependentSchemaKeys ¶
func (TxHelper) RegisterDependency ¶
func (TxHelper) SetSchemaHash ¶
func (t TxHelper) SetSchemaHash(schemaID string, keys meta.RowKeys, values meta.RowValues) (result StoreResult, err error)
SetSchemaHash updates the store to link the keys to the values, to allow insert/update detection.
The return value will be StoreAdded if the keys were not present in the store. The return value will be StoreChanged if the keys were present in the store, but with a different value. The return value will be StoreDeleted if the values were nil, whether or not they were present.
If the `values` parameter is nil, the keys will be deleted from the store and the method will return StoreDeleted with a nil error.
func (TxHelper) UnregisterDependency ¶
func (t TxHelper) UnregisterDependency(dependencyID string, schemaID string, dependencyKeys meta.RowKeys, schemaKeys meta.RowKeys) error
UnregisterDependency removes the dependency linkage between the dependencyKeys and the schemaKeys. If schemaKeys is nil, all dependencies for the dependencyKeys are removed.
type VersionSet ¶
func (VersionSet) Equals ¶
func (v VersionSet) Equals(other VersionSet) bool
func (VersionSet) String ¶
func (v VersionSet) String() string
type WriteSettings ¶
Settings object for write requests. Contains target schema and the commit SLA timeout.
type Writer ¶
type Writer interface {
Write(session *OpSession, record *pub.UnmarshalledRecord) error
}
func NewDefaultWriteHandler ¶
func NewDefaultWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)
func NewReplicationWriteHandler ¶
func NewReplicationWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)
func PrepareWriteHandler ¶
func PrepareWriteHandler(session *OpSession, req *pub.PrepareWriteRequest) (Writer, error)