pubsub

package module
v2.0.0-prerelea...-7b5f6b1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 5, 2024 License: MIT Imports: 30 Imported by: 0

README

go-pubsub

A publish/subscribe (pubsub) implementation for Go, built on PostgreSQL.

Documentation

Index

Constants

View Source
// JSONContentType is the content-type string "application/json".
// NOTE(review): presumably the content type associated with JSONEncoder and
// JSONDecoder — confirm against their implementations.
const JSONContentType = "application/json"
View Source
// TopicChannelPrefix is the string prefix "pubsub_topic:".
// NOTE(review): presumably prepended to a topic identifier to build a
// notification channel name (compare ErrTopicInvalidChannelPrefix) — confirm.
const TopicChannelPrefix = "pubsub_topic:"

Variables

View Source
// ErrDecoderNotRegistered is the sentinel error reporting that a decoder is
// not registered. Match it with errors.Is.
var ErrDecoderNotRegistered = errors.New("pubsub: decoder not registered")
View Source
// ErrEncoderRequired is the sentinel error reporting that an encoder is
// required. Match it with errors.Is.
// NOTE(review): presumably returned when publishing without an Encoder — confirm.
var ErrEncoderRequired = errors.New("pubsub: encoder required")
View Source
// ErrTopicInvalidChannelPrefix is the sentinel error reporting that a channel
// name does not start with the expected prefix. Match it with errors.Is.
// NOTE(review): the expected prefix is presumably TopicChannelPrefix — confirm.
var ErrTopicInvalidChannelPrefix = errors.New("pubsub: channel does not start with expected prefix")
View Source
// ErrTopicInvalidID is the sentinel error reporting an invalid topic id.
// Match it with errors.Is.
var ErrTopicInvalidID = errors.New("pubsub: invalid topic id")
View Source
// ErrTopicUnknown is the sentinel error reporting an unknown topic.
// Match it with errors.Is.
var ErrTopicUnknown = errors.New("pubsub: unknown topic")
View Source
// ErrTopicValidation is the sentinel error reporting an invalid topic.
// Match it with errors.Is.
var ErrTopicValidation = errors.New("pubsub: invalid topic")
View Source
// ErrTypeUnknown is the sentinel error reporting an unknown type.
// Match it with errors.Is.
// NOTE(review): presumably raised when a decoder meets a type name it has no
// reflect.Type for — confirm.
var ErrTypeUnknown = errors.New("pubsub: unknown type")

Functions

func AssertMessageEventForReceipt

func AssertMessageEventForReceipt(t *testing.T, event MessageEvent, receipt *PublishReceipt)

func AssertNoQueuedEvents

func AssertNoQueuedEvents(t *testing.T, sub *Subscription)

Types

type Configuration

// Configuration holds the pubsub configuration values. It is the result type
// of ReadConfiguration and PatchConfiguration.
type Configuration struct {
	// MissedMessageSeconds is a 32-bit integer setting.
	// NOTE(review): presumably a window, in seconds, used when recovering
	// missed messages — exact semantics are not visible here; confirm.
	MissedMessageSeconds int32
}

func PatchConfiguration

func PatchConfiguration(
	ctx context.Context,
	querier postgres.Querier,
	patch ConfigurationPatch,
) (*Configuration, error)

func ReadConfiguration

func ReadConfiguration(
	ctx context.Context,
	querier postgres.Querier,
) (*Configuration, error)

type ConfigurationPatch

// ConfigurationPatch describes a change to Configuration and is the patch
// argument accepted by PatchConfiguration.
type ConfigurationPatch struct {
	// MissedMessageSeconds optionally carries a new value for
	// Configuration.MissedMessageSeconds.
	// NOTE(review): presumably an unset value leaves the stored setting
	// unchanged — confirm.
	MissedMessageSeconds optional.Value[int32]
}

type ControlNotification

// ControlNotification is a JSON-taggable payload that names a control
// operation.
type ControlNotification struct {
	// Operation is the requested control operation; it is (de)serialized
	// under the JSON key "operation".
	Operation ControlOperation `json:"operation"`
}

type ControlOperation

// ControlOperation is a string-typed identifier for a control operation
// carried by ControlNotification.
type ControlOperation string
const (
	// StopSubscriptionsControlOperation is the "stop-subscriptions" operation.
	// NOTE(review): presumably tells active subscriptions to stop — confirm.
	StopSubscriptionsControlOperation ControlOperation = "stop-subscriptions"
)

type DataStore

// DataStore groups the storage and notification operations of the package
// behind an interface. PostgresDataStore exposes methods with the same names
// and signatures, and NewTestHarness accepts a DataStore.
//
// NOTE(review): the per-method notes below are derived from the signatures
// only; the behavioral details are not visible in this listing — confirm.
type DataStore interface {
	// AcquireConnection returns a *pgxpool.Conn given a connection pool.
	AcquireConnection(ctx context.Context, dbPool *pgxpool.Pool) (*pgxpool.Conn, error)
	// PatchConfiguration applies patch and returns a Configuration.
	PatchConfiguration(ctx context.Context, querier postgres.Querier, patch ConfigurationPatch) (*Configuration, error)
	// Publish takes a value together with its encoded form and a set of topic
	// names, and returns a PublishReceipt plus an int64.
	// NOTE(review): the meaning of the int64 result and of a nil publishedAt
	// is not visible here — confirm.
	Publish(ctx context.Context, querier postgres.Querier, topicNames []string, value any, encodedValue *EncodedValue, publishedAt *time.Time) (*PublishReceipt, int64, error)
	// ReadConfiguration returns a Configuration.
	ReadConfiguration(ctx context.Context, querier postgres.Querier) (*Configuration, error)
	// ReadEncodedMessagesAfterID returns a channel of EncodedMessage results
	// for the given message ID and topic names.
	ReadEncodedMessagesAfterID(ctx context.Context, querier postgres.Querier, messageID MessageID, topicNames []string) (chan common.Result[EncodedMessage], error)
	// ReadEncodedMessagesWithID returns a channel of EncodedMessage results
	// for the given message ID.
	ReadEncodedMessagesWithID(ctx context.Context, querier postgres.Querier, messageID MessageID) (chan common.Result[EncodedMessage], error)
	// ReadEncodedValue returns the EncodedValue for the given message ID.
	ReadEncodedValue(ctx context.Context, querier postgres.Querier, messageID MessageID) (*EncodedValue, error)
	// Subscribe takes a set of topic names and returns a SubscribeReceipt.
	Subscribe(ctx context.Context, querier postgres.Querier, topicNames []string) (*SubscribeReceipt, error)
	// UnmarshalMessageNotification converts a string payload into a
	// MessageNotification.
	UnmarshalMessageNotification(payload string) (*MessageNotification, error)
	// WaitForNotification returns a *pgconn.Notification received on conn.
	WaitForNotification(ctx context.Context, conn *pgxpool.Conn) (*pgconn.Notification, error)
}

type Decoder

// Decoder converts an EncodedValue into a Go value. Subscribe accepts zero or
// more Decoders.
type Decoder interface {
	// ContentType returns a content-type string.
	// NOTE(review): presumably matched against EncodedValue.ContentType to
	// select a decoder — confirm.
	ContentType() string
	// Types returns a map from string keys to reflect.Type.
	// NOTE(review): presumably keyed by type name — confirm.
	Types() map[string]reflect.Type
	// Decode returns the decoded value for encodedValue, or an error.
	Decode(ctx context.Context, encodedValue *EncodedValue) (any, error)
}

type EncodedMessage

// EncodedMessage is a message in its encoded form, together with the topic it
// belongs to. It is the element type of the result channels returned by
// DataStore.ReadEncodedMessagesAfterID and DataStore.ReadEncodedMessagesWithID.
type EncodedMessage struct {
	// MessageID identifies the message.
	MessageID    MessageID
	// Topic is the topic associated with this message.
	Topic        Topic
	// EncodedValue points to the encoded payload.
	// NOTE(review): may presumably be nil when no value is attached (compare
	// MessageNotification.HasValue) — confirm.
	EncodedValue *EncodedValue
	// PublishedAt is the publication timestamp.
	PublishedAt  time.Time
}

type EncodedValue

// EncodedValue is an encoded payload: raw bytes plus a content-type string.
// It is what Encoder.Encode produces and what Decoder.Decode consumes.
type EncodedValue struct {
	// ContentType is the content-type string for Bytes.
	ContentType string
	// Bytes holds the encoded payload.
	Bytes       []byte
}

type Encoder

// Encoder converts a Go value into an EncodedValue. Publish takes an Encoder.
type Encoder interface {
	// Encode returns the encoded form of v, or an error.
	Encode(ctx context.Context, v any) (*EncodedValue, error)
}

type ErrorEvent

// ErrorEvent pairs an error with a subscription ID.
// NOTE(review): presumably one of the event kinds delivered through
// Subscription.Event / Subscription.WaitForEvent — confirm.
type ErrorEvent struct {
	// SubscriptionID identifies the subscription the event relates to.
	SubscriptionID SubscriptionID
	// Error is the error being reported.
	Error          error
}

type JSONDecoder

// JSONDecoder is created with NewJSONDecoder and exposes ContentType, Types
// and Decode methods, the same method set as the Decoder interface.
// Its fields are not shown in this listing.
type JSONDecoder struct {
	// contains filtered or unexported fields
}

func NewJSONDecoder

func NewJSONDecoder(types ...reflect.Type) *JSONDecoder

func (*JSONDecoder) ContentType

func (d *JSONDecoder) ContentType() string

func (*JSONDecoder) Decode

func (d *JSONDecoder) Decode(
	ctx context.Context,
	encodedValue *EncodedValue,
) (any, error)

func (*JSONDecoder) Types

func (d *JSONDecoder) Types() map[string]reflect.Type

type JSONEncoder

// JSONEncoder is a field-less type created with NewJSONEncoder; it exposes an
// Encode method with the same signature as Encoder.Encode.
type JSONEncoder struct{}

func NewJSONEncoder

func NewJSONEncoder() *JSONEncoder

func (*JSONEncoder) Encode

func (e *JSONEncoder) Encode(
	ctx context.Context,
	v any,
) (*EncodedValue, error)

type Message

// Message is a message with its value held as a Go value, as carried by
// MessageEvent.
type Message struct {
	// ID identifies the message.
	ID          MessageID
	// Value is the message value.
	// NOTE(review): presumably the output of a Decoder — confirm.
	Value       any
	// PublishedAt is the publication timestamp.
	PublishedAt time.Time
}

type MessageEvent

// MessageEvent bundles a Message with its Topic and a subscription ID.
// NOTE(review): presumably one of the event kinds delivered through
// Subscription.Event / Subscription.WaitForEvent — confirm.
type MessageEvent struct {
	// SubscriptionID identifies the subscription the event relates to.
	SubscriptionID SubscriptionID
	// Topic is the topic associated with Message.
	Topic          Topic
	// Message is the message itself.
	Message        Message
}

type MessageID

// MessageID is the 64-bit integer identifier of a message.
type MessageID int64

type MessageNotification

// MessageNotification is the JSON-tagged notification payload produced by
// DataStore.UnmarshalMessageNotification.
type MessageNotification struct {
	// MessageID identifies the message; JSON key "message_id".
	MessageID   MessageID `json:"message_id"`
	// HasValue is a boolean flag; JSON key "has_value".
	// NOTE(review): presumably indicates whether a stored value exists for
	// the message — confirm.
	HasValue    bool      `json:"has_value"`
	// PublishedAt is the publication timestamp; JSON key "published_at".
	PublishedAt time.Time `json:"published_at"`
}

type PostgresDataStore

// PostgresDataStore is a field-less type created with NewPostgresDataStore.
// Its methods in this listing mirror the method set of the DataStore
// interface.
type PostgresDataStore struct{}

func NewPostgresDataStore

func NewPostgresDataStore() *PostgresDataStore

func (*PostgresDataStore) AcquireConnection

func (sd *PostgresDataStore) AcquireConnection(
	ctx context.Context,
	dbPool *pgxpool.Pool,
) (*pgxpool.Conn, error)

func (*PostgresDataStore) PatchConfiguration

func (sd *PostgresDataStore) PatchConfiguration(
	ctx context.Context,
	querier postgres.Querier,
	patch ConfigurationPatch,
) (*Configuration, error)

func (*PostgresDataStore) Publish

func (sd *PostgresDataStore) Publish(
	ctx context.Context,
	querier postgres.Querier,
	topicNames []string,
	value any,
	encodedValue *EncodedValue,
	publishedAt *time.Time,
) (*PublishReceipt, int64, error)

func (*PostgresDataStore) ReadConfiguration

func (sd *PostgresDataStore) ReadConfiguration(
	ctx context.Context,
	querier postgres.Querier,
) (*Configuration, error)

func (*PostgresDataStore) ReadEncodedMessagesAfterID

func (sd *PostgresDataStore) ReadEncodedMessagesAfterID(
	ctx context.Context,
	querier postgres.Querier,
	messageID MessageID,
	topics []string,
) (chan common.Result[EncodedMessage], error)

func (*PostgresDataStore) ReadEncodedMessagesWithID

func (sd *PostgresDataStore) ReadEncodedMessagesWithID(
	ctx context.Context,
	querier postgres.Querier,
	messageID MessageID,
) (chan common.Result[EncodedMessage], error)

func (*PostgresDataStore) ReadEncodedValue

func (sd *PostgresDataStore) ReadEncodedValue(
	ctx context.Context,
	querier postgres.Querier,
	messageID MessageID,
) (*EncodedValue, error)

func (*PostgresDataStore) Subscribe

func (sd *PostgresDataStore) Subscribe(
	ctx context.Context,
	querier postgres.Querier,
	topicNames []string,
) (*SubscribeReceipt, error)

func (*PostgresDataStore) UnmarshalMessageNotification

func (sd *PostgresDataStore) UnmarshalMessageNotification(payload string) (*MessageNotification, error)

func (*PostgresDataStore) WaitForNotification

func (sd *PostgresDataStore) WaitForNotification(ctx context.Context, conn *pgxpool.Conn) (*pgconn.Notification, error)

type PublishReceipt

// PublishReceipt is the receipt returned by Publish and DataStore.Publish.
type PublishReceipt struct {
	// MessageID identifies the published message.
	MessageID    MessageID
	// Topics lists the topics associated with the publication.
	Topics       []Topic
	// Value is the published value as a Go value.
	Value        any
	// EncodedValue points to the encoded form of the value.
	EncodedValue *EncodedValue
	// PublishedAt is the publication timestamp.
	PublishedAt  time.Time
}

func Publish

func Publish(
	ctx context.Context,
	querier postgres.Querier,
	value any,
	encoder Encoder,
	topicNames []string,
) (*PublishReceipt, int64, error)

type StatusEvent

// StatusEvent carries optional status fields for a subscription.
// NOTE(review): presumably only the fields relevant to a given status change
// are set, and the event is delivered through Subscription.Event /
// Subscription.WaitForEvent — confirm.
type StatusEvent struct {
	// SubscriptionID identifies the subscription the event relates to.
	SubscriptionID SubscriptionID
	// Subscribed optionally carries a subscribed flag.
	Subscribed     optional.Value[bool]
	// Closed optionally carries a closed flag.
	Closed         optional.Value[bool]
	// Delay optionally carries a duration.
	// NOTE(review): meaning of the delay is not visible here — confirm.
	Delay          optional.Value[time.Duration]
}

type SubscribeReceipt

// SubscribeReceipt is the receipt returned by DataStore.Subscribe.
type SubscribeReceipt struct {
	// MaxMessageID is a message ID.
	// NOTE(review): presumably the highest message ID existing at
	// subscription time — confirm.
	MaxMessageID MessageID
	// Topics lists the topics associated with the subscription.
	Topics       []Topic
}

type Subscription

// Subscription is the handle returned by Subscribe. In this listing it
// exposes Close, Event, ID, WaitForEvent and WaitUntilSubscribed.
// Its fields are not shown in this listing.
type Subscription struct {
	// contains filtered or unexported fields
}

func Subscribe

func Subscribe(
	ctx context.Context,
	dbPool *pgxpool.Pool,
	topics []string,
	decoders ...Decoder,
) (*Subscription, error)

func (*Subscription) Close

func (sub *Subscription) Close()

func (*Subscription) Event

func (sub *Subscription) Event() any

func (*Subscription) ID

func (sub *Subscription) ID() SubscriptionID

func (*Subscription) WaitForEvent

func (sub *Subscription) WaitForEvent(ctx context.Context, waiting ...chan bool) (any, error)

func (*Subscription) WaitUntilSubscribed

func (sub *Subscription) WaitUntilSubscribed(ctx context.Context) error

type SubscriptionID

// SubscriptionID is the string identifier of a Subscription, as returned by
// Subscription.ID and carried by ErrorEvent, MessageEvent and StatusEvent.
type SubscriptionID string

type TestHarness

// TestHarness is a testing helper created with NewTestHarness from a
// *testing.T and a DataStore. Its fields are not shown in this listing.
type TestHarness struct {
	// contains filtered or unexported fields
}

func NewTestHarness

func NewTestHarness(
	t *testing.T,
	dataStore DataStore,
) *TestHarness

func (*TestHarness) Close

func (h *TestHarness) Close()

func (*TestHarness) Context

func (h *TestHarness) Context() context.Context

func (*TestHarness) DBPool

func (h *TestHarness) DBPool() *pgxpool.Pool

func (*TestHarness) DataStore

