Documentation
¶
Index ¶
- Constants
- Variables
- func AssertMessageEventForReceipt(t *testing.T, event MessageEvent, receipt *PublishReceipt)
- func AssertNoQueuedEvents(t *testing.T, sub *Subscription)
- type Configuration
- type ConfigurationPatch
- type ControlNotification
- type ControlOperation
- type DataStore
- type Decoder
- type EncodedMessage
- type EncodedValue
- type Encoder
- type ErrorEvent
- type JSONDecoder
- type JSONEncoder
- type Message
- type MessageEvent
- type MessageID
- type MessageNotification
- type PostgresDataStore
- func (sd *PostgresDataStore) AcquireConnection(ctx context.Context, dbPool *pgxpool.Pool) (*pgxpool.Conn, error)
- func (sd *PostgresDataStore) PatchConfiguration(ctx context.Context, querier postgres.Querier, patch ConfigurationPatch) (*Configuration, error)
- func (sd *PostgresDataStore) Publish(ctx context.Context, querier postgres.Querier, topicNames []string, value any, ...) (*PublishReceipt, int64, error)
- func (sd *PostgresDataStore) ReadConfiguration(ctx context.Context, querier postgres.Querier) (*Configuration, error)
- func (sd *PostgresDataStore) ReadEncodedMessagesAfterID(ctx context.Context, querier postgres.Querier, messageID MessageID, ...) (chan common.Result[EncodedMessage], error)
- func (sd *PostgresDataStore) ReadEncodedMessagesWithID(ctx context.Context, querier postgres.Querier, messageID MessageID) (chan common.Result[EncodedMessage], error)
- func (sd *PostgresDataStore) ReadEncodedValue(ctx context.Context, querier postgres.Querier, messageID MessageID) (*EncodedValue, error)
- func (sd *PostgresDataStore) Subscribe(ctx context.Context, querier postgres.Querier, topicNames []string) (*SubscribeReceipt, error)
- func (sd *PostgresDataStore) UnmarshalMessageNotification(payload string) (*MessageNotification, error)
- func (sd *PostgresDataStore) WaitForNotification(ctx context.Context, conn *pgxpool.Conn) (*pgconn.Notification, error)
- type PublishReceipt
- type StatusEvent
- type SubscribeReceipt
- type Subscription
- type SubscriptionID
- type TestHarness
- func (h *TestHarness) Close()
- func (h *TestHarness) Context() context.Context
- func (h *TestHarness) DBPool() *pgxpool.Pool
- func (h *TestHarness) DataStore() DataStore
- func (h *TestHarness) GenerateEncodedMessages(topicNames []string, values []any, receipts []*PublishReceipt) []EncodedMessage
- func (h *TestHarness) GenerateTopicNames(n int) []string
- func (h *TestHarness) GenerateValues(n int) []any
- func (h *TestHarness) Logger() *slog.Logger
- func (h *TestHarness) PatchConfiguration(patch ConfigurationPatch) *Configuration
- func (h *TestHarness) Publish(v any, encoder Encoder, topicNames []string) (*PublishReceipt, int64)
- func (h *TestHarness) PublishExpectingError(expectedErr error, v any, encoder Encoder, topicNames []string)
- func (h *TestHarness) PublishMany(values []any, encoder Encoder, topicNames []string) ([]*PublishReceipt, int64)
- func (h *TestHarness) ReadConfiguration() *Configuration
- func (h *TestHarness) ReadEncodedMessagesWithID(messageID MessageID) []EncodedMessage
- func (h *TestHarness) SortEncodedMessages(m []EncodedMessage)
- func (h *TestHarness) Subscribe(topicNames []string, decoders ...Decoder) *Subscription
- func (h *TestHarness) SubscribeExpectingError(expectedErr error, topicNames []string, decoders ...Decoder)
- func (h *TestHarness) WaitConsumeErrorEvent(sub *Subscription, testFn func(event ErrorEvent) bool, ...) ErrorEvent
- func (h *TestHarness) WaitConsumeErrorEventIs(sub *Subscription, err error, fns ...func(event ErrorEvent)) ErrorEvent
- func (h *TestHarness) WaitConsumeMessageEvent(sub *Subscription, testFn func(event MessageEvent) bool, ...) MessageEvent
- func (h *TestHarness) WaitConsumeMessageEventID(sub *Subscription, messageID MessageID, fns ...func(event MessageEvent)) MessageEvent
- func (h *TestHarness) WaitConsumeStatusEvent(sub *Subscription, testFn func(event StatusEvent) bool, ...) StatusEvent
- func (h *TestHarness) WaitConsumeStatusEventClosed(sub *Subscription, fns ...func(event StatusEvent)) StatusEvent
- func (h *TestHarness) WaitConsumeStatusEventDelayed(sub *Subscription, fns ...func(event StatusEvent)) StatusEvent
- func (h *TestHarness) WaitConsumeStatusEventSubscribed(sub *Subscription, subscribed bool, fns ...func(event StatusEvent)) StatusEvent
- type TestValue
- type Topic
- type TopicID
Constants ¶
View Source
const JSONContentType = "application/json"
View Source
const TopicChannelPrefix = "pubsub_topic:"
Variables ¶
View Source
var ErrDecoderNotRegistered = errors.New("pubsub: decoder not registered")
View Source
var ErrEncoderRequired = errors.New("pubsub: encoder required")
View Source
var ErrTopicInvalidChannelPrefix = errors.New("pubsub: channel does not start with expected prefix")
View Source
var ErrTopicInvalidID = errors.New("pubsub: invalid topic id")
View Source
var ErrTopicUnknown = errors.New("pubsub: unknown topic")
View Source
var ErrTopicValidation = errors.New("pubsub: invalid topic")
View Source
var ErrTypeUnknown = errors.New("pubsub: unknown type")
Functions ¶
func AssertMessageEventForReceipt ¶
func AssertMessageEventForReceipt(t *testing.T, event MessageEvent, receipt *PublishReceipt)
func AssertNoQueuedEvents ¶
func AssertNoQueuedEvents(t *testing.T, sub *Subscription)
Types ¶
type Configuration ¶
type Configuration struct {
MissedMessageSeconds int32
}
func PatchConfiguration ¶
func PatchConfiguration( ctx context.Context, querier postgres.Querier, patch ConfigurationPatch, ) (*Configuration, error)
func ReadConfiguration ¶
type ConfigurationPatch ¶
type ControlNotification ¶
type ControlNotification struct {
Operation ControlOperation `json:"operation"`
}
type ControlOperation ¶
type ControlOperation string
const (
StopSubscriptionsControlOperation ControlOperation = "stop-subscriptions"
)
type DataStore ¶
type DataStore interface { AcquireConnection(ctx context.Context, dbPool *pgxpool.Pool) (*pgxpool.Conn, error) PatchConfiguration(ctx context.Context, querier postgres.Querier, patch ConfigurationPatch) (*Configuration, error) Publish(ctx context.Context, querier postgres.Querier, topicNames []string, value any, encodedValue *EncodedValue, publishedAt *time.Time) (*PublishReceipt, int64, error) ReadConfiguration(ctx context.Context, querier postgres.Querier) (*Configuration, error) ReadEncodedMessagesAfterID(ctx context.Context, querier postgres.Querier, messageID MessageID, topicNames []string) (chan common.Result[EncodedMessage], error) ReadEncodedMessagesWithID(ctx context.Context, querier postgres.Querier, messageID MessageID) (chan common.Result[EncodedMessage], error) ReadEncodedValue(ctx context.Context, querier postgres.Querier, messageID MessageID) (*EncodedValue, error) Subscribe(ctx context.Context, querier postgres.Querier, topicNames []string) (*SubscribeReceipt, error) UnmarshalMessageNotification(payload string) (*MessageNotification, error) WaitForNotification(ctx context.Context, conn *pgxpool.Conn) (*pgconn.Notification, error) }
type EncodedMessage ¶
type EncodedMessage struct { MessageID MessageID Topic Topic EncodedValue *EncodedValue PublishedAt time.Time }
type EncodedValue ¶
type ErrorEvent ¶
type ErrorEvent struct { SubscriptionID SubscriptionID Error error }
type JSONDecoder ¶
type JSONDecoder struct {
// contains filtered or unexported fields
}
func NewJSONDecoder ¶
func NewJSONDecoder(types ...reflect.Type) *JSONDecoder
func (*JSONDecoder) ContentType ¶
func (d *JSONDecoder) ContentType() string
func (*JSONDecoder) Decode ¶
func (d *JSONDecoder) Decode( ctx context.Context, encodedValue *EncodedValue, ) (any, error)
type JSONEncoder ¶
type JSONEncoder struct{}
func NewJSONEncoder ¶
func NewJSONEncoder() *JSONEncoder
func (*JSONEncoder) Encode ¶
func (e *JSONEncoder) Encode( ctx context.Context, v any, ) (*EncodedValue, error)
type MessageEvent ¶
type MessageEvent struct { SubscriptionID SubscriptionID Topic Topic Message Message }
type MessageNotification ¶
type PostgresDataStore ¶
type PostgresDataStore struct{}
func NewPostgresDataStore ¶
func NewPostgresDataStore() *PostgresDataStore
func (*PostgresDataStore) AcquireConnection ¶
func (*PostgresDataStore) PatchConfiguration ¶
func (sd *PostgresDataStore) PatchConfiguration( ctx context.Context, querier postgres.Querier, patch ConfigurationPatch, ) (*Configuration, error)
func (*PostgresDataStore) Publish ¶
func (sd *PostgresDataStore) Publish( ctx context.Context, querier postgres.Querier, topicNames []string, value any, encodedValue *EncodedValue, publishedAt *time.Time, ) (*PublishReceipt, int64, error)
func (*PostgresDataStore) ReadConfiguration ¶
func (sd *PostgresDataStore) ReadConfiguration( ctx context.Context, querier postgres.Querier, ) (*Configuration, error)
func (*PostgresDataStore) ReadEncodedMessagesAfterID ¶
func (sd *PostgresDataStore) ReadEncodedMessagesAfterID( ctx context.Context, querier postgres.Querier, messageID MessageID, topics []string, ) (chan common.Result[EncodedMessage], error)
func (*PostgresDataStore) ReadEncodedMessagesWithID ¶
func (sd *PostgresDataStore) ReadEncodedMessagesWithID( ctx context.Context, querier postgres.Querier, messageID MessageID, ) (chan common.Result[EncodedMessage], error)
func (*PostgresDataStore) ReadEncodedValue ¶
func (sd *PostgresDataStore) ReadEncodedValue( ctx context.Context, querier postgres.Querier, messageID MessageID, ) (*EncodedValue, error)
func (*PostgresDataStore) Subscribe ¶
func (sd *PostgresDataStore) Subscribe( ctx context.Context, querier postgres.Querier, topicNames []string, ) (*SubscribeReceipt, error)
func (*PostgresDataStore) UnmarshalMessageNotification ¶
func (sd *PostgresDataStore) UnmarshalMessageNotification(payload string) (*MessageNotification, error)
func (*PostgresDataStore) WaitForNotification ¶
func (sd *PostgresDataStore) WaitForNotification(ctx context.Context, conn *pgxpool.Conn) (*pgconn.Notification, error)
type PublishReceipt ¶
type StatusEvent ¶
type SubscribeReceipt ¶
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
func (*Subscription) Close ¶
func (sub *Subscription) Close()
func (*Subscription) Event ¶
func (sub *Subscription) Event() any
func (*Subscription) ID ¶
func (sub *Subscription) ID() SubscriptionID
func (*Subscription) WaitForEvent ¶
func (*Subscription) WaitUntilSubscribed ¶
func (sub *Subscription) WaitUntilSubscribed(ctx context.Context) error
type SubscriptionID ¶
type SubscriptionID string
type TestHarness ¶
type TestHarness struct {
// contains filtered or unexported fields
}
func NewTestHarness ¶
func NewTestHarness( t *testing.T, dataStore DataStore, ) *TestHarness
func (*TestHarness) Close ¶
func (h *TestHarness) Close()
func (*TestHarness) Context ¶
func (h *TestHarness) Context() context.Context
func (*TestHarness) DBPool ¶
func (h *TestHarness) DBPool() *pgxpool.Pool
func (*TestHarness) DataStore ¶
func (h *TestHarness) DataStore() DataStore
func (*TestHarness) GenerateEncodedMessages ¶
func (h *TestHarness) GenerateEncodedMessages( topicNames []string, values []any, receipts []*PublishReceipt, ) []EncodedMessage
func (*TestHarness) GenerateTopicNames ¶
func (h *TestHarness) GenerateTopicNames(n int) []string
func (*TestHarness) GenerateValues ¶
func (h *TestHarness) GenerateValues(n int) []any
func (*TestHarness) Logger ¶
func (h *TestHarness) Logger() *slog.Logger
func (*TestHarness) PatchConfiguration ¶
func (h *TestHarness) PatchConfiguration( patch ConfigurationPatch, ) *Configuration
func (*TestHarness) Publish ¶
func (h *TestHarness) Publish( v any, encoder Encoder, topicNames []string, ) (*PublishReceipt, int64)
func (*TestHarness) PublishExpectingError ¶
func (h *TestHarness) PublishExpectingError( expectedErr error, v any, encoder Encoder, topicNames []string, )
func (*TestHarness) PublishMany ¶
func (h *TestHarness) PublishMany( values []any, encoder Encoder, topicNames []string, ) ([]*PublishReceipt, int64)
func (*TestHarness) ReadConfiguration ¶
func (h *TestHarness) ReadConfiguration() *Configuration
func (*TestHarness) ReadEncodedMessagesWithID ¶
func (h *TestHarness) ReadEncodedMessagesWithID( messageID MessageID, ) []EncodedMessage
func (*TestHarness) SortEncodedMessages ¶
func (h *TestHarness) SortEncodedMessages(m []EncodedMessage)
func (*TestHarness) Subscribe ¶
func (h *TestHarness) Subscribe( topicNames []string, decoders ...Decoder, ) *Subscription
func (*TestHarness) SubscribeExpectingError ¶
func (h *TestHarness) SubscribeExpectingError( expectedErr error, topicNames []string, decoders ...Decoder, )
func (*TestHarness) WaitConsumeErrorEvent ¶
func (h *TestHarness) WaitConsumeErrorEvent( sub *Subscription, testFn func(event ErrorEvent) bool, fns ...func(event ErrorEvent), ) ErrorEvent
func (*TestHarness) WaitConsumeErrorEventIs ¶
func (h *TestHarness) WaitConsumeErrorEventIs( sub *Subscription, err error, fns ...func(event ErrorEvent), ) ErrorEvent
func (*TestHarness) WaitConsumeMessageEvent ¶
func (h *TestHarness) WaitConsumeMessageEvent( sub *Subscription, testFn func(event MessageEvent) bool, fns ...func(event MessageEvent), ) MessageEvent
func (*TestHarness) WaitConsumeMessageEventID ¶
func (h *TestHarness) WaitConsumeMessageEventID( sub *Subscription, messageID MessageID, fns ...func(event MessageEvent), ) MessageEvent
func (*TestHarness) WaitConsumeStatusEvent ¶
func (h *TestHarness) WaitConsumeStatusEvent( sub *Subscription, testFn func(event StatusEvent) bool, fns ...func(event StatusEvent), ) StatusEvent
func (*TestHarness) WaitConsumeStatusEventClosed ¶
func (h *TestHarness) WaitConsumeStatusEventClosed( sub *Subscription, fns ...func(event StatusEvent), ) StatusEvent
func (*TestHarness) WaitConsumeStatusEventDelayed ¶
func (h *TestHarness) WaitConsumeStatusEventDelayed( sub *Subscription, fns ...func(event StatusEvent), ) StatusEvent
func (*TestHarness) WaitConsumeStatusEventSubscribed ¶
func (h *TestHarness) WaitConsumeStatusEventSubscribed( sub *Subscription, subscribed bool, fns ...func(event StatusEvent), ) StatusEvent
Source Files
¶
Click to show internal directories.
Click to hide internal directories.