dequeuer

package
v0.42.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 23, 2022 License: Apache-2.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrUserContainerResponseStatusCode        = "dequeuer.user_container_response_status_code"
	ErrUserContainerResponseMissingJSONHeader = "dequeuer.user_container_response_missing_json_header"
	ErrUserContainerResponseNotJSONDecodable  = "dequeuer.user_container_response_not_json_decodable"
	ErrUserContainerNotReachable              = "dequeuer.user_container_not_reachable"
)
View Source
const (
	// CortexJobIDHeader is the header containing the job id for the user container
	CortexJobIDHeader = "X-Cortex-Job-ID"
)
View Source
const (
	// CortexRequestIDHeader is the header containing the workload request id for the user container
	CortexRequestIDHeader = "X-Cortex-Request-ID"
)

Variables

This section is empty.

Functions

func ErrorUserContainerNotReachable

func ErrorUserContainerNotReachable(err error) error

func ErrorUserContainerResponseMissingJSONHeader

func ErrorUserContainerResponseMissingJSONHeader() error

func ErrorUserContainerResponseNotJSONDecodable

func ErrorUserContainerResponseNotJSONDecodable() error

func ErrorUserContainerResponseStatusCode

func ErrorUserContainerResponseStatusCode(statusCode int) error

func HasTCPProbeTargetingUserPod

func HasTCPProbeTargetingUserPod(probes []*probe.Probe, userPort int) bool

func HealthcheckHandler

func HealthcheckHandler(isHealthy func() bool) http.HandlerFunc

func ProbesFromFile

func ProbesFromFile(probesPath string, logger *zap.SugaredLogger) ([]*probe.Probe, error)

Types

type AsyncMessageHandler

type AsyncMessageHandler struct {
	// contains filtered or unexported fields
}

func NewAsyncMessageHandler

func NewAsyncMessageHandler(config AsyncMessageHandlerConfig, awsClient *awslib.Client, eventHandler RequestEventHandler, logger *zap.SugaredLogger) *AsyncMessageHandler

func (*AsyncMessageHandler) Handle

func (h *AsyncMessageHandler) Handle(message *sqs.Message) error

type AsyncMessageHandlerConfig

type AsyncMessageHandlerConfig struct {
	ClusterUID string
	Bucket     string
	APIName    string
	TargetURL  string
}

type AsyncStatsReporter

type AsyncStatsReporter struct {
	// contains filtered or unexported fields
}

func NewAsyncPrometheusStatsReporter

func NewAsyncPrometheusStatsReporter() *AsyncStatsReporter

func (*AsyncStatsReporter) HandleEvent

func (r *AsyncStatsReporter) HandleEvent(event RequestEvent)

func (*AsyncStatsReporter) ServeHTTP

func (r *AsyncStatsReporter) ServeHTTP(w http.ResponseWriter, req *http.Request)

type BatchMessageHandler

type BatchMessageHandler struct {
	// contains filtered or unexported fields
}

func NewBatchMessageHandler

func NewBatchMessageHandler(config BatchMessageHandlerConfig, awsClient *awslib.Client, statsdClient statsd.ClientInterface, log *zap.SugaredLogger) *BatchMessageHandler

func (*BatchMessageHandler) Handle

func (h *BatchMessageHandler) Handle(message *sqs.Message) error

type BatchMessageHandlerConfig

type BatchMessageHandlerConfig struct {
	APIName   string
	JobID     string
	QueueURL  string
	Region    string
	TargetURL string
}

type MessageHandler

type MessageHandler interface {
	Handle(*sqs.Message) error
}

func NewMessageHandlerFunc

func NewMessageHandlerFunc(handleFunc func(*sqs.Message) error) MessageHandler

type QueueAttributes

type QueueAttributes struct {
	VisibleMessages   int
	InvisibleMessages int
	HasRedrivePolicy  bool
}

func GetQueueAttributes

func GetQueueAttributes(client *awslib.Client, queueURL string) (QueueAttributes, error)

func (QueueAttributes) TotalMessages

func (attr QueueAttributes) TotalMessages() int

type RequestEvent

type RequestEvent struct {
	StatusCode int
	Duration   time.Duration
}

type RequestEventHandler

type RequestEventHandler interface {
	HandleEvent(event RequestEvent)
}

func NewRequestEventHandlerFunc

func NewRequestEventHandlerFunc(handleFunc func(event RequestEvent)) RequestEventHandler

type SQSDequeuer

type SQSDequeuer struct {
	// contains filtered or unexported fields
}

func NewSQSDequeuer

func NewSQSDequeuer(config SQSDequeuerConfig, awsClient *awslib.Client, logger *zap.SugaredLogger) (*SQSDequeuer, error)

func (*SQSDequeuer) ReceiveMessage

func (d *SQSDequeuer) ReceiveMessage() (*sqs.Message, error)

func (*SQSDequeuer) Shutdown

func (d *SQSDequeuer) Shutdown()

func (*SQSDequeuer) Start

func (d *SQSDequeuer) Start(messageHandler MessageHandler, readinessProbeFunc func() bool) error

func (*SQSDequeuer) StartMessageRenewer

func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{}

type SQSDequeuerConfig

type SQSDequeuerConfig struct {
	Region           string
	QueueURL         string
	StopIfNoMessages bool
	Workers          int
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL