Documentation ¶
Index ¶
- type CleanupService
- type InhooksConfigService
- type MessageBuilder
- type MessageEnqueuer
- type MessageFetcher
- type MessageProcessor
- type MessageTransformer
- type MessageVerifier
- type ProcessingRecoveryService
- type ProcessingResultsService
- type RedisStore
- type RetryCalculator
- type SchedulerService
- type TimeService
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CleanupService ¶ added in v0.1.4
type CleanupService interface {
CleanupDoneQueue(ctx context.Context, f *models.Flow, sink *models.Sink, doneQueueCleanupDelay time.Duration) (int, error)
}
func NewCleanupService ¶ added in v0.1.4
func NewCleanupService(redisStore RedisStore, timeSvc TimeService) CleanupService
type InhooksConfigService ¶
type InhooksConfigService interface {
Load(path string) error
FindFlowForSource(sourceSlug string) *models.Flow
GetFlow(flowID string) *models.Flow
GetFlows() map[string]*models.Flow
GetTransformDefinition(transformID string) *models.TransformDefinition
}
func NewInhooksConfigService ¶
func NewInhooksConfigService(logger *zap.Logger, appConf *lib.AppConfig) InhooksConfigService
type MessageBuilder ¶
type MessageBuilder interface {
FromHttp(flow *models.Flow, r *http.Request, reqID string) ([]*models.Message, error)
}
func NewMessageBuilder ¶
func NewMessageBuilder(timeSvc TimeService) MessageBuilder
type MessageEnqueuer ¶
type MessageEnqueuer interface {
Enqueue(ctx context.Context, messages []*models.Message) ([]*models.QueuedInfo, error)
}
func NewMessageEnqueuer ¶
func NewMessageEnqueuer(redisStore RedisStore, timeSvc TimeService) MessageEnqueuer
type MessageFetcher ¶
type MessageFetcher interface {
GetMessageForProcessing(ctx context.Context, timeout time.Duration, flowID string, sinkID string) (*models.Message, error)
}
func NewMessageFetcher ¶
func NewMessageFetcher(redisStore RedisStore, timeSvc TimeService) MessageFetcher
type MessageProcessor ¶
type MessageProcessor interface {
Process(ctx context.Context, sink *models.Sink, m *models.Message) error
}
func NewMessageProcessor ¶
func NewMessageProcessor(httpClient *http.Client) MessageProcessor
type MessageTransformer ¶ added in v0.1.9
type MessageTransformer interface {
Transform(ctx context.Context, transformDefinition *models.TransformDefinition, m *models.Message) error
}
func NewMessageTransformer ¶ added in v0.1.9
func NewMessageTransformer(config *lib.TransformConfig) MessageTransformer
type MessageVerifier ¶ added in v0.1.6
func NewMessageVerifier ¶ added in v0.1.6
func NewMessageVerifier() MessageVerifier
type ProcessingRecoveryService ¶ added in v0.1.3
type ProcessingRecoveryService interface {
MoveProcessingToReady(ctx context.Context, flow *models.Flow, sink *models.Sink, processingRecoveryInterval time.Duration) ([]string, error)
AddToCache(mID string, ttl time.Duration)
}
func NewProcessingRecoveryService ¶ added in v0.1.3
func NewProcessingRecoveryService(redisStore RedisStore) (ProcessingRecoveryService, error)
type ProcessingResultsService ¶
type ProcessingResultsService interface {
HandleFailed(ctx context.Context, sink *models.Sink, m *models.Message, processingErr error) (*models.QueuedInfo, error)
HandleOK(ctx context.Context, m *models.Message) error
}
func NewProcessingResultsService ¶
func NewProcessingResultsService(timeSvc TimeService, redisStore RedisStore, retryCalculator RetryCalculator) ProcessingResultsService
type RedisStore ¶
type RedisStore interface {
Get(ctx context.Context, messageKey string) ([]byte, error)
SetAndEnqueue(ctx context.Context, messageKey string, value []byte, queueKey string, messageID string) error
SetAndZAdd(ctx context.Context, messageKey string, value []byte, queueKey string, messageID string, score float64) error
SetAndMove(ctx context.Context, messageKey string, value []byte, sourceQueueKey, destQueueKey string, messageID string) error
SetLRemZAdd(ctx context.Context, messageKey string, value []byte, sourceQueueKey, destQueueKey string, messageID string, score float64) error
Enqueue(ctx context.Context, key string, value []byte) error
Dequeue(ctx context.Context, timeout time.Duration, key string) ([]byte, error)
BLMove(ctx context.Context, timeout time.Duration, sourceQueueKey string, destQueueKey string) ([]byte, error)
ZRangeBelowScore(ctx context.Context, queueKey string, score float64) ([]string, error)
ZRemRpush(ctx context.Context, messageIDs []string, sourceQueueKey string, destQueueKey string) error
LRangeAll(ctx context.Context, queueKey string) ([]string, error)
LRemRPush(ctx context.Context, sourceQueueKey, destQueueKey string, messageIDs []string) error
ZRemRangeBelowScore(ctx context.Context, queueKey string, maxScore int) (int, error)
ZRemDel(ctx context.Context, queueKey string, messageIDs []string, messageKeys []string) error
}
func NewRedisStore ¶
func NewRedisStore(client *redis.Client, inhooksDBName string) (RedisStore, error)
type RetryCalculator ¶ added in v0.1.3
type RetryCalculator interface {
NextAttemptInterval(attemptsCount int, retryInterval *time.Duration, retryExpMultiplier *float64) time.Duration
}
func NewRetryCalculator ¶ added in v0.1.3
func NewRetryCalculator() RetryCalculator
type SchedulerService ¶
type SchedulerService interface {
MoveDueScheduled(ctx context.Context, f *models.Flow, sink *models.Sink) error
}
func NewSchedulerService ¶
func NewSchedulerService(redisStore RedisStore, timeSvc TimeService) SchedulerService
type TimeService ¶
func NewTimeService ¶
func NewTimeService() TimeService
Source Files ¶
Click to show internal directories.
Click to hide internal directories.