esdb

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 29, 2021 License: Apache-2.0 Imports: 29 Imported by: 73

Documentation

Index

Constants

View Source
const (
	SchemeDefaultPort       = 2113
	SchemaHostsSeparator    = ","
	SchemeName              = "esdb"
	SchemeNameWithDiscover  = "esdb+discover"
	SchemePathSeparator     = "/"
	SchemePortSeparator     = ":"
	SchemeQuerySeparator    = "?"
	SchemeSeparator         = "://"
	SchemeSettingSeparator  = "&"
	SchemeUserInfoSeparator = "@"
)
View Source
const (
	UserStreamAcl   = "$userStreamAcl"
	SystemStreamAcl = "$systemStreamAcl"
)
View Source
const (
	EventFilterType   FilterType = 0
	StreamFilterType  FilterType = 1
	NoMaxSearchWindow int        = -1
)
View Source
const MAX_ACK_COUNT = 2000
View Source
const SUBSCRIBER_COUNT_UNLIMITED = 0

Variables

View Source
var ErrPermissionDenied = errors.New("PermissionDenied")

ErrPermissionDenied ...

View Source
var ErrStreamNotFound = errors.New("Failed to perform read because the stream was not found")

ErrStreamNotFound is returned when a read requests gets a stream not found response from the EventStore. Example usage: ```go events, err := esdb.ReadStream(...)

if err == errors.ErrStreamNotFound {
  // handle the stream not being found
}

```

View Source
var ErrWrongExpectedStreamRevision = errors.New("WrongExpectedStreamRevision")

ErrWrongExpectedStreamRevision ...

View Source
var PersistentSubscriptionExceedsMaxMessageCountError = PersistentSubscriptionError{
	Code: 9,
}
View Source
var PersistentSubscriptionToAllCanSetOnlyRegexOrPrefixError = PersistentSubscriptionError{
	Code: 1,
}
View Source
var PersistentSubscriptionToAllMustProvideRegexOrPrefixError = PersistentSubscriptionError{
	Code: 0,
}

Functions

func EventIDFromProto

func EventIDFromProto(recordedEvent *api.ReadResp_ReadEvent_RecordedEvent) uuid.UUID

EventIDFromProto ...

func NewGrpcClient

func NewGrpcClient(config Configuration) *grpcClient

func PersistentSubscriptionDeletionFailedError

func PersistentSubscriptionDeletionFailedError(err error) error

func PersistentSubscriptionFailedCreationError

func PersistentSubscriptionFailedCreationError(err error) error

func PersistentSubscriptionFailedReceiveStreamInitError

func PersistentSubscriptionFailedReceiveStreamInitError(err error) error

func PersistentSubscriptionFailedSendStreamInitError

func PersistentSubscriptionFailedSendStreamInitError(err error) error

func PersistentSubscriptionFailedToInitClientError

func PersistentSubscriptionFailedToInitClientError(err error) error

func PersistentSubscriptionNoConfirmationError

func PersistentSubscriptionNoConfirmationError(err error) error

func PersistentSubscriptionUpdateFailedError

func PersistentSubscriptionUpdateFailedError(err error) error

Types

type Acl

type Acl struct {
	// contains filtered or unexported fields
}

func AclFromMap

func AclFromMap(props map[string]interface{}) (Acl, error)

func (*Acl) AddDeleteRoles

func (a *Acl) AddDeleteRoles(roles ...string)

func (*Acl) AddMetaReadRoles

func (a *Acl) AddMetaReadRoles(roles ...string)

func (*Acl) AddMetaWriteRoles

func (a *Acl) AddMetaWriteRoles(roles ...string)

func (*Acl) AddReadRoles

func (a *Acl) AddReadRoles(roles ...string)

func (*Acl) AddWriteRoles

func (a *Acl) AddWriteRoles(roles ...string)

func (*Acl) DeleteRoles

func (a *Acl) DeleteRoles() []string

func (*Acl) MetaReadRoles

func (a *Acl) MetaReadRoles() []string

func (*Acl) MetaWriteRoles

func (a *Acl) MetaWriteRoles() []string

func (*Acl) ReadRoles

func (a *Acl) ReadRoles() []string

func (Acl) ToMap

func (a Acl) ToMap() map[string]interface{}

func (*Acl) WriteRoles

func (a *Acl) WriteRoles() []string

type AllPosition

type AllPosition interface {
	// contains filtered or unexported methods
}

type Any

type Any struct{}

type AppendToStreamOptions

type AppendToStreamOptions struct {
	ExpectedRevision ExpectedRevision
	Authenticated    *Credentials
}

type Client

type Client struct {
	Config *Configuration
	// contains filtered or unexported fields
}

Client ...

func NewClient

func NewClient(configuration *Configuration) (*Client, error)

NewClient ...

func (*Client) AppendToStream

func (client *Client) AppendToStream(
	context context.Context,
	streamID string,
	opts AppendToStreamOptions,
	events ...EventData,
) (*WriteResult, error)

AppendToStream ...

func (*Client) Close

func (client *Client) Close() error

Close ...

func (*Client) ConnectToPersistentSubscription

func (client *Client) ConnectToPersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options ConnectToPersistentSubscriptionOptions,
) (*PersistentSubscription, error)

ConnectToPersistentSubscription ...

func (*Client) ConnectToPersistentSubscriptionToAll added in v1.0.2

func (client *Client) ConnectToPersistentSubscriptionToAll(
	ctx context.Context,
	groupName string,
	options ConnectToPersistentSubscriptionOptions,
) (*PersistentSubscription, error)

func (*Client) CreatePersistentSubscription

func (client *Client) CreatePersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options PersistentStreamSubscriptionOptions,
) error

func (*Client) CreatePersistentSubscriptionAll

func (client *Client) CreatePersistentSubscriptionAll(
	ctx context.Context,
	groupName string,
	options PersistentAllSubscriptionOptions,
) error

func (*Client) DeletePersistentSubscription

func (client *Client) DeletePersistentSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options DeletePersistentSubscriptionOptions,
) error

func (*Client) DeletePersistentSubscriptionAll

func (client *Client) DeletePersistentSubscriptionAll(
	ctx context.Context,
	groupName string,
	options DeletePersistentSubscriptionOptions,
) error

func (*Client) DeleteStream

func (client *Client) DeleteStream(
	context context.Context,
	streamID string,
	opts DeleteStreamOptions,
) (*DeleteResult, error)

DeleteStream ...

func (*Client) GetStreamMetadata

func (client *Client) GetStreamMetadata(
	context context.Context,
	streamID string,
	opts ReadStreamOptions,
) (*StreamMetadata, error)

func (*Client) ReadAll

func (client *Client) ReadAll(
	context context.Context,
	opts ReadAllOptions,
	count uint64,
) (*ReadStream, error)

ReadAll ...

func (*Client) ReadStream

func (client *Client) ReadStream(
	context context.Context,
	streamID string,
	opts ReadStreamOptions,
	count uint64,
) (*ReadStream, error)

ReadStream ...

func (*Client) SetStreamMetadata

func (client *Client) SetStreamMetadata(
	context context.Context,
	streamID string,
	opts AppendToStreamOptions,
	metadata StreamMetadata,
) (*WriteResult, error)

func (*Client) SubscribeToAll

func (client *Client) SubscribeToAll(
	ctx context.Context,
	opts SubscribeToAllOptions,
) (*Subscription, error)

SubscribeToAll ...

func (*Client) SubscribeToStream

func (client *Client) SubscribeToStream(
	ctx context.Context,
	streamID string,
	opts SubscribeToStreamOptions,
) (*Subscription, error)

SubscribeToStream ...

func (*Client) TombstoneStream

func (client *Client) TombstoneStream(
	context context.Context,
	streamID string,
	opts TombstoneStreamOptions,
) (*DeleteResult, error)

Tombstone ...

func (*Client) UpdatePersistentStreamSubscription

func (client *Client) UpdatePersistentStreamSubscription(
	ctx context.Context,
	streamName string,
	groupName string,
	options PersistentStreamSubscriptionOptions,
) error

func (*Client) UpdatePersistentSubscriptionAll

func (client *Client) UpdatePersistentSubscriptionAll(
	ctx context.Context,
	groupName string,
	options PersistentAllSubscriptionOptions,
) error

type Configuration

type Configuration struct {
	// The URI of the EventStoreDB. Use this when connecting to a single node.
	// Example: localhost:2113
	Address string

	// An array of end points used to seed gossip.
	GossipSeeds []*EndPoint

	// Disable communicating over a secure channel.
	DisableTLS bool // Defaults to false.

	// The NodePreference to use when connecting.
	NodePreference NodePreference

	// The username to use for authenticating against the EventStoreDB instance.
	Username string

	// The password to use for authenticating against the EventStoreDB instance.
	Password string

	// RootCAs defines the set of root certificate authorities
	// that clients use when verifying server certificates.
	// If RootCAs is nil, TLS uses the host's root CA set.
	RootCAs *x509.CertPool // Defaults to nil.

	// Allows to skip certificate validation.
	SkipCertificateVerification bool // Defaults to false.

	// The maximum number of times to attempt end point discovery.
	MaxDiscoverAttempts int // Defaults to 10.

	// The polling interval (in milliseconds) used to discover the end point.
	DiscoveryInterval int // Defaults to 100 milliseconds.

	// The amount of time (in seconds) after which an attempt to discover gossip will fail.
	GossipTimeout int // Defaults to 5 seconds.

	// Specifies if DNS discovery should be used.
	DnsDiscover bool // Defaults to false.

	// The amount of time (in milliseconds) to wait after which a keepalive ping is sent on the transport.
	// If set below 10s, a minimum value of 10s will be used instead. Use -1 to disable. Use -1 to disable.
	KeepAliveInterval time.Duration // Defaults to 10 seconds.

	// The amount of time (in milliseconds) the sender of the keep alive ping waits for an acknowledgement.
	KeepAliveTimeout time.Duration // Defaults to 10 seconds.
}

Configuration describes how to connect to an instance of EventStoreDB.

func ParseConnectionString

func ParseConnectionString(connectionString string) (*Configuration, error)

ParseConnectionString creates a Configuration based on an EventStoreDb connection string.

type ConnectToPersistentSubscriptionOptions

type ConnectToPersistentSubscriptionOptions struct {
	BatchSize     uint32
	Authenticated *Credentials
}

type ConsumerStrategy

type ConsumerStrategy int32
const (
	ConsumerStrategy_RoundRobin          ConsumerStrategy = 0
	ConsumerStrategy_DispatchToSingle    ConsumerStrategy = 1
	ConsumerStrategy_Pinned              ConsumerStrategy = 2
	ConsumerStrategy_PinnedByCorrelation ConsumerStrategy = 3
)

type ContentType

type ContentType int
const (
	BinaryContentType ContentType = 0
	JsonContentType   ContentType = 1
)

type Credentials

type Credentials struct {
	Login    string
	Password string
}

type DeletePersistentSubscriptionOptions

type DeletePersistentSubscriptionOptions struct {
	Authenticated *Credentials
}

type DeleteResult

type DeleteResult struct {
	Position Position
}

type DeleteStreamOptions

type DeleteStreamOptions struct {
	ExpectedRevision ExpectedRevision
	Authenticated    *Credentials
}

type Direction

type Direction int

Direction ...

const (
	// Forwards ...
	Forwards Direction = iota
	// Backwards ...
	Backwards
)

type End

type End struct {
}

type EndPoint

type EndPoint struct {
	Host string
	Port uint16
}

func ParseEndPoint

func ParseEndPoint(s string) (*EndPoint, error)

func (*EndPoint) String

func (e *EndPoint) String() string

type EventData

type EventData struct {
	EventID     uuid.UUID
	EventType   string
	ContentType ContentType
	Data        []byte
	Metadata    []byte
}

EventData ...

type ExpectedRevision

type ExpectedRevision interface {
	// contains filtered or unexported methods
}

type FilterType

type FilterType int

type Nack_Action

type Nack_Action int32
const (
	Nack_Unknown Nack_Action = 0
	Nack_Park    Nack_Action = 1
	Nack_Retry   Nack_Action = 2
	Nack_Skip    Nack_Action = 3
	Nack_Stop    Nack_Action = 4
)

type NoStream

type NoStream struct{}

type NodePreference

type NodePreference string
const (
	NodePreference_Leader          NodePreference = "Leader"
	NodePreference_Follower        NodePreference = "Follower"
	NodePreference_ReadOnlyReplica NodePreference = "ReadOnlyReplica"
	NodePreference_Random          NodePreference = "Random"
)

func (NodePreference) String

func (nodePreference NodePreference) String() string

type PersistentAllSubscriptionOptions

type PersistentAllSubscriptionOptions struct {
	Settings           *SubscriptionSettings
	From               AllPosition
	MaxSearchWindow    int
	CheckpointInterval int
	Filter             *SubscriptionFilter
	Authenticated      *Credentials
}

type PersistentStreamSubscriptionOptions

type PersistentStreamSubscriptionOptions struct {
	Settings      *SubscriptionSettings
	From          StreamPosition
	Authenticated *Credentials
}

type PersistentSubscription

type PersistentSubscription struct {
	// contains filtered or unexported fields
}

func NewPersistentSubscription

func NewPersistentSubscription(
	client persistent.PersistentSubscriptions_ReadClient,
	subscriptionId string,
	cancel context.CancelFunc,
) *PersistentSubscription

func (*PersistentSubscription) Ack

func (connection *PersistentSubscription) Ack(messages ...*ResolvedEvent) error

func (*PersistentSubscription) Close

func (connection *PersistentSubscription) Close() error

func (*PersistentSubscription) Nack

func (connection *PersistentSubscription) Nack(reason string, action Nack_Action, messages ...*ResolvedEvent) error

func (*PersistentSubscription) Recv

func (connection *PersistentSubscription) Recv() *SubscriptionEvent

type PersistentSubscriptionError

type PersistentSubscriptionError struct {
	Code int
	Err  error
}

func (*PersistentSubscriptionError) Error

func (*PersistentSubscriptionError) Is

func (e *PersistentSubscriptionError) Is(target error) bool

func (*PersistentSubscriptionError) Unwrap

func (e *PersistentSubscriptionError) Unwrap() error

type Position

type Position struct {
	Commit  uint64
	Prepare uint64
}

Position ...

var EmptyPosition Position = Position{Commit: ^uint64(0), Prepare: ^uint64(0)}

EmptyPosition ...

var EndPosition Position = Position{Commit: ^uint64(0), Prepare: ^uint64(0)}

EndPosition ...

var StartPosition Position = Position{Commit: 0, Prepare: 0}

StartPosition ...

type ReadAllOptions

type ReadAllOptions struct {
	Direction      Direction
	From           AllPosition
	ResolveLinkTos bool
	Authenticated  *Credentials
}

type ReadStream

type ReadStream struct {
	// contains filtered or unexported fields
}

func (*ReadStream) Close

func (stream *ReadStream) Close()

func (*ReadStream) Recv

func (stream *ReadStream) Recv() (*ResolvedEvent, error)

type ReadStreamOptions

type ReadStreamOptions struct {
	Direction      Direction
	From           StreamPosition
	ResolveLinkTos bool
	Authenticated  *Credentials
}

type RecordedEvent

type RecordedEvent struct {
	EventID        uuid.UUID
	EventType      string
	ContentType    string
	StreamID       string
	EventNumber    uint64
	Position       Position
	CreatedDate    time.Time
	Data           []byte
	SystemMetadata map[string]string
	UserMetadata   []byte
}

RecordedEvent ...

type ResolvedEvent

type ResolvedEvent struct {
	Link   *RecordedEvent
	Event  *RecordedEvent
	Commit *uint64
}

func (ResolvedEvent) OriginalEvent

func (resolved ResolvedEvent) OriginalEvent() *RecordedEvent

type Start

type Start struct {
}

type StreamDeletedError

type StreamDeletedError struct {
	StreamName string
}

func (*StreamDeletedError) Error

func (e *StreamDeletedError) Error() string

type StreamExists

type StreamExists struct{}

type StreamMetadata

type StreamMetadata struct {
	// contains filtered or unexported fields
}

func StreamMetadataFromMap

func StreamMetadataFromMap(props map[string]interface{}) (StreamMetadata, error)

func (*StreamMetadata) Acl

func (m *StreamMetadata) Acl() interface{}

func (*StreamMetadata) AddCustomProperty

func (m *StreamMetadata) AddCustomProperty(name string, value interface{})

func (*StreamMetadata) CacheControl

func (m *StreamMetadata) CacheControl() *time.Duration

func (*StreamMetadata) IsSystemStreamAcl

func (m *StreamMetadata) IsSystemStreamAcl() bool

func (*StreamMetadata) IsUserStreamAcl

func (m *StreamMetadata) IsUserStreamAcl() bool

func (*StreamMetadata) MaxAge

func (m *StreamMetadata) MaxAge() *time.Duration

func (*StreamMetadata) MaxCount

func (m *StreamMetadata) MaxCount() *uint64

func (*StreamMetadata) SetAcl

func (m *StreamMetadata) SetAcl(value interface{})

func (*StreamMetadata) SetCacheControl

func (m *StreamMetadata) SetCacheControl(value time.Duration)

func (*StreamMetadata) SetMaxAge

func (m *StreamMetadata) SetMaxAge(value time.Duration)

func (*StreamMetadata) SetMaxCount

func (m *StreamMetadata) SetMaxCount(value uint64)

func (*StreamMetadata) SetTruncateBefore

func (m *StreamMetadata) SetTruncateBefore(value uint64)

func (*StreamMetadata) StreamAcl

func (m *StreamMetadata) StreamAcl() *Acl

func (StreamMetadata) ToMap

func (m StreamMetadata) ToMap() (map[string]interface{}, error)

func (*StreamMetadata) TruncateBefore

func (m *StreamMetadata) TruncateBefore() *uint64

type StreamPosition

type StreamPosition interface {
	// contains filtered or unexported methods
}

type StreamRevision

type StreamRevision struct {
	Value uint64
}

func Revision

func Revision(value uint64) StreamRevision

type SubscribeToAllOptions

type SubscribeToAllOptions struct {
	From               AllPosition
	ResolveLinkTos     bool
	MaxSearchWindow    int
	CheckpointInterval int
	Filter             *SubscriptionFilter
	Authenticated      *Credentials
}

type SubscribeToStreamOptions

type SubscribeToStreamOptions struct {
	From           StreamPosition
	ResolveLinkTos bool
	Authenticated  *Credentials
}

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

func NewSubscription

func NewSubscription(client *Client, cancel context.CancelFunc, inner api.Streams_ReadClient, id string) *Subscription

func (*Subscription) Close

func (sub *Subscription) Close() error

func (*Subscription) Id

func (sub *Subscription) Id() string

func (*Subscription) Recv

func (sub *Subscription) Recv() *SubscriptionEvent

type SubscriptionDropped

type SubscriptionDropped struct {
	Error error
}

type SubscriptionEvent

type SubscriptionEvent struct {
	EventAppeared       *ResolvedEvent
	SubscriptionDropped *SubscriptionDropped
	CheckPointReached   *Position
}

type SubscriptionFilter

type SubscriptionFilter struct {
	Type     FilterType
	Prefixes []string
	Regex    string
}

func ExcludeSystemEventsFilter

func ExcludeSystemEventsFilter() *SubscriptionFilter

type SubscriptionFilterOptions

type SubscriptionFilterOptions struct {
	MaxSearchWindow    int
	CheckpointInterval int
	SubscriptionFilter *SubscriptionFilter
}

type SubscriptionSettings

type SubscriptionSettings struct {
	ResolveLinkTos        bool
	ExtraStatistics       bool
	MaxRetryCount         int32
	MinCheckpointCount    int32
	MaxCheckpointCount    int32
	MaxSubscriberCount    int32
	LiveBufferSize        int32
	ReadBatchSize         int32
	HistoryBufferSize     int32
	NamedConsumerStrategy ConsumerStrategy
	MessageTimeoutInMs    int32
	CheckpointAfterInMs   int32
}

func SubscriptionSettingsDefault

func SubscriptionSettingsDefault() SubscriptionSettings

type TombstoneStreamOptions

type TombstoneStreamOptions struct {
	ExpectedRevision ExpectedRevision
	Authenticated    *Credentials
}

type WriteResult

type WriteResult struct {
	CommitPosition      uint64
	PreparePosition     uint64
	NextExpectedVersion uint64
}

WriteResult ...

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL