Documentation ¶
Index ¶
- Constants
- Variables
- func SetTracerManager(v *TracerManager)
- type CompositeMessageObserver
- type Context
- func (*Context) Deadline() (deadline time.Time, ok bool)
- func (*Context) Done() <-chan struct{}
- func (*Context) Err() error
- func (c *Context) InvalidMessage(message *Message)
- func (c *Context) Logger() *log.Logger
- func (c *Context) Pause(streams ...string) error
- func (c *Context) Resume(streams ...string) error
- func (c *Context) SetValue(key interface{}, value interface{})
- func (c *Context) Value(key any) any
- type ContextBuilder
- func (b *ContextBuilder) Build() *Context
- func (b *ContextBuilder) ConsumerGroup(v string) *ContextBuilder
- func (b *ContextBuilder) ConsumerName(v string) *ContextBuilder
- func (b *ContextBuilder) Context(v context.Context) *ContextBuilder
- func (b *ContextBuilder) InvalidMessageHandler(v MessageHandler) *ContextBuilder
- func (b *ContextBuilder) Logger(v *log.Logger) *ContextBuilder
- type ContextHelper
- type ContextMessageDelegate
- type Defer
- type ErrorHandler
- type Finalizer
- type Message
- type MessageDispatcher
- type MessageHandleModule
- type MessageHandleService
- type MessageHandler
- type MessageHelper
- type MessageObserver
- type MessageObserverAffair
- type MessageObserverService
- type MessageTracerService
- type OnHostErrorHandler
- type ProcessingState
- type Recover
- type RedisWorker
- type RedisWorkerModule
- type RedisWorkerRegistrar
- func (r *RedisWorkerRegistrar) AddRouter(stream string, handler MessageHandler, handlerComponentID string, setting *StreamSetting)
- func (r *RedisWorkerRegistrar) EnableTracer(enabled bool)
- func (r *RedisWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
- func (r *RedisWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
- func (r *RedisWorkerRegistrar) RegisterStream(streamOffset redis.StreamOffset)
- func (r *RedisWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
- func (r *RedisWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
- func (r *RedisWorkerRegistrar) SetMessageManager(messageManager interface{})
- type ReplyCode
- type RestrictedForwardMessageHandler
- type RestrictedMessageDelegate
- type RestrictedOperationError
- type RouteComponent
- type Router
- func (r Router) Add(stream string, handler MessageHandler, handlerComponentID string, streamSetting *StreamSetting)
- func (r Router) FindHandlerComponentID(stream string) string
- func (r Router) Get(stream string) MessageHandler
- func (r Router) GetRouteComponent(stream string) RouteComponent
- func (r Router) Has(stream string) bool
- func (r Router) Remove(stream string)
- type StdInvalidMessageHandler
- type StdMessageHandleModule
- func (*StdMessageHandleModule) CanSetSuccessor() bool
- func (*StdMessageHandleModule) OnInitComplete()
- func (*StdMessageHandleModule) OnStart(ctx context.Context) error
- func (*StdMessageHandleModule) OnStop(ctx context.Context) error
- func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
- func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
- type StopError
- type StreamOffset
- type StreamOffsetInfo
- type StreamSetting
- type TracerManager
- type UniversalClient
- type UniversalOptions
- type XMessage
- type XStream
Constants ¶
View Source
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive int = 1
)
View Source
const (
LOGGER_PREFIX string = "[worker-redis] "
)
View Source
const (
REDIS_BUSYGROUP_PREFIX = "BUSYGROUP"
)
Variables ¶
View Source
var (
	GlobalContextHelper ContextHelper = ContextHelper{}
	GlobalRestrictedMessageDelegate redis.MessageDelegate = RestrictedMessageDelegate(0)
	GlobalMessageHelper MessageHelper = MessageHelper{}
	RedisWorkerModuleInstance = RedisWorkerModule{}
	RedisWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)
Functions ¶
func SetTracerManager ¶
func SetTracerManager(v *TracerManager)
Types ¶
type CompositeMessageObserver ¶
type CompositeMessageObserver []MessageObserver
func (CompositeMessageObserver) OnAck ¶
func (o CompositeMessageObserver) OnAck(ctx *Context, message *Message)
OnAck implements MessageObserver.
func (CompositeMessageObserver) OnDel ¶
func (o CompositeMessageObserver) OnDel(ctx *Context, message *Message)
OnDel implements MessageObserver.
func (CompositeMessageObserver) Type ¶
func (o CompositeMessageObserver) Type() reflect.Type
Type implements MessageObserver.
type Context ¶
type Context struct {
	ConsumerGroup string
	ConsumerName string
	// contains filtered or unexported fields
}
func (*Context) InvalidMessage ¶
func (c *Context) InvalidMessage(message *Message)
type ContextBuilder ¶
type ContextBuilder struct {
// contains filtered or unexported fields
}
func NewContextBuilder ¶
func NewContextBuilder() *ContextBuilder
func (*ContextBuilder) Build ¶
func (b *ContextBuilder) Build() *Context
func (*ContextBuilder) ConsumerGroup ¶
func (b *ContextBuilder) ConsumerGroup(v string) *ContextBuilder
func (*ContextBuilder) ConsumerName ¶
func (b *ContextBuilder) ConsumerName(v string) *ContextBuilder
func (*ContextBuilder) Context ¶
func (b *ContextBuilder) Context(v context.Context) *ContextBuilder
func (*ContextBuilder) InvalidMessageHandler ¶
func (b *ContextBuilder) InvalidMessageHandler(v MessageHandler) *ContextBuilder
func (*ContextBuilder) Logger ¶
func (b *ContextBuilder) Logger(v *log.Logger) *ContextBuilder
type ContextHelper ¶
type ContextHelper struct{}
func (ContextHelper) ExtractReplyCode ¶
func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode
func (ContextHelper) InjectReplyCode ¶
func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)
type ContextMessageDelegate ¶
type ContextMessageDelegate struct {
// contains filtered or unexported fields
}
func NewContextMessageDelegate ¶
func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate
func (*ContextMessageDelegate) OnAck ¶
func (d *ContextMessageDelegate) OnAck(msg *redis.Message)
OnAck implements redis.MessageDelegate.
func (*ContextMessageDelegate) OnDel ¶
func (d *ContextMessageDelegate) OnDel(msg *redis.Message)
OnDel implements redis.MessageDelegate.
type ErrorHandler ¶
type MessageDispatcher ¶
type MessageDispatcher struct {
	MessageHandleService *MessageHandleService
	MessageTracerService *MessageTracerService
	MessageObserverService *MessageObserverService
	Router Router
	OnHostErrorProc OnHostErrorHandler
	ErrorHandler ErrorHandler
	InvalidMessageHandler MessageHandler
	StreamSet map[string]StreamOffset
	// contains filtered or unexported fields
}
func (*MessageDispatcher) ProcessMessage ¶
func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message)
func (*MessageDispatcher) StreamOffsets ¶
func (d *MessageDispatcher) StreamOffsets() []StreamOffset
func (*MessageDispatcher) Streams ¶
func (d *MessageDispatcher) Streams() []string
type MessageHandleModule ¶
type MessageHandleService ¶
type MessageHandleService struct {
// contains filtered or unexported fields
}
func NewMessageHandleService ¶
func NewMessageHandleService() *MessageHandleService
func (*MessageHandleService) ProcessMessage ¶
func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
func (*MessageHandleService) Register ¶
func (s *MessageHandleService) Register(module MessageHandleModule)
type MessageHandler ¶
type MessageHelper ¶
type MessageHelper struct{}
func (MessageHelper) IsRestricted ¶
func (MessageHelper) IsRestricted(msg *Message) (bool, error)
func (MessageHelper) Restrict ¶
func (MessageHelper) Restrict(msg *Message) error
func (MessageHelper) Unrestrict ¶
func (MessageHelper) Unrestrict(msg *Message) error
type MessageObserver ¶
type MessageObserverAffair ¶
type MessageObserverService ¶
type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}
func (*MessageObserverService) RegisterMessageObservers ¶
func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)
func (*MessageObserverService) UnregisterAllMessageObservers ¶
func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)
type MessageTracerService ¶
type MessageTracerService struct {
	TracerManager *TracerManager
	Enabled bool
	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}
func NewMessageTracerService ¶
func NewMessageTracerService() *MessageTracerService
func (*MessageTracerService) TextMapPropagator ¶
func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator
func (*MessageTracerService) Tracer ¶
func (s *MessageTracerService) Tracer(id string) *trace.SeverityTracer
type OnHostErrorHandler ¶
type ProcessingState ¶
type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span *trace.SeveritySpan
	Stream string
}
type RedisWorker ¶
type RedisWorker struct {
	ConsumerGroup string
	ConsumerName string
	RedisOption *redis.UniversalOptions
	MaxInFlight int64
	MaxPollingTimeout time.Duration
	ClaimMinIdleTime time.Duration
	IdlingTimeout time.Duration // how long to wait when there are no messages
	ClaimSensitivity int // run Claim when a Read returns n or fewer messages
	ClaimOccurrenceRate int32 // run Claim once after every n Reads
	AllowCreateGroup bool // automatically register the consumer group
	// contains filtered or unexported fields
}
func (*RedisWorker) Logger ¶
func (w *RedisWorker) Logger() *log.Logger
func (*RedisWorker) Start ¶
func (w *RedisWorker) Start(ctx context.Context)
type RedisWorkerModule ¶
type RedisWorkerModule struct{}
func (RedisWorkerModule) ConfigureLogger ¶
func (RedisWorkerModule) ConfigureLogger(logflags int, w io.Writer)
ConfigureLogger implements host.HostModule
func (RedisWorkerModule) DescribeHostType ¶
func (RedisWorkerModule) DescribeHostType() reflect.Type
func (RedisWorkerModule) InitComplete ¶
func (RedisWorkerModule) InitComplete(h host.Host, ctx *host.AppModule)
type RedisWorkerRegistrar ¶
type RedisWorkerRegistrar struct {
// contains filtered or unexported fields
}
func NewRedisWorkerRegistrar ¶
func NewRedisWorkerRegistrar(worker *RedisWorker) *RedisWorkerRegistrar
func (*RedisWorkerRegistrar) AddRouter ¶
func (r *RedisWorkerRegistrar) AddRouter(stream string, handler MessageHandler, handlerComponentID string, setting *StreamSetting)
func (*RedisWorkerRegistrar) EnableTracer ¶
func (r *RedisWorkerRegistrar) EnableTracer(enabled bool)
func (*RedisWorkerRegistrar) RegisterMessageHandleModule ¶
func (r *RedisWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
func (*RedisWorkerRegistrar) RegisterMessageObserver ¶
func (r *RedisWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
func (*RedisWorkerRegistrar) RegisterStream ¶
func (r *RedisWorkerRegistrar) RegisterStream(streamOffset redis.StreamOffset)
func (*RedisWorkerRegistrar) SetErrorHandler ¶
func (r *RedisWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
func (*RedisWorkerRegistrar) SetInvalidMessageHandler ¶
func (r *RedisWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
func (*RedisWorkerRegistrar) SetMessageManager ¶
func (r *RedisWorkerRegistrar) SetMessageManager(messageManager interface{})
type RestrictedForwardMessageHandler ¶
type RestrictedForwardMessageHandler int
func (RestrictedForwardMessageHandler) ProcessMessage ¶
func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message)
ProcessMessage implements MessageHandler.
type RestrictedMessageDelegate ¶
type RestrictedMessageDelegate int
func (RestrictedMessageDelegate) OnAck ¶
func (RestrictedMessageDelegate) OnAck(*redis.Message)
OnAck implements redis.MessageDelegate.
func (RestrictedMessageDelegate) OnDel ¶
func (RestrictedMessageDelegate) OnDel(*redis.Message)
OnDel implements redis.MessageDelegate.
type RestrictedOperationError ¶
type RestrictedOperationError string
func (RestrictedOperationError) Error ¶
func (e RestrictedOperationError) Error() string
type RouteComponent ¶
type RouteComponent struct {
	MessageHandler MessageHandler
	HandlerComponentID string
	StreamSetting *StreamSetting
}
type Router ¶
type Router map[string]RouteComponent
func (Router) Add ¶
func (r Router) Add(stream string, handler MessageHandler, handlerComponentID string, streamSetting *StreamSetting)
func (Router) FindHandlerComponentID ¶
func (r Router) FindHandlerComponentID(stream string) string
func (Router) Get ¶
func (r Router) Get(stream string) MessageHandler
func (Router) GetRouteComponent ¶ added in v0.2.2
func (r Router) GetRouteComponent(stream string) RouteComponent
type StdInvalidMessageHandler ¶
type StdInvalidMessageHandler struct {
// contains filtered or unexported fields
}
func (*StdInvalidMessageHandler) ProcessMessage ¶
func (h *StdInvalidMessageHandler) ProcessMessage(ctx *Context, message *Message)
ProcessMessage implements MessageHandler.
type StdMessageHandleModule ¶
type StdMessageHandleModule struct {
// contains filtered or unexported fields
}
func NewStdMessageHandleModule ¶
func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule
func (*StdMessageHandleModule) CanSetSuccessor ¶
func (*StdMessageHandleModule) CanSetSuccessor() bool
CanSetSuccessor implements MessageHandleModule.
func (*StdMessageHandleModule) OnInitComplete ¶
func (*StdMessageHandleModule) OnInitComplete()
OnInitComplete implements MessageHandleModule.
func (*StdMessageHandleModule) OnStart ¶
func (*StdMessageHandleModule) OnStart(ctx context.Context) error
OnStart implements MessageHandleModule.
func (*StdMessageHandleModule) OnStop ¶
func (*StdMessageHandleModule) OnStop(ctx context.Context) error
OnStop implements MessageHandleModule.
func (*StdMessageHandleModule) ProcessMessage ¶
func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
ProcessMessage implements MessageHandleModule.
func (*StdMessageHandleModule) SetSuccessor ¶
func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
SetSuccessor implements MessageHandleModule.
type StreamOffset ¶
type StreamOffset = redis.StreamOffset
type StreamOffsetInfo ¶
type StreamOffsetInfo = redis.StreamOffsetInfo
type StreamSetting ¶ added in v0.2.2
type StreamSetting struct {
MessageStateKeyPrefix *string
}
func (*StreamSetting) DecodeMessageContentOption ¶ added in v0.2.2
func (s *StreamSetting) DecodeMessageContentOption() []redis.DecodeMessageContentOption
type TracerManager ¶
type TracerManager struct {
	TracerProvider *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}
func GetTracerManager ¶
func GetTracerManager() *TracerManager
func NewTraceManager ¶
func NewTraceManager() *TracerManager
func (*TracerManager) GenerateManagedTracer ¶
func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer
type UniversalClient ¶
type UniversalClient = redis.UniversalClient
type UniversalOptions ¶
type UniversalOptions = redis.UniversalOptions
Source Files ¶
- compositeMessageObserver.go
- context.go
- contextBuilder.go
- contextHelper.go
- contextMessageDelegate.go
- def.go
- defer.go
- errors.go
- messageDispatcher.go
- messageHandleService.go
- messageHelper.go
- messageObserverService.go
- messageTracerService.go
- processingState.go
- redisWorker.go
- redisWorkerModule.go
- redisWorkerRegistrar.go
- replyCode.go
- restrictedForwardMessageHandler.go
- restrictedMessageDelegate.go
- routeComponent.go
- router.go
- stdInvalidMessageHandler.go
- stdMessageHandleModule.go
- streamSetting.go
- tracerManager.go
- util.go
Click to show internal directories.
Click to hide internal directories.