services

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 28, 2024 License: LGPL-3.0 Imports: 24 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type CleanupService added in v0.1.4

type CleanupService interface {
	CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error)
}

func NewCleanupService added in v0.1.4

func NewCleanupService(redisStore RedisStore, timeSvc TimeService) CleanupService

type InhooksConfigService

type InhooksConfigService interface {
	Load(path string) error
	FindFlowForSource(sourceSlug string) *models.Flow
	GetFlow(flowID string) *models.Flow
	GetFlows() map[string]*models.Flow
	GetTransformDefinition(transformID string) *models.TransformDefinition
}

func NewInhooksConfigService

func NewInhooksConfigService(logger *zap.Logger, appConf *lib.AppConfig) InhooksConfigService

type MessageBuilder

type MessageBuilder interface {
	FromHttp(flow *models.Flow, r *http.Request, reqID string) ([]*models.Message, error)
}

func NewMessageBuilder

func NewMessageBuilder(timeSvc TimeService) MessageBuilder

type MessageEnqueuer

type MessageEnqueuer interface {
	Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error)
}

func NewMessageEnqueuer

func NewMessageEnqueuer(redisStore RedisStore, timeSvc TimeService) MessageEnqueuer

type MessageFetcher

type MessageFetcher interface {
	GetMessageForProcessing(ctx context.Context, timeout time.Duration, flowID string, sinkID string) (*models.Message, error)
}

func NewMessageFetcher

func NewMessageFetcher(redisStore RedisStore, timeSvc TimeService) MessageFetcher

type MessageProcessor

type MessageProcessor interface {
	Process(ctx context.Context, sink *models.Sink, m *models.Message) error
}

func NewMessageProcessor

func NewMessageProcessor(httpClient *http.Client) MessageProcessor

type MessageTransformer added in v0.1.9

type MessageTransformer interface {
	Transform(ctx context.Context, transformDefinition *models.TransformDefinition, m *models.Message) error
}

func NewMessageTransformer added in v0.1.9

func NewMessageTransformer(config *lib.TransformConfig) MessageTransformer

type MessageVerifier added in v0.1.6

type MessageVerifier interface {
	Verify(flow *models.Flow, m *models.Message) error
}

func NewMessageVerifier added in v0.1.6

func NewMessageVerifier() MessageVerifier

type ProcessingRecoveryService added in v0.1.3

type ProcessingRecoveryService interface {
	MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error)
	AddToCache(mID string, ttl time.Duration)
}

func NewProcessingRecoveryService added in v0.1.3

func NewProcessingRecoveryService(redisStore RedisStore) (ProcessingRecoveryService, error)

type ProcessingResultsService

type ProcessingResultsService interface {
	HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error)
	HandleOK(ctx context.Context, m *models.Message) error
}

func NewProcessingResultsService

func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore, retryCalculator RetryCalculator) ProcessingResultsService

type RedisStore

type RedisStore interface {
	Get(ctx context.Context, messageKey string) ([]byte, error)
	SetAndEnqueue(ctx context.Context, messageKey string, value []byte, queueKey string, messageID string) error
	SetAndZAdd(ctx context.Context, messageKey string, value []byte, queueKey string, messageID string, score float64) error
	SetAndMove(ctx context.Context, messageKey string, value []byte, sourceQueueKey, destQueueKey string, messageID string) error
	SetLRemZAdd(ctx context.Context, messageKey string, value []byte, sourceQueueKey, destQueueKey string, messageID string, score float64) error
	Enqueue(ctx context.Context, key string, value []byte) error
	Dequeue(ctx context.Context, timeout time.Duration, key string) ([]byte, error)
	BLMove(ctx context.Context, timeout time.Duration, sourceQueueKey string, destQueueKey string) ([]byte, error)
	ZRangeBelowScore(ctx context.Context, queueKey string, score float64) ([]string, error)
	ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error
	LRangeAll(ctx context.Context, queueKey string) ([]string, error)
	LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error
	ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error)
	ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error
}

func NewRedisStore

func NewRedisStore(client *redis.Client, inhooksDBName string) (RedisStore, error)

type RetryCalculator added in v0.1.3

type RetryCalculator interface {
	NextAttemptInterval(attemptsCount int, retryInterval *time.Duration, retryExpMultiplier *float64) time.Duration
}

func NewRetryCalculator added in v0.1.3

func NewRetryCalculator() RetryCalculator

type SchedulerService

type SchedulerService interface {
	MoveDueScheduled(ctx context.Context, f *models.Flow, sink *models.Sink) error
}

func NewSchedulerService

func NewSchedulerService(redisStore RedisStore, timeSvc TimeService) SchedulerService

type TimeService

type TimeService interface {
	Now() time.Time
}

func NewTimeService

func NewTimeService() TimeService

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL