esdb

package
v3.3.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 27, 2024 License: Apache-2.0 Imports: 33 Imported by: 30

Documentation

Overview

Package esdb EventStoreDB gRPC client.

Index

Constants

View Source
const (
	// UserStreamAcl Default system ACL.
	UserStreamAcl = "$userStreamAcl"
	// SystemStreamAcl Default users ACL.
	SystemStreamAcl = "$systemStreamAcl"
)
View Source
const (
	// EventFilterType the filter will be applied on an event's type.
	EventFilterType FilterType = 0
	// StreamFilterType the filter will be applied on a stream name.
	StreamFilterType FilterType = 1
	// NoMaxSearchWindow disables the max search window.
	NoMaxSearchWindow int = -1
)
View Source
const SubscriberCountUnlimited = 0

Variables

This section is empty.

Functions

This section is empty.

Types

type Acl

type Acl struct {
	// contains filtered or unexported fields
}

Acl Access Control List (ACL).

func (*Acl) AddDeleteRoles

func (a *Acl) AddDeleteRoles(roles ...string)

AddDeleteRoles Adds delete roles.

func (*Acl) AddMetaReadRoles

func (a *Acl) AddMetaReadRoles(roles ...string)

AddMetaReadRoles Adds metadata roles.

func (*Acl) AddMetaWriteRoles

func (a *Acl) AddMetaWriteRoles(roles ...string)

AddMetaWriteRoles Adds metadata write roles.

func (*Acl) AddReadRoles

func (a *Acl) AddReadRoles(roles ...string)

AddReadRoles Adds read roles.

func (*Acl) AddWriteRoles

func (a *Acl) AddWriteRoles(roles ...string)

AddWriteRoles Adds write roles.

func (*Acl) DeleteRoles

func (a *Acl) DeleteRoles() []string

DeleteRoles Returns delete roles.

func (*Acl) MetaReadRoles

func (a *Acl) MetaReadRoles() []string

MetaReadRoles Returns metadata read roles.

func (*Acl) MetaWriteRoles

func (a *Acl) MetaWriteRoles() []string

MetaWriteRoles Returns metadata write roles.

func (*Acl) ReadRoles

func (a *Acl) ReadRoles() []string

ReadRoles Returns read roles.

func (*Acl) WriteRoles

func (a *Acl) WriteRoles() []string

WriteRoles Returns write roles.

type AllPosition

type AllPosition interface {
	// contains filtered or unexported methods
}

AllPosition represents a logical position in the $all stream.

type Any

type Any struct{}

Any means the write should not conflict with anything and should always succeed.

type AppendToStreamOptions

type AppendToStreamOptions struct {
	// Asks the server to check that the stream receiving the event is at the given expected version.
	ExpectedRevision ExpectedRevision
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

AppendToStreamOptions options of the append stream request.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client Represents a client to a single node. A client instance maintains a full duplex communication to EventStoreDB. Many threads can use an EventStoreDB client at the same time or a single thread can make many asynchronous requests.

func NewClient

func NewClient(configuration *Configuration) (*Client, error)

NewClient Creates a gRPC client to an EventStoreDB database.

func (*Client) AppendToStream

func (client *Client) AppendToStream(
	context context.Context,
	streamID string,
	opts AppendToStreamOptions,
	events ...EventData,
) (*WriteResult, error)

AppendToStream Appends events to a given stream.

func (*Client) Close

func (client *Client) Close() error

Close Closes a connection and cleans all its allocated resources.

func (*Client) CreatePersistentSubscription

func (client *Client) CreatePersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options PersistentStreamSubscriptionOptions,
) error

CreatePersistentSubscription Creates a persistent subscription gorup on a stream.

Persistent subscriptions are special kind of subscription where the server remembers the state of the subscription. This allows for many modes of operations compared to a regular or catcup subscription where the client holds the subscription state.

func (*Client) CreatePersistentSubscriptionToAll

func (client *Client) CreatePersistentSubscriptionToAll(
	ctx context.Context,
	groupName string,
	options PersistentAllSubscriptionOptions,
) error

CreatePersistentSubscriptionToAll Creates a persistent subscription gorup on the $all stream.

Persistent subscriptions are special kind of subscription where the server remembers the state of the subscription. This allows for many modes of operations compared to a regular or catcup subscription where the client holds the subscription state.

func (*Client) DeletePersistentSubscription

func (client *Client) DeletePersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options DeletePersistentSubscriptionOptions,
) error

DeletePersistentSubscription Deletes a persistent subscription group on a stream.

func (*Client) DeletePersistentSubscriptionToAll

func (client *Client) DeletePersistentSubscriptionToAll(
	ctx context.Context,
	groupName string,
	options DeletePersistentSubscriptionOptions,
) error

DeletePersistentSubscriptionToAll Deletes a persistent subscription group on the $all stream.

func (*Client) DeleteStream

func (client *Client) DeleteStream(
	parent context.Context,
	streamID string,
	opts DeleteStreamOptions,
) (*DeleteResult, error)

DeleteStream Deletes a given stream.

Makes use of "Truncate Before". When a stream is deleted, it's "Truncate Before" is set to the stream's current last event number. When a deleted stream is read, the read will return a stream not found error. After deleting the stream, you are able to write to it again, continuing from where it left off.

func (*Client) GetPersistentSubscriptionInfo

func (client *Client) GetPersistentSubscriptionInfo(ctx context.Context, streamName string, groupName string, options GetPersistentSubscriptionOptions) (*PersistentSubscriptionInfo, error)

GetPersistentSubscriptionInfo Gets the info for a specific persistent subscription to a stream

func (*Client) GetPersistentSubscriptionInfoToAll

func (client *Client) GetPersistentSubscriptionInfoToAll(ctx context.Context, groupName string, options GetPersistentSubscriptionOptions) (*PersistentSubscriptionInfo, error)

GetPersistentSubscriptionInfoToAll Gets the info for a specific persistent subscription to the $all stream.

func (*Client) GetServerVersion

func (client *Client) GetServerVersion() (*ServerVersion, error)

GetServerVersion Returns the version of the EventStoreDB node to which the client is currently connected.

func (*Client) GetStreamMetadata

func (client *Client) GetStreamMetadata(
	context context.Context,
	streamID string,
	opts ReadStreamOptions,
) (*StreamMetadata, error)

GetStreamMetadata Reads the metadata for a stream.

func (*Client) ListAllPersistentSubscriptions

func (client *Client) ListAllPersistentSubscriptions(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)

ListAllPersistentSubscriptions Lists all persistent subscriptions regardless of which stream they are on.

func (*Client) ListPersistentSubscriptionsForStream

func (client *Client) ListPersistentSubscriptionsForStream(ctx context.Context, streamName string, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)

ListPersistentSubscriptionsForStream Lists all persistent subscriptions of a specific stream.

func (*Client) ListPersistentSubscriptionsToAll

func (client *Client) ListPersistentSubscriptionsToAll(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)

ListPersistentSubscriptionsToAll Lists all persistent subscriptions specific to the $all stream.

func (*Client) ReadAll

func (client *Client) ReadAll(
	context context.Context,
	opts ReadAllOptions,
	count uint64,
) (*ReadStream, error)

ReadAll Reads events from the $all stream. The reading can be done forward and backward.

func (*Client) ReadStream

func (client *Client) ReadStream(
	context context.Context,
	streamID string,
	opts ReadStreamOptions,
	count uint64,
) (*ReadStream, error)

ReadStream Reads events from a given stream. The reading can be done forward and backward.

func (*Client) ReplayParkedMessages

func (client *Client) ReplayParkedMessages(ctx context.Context, streamName string, groupName string, options ReplayParkedMessagesOptions) error

ReplayParkedMessages Replays the parked messages of a persistent subscription to a stream.

func (*Client) ReplayParkedMessagesToAll

func (client *Client) ReplayParkedMessagesToAll(ctx context.Context, groupName string, options ReplayParkedMessagesOptions) error

ReplayParkedMessagesToAll Replays the parked messages of a persistent subscription to $all.

func (*Client) RestartPersistentSubscriptionSubsystem

func (client *Client) RestartPersistentSubscriptionSubsystem(ctx context.Context, options RestartPersistentSubscriptionSubsystemOptions) error

RestartPersistentSubscriptionSubsystem Restarts the persistent subscription subsystem on the server.

func (*Client) SetStreamMetadata

func (client *Client) SetStreamMetadata(
	context context.Context,
	streamID string,
	opts AppendToStreamOptions,
	metadata StreamMetadata,
) (*WriteResult, error)

SetStreamMetadata Sets the metadata for a stream.

func (*Client) SubscribeToAll

func (client *Client) SubscribeToAll(
	parent context.Context,
	opts SubscribeToAllOptions,
) (*Subscription, error)

SubscribeToAll allows you to subscribe to $all stream and receive notifications about new events added to the stream. The subscription will notify event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.

func (*Client) SubscribeToPersistentSubscription

func (client *Client) SubscribeToPersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options SubscribeToPersistentSubscriptionOptions,
) (*PersistentSubscription, error)

SubscribeToPersistentSubscription Connects to a persistent subscription group on a stream.

func (*Client) SubscribeToPersistentSubscriptionToAll

func (client *Client) SubscribeToPersistentSubscriptionToAll(
	ctx context.Context,
	groupName string,
	options SubscribeToPersistentSubscriptionOptions,
) (*PersistentSubscription, error)

SubscribeToPersistentSubscriptionToAll Connects to a persistent subscription group to the $all stream.

func (*Client) SubscribeToStream

func (client *Client) SubscribeToStream(
	parent context.Context,
	streamID string,
	opts SubscribeToStreamOptions,
) (*Subscription, error)

SubscribeToStream allows you to subscribe to a stream and receive notifications about new events added to the stream. The subscription will notify event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.

func (*Client) TombstoneStream

func (client *Client) TombstoneStream(
	parent context.Context,
	streamID string,
	opts TombstoneStreamOptions,
) (*DeleteResult, error)

TombstoneStream Permanently deletes a given stream by writing a Tombstone event to the end of the stream.

A Tombstone event is written to the end of the stream, permanently deleting it. The stream cannot be recreated or written to again. Tombstone events are written with the event's type "$streamDeleted". When a tombstoned stream is read, the read will return a stream deleted error.

func (*Client) UpdatePersistentSubscription

func (client *Client) UpdatePersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options PersistentStreamSubscriptionOptions,
) error

UpdatePersistentSubscription Updates a persistent subscription group on a stream.

func (*Client) UpdatePersistentSubscriptionToAll

func (client *Client) UpdatePersistentSubscriptionToAll(
	ctx context.Context,
	groupName string,
	options PersistentAllSubscriptionOptions,
) error

UpdatePersistentSubscriptionToAll Updates a persistent subscription group on the $all stream.

type Configuration

type Configuration struct {
	// The URI of the EventStoreDB. Use this when connecting to a single node.
	// Example: localhost:2113
	Address string

	// An array of end points used to seed gossip.
	GossipSeeds []*EndPoint

	// Disable communicating over a secure channel.
	DisableTLS bool // Defaults to false.

	// The NodePreference to use when connecting.
	NodePreference NodePreference

	// The username to use for authenticating against the EventStoreDB instance.
	Username string

	// The password to use for authenticating against the EventStoreDB instance.
	Password string

	// RootCAs defines the set of root certificate authorities
	// that clients use when verifying server certificates.
	// If RootCAs is nil, TLS uses the host's root CA set.
	RootCAs *x509.CertPool // Defaults to nil.

	// Allows to skip certificate validation.
	SkipCertificateVerification bool // Defaults to false.

	// The maximum number of times to attempt end point discovery.
	MaxDiscoverAttempts int // Defaults to 10.

	// The polling interval (in milliseconds) used to discover the end point.
	DiscoveryInterval int // Defaults to 100 milliseconds.

	// The amount of time (in seconds) after which an attempt to discover gossip will fail.
	GossipTimeout int // Defaults to 5 seconds.

	// Specifies if DNS discovery should be used.
	DnsDiscover bool // Defaults to false.

	// The amount of time (in milliseconds) to wait after which a keepalive ping is sent on the transport.
	// If set below 10s, a minimum value of 10s will be used instead. Use -1 to disable. Use -1 to disable.
	KeepAliveInterval time.Duration // Defaults to 10 seconds.

	// The amount of time (in milliseconds) the sender of the keep alive ping waits for an acknowledgement.
	KeepAliveTimeout time.Duration // Defaults to 10 seconds.

	// The amount of time (in milliseconds) a non-streaming operation should take to complete before resulting in a
	// DeadlineExceeded. Defaults to 10 seconds.
	DefaultDeadline *time.Duration

	// Logging abstraction used by the client.
	Logger LoggingFunc
}

Configuration describes how to connect to an instance of EventStoreDB.

func ParseConnectionString

func ParseConnectionString(connectionString string) (*Configuration, error)

ParseConnectionString creates a Configuration based on an EventStoreDb connection string.

type ConsumerStrategy

type ConsumerStrategy string

ConsumerStrategy named consumer strategies for use with persistent subscriptions.

const (
	// ConsumerStrategyRoundRobin Distributes events to all clients evenly. If the client buffer-size is reached, the
	// client is ignored until events are (not) acknowledged.
	ConsumerStrategyRoundRobin ConsumerStrategy = "RoundRobin"

	// ConsumerStrategyDispatchToSingle Distributes events to a single client until the buffer size is reached. After
	// which the next client is selected in a round-robin style, and the process is repeated.
	ConsumerStrategyDispatchToSingle ConsumerStrategy = "DispatchToSingle"

	// ConsumerStrategyPinned For use with an indexing projection such as the system '$by_category' projection.
	// EventStoreDB inspects event for its source stream id, hashing the id to one of 1024 buckets assigned to
	// individual clients. When a client disconnects, its buckets are assigned to other clients. When a client connects,
	// it is assigned some existing buckets. This naively attempts to maintain a balanced workload. The main goal of
	// this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing.
	// This is not a guarantee, and you should handle the usual ordering and concurrency issues.
	ConsumerStrategyPinned ConsumerStrategy = "Pinned"
)

type ContentType

type ContentType int

ContentType event's content type.

const (
	// ContentTypeBinary binary content type.
	ContentTypeBinary ContentType = 0
	// ContentTypeJson JSON content type.
	ContentTypeJson ContentType = 1
)

type Credentials

type Credentials struct {
	// User's login.
	Login string
	// User's password.
	Password string
}

Credentials holds a login and a password for authenticated requests.

type DeletePersistentSubscriptionOptions

type DeletePersistentSubscriptionOptions struct {
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

DeletePersistentSubscriptionOptions options of the delete persistent subscription's request.

type DeleteResult

type DeleteResult struct {
	// Transaction log position of the stream deletion.
	Position Position
}

DeleteResult is returned on a successful stream deletion request.

type DeleteStreamOptions

type DeleteStreamOptions struct {
	// Asks the server to check that the stream receiving the event is at the given expected version.
	ExpectedRevision ExpectedRevision
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

DeleteStreamOptions options of the delete stream request.

type Direction

type Direction int

Direction Read direction.

const (
	// Forwards Reads from the start to the end.
	Forwards Direction = iota
	// Backwards Reads from the end to the start.
	Backwards
)

type End

type End struct {
}

End represents the end of a stream or $all.

type EndPoint

type EndPoint struct {
	// Endpoint's hostname.
	Host string
	// Endpoint's port.
	Port uint16
}

EndPoint is database node endpoint.

func ParseEndPoint

func ParseEndPoint(s string) (*EndPoint, error)

ParseEndPoint parses an endpoint string representation. For example: "{hostname}:{port}"

func (*EndPoint) String

func (e *EndPoint) String() string

String Endpoint string representation.

type Error

type Error struct {
	// contains filtered or unexported fields
}

Error main client error type.

func FromError

func FromError(err error) (*Error, bool)

func (*Error) Code

func (e *Error) Code() ErrorCode

Code returns an error code.

func (*Error) Err

func (e *Error) Err() error

Err returns underlying error.

func (*Error) Error

func (e *Error) Error() string

func (*Error) IsErrorCode added in v3.3.0

func (e *Error) IsErrorCode(code ErrorCode) bool

IsErrorCode checks if the error code is the same as the given one.

func (*Error) Unwrap

func (e *Error) Unwrap() error

type ErrorCode

type ErrorCode int

ErrorCode EventStoreDB error code.

const (
	// ErrorCodeUnknown unclassified error.
	ErrorCodeUnknown ErrorCode = iota
	// ErrorCodeUnsupportedFeature a request not supported by the targeted EventStoreDB node was sent.
	ErrorCodeUnsupportedFeature
	// ErrorCodeDeadlineExceeded a gRPC deadline exceeded error.
	ErrorCodeDeadlineExceeded
	// ErrorCodeUnauthenticated a request requires authentication and the authentication failed.
	ErrorCodeUnauthenticated
	// ErrorCodeResourceNotFound a remote resource was not found or because its access was denied.
	ErrorCodeResourceNotFound
	// ErrorCodeResourceAlreadyExists a creation request was made for a resource that already exists.
	ErrorCodeResourceAlreadyExists
	// ErrorCodeConnectionClosed when a connection is already closed.
	ErrorCodeConnectionClosed
	// ErrorCodeWrongExpectedVersion when an append request failed the optimistic concurrency on the server.
	ErrorCodeWrongExpectedVersion
	// ErrorCodeAccessDenied a request requires the right ACL.
	ErrorCodeAccessDenied
	// ErrorCodeStreamDeleted requested stream is deleted.
	ErrorCodeStreamDeleted
	// ErrorCodeParsing error when parsing data.
	ErrorCodeParsing
	// ErrorCodeInternalClient unexpected error from the client library, worthy of a GitHub issue.
	ErrorCodeInternalClient
	// ErrorCodeInternalServer unexpected error from the server, worthy of a GitHub issue.
	ErrorCodeInternalServer
	// ErrorCodeNotLeader when a request needing a leader node was executed on a follower node.
	ErrorCodeNotLeader
	// ErrorAborted when the server aborted the request.
	ErrorAborted
	// ErrorUnavailable when the EventStoreDB node became unavailable.
	ErrorUnavailable
)

type EventAppeared

type EventAppeared struct {
	// Event sent by EventStoreDB.
	Event *ResolvedEvent
	// How many times that event was sent to the persistent subscription.
	RetryCount int
}

EventAppeared when EventStoreDB sends an event to a persistent subscription.

type EventData

type EventData struct {
	// Event's unique identifier.
	EventID uuid.UUID
	// Event's type.
	EventType string
	// Event's content type.
	ContentType ContentType
	// Event's payload data.
	Data []byte
	// Event's metadata.
	Metadata []byte
}

EventData represents an event that will be sent to EventStoreDB.

type ExpectedRevision

type ExpectedRevision interface {
	// contains filtered or unexported methods
}

ExpectedRevision the use of expected revision can be a bit tricky especially when discussing guaranties given by EventStoreDB server. The EventStoreDB server will assure idempotency for all requests using any value in ExpectedRevision except Any. When using Any, the EventStoreDB server will do its best to assure idempotency but will not guarantee it.

type FilterType

type FilterType int

FilterType represents the type filters supported by EventStoreDB.

type GetPersistentSubscriptionOptions

type GetPersistentSubscriptionOptions struct {
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

GetPersistentSubscriptionOptions options of the get persistent subscription info request.

type ListPersistentSubscriptionsOptions

type ListPersistentSubscriptionsOptions struct {
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

ListPersistentSubscriptionsOptions options of the list persistent subscription request.

type LogLevel

type LogLevel = string

LogLevel log statement level.

const (
	LogDebug LogLevel = "debug"
	LogInfo  LogLevel = "info"
	LogWarn  LogLevel = "warn"
	LogError LogLevel = "error"
)

type LoggingFunc

type LoggingFunc = func(level LogLevel, format string, args ...interface{})

LoggingFunc main logging abstraction.

func ConsoleLogging

func ConsoleLogging() LoggingFunc

ConsoleLogging will print out log statements in stdout.

func NoopLogging

func NoopLogging() LoggingFunc

NoopLogging disables logging.

type NackAction

type NackAction int32

NackAction persistent subscription acknowledgement error type.

const (
	// NackActionUnknown client does not know what action to take, let the server decide.
	NackActionUnknown NackAction = 0
	// NackActionPark park message, do not resend.
	NackActionPark NackAction = 1
	// NackActionRetry explicitly retry the message.
	NackActionRetry NackAction = 2
	// NackActionSkip skip this message, do not resend, do not park the message.
	NackActionSkip NackAction = 3
	// NackActionStop stop the subscription.
	NackActionStop NackAction = 4
)

type NoStream

type NoStream struct{}

NoStream means the stream being written to should not yet exist.

type NodePreference

type NodePreference string

NodePreference indicates which order of preferred nodes for connecting to.

const (
	// NodePreferenceLeader When attempting connection, prefers leader nodes.
	NodePreferenceLeader NodePreference = "Leader"
	// NodePreferenceFollower When attempting connection, prefers follower nodes.
	NodePreferenceFollower NodePreference = "Follower"
	// NodePreferenceReadOnlyReplica When attempting connection, prefers read only replica nodes.
	NodePreferenceReadOnlyReplica NodePreference = "ReadOnlyReplica"
	// NodePreferenceRandom When attempting connection, has no node preference.
	NodePreferenceRandom NodePreference = "Random"
)

func (NodePreference) String

func (nodePreference NodePreference) String() string

type PersistentAllSubscriptionOptions

type PersistentAllSubscriptionOptions struct {
	// Persistent subscription's request.
	Settings *PersistentSubscriptionSettings
	// Starting position of the subscription.
	StartFrom AllPosition
	// Max search window.
	MaxSearchWindow int
	// Applies a server-side filter to determine if an event of the subscription should be yielded.
	Filter *SubscriptionFilter
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

PersistentAllSubscriptionOptions options for most of the persistent subscription requests.

type PersistentStreamSubscriptionOptions

type PersistentStreamSubscriptionOptions struct {
	// Persistent subscription's request.
	Settings *PersistentSubscriptionSettings
	// Starting position of the subscription.
	StartFrom StreamPosition
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

PersistentStreamSubscriptionOptions options for most of the persistent subscription requests.

type PersistentSubscription

type PersistentSubscription struct {
	// contains filtered or unexported fields
}

PersistentSubscription persistent subscription handle.

func (*PersistentSubscription) Ack

func (connection *PersistentSubscription) Ack(messages ...*ResolvedEvent) error

Ack acknowledges events have been successfully processed.

func (*PersistentSubscription) Close

func (connection *PersistentSubscription) Close() error

Close drops the persistent subscription and free allocated resources.

func (*PersistentSubscription) Nack

func (connection *PersistentSubscription) Nack(reason string, action NackAction, messages ...*ResolvedEvent) error

Nack acknowledges events failed processing.

func (*PersistentSubscription) Recv

Recv awaits for the next incoming persistent subscription event.

type PersistentSubscriptionConnectionInfo

type PersistentSubscriptionConnectionInfo struct {
	// Origin of this connection.
	From string `json:"from"`
	// Connection's username.
	Username string `json:"username"`
	// Average events per second on this connection.
	AverageItemsPerSecond float64 `json:"averageItemsPerSecond"`
	// Total items on this connection.
	TotalItemsProcessed int64 `json:"totalItemsProcessed"`
	// Number of items seen since last measurement on this connection.
	CountSinceLastMeasurement int64 `json:"countSinceLastMeasurement"`
	// Number of available slots.
	AvailableSlots int64 `json:"availableSlots"`
	// Number of in flight messages on this connection.
	InFlightMessages int64 `json:"inFlightMessages"`
	// Connection's name.
	ConnectionName string `json:"connectionName"`
	// Timing measurements for the connection.
	ExtraStatistics []PersistentSubscriptionMeasurement `json:"extraStatistics"`
}

PersistentSubscriptionConnectionInfo holds an active persistent subscription connection info.

type PersistentSubscriptionEvent

type PersistentSubscriptionEvent struct {
	// When EventStoreDB sends an event to the subscription.
	EventAppeared *EventAppeared
	// When the subscription is dropped.
	SubscriptionDropped *SubscriptionDropped
	// When a checkpoint was created.
	CheckPointReached *Position
}

PersistentSubscriptionEvent used to handle persistent subscription notifications raised throughout its lifecycle.

type PersistentSubscriptionInfo

type PersistentSubscriptionInfo struct {
	// The source of events for the subscription.
	EventSource string
	// The group name given on creation.
	GroupName string
	// The current status of the subscription.
	Status string
	// Active connections to the subscription.
	Connections []PersistentSubscriptionConnectionInfo
	// Persistent subscription's settings.
	Settings *PersistentSubscriptionSettings
	// Persistent subscription's stats.
	Stats *PersistentSubscriptionStats
}

PersistentSubscriptionInfo represents persistent subscription info.

type PersistentSubscriptionMeasurement

type PersistentSubscriptionMeasurement struct {
	// Metric name
	Key string `json:"key"`
	// Metric value.
	Value int64 `json:"value"`
}

PersistentSubscriptionMeasurement key-value pair of a metric and its value.

type PersistentSubscriptionSettings

type PersistentSubscriptionSettings struct {
	// Where to start the subscription from.
	StartFrom interface{}
	// Resolve linkTo event to their linked events.
	ResolveLinkTos bool
	// Enable tracking of latency statistics on this subscription.
	ExtraStatistics bool
	// The maximum number of retries (due to timeout) before a message is considered to be parked. Default: 10.
	MaxRetryCount int32
	// The minimum number of messages to process before a checkpoint may be written. Default: 10.
	CheckpointLowerBound int32
	// The maximum number of messages not checkpointed before forcing a checkpoint. Default: 1000.
	CheckpointUpperBound int32
	// The maximum number of subscribers allowed. Default: 0 (Unbounded).
	MaxSubscriberCount int32
	// The size of the buffer (in-memory) listening to live messages as they happen before paging occurs. Default: 500.
	LiveBufferSize int32
	// The number of events read at a time when catching up. Default: 20.
	ReadBatchSize int32
	// The number of events to cache when catching up. Default: 500.
	HistoryBufferSize int32
	// The strategy to use for distributing events to client consumers.
	ConsumerStrategyName ConsumerStrategy
	// The amount of time after which to consider a message as timed out and retried. Default 30_000ms.
	MessageTimeout int32
	// The amount of time to try to checkpoint after. Default: 2_000ms.
	CheckpointAfter int32
}

PersistentSubscriptionSettings persistent subscription settings.

func SubscriptionSettingsDefault

func SubscriptionSettingsDefault() PersistentSubscriptionSettings

SubscriptionSettingsDefault returns a persistent subscription settings with default values.

type PersistentSubscriptionStats

type PersistentSubscriptionStats struct {
	// Average number of events per seconds.
	AveragePerSecond int64
	// Total number of events processed by this subscription.
	TotalItems int64
	// Number of events seen since last measurement on this subscription.
	CountSinceLastMeasurement int64
	// The revision number of the last checkpoint.
	LastCheckpointedEventRevision *uint64
	// The revision number of the last known.
	LastKnownEventRevision *uint64
	// The transaction log position of the last checkpoint.
	LastCheckpointedPosition *Position
	// The transaction log position of the last known event.
	LastKnownPosition *Position
	// Number of events in the read buffer.
	ReadBufferCount int64
	// Number of events in the live buffer.
	LiveBufferCount int64
	// Number of events in the retry buffer.
	RetryBufferCount int64
	// Current in flight messages across the persistent subscription group.
	TotalInFlightMessages int64
	// Current number of outstanding messages.
	OutstandingMessagesCount int64
	// The current number of parked messages.
	ParkedMessagesCount int64
}

PersistentSubscriptionStats represents processing-related persistent subscription statistics.

type Position

type Position struct {
	// Commit position.
	Commit uint64
	// Prepare position.
	Prepare uint64
}

Position transaction log position.

type ReadAllOptions

type ReadAllOptions struct {
	// Direction of the read request.
	Direction Direction
	// Starting position of the read request.
	From AllPosition
	// Whether the read request should resolve linkTo events to their linked events.
	ResolveLinkTos bool
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

ReadAllOptions options of the read $all request.

type ReadStream

type ReadStream struct {
	// contains filtered or unexported fields
}

ReadStream read stream iterator.

func (*ReadStream) Close

func (stream *ReadStream) Close()

Close closes the iterator and release allocated resources.

func (*ReadStream) Recv

func (stream *ReadStream) Recv() (*ResolvedEvent, error)

Recv awaits for the next incoming event.

type ReadStreamOptions

type ReadStreamOptions struct {
	// Direction of the read request.
	Direction Direction
	// Starting position of the read request.
	From StreamPosition
	// Whether the read request should resolve linkTo events to their linked events.
	ResolveLinkTos bool
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

ReadStreamOptions options of the read stream request.

type RecordedEvent

type RecordedEvent struct {
	// Event's id.
	EventID uuid.UUID
	// Event's type.
	EventType string
	// Event's content type.
	ContentType string
	// The stream that event belongs to.
	StreamID string
	// The event's revision number.
	EventNumber uint64
	// The event's transaction log position.
	Position Position
	// When the event was created.
	CreatedDate time.Time
	// The event's payload data.
	Data []byte
	// The event's system metadata.
	SystemMetadata map[string]string
	// The event user-defined metadata.
	UserMetadata []byte
}

RecordedEvent represents a previously written event.

type ReplayParkedMessagesOptions

type ReplayParkedMessagesOptions struct {
	// Asks for authenticated request.
	Authenticated *Credentials
	// Replays the parked messages until the event revision within the parked messages stream is reached.
	StopAt int
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

ReplayParkedMessagesOptions options of the replay parked messages request.

type ResolvedEvent

type ResolvedEvent struct {
	// The link event if the original event is a link.
	Link *RecordedEvent
	// The event, or the resolved linked event if the original event is a link.
	Event *RecordedEvent
	// The optional commit position of the event.
	Commit *uint64
}

ResolvedEvent represents an event with a potential link.

func (ResolvedEvent) OriginalEvent

func (resolved ResolvedEvent) OriginalEvent() *RecordedEvent

OriginalEvent returns the event that was read or which triggered the subscription.

func (ResolvedEvent) OriginalStreamRevision added in v3.3.0

func (resolved ResolvedEvent) OriginalStreamRevision() StreamRevision

OriginalStreamRevision returns the stream revision of the original event.

type RestartPersistentSubscriptionSubsystemOptions

type RestartPersistentSubscriptionSubsystemOptions struct {
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

RestartPersistentSubscriptionSubsystemOptions options of the restart persistent subscription subsystem request.

type ServerVersion

type ServerVersion struct {
	Major int
	Minor int
	Patch int
}

ServerVersion Represents the version of an EventStoreDB node.

type Start

type Start struct {
}

Start represents the beginning of a stream or $all.

type StreamExists

type StreamExists struct{}

StreamExists means the stream should exist.

type StreamMetadata

type StreamMetadata struct {
	// contains filtered or unexported fields
}

StreamMetadata Represents stream metadata with strongly typed properties for system values and a dictionary-like interface for custom values.

func StreamMetadataFromJson

func StreamMetadataFromJson(bytes []byte) (*StreamMetadata, error)

StreamMetadataFromJson deserializes a JSON byte array into a StreamMetadata object.

func (*StreamMetadata) Acl

func (m *StreamMetadata) Acl() interface{}

Acl The Access Control List of the stream (ACL).

func (*StreamMetadata) AddCustomProperty

func (m *StreamMetadata) AddCustomProperty(name string, value interface{})

AddCustomProperty Key-value pair of a key to JSON for user-provider metadata.

func (*StreamMetadata) CacheControl

func (m *StreamMetadata) CacheControl() *time.Duration

CacheControl The amount of time for which the stream head is cacheable (in seconds).

func (*StreamMetadata) IsSystemStreamAcl

func (m *StreamMetadata) IsSystemStreamAcl() bool

IsSystemStreamAcl Checks if the ACL is set to system default.

func (*StreamMetadata) IsUserStreamAcl

func (m *StreamMetadata) IsUserStreamAcl() bool

IsUserStreamAcl Checks if the ACL is set to users default.

func (*StreamMetadata) MaxAge

func (m *StreamMetadata) MaxAge() *time.Duration

MaxAge The maximum age of events allowed in the stream.

func (*StreamMetadata) MaxCount

func (m *StreamMetadata) MaxCount() *uint64

MaxCount The maximum number of events allowed in the stream.

func (*StreamMetadata) SetAcl

func (m *StreamMetadata) SetAcl(value interface{})

SetAcl The Access Control List of the stream (ACL).

func (*StreamMetadata) SetCacheControl

func (m *StreamMetadata) SetCacheControl(value time.Duration)

SetCacheControl The amount of time for which the stream head is cacheable (in seconds).

func (*StreamMetadata) SetMaxAge

func (m *StreamMetadata) SetMaxAge(value time.Duration)

SetMaxAge The maximum age of events allowed in the stream.

func (*StreamMetadata) SetMaxCount

func (m *StreamMetadata) SetMaxCount(value uint64)

SetMaxCount The maximum number of events allowed in the stream.

func (*StreamMetadata) SetTruncateBefore

func (m *StreamMetadata) SetTruncateBefore(value uint64)

SetTruncateBefore The event number from which previous events can be scavenged. This is used to implement deletions of streams.

func (*StreamMetadata) StreamAcl

func (m *StreamMetadata) StreamAcl() *Acl

StreamAcl The Access Control List of the stream (ACL).

func (*StreamMetadata) ToJson

func (m *StreamMetadata) ToJson() ([]byte, error)

ToJson serializes StreamMetadata to JSON.

func (*StreamMetadata) TruncateBefore

func (m *StreamMetadata) TruncateBefore() *uint64

TruncateBefore The event number from which previous events can be scavenged. This is used to implement deletions of streams.

type StreamPosition

type StreamPosition interface {
	// contains filtered or unexported methods
}

StreamPosition represents a logical position in a stream.

type StreamRevision

type StreamRevision struct {
	Value uint64
}

StreamRevision returns a stream position at a specific event revision.

func Revision

func Revision(value uint64) StreamRevision

Revision returns a stream position at a specific event revision.

type SubscribeToAllOptions

type SubscribeToAllOptions struct {
	// Starting position of the subscribe request.
	From AllPosition
	// Whether the read request should resolve linkTo events to their linked events.
	ResolveLinkTos bool
	// Max search window.
	MaxSearchWindow int
	// Checkpoint interval.
	CheckpointInterval int
	// Applies a server-side filter to determine if an event of the subscription should be yielded.
	Filter *SubscriptionFilter
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

SubscribeToAllOptions options of the subscribe to $all request.

type SubscribeToPersistentSubscriptionOptions

type SubscribeToPersistentSubscriptionOptions struct {
	// Buffer size.
	BufferSize uint32
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

SubscribeToPersistentSubscriptionOptions options of the subscribe to persistent subscription request.

type SubscribeToStreamOptions

type SubscribeToStreamOptions struct {
	// Starting position of the subscribe request.
	From StreamPosition
	// Whether the read request should resolve linkTo events to their linked events.
	ResolveLinkTos bool
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

SubscribeToStreamOptions options of the subscribe to stream request.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is a subscription's handle.

func (*Subscription) Close

func (sub *Subscription) Close() error

Close drops the subscription and cleans up allocated resources.

func (*Subscription) Id

func (sub *Subscription) Id() string

Id returns subscription's id.

func (*Subscription) Recv

func (sub *Subscription) Recv() *SubscriptionEvent

Recv awaits for the next incoming subscription's event.

type SubscriptionDropped

type SubscriptionDropped struct {
	// Error that caused the drop.
	Error error
}

SubscriptionDropped when a subscription is dropped.

type SubscriptionEvent

type SubscriptionEvent struct {
	// When EventStoreDB sends an event to the subscription.
	EventAppeared *ResolvedEvent
	// When the subscription is dropped.
	SubscriptionDropped *SubscriptionDropped
	// When a checkpoint was created.
	CheckPointReached *Position
	// When an event is caught up
	CaughtUp *Subscription
	// When an event has fallen behind
	FellBehind *Subscription
}

SubscriptionEvent used to handle catch-up subscription notifications raised throughout its lifecycle.

type SubscriptionFilter

type SubscriptionFilter struct {
	// Type of filter.
	Type FilterType
	// String prefixes.
	Prefixes []string
	// Regex expression.
	Regex string
}

SubscriptionFilter is a filter that targets $all stream.

func ExcludeSystemEventsFilter

func ExcludeSystemEventsFilter() *SubscriptionFilter

ExcludeSystemEventsFilter filters out event's with system's event types, i.e. event types starting with `$`.

type SubscriptionFilterOptions

type SubscriptionFilterOptions struct {
	MaxSearchWindow    int
	CheckpointInterval int
	SubscriptionFilter *SubscriptionFilter
}

type TombstoneStreamOptions

type TombstoneStreamOptions struct {
	// Asks the server to check that the stream receiving the event is at the given expected version.
	ExpectedRevision ExpectedRevision
	// Asks for authenticated request.
	Authenticated *Credentials
	// A length of time to use for gRPC deadlines.
	Deadline *time.Duration
	// Requires the request to be performed by the leader of the cluster.
	RequiresLeader bool
}

TombstoneStreamOptions options of the tombstone stream request.

type WriteResult

type WriteResult struct {
	CommitPosition      uint64
	PreparePosition     uint64
	NextExpectedVersion uint64
}

WriteResult ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL