engine

package
Version: v0.7.1 (not the latest version of this module)
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 21, 2024 | License: MPL-2.0 | Imports: 52 | Imported by: 0

Documentation

Index

Constants

View Source
// Process exit status codes. Each ExitCode* constant is named after one
// subsystem (configuration, NATS, gRPC server, HTTP, telemetry, resource
// store, Kubernetes).
// NOTE(review): presumably each is the status the process exits with when the
// corresponding subsystem fails; the call sites are not visible here, so
// confirm before relying on that.
// NOTE(review): InterruptCode is 130, which matches the conventional shell
// status for termination by SIGINT (128 + 2) — confirm that is the intent.
const (
	ExitCodeConfiguration = 10
	ExitCodeNATS          = 11
	ExitCodeGRPCServer    = 12
	ExitCodeHTTP          = 13
	ExitCodeTelemetry     = 14
	ExitCodeResourceStore = 15
	ExitCodeKubernetes    = 16
	InterruptCode         = 130
)

OS status codes (documentation for the ExitCode* and InterruptCode constants above).

View Source
// CloudEventId is the string key "ce_id".
// NOTE(review): presumably the attribute/header name under which a CloudEvents
// event ID is carried — confirm against its usages, which are not visible here.
// Go naming conventions would spell this CloudEventID; it is left as-is
// because renaming an exported constant would break importers.
const (
	CloudEventId = "ce_id"
)

Variables

View Source
// Time-to-live durations. They are declared as package variables rather than
// constants, so they can be reassigned at runtime.
var (
	// EventStreamTTL is 72 hours (3 days).
	EventStreamTTL = 3 * 24 * time.Hour

	// ComponentsTTL is 12 hours.
	ComponentsTTL = 12 * time.Hour
)
View Source
// NoopCancel is a cancel function that accepts an error and does nothing.
// Its signature, func(error), has the same shape as context.CancelCauseFunc.
var NoopCancel = func(error) {}

Functions

func NewStore

func NewStore() *store

Types

type Broker

// Broker is the interface the transport front-ends in this package are built
// around: NewGRPCServer, NewHTTPClient and NewNATSClient each take a Broker.
type Broker interface {
	// RecordTelemetry takes a component and a telemetry record and returns
	// nothing. NOTE(review): presumably records telemetry reported by that
	// component — confirm against the implementation.
	RecordTelemetry(*core.Component, *core.Telemetry)
	// AuthorizeComponent takes connection Metadata and returns an error.
	// NOTE(review): presumably a nil error means the component is authorized.
	AuthorizeComponent(context.Context, *Metadata) error
	// Subscribe creates a ReplicaSubscription from the given SubscriptionConf,
	// or returns an error.
	Subscribe(context.Context, *SubscriptionConf) (ReplicaSubscription, error)
	// RecvEvent wraps an incoming event and the Receiver it arrived on into a
	// BrokerEventContext.
	RecvEvent(evt *core.Event, receiver Receiver) *BrokerEventContext
	// Component returns a *core.Component. NOTE(review): presumably the
	// broker's own component identity — confirm.
	Component() *core.Component
}

type BrokerEventContext

// BrokerEventContext carries one event together with the resources and
// bookkeeping attached to it. It embeds context.Context; the package also
// declares Err and Value methods on *BrokerEventContext, so those two calls
// resolve to the package's methods rather than the embedded context's.
type BrokerEventContext struct {
	context.Context

	// Key is a string identifier.
	// NOTE(review): what it identifies is not visible here — confirm.
	Key string

	// Receiver is one of the Receiver enum values; ReceivedAt is a timestamp.
	// NOTE(review): presumably the transport the event arrived on and the
	// arrival time — confirm where these are set.
	Receiver   Receiver
	ReceivedAt time.Time

	// Event is the event itself; the remaining fields in this group are
	// Kubernetes-style (v1alpha1) resources associated with it.
	// NOTE(review): Store.AttachEventContext takes a *BrokerEventContext and
	// presumably populates these — confirm.
	Event           *core.Event
	AppDeployment   *v1alpha1.AppDeployment
	ReleaseManifest *v1alpha1.ReleaseManifest
	VirtualEnv      *v1alpha1.VirtualEnvironment

	Data *api.Data

	// RouteId is an int64 route identifier.
	// NOTE(review): Go naming conventions would spell this RouteID; renaming
	// would break callers.
	RouteId int64

	TargetAdapter common.Adapter

	// Span and Log are the telemetry span and logger carried with the event.
	// Cancel has the standard library's CancelCauseFunc type.
	// NOTE(review): presumably Cancel cancels this context with a cause — confirm.
	Span   *telemetry.Span
	Log    *logkf.Logger
	Cancel context.CancelCauseFunc
	// contains filtered or unexported fields
}

func (*BrokerEventContext) CoreErr

func (ctx *BrokerEventContext) CoreErr() *core.Err

func (*BrokerEventContext) Err

func (ctx *BrokerEventContext) Err() error

func (*BrokerEventContext) MatchedEvent

func (ctx *BrokerEventContext) MatchedEvent() *core.MatchedEvent

func (*BrokerEventContext) TTL

func (ctx *BrokerEventContext) TTL() time.Duration

func (*BrokerEventContext) Value

func (ctx *BrokerEventContext) Value(key any) any

type Engine

// Engine is the package's top-level entry point; New returns one.
type Engine interface {
	// Start takes no arguments and returns nothing.
	// NOTE(review): whether it blocks, and how failures are reported (possibly
	// via the ExitCode* process exit codes), is not visible here — confirm.
	Start()
}

func New

func New() Engine

type GRPCServer

// GRPCServer is the gRPC front-end for a Broker; create one with
// NewGRPCServer. It embeds grpc.UnimplementedBrokerServer.
// NOTE(review): `grpc` here appears to be the project's generated Broker
// service package rather than google.golang.org/grpc — confirm the import.
type GRPCServer struct {
	grpc.UnimplementedBrokerServer
	// contains filtered or unexported fields
}

func NewGRPCServer

func NewGRPCServer(brk Broker) *GRPCServer

func (*GRPCServer) Shutdown

func (srv *GRPCServer) Shutdown(timeout time.Duration)

func (*GRPCServer) Start

func (srv *GRPCServer) Start(ctx context.Context) error

func (*GRPCServer) Subscribe

func (srv *GRPCServer) Subscribe(stream grpc.Broker_SubscribeServer) error

type GroupSubscription

// GroupSubscription embeds Subscription and adds no methods of its own.
// SubscriptionMgr.Create returns one alongside a ReplicaSubscription.
type GroupSubscription interface {
	Subscription
}

type HTTPClient

// HTTPClient is an opaque type with no exported fields; create one with
// NewHTTPClient. Its SendEvent method takes a *BrokerEventContext.
type HTTPClient struct {
	// contains filtered or unexported fields
}

func NewHTTPClient

func NewHTTPClient(brk Broker) *HTTPClient

func (*HTTPClient) SendEvent

func (c *HTTPClient) SendEvent(req *BrokerEventContext) error

type Metadata

// Metadata is the argument type of Broker.AuthorizeComponent.
// NOTE(review): presumably it describes a connecting component (identity,
// platform name, pod name, auth token) — field semantics are inferred from
// the names only; confirm against the code that fills it in.
type Metadata struct {
	Component *core.Component
	Platform  string
	Pod       string
	Token     string
}

type NATSClient

// NATSClient is an opaque type with no exported fields; create one with
// NewNATSClient. Its method set includes Connect, Close, IsHealthy,
// ConsumeEvents, Msg, Publish, Request and Name.
type NATSClient struct {
	// contains filtered or unexported fields
}

func NewNATSClient

func NewNATSClient(brk Broker) *NATSClient

func (*NATSClient) Close

func (c *NATSClient) Close()

func (*NATSClient) Connect

func (c *NATSClient) Connect(ctx context.Context) error

func (*NATSClient) ConsumeEvents

func (c *NATSClient) ConsumeEvents(ctx context.Context, name, subj string) error

func (*NATSClient) IsHealthy

func (c *NATSClient) IsHealthy(ctx context.Context) bool

func (*NATSClient) Msg

func (c *NATSClient) Msg(subject string, evt *core.Event) (*nats.Msg, error)

func (*NATSClient) Name

func (c *NATSClient) Name() string

func (*NATSClient) Publish

func (c *NATSClient) Publish(subject string, evt *core.Event) error

func (*NATSClient) Request

func (c *NATSClient) Request(subject string, evt *core.Event) error

type Receiver

// Receiver is an integer enum; a String method is declared on it.
// NOTE(review): presumably it identifies the transport an event was received
// on (see Broker.RecvEvent and BrokerEventContext.Receiver) — confirm.
type Receiver int
// Receiver values. They are assigned with iota, so ReceiverNATS is 0 and each
// following constant is one greater; reordering the list changes the values.
const (
	ReceiverNATS Receiver = iota
	ReceiverGRPCServer
	ReceiverHTTPServer
	ReceiverHTTPClient
)

func (Receiver) String

func (r Receiver) String() string

type RecvMsg

// RecvMsg is a callback type that takes a *nats.Msg and returns nothing.
type RecvMsg func(*nats.Msg)

type ReplicaSubscription

// ReplicaSubscription extends Subscription with component information and
// cancellation. Broker.Subscribe and SubscriptionMgr.Create return one.
type ReplicaSubscription interface {
	Subscription

	// Component and ComponentDef return the component and its definition.
	// NOTE(review): presumably the values supplied in SubscriptionConf — confirm.
	Component() *core.Component
	ComponentDef() *api.ComponentDefinition
	// IsGroupEnabled returns a bool.
	// NOTE(review): presumably mirrors SubscriptionConf.EnableGroup — confirm.
	IsGroupEnabled() bool
	// Cancel takes an error and Err returns one.
	// NOTE(review): presumably Err reports the error passed to Cancel — confirm.
	Cancel(err error)
	Err() error
}

type SendEvent

// SendEvent is a function type that takes a *BrokerEventContext and returns
// an error. It is the type of SubscriptionConf.SendFunc.
type SendEvent func(*BrokerEventContext) error

type Store

// Store is the lookup interface for platform resources, component
// definitions, adapters and event matchers.
// NOTE(review): NewStore returns the unexported *store type, which presumably
// implements this interface — confirm.
type Store interface {
	// Open returns an error; Close returns nothing.
	Open() error
	Close()

	// Resource lookups. AppDeployment takes a string argument.
	// NOTE(review): presumably the deployment name or key — confirm.
	Platform(context.Context) (*v1alpha1.Platform, error)
	AppDeployment(context.Context, string) (*v1alpha1.AppDeployment, error)
	ComponentDef(context.Context, *core.Component) (*api.ComponentDefinition, error)
	Adapter(*BrokerEventContext, string, api.ComponentType) (common.Adapter, error)

	// Matcher lookups: one takes a plain context, the other a
	// BrokerEventContext.
	ReleaseMatcher(context.Context) (*matcher.EventMatcher, error)
	DeploymentMatcher(*BrokerEventContext) (*matcher.EventMatcher, error)

	// AttachEventContext takes a *BrokerEventContext and returns an error.
	// NOTE(review): presumably fills in the resource fields of the context — confirm.
	AttachEventContext(*BrokerEventContext) error
	IsGenesisAdapter(context.Context, *core.Component) bool
}

type Subscription

// Subscription is the base interface embedded by ReplicaSubscription and
// GroupSubscription.
type Subscription interface {
	// SendEvent takes a *BrokerEventContext and returns an error.
	SendEvent(evt *BrokerEventContext) error
	// IsActive returns a bool.
	// NOTE(review): the exact meaning of "active" is not visible here — confirm.
	IsActive() bool
	// Context returns the context.Context associated with the subscription.
	Context() context.Context
}

type SubscriptionConf

// SubscriptionConf is the configuration passed to Broker.Subscribe and
// SubscriptionMgr.Create.
type SubscriptionConf struct {
	Component    *core.Component
	ComponentDef *api.ComponentDefinition
	// SendFunc has the SendEvent function type.
	// NOTE(review): presumably invoked to deliver events to the subscriber — confirm.
	SendFunc     SendEvent
	// NOTE(review): presumably controls whether a GroupSubscription is
	// created alongside the replica subscription — confirm.
	EnableGroup  bool
}

type SubscriptionMgr

// SubscriptionMgr creates and looks up subscriptions; NewManager returns one.
type SubscriptionMgr interface {
	// Create returns a ReplicaSubscription and a GroupSubscription for the
	// given configuration, or an error.
	Create(ctx context.Context, cfg *SubscriptionConf) (ReplicaSubscription, GroupSubscription, error)
	// Subscription looks up a Subscription for a component.
	// NOTE(review): the bool presumably reports whether one was found — confirm.
	Subscription(comp *core.Component) (Subscription, bool)
	// Close takes no arguments and returns nothing.
	Close()
}

func NewManager

func NewManager() SubscriptionMgr

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL