Versions in this module Expand all Collapse all v0 v0.0.4 Jul 9, 2022 Changes in this version + var File_event_event_proto protoreflect.FileDescriptor + func KVFromProto(msg proto.Message) (string, []byte, error) + func NewConsumerContext(ctx context.Context, r Consumer) context.Context + func NewProducerContext(ctx context.Context, r Producer) context.Context + func RegisterConsumer(kind string, e LazyConsumer) + func RegisterProducer(kind string, e LazyProducer) + type Config struct + Addr string + Extra *structpb.Struct + Group string + Kafka *Config_Kafka + Pulsar *Config_Pulsar + Topic string + Type string + func (*Config) Descriptor() ([]byte, []int) + func (*Config) ProtoMessage() + func (m *Config) Validate() error + func (m *Config) ValidateAll() error + func (x *Config) GetAddr() string + func (x *Config) GetExtra() *structpb.Struct + func (x *Config) GetGroup() string + func (x *Config) GetKafka() *Config_Kafka + func (x *Config) GetPulsar() *Config_Pulsar + func (x *Config) GetTopic() string + func (x *Config) GetType() string + func (x *Config) ProtoReflect() protoreflect.Message + func (x *Config) Reset() + func (x *Config) String() string + type ConfigMultiError []error + func (m ConfigMultiError) AllErrors() []error + func (m ConfigMultiError) Error() string + type ConfigValidationError struct + func (e ConfigValidationError) Cause() error + func (e ConfigValidationError) Error() string + func (e ConfigValidationError) ErrorName() string + func (e ConfigValidationError) Field() string + func (e ConfigValidationError) Key() bool + func (e ConfigValidationError) Reason() string + type Config_Kafka struct + Version *wrapperspb.StringValue + func (*Config_Kafka) Descriptor() ([]byte, []int) + func (*Config_Kafka) ProtoMessage() + func (m *Config_Kafka) Validate() error + func (m *Config_Kafka) ValidateAll() error + func (x *Config_Kafka) GetVersion() *wrapperspb.StringValue + func (x *Config_Kafka) ProtoReflect() protoreflect.Message + func (x *Config_Kafka) Reset() + func (x *Config_Kafka) String() string + type Config_KafkaMultiError []error + func (m Config_KafkaMultiError) AllErrors() []error + func (m Config_KafkaMultiError) Error() string + type Config_KafkaValidationError struct + func (e Config_KafkaValidationError) Cause() error + func (e Config_KafkaValidationError) Error() string + func (e Config_KafkaValidationError) ErrorName() string + func (e Config_KafkaValidationError) Field() string + func (e Config_KafkaValidationError) Key() bool + func (e Config_KafkaValidationError) Reason() string + type Config_Pulsar struct + ConnectionTimeout *durationpb.Duration + OperationTimeout *durationpb.Duration + func (*Config_Pulsar) Descriptor() ([]byte, []int) + func (*Config_Pulsar) ProtoMessage() + func (m *Config_Pulsar) Validate() error + func (m *Config_Pulsar) ValidateAll() error + func (x *Config_Pulsar) GetConnectionTimeout() *durationpb.Duration + func (x *Config_Pulsar) GetOperationTimeout() *durationpb.Duration + func (x *Config_Pulsar) ProtoReflect() protoreflect.Message + func (x *Config_Pulsar) Reset() + func (x *Config_Pulsar) String() string + type Config_PulsarMultiError []error + func (m Config_PulsarMultiError) AllErrors() []error + func (m Config_PulsarMultiError) Error() string + type Config_PulsarValidationError struct + func (e Config_PulsarValidationError) Cause() error + func (e Config_PulsarValidationError) Error() string + func (e Config_PulsarValidationError) ErrorName() string + func (e Config_PulsarValidationError) Field() string + func (e Config_PulsarValidationError) Key() bool + func (e Config_PulsarValidationError) Reason() string + type Consumer interface + Process func(ctx context.Context, handler ConsumerHandler) error + func FromConsumerContext(ctx context.Context) (Consumer, bool) + type ConsumerFactoryServer struct + func NewConsumerFactoryServer(cfg *Config) *ConsumerFactoryServer + func (f *ConsumerFactoryServer) Start(ctx context.Context) error + func (f *ConsumerFactoryServer) Stop(ctx context.Context) error + type ConsumerHandler HandlerOf[Event] + func FilterKey(key string, handler ConsumerHandler) ConsumerHandler + func NewTransformer(t func(context.Context, Event) (T, error), f HandlerOf[T]) ConsumerHandler + func ProtoHandler(msg T, h HandlerOf[T]) ConsumerHandler + type ConsumerHandlerFunc HandlerFuncOf[Event] + func (h ConsumerHandlerFunc) Process(ctx context.Context, e Event) error + type ConsumerMiddlewareFunc func(ConsumerHandler) ConsumerHandler + func ConsumerChain(m ...ConsumerMiddlewareFunc) ConsumerMiddlewareFunc + func ConsumerRecover(opt ...RecoverOption) ConsumerMiddlewareFunc + func ConsumerUow(uowMgr uow.Manager) ConsumerMiddlewareFunc + func Logging(logger klog.Logger) ConsumerMiddlewareFunc + type ConsumerMux struct + func (mux *ConsumerMux) Append(h ConsumerHandler) + func (mux *ConsumerMux) Process(ctx context.Context, event Event) error + func (mux *ConsumerMux) Use(mws ...ConsumerMiddlewareFunc) + type ConsumerServer struct + func NewConsumerServer(r Consumer) *ConsumerServer + func (s *ConsumerServer) Start(ctx context.Context) error + func (s *ConsumerServer) Stop(ctx context.Context) error + type ErrFormatFunc func(ctx context.Context, err error) error + type Event interface + Header func() Header + Key func() string + Value func() []byte + func NewMessage(key string, value []byte) Event + func NewMessageFromProto(msg proto.Message) (Event, error) + type HandlerFuncOf func(context.Context, T) error + func (h HandlerFuncOf[T]) Process(ctx context.Context, e T) error + type HandlerOf interface + Process func(context.Context, T) error + type Header interface + Get func(key string) string + Keys func() []string + Set func(key string, value string) + type LazyConsumer func(ctx context.Context, c *Config) (Consumer, error) + type LazyProducer func(c *Config) (*ProducerMux, error) + type Message struct + func (m *Message) Header() Header + func (m *Message) Key() string + func (m *Message) Value() []byte + type Producer interface + BatchSend func(ctx context.Context, msg []Event) error + Send func(ctx context.Context, msg Event) error + func FromProducerContext(ctx context.Context) (Producer, bool) + type ProducerMiddlewareFunc func(HandlerOf[any]) HandlerOf[any] + func ChainProducer(m ...ProducerMiddlewareFunc) ProducerMiddlewareFunc + type ProducerMux struct + func NewFactoryProducer(cfg *Config) (*ProducerMux, error) + func NewProducer(next Producer) *ProducerMux + func (s *ProducerMux) BatchSend(ctx context.Context, msg []Event) error + func (s *ProducerMux) Close() error + func (s *ProducerMux) Send(ctx context.Context, msg Event) error + func (s *ProducerMux) Use(m ...ProducerMiddlewareFunc) + type RecoverOption func(*recoverOptions) + func WithErrorFormatter(f ErrFormatFunc) RecoverOption + func WithLogger(logger klog.Logger) RecoverOption