Documentation ¶
Index ¶
- Constants
- Variables
- func CloseFileReadyChannel()
- func GetChannel[T Identifiable]() (chan T, error)
- func InitFileReadyChannel()
- func NewAMQPServiceBusClient(connString string) (*azservicebus.Client, error)
- func NewEventFromServiceBusMessage[T Identifiable](m *azservicebus.ReceivedMessage) (T, error)
- type AzurePublisher
- type AzureSubscriber
- type Event
- type FilePublisher
- type FileReady
- type Identifiable
- type MemoryPublisher
- type MemorySubscriber
- type Publisher
- type Publishers
- type Retryable
- type SNSPublisher
- type SQSSubscriber
- func (s *SQSSubscriber[T]) Client(ctx context.Context) (*sqs.Client, error)
- func (s *SQSSubscriber[T]) Close() error
- func (s *SQSSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)
- func (s *SQSSubscriber[T]) Listen(ctx context.Context, process func(context.Context, T) error) error
- func (s *SQSSubscriber[T]) Subscribe(ctx context.Context, topicArn string) error
- type Subscribable
Constants ¶
View Source
const FileReadyEventType = "FileReady"
View Source
const TypeSeparator = "_"
Variables ¶
View Source
var DefaultMessageVisibility = 30
View Source
var ErrInvalidARN = errors.New("given arn is not an arn")
View Source
var FileReadyChan chan *FileReady
View Source
var MaxMessages = 3
View Source
var MaxRetries = 5
Functions ¶
func CloseFileReadyChannel ¶
func CloseFileReadyChannel()
func GetChannel ¶
func GetChannel[T Identifiable]() (chan T, error)
func InitFileReadyChannel ¶
func InitFileReadyChannel()
func NewAMQPServiceBusClient ¶
func NewAMQPServiceBusClient(connString string) (*azservicebus.Client, error)
func NewEventFromServiceBusMessage ¶
func NewEventFromServiceBusMessage[T Identifiable](m *azservicebus.ReceivedMessage) (T, error)
Types ¶
type AzurePublisher ¶
type AzurePublisher[T Identifiable] struct { Context context.Context Sender *azservicebus.Sender AdminClient *admin.Client Topic string }
func NewAzurePublisher ¶
func NewAzurePublisher[T Identifiable](ctx context.Context, connectionString, topic string) (*AzurePublisher[T], error)
func (*AzurePublisher[T]) Close ¶
func (ap *AzurePublisher[T]) Close() error
func (*AzurePublisher[T]) Health ¶
func (ap *AzurePublisher[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)
type AzureSubscriber ¶
type AzureSubscriber[T Identifiable] struct { Context context.Context Receiver *azservicebus.Receiver AdminClient *admin.Client Subscription string Topic string Max int }
func NewAzureSubscriber ¶
func NewAzureSubscriber[T Identifiable](ctx context.Context, connectionString, topic, subscription string, maxMessages int) (*AzureSubscriber[T], error)
func (*AzureSubscriber[T]) Close ¶
func (as *AzureSubscriber[T]) Close() error
func (*AzureSubscriber[T]) Health ¶
func (as *AzureSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)
type FilePublisher ¶
type FilePublisher[T Identifiable] struct { Dir string }
type FileReady ¶
type FileReady struct { Event UploadId string `json:"upload_id"` SrcUrl string `json:"src_url"` Path string `json:"path"` DestinationTarget string `json:"deliver_target"` Metadata map[string]string }
func NewFileReadyEvent ¶
func (*FileReady) Identifier ¶
func (*FileReady) IncrementRetryCount ¶
func (fr *FileReady) IncrementRetryCount()
func (*FileReady) RetryCount ¶
func (*FileReady) SetIdentifier ¶
type Identifiable ¶
type Identifiable interface { Retryable Identifier() string Type() string SetIdentifier(id string) SetType(t string) }
TODO: a better name for this interface would be Subscribable, Queueable, or similar.
type MemoryPublisher ¶
type MemoryPublisher[T Identifiable] struct { Chan chan T }
func (*MemoryPublisher[T]) Close ¶
func (mp *MemoryPublisher[T]) Close() error
func (*MemoryPublisher[T]) Health ¶
func (mp *MemoryPublisher[T]) Health(_ context.Context) (rsp models.ServiceHealthResp)
type MemorySubscriber ¶
type MemorySubscriber[T Identifiable] struct { Chan chan T }
func (*MemorySubscriber[T]) Close ¶
func (ms *MemorySubscriber[T]) Close() error
func (*MemorySubscriber[T]) Health ¶
func (ms *MemorySubscriber[T]) Health(_ context.Context) (rsp models.ServiceHealthResp)
type Publisher ¶
type Publisher[T Identifiable] interface { Publish(ctx context.Context, event T) error }
type Publishers ¶
type Publishers[T Identifiable] []Publisher[T]
var FileReadyPublisher Publishers[*FileReady]
func (Publishers[T]) Close ¶
func (p Publishers[T]) Close()
type SNSPublisher ¶
type SNSPublisher[T Identifiable] struct { TopicArn string }
func NewSNSPublisher ¶
func NewSNSPublisher[T Identifiable](ctx context.Context, topicARN string) (*SNSPublisher[T], error)
The topicARN argument is expected in the format arn:aws:sns:region:account-id:topicname.
func (*SNSPublisher[T]) Health ¶
func (s *SNSPublisher[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)
type SQSSubscriber ¶
type SQSSubscriber[T Identifiable] struct { QueueURL string ARN string Max int }
func NewSQSSubscriber ¶
func NewSQSSubscriber[T Identifiable](ctx context.Context, queueArn string, batchMax int) (*SQSSubscriber[T], error)
func (*SQSSubscriber[T]) Close ¶
func (s *SQSSubscriber[T]) Close() error
func (*SQSSubscriber[T]) Health ¶
func (s *SQSSubscriber[T]) Health(ctx context.Context) (rsp models.ServiceHealthResp)
type Subscribable ¶
Source Files ¶
Click to show internal directories.
Click to hide internal directories.