Documentation
¶
Index ¶
- Constants
- Variables
- func IsMessageHandler(rv reflect.Value) bool
- func IsMessageHandlerType(rt reflect.Type) bool
- func StopRecursiveForwardMessageHandler(ctx *Context, msg *Message) error
- type CompositeMessageObserver
- type Config
- type Context
- func (c *Context) CatchErr(err error)
- func (*Context) Deadline() (deadline time.Time, ok bool)
- func (*Context) Done() <-chan struct{}
- func (c *Context) Err() error
- func (c *Context) InvalidMessage(message *Message) error
- func (c *Context) IsRecordingLog() bool
- func (c *Context) Logger() *log.Logger
- func (c *Context) Pause(topic ...string) error
- func (c *Context) RecordingLog(v bool)
- func (c *Context) Resume(topic ...string) error
- func (c *Context) SetValue(key, value interface{})
- func (c *Context) Status() StatusCode
- func (c *Context) Value(key interface{}) interface{}
- type ContextHelper
- type ContextMessageDelegate
- type Defer
- type ErrorHandler
- type Finalizer
- type Message
- type MessageDelegateHelper
- type MessageDispatcher
- type MessageHandleModule
- type MessageHandleProc
- type MessageHandleService
- type MessageHandler
- type MessageObserver
- type MessageObserverAffair
- type MessageObserverService
- type MessageTracerService
- type NoopMessageDelegate
- type NsqWorker
- type NsqWorkerModule
- type NsqWorkerRegistrar
- func (r *NsqWorkerRegistrar) AddRouter(topic string, handler MessageHandler, handlerComponentID string)
- func (r *NsqWorkerRegistrar) EnableTracer(enabled bool)
- func (r *NsqWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
- func (r *NsqWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
- func (r *NsqWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
- func (r *NsqWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
- func (r *NsqWorkerRegistrar) SetMessageManager(messageManager interface{})
- type OnHostErrorHandler
- type ProcessingState
- type Recover
- type ReplyCode
- type RestrictedForwardMessageHandler
- type RestrictedMessageDelegate
- type RestrictedOperationError
- type RouteComponent
- type Router
- type StatusCode
- type StdMessageHandleModule
- func (*StdMessageHandleModule) CanSetSuccessor() bool
- func (*StdMessageHandleModule) OnInitComplete()
- func (*StdMessageHandleModule) OnStart(ctx context.Context) error
- func (*StdMessageHandleModule) OnStop(ctx context.Context) error
- func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
- func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
- type StopError
- type TracerManager
Constants ¶
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive int = 1
)
const (
LOGGER_PREFIX string = "[worker-nsq] "
)
Variables ¶
var (
	GlobalTracerManager *TracerManager // registered by NsqWorker
	GlobalContextHelper ContextHelper = ContextHelper{}
	GlobalRestrictedMessageDelegate nsq.MessageDelegate = RestrictedMessageDelegate(0)
	GlobalNoopMessageDelegate nsq.MessageDelegate = NoopMessageDelegate(0)
	GlobalMessageDelegateHelper MessageDelegateHelper = MessageDelegateHelper{}
	NsqWorkerModuleInstance = NsqWorkerModule{}
	NsqWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)
Functions ¶
func IsMessageHandler ¶ added in v0.3.2
func IsMessageHandlerType ¶ added in v0.3.2
Types ¶
type CompositeMessageObserver ¶
type CompositeMessageObserver []MessageObserver
func (CompositeMessageObserver) OnFinish ¶
func (o CompositeMessageObserver) OnFinish(ctx *Context, message *nsq.Message)
OnFinish implements MessageObserver.
func (CompositeMessageObserver) OnRequeue ¶
func (o CompositeMessageObserver) OnRequeue(ctx *Context, message *nsq.Message)
OnRequeue implements MessageObserver.
func (CompositeMessageObserver) OnTouch ¶
func (o CompositeMessageObserver) OnTouch(ctx *Context, message *nsq.Message)
OnTouch implements MessageObserver.
func (CompositeMessageObserver) Type ¶
func (o CompositeMessageObserver) Type() reflect.Type
Type implements MessageObserver.
type Context ¶
type Context struct {
	Channel string
	// contains filtered or unexported fields
}
func (*Context) InvalidMessage ¶
func (*Context) IsRecordingLog ¶ added in v0.3.1
func (*Context) RecordingLog ¶ added in v0.3.1
func (*Context) SetValue ¶
func (c *Context) SetValue(key, value interface{})
SetValue implements trace.ValueContext.
func (*Context) Status ¶ added in v0.3.1
func (c *Context) Status() StatusCode
type ContextHelper ¶
type ContextHelper struct{}
func (ContextHelper) ExtractReplyCode ¶
func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode
func (ContextHelper) InjectReplyCode ¶
func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)
func (ContextHelper) InjectReplyCodeSafe ¶ added in v0.3.1
func (ContextHelper) InjectReplyCodeSafe(ctx *Context, reply ReplyCode)
type ContextMessageDelegate ¶
type ContextMessageDelegate struct {
// contains filtered or unexported fields
}
func NewContextMessageDelegate ¶
func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate
func (*ContextMessageDelegate) OnFinish ¶
func (d *ContextMessageDelegate) OnFinish(msg *nsq.Message)
func (*ContextMessageDelegate) OnTouch ¶
func (d *ContextMessageDelegate) OnTouch(msg *nsq.Message)
type ErrorHandler ¶
type MessageDelegateHelper ¶ added in v0.3.3
type MessageDelegateHelper struct{}
func (MessageDelegateHelper) IsRestricted ¶ added in v0.3.3
func (MessageDelegateHelper) IsRestricted(msg *Message) (bool, error)
func (MessageDelegateHelper) Restrict ¶ added in v0.3.3
func (MessageDelegateHelper) Restrict(msg *Message) error
func (MessageDelegateHelper) Unrestrict ¶ added in v0.3.3
func (MessageDelegateHelper) Unrestrict(msg *Message) error
type MessageDispatcher ¶
type MessageDispatcher struct {
	MessageHandleService *MessageHandleService
	MessageTracerService *MessageTracerService
	MessageObserverService *MessageObserverService
	Router Router
	OnHostErrorProc OnHostErrorHandler
	ErrorHandler ErrorHandler
	InvalidMessageHandler MessageHandler
}
func (*MessageDispatcher) ProcessMessage ¶
func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message) error
func (*MessageDispatcher) Topics ¶
func (d *MessageDispatcher) Topics() []string
type MessageHandleModule ¶
type MessageHandleProc ¶
func (MessageHandleProc) ProcessMessage ¶
func (proc MessageHandleProc) ProcessMessage(ctx *Context, message *Message) error
type MessageHandleService ¶
type MessageHandleService struct {
// contains filtered or unexported fields
}
func NewMessageHandleService ¶
func NewMessageHandleService() *MessageHandleService
func (*MessageHandleService) ProcessMessage ¶
func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
func (*MessageHandleService) Register ¶
func (s *MessageHandleService) Register(module MessageHandleModule)
type MessageHandler ¶
func AsMessageHandler ¶ added in v0.3.2
func AsMessageHandler(rv reflect.Value) MessageHandler
type MessageObserver ¶
type MessageObserverAffair ¶
type MessageObserverService ¶
type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}
func (*MessageObserverService) RegisterMessageObservers ¶
func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)
func (*MessageObserverService) UnregisterAllMessageObservers ¶
func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)
type MessageTracerService ¶
type MessageTracerService struct {
	TracerManager *TracerManager
	Enabled bool
	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}
func (*MessageTracerService) TextMapPropagator ¶
func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator
func (*MessageTracerService) Tracer ¶
func (s *MessageTracerService) Tracer(id string) *trace.SeverityTracer
type NoopMessageDelegate ¶
type NoopMessageDelegate int
func (NoopMessageDelegate) OnFinish ¶
func (NoopMessageDelegate) OnFinish(*nsq.Message)
OnFinish implements nsq.MessageDelegate.
func (NoopMessageDelegate) OnTouch ¶
func (NoopMessageDelegate) OnTouch(*nsq.Message)
OnTouch implements nsq.MessageDelegate.
type NsqWorker ¶
type NsqWorkerModule ¶
type NsqWorkerModule struct{}
func (NsqWorkerModule) ConfigureLogger ¶
func (NsqWorkerModule) ConfigureLogger(logflags int, w io.Writer)
ConfigureLogger implements host.HostModule
func (NsqWorkerModule) DescribeHostType ¶
func (NsqWorkerModule) DescribeHostType() reflect.Type
DescribeHostType implements host.HostService
func (NsqWorkerModule) Init ¶
func (NsqWorkerModule) Init(h host.Host, app *host.AppModule)
Init implements host.HostService
func (NsqWorkerModule) InitComplete ¶
func (NsqWorkerModule) InitComplete(h host.Host, app *host.AppModule)
InitComplete implements host.HostService
type NsqWorkerRegistrar ¶
type NsqWorkerRegistrar struct {
// contains filtered or unexported fields
}
func NewNsqWorkerRegistrar ¶
func NewNsqWorkerRegistrar(worker *NsqWorker) *NsqWorkerRegistrar
func (*NsqWorkerRegistrar) AddRouter ¶
func (r *NsqWorkerRegistrar) AddRouter(topic string, handler MessageHandler, handlerComponentID string)
func (*NsqWorkerRegistrar) EnableTracer ¶
func (r *NsqWorkerRegistrar) EnableTracer(enabled bool)
func (*NsqWorkerRegistrar) RegisterMessageHandleModule ¶
func (r *NsqWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)
func (*NsqWorkerRegistrar) RegisterMessageObserver ¶
func (r *NsqWorkerRegistrar) RegisterMessageObserver(v MessageObserver)
func (*NsqWorkerRegistrar) SetErrorHandler ¶
func (r *NsqWorkerRegistrar) SetErrorHandler(handler ErrorHandler)
func (*NsqWorkerRegistrar) SetInvalidMessageHandler ¶
func (r *NsqWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)
func (*NsqWorkerRegistrar) SetMessageManager ¶
func (r *NsqWorkerRegistrar) SetMessageManager(messageManager interface{})
type OnHostErrorHandler ¶
type ProcessingState ¶
type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span *trace.SeveritySpan
	Topic string
}
type RestrictedForwardMessageHandler ¶
type RestrictedForwardMessageHandler int
func (RestrictedForwardMessageHandler) ProcessMessage ¶
func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message) error
ProcessMessage implements MessageHandler.
type RestrictedMessageDelegate ¶
type RestrictedMessageDelegate int
func (RestrictedMessageDelegate) OnFinish ¶
func (RestrictedMessageDelegate) OnFinish(*nsq.Message)
OnFinish implements nsq.MessageDelegate.
func (RestrictedMessageDelegate) OnTouch ¶
func (RestrictedMessageDelegate) OnTouch(*nsq.Message)
OnTouch implements nsq.MessageDelegate.
type RestrictedOperationError ¶
type RestrictedOperationError string
func (RestrictedOperationError) Error ¶
func (e RestrictedOperationError) Error() string
type RouteComponent ¶
type RouteComponent struct {
	MessageHandler MessageHandler
	HandlerComponentID string
}
type Router ¶
type Router map[string]RouteComponent
func (Router) Add ¶
func (r Router) Add(topic string, handler MessageHandler, handlerComponentID string)
func (Router) FindHandlerComponentID ¶
func (Router) Get ¶
func (r Router) Get(topic string) MessageHandler
type StatusCode ¶ added in v0.3.1
type StatusCode = ReplyCode
type StdMessageHandleModule ¶
type StdMessageHandleModule struct {
// contains filtered or unexported fields
}
func NewStdMessageHandleModule ¶
func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule
func (*StdMessageHandleModule) CanSetSuccessor ¶
func (*StdMessageHandleModule) CanSetSuccessor() bool
CanSetSuccessor implements MessageHandleModule.
func (*StdMessageHandleModule) OnInitComplete ¶
func (*StdMessageHandleModule) OnInitComplete()
OnInitComplete implements MessageHandleModule.
func (*StdMessageHandleModule) OnStart ¶
func (*StdMessageHandleModule) OnStart(ctx context.Context) error
OnStart implements MessageHandleModule.
func (*StdMessageHandleModule) OnStop ¶
func (*StdMessageHandleModule) OnStop(ctx context.Context) error
OnStop implements MessageHandleModule.
func (*StdMessageHandleModule) ProcessMessage ¶
func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover) error
ProcessMessage implements MessageHandleModule.
func (*StdMessageHandleModule) SetSuccessor ¶
func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)
SetSuccessor implements MessageHandleModule.
type TracerManager ¶
type TracerManager struct {
	TracerProvider *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}
func NewTraceManager ¶
func NewTraceManager() *TracerManager
func (*TracerManager) GenerateManagedTracer ¶
func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer
func (*TracerManager) UndefinedTracer ¶ added in v0.3.2
func (m *TracerManager) UndefinedTracer() *trace.SeverityTracer
Source Files
¶
- compositeMessageObserver.go
- context.go
- contextHelper.go
- contextMessageDelegate.go
- def.go
- defer.go
- errors.go
- messageDelegateHelper.go
- messageDispatcher.go
- messageHandleService.go
- messageObserverService.go
- messageTracerService.go
- noopMessageDelegate.go
- nsqWorker.go
- nsqWorkerModule.go
- nsqWorkerRegistrar.go
- processingState.go
- replyCode.go
- restrictedForwardMessageHandler.go
- restrictedMessageDelegate.go
- routeComponent.go
- router.go
- stdMessageHandleModule.go
- tracerManager.go
- util.go