Documentation ¶
Index ¶
- type Broker
- type MessageHandler
- type MetadataService
- type MetadataServiceOp
- func (s *MetadataServiceOp) Create(ctx context.Context, req *message.MetadataCreateRequest) error
- func (s *MetadataServiceOp) Delete(ctx context.Context, req *message.MetadataDeleteRequest) error
- func (s *MetadataServiceOp) Read(ctx context.Context, req *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
- func (s *MetadataServiceOp) Update(ctx context.Context, req *message.MetadataUpdateRequest) error
- type PreservationService
- type PreservationServiceOp
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Broker ¶
type Broker struct {
	Metadata     MetadataService
	Preservation PreservationService
	// contains filtered or unexported fields
}
Broker is an RDSS client using the SQS and SNS services.
Messages are received from sqsQueueMainURL and sent to an internal channel (messages). The channel is unbuffered, so the receiver controls how often we receive from SQS. However, the current processor is unbounded, i.e. processMessage is launched on a new goroutine for each message received.
The message processor will:
* Extract, unmarshal and validate the message payload.
* Reject messages that have been received before.
* Run the designated handler and capture the returned error.
In case of errors, messages are sent to the {Invalid,Error} Message Queue according to the behaviour described in the RDSS API specification.
Messages are deleted from SQS as soon as they're processed. This includes cases where processing has failed, e.g. validation or handler errors. The visibility timeout is set by the SQS queue owner under the assumption that the underlying preservation system is capable of processing the requests within the given window (the maximum is 12 hours).
Potential improvements:
* Create a limited number of processors to avoid bursting.
* Increase throughput: sqs.DeleteMessageBatch, multiple consumers, etc. Low priority since we don't expect many messages.
* Handlers could take a long time to complete. Do we need cancellation? What do we do when we exceed the visibility timeout? Is the adapter accountable?
func New ¶
func New(
	logger logrus.FieldLogger,
	validator message.Validator,
	sqsClient sqsiface.SQSAPI, sqsQueueMainURL string,
	snsClient snsiface.SNSAPI, snsTopicMainARN, snsTopicInvalidARN, snsTopicErrorARN string,
	dynamodbClient dynamodbiface.DynamoDBAPI, dynamodbTable string,
	incomingMessages prometheus.Counter) *Broker
New returns a usable Broker.
Example ¶
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/JiscSD/rdss-archivematica-channel-adapter/broker"
	"github.com/JiscSD/rdss-archivematica-channel-adapter/broker/message"
	"github.com/JiscSD/rdss-archivematica-channel-adapter/internal/testutil"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/dynamodb"
	"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
	"github.com/aws/aws-sdk-go/service/sns"
	"github.com/aws/aws-sdk-go/service/sns/snsiface"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/aws/aws-sdk-go/service/sqs/sqsiface"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/sirupsen/logrus"
)

type sqsMock struct {
	sqsiface.SQSAPI
	count int
}

func (m *sqsMock) DeleteMessageWithContext(aws.Context, *sqs.DeleteMessageInput, ...request.Option) (*sqs.DeleteMessageOutput, error) {
	return &sqs.DeleteMessageOutput{}, nil
}

func (m *sqsMock) ReceiveMessageWithContext(aws.Context, *sqs.ReceiveMessageInput, ...request.Option) (*sqs.ReceiveMessageOutput, error) {
	m.count++
	switch m.count {
	case 1:
		blob := testutil.MustSpecFixture("../message-api-spec/messages/example_message.json")
		return &sqs.ReceiveMessageOutput{
			Messages: []*sqs.Message{
				&sqs.Message{
					Body: aws.String(string(blob)),
				},
			},
		}, nil
	default:
		// When this method is called again.
		time.Sleep(time.Millisecond * 1)
		return &sqs.ReceiveMessageOutput{
			Messages: []*sqs.Message{},
		}, nil
	}
}

type snsMock struct {
	snsiface.SNSAPI
}

func (m *snsMock) PublishWithContext(aws.Context, *sns.PublishInput, ...request.Option) (*sns.PublishOutput, error) {
	return &sns.PublishOutput{}, nil
}

type dynaMock struct {
	dynamodbiface.DynamoDBAPI
}

func (m *dynaMock) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
	return &dynamodb.GetItemOutput{}, nil
}

func (m *dynaMock) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
	return &dynamodb.PutItemOutput{}, nil
}

func main() {
	// Create the broker client.
	b := broker.New(
		logrus.StandardLogger(),
		&message.NoOpValidatorImpl{},
		&sqsMock{}, "http://localhost:4576/queue/main",
		&snsMock{}, "arn:aws:sns:us-east-1:123456789012:main", "arn:aws:sns:us-east-1:123456789012:invalid", "arn:aws:sns:us-east-1:123456789012:error",
		&dynaMock{}, "local_data_repository",
		prometheus.NewCounter(prometheus.CounterOpts{}),
	)

	var wg sync.WaitGroup
	wg.Add(1)

	// We can subscribe a handler for a particular message type. The handler
	// is executed by the broker as soon as the message is received.
	b.Subscribe(message.MessageTypeEnum_MetadataCreate, func(m *message.Message) error {
		defer wg.Done()
		fmt.Println("[MetadataCreate] Message received!")
		return nil
	})

	// Run the broker client.
	go b.Run()

	// We can use the broker client to publish messages too.
	b.Metadata.Create(context.TODO(), &message.MetadataCreateRequest{})

	// Stop the broker client - but not until our handler runs.
	wg.Wait()
	b.Stop()
}
Output:
[MetadataCreate] Message received!
func (*Broker) RequestResponse ¶
RequestResponse sends a request and waits until a response is received.
func (*Broker) Subscribe ¶
func (s *Broker) Subscribe(t message.MessageTypeEnum, h MessageHandler)
Subscribe registers a handler for a specific message type.
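Subscription by message type is typically backed by a map from type to handler. The sketch below illustrates that idea with hypothetical stand-ins (messageType, handler, registry). It assumes a second subscription for the same type replaces the first, which the package documentation does not state.

```go
package main

import (
	"fmt"
	"sync"
)

// messageType is a hypothetical stand-in for message.MessageTypeEnum.
type messageType int

const (
	metadataCreate messageType = iota
	metadataDelete
)

// handler is a hypothetical stand-in for MessageHandler.
type handler func(body string) error

// registry maps message types to handlers and is safe for concurrent use.
type registry struct {
	mu       sync.RWMutex
	handlers map[messageType]handler
}

// subscribe stores h for t. Assumption: a later call for the same type
// replaces the earlier handler.
func (r *registry) subscribe(t messageType, h handler) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.handlers == nil {
		r.handlers = make(map[messageType]handler)
	}
	r.handlers[t] = h
}

// dispatch runs the handler registered for t, if any. The boolean reports
// whether a handler was found.
func (r *registry) dispatch(t messageType, body string) (bool, error) {
	r.mu.RLock()
	h, ok := r.handlers[t]
	r.mu.RUnlock()
	if !ok {
		return false, nil
	}
	return true, h(body)
}

func main() {
	var r registry
	r.subscribe(metadataCreate, func(body string) error {
		fmt.Println("[MetadataCreate]", body)
		return nil
	})

	found, err := r.dispatch(metadataCreate, "hello")
	fmt.Println(found, err)

	found, err = r.dispatch(metadataDelete, "ignored")
	fmt.Println(found, err)
}
```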
type MessageHandler ¶
MessageHandler is a function supplied by message subscribers.
type MetadataService ¶
type MetadataService interface {
	Create(context.Context, *message.MetadataCreateRequest) error
	Read(context.Context, *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
	Update(context.Context, *message.MetadataUpdateRequest) error
	Delete(context.Context, *message.MetadataDeleteRequest) error
}
MetadataService generates Metadata-type messages.
type MetadataServiceOp ¶
type MetadataServiceOp struct {
// contains filtered or unexported fields
}
MetadataServiceOp implements MetadataService.
func (*MetadataServiceOp) Create ¶
func (s *MetadataServiceOp) Create(ctx context.Context, req *message.MetadataCreateRequest) error
Create publishes a MetadataCreate message.
func (*MetadataServiceOp) Delete ¶
func (s *MetadataServiceOp) Delete(ctx context.Context, req *message.MetadataDeleteRequest) error
Delete publishes a MetadataDelete message.
func (*MetadataServiceOp) Read ¶
func (s *MetadataServiceOp) Read(ctx context.Context, req *message.MetadataReadRequest) (*message.MetadataReadResponse, error)
Read publishes a MetadataRead message.
func (*MetadataServiceOp) Update ¶
func (s *MetadataServiceOp) Update(ctx context.Context, req *message.MetadataUpdateRequest) error
Update publishes a MetadataUpdate message.
type PreservationService ¶
type PreservationService interface {
Event(context.Context, *message.PreservationEventRequest) error
}
PreservationService publishes Preservation-type messages.
type PreservationServiceOp ¶
type PreservationServiceOp struct {
// contains filtered or unexported fields
}
PreservationServiceOp implements PreservationService.
func (*PreservationServiceOp) Event ¶
func (s *PreservationServiceOp) Event(ctx context.Context, req *message.PreservationEventRequest) error
Event publishes a PreservationEvent message.