event

package
v0.0.0-...-f9df99f Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 6, 2024 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Index

Constants

View Source
const FileReadyEventType = "FileReady"
View Source
const TypeSeparator = "_"

Variables

View Source
var DefaultMessageVisibility = 30
View Source
var ErrInvalidARN = errors.New("given arn is not an arn")
View Source
var FileReadyChan chan *FileReady
View Source
var MaxMessages = 3
View Source
var MaxRetries = 5

Functions

func CloseFileReadyChannel

func CloseFileReadyChannel()

func GetChannel

func GetChannel[T Identifiable]() (chan T, error)

func InitFileReadyChannel

func InitFileReadyChannel()

func NewAMQPServiceBusClient

func NewAMQPServiceBusClient(connString string) (*azservicebus.Client, error)

func NewEventFromServiceBusMessage

func NewEventFromServiceBusMessage[T Identifiable](m *azservicebus.ReceivedMessage) (T, error)

Types

type AzurePublisher

type AzurePublisher[T Identifiable] struct {
	Context     context.Context
	Sender      *azservicebus.Sender
	AdminClient *admin.Client
	Topic       string
}

func NewAzurePublisher

func NewAzurePublisher[T Identifiable](ctx context.Context, connectionString, topic string) (*AzurePublisher[T], error)

func (*AzurePublisher[T]) Close

func (ap *AzurePublisher[T]) Close() error

func (*AzurePublisher[T]) Health

func (ap *AzurePublisher[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)

func (*AzurePublisher[T]) Publish

func (ap *AzurePublisher[T]) Publish(ctx context.Context, event T) error

type AzureSubscriber

type AzureSubscriber[T Identifiable] struct {
	Context      context.Context
	Receiver     *azservicebus.Receiver
	AdminClient  *admin.Client
	Subscription string
	Topic        string
	Max          int
}

func NewAzureSubscriber

func NewAzureSubscriber[T Identifiable](ctx context.Context, connectionString, topic, subscription string, maxMessages int) (*AzureSubscriber[T], error)

func (*AzureSubscriber[T]) Close

func (as *AzureSubscriber[T]) Close() error

func (*AzureSubscriber[T]) Health

func (as *AzureSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)

func (*AzureSubscriber[T]) Listen

func (as *AzureSubscriber[T]) Listen(ctx context.Context, process func(context.Context, T) error) error

type Event

type Event struct {
	ID         string `json:"id"`
	Type       string `json:"type"`
	RetryCount int    `json:"retry_count"`
}

type FilePublisher

type FilePublisher[T Identifiable] struct {
	Dir string
}

func (*FilePublisher[T]) Publish

func (mp *FilePublisher[T]) Publish(_ context.Context, event T) error

type FileReady

type FileReady struct {
	Event
	UploadId          string `json:"upload_id"`
	SrcUrl            string `json:"src_url"`
	Path              string `json:"path"`
	DestinationTarget string `json:"deliver_target"`
	Metadata          map[string]string
}

func NewFileReadyEvent

func NewFileReadyEvent(uploadId string, metadata map[string]string, path, target string) *FileReady

func (*FileReady) Identifier

func (fr *FileReady) Identifier() string

func (*FileReady) IncrementRetryCount

func (fr *FileReady) IncrementRetryCount()

func (*FileReady) RetryCount

func (fr *FileReady) RetryCount() int

func (*FileReady) SetIdentifier

func (fr *FileReady) SetIdentifier(id string)

func (*FileReady) SetType

func (fr *FileReady) SetType(t string)

func (*FileReady) Type

func (fr *FileReady) Type() string

type Identifiable

type Identifiable interface {
	Retryable
	Identifier() string
	Type() string
	SetIdentifier(id string)
	SetType(t string)
}

TODO better name for this interface would be Subscribable or Queueable or similar

type MemoryPublisher

type MemoryPublisher[T Identifiable] struct {
	Chan chan T
}

func (*MemoryPublisher[T]) Close

func (mp *MemoryPublisher[T]) Close() error

func (*MemoryPublisher[T]) Health

func (mp *MemoryPublisher[T]) Health(_ context.Context) (rsp models.ServiceHealthResp)

func (*MemoryPublisher[T]) Publish

func (mp *MemoryPublisher[T]) Publish(_ context.Context, event T) error

type MemorySubscriber

type MemorySubscriber[T Identifiable] struct {
	Chan chan T
}

func (*MemorySubscriber[T]) Close

func (ms *MemorySubscriber[T]) Close() error

func (*MemorySubscriber[T]) Health

func (ms *MemorySubscriber[T]) Health(_ context.Context) (rsp models.ServiceHealthResp)

func (*MemorySubscriber[T]) Listen

func (ms *MemorySubscriber[T]) Listen(ctx context.Context, process func(context.Context, T) error) error

type Publisher

type Publisher[T Identifiable] interface {
	Publish(ctx context.Context, event T) error
}

type Publishers

type Publishers[T Identifiable] []Publisher[T]
var FileReadyPublisher Publishers[*FileReady]

func (Publishers[T]) Close

func (p Publishers[T]) Close()

func (Publishers[T]) Publish

func (p Publishers[T]) Publish(ctx context.Context, e T) error

type Retryable

type Retryable interface {
	RetryCount() int
	IncrementRetryCount()
}

type SNSPublisher

type SNSPublisher[T Identifiable] struct {
	TopicArn string
}

func NewSNSPublisher

func NewSNSPublisher[T Identifiable](ctx context.Context, topicARN string) (*SNSPublisher[T], error)

arn format arn:aws:sns:region:account-id:topicname

func (SNSPublisher[T]) Client

func (s SNSPublisher[T]) Client(ctx context.Context) (*sns.Client, error)

func (*SNSPublisher[T]) Health

func (s *SNSPublisher[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)

func (SNSPublisher[T]) Publish

func (s SNSPublisher[T]) Publish(ctx context.Context, e T) error

type SQSSubscriber

type SQSSubscriber[T Identifiable] struct {
	QueueURL string
	ARN      string
	Max      int
}

func NewSQSSubscriber

func NewSQSSubscriber[T Identifiable](ctx context.Context, queueArn string, batchMax int) (*SQSSubscriber[T], error)

func (*SQSSubscriber[T]) Client

func (s *SQSSubscriber[T]) Client(ctx context.Context) (*sqs.Client, error)

func (*SQSSubscriber[T]) Close

func (s *SQSSubscriber[T]) Close() error

func (*SQSSubscriber[T]) Health

func (s *SQSSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)

func (*SQSSubscriber[T]) Listen

func (s *SQSSubscriber[T]) Listen(ctx context.Context, process func(context.Context, T) error) error

func (*SQSSubscriber[T]) Subscribe

func (s *SQSSubscriber[T]) Subscribe(ctx context.Context, topicArn string) error

type Subscribable

type Subscribable[T Identifiable] interface {
	Listen(context.Context, func(context.Context, T) error) error
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL