Documentation ¶
Overview ¶
Package esdb EventStoreDB gRPC client.
Index ¶
- Constants
- type Acl
- func (a *Acl) AddDeleteRoles(roles ...string)
- func (a *Acl) AddMetaReadRoles(roles ...string)
- func (a *Acl) AddMetaWriteRoles(roles ...string)
- func (a *Acl) AddReadRoles(roles ...string)
- func (a *Acl) AddWriteRoles(roles ...string)
- func (a *Acl) DeleteRoles() []string
- func (a *Acl) MetaReadRoles() []string
- func (a *Acl) MetaWriteRoles() []string
- func (a *Acl) ReadRoles() []string
- func (a *Acl) WriteRoles() []string
- type AllPosition
- type Any
- type AppendToStreamOptions
- type Client
- func (client *Client) AppendToStream(context context.Context, streamID string, opts AppendToStreamOptions, ...) (*WriteResult, error)
- func (client *Client) Close() error
- func (client *Client) CreatePersistentSubscription(ctx context.Context, streamName string, groupName string, ...) error
- func (client *Client) CreatePersistentSubscriptionToAll(ctx context.Context, groupName string, ...) error
- func (client *Client) DeletePersistentSubscription(ctx context.Context, streamName string, groupName string, ...) error
- func (client *Client) DeletePersistentSubscriptionToAll(ctx context.Context, groupName string, ...) error
- func (client *Client) DeleteStream(parent context.Context, streamID string, opts DeleteStreamOptions) (*DeleteResult, error)
- func (client *Client) GetPersistentSubscriptionInfo(ctx context.Context, streamName string, groupName string, ...) (*PersistentSubscriptionInfo, error)
- func (client *Client) GetPersistentSubscriptionInfoToAll(ctx context.Context, groupName string, ...) (*PersistentSubscriptionInfo, error)
- func (client *Client) GetServerVersion() (*ServerVersion, error)
- func (client *Client) GetStreamMetadata(context context.Context, streamID string, opts ReadStreamOptions) (*StreamMetadata, error)
- func (client *Client) ListAllPersistentSubscriptions(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)
- func (client *Client) ListPersistentSubscriptionsForStream(ctx context.Context, streamName string, ...) ([]PersistentSubscriptionInfo, error)
- func (client *Client) ListPersistentSubscriptionsToAll(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)
- func (client *Client) ReadAll(context context.Context, opts ReadAllOptions, count uint64) (*ReadStream, error)
- func (client *Client) ReadStream(context context.Context, streamID string, opts ReadStreamOptions, count uint64) (*ReadStream, error)
- func (client *Client) ReplayParkedMessages(ctx context.Context, streamName string, groupName string, ...) error
- func (client *Client) ReplayParkedMessagesToAll(ctx context.Context, groupName string, options ReplayParkedMessagesOptions) error
- func (client *Client) RestartPersistentSubscriptionSubsystem(ctx context.Context, options RestartPersistentSubscriptionSubsystemOptions) error
- func (client *Client) SetStreamMetadata(context context.Context, streamID string, opts AppendToStreamOptions, ...) (*WriteResult, error)
- func (client *Client) SubscribeToAll(parent context.Context, opts SubscribeToAllOptions) (*Subscription, error)
- func (client *Client) SubscribeToPersistentSubscription(ctx context.Context, streamName string, groupName string, ...) (*PersistentSubscription, error)
- func (client *Client) SubscribeToPersistentSubscriptionToAll(ctx context.Context, groupName string, ...) (*PersistentSubscription, error)
- func (client *Client) SubscribeToStream(parent context.Context, streamID string, opts SubscribeToStreamOptions) (*Subscription, error)
- func (client *Client) TombstoneStream(parent context.Context, streamID string, opts TombstoneStreamOptions) (*DeleteResult, error)
- func (client *Client) UpdatePersistentSubscription(ctx context.Context, streamName string, groupName string, ...) error
- func (client *Client) UpdatePersistentSubscriptionToAll(ctx context.Context, groupName string, ...) error
- type Configuration
- type ConsumerStrategy
- type ContentType
- type Credentials
- type DeletePersistentSubscriptionOptions
- type DeleteResult
- type DeleteStreamOptions
- type Direction
- type End
- type EndPoint
- type Error
- type ErrorCode
- type EventAppeared
- type EventData
- type ExpectedRevision
- type FilterType
- type GetPersistentSubscriptionOptions
- type ListPersistentSubscriptionsOptions
- type LogLevel
- type LoggingFunc
- type NackAction
- type NoStream
- type NodePreference
- type PersistentAllSubscriptionOptions
- type PersistentStreamSubscriptionOptions
- type PersistentSubscription
- func (connection *PersistentSubscription) Ack(messages ...*ResolvedEvent) error
- func (connection *PersistentSubscription) Close() error
- func (connection *PersistentSubscription) Nack(reason string, action NackAction, messages ...*ResolvedEvent) error
- func (connection *PersistentSubscription) Recv() *PersistentSubscriptionEvent
- type PersistentSubscriptionConnectionInfo
- type PersistentSubscriptionEvent
- type PersistentSubscriptionInfo
- type PersistentSubscriptionMeasurement
- type PersistentSubscriptionSettings
- type PersistentSubscriptionStats
- type Position
- type ReadAllOptions
- type ReadStream
- type ReadStreamOptions
- type RecordedEvent
- type ReplayParkedMessagesOptions
- type ResolvedEvent
- type RestartPersistentSubscriptionSubsystemOptions
- type ServerVersion
- type Start
- type StreamExists
- type StreamMetadata
- func (m *StreamMetadata) Acl() interface{}
- func (m *StreamMetadata) AddCustomProperty(name string, value interface{})
- func (m *StreamMetadata) CacheControl() *time.Duration
- func (m *StreamMetadata) IsSystemStreamAcl() bool
- func (m *StreamMetadata) IsUserStreamAcl() bool
- func (m *StreamMetadata) MaxAge() *time.Duration
- func (m *StreamMetadata) MaxCount() *uint64
- func (m *StreamMetadata) SetAcl(value interface{})
- func (m *StreamMetadata) SetCacheControl(value time.Duration)
- func (m *StreamMetadata) SetMaxAge(value time.Duration)
- func (m *StreamMetadata) SetMaxCount(value uint64)
- func (m *StreamMetadata) SetTruncateBefore(value uint64)
- func (m *StreamMetadata) StreamAcl() *Acl
- func (m *StreamMetadata) ToJson() ([]byte, error)
- func (m *StreamMetadata) TruncateBefore() *uint64
- type StreamPosition
- type StreamRevision
- type SubscribeToAllOptions
- type SubscribeToPersistentSubscriptionOptions
- type SubscribeToStreamOptions
- type Subscription
- type SubscriptionDropped
- type SubscriptionEvent
- type SubscriptionFilter
- type SubscriptionFilterOptions
- type TombstoneStreamOptions
- type WriteResult
Constants ¶
const ( // UserStreamAcl Default system ACL. UserStreamAcl = "$userStreamAcl" // SystemStreamAcl Default users ACL. SystemStreamAcl = "$systemStreamAcl" )
const ( // EventFilterType the filter will be applied on an event's type. EventFilterType FilterType = 0 // StreamFilterType the filter will be applied on a stream name. StreamFilterType FilterType = 1 // NoMaxSearchWindow disables the max search window. NoMaxSearchWindow int = -1 )
const SubscriberCountUnlimited = 0
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Acl ¶
type Acl struct {
// contains filtered or unexported fields
}
Acl Access Control List (ACL).
func (*Acl) AddDeleteRoles ¶
AddDeleteRoles Adds delete roles.
func (*Acl) AddMetaReadRoles ¶
AddMetaReadRoles Adds metadata roles.
func (*Acl) AddMetaWriteRoles ¶
AddMetaWriteRoles Adds metadata write roles.
func (*Acl) AddReadRoles ¶
AddReadRoles Adds read roles.
func (*Acl) AddWriteRoles ¶
AddWriteRoles Adds write roles.
func (*Acl) MetaReadRoles ¶
MetaReadRoles Returns metadata read roles.
func (*Acl) MetaWriteRoles ¶
MetaWriteRoles Returns metadata write roles.
type AllPosition ¶
type AllPosition interface {
// contains filtered or unexported methods
}
AllPosition represents a logical position in the $all stream.
type Any ¶
type Any struct{}
Any means the write should not conflict with anything and should always succeed.
type AppendToStreamOptions ¶
type AppendToStreamOptions struct { // Asks the server to check that the stream receiving the event is at the given expected version. ExpectedRevision ExpectedRevision // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
AppendToStreamOptions options of the append stream request.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client Represents a client to a single node. A client instance maintains a full duplex communication to EventStoreDB. Many threads can use an EventStoreDB client at the same time or a single thread can make many asynchronous requests.
func NewClient ¶
func NewClient(configuration *Configuration) (*Client, error)
NewClient Creates a gRPC client to an EventStoreDB database.
func (*Client) AppendToStream ¶
func (client *Client) AppendToStream( context context.Context, streamID string, opts AppendToStreamOptions, events ...EventData, ) (*WriteResult, error)
AppendToStream Appends events to a given stream.
func (*Client) CreatePersistentSubscription ¶
func (client *Client) CreatePersistentSubscription( ctx context.Context, streamName string, groupName string, options PersistentStreamSubscriptionOptions, ) error
CreatePersistentSubscription Creates a persistent subscription gorup on a stream.
Persistent subscriptions are special kind of subscription where the server remembers the state of the subscription. This allows for many modes of operations compared to a regular or catcup subscription where the client holds the subscription state.
func (*Client) CreatePersistentSubscriptionToAll ¶
func (client *Client) CreatePersistentSubscriptionToAll( ctx context.Context, groupName string, options PersistentAllSubscriptionOptions, ) error
CreatePersistentSubscriptionToAll Creates a persistent subscription gorup on the $all stream.
Persistent subscriptions are special kind of subscription where the server remembers the state of the subscription. This allows for many modes of operations compared to a regular or catcup subscription where the client holds the subscription state.
func (*Client) DeletePersistentSubscription ¶
func (client *Client) DeletePersistentSubscription( ctx context.Context, streamName string, groupName string, options DeletePersistentSubscriptionOptions, ) error
DeletePersistentSubscription Deletes a persistent subscription group on a stream.
func (*Client) DeletePersistentSubscriptionToAll ¶
func (client *Client) DeletePersistentSubscriptionToAll( ctx context.Context, groupName string, options DeletePersistentSubscriptionOptions, ) error
DeletePersistentSubscriptionToAll Deletes a persistent subscription group on the $all stream.
func (*Client) DeleteStream ¶
func (client *Client) DeleteStream( parent context.Context, streamID string, opts DeleteStreamOptions, ) (*DeleteResult, error)
DeleteStream Deletes a given stream.
Makes use of "Truncate Before". When a stream is deleted, it's "Truncate Before" is set to the stream's current last event number. When a deleted stream is read, the read will return a stream not found error. After deleting the stream, you are able to write to it again, continuing from where it left off.
func (*Client) GetPersistentSubscriptionInfo ¶
func (client *Client) GetPersistentSubscriptionInfo(ctx context.Context, streamName string, groupName string, options GetPersistentSubscriptionOptions) (*PersistentSubscriptionInfo, error)
GetPersistentSubscriptionInfo Gets the info for a specific persistent subscription to a stream
func (*Client) GetPersistentSubscriptionInfoToAll ¶
func (client *Client) GetPersistentSubscriptionInfoToAll(ctx context.Context, groupName string, options GetPersistentSubscriptionOptions) (*PersistentSubscriptionInfo, error)
GetPersistentSubscriptionInfoToAll Gets the info for a specific persistent subscription to the $all stream.
func (*Client) GetServerVersion ¶
func (client *Client) GetServerVersion() (*ServerVersion, error)
GetServerVersion Returns the version of the EventStoreDB node to which the client is currently connected.
func (*Client) GetStreamMetadata ¶
func (client *Client) GetStreamMetadata( context context.Context, streamID string, opts ReadStreamOptions, ) (*StreamMetadata, error)
GetStreamMetadata Reads the metadata for a stream.
func (*Client) ListAllPersistentSubscriptions ¶
func (client *Client) ListAllPersistentSubscriptions(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)
ListAllPersistentSubscriptions Lists all persistent subscriptions regardless of which stream they are on.
func (*Client) ListPersistentSubscriptionsForStream ¶
func (client *Client) ListPersistentSubscriptionsForStream(ctx context.Context, streamName string, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)
ListPersistentSubscriptionsForStream Lists all persistent subscriptions of a specific stream.
func (*Client) ListPersistentSubscriptionsToAll ¶
func (client *Client) ListPersistentSubscriptionsToAll(ctx context.Context, options ListPersistentSubscriptionsOptions) ([]PersistentSubscriptionInfo, error)
ListPersistentSubscriptionsToAll Lists all persistent subscriptions specific to the $all stream.
func (*Client) ReadAll ¶
func (client *Client) ReadAll( context context.Context, opts ReadAllOptions, count uint64, ) (*ReadStream, error)
ReadAll Reads events from the $all stream. The reading can be done forward and backward.
func (*Client) ReadStream ¶
func (client *Client) ReadStream( context context.Context, streamID string, opts ReadStreamOptions, count uint64, ) (*ReadStream, error)
ReadStream Reads events from a given stream. The reading can be done forward and backward.
func (*Client) ReplayParkedMessages ¶
func (client *Client) ReplayParkedMessages(ctx context.Context, streamName string, groupName string, options ReplayParkedMessagesOptions) error
ReplayParkedMessages Replays the parked messages of a persistent subscription to a stream.
func (*Client) ReplayParkedMessagesToAll ¶
func (client *Client) ReplayParkedMessagesToAll(ctx context.Context, groupName string, options ReplayParkedMessagesOptions) error
ReplayParkedMessagesToAll Replays the parked messages of a persistent subscription to $all.
func (*Client) RestartPersistentSubscriptionSubsystem ¶
func (client *Client) RestartPersistentSubscriptionSubsystem(ctx context.Context, options RestartPersistentSubscriptionSubsystemOptions) error
RestartPersistentSubscriptionSubsystem Restarts the persistent subscription subsystem on the server.
func (*Client) SetStreamMetadata ¶
func (client *Client) SetStreamMetadata( context context.Context, streamID string, opts AppendToStreamOptions, metadata StreamMetadata, ) (*WriteResult, error)
SetStreamMetadata Sets the metadata for a stream.
func (*Client) SubscribeToAll ¶
func (client *Client) SubscribeToAll( parent context.Context, opts SubscribeToAllOptions, ) (*Subscription, error)
SubscribeToAll allows you to subscribe to $all stream and receive notifications about new events added to the stream. The subscription will notify event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.
func (*Client) SubscribeToPersistentSubscription ¶
func (client *Client) SubscribeToPersistentSubscription( ctx context.Context, streamName string, groupName string, options SubscribeToPersistentSubscriptionOptions, ) (*PersistentSubscription, error)
SubscribeToPersistentSubscription Connects to a persistent subscription group on a stream.
func (*Client) SubscribeToPersistentSubscriptionToAll ¶
func (client *Client) SubscribeToPersistentSubscriptionToAll( ctx context.Context, groupName string, options SubscribeToPersistentSubscriptionOptions, ) (*PersistentSubscription, error)
SubscribeToPersistentSubscriptionToAll Connects to a persistent subscription group to the $all stream.
func (*Client) SubscribeToStream ¶
func (client *Client) SubscribeToStream( parent context.Context, streamID string, opts SubscribeToStreamOptions, ) (*Subscription, error)
SubscribeToStream allows you to subscribe to a stream and receive notifications about new events added to the stream. The subscription will notify event from the starting point onward. If events already exist, the handler will be called for each event one by one until it reaches the end of the stream. From there, the server will notify the handler whenever a new event appears.
func (*Client) TombstoneStream ¶
func (client *Client) TombstoneStream( parent context.Context, streamID string, opts TombstoneStreamOptions, ) (*DeleteResult, error)
TombstoneStream Permanently deletes a given stream by writing a Tombstone event to the end of the stream.
A Tombstone event is written to the end of the stream, permanently deleting it. The stream cannot be recreated or written to again. Tombstone events are written with the event's type "$streamDeleted". When a tombstoned stream is read, the read will return a stream deleted error.
func (*Client) UpdatePersistentSubscription ¶
func (client *Client) UpdatePersistentSubscription( ctx context.Context, streamName string, groupName string, options PersistentStreamSubscriptionOptions, ) error
UpdatePersistentSubscription Updates a persistent subscription group on a stream.
func (*Client) UpdatePersistentSubscriptionToAll ¶
func (client *Client) UpdatePersistentSubscriptionToAll( ctx context.Context, groupName string, options PersistentAllSubscriptionOptions, ) error
UpdatePersistentSubscriptionToAll Updates a persistent subscription group on the $all stream.
type Configuration ¶
type Configuration struct { // The URI of the EventStoreDB. Use this when connecting to a single node. // Example: localhost:2113 Address string // An array of end points used to seed gossip. GossipSeeds []*EndPoint // Disable communicating over a secure channel. DisableTLS bool // Defaults to false. // The NodePreference to use when connecting. NodePreference NodePreference // The username to use for authenticating against the EventStoreDB instance. Username string // The password to use for authenticating against the EventStoreDB instance. Password string // RootCAs defines the set of root certificate authorities // that clients use when verifying server certificates. // If RootCAs is nil, TLS uses the host's root CA set. RootCAs *x509.CertPool // Defaults to nil. // Allows to skip certificate validation. SkipCertificateVerification bool // Defaults to false. // The maximum number of times to attempt end point discovery. MaxDiscoverAttempts int // Defaults to 10. // The polling interval (in milliseconds) used to discover the end point. DiscoveryInterval int // Defaults to 100 milliseconds. // The amount of time (in seconds) after which an attempt to discover gossip will fail. GossipTimeout int // Defaults to 5 seconds. // Specifies if DNS discovery should be used. DnsDiscover bool // Defaults to false. // The amount of time (in milliseconds) to wait after which a keepalive ping is sent on the transport. // If set below 10s, a minimum value of 10s will be used instead. Use -1 to disable. Use -1 to disable. KeepAliveInterval time.Duration // Defaults to 10 seconds. // The amount of time (in milliseconds) the sender of the keep alive ping waits for an acknowledgement. KeepAliveTimeout time.Duration // Defaults to 10 seconds. // The amount of time (in milliseconds) a non-streaming operation should take to complete before resulting in a // DeadlineExceeded. Defaults to 10 seconds. DefaultDeadline *time.Duration // Logging abstraction used by the client. Logger LoggingFunc }
Configuration describes how to connect to an instance of EventStoreDB.
func ParseConnectionString ¶
func ParseConnectionString(connectionString string) (*Configuration, error)
ParseConnectionString creates a Configuration based on an EventStoreDb connection string.
type ConsumerStrategy ¶
type ConsumerStrategy string
ConsumerStrategy named consumer strategies for use with persistent subscriptions.
const ( // ConsumerStrategyRoundRobin Distributes events to all clients evenly. If the client buffer-size is reached, the // client is ignored until events are (not) acknowledged. ConsumerStrategyRoundRobin ConsumerStrategy = "RoundRobin" // ConsumerStrategyDispatchToSingle Distributes events to a single client until the buffer size is reached. After // which the next client is selected in a round-robin style, and the process is repeated. ConsumerStrategyDispatchToSingle ConsumerStrategy = "DispatchToSingle" // ConsumerStrategyPinned For use with an indexing projection such as the system '$by_category' projection. // EventStoreDB inspects event for its source stream id, hashing the id to one of 1024 buckets assigned to // individual clients. When a client disconnects, its buckets are assigned to other clients. When a client connects, // it is assigned some existing buckets. This naively attempts to maintain a balanced workload. The main goal of // this strategy is to decrease the likelihood of concurrency and ordering issues while maintaining load balancing. // This is not a guarantee, and you should handle the usual ordering and concurrency issues. ConsumerStrategyPinned ConsumerStrategy = "Pinned" )
type ContentType ¶
type ContentType int
ContentType event's content type.
const ( // ContentTypeBinary binary content type. ContentTypeBinary ContentType = 0 // ContentTypeJson JSON content type. ContentTypeJson ContentType = 1 )
type Credentials ¶
Credentials holds a login and a password for authenticated requests.
type DeletePersistentSubscriptionOptions ¶
type DeletePersistentSubscriptionOptions struct { // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
DeletePersistentSubscriptionOptions options of the delete persistent subscription's request.
type DeleteResult ¶
type DeleteResult struct { // Transaction log position of the stream deletion. Position Position }
DeleteResult is returned on a successful stream deletion request.
type DeleteStreamOptions ¶
type DeleteStreamOptions struct { // Asks the server to check that the stream receiving the event is at the given expected version. ExpectedRevision ExpectedRevision // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
DeleteStreamOptions options of the delete stream request.
type EndPoint ¶
EndPoint is database node endpoint.
func ParseEndPoint ¶
ParseEndPoint parses an endpoint string representation. For example: "{hostname}:{port}"
type Error ¶
type Error struct {
// contains filtered or unexported fields
}
Error main client error type.
func (*Error) IsErrorCode ¶ added in v3.3.0
IsErrorCode checks if the error code is the same as the given one.
type ErrorCode ¶
type ErrorCode int
ErrorCode EventStoreDB error code.
const ( // ErrorCodeUnknown unclassified error. ErrorCodeUnknown ErrorCode = iota // ErrorCodeUnsupportedFeature a request not supported by the targeted EventStoreDB node was sent. ErrorCodeUnsupportedFeature // ErrorCodeDeadlineExceeded a gRPC deadline exceeded error. ErrorCodeDeadlineExceeded // ErrorCodeUnauthenticated a request requires authentication and the authentication failed. ErrorCodeUnauthenticated // ErrorCodeResourceNotFound a remote resource was not found or because its access was denied. ErrorCodeResourceNotFound // ErrorCodeResourceAlreadyExists a creation request was made for a resource that already exists. ErrorCodeResourceAlreadyExists // ErrorCodeConnectionClosed when a connection is already closed. ErrorCodeConnectionClosed // ErrorCodeWrongExpectedVersion when an append request failed the optimistic concurrency on the server. ErrorCodeWrongExpectedVersion // ErrorCodeAccessDenied a request requires the right ACL. ErrorCodeAccessDenied // ErrorCodeStreamDeleted requested stream is deleted. ErrorCodeStreamDeleted // ErrorCodeParsing error when parsing data. ErrorCodeParsing // ErrorCodeInternalClient unexpected error from the client library, worthy of a GitHub issue. ErrorCodeInternalClient // ErrorCodeInternalServer unexpected error from the server, worthy of a GitHub issue. ErrorCodeInternalServer // ErrorCodeNotLeader when a request needing a leader node was executed on a follower node. ErrorCodeNotLeader // ErrorAborted when the server aborted the request. ErrorAborted ErrorUnavailable )
type EventAppeared ¶
type EventAppeared struct { // Event sent by EventStoreDB. Event *ResolvedEvent // How many times that event was sent to the persistent subscription. RetryCount int }
EventAppeared when EventStoreDB sends an event to a persistent subscription.
type EventData ¶
type EventData struct { // Event's unique identifier. EventID uuid.UUID // Event's type. EventType string // Event's content type. ContentType ContentType // Event's payload data. Data []byte // Event's metadata. Metadata []byte }
EventData represents an event that will be sent to EventStoreDB.
type ExpectedRevision ¶
type ExpectedRevision interface {
// contains filtered or unexported methods
}
ExpectedRevision the use of expected revision can be a bit tricky especially when discussing guaranties given by EventStoreDB server. The EventStoreDB server will assure idempotency for all requests using any value in ExpectedRevision except Any. When using Any, the EventStoreDB server will do its best to assure idempotency but will not guarantee it.
type FilterType ¶
type FilterType int
FilterType represents the type filters supported by EventStoreDB.
type GetPersistentSubscriptionOptions ¶
type GetPersistentSubscriptionOptions struct { // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
GetPersistentSubscriptionOptions options of the get persistent subscription info request.
type ListPersistentSubscriptionsOptions ¶
type ListPersistentSubscriptionsOptions struct { // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
ListPersistentSubscriptionsOptions options of the list persistent subscription request.
type LoggingFunc ¶
LoggingFunc main logging abstraction.
func ConsoleLogging ¶
func ConsoleLogging() LoggingFunc
ConsoleLogging will print out log statements in stdout.
type NackAction ¶
type NackAction int32
NackAction persistent subscription acknowledgement error type.
const ( // NackActionUnknown client does not know what action to take, let the server decide. NackActionUnknown NackAction = 0 // NackActionPark park message, do not resend. NackActionPark NackAction = 1 // NackActionRetry explicitly retry the message. NackActionRetry NackAction = 2 // NackActionSkip skip this message, do not resend, do not park the message. NackActionSkip NackAction = 3 // NackActionStop stop the subscription. NackActionStop NackAction = 4 )
type NoStream ¶
type NoStream struct{}
NoStream means the stream being written to should not yet exist.
type NodePreference ¶
type NodePreference string
NodePreference indicates which order of preferred nodes for connecting to.
const ( // NodePreferenceLeader When attempting connection, prefers leader nodes. NodePreferenceLeader NodePreference = "Leader" // NodePreferenceFollower When attempting connection, prefers follower nodes. NodePreferenceFollower NodePreference = "Follower" // NodePreferenceReadOnlyReplica When attempting connection, prefers read only replica nodes. NodePreferenceReadOnlyReplica NodePreference = "ReadOnlyReplica" // NodePreferenceRandom When attempting connection, has no node preference. NodePreferenceRandom NodePreference = "Random" )
func (NodePreference) String ¶
func (nodePreference NodePreference) String() string
type PersistentAllSubscriptionOptions ¶
type PersistentAllSubscriptionOptions struct { // Persistent subscription's request. Settings *PersistentSubscriptionSettings // Starting position of the subscription. StartFrom AllPosition // Max search window. MaxSearchWindow int // Applies a server-side filter to determine if an event of the subscription should be yielded. Filter *SubscriptionFilter // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
PersistentAllSubscriptionOptions options for most of the persistent subscription requests.
type PersistentStreamSubscriptionOptions ¶
type PersistentStreamSubscriptionOptions struct { // Persistent subscription's request. Settings *PersistentSubscriptionSettings // Starting position of the subscription. StartFrom StreamPosition // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
PersistentStreamSubscriptionOptions options for most of the persistent subscription requests.
type PersistentSubscription ¶
type PersistentSubscription struct {
// contains filtered or unexported fields
}
PersistentSubscription persistent subscription handle.
func (*PersistentSubscription) Ack ¶
func (connection *PersistentSubscription) Ack(messages ...*ResolvedEvent) error
Ack acknowledges events have been successfully processed.
func (*PersistentSubscription) Close ¶
func (connection *PersistentSubscription) Close() error
Close drops the persistent subscription and free allocated resources.
func (*PersistentSubscription) Nack ¶
func (connection *PersistentSubscription) Nack(reason string, action NackAction, messages ...*ResolvedEvent) error
Nack acknowledges events failed processing.
func (*PersistentSubscription) Recv ¶
func (connection *PersistentSubscription) Recv() *PersistentSubscriptionEvent
Recv awaits for the next incoming persistent subscription event.
type PersistentSubscriptionConnectionInfo ¶
type PersistentSubscriptionConnectionInfo struct { // Origin of this connection. From string `json:"from"` // Connection's username. Username string `json:"username"` // Average events per second on this connection. AverageItemsPerSecond float64 `json:"averageItemsPerSecond"` // Total items on this connection. TotalItemsProcessed int64 `json:"totalItemsProcessed"` // Number of items seen since last measurement on this connection. CountSinceLastMeasurement int64 `json:"countSinceLastMeasurement"` // Number of available slots. AvailableSlots int64 `json:"availableSlots"` // Number of in flight messages on this connection. InFlightMessages int64 `json:"inFlightMessages"` // Connection's name. ConnectionName string `json:"connectionName"` // Timing measurements for the connection. ExtraStatistics []PersistentSubscriptionMeasurement `json:"extraStatistics"` }
PersistentSubscriptionConnectionInfo holds an active persistent subscription connection info.
type PersistentSubscriptionEvent ¶
type PersistentSubscriptionEvent struct { // When EventStoreDB sends an event to the subscription. EventAppeared *EventAppeared // When the subscription is dropped. SubscriptionDropped *SubscriptionDropped // When a checkpoint was created. CheckPointReached *Position }
PersistentSubscriptionEvent used to handle persistent subscription notifications raised throughout its lifecycle.
type PersistentSubscriptionInfo ¶
type PersistentSubscriptionInfo struct { // The source of events for the subscription. EventSource string // The group name given on creation. GroupName string // The current status of the subscription. Status string // Active connections to the subscription. Connections []PersistentSubscriptionConnectionInfo // Persistent subscription's settings. Settings *PersistentSubscriptionSettings // Persistent subscription's stats. Stats *PersistentSubscriptionStats }
PersistentSubscriptionInfo represents persistent subscription info.
type PersistentSubscriptionMeasurement ¶
type PersistentSubscriptionMeasurement struct { // Metric name Key string `json:"key"` // Metric value. Value int64 `json:"value"` }
PersistentSubscriptionMeasurement key-value pair of a metric and its value.
type PersistentSubscriptionSettings ¶
type PersistentSubscriptionSettings struct { // Where to start the subscription from. StartFrom interface{} // Resolve linkTo event to their linked events. ResolveLinkTos bool // Enable tracking of latency statistics on this subscription. ExtraStatistics bool // The maximum number of retries (due to timeout) before a message is considered to be parked. Default: 10. MaxRetryCount int32 // The minimum number of messages to process before a checkpoint may be written. Default: 10. CheckpointLowerBound int32 // The maximum number of messages not checkpointed before forcing a checkpoint. Default: 1000. CheckpointUpperBound int32 // The maximum number of subscribers allowed. Default: 0 (Unbounded). MaxSubscriberCount int32 // The size of the buffer (in-memory) listening to live messages as they happen before paging occurs. Default: 500. LiveBufferSize int32 // The number of events read at a time when catching up. Default: 20. ReadBatchSize int32 // The number of events to cache when catching up. Default: 500. HistoryBufferSize int32 // The strategy to use for distributing events to client consumers. ConsumerStrategyName ConsumerStrategy // The amount of time after which to consider a message as timed out and retried. Default 30_000ms. MessageTimeout int32 // The amount of time to try to checkpoint after. Default: 2_000ms. CheckpointAfter int32 }
PersistentSubscriptionSettings persistent subscription settings.
func SubscriptionSettingsDefault ¶
func SubscriptionSettingsDefault() PersistentSubscriptionSettings
SubscriptionSettingsDefault returns a persistent subscription settings with default values.
type PersistentSubscriptionStats ¶
type PersistentSubscriptionStats struct { // Average number of events per seconds. AveragePerSecond int64 // Total number of events processed by this subscription. TotalItems int64 // Number of events seen since last measurement on this subscription. CountSinceLastMeasurement int64 // The revision number of the last checkpoint. LastCheckpointedEventRevision *uint64 // The revision number of the last known. LastKnownEventRevision *uint64 // The transaction log position of the last checkpoint. LastCheckpointedPosition *Position // The transaction log position of the last known event. LastKnownPosition *Position // Number of events in the read buffer. ReadBufferCount int64 // Number of events in the live buffer. LiveBufferCount int64 // Number of events in the retry buffer. RetryBufferCount int64 // Current in flight messages across the persistent subscription group. TotalInFlightMessages int64 // Current number of outstanding messages. OutstandingMessagesCount int64 // The current number of parked messages. ParkedMessagesCount int64 }
PersistentSubscriptionStats represents processing-related persistent subscription statistics.
type ReadAllOptions ¶
type ReadAllOptions struct { // Direction of the read request. Direction Direction // Starting position of the read request. From AllPosition // Whether the read request should resolve linkTo events to their linked events. ResolveLinkTos bool // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
ReadAllOptions options of the read $all request.
type ReadStream ¶
type ReadStream struct {
// contains filtered or unexported fields
}
ReadStream read stream iterator.
func (*ReadStream) Close ¶
func (stream *ReadStream) Close()
Close closes the iterator and release allocated resources.
func (*ReadStream) Recv ¶
func (stream *ReadStream) Recv() (*ResolvedEvent, error)
Recv awaits for the next incoming event.
type ReadStreamOptions ¶
type ReadStreamOptions struct { // Direction of the read request. Direction Direction // Starting position of the read request. From StreamPosition // Whether the read request should resolve linkTo events to their linked events. ResolveLinkTos bool // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
ReadStreamOptions options of the read stream request.
type RecordedEvent ¶
type RecordedEvent struct { // Event's id. EventID uuid.UUID // Event's type. EventType string // Event's content type. ContentType string // The stream that event belongs to. StreamID string // The event's revision number. EventNumber uint64 // The event's transaction log position. Position Position // When the event was created. CreatedDate time.Time // The event's payload data. Data []byte // The event's system metadata. SystemMetadata map[string]string // The event user-defined metadata. UserMetadata []byte }
RecordedEvent represents a previously written event.
type ReplayParkedMessagesOptions ¶
type ReplayParkedMessagesOptions struct { // Asks for authenticated request. Authenticated *Credentials // Replays the parked messages until the event revision within the parked messages stream is reached. StopAt int // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
ReplayParkedMessagesOptions options of the replay parked messages request.
type ResolvedEvent ¶
type ResolvedEvent struct { // The link event if the original event is a link. Link *RecordedEvent // The event, or the resolved linked event if the original event is a link. Event *RecordedEvent // The optional commit position of the event. Commit *uint64 }
ResolvedEvent represents an event with a potential link.
func (ResolvedEvent) OriginalEvent ¶
func (resolved ResolvedEvent) OriginalEvent() *RecordedEvent
OriginalEvent returns the event that was read or which triggered the subscription.
func (ResolvedEvent) OriginalStreamRevision ¶ added in v3.3.0
func (resolved ResolvedEvent) OriginalStreamRevision() StreamRevision
OriginalStreamRevision returns the stream revision of the original event.
type RestartPersistentSubscriptionSubsystemOptions ¶
type RestartPersistentSubscriptionSubsystemOptions struct { // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
RestartPersistentSubscriptionSubsystemOptions options of the restart persistent subscription subsystem request.
type ServerVersion ¶
ServerVersion Represents the version of an EventStoreDB node.
type StreamMetadata ¶
type StreamMetadata struct {
// contains filtered or unexported fields
}
StreamMetadata Represents stream metadata with strongly typed properties for system values and a dictionary-like interface for custom values.
func StreamMetadataFromJson ¶
func StreamMetadataFromJson(bytes []byte) (*StreamMetadata, error)
StreamMetadataFromJson deserializes a JSON byte array into a StreamMetadata object.
func (*StreamMetadata) Acl ¶
func (m *StreamMetadata) Acl() interface{}
Acl The Access Control List of the stream (ACL).
func (*StreamMetadata) AddCustomProperty ¶
func (m *StreamMetadata) AddCustomProperty(name string, value interface{})
AddCustomProperty Key-value pair of a key to JSON for user-provider metadata.
func (*StreamMetadata) CacheControl ¶
func (m *StreamMetadata) CacheControl() *time.Duration
CacheControl The amount of time for which the stream head is cacheable (in seconds).
func (*StreamMetadata) IsSystemStreamAcl ¶
func (m *StreamMetadata) IsSystemStreamAcl() bool
IsSystemStreamAcl Checks if the ACL is set to system default.
func (*StreamMetadata) IsUserStreamAcl ¶
func (m *StreamMetadata) IsUserStreamAcl() bool
IsUserStreamAcl Checks if the ACL is set to users default.
func (*StreamMetadata) MaxAge ¶
func (m *StreamMetadata) MaxAge() *time.Duration
MaxAge The maximum age of events allowed in the stream.
func (*StreamMetadata) MaxCount ¶
func (m *StreamMetadata) MaxCount() *uint64
MaxCount The maximum number of events allowed in the stream.
func (*StreamMetadata) SetAcl ¶
func (m *StreamMetadata) SetAcl(value interface{})
SetAcl The Access Control List of the stream (ACL).
func (*StreamMetadata) SetCacheControl ¶
func (m *StreamMetadata) SetCacheControl(value time.Duration)
SetCacheControl The amount of time for which the stream head is cacheable (in seconds).
func (*StreamMetadata) SetMaxAge ¶
func (m *StreamMetadata) SetMaxAge(value time.Duration)
SetMaxAge The maximum age of events allowed in the stream.
func (*StreamMetadata) SetMaxCount ¶
func (m *StreamMetadata) SetMaxCount(value uint64)
SetMaxCount The maximum number of events allowed in the stream.
func (*StreamMetadata) SetTruncateBefore ¶
func (m *StreamMetadata) SetTruncateBefore(value uint64)
SetTruncateBefore The event number from which previous events can be scavenged. This is used to implement deletions of streams.
func (*StreamMetadata) StreamAcl ¶
func (m *StreamMetadata) StreamAcl() *Acl
StreamAcl The Access Control List of the stream (ACL).
func (*StreamMetadata) ToJson ¶
func (m *StreamMetadata) ToJson() ([]byte, error)
ToJson serializes StreamMetadata to JSON.
func (*StreamMetadata) TruncateBefore ¶
func (m *StreamMetadata) TruncateBefore() *uint64
TruncateBefore The event number from which previous events can be scavenged. This is used to implement deletions of streams.
type StreamPosition ¶
type StreamPosition interface {
// contains filtered or unexported methods
}
StreamPosition represents a logical position in a stream.
type StreamRevision ¶
type StreamRevision struct {
Value uint64
}
StreamRevision returns a stream position at a specific event revision.
func Revision ¶
func Revision(value uint64) StreamRevision
Revision returns a stream position at a specific event revision.
type SubscribeToAllOptions ¶
type SubscribeToAllOptions struct { // Starting position of the subscribe request. From AllPosition // Whether the read request should resolve linkTo events to their linked events. ResolveLinkTos bool // Max search window. MaxSearchWindow int // Checkpoint interval. CheckpointInterval int // Applies a server-side filter to determine if an event of the subscription should be yielded. Filter *SubscriptionFilter // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
SubscribeToAllOptions options of the subscribe to $all request.
type SubscribeToPersistentSubscriptionOptions ¶
type SubscribeToPersistentSubscriptionOptions struct { // Buffer size. BufferSize uint32 // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
SubscribeToPersistentSubscriptionOptions options of the subscribe to persistent subscription request.
type SubscribeToStreamOptions ¶
type SubscribeToStreamOptions struct { // Starting position of the subscribe request. From StreamPosition // Whether the read request should resolve linkTo events to their linked events. ResolveLinkTos bool // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
SubscribeToStreamOptions options of the subscribe to stream request.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is a subscription's handle.
func (*Subscription) Close ¶
func (sub *Subscription) Close() error
Close drops the subscription and cleans up allocated resources.
func (*Subscription) Recv ¶
func (sub *Subscription) Recv() *SubscriptionEvent
Recv awaits for the next incoming subscription's event.
type SubscriptionDropped ¶
type SubscriptionDropped struct { // Error that caused the drop. Error error }
SubscriptionDropped when a subscription is dropped.
type SubscriptionEvent ¶
type SubscriptionEvent struct { // When EventStoreDB sends an event to the subscription. EventAppeared *ResolvedEvent // When the subscription is dropped. SubscriptionDropped *SubscriptionDropped // When a checkpoint was created. CheckPointReached *Position // When an event is caught up CaughtUp *Subscription // When an event has fallen behind FellBehind *Subscription }
SubscriptionEvent used to handle catch-up subscription notifications raised throughout its lifecycle.
type SubscriptionFilter ¶
type SubscriptionFilter struct { // Type of filter. Type FilterType // String prefixes. Prefixes []string // Regex expression. Regex string }
SubscriptionFilter is a filter that targets $all stream.
func ExcludeSystemEventsFilter ¶
func ExcludeSystemEventsFilter() *SubscriptionFilter
ExcludeSystemEventsFilter filters out event's with system's event types, i.e. event types starting with `$`.
type SubscriptionFilterOptions ¶
type SubscriptionFilterOptions struct { MaxSearchWindow int CheckpointInterval int SubscriptionFilter *SubscriptionFilter }
type TombstoneStreamOptions ¶
type TombstoneStreamOptions struct { // Asks the server to check that the stream receiving the event is at the given expected version. ExpectedRevision ExpectedRevision // Asks for authenticated request. Authenticated *Credentials // A length of time to use for gRPC deadlines. Deadline *time.Duration // Requires the request to be performed by the leader of the cluster. RequiresLeader bool }
TombstoneStreamOptions options of the tombstone stream request.
type WriteResult ¶
WriteResult ...
Source Files ¶
- append_options.go
- client.go
- configuration.go
- credentials.go
- delete_options.go
- delete_result.go
- endpoint.go
- errors.go
- event_data.go
- impl.go
- logging.go
- operations.go
- options.go
- persistent_options.go
- persistent_subscription.go
- persistent_subscription_client.go
- persistent_subscription_http_client.go
- position.go
- protobuf_utils.go
- read_options.go
- reads.go
- recorded_event.go
- resolved_event.go
- revision.go
- subscribe_options.go
- subscription_event.go
- subscriptions.go
- tombstone_options.go
- types.go
- write_result.go