bus

package
v0.0.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 15, 2023 License: Apache-2.0 Imports: 16 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// FullSubject is for non-tail/peek RedisBackend pubsub messages
	FullSubject = "streamdal_events:broadcast"

	// TailSubjectPrefix is the prefix for the RedisBackend wildcard pubsub topic for tail/peek responses
	TailSubjectPrefix = "streamdal_events:tail"

	// BroadcastChannelBufferSize is the size of the shared broadcast channel.
	BroadcastChannelBufferSize = 1000

	// TailChannelBufferSize is the size of the tail channel. This buffer size
	// is intentionally large so that we can survive a burst of tail responses.
	TailChannelBufferSize = 10_000

	// DefaultNumTailWorkers is the default number of tail workers that will be
	// set if the config value is not set.
	DefaultNumTailWorkers = 4

	// DefaultNumBroadcastWorkers is the default number of broadcast workers
	// that will be set if the config value is not set.
	DefaultNumBroadcastWorkers = 4
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

func New

func New(opts *Options) (*Bus, error)

func (*Bus) BroadcastAttachPipeline

func (b *Bus) BroadcastAttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error

func (*Bus) BroadcastDeleteAudience

func (b *Bus) BroadcastDeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error

func (*Bus) BroadcastDeletePipeline

func (b *Bus) BroadcastDeletePipeline(ctx context.Context, req *protos.DeletePipelineRequest) error

func (*Bus) BroadcastDeregister

func (b *Bus) BroadcastDeregister(ctx context.Context, req *protos.DeregisterRequest) error

func (*Bus) BroadcastDetachPipeline

func (b *Bus) BroadcastDetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error

func (*Bus) BroadcastKVCreate

func (b *Bus) BroadcastKVCreate(ctx context.Context, kvs []*protos.KVObject, overwrite bool) error

BroadcastKVCreate will transform the req into a generic KVRequest and broadcast it to other server nodes.

func (*Bus) BroadcastKVDelete

func (b *Bus) BroadcastKVDelete(ctx context.Context, key string) error

func (*Bus) BroadcastKVDeleteAll

func (b *Bus) BroadcastKVDeleteAll(ctx context.Context) error

func (*Bus) BroadcastKVUpdate

func (b *Bus) BroadcastKVUpdate(ctx context.Context, kvs []*protos.KVObject) error

BroadcastKVUpdate will transform the req into a generic KVRequest and broadcast it to other server nodes.

func (*Bus) BroadcastMetrics

func (b *Bus) BroadcastMetrics(ctx context.Context, req *protos.MetricsRequest) error

func (*Bus) BroadcastNewAudience

func (b *Bus) BroadcastNewAudience(ctx context.Context, req *protos.NewAudienceRequest) error

func (*Bus) BroadcastPausePipeline

func (b *Bus) BroadcastPausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error

func (*Bus) BroadcastRegister

func (b *Bus) BroadcastRegister(ctx context.Context, req *protos.RegisterRequest) error

func (*Bus) BroadcastResumePipeline

func (b *Bus) BroadcastResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error

func (*Bus) BroadcastTailRequest

func (b *Bus) BroadcastTailRequest(ctx context.Context, req *protos.TailRequest) error

func (*Bus) BroadcastTailResponse

func (b *Bus) BroadcastTailResponse(ctx context.Context, resp *protos.TailResponse) error

func (*Bus) BroadcastUpdatePipeline

func (b *Bus) BroadcastUpdatePipeline(ctx context.Context, req *protos.UpdatePipelineRequest) error

func (*Bus) RunBroadcastConsumer

func (b *Bus) RunBroadcastConsumer() error

RunBroadcastConsumer is used for consuming message from the broadcast stream and executing a message handler. It automatically recovers from Redis connection errors.

func (*Bus) RunTailConsumer

func (b *Bus) RunTailConsumer() error

RunTailConsumer is a dedicated consumer that listens for tail messages on a channel with a * pattern. It automatically recovers from connection errors.

type IBus

type IBus interface {
	// RunBroadcastConsumer runs a redis consumer that listens for messages on the broadcast topic
	RunBroadcastConsumer() error

	// RunTailConsumer is used for consuming message from the RedisBackend
	// wildcard pubsub topic. This method is different from RunBroadcastConsumer()
	// because we must call PSubscribe() and PUnsubscribe() instead of
	// Subscribe() and Unsubscribe() respectively.
	// See: https://redis.io/commands/psubscribe
	RunTailConsumer() error

	// BroadcastRegister broadcasts a RegisterRequest to all nodes in the cluster
	BroadcastRegister(ctx context.Context, req *protos.RegisterRequest) error

	// BroadcastDeregister broadcasts a DeregisterRequest to all nodes in the cluster
	BroadcastDeregister(ctx context.Context, req *protos.DeregisterRequest) error

	// BroadcastDeleteAudience broadcasts a DeleteAudienceRequest to all nodes in the cluster
	BroadcastDeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error

	// BroadcastUpdatePipeline broadcasts a UpdatePipelineRequest to all nodes in the cluster
	BroadcastUpdatePipeline(ctx context.Context, req *protos.UpdatePipelineRequest) error

	// BroadcastDeletePipeline broadcasts a DeletePipelineRequest to all nodes in the cluster
	BroadcastDeletePipeline(ctx context.Context, req *protos.DeletePipelineRequest) error

	// BroadcastAttachPipeline broadcasts a AttachPipelineRequest to all nodes in the cluster
	BroadcastAttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error

	// BroadcastDetachPipeline broadcasts a DetachPipelineRequest to all nodes in the cluster
	BroadcastDetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error

	// BroadcastPausePipeline broadcasts a PausePipelineRequest to all nodes in the cluster
	BroadcastPausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error

	// BroadcastResumePipeline broadcasts a ResumePipelineRequest to all nodes in the cluster
	BroadcastResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error

	// BroadcastMetrics broadcasts a MetricsRequest to all nodes in the cluster
	BroadcastMetrics(ctx context.Context, req *protos.MetricsRequest) error

	// BroadcastKVCreate broadcasts a KVRequest to all nodes in the cluster
	BroadcastKVCreate(ctx context.Context, kvs []*protos.KVObject, overwrite bool) error

	// BroadcastKVUpdate broadcasts a KVRequest to all nodes in the cluster
	BroadcastKVUpdate(ctx context.Context, kvs []*protos.KVObject) error

	// BroadcastKVDelete broadcasts a KVRequest to all nodes in the cluster
	BroadcastKVDelete(ctx context.Context, key string) error

	// BroadcastKVDeleteAll broadcasts a KVRequest to all nodes in the cluster
	BroadcastKVDeleteAll(ctx context.Context) error

	// BroadcastNewAudience broadcasts a NewAudienceRequest to all nodes in the cluster
	BroadcastNewAudience(ctx context.Context, req *protos.NewAudienceRequest) error

	// BroadcastTailRequest broadcasts a TailRequest to all nodes in the cluster
	BroadcastTailRequest(ctx context.Context, req *protos.TailRequest) error

	// BroadcastTailResponse broadcasts a TailResponse to all nodes in the cluster
	BroadcastTailResponse(ctx context.Context, resp *protos.TailResponse) error
}

type Options

type Options struct {
	Store               store.IStore
	RedisBackend        *redis.Client
	Metrics             metrics.IMetrics
	Cmd                 cmd.ICmd
	NodeName            string
	WASMDir             string
	ShutdownCtx         context.Context
	PubSub              pubsub.IPubSub
	NumBroadcastWorkers int
	NumTailWorkers      int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL