Documentation
¶
Index ¶
- Constants
- Variables
- func StopRecursiveForwardMessageHandler(ctx *Context, msg *Message) error
- type CompositeMessageObserver
- type Config
- type Context
- func (*Context) Deadline() (deadline time.Time, ok bool)
- func (*Context) Done() <-chan struct{}
- func (*Context) Err() error
- func (c *Context) InvalidMessage(message *Message) error
- func (c *Context) Logger() *log.Logger
- func (c *Context) Pause(topic ...string) error
- func (c *Context) Resume(topic ...string) error
- func (c *Context) SetValue(key, value interface{})
- func (c *Context) Value(key interface{}) interface{}
- type ContextHelper
- type ContextMessageDelegate
- type Defer
- type ErrorHandler
- type Finalizer
- type Message
- type MessageDispatcher
- type MessageHandleModule
- type MessageHandleProc
- type MessageHandleService
- type MessageHandler
- type MessageObserver
- type MessageObserverAffair
- type MessageObserverService
- type MessageTracerService
- type NoopMessageDelegate
- type NsqWorker
- type NsqWorkerModule
- type NsqWorkerRegistrar
- func (r *NsqWorkerRegistrar) AddRouter(topic string, handler MessageHandler, handlerComponentID string)
- func (r *NsqWorkerRegistrar) EnableTracer(enabled bool)
- func (r *NsqWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
- func (r *NsqWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
- func (r *NsqWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
- func (r *NsqWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
- func (r *NsqWorkerRegistrar) SetMessageManager(messageManager interface{})
- type OnHostErrorHandler
- type ProcessingState
- type Recover
- type ReplyCode
- type RestrictedForwardMessageHandler
- type RestrictedMessageDelegate
- type RestrictedOperationError
- type RouteComponent
- type Router
- type StdMessageHandleModule
- func (*StdMessageHandleModule) CanSetSuccessor() bool
- func (*StdMessageHandleModule) OnInitComplete()
- func (*StdMessageHandleModule) OnStart(ctx context.Context) error
- func (*StdMessageHandleModule) OnStop(ctx context.Context) error
- func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
- func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
- type StopError
- type TracerManager
Constants ¶
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive int = 1
)
const (
LOGGER_PREFIX string = "[worker-nsq] "
)
Variables ¶
var (
	GlobalTracerManager *TracerManager // to be registered by NsqWorker
	GlobalContextHelper ContextHelper = ContextHelper{}
	GlobalRestrictedMessageDelegate nsq.MessageDelegate = RestrictedMessageDelegate(0)
	NsqWorkerModuleInstance = NsqWorkerModule{}
	NsqWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)
Functions ¶
Types ¶
type CompositeMessageObserver ¶
type CompositeMessageObserver []MessageObserver
func (CompositeMessageObserver) OnFinish ¶
func (o CompositeMessageObserver) OnFinish(ctx *Context, message *nsq.Message)
OnFinish implements MessageObserver.
func (CompositeMessageObserver) OnRequeue ¶
func (o CompositeMessageObserver) OnRequeue(ctx *Context, message *nsq.Message)
OnRequeue implements MessageObserver.
func (CompositeMessageObserver) OnTouch ¶
func (o CompositeMessageObserver) OnTouch(ctx *Context, message *nsq.Message)
OnTouch implements MessageObserver.
func (CompositeMessageObserver) Type ¶
func (o CompositeMessageObserver) Type() reflect.Type
Type implements MessageObserver.
type Context ¶
type Context struct {
	Channel string
	// contains filtered or unexported fields
}
func (*Context) InvalidMessage ¶
type ContextHelper ¶
type ContextHelper struct{}
func (ContextHelper) ExtractReplyCode ¶
func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode
func (ContextHelper) InjectReplyCode ¶
func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)
type ContextMessageDelegate ¶
type ContextMessageDelegate struct {
// contains filtered or unexported fields
}
func NewContextMessageDelegate ¶
func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate
func (*ContextMessageDelegate) OnFinish ¶
func (d *ContextMessageDelegate) OnFinish(msg *nsq.Message)
func (*ContextMessageDelegate) OnTouch ¶
func (d *ContextMessageDelegate) OnTouch(msg *nsq.Message)
type ErrorHandler ¶
type MessageDispatcher ¶
type MessageDispatcher struct {
	MessageHandleService *MessageHandleService
	MessageTracerService *MessageTracerService
	MessageObserverService *MessageObserverService
	Router Router
	OnHostErrorProc OnHostErrorHandler
	ErrorHandler ErrorHandler
	InvalidMessageHandler MessageHandler
}
func (*MessageDispatcher) ProcessMessage ¶
func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message) error
func (*MessageDispatcher) Topics ¶
func (d *MessageDispatcher) Topics() []string
type MessageHandleModule ¶
type MessageHandleProc ¶
func (MessageHandleProc) ProcessMessage ¶
func (proc MessageHandleProc) ProcessMessage(ctx *Context, message *Message) error
type MessageHandleService ¶
type MessageHandleService struct {
// contains filtered or unexported fields
}
func NewMessageHandleService ¶
func NewMessageHandleService() *MessageHandleService
func (*MessageHandleService) ProcessMessage ¶
func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
func (*MessageHandleService) Register ¶
func (s *MessageHandleService) Register(module MessageHandleModule)
type MessageHandler ¶
type MessageObserver ¶
type MessageObserverAffair ¶
type MessageObserverService ¶
type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}
func (*MessageObserverService) RegisterMessageObservers ¶
func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)
func (*MessageObserverService) UnregisterAllMessageObservers ¶
func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)
type MessageTracerService ¶
type MessageTracerService struct {
	TracerManager *TracerManager
	Enabled bool
	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}
func (*MessageTracerService) TextMapPropagator ¶
func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator
func (*MessageTracerService) Tracer ¶
func (s *MessageTracerService) Tracer(id string) *trace.SeverityTracer
type NoopMessageDelegate ¶
type NoopMessageDelegate int
func (NoopMessageDelegate) OnFinish ¶
func (NoopMessageDelegate) OnFinish(*nsq.Message)
OnFinish implements nsq.MessageDelegate.
func (NoopMessageDelegate) OnTouch ¶
func (NoopMessageDelegate) OnTouch(*nsq.Message)
OnTouch implements nsq.MessageDelegate.
type NsqWorker ¶
type NsqWorkerModule ¶
type NsqWorkerModule struct{}
func (NsqWorkerModule) ConfigureLogger ¶
func (NsqWorkerModule) ConfigureLogger(logflags int, w io.Writer)
ConfigureLogger implements host.HostModule
func (NsqWorkerModule) DescribeHostType ¶
func (NsqWorkerModule) DescribeHostType() reflect.Type
DescribeHostType implements host.HostService
func (NsqWorkerModule) Init ¶
func (NsqWorkerModule) Init(h host.Host, app *host.AppModule)
Init implements host.HostService
func (NsqWorkerModule) InitComplete ¶
func (NsqWorkerModule) InitComplete(h host.Host, app *host.AppModule)
InitComplete implements host.HostService
type NsqWorkerRegistrar ¶
type NsqWorkerRegistrar struct {
// contains filtered or unexported fields
}
func NewNsqWorkerRegistrar ¶
func NewNsqWorkerRegistrar(worker *NsqWorker) *NsqWorkerRegistrar
func (*NsqWorkerRegistrar) AddRouter ¶
func (r *NsqWorkerRegistrar) AddRouter(topic string, handler MessageHandler, handlerComponentID string)
func (*NsqWorkerRegistrar) EnableTracer ¶
func (r *NsqWorkerRegistrar) EnableTracer(enabled bool)
func (*NsqWorkerRegistrar) RegisterMessageHandleModule ¶
func (r *NsqWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
func (*NsqWorkerRegistrar) RegisterMessageObserver ¶
func (r *NsqWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
func (*NsqWorkerRegistrar) SetErrorHandler ¶
func (r *NsqWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
func (*NsqWorkerRegistrar) SetInvalidMessageHandler ¶
func (r *NsqWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
func (*NsqWorkerRegistrar) SetMessageManager ¶
func (r *NsqWorkerRegistrar) SetMessageManager(messageManager interface{})
type OnHostErrorHandler ¶
type ProcessingState ¶
type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span *trace.SeveritySpan
	Topic string
}
type RestrictedForwardMessageHandler ¶
type RestrictedForwardMessageHandler int
func (RestrictedForwardMessageHandler) ProcessMessage ¶
func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message) error
ProcessMessage implements MessageHandler.
type RestrictedMessageDelegate ¶
type RestrictedMessageDelegate int
func (RestrictedMessageDelegate) OnFinish ¶
func (RestrictedMessageDelegate) OnFinish(*nsq.Message)
OnFinish implements nsq.MessageDelegate.
func (RestrictedMessageDelegate) OnTouch ¶
func (RestrictedMessageDelegate) OnTouch(*nsq.Message)
OnTouch implements nsq.MessageDelegate.
type RestrictedOperationError ¶
type RestrictedOperationError string
func (RestrictedOperationError) Error ¶
func (e RestrictedOperationError) Error() string
type RouteComponent ¶
type RouteComponent struct {
	MessageHandler MessageHandler
	HandlerComponentID string
}
type Router ¶
type Router map[string]RouteComponent
func (Router) Add ¶
func (r Router) Add(topic string, handler MessageHandler, handlerComponentID string)
func (Router) FindHandlerComponentID ¶
func (Router) Get ¶
func (r Router) Get(topic string) MessageHandler
type StdMessageHandleModule ¶
type StdMessageHandleModule struct {
// contains filtered or unexported fields
}
func NewStdMessageHandleModule ¶
func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule
func (*StdMessageHandleModule) CanSetSuccessor ¶
func (*StdMessageHandleModule) CanSetSuccessor() bool
CanSetSuccessor implements MessageHandleModule.
func (*StdMessageHandleModule) OnInitComplete ¶
func (*StdMessageHandleModule) OnInitComplete()
OnInitComplete implements MessageHandleModule.
func (*StdMessageHandleModule) OnStart ¶
func (*StdMessageHandleModule) OnStart(ctx context.Context) error
OnStart implements MessageHandleModule.
func (*StdMessageHandleModule) OnStop ¶
func (*StdMessageHandleModule) OnStop(ctx context.Context) error
OnStop implements MessageHandleModule.
func (*StdMessageHandleModule) ProcessMessage ¶
func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
ProcessMessage implements MessageHandleModule.
func (*StdMessageHandleModule) SetSuccessor ¶
func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
SetSuccessor implements MessageHandleModule.
type TracerManager ¶
type TracerManager struct {
	TracerProvider *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}
func NewTraceManager ¶
func NewTraceManager() *TracerManager
func (*TracerManager) GenerateManagedTracer ¶
func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer
Source Files
¶
- compositeMessageObserver.go
- context.go
- contextHelper.go
- contextMessageDelegate.go
- def.go
- defer.go
- errors.go
- messageDispatcher.go
- messageHandleService.go
- messageObserverService.go
- messageTracerService.go
- noopMessageDelegate.go
- nsqWorker.go
- nsqWorkerModule.go
- nsqWorkerRegistrar.go
- processingState.go
- replyCode.go
- restrictedForwardMessageHandler.go
- restrictedMessageDelegate.go
- routeComponent.go
- router.go
- stdMessageHandleModule.go
- tracerManager.go
- util.go