stream

package
v1.6.4
Warning

This package is not in the latest version of its module.

Published: Dec 7, 2023 License: MPL-2.0 Imports: 13 Imported by: 2

Documentation


Constants

const (
	ACLCheckNodeRead   = "node-read"
	ACLCheckManagement = "management"
)

Variables

var ErrACLInvalid = errors.New("Provided ACL token is invalid for requested topics")
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")

ErrSubscriptionClosed is an error signalling that the subscription has been closed by the server. The client should call Unsubscribe on the closed subscription and then Subscribe again.
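The unsubscribe-then-resubscribe handling described above can be sketched with a small loop. This is only an illustration under stated assumptions: `errSubscriptionClosed`, `errDone`, `fakeSub`, and `consume` are hypothetical stand-ins defined here so the example runs on its own, not part of the stream package.

```go
package main

import (
	"errors"
	"fmt"
)

// errSubscriptionClosed is a local stand-in for stream.ErrSubscriptionClosed.
var errSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")

// errDone is a hypothetical terminal error used only to end the demo.
var errDone = errors.New("no more events")

// fakeSub is a hypothetical subscription that replays a scripted list of
// results; a nil entry means "an event batch was delivered".
type fakeSub struct {
	script       []error
	unsubscribed bool
}

func (s *fakeSub) next() error {
	if len(s.script) == 0 {
		return errDone
	}
	err := s.script[0]
	s.script = s.script[1:]
	return err
}

func (s *fakeSub) unsubscribe() { s.unsubscribed = true }

// consume reads batches until a non-recoverable error occurs. When the
// subscription reports errSubscriptionClosed it unsubscribes first and then
// subscribes again, mirroring the order the documentation asks for.
func consume(subscribe func() *fakeSub) (batches, resubscribes int, err error) {
	sub := subscribe()
	for {
		nextErr := sub.next()
		switch {
		case nextErr == nil:
			batches++
		case errors.Is(nextErr, errSubscriptionClosed):
			sub.unsubscribe()
			sub = subscribe()
			resubscribes++
		default:
			sub.unsubscribe()
			return batches, resubscribes, nextErr
		}
	}
}

func main() {
	scripts := [][]error{
		{nil, nil, errSubscriptionClosed}, // first subscription is closed by the server
		{nil},                             // second subscription delivers one more batch
	}
	calls := 0
	subscribe := func() *fakeSub {
		s := &fakeSub{script: scripts[calls]}
		calls++
		return s
	}
	batches, resubscribes, err := consume(subscribe)
	fmt.Println(batches, resubscribes, err)
}
```

Matching with `errors.Is` rather than `==` keeps the check working if the sentinel error is ever wrapped on its way to the caller.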

var (
	// JsonHeartbeat is an empty JSON object to send as a heartbeat
	// Avoids creating many heartbeat instances
	JsonHeartbeat = &structs.EventJson{Data: []byte("{}")}
)

Functions

This section is empty.

Types

type ACLDelegate

type ACLDelegate interface {
	TokenProvider() ACLTokenProvider
}

type ACLTokenProvider

type ACLTokenProvider interface {
	ACLTokenBySecretID(ws memdb.WatchSet, secretID string) (*structs.ACLToken, error)
	ACLPolicyByName(ws memdb.WatchSet, policyName string) (*structs.ACLPolicy, error)
	GetACLRoleByID(ws memdb.WatchSet, roleID string) (*structs.ACLRole, error)
}

type EventBroker

type EventBroker struct {
	// contains filtered or unexported fields
}

func NewEventBroker

func NewEventBroker(ctx context.Context, aclDelegate ACLDelegate, cfg EventBrokerCfg) (*EventBroker, error)

NewEventBroker returns an EventBroker for publishing change events. A goroutine runs in the background to publish events to an event buffer. Cancelling the context shuts down the goroutine, frees its resources, and stops all publishing.
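The lifecycle described above (a background goroutine that appends published events to a buffer until the context is cancelled) can be illustrated with a standard-library sketch. `miniBroker` and its methods are hypothetical names for this example only; they are an assumption about the general pattern, not the EventBroker implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// miniBroker is a hypothetical, much simplified broker: publish hands events
// to a background goroutine, which appends them to an in-memory buffer.
type miniBroker struct {
	publishCh chan string
	done      chan struct{} // closed when the background goroutine exits

	mu     sync.Mutex
	buffer []string
}

// newMiniBroker starts the background goroutine. Cancelling ctx stops it.
func newMiniBroker(ctx context.Context) *miniBroker {
	b := &miniBroker{
		publishCh: make(chan string),
		done:      make(chan struct{}),
	}
	go b.run(ctx)
	return b
}

func (b *miniBroker) run(ctx context.Context) {
	defer close(b.done)
	for {
		select {
		case <-ctx.Done():
			return
		case ev := <-b.publishCh:
			b.mu.Lock()
			b.buffer = append(b.buffer, ev)
			b.mu.Unlock()
		}
	}
}

// publish reports whether the event was handed to the background goroutine.
// It returns false once the goroutine has shut down.
func (b *miniBroker) publish(ev string) bool {
	select {
	case b.publishCh <- ev:
		return true
	case <-b.done:
		return false
	}
}

// size returns the current number of buffered events.
func (b *miniBroker) size() int {
	b.mu.Lock()
	defer b.mu.Unlock()
	return len(b.buffer)
}

// demoBroker publishes two events, cancels the context, waits for shutdown,
// and then tries to publish once more.
func demoBroker() (buffered int, acceptedAfterCancel bool) {
	ctx, cancel := context.WithCancel(context.Background())
	b := newMiniBroker(ctx)
	b.publish("job-registered")
	b.publish("node-updated")
	cancel()
	<-b.done
	return b.size(), b.publish("too-late")
}

func main() {
	buffered, accepted := demoBroker()
	fmt.Println("buffered:", buffered, "accepted after cancel:", accepted)
}
```

The `done` channel is a design choice for the sketch: it lets publishers detect shutdown instead of blocking forever on a channel nobody reads.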

func (*EventBroker) CloseAll

func (e *EventBroker) CloseAll()

CloseAll closes all subscriptions.

func (*EventBroker) Len

func (e *EventBroker) Len() int

Len returns the current length of the event buffer.

func (*EventBroker) Publish

func (e *EventBroker) Publish(events *structs.Events)

Publish events to all subscribers of the event Topic.

func (*EventBroker) Subscribe

func (e *EventBroker) Subscribe(req *SubscribeRequest) (*Subscription, error)

Subscribe returns a new Subscription for a given request. A Subscription will receive an initial empty currentItem value which points to the first item in the buffer. This allows the new subscription to call Next() without first checking for the current Item.

A Subscription will start at the requested index, or as close as possible to the requested index if it is no longer in the buffer. If StartExactlyAtIndex is set and the index is no longer in the buffer, or not yet in the buffer, an error will be returned.

When a caller is finished with the subscription it must call Subscription.Unsubscribe to free ACL tracking resources.
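The start-index rule above can be made concrete with a small sketch. The exact meaning of "as close as possible" is not spelled out in this documentation, so the choice below (the oldest retained index when the request predates the buffer, otherwise the nearest retained index at or below the request) is an assumption. `startIndex` and both error values are hypothetical names for this example.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Hypothetical errors for this sketch; the package's own errors may differ.
var (
	errEmptyBuffer      = errors.New("event buffer is empty")
	errIndexNotInBuffer = errors.New("requested index is not in the buffer")
)

// startIndex picks the index at which a subscription would begin, given the
// ascending indexes currently retained in the buffer.
//
// Assumption: when no exact match exists and exact is false, the result is
// the oldest retained index if the request is older than the buffer,
// otherwise the nearest retained index below the request.
func startIndex(buffered []uint64, requested uint64, exact bool) (uint64, error) {
	if len(buffered) == 0 {
		return 0, errEmptyBuffer
	}
	// Position of the first retained index that is >= requested.
	i := sort.Search(len(buffered), func(i int) bool { return buffered[i] >= requested })
	if i < len(buffered) && buffered[i] == requested {
		return requested, nil
	}
	if exact {
		// Covers both "no longer in the buffer" and "not yet in the buffer".
		return 0, errIndexNotInBuffer
	}
	if i == 0 {
		return buffered[0], nil
	}
	return buffered[i-1], nil
}

func main() {
	buffered := []uint64{10, 20, 30}
	for _, req := range []uint64{5, 20, 25, 40} {
		got, err := startIndex(buffered, req, false)
		fmt.Println("closest:", req, got, err)
	}
	_, err := startIndex(buffered, 25, true)
	fmt.Println("exact:", err)
}
```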

func (*EventBroker) SubscribeWithACLCheck

func (e *EventBroker) SubscribeWithACLCheck(req *SubscribeRequest) (*Subscription, *time.Time, error)

SubscribeWithACLCheck validates the SubscribeRequest's token and requested topics to ensure that the token's privileges are sufficient. It also returns the token's expiry time, if any. It is the caller's responsibility to check this expiry before publishing events to the subscriber.
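Because the expiry is returned as a `*time.Time` that may be absent, the caller-side check might look like the sketch below. `tokenExpired` is a hypothetical helper, and treating a nil pointer as "no expiry" is an assumption drawn from the phrase "if any".

```go
package main

import (
	"fmt"
	"time"
)

// tokenExpired reports whether a token with the given expiry should no
// longer receive events at time now. A nil expiry is assumed to mean the
// token never expires.
func tokenExpired(expiry *time.Time, now time.Time) bool {
	return expiry != nil && !now.Before(*expiry)
}

// demoExpiry evaluates the three interesting cases against a fixed clock.
func demoExpiry() (noExpiry, past, future bool) {
	now := time.Date(2023, time.December, 7, 12, 0, 0, 0, time.UTC)
	earlier := now.Add(-time.Minute)
	later := now.Add(time.Minute)
	return tokenExpired(nil, now), tokenExpired(&earlier, now), tokenExpired(&later, now)
}

func main() {
	noExpiry, past, future := demoExpiry()
	fmt.Println("no expiry:", noExpiry)
	fmt.Println("expired a minute ago:", past)
	fmt.Println("expires in a minute:", future)
}
```

A caller would run a check like this before each delivery, since a token that was valid when the subscription was created can expire while the stream is still open.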

type EventBrokerCfg

type EventBrokerCfg struct {
	EventBufferSize int64
	Logger          hclog.Logger
}

type JsonStream

type JsonStream struct {
	// contains filtered or unexported fields
}

JsonStream is used to send newline-delimited JSON and heartbeats to a destination (the out channel).

func NewJsonStream

func NewJsonStream(ctx context.Context, heartbeat time.Duration) *JsonStream

NewJsonStream creates a new JSON stream that outputs JSON structs on its output channel (see OutCh). The constructor starts a goroutine that sends heartbeats at the given interval, and it also sends an initial heartbeat to notify the client that the connection was initialized successfully.
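The heartbeat behaviour described above (one heartbeat immediately, then one per interval until the context is cancelled) can be sketched with a ticker. `heartbeatStream` and `newHeartbeatStream` are hypothetical names, and the buffered channel is an assumption made so the constructor never blocks; none of this is taken from the JsonStream source.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeat is the empty JSON object used as a keep-alive payload.
const heartbeat = "{}"

// heartbeatStream is a hypothetical, simplified stream that only emits
// heartbeats on its output channel.
type heartbeatStream struct {
	out chan string
}

// newHeartbeatStream queues an initial heartbeat so the reader learns the
// connection is up, then starts a goroutine that emits one heartbeat per
// interval until ctx is cancelled.
func newHeartbeatStream(ctx context.Context, interval time.Duration) *heartbeatStream {
	s := &heartbeatStream{out: make(chan string, 1)}
	s.out <- heartbeat // buffered, so this cannot block

	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				// Give up on the send if the stream is cancelled meanwhile.
				select {
				case s.out <- heartbeat:
				case <-ctx.Done():
					return
				}
			}
		}
	}()
	return s
}

// demoHeartbeat reads the initial heartbeat and the first ticker-driven one.
func demoHeartbeat() (initial, ticked string) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	s := newHeartbeatStream(ctx, 10*time.Millisecond)
	initial = <-s.out
	ticked = <-s.out
	return initial, ticked
}

func main() {
	initial, ticked := demoHeartbeat()
	fmt.Println("initial:", initial, "ticked:", ticked)
}
```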

func (*JsonStream) OutCh

func (n *JsonStream) OutCh() chan *structs.EventJson

func (*JsonStream) Send

func (n *JsonStream) Send(v interface{}) error

Send encodes an object into newline-delimited JSON. An error is returned if JSON encoding fails or if the stream is no longer running.
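Both failure modes mentioned above (an encoding failure and a stopped stream) can be demonstrated with the standard library, whose `json.Encoder.Encode` writes each value followed by a newline. `sendLine`, `errNotRunning`, and the demo helpers are hypothetical names for this sketch and do not reflect how Send is implemented.

```go
package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// errNotRunning is a hypothetical error for a stream whose context has ended.
var errNotRunning = errors.New("stream is no longer running")

// sendLine writes v to w as a single line of JSON. It refuses to write once
// ctx is done, and it surfaces any JSON encoding error unchanged.
func sendLine(ctx context.Context, w io.Writer, v interface{}) error {
	if ctx.Err() != nil {
		return errNotRunning
	}
	// Encode appends a newline after each value, which is exactly the
	// newline-delimited JSON framing.
	return json.NewEncoder(w).Encode(v)
}

// encodeAll encodes every value on a running stream and returns the output.
func encodeAll(values ...interface{}) (string, error) {
	var buf bytes.Buffer
	for _, v := range values {
		if err := sendLine(context.Background(), &buf, v); err != nil {
			return buf.String(), err
		}
	}
	return buf.String(), nil
}

// sendAfterCancel shows the error returned once the stream has stopped.
func sendAfterCancel() error {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return sendLine(ctx, io.Discard, struct{}{})
}

func main() {
	out, err := encodeAll(map[string]int{"Index": 7}, struct{}{})
	fmt.Printf("%q %v\n", out, err)

	_, err = encodeAll(make(chan int)) // channels cannot be encoded as JSON
	fmt.Println("encode error:", err != nil)

	fmt.Println("after cancel:", sendAfterCancel())
}
```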

type SubscribeRequest

type SubscribeRequest struct {
	Token     string
	Index     uint64
	Namespace string

	Topics map[structs.Topic][]string

	// StartExactlyAtIndex specifies if a subscription needs to
	// start exactly at the requested Index. If set to false,
	// the closest index in the buffer will be returned if there is not
	// an exact match
	StartExactlyAtIndex bool
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func (*Subscription) Next

func (s *Subscription) Next(ctx context.Context) (structs.Events, error)

func (*Subscription) NextNoBlock

func (s *Subscription) NextNoBlock() ([]structs.Event, error)

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe()
