Documentation
¶
Index ¶
- Constants
- Variables
- func NewStore() *store
- type Broker
- type BrokerEventContext
- type Engine
- type GRPCServer
- type GroupSubscription
- type HTTPClient
- type Metadata
- type NATSClient
- func (c *NATSClient) Close()
- func (c *NATSClient) Connect(ctx context.Context) error
- func (c *NATSClient) ConsumeEvents(ctx context.Context, name, subj string) error
- func (c *NATSClient) IsHealthy(ctx context.Context) bool
- func (c *NATSClient) Msg(subject string, evt *core.Event) (*nats.Msg, error)
- func (c *NATSClient) Name() string
- func (c *NATSClient) Publish(subject string, evt *core.Event) error
- func (c *NATSClient) Request(subject string, evt *core.Event) error
- type Receiver
- type RecvMsg
- type ReplicaSubscription
- type SendEvent
- type Store
- type Subscription
- type SubscriptionConf
- type SubscriptionMgr
Constants ¶
View Source
const ( ExitCodeConfiguration = 10 ExitCodeNATS = 11 ExitCodeGRPCServer = 12 ExitCodeHTTP = 13 ExitCodeTelemetry = 14 ExitCodeResourceStore = 15 ExitCodeKubernetes = 16 InterruptCode = 130 )
OS status codes
View Source
const (
CloudEventId = "ce_id"
)
Variables ¶
View Source
var ( EventStreamTTL = time.Hour * 24 * 3 // 3 days ComponentsTTL = time.Hour * 12 // 12 hours )
View Source
var (
NoopCancel = func(err error) {}
)
Functions ¶
Types ¶
type BrokerEventContext ¶
type BrokerEventContext struct { context.Context Key string Receiver Receiver ReceivedAt time.Time Event *core.Event AppDeployment *v1alpha1.AppDeployment ReleaseManifest *v1alpha1.ReleaseManifest VirtualEnv *v1alpha1.VirtualEnvironment Data *api.Data RouteId int64 TargetAdapter common.Adapter Span *telemetry.Span Log *logkf.Logger Cancel context.CancelCauseFunc // contains filtered or unexported fields }
func (*BrokerEventContext) CoreErr ¶
func (ctx *BrokerEventContext) CoreErr() *core.Err
func (*BrokerEventContext) Err ¶
func (ctx *BrokerEventContext) Err() error
func (*BrokerEventContext) MatchedEvent ¶
func (ctx *BrokerEventContext) MatchedEvent() *core.MatchedEvent
func (*BrokerEventContext) TTL ¶
func (ctx *BrokerEventContext) TTL() time.Duration
func (*BrokerEventContext) Value ¶
func (ctx *BrokerEventContext) Value(key any) any
type GRPCServer ¶
type GRPCServer struct { grpc.UnimplementedBrokerServer // contains filtered or unexported fields }
func NewGRPCServer ¶
func NewGRPCServer(brk Broker) *GRPCServer
func (*GRPCServer) Shutdown ¶
func (srv *GRPCServer) Shutdown(timeout time.Duration)
func (*GRPCServer) Subscribe ¶
func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error
type GroupSubscription ¶
type GroupSubscription interface { Subscription }
type HTTPClient ¶
type HTTPClient struct {
// contains filtered or unexported fields
}
func NewHTTPClient ¶
func NewHTTPClient(brk Broker) *HTTPClient
func (*HTTPClient) SendEvent ¶
func (c *HTTPClient) SendEvent(req *BrokerEventContext) error
type NATSClient ¶
type NATSClient struct {
// contains filtered or unexported fields
}
func NewNATSClient ¶
func NewNATSClient(brk Broker) *NATSClient
func (*NATSClient) Close ¶
func (c *NATSClient) Close()
func (*NATSClient) ConsumeEvents ¶
func (c *NATSClient) ConsumeEvents(ctx context.Context, name, subj string) error
func (*NATSClient) Msg ¶
func (c *NATSClient) Msg(subject string, evt *core.Event) (*nats.Msg, error)
func (*NATSClient) Name ¶
func (c *NATSClient) Name() string
type ReplicaSubscription ¶
type ReplicaSubscription interface { Subscription Component() *core.Component ComponentDef() *api.ComponentDefinition IsGroupEnabled() bool Cancel(err error) Err() error }
type SendEvent ¶
type SendEvent func(*BrokerEventContext) error
type Store ¶
type Store interface { Open() error Close() Platform(context.Context) (*v1alpha1.Platform, error) AppDeployment(context.Context, string) (*v1alpha1.AppDeployment, error) ComponentDef(context.Context, *core.Component) (*api.ComponentDefinition, error) Adapter(*BrokerEventContext, string, api.ComponentType) (common.Adapter, error) ReleaseMatcher(context.Context) (*matcher.EventMatcher, error) DeploymentMatcher(*BrokerEventContext) (*matcher.EventMatcher, error) AttachEventContext(*BrokerEventContext) error IsGenesisAdapter(context.Context, *core.Component) bool }
type Subscription ¶
type Subscription interface { SendEvent(evt *BrokerEventContext) error IsActive() bool Context() context.Context }
type SubscriptionConf ¶
type SubscriptionMgr ¶
type SubscriptionMgr interface { Create(ctx context.Context, cfg *SubscriptionConf) (ReplicaSubscription, GroupSubscription, error) Subscription(comp *core.Component) (Subscription, bool) Close() }
func NewManager ¶
func NewManager() SubscriptionMgr
Click to show internal directories.
Click to hide internal directories.