stream

package
v0.0.0-...-19e0650 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 10, 2024 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Aggregator

type Aggregator struct {
	sync.Mutex
	// contains filtered or unexported fields
}

Aggregator manages converging multiple streams into one.

func NewAggregator

func NewAggregator(c GatewayClient, shardID string, l *log.Logger) *Aggregator

NewAggregator configures and returns a new Aggregator.

func (*Aggregator) Add

func (a *Aggregator) Add(r Resource)

Add adds a new source, described by the given Resource, to the aggregator. This is called by the orchestrator.

func (*Aggregator) Consume

func (a *Aggregator) Consume() <-chan interface{}

Consume returns a channel from which a client can read the aggregated stream.

func (*Aggregator) List

func (a *Aggregator) List() []interface{}

List returns the current list of stream IDs being aggregated. This is called by the orchestrator.

func (*Aggregator) Remove

func (a *Aggregator) Remove(id string)

Remove removes an existing source ID from the aggregator. This is called by the orchestrator.

type Communicator

type Communicator struct{}

func (Communicator) Add

func (a Communicator) Add(ctx context.Context, worker, task interface{}) error

func (Communicator) List

func (a Communicator) List(ctx context.Context, worker interface{}) ([]interface{}, error)

func (Communicator) Remove

func (a Communicator) Remove(ctx context.Context, worker, task interface{}) error

type GatewayClient

type GatewayClient interface {
	Stream(ctx context.Context, req *loggregator_v2.EgressBatchRequest) loggregator.EnvelopeStream
}

GatewayClient is the interface used to open a new stream with the logs provider gateway.

type Getter

type Getter interface {
	Get(url string) (*http.Response, error)
}

type Orchestrator

type Orchestrator interface {
	NextTerm(ctx context.Context)
	UpdateTasks([]orchestrator.Task)
}

type Resource

type Resource struct {
	GUID string `json:"guid"`
	Name string `json:"name"`
}

type SingleOrSpaceProvider

type SingleOrSpaceProvider struct {
	Source          string
	ApiAddr         string
	SpaceGuid       string
	IncludeServices bool
	// contains filtered or unexported fields
}

func NewSingleOrSpaceProvider

func NewSingleOrSpaceProvider(
	sourceID string,
	apiAddr string,
	spaceID string,
	includeServices bool,
	opts ...SingleOrSpaceProviderOption,
) *SingleOrSpaceProvider

func (*SingleOrSpaceProvider) Resources

func (s *SingleOrSpaceProvider) Resources() ([]Resource, error)

type SingleOrSpaceProviderOption

type SingleOrSpaceProviderOption func(*SingleOrSpaceProvider)

func WithSourceProviderClient

func WithSourceProviderClient(httpClient Getter) SingleOrSpaceProviderOption

func WithSourceProviderSpaceExcludeFilter

func WithSourceProviderSpaceExcludeFilter(excludeFilter SourceIDFilter) SingleOrSpaceProviderOption

type SourceIDFilter

type SourceIDFilter func(string) bool

SourceIDFilter returns true if the passed source ID should be filtered out of the space list returned by CAPI.

type SourceManager

type SourceManager struct {
	// contains filtered or unexported fields
}

func NewSourceManager

func NewSourceManager(s SourceProvider, o Orchestrator, interval time.Duration) *SourceManager

func (*SourceManager) Start

func (s *SourceManager) Start()

type SourceProvider

type SourceProvider interface {
	Resources() ([]Resource, error)
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL