Documentation ¶
Index ¶
- Variables
- func NatsMsgFromEvent(subject string, event *Event) *natscore.Msg
- func NewNatsConn(uri string, logger logging.Logger) (*natscore.Conn, error)
- type Config
- type Event
- type NatsConfig
- type NatsPublisher
- type NatsSubscriber
- func (subscriber *NatsSubscriber) Connect(ctx context.Context) error
- func (subscriber *NatsSubscriber) Disconnect(ctx context.Context) error
- func (subscriber *NatsSubscriber) Liveness() error
- func (subscriber *NatsSubscriber) Name() string
- func (subscriber *NatsSubscriber) Readiness() error
- func (subscriber *NatsSubscriber) Sub(ctx context.Context, topic string, handler SubHandler) error
- type Publisher
- type PublisherConfig
- type Stream
- type SubHandler
- type Subscriber
- type SubscriberConfig
Constants ¶
This section is empty.
Variables ¶
View Source
var ( ErrNotConnected = errors.New("INFRASTRUCTURE.STREAMING.NOT_CONNECTED.ERROR") ErrAlreadyConnected = errors.New("INFRASTRUCTURE.STREAMING.ALREADY_CONNECTED.ERROR") ErrSubNotConnected = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.NOT_CONNECTED.ERROR") ErrSubAlreadyConnected = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.ALREADY_CONNECTED.ERROR") ErrSubTerminiated = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.TERMINATED.ERROR") ErrSubAckFail = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.ACK_FAIL.ERROR") ErrSubNakFail = errors.New("INFRASTRUCTURE.STREAMING.SUBSCRIBER.NAK_FAIL.ERROR") )
View Source
var ( MetaId = "KANTHOR_META_ID" HeaderTelemetryTrace = "x-telemtry-trace" )
Functions ¶
Types ¶
type Config ¶
type Config struct { Name string `json:"name" yaml:"name" mapstructure:"name"` Uri string `json:"uri" yaml:"uri" mapstructure:"uri"` Nats NatsConfig `json:"nats" yaml:"nats" mapstructure:"nats"` Publisher PublisherConfig `json:"publisher" yaml:"publisher" mapstructure:"publisher"` Subscriber SubscriberConfig `json:"subscriber" yaml:"subscriber" mapstructure:"subscriber"` }
type Event ¶
type Event struct { Subject string `json:"subject"` Id string `json:"id"` Data []byte `json:"data"` Metadata map[string]string `json:"metadata"` }
func NatsMsgToEvent ¶
type NatsConfig ¶
type NatsConfig struct { Replicas int `json:"replicas" yaml:"replicas" mapstructure:"replicas"` Limits struct { Size int64 `json:"size" yaml:"size" mapstructure:"size"` MsgSize int32 `json:"msg_size" yaml:"msg_size" mapstructure:"msg_size"` MsgCount int64 `json:"msg_count" yaml:"msg_count" mapstructure:"msg_count"` MsgAge int64 `json:"msg_age" yaml:"msg_age" mapstructure:"msg_age"` } `json:"limits" yaml:"limits" mapstructure:"limits"` }
func (*NatsConfig) Validate ¶
func (conf *NatsConfig) Validate() error
type NatsPublisher ¶
type NatsPublisher struct {
// contains filtered or unexported fields
}
func (*NatsPublisher) Name ¶
func (publisher *NatsPublisher) Name() string
type NatsSubscriber ¶
type NatsSubscriber struct {
// contains filtered or unexported fields
}
func (*NatsSubscriber) Connect ¶
func (subscriber *NatsSubscriber) Connect(ctx context.Context) error
func (*NatsSubscriber) Disconnect ¶
func (subscriber *NatsSubscriber) Disconnect(ctx context.Context) error
func (*NatsSubscriber) Liveness ¶
func (subscriber *NatsSubscriber) Liveness() error
func (*NatsSubscriber) Name ¶
func (subscriber *NatsSubscriber) Name() string
func (*NatsSubscriber) Readiness ¶
func (subscriber *NatsSubscriber) Readiness() error
func (*NatsSubscriber) Sub ¶
func (subscriber *NatsSubscriber) Sub(ctx context.Context, topic string, handler SubHandler) error
type PublisherConfig ¶
type PublisherConfig struct {
RateLimit int `json:"rate_limit" yaml:"rate_limit" mapstructure:"rate_limit"`
}
func (*PublisherConfig) Validate ¶
func (conf *PublisherConfig) Validate() error
type Stream ¶
type Stream interface { patterns.Connectable Publisher(name string) Publisher Subscriber(name string) Subscriber }
type SubHandler ¶
type Subscriber ¶
type Subscriber interface { patterns.Connectable Name() string Sub(ctx context.Context, topic string, handler SubHandler) error }
type SubscriberConfig ¶
type SubscriberConfig struct { // MaxRetry is how many times we should try to re-deliver message if we get any error MaxRetry int `json:"max_retry" yaml:"max_retry" mapstructure:"max_retry"` Concurrency int `json:"concurrency" yaml:"concurrency" mapstructure:"concurrency"` Throughput int `json:"throughput" yaml:"throughput" mapstructure:"throughput"` }
func (*SubscriberConfig) Validate ¶
func (conf *SubscriberConfig) Validate() error
Click to show internal directories.
Click to hide internal directories.