Documentation ¶
Index ¶
- func NewDuplicateEventError(eventInStore cloudevents.Event) error
- func NewFossilServer(collector Collector, factory *EventStreamFactory, store EventStore, ...) *chi.Mux
- type Collector
- type CollectorRouter
- type ConsumerTimedOut
- type ConsumerWaiter
- type ConsumerWaiterRouter
- type DefaultCollector
- type DistributedLock
- type DuplicateEventError
- type EventAcknowledgment
- type EventLoader
- type EventNotFound
- type EventStore
- type EventStreamFactory
- type InMemoryPublisher
- type InMemoryStorage
- func (s *InMemoryStorage) Find(ctx context.Context, identifier string) (*cloudevents.Event, error)
- func (s *InMemoryStorage) MatchingStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event
- func (s *InMemoryStorage) Store(ctx context.Context, stream string, event *cloudevents.Event) error
- type NamedConsumers
- type Publisher
- type Router
- type SSERouter
- type SequenceNumberDoNotMatchError
- type WaitConsumerConfiguration
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func NewDuplicateEventError ¶
func NewDuplicateEventError(eventInStore cloudevents.Event) error
func NewFossilServer ¶
func NewFossilServer( collector Collector, factory *EventStreamFactory, store EventStore, loader EventLoader, lock DistributedLock, jwtTokenSecret string, ) *chi.Mux
Types ¶
type Collector ¶
type Collector interface {
Collect(context context.Context, event *cloudevents.Event) error
}
type CollectorRouter ¶
type CollectorRouter struct {
// contains filtered or unexported fields
}
func NewCollectorRouter ¶
func NewCollectorRouter(collector Collector, waiter *ConsumerWaiter) *CollectorRouter
func (*CollectorRouter) CollectEvent ¶
func (r *CollectorRouter) CollectEvent(w http.ResponseWriter, request *http.Request)
func (*CollectorRouter) Mount ¶
func (r *CollectorRouter) Mount(router *chi.Mux)
type ConsumerTimedOut ¶
type ConsumerTimedOut struct {
WaitConsumerConfiguration
}
func (*ConsumerTimedOut) Error ¶
func (e *ConsumerTimedOut) Error() string
type ConsumerWaiter ¶
type ConsumerWaiter struct {
// contains filtered or unexported fields
}
func NewConsumerWaiter ¶
func NewConsumerWaiter(broadcaster *concurrency.ChannelBroadcaster) *ConsumerWaiter
func (*ConsumerWaiter) WaitFor ¶
func (w *ConsumerWaiter) WaitFor(ctx context.Context, configurations []WaitConsumerConfiguration) chan error
type ConsumerWaiterRouter ¶
type ConsumerWaiterRouter struct {
// contains filtered or unexported fields
}
func NewConsumerWaiterRouter ¶
func NewConsumerWaiterRouter(collector Collector) *ConsumerWaiterRouter
func (*ConsumerWaiterRouter) Ack ¶
func (cwr *ConsumerWaiterRouter) Ack(rw http.ResponseWriter, req *http.Request)
func (*ConsumerWaiterRouter) Mount ¶
func (cwr *ConsumerWaiterRouter) Mount(router *chi.Mux)
type DefaultCollector ¶
type DefaultCollector struct {
// contains filtered or unexported fields
}
func NewCollector ¶
func NewCollector(store EventStore, publisher Publisher) *DefaultCollector
func (*DefaultCollector) Collect ¶
func (c *DefaultCollector) Collect(context context.Context, event *cloudevents.Event) error
type DistributedLock ¶
type DuplicateEventError ¶
type DuplicateEventError struct {
// contains filtered or unexported fields
}
func (*DuplicateEventError) Error ¶
func (e *DuplicateEventError) Error() string
func (*DuplicateEventError) EventInStore ¶
func (e *DuplicateEventError) EventInStore() cloudevents.Event
type EventAcknowledgment ¶
type EventLoader ¶
type EventNotFound ¶
type EventNotFound struct{}
func (*EventNotFound) Error ¶
func (e *EventNotFound) Error() string
type EventStore ¶
type EventStreamFactory ¶
type EventStreamFactory struct { Source chan cloudevents.Event Broadcaster *concurrency.ChannelBroadcaster // contains filtered or unexported fields }
func NewEventStreamFactory ¶
func NewEventStreamFactory(loader EventLoader) *EventStreamFactory
func (*EventStreamFactory) NewEventStream ¶
func (f *EventStreamFactory) NewEventStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event
type InMemoryPublisher ¶
type InMemoryPublisher struct {
Events []*cloudevents.Event
}
func NewInMemoryPublisher ¶
func NewInMemoryPublisher() *InMemoryPublisher
func (*InMemoryPublisher) Publish ¶
func (p *InMemoryPublisher) Publish(ctx context.Context, stream string, event *cloudevents.Event) error
type InMemoryStorage ¶
type InMemoryStorage struct {
Events []cloudevents.Event
}
func NewInMemoryStorage ¶
func NewInMemoryStorage() *InMemoryStorage
func (*InMemoryStorage) Find ¶
func (s *InMemoryStorage) Find(ctx context.Context, identifier string) (*cloudevents.Event, error)
func (*InMemoryStorage) MatchingStream ¶
func (s *InMemoryStorage) MatchingStream(ctx context.Context, matcher events.Matcher) chan cloudevents.Event
func (*InMemoryStorage) Store ¶
func (s *InMemoryStorage) Store(ctx context.Context, stream string, event *cloudevents.Event) error
type NamedConsumers ¶
type NamedConsumers struct {
// contains filtered or unexported fields
}
func NewNamedConsumers ¶
func NewNamedConsumers(sseRouter *SSERouter, store EventStore, loader EventLoader, lock DistributedLock) *NamedConsumers
func (*NamedConsumers) CommitOffset ¶
func (cg *NamedConsumers) CommitOffset(rw http.ResponseWriter, req *http.Request)
func (*NamedConsumers) Mount ¶
func (cg *NamedConsumers) Mount(router *chi.Mux)
func (*NamedConsumers) Stream ¶
func (cg *NamedConsumers) Stream(rw http.ResponseWriter, req *http.Request)
type SSERouter ¶
type SSERouter struct {
// contains filtered or unexported fields
}
func NewSSERouter ¶
func NewSSERouter(eventStreamFactory *EventStreamFactory, store EventStore) *SSERouter
func (*SSERouter) StreamEvents ¶
func (r *SSERouter) StreamEvents(rw http.ResponseWriter, req *http.Request)
type SequenceNumberDoNotMatchError ¶
type SequenceNumberDoNotMatchError struct{}
func (*SequenceNumberDoNotMatchError) Error ¶
func (e *SequenceNumberDoNotMatchError) Error() string
Click to show internal directories.
Click to hide internal directories.