Documentation ¶
Index ¶
- Constants
- type Bus
- func (b *Bus) BroadcastAttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error
- func (b *Bus) BroadcastDeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error
- func (b *Bus) BroadcastDeletePipeline(ctx context.Context, req *protos.DeletePipelineRequest) error
- func (b *Bus) BroadcastDeregister(ctx context.Context, req *protos.DeregisterRequest) error
- func (b *Bus) BroadcastDetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error
- func (b *Bus) BroadcastKVCreate(ctx context.Context, kvs []*protos.KVObject, overwrite bool) error
- func (b *Bus) BroadcastKVDelete(ctx context.Context, key string) error
- func (b *Bus) BroadcastKVDeleteAll(ctx context.Context) error
- func (b *Bus) BroadcastKVUpdate(ctx context.Context, kvs []*protos.KVObject) error
- func (b *Bus) BroadcastMetrics(ctx context.Context, req *protos.MetricsRequest) error
- func (b *Bus) BroadcastNewAudience(ctx context.Context, req *protos.NewAudienceRequest) error
- func (b *Bus) BroadcastPausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error
- func (b *Bus) BroadcastRegister(ctx context.Context, req *protos.RegisterRequest) error
- func (b *Bus) BroadcastResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error
- func (b *Bus) BroadcastTailRequest(ctx context.Context, req *protos.TailRequest) error
- func (b *Bus) BroadcastTailResponse(ctx context.Context, resp *protos.TailResponse) error
- func (b *Bus) BroadcastUpdatePipeline(ctx context.Context, req *protos.UpdatePipelineRequest) error
- func (b *Bus) RunBroadcastConsumer() error
- func (b *Bus) RunTailConsumer() error
- type IBus
- type Options
Constants ¶
View Source
const ( // FullSubject is for non-tail/peek RedisBackend pubsub messages FullSubject = "streamdal_events:broadcast" // TailSubjectPrefix is the prefix for the RedisBackend wildcard pubsub topic for tail/peek responses TailSubjectPrefix = "streamdal_events:tail" // BroadcastChannelBufferSize is the size of the shared broadcast channel. BroadcastChannelBufferSize = 1000 // TailChannelBufferSize is the size of the tail channel. This buffer size // is intentionally large so that we can survive a burst of tail responses. TailChannelBufferSize = 10_000 // DefaultNumTailWorkers is the default number of tail workers that will be // set if the config value is not set. DefaultNumTailWorkers = 4 // DefaultNumBroadcastWorkers is the default number of broadcast workers // that will be set if the config value is not set. DefaultNumBroadcastWorkers = 4 )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
func (*Bus) BroadcastAttachPipeline ¶
func (*Bus) BroadcastDeleteAudience ¶
func (*Bus) BroadcastDeletePipeline ¶
func (*Bus) BroadcastDeregister ¶
func (*Bus) BroadcastDetachPipeline ¶
func (*Bus) BroadcastKVCreate ¶
BroadcastKVCreate will transform the req into a generic KVRequest and broadcast it to other server nodes.
func (*Bus) BroadcastKVDelete ¶
func (*Bus) BroadcastKVUpdate ¶
BroadcastKVUpdate will transform the req into a generic KVRequest and broadcast it to other server nodes.
func (*Bus) BroadcastMetrics ¶
func (*Bus) BroadcastNewAudience ¶
func (*Bus) BroadcastPausePipeline ¶
func (*Bus) BroadcastRegister ¶
func (*Bus) BroadcastResumePipeline ¶
func (*Bus) BroadcastTailRequest ¶
func (*Bus) BroadcastTailResponse ¶
func (*Bus) BroadcastUpdatePipeline ¶
func (*Bus) RunBroadcastConsumer ¶
RunBroadcastConsumer is used for consuming message from the broadcast stream and executing a message handler. It automatically recovers from Redis connection errors.
func (*Bus) RunTailConsumer ¶
RunTailConsumer is a dedicated consumer that listens for tail messages on a channel with a * pattern. It automatically recovers from connection errors.
type IBus ¶
type IBus interface { // RunBroadcastConsumer runs a redis consumer that listens for messages on the broadcast topic RunBroadcastConsumer() error // RunTailConsumer is used for consuming message from the RedisBackend // wildcard pubsub topic. This method is different from RunBroadcastConsumer() // because we must call PSubscribe() and PUnsubscribe() instead of // Subscribe() and Unsubscribe() respectively. // See: https://redis.io/commands/psubscribe RunTailConsumer() error // BroadcastRegister broadcasts a RegisterRequest to all nodes in the cluster BroadcastRegister(ctx context.Context, req *protos.RegisterRequest) error // BroadcastDeregister broadcasts a DeregisterRequest to all nodes in the cluster BroadcastDeregister(ctx context.Context, req *protos.DeregisterRequest) error // BroadcastDeleteAudience broadcasts a DeleteAudienceRequest to all nodes in the cluster BroadcastDeleteAudience(ctx context.Context, req *protos.DeleteAudienceRequest) error // BroadcastUpdatePipeline broadcasts a UpdatePipelineRequest to all nodes in the cluster BroadcastUpdatePipeline(ctx context.Context, req *protos.UpdatePipelineRequest) error // BroadcastDeletePipeline broadcasts a DeletePipelineRequest to all nodes in the cluster BroadcastDeletePipeline(ctx context.Context, req *protos.DeletePipelineRequest) error // BroadcastAttachPipeline broadcasts a AttachPipelineRequest to all nodes in the cluster BroadcastAttachPipeline(ctx context.Context, req *protos.AttachPipelineRequest) error // BroadcastDetachPipeline broadcasts a DetachPipelineRequest to all nodes in the cluster BroadcastDetachPipeline(ctx context.Context, req *protos.DetachPipelineRequest) error // BroadcastPausePipeline broadcasts a PausePipelineRequest to all nodes in the cluster BroadcastPausePipeline(ctx context.Context, req *protos.PausePipelineRequest) error // BroadcastResumePipeline broadcasts a ResumePipelineRequest to all nodes in the cluster BroadcastResumePipeline(ctx context.Context, req *protos.ResumePipelineRequest) error // BroadcastMetrics broadcasts a MetricsRequest to all nodes in the cluster BroadcastMetrics(ctx context.Context, req *protos.MetricsRequest) error // BroadcastKVCreate broadcasts a KVRequest to all nodes in the cluster BroadcastKVCreate(ctx context.Context, kvs []*protos.KVObject, overwrite bool) error // BroadcastKVUpdate broadcasts a KVRequest to all nodes in the cluster BroadcastKVUpdate(ctx context.Context, kvs []*protos.KVObject) error // BroadcastKVDelete broadcasts a KVRequest to all nodes in the cluster BroadcastKVDelete(ctx context.Context, key string) error // BroadcastKVDeleteAll broadcasts a KVRequest to all nodes in the cluster BroadcastKVDeleteAll(ctx context.Context) error // BroadcastNewAudience broadcasts a NewAudienceRequest to all nodes in the cluster BroadcastNewAudience(ctx context.Context, req *protos.NewAudienceRequest) error // BroadcastTailRequest broadcasts a TailRequest to all nodes in the cluster BroadcastTailRequest(ctx context.Context, req *protos.TailRequest) error // BroadcastTailResponse broadcasts a TailResponse to all nodes in the cluster BroadcastTailResponse(ctx context.Context, resp *protos.TailResponse) error }
Click to show internal directories.
Click to hide internal directories.