internal

package
v0.2.2-alpha.202404301... Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 30, 2024 License: MIT Imports: 16 Imported by: 0

README

Preparation

1. Install godotenv tool

$ go install github.com/joho/godotenv/cmd/godotenv@latest

2. Configure your .env file from .env.sample

$ cp .env.sample .env

⚠️ Don't edit .env.sample itself; put your go test settings in the copied .env file.

$ vim .env

Run Test

$ godotenv -f .env go test -v

Documentation

Index

Constants

View Source
const (
	RestrictedForwardMessage_InvalidOperation int = 0
	RestrictedForwardMessage_Recursive        int = 1
)
View Source
const (
	LOGGER_PREFIX string = "[worker-redis] "
)
View Source
const (
	REDIS_BUSYGROUP_PREFIX = "BUSYGROUP"
)

Variables

View Source
var (
	GlobalContextHelper             ContextHelper         = ContextHelper{}
	GlobalRestrictedMessageDelegate redis.MessageDelegate = RestrictedMessageDelegate(0)
	GlobalMessageHelper             MessageHelper         = MessageHelper{}

	RedisWorkerModuleInstance = RedisWorkerModule{}

	RedisWorkerLogger *log.Logger = log.New(os.Stdout, LOGGER_PREFIX, log.LstdFlags|log.Lmsgprefix)
)

Functions

func SetTracerManager

func SetTracerManager(v *TracerManager)

Types

type CompositeMessageObserver

type CompositeMessageObserver []MessageObserver

func (CompositeMessageObserver) OnAck

func (o CompositeMessageObserver) OnAck(ctx *Context, message *Message)

OnAck implements MessageObserver.

func (CompositeMessageObserver) OnDel

func (o CompositeMessageObserver) OnDel(ctx *Context, message *Message)

OnDel implements MessageObserver.

func (CompositeMessageObserver) Type

Type implements MessageObserver.

type Context

type Context struct {
	ConsumerGroup string
	ConsumerName  string
	// contains filtered or unexported fields
}

func (*Context) Deadline

func (*Context) Deadline() (deadline time.Time, ok bool)

Deadline implements context.Context.

func (*Context) Done

func (*Context) Done() <-chan struct{}

Done implements context.Context.

func (*Context) Err

func (*Context) Err() error

Err implements context.Context.

func (*Context) InvalidMessage

func (c *Context) InvalidMessage(message *Message)

func (*Context) Logger

func (c *Context) Logger() *log.Logger

func (*Context) Pause added in v0.3.0

func (c *Context) Pause(streams ...string) error

func (*Context) Resume added in v0.3.0

func (c *Context) Resume(streams ...string) error

func (*Context) SetValue

func (c *Context) SetValue(key interface{}, value interface{})

SetValue implements trace.ValueContext.

func (*Context) Value

func (c *Context) Value(key any) any

Value implements context.Context.

type ContextBuilder

type ContextBuilder struct {
	// contains filtered or unexported fields
}

func NewContextBuilder

func NewContextBuilder() *ContextBuilder

func (*ContextBuilder) Build

func (b *ContextBuilder) Build() *Context

func (*ContextBuilder) ConsumerGroup

func (b *ContextBuilder) ConsumerGroup(v string) *ContextBuilder

func (*ContextBuilder) ConsumerName

func (b *ContextBuilder) ConsumerName(v string) *ContextBuilder

func (*ContextBuilder) Context

func (*ContextBuilder) InvalidMessageHandler

func (b *ContextBuilder) InvalidMessageHandler(v MessageHandler) *ContextBuilder

func (*ContextBuilder) Logger

func (b *ContextBuilder) Logger(v *log.Logger) *ContextBuilder

type ContextHelper

type ContextHelper struct{}

func (ContextHelper) ExtractReplyCode

func (ContextHelper) ExtractReplyCode(ctx *Context) ReplyCode

func (ContextHelper) InjectReplyCode

func (ContextHelper) InjectReplyCode(ctx *Context, reply ReplyCode)

type ContextMessageDelegate

type ContextMessageDelegate struct {
	// contains filtered or unexported fields
}

func NewContextMessageDelegate

func NewContextMessageDelegate(ctx *Context) *ContextMessageDelegate

func (*ContextMessageDelegate) OnAck

func (d *ContextMessageDelegate) OnAck(msg *redis.Message)

OnAck implements redis.MessageDelegate.

func (*ContextMessageDelegate) OnDel

func (d *ContextMessageDelegate) OnDel(msg *redis.Message)

OnDel implements redis.MessageDelegate.

type Defer

type Defer struct {
	// contains filtered or unexported fields
}

func (*Defer) Do

func (d *Defer) Do(do func(f Finalizer))

type ErrorHandler

type ErrorHandler func(ctx *Context, message *Message, err interface{})

type Finalizer

type Finalizer struct {
	// contains filtered or unexported fields
}

func (Finalizer) Add

func (f Finalizer) Add(actions ...func(err interface{}))

type Message

type Message = redis.Message

type MessageDispatcher

type MessageDispatcher struct {
	MessageHandleService   *MessageHandleService
	MessageTracerService   *MessageTracerService
	MessageObserverService *MessageObserverService
	Router                 Router

	OnHostErrorProc OnHostErrorHandler

	ErrorHandler          ErrorHandler
	InvalidMessageHandler MessageHandler

	StreamSet map[string]StreamOffset
	// contains filtered or unexported fields
}

func (*MessageDispatcher) ProcessMessage

func (d *MessageDispatcher) ProcessMessage(ctx *Context, message *Message)

func (*MessageDispatcher) StreamOffsets

func (d *MessageDispatcher) StreamOffsets() []StreamOffset

func (*MessageDispatcher) Streams

func (d *MessageDispatcher) Streams() []string

type MessageHandleModule

type MessageHandleModule interface {
	CanSetSuccessor() bool
	SetSuccessor(successor MessageHandleModule)
	ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)
	OnInitComplete()
	OnStart(ctx context.Context) error
	OnStop(ctx context.Context) error
}

type MessageHandleService

type MessageHandleService struct {
	// contains filtered or unexported fields
}

func NewMessageHandleService

func NewMessageHandleService() *MessageHandleService

func (*MessageHandleService) ProcessMessage

func (s *MessageHandleService) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)

func (*MessageHandleService) Register

func (s *MessageHandleService) Register(module MessageHandleModule)

type MessageHandler

type MessageHandler interface {
	ProcessMessage(ctx *Context, message *Message)
}

type MessageHelper

type MessageHelper struct{}

func (MessageHelper) IsRestricted

func (MessageHelper) IsRestricted(msg *Message) (bool, error)

func (MessageHelper) Restrict

func (MessageHelper) Restrict(msg *Message) error

func (MessageHelper) Unrestrict

func (MessageHelper) Unrestrict(msg *Message) error

type MessageObserver

type MessageObserver interface {
	OnAck(ctx *Context, message *Message)
	OnDel(ctx *Context, message *Message)
	Type() reflect.Type
}

type MessageObserverAffair

type MessageObserverAffair interface {
	MessageObserverTypes() []reflect.Type
}

type MessageObserverService

type MessageObserverService struct {
	MessageObservers map[reflect.Type]MessageObserver
	// contains filtered or unexported fields
}

func (*MessageObserverService) RegisterMessageObservers

func (s *MessageObserverService) RegisterMessageObservers(msg *Message, handlerID string)

func (*MessageObserverService) UnregisterAllMessageObservers

func (s *MessageObserverService) UnregisterAllMessageObservers(msg *Message)

type MessageTracerService

type MessageTracerService struct {
	TracerManager *TracerManager

	Enabled bool

	InvalidMessageHandlerComponentID string
	// contains filtered or unexported fields
}

func NewMessageTracerService

func NewMessageTracerService() *MessageTracerService

func (*MessageTracerService) TextMapPropagator

func (s *MessageTracerService) TextMapPropagator() propagation.TextMapPropagator

func (*MessageTracerService) Tracer

type OnHostErrorHandler

type OnHostErrorHandler func(err error) (disposed bool)

type ProcessingState

type ProcessingState struct {
	Tracer *trace.SeverityTracer
	Span   *trace.SeveritySpan
	Stream string
}

type Recover

type Recover struct {
	// contains filtered or unexported fields
}

func (*Recover) Defer

func (s *Recover) Defer(catch func(err interface{})) *Defer

type RedisWorker

type RedisWorker struct {
	ConsumerGroup       string
	ConsumerName        string
	RedisOption         *redis.UniversalOptions
	MaxInFlight         int64
	MaxPollingTimeout   time.Duration
	ClaimMinIdleTime    time.Duration
	IdlingTimeout       time.Duration // 若沒有任何訊息時等待多久
	ClaimSensitivity    int           // Read 時取得的訊息數小於等於 n 的話, 執行 Claim
	ClaimOccurrenceRate int32         // Read 每執行 n 次後 執行 Claim 1 次
	AllowCreateGroup    bool          // 自動註冊 consumer group
	// contains filtered or unexported fields
}

func (*RedisWorker) Logger

func (w *RedisWorker) Logger() *log.Logger

func (*RedisWorker) Start

func (w *RedisWorker) Start(ctx context.Context)

func (*RedisWorker) Stop

func (w *RedisWorker) Stop(ctx context.Context) error

type RedisWorkerModule

type RedisWorkerModule struct{}

func (RedisWorkerModule) ConfigureLogger

func (RedisWorkerModule) ConfigureLogger(logflags int, w io.Writer)

ConfigureLogger implements host.HostModule.

func (RedisWorkerModule) DescribeHostType

func (RedisWorkerModule) DescribeHostType() reflect.Type

func (RedisWorkerModule) Init

func (RedisWorkerModule) Init(h host.Host, app *host.AppModule)

func (RedisWorkerModule) InitComplete

func (RedisWorkerModule) InitComplete(h host.Host, ctx *host.AppModule)

type RedisWorkerRegistrar

type RedisWorkerRegistrar struct {
	// contains filtered or unexported fields
}

func NewRedisWorkerRegistrar

func NewRedisWorkerRegistrar(worker *RedisWorker) *RedisWorkerRegistrar

func (*RedisWorkerRegistrar) AddRouter

func (r *RedisWorkerRegistrar) AddRouter(stream string, handler MessageHandler, handlerComponentID string, setting *StreamSetting)

func (*RedisWorkerRegistrar) EnableTracer

func (r *RedisWorkerRegistrar) EnableTracer(enabled bool)

func (*RedisWorkerRegistrar) RegisterMessageHandleModule

func (r *RedisWorkerRegistrar) RegisterMessageHandleModule(module MessageHandleModule)

func (*RedisWorkerRegistrar) RegisterMessageObserver

func (r *RedisWorkerRegistrar) RegisterMessageObserver(v MessageObserver)

func (*RedisWorkerRegistrar) RegisterStream

func (r *RedisWorkerRegistrar) RegisterStream(streamOffset redis.StreamOffset)

func (*RedisWorkerRegistrar) SetErrorHandler

func (r *RedisWorkerRegistrar) SetErrorHandler(handler ErrorHandler)

func (*RedisWorkerRegistrar) SetInvalidMessageHandler

func (r *RedisWorkerRegistrar) SetInvalidMessageHandler(handler MessageHandler)

func (*RedisWorkerRegistrar) SetMessageManager

func (r *RedisWorkerRegistrar) SetMessageManager(messageManager interface{})

type ReplyCode

type ReplyCode int
const (
	UNSET ReplyCode = iota
	PASS
	FAIL
	ABORT

	INVALID ReplyCode = -1
)

func (ReplyCode) String

func (code ReplyCode) String() string

type RestrictedForwardMessageHandler

type RestrictedForwardMessageHandler int

func (RestrictedForwardMessageHandler) ProcessMessage

func (h RestrictedForwardMessageHandler) ProcessMessage(ctx *Context, message *Message)

ProcessMessage implements MessageHandler.

type RestrictedMessageDelegate

type RestrictedMessageDelegate int

func (RestrictedMessageDelegate) OnAck

OnAck implements redis.MessageDelegate.

func (RestrictedMessageDelegate) OnDel

OnDel implements redis.MessageDelegate.

type RestrictedOperationError

type RestrictedOperationError string

func (RestrictedOperationError) Error

func (e RestrictedOperationError) Error() string

type RouteComponent

type RouteComponent struct {
	MessageHandler     MessageHandler
	HandlerComponentID string
	StreamSetting      *StreamSetting
}

type Router

type Router map[string]RouteComponent

func (Router) Add

func (r Router) Add(stream string, handler MessageHandler, handlerComponentID string, streamSetting *StreamSetting)

func (Router) FindHandlerComponentID

func (r Router) FindHandlerComponentID(stream string) string

func (Router) Get

func (r Router) Get(stream string) MessageHandler

func (Router) GetRouteComponent added in v0.2.2

func (r Router) GetRouteComponent(stream string) RouteComponent

func (Router) Has

func (r Router) Has(stream string) bool

func (Router) Remove

func (r Router) Remove(stream string)

type StdInvalidMessageHandler

type StdInvalidMessageHandler struct {
	// contains filtered or unexported fields
}

func (*StdInvalidMessageHandler) ProcessMessage

func (h *StdInvalidMessageHandler) ProcessMessage(ctx *Context, message *Message)

ProcessMessage implements MessageHandler.

type StdMessageHandleModule

type StdMessageHandleModule struct {
	// contains filtered or unexported fields
}

func NewStdMessageHandleModule

func NewStdMessageHandleModule(dispatcher *MessageDispatcher) *StdMessageHandleModule

func (*StdMessageHandleModule) CanSetSuccessor

func (*StdMessageHandleModule) CanSetSuccessor() bool

CanSetSuccessor implements MessageHandleModule.

func (*StdMessageHandleModule) OnInitComplete

func (*StdMessageHandleModule) OnInitComplete()

OnInitComplete implements MessageHandleModule.

func (*StdMessageHandleModule) OnStart

OnStart implements MessageHandleModule.

func (*StdMessageHandleModule) OnStop

OnStop implements MessageHandleModule.

func (*StdMessageHandleModule) ProcessMessage

func (m *StdMessageHandleModule) ProcessMessage(ctx *Context, message *Message, state ProcessingState, recover *Recover)

ProcessMessage implements MessageHandleModule.

func (*StdMessageHandleModule) SetSuccessor

func (*StdMessageHandleModule) SetSuccessor(successor MessageHandleModule)

SetSuccessor implements MessageHandleModule.

type StopError

type StopError struct {
	// contains filtered or unexported fields
}

func (*StopError) Error

func (e *StopError) Error() string

func (*StopError) Unwrap

func (e *StopError) Unwrap() error

type StreamOffset

type StreamOffset = redis.StreamOffset

type StreamOffsetInfo

type StreamOffsetInfo = redis.StreamOffsetInfo

type StreamSetting added in v0.2.2

type StreamSetting struct {
	MessageStateKeyPrefix *string
}

func (*StreamSetting) DecodeMessageContentOption added in v0.2.2

func (s *StreamSetting) DecodeMessageContentOption() []redis.DecodeMessageContentOption

type TracerManager

type TracerManager struct {
	TracerProvider    *trace.SeverityTracerProvider
	TextMapPropagator propagation.TextMapPropagator
	// contains filtered or unexported fields
}

func GetTracerManager

func GetTracerManager() *TracerManager

func NewTraceManager

func NewTraceManager() *TracerManager

func (*TracerManager) GenerateManagedTracer

func (m *TracerManager) GenerateManagedTracer(v interface{}) *trace.SeverityTracer

type UniversalClient

type UniversalClient = redis.UniversalClient

type UniversalOptions

type UniversalOptions = redis.UniversalOptions

type XMessage

type XMessage = redis.XMessage

type XStream

type XStream = redis.XStream

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL