Documentation ¶
Index ¶
- Constants
- Variables
- func IsMessageHandler(rv reflect.Value) bool
- func IsMessageHandlerType(rt reflect.Type) bool
- func StopRecursiveForwardMessageHandler(ctx *Context, msg *Message)
- type CompositeMessageObserver
- type Config
- type Context
- func (c *Context) Break()
- func (c *Context) CanRecordingLog() bool
- func (c *Context) CatchErr(err error)
- func (c *Context) Deadline() (deadline time.Time, ok bool)
- func (c *Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) InvalidMessage(message *Message) error
- func (c *Context) IsAborted() bool
- func (c *Context) Logger() *log.Logger
- func (c *Context) RecordingLog(v bool)
- func (c *Context) SetValue(key interface{}, value interface{})
- func (c *Context) Status() StatusCode
- func (c *Context) Value(key any) any
- type ContextHelper
- type ContextMessageDelegate
- type CreateReplicationSlotSource
- type CreateReplicationSlotSourceProvider
- type Defer
- type ErrorHandler
- type Event
- type Finalizer
- type Message
- type MessageDelegateHelper
- type MessageDispatcher
- type MessageErrorHandler
- type MessageHandleModule
- type MessageHandleProc
- type MessageHandleService
- type MessageHandler
- type MessageObserver
- type MessageObserverAffair
- type MessageObserverService
- type MessageTracerService
- type NoopMessageDelegate
- type OnHostErrorHandler
- type PostgresWorker
- type PostgresWorkerModule
- type PostgresWorkerRegistrar
- func (r *PostgresWorkerRegistrar) AddRouter(slot string, handler MessageHandler, handlerComponentID string)
- func (r *PostgresWorkerRegistrar) EnableTracer(enabled bool)
- func (r *PostgresWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
- func (r *PostgresWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
- func (r *PostgresWorkerRegistrar) RegisterSlot(slotOffset postgres.SlotOffset)
- func (r *PostgresWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
- func (r *PostgresWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
- func (r *PostgresWorkerRegistrar) SetMessageManager(messageManager interface{})
- type ProcessingState
- type Recover
- type ReplyCode
- type RestrictedForwardMessageHandler
- type RestrictedMessageDelegate
- type RestrictedOperationError
- type RouteComponent
- type Router
- func (r Router) Add(slot string, handler MessageHandler, handlerComponentID string)
- func (r Router) FindHandlerComponentID(stream string) string
- func (r Router) Get(stream string) MessageHandler
- func (r Router) GetRouteComponent(stream string) RouteComponent
- func (r Router) Has(stream string) bool
- func (r Router) Remove(stream string)
- type SlotOffset
- type SlotOffsetInfo
- type StatusCode
- type StdMessageHandleModule
- func (*StdMessageHandleModule) CanSetSuccessor() bool
- func (*StdMessageHandleModule) OnInitComplete()
- func (s *StdMessageHandleModule) OnStart(ctx context.Context) error
- func (s *StdMessageHandleModule) OnStop(ctx context.Context) error
- func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *postgres.Message, state ProcessingState, ...)
- func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
- type StopError
- type TracerManager
Constants ¶
const ( RestrictedForwardMessage_InvalidOperation int = 0 RestrictedForwardMessage_Recursive int = 1 )
const (
LOGGER_PREFIX string = "[worker-postgres] "
)
Variables ¶
var ( GlobalTracerManager *TracerManager // be register from PostgresWorker GlobalContextHelper ContextHelper = ContextHelper{} GlobalRestrictedMessageDelegate postgres.MessageDelegate = RestrictedMessageDelegate(0) GlobalNoopMessageDelegate postgres.MessageDelegate = NoopMessageDelegate(0) GlobalMessageDelegateHelper MessageDelegateHelper = MessageDelegateHelper{} PostgresWorkerModuleInstance = PostgresWorkerModule{} PostgresWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix) )
Functions ¶
func IsMessageHandler ¶
func IsMessageHandlerType ¶
Types ¶
type CompositeMessageObserver ¶
type CompositeMessageObserver []MessageObserver
func (CompositeMessageObserver) OnAck ¶
func (o CompositeMessageObserver) OnAck(ctx *Context, message *Message)
OnAck implements MessageObserver.
func (CompositeMessageObserver) Type ¶
func (c CompositeMessageObserver) Type() reflect.Type
Type implements MessageObserver.
type Context ¶
type Context struct {
// contains filtered or unexported fields
}
func (*Context) CanRecordingLog ¶
func (*Context) InvalidMessage ¶
func (*Context) RecordingLog ¶
func (*Context) SetValue ¶
func (c *Context) SetValue(key interface{}, value interface{})
SetValue implements trace.ValueContext.
func (*Context) Status ¶
func (c *Context) Status() StatusCode
type ContextHelper ¶
type ContextHelper struct{}
func (ContextHelper) ExtractReplyCode ¶
func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode
func (ContextHelper) InjectReplyCode ¶
func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)
func (ContextHelper) InjectReplyCodeSafe ¶
func (ContextHelper) InjectReplyCodeSafe(ctx *Context, reply ReplyCode)
type ContextMessageDelegate ¶
type ContextMessageDelegate struct {
// contains filtered or unexported fields
}
func NewContextMessageDelegate ¶
func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate
func (*ContextMessageDelegate) OnAck ¶
func (d *ContextMessageDelegate) OnAck(msg *postgres.Message)
OnAck implements postgres.MessageDelegate.
type CreateReplicationSlotSource ¶
type CreateReplicationSlotSource = postgres.CreateReplicationSlotSource
type CreateReplicationSlotSourceProvider ¶
type CreateReplicationSlotSourceProvider = postgres.CreateReplicationSlotSourceProvider
type ErrorHandler ¶
type MessageDelegateHelper ¶
type MessageDelegateHelper struct{}
func (MessageDelegateHelper) IsRestricted ¶
func (MessageDelegateHelper) IsRestricted(msg *Message) (bool, error)
func (MessageDelegateHelper) Restrict ¶
func (MessageDelegateHelper) Restrict(msg *Message) error
func (MessageDelegateHelper) Unrestrict ¶
func (MessageDelegateHelper) Unrestrict(msg *Message) error
type MessageDispatcher ¶
type MessageDispatcher struct { MessageHandleService *MessageHandleService MessageTracerService *MessageTracerService MessageObserverService *MessageObserverService Router Router OnHostErrorProc OnHostErrorHandler ErrorHandler ErrorHandler InvalidMessageHandler MessageHandler SlotSet map[string]SlotOffset }
func (*MessageDispatcher) ProcessMessage ¶
func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message)
func (*MessageDispatcher) SlotOffsets ¶
func (d *MessageDispatcher) SlotOffsets() []SlotOffset
func (*MessageDispatcher) Slots ¶
func (d *MessageDispatcher) Slots() []string
type MessageErrorHandler ¶
type MessageErrorHandler interface { MessageHandler ProcessMessageError(ctx *Context, message *Message, err interface{}) }
type MessageHandleModule ¶
type MessageHandleProc ¶
func (MessageHandleProc) ProcessMessage ¶
func (proc MessageHandleProc) ProcessMessage(ctx *Context, message *Message)
type MessageHandleService ¶
type MessageHandleService struct {
// contains filtered or unexported fields
}
func NewMessageHandleService ¶
func NewMessageHandleService() *MessageHandleService
func (*MessageHandleService) ProcessMessage ¶
func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
func (*MessageHandleService) Register ¶
func (s *MessageHandleService) Register(module MessageHandleModule)
type MessageHandler ¶
func AsMessageHandler ¶
func AsMessageHandler(rv reflect.Value) MessageHandler
type MessageObserver ¶
type MessageObserverAffair ¶
type MessageObserverService ¶
type MessageObserverService struct { MessageObservers map[reflect.Type]MessageObserver // contains filtered or unexported fields }
func (*MessageObserverService) RegisterMessageObservers ¶
func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)
func (*MessageObserverService) UnregisterAllMessageObservers ¶
func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)
type MessageTracerService ¶
type MessageTracerService struct { TracerManager *TracerManager Enabled bool InvalidMessageHandlerComponentID string // contains filtered or unexported fields }
func NewMessageTracerService ¶
func NewMessageTracerService() *MessageTracerService
func (*MessageTracerService) TextMapPropagator ¶
func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator
func (*MessageTracerService) Tracer ¶
func (s *MessageTracerService) Tracer(id string) *trace.SeverityTracer
type NoopMessageDelegate ¶
type NoopMessageDelegate int
func (NoopMessageDelegate) OnAck ¶
func (n NoopMessageDelegate) OnAck(msg *postgres.Message)
OnAck implements postgres.MessageDelegate.
type OnHostErrorHandler ¶
type PostgresWorker ¶
type PostgresWorker struct { Config *Config ReplicationSlotSourceProvider CreateReplicationSlotSourceProvider // contains filtered or unexported fields }
func (*PostgresWorker) Logger ¶
func (w *PostgresWorker) Logger() *log.Logger
func (*PostgresWorker) Start ¶
func (w *PostgresWorker) Start(ctx context.Context)
Start implements internal.Host.
type PostgresWorkerModule ¶
type PostgresWorkerModule struct{}
func (PostgresWorkerModule) ConfigureLogger ¶
func (p PostgresWorkerModule) ConfigureLogger(logflags int, w io.Writer)
ConfigureLogger implements internal.HostModule.
func (PostgresWorkerModule) DescribeHostType ¶
func (p PostgresWorkerModule) DescribeHostType() reflect.Type
DescribeHostType implements internal.HostModule.
func (PostgresWorkerModule) Init ¶
func (p PostgresWorkerModule) Init(h host.Host, app *host.AppModule)
Init implements internal.HostModule.
func (PostgresWorkerModule) InitComplete ¶
func (p PostgresWorkerModule) InitComplete(h host.Host, app *host.AppModule)
InitComplete implements internal.HostModule.
type PostgresWorkerRegistrar ¶
type PostgresWorkerRegistrar struct {
// contains filtered or unexported fields
}
func NewPostgresWorkerRegistrar ¶
func NewPostgresWorkerRegistrar(worker *PostgresWorker) *PostgresWorkerRegistrar
func (*PostgresWorkerRegistrar) AddRouter ¶
func (r *PostgresWorkerRegistrar) AddRouter(slot string, handler MessageHandler, handlerComponentID string)
func (*PostgresWorkerRegistrar) EnableTracer ¶
func (r *PostgresWorkerRegistrar) EnableTracer(enabled bool)
func (*PostgresWorkerRegistrar) RegisterMessageHandleModule ¶
func (r *PostgresWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
func (*PostgresWorkerRegistrar) RegisterMessageObserver ¶
func (r *PostgresWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
func (*PostgresWorkerRegistrar) RegisterSlot ¶
func (r *PostgresWorkerRegistrar) RegisterSlot(slotOffset postgres.SlotOffset)
func (*PostgresWorkerRegistrar) SetErrorHandler ¶
func (r *PostgresWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
func (*PostgresWorkerRegistrar) SetInvalidMessageHandler ¶
func (r *PostgresWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
func (*PostgresWorkerRegistrar) SetMessageManager ¶
func (r *PostgresWorkerRegistrar) SetMessageManager(messageManager interface{})
type ProcessingState ¶
type ProcessingState struct { Tracer *trace.SeverityTracer Span *trace.SeveritySpan Slot string }
type RestrictedForwardMessageHandler ¶
type RestrictedForwardMessageHandler int
func (RestrictedForwardMessageHandler) ProcessMessage ¶
func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message)
ProcessMessage implements MessageHandler.
type RestrictedMessageDelegate ¶
type RestrictedMessageDelegate int
func (RestrictedMessageDelegate) OnAck ¶
func (r RestrictedMessageDelegate) OnAck(msg *postgres.Message)
OnAck implements postgres.MessageDelegate.
type RestrictedOperationError ¶
type RestrictedOperationError string
func (RestrictedOperationError) Error ¶
func (e RestrictedOperationError) Error() string
type RouteComponent ¶
type RouteComponent struct { MessageHandler MessageHandler HandlerComponentID string }
type Router ¶
type Router map[string]RouteComponent
func (Router) Add ¶
func (r Router) Add(slot string, handler MessageHandler, handlerComponentID string)
func (Router) FindHandlerComponentID ¶
func (Router) Get ¶
func (r Router) Get(stream string) MessageHandler
func (Router) GetRouteComponent ¶
func (r Router) GetRouteComponent(stream string) RouteComponent
type SlotOffset ¶
type SlotOffset = postgres.SlotOffset
type SlotOffsetInfo ¶
type SlotOffsetInfo = postgres.SlotOffsetInfo
type StatusCode ¶
type StatusCode = ReplyCode
type StdMessageHandleModule ¶
type StdMessageHandleModule struct {
// contains filtered or unexported fields
}
func NewStdMessageHandleModule ¶
func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule
func (*StdMessageHandleModule) CanSetSuccessor ¶
func (*StdMessageHandleModule) CanSetSuccessor() bool
CanSetSuccessor implements MessageHandleModule.
func (*StdMessageHandleModule) OnInitComplete ¶
func (*StdMessageHandleModule) OnInitComplete()
OnInitComplete implements MessageHandleModule.
func (*StdMessageHandleModule) OnStart ¶
func (s *StdMessageHandleModule) OnStart(ctx context.Context) error
OnStart implements MessageHandleModule.
func (*StdMessageHandleModule) OnStop ¶
func (s *StdMessageHandleModule) OnStop(ctx context.Context) error
OnStop implements MessageHandleModule.
func (*StdMessageHandleModule) ProcessMessage ¶
func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *postgres.Message, state ProcessingState, recover *Recover)
ProcessMessage implements MessageHandleModule.
func (*StdMessageHandleModule) SetSuccessor ¶
func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
SetSuccessor implements MessageHandleModule.
type TracerManager ¶
type TracerManager struct { TracerProvider *trace.SeverityTracerProvider TextMapPropagator propagation.TextMapPropagator // contains filtered or unexported fields }
func NewTraceManager ¶
func NewTraceManager() *TracerManager
func (*TracerManager) GenerateManagedTracer ¶
func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer
func (*TracerManager) UndefinedTracer ¶
func (m *TracerManager) UndefinedTracer() *trace.SeverityTracer
Source Files ¶
- compositeMessageObserver.go
- context.go
- contextHelper.go
- contextMessageDelegate.go
- def.go
- defer.go
- errors.go
- messageDelegateHelper.go
- messageDispatcher.go
- messageHandleService.go
- messageHandler.go
- messageObserverService.go
- messageTracerService.go
- noopMessageDelegate.go
- postgresWorker.go
- postgresWorkerModule.go
- postgresWorkerRegistrar.go
- processingState.go
- replyCode.go
- restrictedForwardMessageHandler.go
- restrictedMessageDelegate.go
- routeComponent.go
- router.go
- stdMessageHandleModule.go
- tracerManager.go
- util.go