internal

package
v0.0.0-...-576b245 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 16, 2025 License: MIT Imports: 17 Imported by: 0

Documentation

Index

Constants

View Source
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive        int = 1
)
View Source
const (
	LOGGER_PREFIX string = "[worker-postgres] "
)

Variables

View Source
var (
	GlobalTracerManager             *TracerManager           // registered by PostgresWorker
	GlobalContextHelper             ContextHelper            = ContextHelper{}
	GlobalRestrictedMessageDelegate postgres.MessageDelegate = RestrictedMessageDelegate(0)
	GlobalNoopMessageDelegate       postgres.MessageDelegate = NoopMessageDelegate(0)
	GlobalMessageDelegateHelper     MessageDelegateHelper    = MessageDelegateHelper{}

	PostgresWorkerModuleInstance = PostgresWorkerModule{}

	PostgresWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)

Functions

func IsMessageHandler

func IsMessageHandler(rv reflect.Value) bool

func IsMessageHandlerType

func IsMessageHandlerType(rt reflect.Type) bool

func StopRecursiveForwardMessageHandler

func StopRecursiveForwardMessageHandler(ctx *Context, msg *Message)

Types

type CompositeMessageObserver

type CompositeMessageObserver []MessageObserver

func (CompositeMessageObserver) OnAck

func (o CompositeMessageObserver) OnAck(ctx *Context, message *Message)

OnAck implements MessageObserver.

func (CompositeMessageObserver) Type

func (CompositeMessageObserver) Type() reflect.Type

Type implements MessageObserver.

type Config

type Config = postgres.Config

type Context

type Context struct {
	// contains filtered or unexported fields
}

func (*Context) Break

func (c *Context) Break()

func (*Context) CanRecordingLog

func (c *Context) CanRecordingLog() bool

func (*Context) CatchErr

func (c *Context) CatchErr(err error)

func (*Context) Deadline

func (c *Context) Deadline() (deadline time.Time, ok bool)

Deadline implements context.Context.

func (*Context) Done

func (c *Context) Done() <-chan struct{}

Done implements context.Context.

func (*Context) Err

func (c *Context) Err() error

Err implements context.Context.

func (*Context) InvalidMessage

func (c *Context) InvalidMessage(message *Message) error

func (*Context) IsAborted

func (c *Context) IsAborted() bool

func (*Context) Logger

func (c *Context) Logger() *log.Logger

func (*Context) RecordingLog

func (c *Context) RecordingLog(v bool)

func (*Context) SetValue

func (c *Context) SetValue(key interface{}, value interface{})

SetValue implements trace.ValueContext.

func (*Context) Status

func (c *Context) Status() StatusCode

func (*Context) Value

func (c *Context) Value(key any) any

Value implements context.Context.

type ContextHelper

type ContextHelper struct{}

func (ContextHelper) ExtractReplyCode

func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode

func (ContextHelper) InjectReplyCode

func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)

func (ContextHelper) InjectReplyCodeSafe

func (ContextHelper) InjectReplyCodeSafe(ctx *Context, reply ReplyCode)

type ContextMessageDelegate

type ContextMessageDelegate struct {
	// contains filtered or unexported fields
}

func NewContextMessageDelegate

func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate

func (*ContextMessageDelegate) OnAck

func (d *ContextMessageDelegate) OnAck(msg *postgres.Message)

OnAck implements postgres.MessageDelegate.

type CreateReplicationSlotSource

type CreateReplicationSlotSource = postgres.CreateReplicationSlotSource

type Defer

type Defer struct {
	// contains filtered or unexported fields
}

func (*Defer) Do

func (d *Defer) Do(do func(f Finalizer))

type ErrorHandler

type ErrorHandler func(ctx *Context, message *Message, err interface{})

type Event

type Event = postgres.Event

type Finalizer

type Finalizer struct {
	// contains filtered or unexported fields
}

func (Finalizer) Add

func (f Finalizer) Add(actions ...func(err interface{}))

type Message

type Message = postgres.Message

type MessageDelegateHelper

type MessageDelegateHelper struct{}

func (MessageDelegateHelper) IsRestricted

func (MessageDelegateHelper) IsRestricted(msg *Message) (bool, error)

func (MessageDelegateHelper) Restrict

func (MessageDelegateHelper) Restrict(msg *Message) error

func (MessageDelegateHelper) Unrestrict

func (MessageDelegateHelper) Unrestrict(msg *Message) error

type MessageDispatcher

type MessageDispatcher struct {
	MessageHandleService   *MessageHandleService
	MessageTracerService   *MessageTracerService
	MessageObserverService *MessageObserverService
	Router                 Router

	OnHostErrorProc OnHostErrorHandler

	ErrorHandler          ErrorHandler
	InvalidMessageHandler MessageHandler

	SlotSet map[string]SlotOffset
}

func (*MessageDispatcher) ProcessMessage

func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message)

func (*MessageDispatcher) SlotOffsets

func (d *MessageDispatcher) SlotOffsets() []SlotOffset

func (*MessageDispatcher) Slots

func (d *MessageDispatcher) Slots() []string

type MessageErrorHandler

type MessageErrorHandler interface {
	MessageHandler
	ProcessMessageError(ctx *Context, message *Message, err interface{})
}

type MessageHandleModule

type MessageHandleModule interface {
	CanSetSuccessor() bool
	SetSuccessor(successor MessageHandleModule)
	ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
	OnInitComplete()
	OnStart(ctx context.Context) error
	OnStop(ctx context.Context) error
}

type MessageHandleProc

type MessageHandleProc func(ctx *Context, message *Message)

func (MessageHandleProc) ProcessMessage

func (proc MessageHandleProc) ProcessMessage(ctx *Context, message *Message)

type MessageHandleService

type MessageHandleService struct {
	// contains filtered or unexported fields
}

func NewMessageHandleService

func NewMessageHandleService() *MessageHandleService

func (*MessageHandleService) ProcessMessage

func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)

func (*MessageHandleService) Register

func (s *MessageHandleService) Register(module MessageHandleModule)

type MessageHandler

type MessageHandler interface {
	ProcessMessage(ctx *Context, message *Message)
}

func AsMessageHandler

func AsMessageHandler(rv reflect.Value) MessageHandler

type MessageObserver

type MessageObserver interface {
	OnAck(ctx *Context, message *Message)
	Type() reflect.Type
}

type MessageObserverAffair

type MessageObserverAffair interface {
	MessageObserverTypes() []reflect.Type
}

type MessageObserverService

type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}

func (*MessageObserverService) RegisterMessageObservers

func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)

func (*MessageObserverService) UnregisterAllMessageObservers

func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)

type MessageTracerService

type MessageTracerService struct {
	TracerManager *TracerManager

	Enabled bool

	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}

func NewMessageTracerService

func NewMessageTracerService() *MessageTracerService

func (*MessageTracerService) TextMapPropagator

func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator

func (*MessageTracerService) Tracer

type NoopMessageDelegate

type NoopMessageDelegate int

func (NoopMessageDelegate) OnAck

func (n NoopMessageDelegate) OnAck(msg *postgres.Message)

OnAck implements postgres.MessageDelegate.

type OnHostErrorHandler

type OnHostErrorHandler func(err error) (disposed bool)

type PostgresWorker

type PostgresWorker struct {
	Config                        *Config
	ReplicationSlotSourceProvider CreateReplicationSlotSourceProvider
	// contains filtered or unexported fields
}

func (*PostgresWorker) Logger

func (w *PostgresWorker) Logger() *log.Logger

func (*PostgresWorker) Start

func (w *PostgresWorker) Start(ctx context.Context)

Start implements internal.Host.

func (*PostgresWorker) Stop

func (w *PostgresWorker) Stop(ctx context.Context) error

Stop implements internal.Host.

type PostgresWorkerModule

type PostgresWorkerModule struct{}

func (PostgresWorkerModule) ConfigureLogger

func (p PostgresWorkerModule) ConfigureLogger(logflags int, w io.Writer)

ConfigureLogger implements internal.HostModule.

func (PostgresWorkerModule) DescribeHostType

func (p PostgresWorkerModule) DescribeHostType() reflect.Type

DescribeHostType implements internal.HostModule.

func (PostgresWorkerModule) Init

func (p PostgresWorkerModule) Init(h host.Host, app *host.AppModule)

Init implements internal.HostModule.

func (PostgresWorkerModule) InitComplete

func (p PostgresWorkerModule) InitComplete(h host.Host, app *host.AppModule)

InitComplete implements internal.HostModule.

type PostgresWorkerRegistrar

type PostgresWorkerRegistrar struct {
	// contains filtered or unexported fields
}

func NewPostgresWorkerRegistrar

func NewPostgresWorkerRegistrar(worker *PostgresWorker) *PostgresWorkerRegistrar

func (*PostgresWorkerRegistrar) AddRouter

func (r *PostgresWorkerRegistrar) AddRouter(slot string, handler MessageHandler, handlerComponentID string)

func (*PostgresWorkerRegistrar) EnableTracer

func (r *PostgresWorkerRegistrar) EnableTracer(enabled bool)

func (*PostgresWorkerRegistrar) RegisterMessageHandleModule

func (r *PostgresWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)

func (*PostgresWorkerRegistrar) RegisterMessageObserver

func (r *PostgresWorkerRegistrar) RegisterMessageObserver(v MessageObserver)

func (*PostgresWorkerRegistrar) RegisterSlot

func (r *PostgresWorkerRegistrar) RegisterSlot(slotOffset postgres.SlotOffset)

func (*PostgresWorkerRegistrar) SetErrorHandler

func (r *PostgresWorkerRegistrar) SetErrorHandler(handler ErrorHandler)

func (*PostgresWorkerRegistrar) SetInvalidMessageHandler

func (r *PostgresWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)

func (*PostgresWorkerRegistrar) SetMessageManager

func (r *PostgresWorkerRegistrar) SetMessageManager(messageManager interface{})

type ProcessingState

type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span   *trace.SeveritySpan
	Slot   string
}

type Recover

type Recover struct {
	// contains filtered or unexported fields
}

func (*Recover) Defer

func (s *Recover) Defer(catch func(err interface{})) *Defer

type ReplyCode

type ReplyCode int
const (
	UNSET ReplyCode = iota
	PASS
	FAIL
	ABORT

	INVALID ReplyCode = -1
)

func (ReplyCode) String

func (code ReplyCode) String() string

type RestrictedForwardMessageHandler

type RestrictedForwardMessageHandler int

func (RestrictedForwardMessageHandler) ProcessMessage

func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message)

ProcessMessage implements MessageHandler.

type RestrictedMessageDelegate

type RestrictedMessageDelegate int

func (RestrictedMessageDelegate) OnAck

func (RestrictedMessageDelegate) OnAck(msg *postgres.Message)

OnAck implements postgres.MessageDelegate.

type RestrictedOperationError

type RestrictedOperationError string

func (RestrictedOperationError) Error

func (e RestrictedOperationError) Error() string

type RouteComponent

type RouteComponent struct {
	MessageHandler     MessageHandler
	HandlerComponentID string
}

type Router

type Router map[string]RouteComponent

func (Router) Add

func (r Router) Add(slot string, handler MessageHandler, handlerComponentID string)

func (Router) FindHandlerComponentID

func (r Router) FindHandlerComponentID(stream string) string

func (Router) Get

func (r Router) Get(stream string) MessageHandler

func (Router) GetRouteComponent

func (r Router) GetRouteComponent(stream string) RouteComponent

func (Router) Has

func (r Router) Has(stream string) bool

func (Router) Remove

func (r Router) Remove(stream string)

type SlotOffset

type SlotOffset = postgres.SlotOffset

type SlotOffsetInfo

type SlotOffsetInfo = postgres.SlotOffsetInfo

type StatusCode

type StatusCode = ReplyCode

type StdMessageHandleModule

type StdMessageHandleModule struct {
	// contains filtered or unexported fields
}

func NewStdMessageHandleModule

func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule

func (*StdMessageHandleModule) CanSetSuccessor

func (*StdMessageHandleModule) CanSetSuccessor() bool

CanSetSuccessor implements MessageHandleModule.

func (*StdMessageHandleModule) OnInitComplete

func (*StdMessageHandleModule) OnInitComplete()

OnInitComplete implements MessageHandleModule.

func (*StdMessageHandleModule) OnStart

func (s *StdMessageHandleModule) OnStart(ctx context.Context) error

OnStart implements MessageHandleModule.

func (*StdMessageHandleModule) OnStop

func (*StdMessageHandleModule) OnStop(ctx context.Context) error

OnStop implements MessageHandleModule.

func (*StdMessageHandleModule) ProcessMessage

func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *postgres.Message, state ProcessingState, recover *Recover)

ProcessMessage implements MessageHandleModule.

func (*StdMessageHandleModule) SetSuccessor

func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)

SetSuccessor implements MessageHandleModule.

type StopError

type StopError struct {
	// contains filtered or unexported fields
}

func (*StopError) Error

func (e *StopError) Error() string

func (*StopError) Unwrap

func (e *StopError) Unwrap() error

type TracerManager

type TracerManager struct {
	TracerProvider    *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}

func NewTraceManager

func NewTraceManager() *TracerManager

func (*TracerManager) GenerateManagedTracer

func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer

func (*TracerManager) UndefinedTracer

func (m *TracerManager) UndefinedTracer() *trace.SeverityTracer

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL