Documentation
¶
Index ¶
- func ParseMessage(ctx context.Context, kafkaMessage *kafka.Message) (messaging.Message, error)
- type Datastore
- type EventConsumer
- func (eventConsumer *EventConsumer) Close()
- func (eventConsumer *EventConsumer) Commit() error
- func (eventConsumer *EventConsumer) Poll(ctx context.Context) (messaging.Message, messaging.Topic, error)
- func (eventConsumer *EventConsumer) ProcessMessage(ctx context.Context, consumedTopic messaging.Topic, ...) error
- func (eventConsumer *EventConsumer) SetPostProcessHook(postProcessHook func(topic messaging.Topic, message messaging.Message) bool)
- func (eventConsumer *EventConsumer) SetPreParseHook(preParseHook func(topic messaging.Topic, message messaging.Message) bool)
- func (eventConsumer *EventConsumer) SetPreProcessHook(preProcessHook func(topic messaging.Topic, message messaging.Message) bool)
- func (eventConsumer *EventConsumer) SubscribeTopicPattern(regexp *regexp.Regexp) error
- func (eventConsumer *EventConsumer) SubscribeTopics(topics messaging.Topics) error
- type EventProducer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ParseMessage ¶
ParseMessage parses a message on the form below and return a corresponding Message object:
{ "header": { "sender": "obs-input-service", "bodyCategory": "FHIR", "bodyType": "Observation", "contentVersion": "3.3.0", "prefer": "return=representation", "etag": "W/\"4232\"", "location":"http://fhirtest.uhn.ca/baseDstu3/Observation/15354/_history/1" "correlationId": "xez5ZXQcDG6" "transactionId": "e6651fe2-fb8e-4a54-8b8e-7343dbdb997c" "security": ... }, "body": { "resourceType": "Observation", ... } }
Types ¶
type Datastore ¶
type Datastore struct {
// contains filtered or unexported fields
}
Datastore is a data storage struct that is used to mock the Kafka message queue
func GetDatastore ¶
func GetDatastore() *Datastore
GetDatastore returns either an existing instance of the Datastore, if one exists, or a new one.
func (*Datastore) Retrieve ¶
Retrieve retrieves the oldest kafka.Message stored under a specific topic from the Datastore
type EventConsumer ¶
type EventConsumer struct { PreParseHook func(topic messaging.Topic, message messaging.Message) bool PreProcessHook func(topic messaging.Topic, message messaging.Message) bool PostProcessHook func(topic messaging.Topic, message messaging.Message) bool // contains filtered or unexported fields }
EventConsumer is a struct for representing an event consumer Mock implementation
func NewEventConsumer ¶
func NewEventConsumer(ctx context.Context, processor messaging.EventProcessor) (EventConsumer, error)
NewEventConsumer functions as a constructor for EventConsumer that returns a new instance of EventConsumer
func (*EventConsumer) Close ¶
func (eventConsumer *EventConsumer) Close()
Close does nothing and is only implemented to implement the interface
func (*EventConsumer) Commit ¶
func (eventConsumer *EventConsumer) Commit() error
Commit commits offset for currently assigned partitions This is a blocking call
func (*EventConsumer) Poll ¶
func (eventConsumer *EventConsumer) Poll(ctx context.Context) (messaging.Message, messaging.Topic, error)
Poll polls the topics subscribed to by the EventConsumer
func (*EventConsumer) ProcessMessage ¶
func (eventConsumer *EventConsumer) ProcessMessage(ctx context.Context, consumedTopic messaging.Topic, receivedMessage messaging.Message, messageProcessedTopic *messaging.Topic, outgoingMessage *messaging.Message) error
ProcessMessage is...
func (*EventConsumer) SetPostProcessHook ¶
func (eventConsumer *EventConsumer) SetPostProcessHook(postProcessHook func(topic messaging.Topic, message messaging.Message) bool)
SetPostProcessHook is used to set a method that should be called after an incoming message is processed
func (*EventConsumer) SetPreParseHook ¶
func (eventConsumer *EventConsumer) SetPreParseHook(preParseHook func(topic messaging.Topic, message messaging.Message) bool)
SetPreParseHook is used to set a method that should be called before an incoming message is parsed
func (*EventConsumer) SetPreProcessHook ¶
func (eventConsumer *EventConsumer) SetPreProcessHook(preProcessHook func(topic messaging.Topic, message messaging.Message) bool)
SetPreProcessHook is used to set a method that should be called before an incoming message is processed
func (*EventConsumer) SubscribeTopicPattern ¶
func (eventConsumer *EventConsumer) SubscribeTopicPattern(regexp *regexp.Regexp) error
SubscribeTopicPattern subscribes the EventConsumer to all topics matching the provided regexp pattern
func (*EventConsumer) SubscribeTopics ¶
func (eventConsumer *EventConsumer) SubscribeTopics(topics messaging.Topics) error
SubscribeTopics subscribes the EventConsumer to topics based on provided Topic-slice
type EventProducer ¶
type EventProducer struct {
// contains filtered or unexported fields
}
EventProducer is a struct for representing an event producer Mock implementation
func NewEventProducer ¶
func NewEventProducer(ctx context.Context, sender string) (EventProducer, error)
NewEventProducer functions as a constructor for EventProducer that returns a new instance of EventProducer
func (*EventProducer) Close ¶
func (eventProducer *EventProducer) Close()
Close does nothing and is only implemented to implement the interface
func (*EventProducer) GetFutureTopic ¶
func (eventProducer *EventProducer) GetFutureTopic( ctx context.Context, topic messaging.Topic, timeout time.Duration, ) chan messaging.Message
GetFutureTopic returns a chan functioning as a future that resolves to a message, when one is available on the specified topic (if the timeout does not occur first)