func (h *TestHarness) DataStore() DataStore

func (*TestHarness) GenerateEncodedMessages

func (h *TestHarness) GenerateEncodedMessages(
	topicNames []string,
	values []any,
	receipts []*PublishReceipt,
) []EncodedMessage

func (*TestHarness) GenerateTopicNames

func (h *TestHarness) GenerateTopicNames(n int) []string

func (*TestHarness) GenerateValues

func (h *TestHarness) GenerateValues(n int) []any

func (*TestHarness) Logger

func (h *TestHarness) Logger() *slog.Logger

func (*TestHarness) PatchConfiguration

func (h *TestHarness) PatchConfiguration(
	patch ConfigurationPatch,
) *Configuration

func (*TestHarness) Publish

func (h *TestHarness) Publish(
	v any,
	encoder Encoder,
	topicNames []string,
) (*PublishReceipt, int64)

func (*TestHarness) PublishExpectingError

func (h *TestHarness) PublishExpectingError(
	expectedErr error,
	v any,
	encoder Encoder,
	topicNames []string,
)

func (*TestHarness) PublishMany

func (h *TestHarness) PublishMany(
	values []any,
	encoder Encoder,
	topicNames []string,
) ([]*PublishReceipt, int64)

func (*TestHarness) ReadConfiguration

func (h *TestHarness) ReadConfiguration() *Configuration

func (*TestHarness) ReadEncodedMessagesWithID

func (h *TestHarness) ReadEncodedMessagesWithID(
	messageID MessageID,
) []EncodedMessage

func (*TestHarness) SortEncodedMessages

func (h *TestHarness) SortEncodedMessages(m []EncodedMessage)

func (*TestHarness) Subscribe

func (h *TestHarness) Subscribe(
	topicNames []string,
	decoders ...Decoder,
) *Subscription

func (*TestHarness) SubscribeExpectingError

func (h *TestHarness) SubscribeExpectingError(
	expectedErr error,
	topicNames []string,
	decoders ...Decoder,
)

func (*TestHarness) WaitConsumeErrorEvent

func (h *TestHarness) WaitConsumeErrorEvent(
	sub *Subscription,
	testFn func(event ErrorEvent) bool,
	fns ...func(event ErrorEvent),
) ErrorEvent

func (*TestHarness) WaitConsumeErrorEventIs

func (h *TestHarness) WaitConsumeErrorEventIs(
	sub *Subscription,
	err error,
	fns ...func(event ErrorEvent),
) ErrorEvent

func (*TestHarness) WaitConsumeMessageEvent

func (h *TestHarness) WaitConsumeMessageEvent(
	sub *Subscription,
	testFn func(event MessageEvent) bool,
	fns ...func(event MessageEvent),
) MessageEvent

func (*TestHarness) WaitConsumeMessageEventID

func (h *TestHarness) WaitConsumeMessageEventID(
	sub *Subscription,
	messageID MessageID,
	fns ...func(event MessageEvent),
) MessageEvent

func (*TestHarness) WaitConsumeStatusEvent

func (h *TestHarness) WaitConsumeStatusEvent(
	sub *Subscription,
	testFn func(event StatusEvent) bool,
	fns ...func(event StatusEvent),
) StatusEvent

func (*TestHarness) WaitConsumeStatusEventClosed

func (h *TestHarness) WaitConsumeStatusEventClosed(
	sub *Subscription,
	fns ...func(event StatusEvent),
) StatusEvent

func (*TestHarness) WaitConsumeStatusEventDelayed

func (h *TestHarness) WaitConsumeStatusEventDelayed(
	sub *Subscription,
	fns ...func(event StatusEvent),
) StatusEvent

func (*TestHarness) WaitConsumeStatusEventSubscribed

func (h *TestHarness) WaitConsumeStatusEventSubscribed(
	sub *Subscription,
	subscribed bool,
	fns ...func(event StatusEvent),
) StatusEvent

type TestValue

// TestValue is a struct wrapping a single int64.
// NOTE(review): presumably a sample payload for tests (compare
// TestHarness.GenerateValues) — confirm.
type TestValue struct {
	// Value is the wrapped integer; JSON key "value".
	Value int64 `json:"value"`
}

type Topic

// Topic pairs a topic's numeric ID with its name.
type Topic struct {
	// ID is the topic's identifier.
	ID   TopicID
	// Name is the topic's name.
	Name string
}

type TopicID

// TopicID is the 64-bit integer identifier of a Topic.
type TopicID int64

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL