Documentation
¶
Overview ¶
Package event_streams provides interaction with EventStoreDb event streams. Before accessing streams a grpc connection needs to be established with EventStore through github.com/pivonroll/EventStore-Client-Go/core/connection package.
Index ¶
- Constants
- type AppendResponse
- type BatchAppendResponse
- func (response BatchAppendResponse) GetCorrelationId() uuid.UUID
- func (response BatchAppendResponse) GetCurrentRevision() uint64
- func (response BatchAppendResponse) GetExpectedStreamPosition() (uint64, bool)
- func (response BatchAppendResponse) GetPosition() (Position, bool)
- func (response BatchAppendResponse) HasExpectedStreamPosition() bool
- func (response BatchAppendResponse) IsCurrentRevisionNoStream() bool
- func (response BatchAppendResponse) IsExpectedStreamPositionAny() bool
- func (response BatchAppendResponse) IsExpectedStreamPositionNoStream() bool
- func (response BatchAppendResponse) IsExpectedStreamPositionStreamExists() bool
- type BatchError
- type Client
- func (client *Client) AppendToStream(ctx context.Context, streamID string, ...) (AppendResponse, errors.Error)
- func (client *Client) BatchAppendToStream(ctx context.Context, streamId string, ...) (BatchAppendResponse, errors.Error)
- func (client *Client) BatchAppendToStreamWithCorrelationId(ctx context.Context, streamId string, ...) (BatchAppendResponse, errors.Error)
- func (client *Client) DeleteStream(ctx context.Context, streamID string, ...) (DeleteResponse, errors.Error)
- func (client *Client) GetStreamMetadata(ctx context.Context, streamId string) (StreamMetadataResult, errors.Error)
- func (client *Client) GetStreamReader(ctx context.Context, streamID string, direction ReadDirection, ...) (StreamReader, errors.Error)
- func (client *Client) GetStreamReaderForStreamAll(ctx context.Context, direction ReadDirection, ...) (StreamReader, errors.Error)
- func (client *Client) ReadEventsFromStreamAll(ctx context.Context, direction ReadDirection, ...) (ResolvedEventList, errors.Error)
- func (client *Client) ReadStreamEvents(ctx context.Context, streamID string, direction ReadDirection, ...) (ResolvedEventList, errors.Error)
- func (client *Client) SetStreamMetadata(ctx context.Context, streamID string, ...) (AppendResponse, errors.Error)
- func (client *Client) SubscribeToFilteredStreamAll(ctx context.Context, position stream_revision.IsReadPositionAll, ...) (StreamReader, errors.Error)
- func (client *Client) SubscribeToStream(ctx context.Context, streamID string, ...) (StreamReader, errors.Error)
- func (client *Client) SubscribeToStreamAll(ctx context.Context, position stream_revision.IsReadPositionAll, ...) (StreamReader, errors.Error)
- func (client *Client) TombstoneStream(ctx context.Context, streamID string, ...) (TombstoneResponse, errors.Error)
- type ContentType
- type CustomMetadataType
- type DeleteResponse
- type ErrorDetails
- type Filter
- type FilterByEventType
- type FilterByStreamId
- type FilterNoWindow
- type FilterWindowMax
- type Position
- type PrefixFilterMatcher
- type ProposedEvent
- type ProposedEventList
- type ReadDirection
- type ReadResponse
- type ReadResponseCheckpoint
- type RecordedEvent
- type RegexFilterMatcher
- type ResolvedEvent
- type ResolvedEventList
- type StreamAcl
- type StreamMetadata
- type StreamMetadataResult
- type StreamNotFoundError
- type StreamReader
- type TombstoneResponse
- type WrongExpectedVersion
- func (exception WrongExpectedVersion) Code() errors.ErrorCode
- func (exception WrongExpectedVersion) Error() string
- func (exception WrongExpectedVersion) GetCurrentRevision() (uint64, bool)
- func (exception WrongExpectedVersion) GetExpectedRevision() uint64
- func (exception WrongExpectedVersion) IsCurrentRevisionNoStream() bool
- func (exception WrongExpectedVersion) IsExpectedRevisionAny() bool
- func (exception WrongExpectedVersion) IsExpectedRevisionFinite() bool
- func (exception WrongExpectedVersion) IsExpectedRevisionNoStream() bool
- func (exception WrongExpectedVersion) IsExpectedRevisionStreamExists() bool
Examples ¶
- Client.AppendToStream (WithAnyStreamRevisionWhenStreamExist)
- Client.AppendToStream (WithAnyWhenStreamDoesNotExist)
- Client.AppendToStream (WithExactStreamRevisionWhenStreamExist)
- Client.AppendToStream (WithNoStreamWhenStreamDoesNotExist)
- Client.BatchAppendToStream (StreamDoesNotExist)
- Client.BatchAppendToStream (StreamExists)
- Client.BatchAppendToStreamWithCorrelationId
- Client.DeleteStream (StreamDoesNotExist)
- Client.DeleteStream (StreamExists)
- Client.GetStreamMetadata (IsEmptyIfStreamHasNoMetadata)
- Client.GetStreamMetadata (StreamHasMetadata)
- Client.ReadEventsFromStreamAll (ReadEventsBackwardsFromEnd)
- Client.ReadEventsFromStreamAll (ReadEventsFromStart)
- Client.ReadStreamEvents (ReadEventsBackwardsFromEnd)
- Client.ReadStreamEvents (ReadEventsFromStart)
- Client.ReadStreamEvents (StreamDoesNotExist)
- Client.ReadStreamEvents (StreamIsSoftDeleted)
- Client.SetStreamMetadata (OnNonExistingStream)
- Client.SetStreamMetadata (WhenStreamExists)
- Client.SubscribeToFilteredStreamAll
- Client.SubscribeToStream (CatchesDeletion)
- Client.SubscribeToStream (ReadOldAndNewContentFromStream)
- Client.SubscribeToStream (StreamDoesNotExist)
- Client.SubscribeToStream (StreamExists)
- Client.SubscribeToStreamAll
- Client.TombstoneStream (StreamDoesNotExist)
- Client.TombstoneStream (StreamExists)
Constants ¶
const ( // AppendToStream_FailedToObtainWriterErr indicates that client failed to receive a protobuf append client. AppendToStream_FailedToObtainWriterErr errors.ErrorCode = "AppendToStream_FailedToObtainWriterErr" // AppendToStream_FailedSendHeaderErr indicates that client received an unknown error // when it tried to send a header to a protobuf stream. // Header is sent before client can append any events to a stream. AppendToStream_FailedSendHeaderErr errors.ErrorCode = "AppendToStream_FailedSendHeaderErr" // AppendToStream_FailedSendMessageErr indicates that there was an unknown error received when client // tried to append an event to a EventStoreDB stream. AppendToStream_FailedSendMessageErr errors.ErrorCode = "AppendToStream_FailedSendMessageErr" // AppendToStream_FailedToCloseStreamErr indicates that there was an unknown error when client // tried to close the protobuf stream after it has written all events to an EventStoreDB stream. AppendToStream_FailedToCloseStreamErr errors.ErrorCode = "AppendToStream_FailedToCloseStreamErr" )
const ( // BatchAppendToStream_FailedToObtainWriterErr indicates that client failed to receive a protobuf append client. BatchAppendToStream_FailedToObtainWriterErr errors.ErrorCode = "BatchAppendToStream_FailedToObtainWriterErr" // BatchAppendToStream_FailedSendMessageErr indicates that there was an unknown error received when client // tried to append a chunk of events to a EventStoreDB stream. BatchAppendToStream_FailedSendMessageErr errors.ErrorCode = "BatchAppendToStream_FailedSendMessageErr" // BatchAppendToStream_FailedToCloseStreamErr indicates that there was an unknown error when client // tried to close the protobuf stream after it has written all event chunks to an EventStoreDB stream. BatchAppendToStream_FailedToCloseStreamErr errors.ErrorCode = "BatchAppendToStream_FailedToCloseStreamErr" )
const ( FailedToCreateReaderErr errors.ErrorCode = "FailedToCreateReaderErr" FailedToReceiveSubscriptionResponseErr errors.ErrorCode = "FailedToReceiveSubscriptionResponseErr" )
const (
DefaultCheckpointIntervalMultiplier = uint32(1)
)
const FailedToDeleteStreamErr errors.ErrorCode = "FailedToDeleteStreamErr"
FailedToDeleteStreamErr indicates that client's Client.DeleteStream received an unknown error when it tried to soft-delete an EventStoreDB stream.
const (
FailedToObtainStreamReaderErr errors.ErrorCode = "FailedToObtainStreamReaderErr"
)
FailedToObtainStreamReaderErr indicates that client received an unknown error when it tried to construct a protobuf stream reader client.
const FailedToTombstoneStreamErr errors.ErrorCode = "FailedToTombstoneStreamErr"
FailedToTombstoneStreamErr indicates that client's Client.TombstoneStream received an unknown error when it tried to soft-delete an EventStoreDB stream.
const ReadCountMax = ^uint64(0)
const StreamMetadataType = "$metadata"
const WrongExpectedVersionErr errors.ErrorCode = "WrongExpectedVersionErr"
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AppendResponse ¶
type AppendResponse struct {
// contains filtered or unexported fields
}
AppendResponse is returned from Client.AppendToStream when events are written successfully.
func (AppendResponse) GetCurrentRevision ¶
func (this AppendResponse) GetCurrentRevision() uint64
GetCurrentRevision returns stream's current revision if current revision is not NoStream. If currentRevision is NoStream, it returns 0. Note that stream can have a valid revision 0 if it contains only one event. Use IsCurrentRevisionNoStream to check if current revision of a stream is NoStream.
func (AppendResponse) GetPosition ¶
func (this AppendResponse) GetPosition() (Position, bool)
GetPosition returns a position of last appended event in a stream and a boolean value which indicates if position for last written event was received. If no position was received a zero initialized Position and a false will be returned.
func (AppendResponse) IsCurrentRevisionNoStream ¶
func (this AppendResponse) IsCurrentRevisionNoStream() bool
IsCurrentRevisionNoStream returns true if current revision in append response was set to NoStream. Current revision in response can be NoStream if no events are appended to a non-existing stream.
type BatchAppendResponse ¶
type BatchAppendResponse struct {
// contains filtered or unexported fields
}
BatchAppendResponse is a response returned by EventStoreDB after an entire batch of events (all chunks) were appended to a stream in EventStoreDB.
func (BatchAppendResponse) GetCorrelationId ¶ added in v0.11.0
func (response BatchAppendResponse) GetCorrelationId() uuid.UUID
GetCorrelationId returns a correlation id the client has sent along with a chunk of events to append to EventStoreDB stream.
func (BatchAppendResponse) GetCurrentRevision ¶ added in v0.11.0
func (response BatchAppendResponse) GetCurrentRevision() uint64
GetCurrentRevision returns a current revision of a stream after events were appended to a stream in EventStoreDB.
func (BatchAppendResponse) GetExpectedStreamPosition ¶ added in v0.11.0
func (response BatchAppendResponse) GetExpectedStreamPosition() (uint64, bool)
GetExpectedStreamPosition returns true if response contains finite expected stream position.
func (BatchAppendResponse) GetPosition ¶ added in v0.10.0
func (response BatchAppendResponse) GetPosition() (Position, bool)
GetPosition returns a position of the last appended event to a stream along with a boolean which indicates if EventStoreDB has returned a position or if it has not returned a position. If EventStoreDB has not returned a position, GetPosition returns a zero initialized Position and false.
func (BatchAppendResponse) HasExpectedStreamPosition ¶ added in v0.11.0
func (response BatchAppendResponse) HasExpectedStreamPosition() bool
HasExpectedStreamPosition returns true is response contains expected stream position.
func (BatchAppendResponse) IsCurrentRevisionNoStream ¶ added in v0.11.0
func (response BatchAppendResponse) IsCurrentRevisionNoStream() bool
IsCurrentRevisionNoStream returns true if current revision of a stream returned by EventStore is NoStream.
func (BatchAppendResponse) IsExpectedStreamPositionAny ¶ added in v0.11.0
func (response BatchAppendResponse) IsExpectedStreamPositionAny() bool
IsExpectedStreamPositionAny returns true if expected stream position is Any.
func (BatchAppendResponse) IsExpectedStreamPositionNoStream ¶ added in v0.11.0
func (response BatchAppendResponse) IsExpectedStreamPositionNoStream() bool
IsExpectedStreamPositionNoStream returns true if expected stream position is NoStream.
func (BatchAppendResponse) IsExpectedStreamPositionStreamExists ¶ added in v0.11.0
func (response BatchAppendResponse) IsExpectedStreamPositionStreamExists() bool
IsExpectedStreamPositionStreamExists returns true if expected stream position is StreamExists.
type BatchError ¶ added in v0.10.0
type BatchError struct { ProtoCode int32 // an error code returned by EventStoreDB Message string // informative message of an error returned by EventStoreDB Details []ErrorDetails // various details about an error CorrelationId uuid.UUID // correlation id sent by a client when it tried to append a batch of events to a stream at EventStoreDB StreamId string // stream to which we wanted to append events to // contains filtered or unexported fields }
BatchError is an error which can occur when chunks of events are appended to a stream in EventStoreDB.
func (BatchError) Code ¶ added in v0.10.0
func (b BatchError) Code() errors.ErrorCode
Code returns a code of an error.
func (BatchError) Error ¶ added in v0.10.0
func (b BatchError) Error() string
Error returns a string representation of an error.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client which can interact with EventStoreDB streams.
func NewClient ¶ added in v0.11.0
func NewClient(grpcClient connection.GrpcClient) *Client
NewClient create a new event streams client
func (*Client) AppendToStream ¶
func (client *Client) AppendToStream( ctx context.Context, streamID string, expectedStreamRevision stream_revision.IsWriteStreamRevision, events []ProposedEvent, ) (AppendResponse, errors.Error)
AppendToStream appends a slice of events to a stream.
Events are sent to a stream one by one.
If appending of one event fails EventStoreDb will roll back the whole transaction.
If any error occurs error will be returned with appropriate code set.
Example (WithAnyStreamRevisionWhenStreamExist) ¶
Example of appending an event to an existing stream with WriteStreamRevisionAny.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" firstEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // Create a stream by appending one event to it writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{firstEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 0 { log.Fatalln(writeResult.GetCurrentRevision()) } // Append an event to an existing stream secondEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } writeResult, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionAny{}, []event_streams.ProposedEvent{secondEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 1 { log.Fatalln(writeResult.GetCurrentRevision()) } }
Output:
Example (WithAnyWhenStreamDoesNotExist) ¶
Example of appending an event to a stream which does not exist with WriteStreamRevisionAny.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionAny{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 0 { log.Fatalln(writeResult.GetCurrentRevision()) } }
Output:
Example (WithExactStreamRevisionWhenStreamExist) ¶
Example of appending an event to an existing stream with exact expected stream revision.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" firstEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // Create a stream by appending one event to it writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{firstEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 0 { log.Fatalln(writeResult.GetCurrentRevision()) } // Append an event to an existing stream secondEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } writeResult, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevision{Revision: 0}, // 0 because stream has one event, like an index in a slice []event_streams.ProposedEvent{secondEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 1 { log.Fatalln(writeResult.GetCurrentRevision()) } }
Output:
Example (WithNoStreamWhenStreamDoesNotExist) ¶
Example of appending an event to a stream which does not exist with WriteStreamRevisionNoStream.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 0 { log.Fatalln(writeResult.GetCurrentRevision()) } }
Output:
func (*Client) BatchAppendToStream ¶ added in v0.9.2
func (client *Client) BatchAppendToStream(ctx context.Context, streamId string, expectedStreamRevision stream_revision.IsWriteStreamRevision, events ProposedEventList, chunkSize uint64, deadline time.Time, ) (BatchAppendResponse, errors.Error)
BatchAppendToStream appends events to a stream in chunks.
Correlation ID for events will be auto generated.
If batch append of one chunk fails EventStoreDb will roll back the whole transaction.
If any error occurs error will be returned with appropriate code set.
Example (StreamDoesNotExist) ¶
Example demonstrates how to do a batch append to a stream which does not exist. Correlation id for events will auto be generated.
package main import ( "context" "fmt" "log" "time" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" firstEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } deadline := time.Now().Add(time.Second * 10) // batch append to a stream which does not exist writeResult, err := client.BatchAppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{firstEvent}, 1, deadline) if writeResult.IsCurrentRevisionNoStream() { log.Fatalln("IsCurrentRevisionNoStream should return false") } if writeResult.GetCurrentRevision() != 0 { log.Fatalln("Current revision should be 0") } }
Output:
Example (StreamExists) ¶
Example demonstrates how to do a batch append to a stream which does exist. Correlation id for events will auto be generated.
package main import ( "context" "fmt" "log" "time" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" firstEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream by appending one event to it writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{firstEvent}) if err != nil { log.Fatalln(err) } if writeResult.GetCurrentRevision() != 0 { log.Fatalln("Current revision must be 0") } secondEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } deadline := time.Now().Add(time.Second * 10) // batch append to a stream which exists with expected revision batchWriteResult, err := client.BatchAppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevision{Revision: 0}, // there is already one event in the stream []event_streams.ProposedEvent{secondEvent}, 1, deadline) if err != nil { log.Fatalln(err) } if batchWriteResult.IsCurrentRevisionNoStream() { log.Fatalln("IsCurrentRevisionNoStream should return false") } if batchWriteResult.GetCurrentRevision() != 0 { log.Fatalln("Current revision should be 0") } }
Output:
func (*Client) BatchAppendToStreamWithCorrelationId ¶ added in v0.11.0
func (client *Client) BatchAppendToStreamWithCorrelationId(ctx context.Context, streamId string, expectedStreamRevision stream_revision.IsWriteStreamRevision, correlationId uuid.UUID, events ProposedEventList, chunkSize uint64, deadline time.Time, ) (BatchAppendResponse, errors.Error)
BatchAppendToStreamWithCorrelationId appends events to a stream in chunks.
CorrelationId for events must be provided.
If batch append of one chunk fails EventStoreDb will roll back the whole transaction.
If any error occurs error will be returned with appropriate code set.
Example ¶
Example demonstrates how to do a batch append to a stream with correlation id.
package main import ( "context" "fmt" "log" "time" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" firstEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } deadline := time.Now().Add(time.Second * 10) correlationId := uuid.New() // batch append to a stream with correlation id writeResult, err := client.BatchAppendToStreamWithCorrelationId(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, correlationId, []event_streams.ProposedEvent{firstEvent}, 1, deadline) if writeResult.IsCurrentRevisionNoStream() { log.Fatalln("IsCurrentRevisionNoStream should return false") } if writeResult.GetCurrentRevision() != 0 { log.Fatalln("Current revision should be 0") } }
Output:
func (*Client) DeleteStream ¶
func (client *Client) DeleteStream( ctx context.Context, streamID string, revision stream_revision.IsWriteStreamRevision, ) (DeleteResponse, errors.Error)
DeleteStream performs a soft delete on a stream.
Appending events to soft-deleted stream with WriteStreamRevisionStreamExists will fail with error errors.StreamDeleted.
Soft-deleted stream is a stream to which events can be appended using for example WriteStreamRevisionNoStream and WriteStreamRevisionAny.
The only events which can be read from a soft-deleted stream are only the ones which were written after a soft-delete. Any events written previous to soft-delete are out of reach.
Example (StreamDoesNotExist) ¶
Example of soft-deleting a stream which does not exist.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" // delete a non-existing stream deleteResult, err := client.DeleteStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}) deletePosition, isPosition := deleteResult.GetPosition() // result of a soft-delete must be a position if !isPosition { log.Fatalln("Must be a position") } // position returned by soft-delete must not be zero if deletePosition.CommitPosition == 0 || deletePosition.PreparePosition == 0 { log.Fatalln("Commit and Prepare position must not be zero") } }
Output:
Example (StreamExists) ¶
Example of sot-deleting a stream which exists.
package main import ( "context" "fmt" "log" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream with one event writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } // soft-delete a stream deleteResult, err := client.DeleteStream(context.Background(), streamId, stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()}) deletePosition, isPosition := deleteResult.GetPosition() if !isPosition { log.Fatalln("Must be a position") } writePosition, _ := writeResult.GetPosition() // position returned by soft-delete must be greater than the one of the last event if !deletePosition.GreaterThan(writePosition) { log.Fatalln("Delete position must be greater than last event's write position") } }
Output:
func (*Client) GetStreamMetadata ¶
func (client *Client) GetStreamMetadata( ctx context.Context, streamId string) (StreamMetadataResult, errors.Error)
GetStreamMetadata reads stream's latest metadata.
Example (IsEmptyIfStreamHasNoMetadata) ¶
Example of reading metadata for a stream which has no metadata set.
package main import ( "context" "fmt" "log" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream with one event _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } // read stream's metadata metaDataResponse, err := client.GetStreamMetadata(context.Background(), streamId) if err != nil { log.Fatalln(err) } // Stream's metadata stream must not contains any metadata if !metaDataResponse.IsEmpty() { log.Fatalln("Stream's must have no metadata") } }
Output:
Example (StreamHasMetadata) ¶
Example of reading metadata for a stream which has metadata.
package main import ( "context" "fmt" "log" "reflect" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/ptr" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" expectedStreamMetadata := event_streams.StreamMetadata{ MaxCount: ptr.Int(17), TruncateBefore: ptr.UInt64(10), CacheControlInSeconds: ptr.UInt64(17), MaxAgeInSeconds: ptr.UInt64(15), } // write metadata for a stream _, err = client.SetStreamMetadata(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, expectedStreamMetadata, ) if err != nil { log.Fatalln(err) } // read metadata for a stream metaData, err := client.GetStreamMetadata(context.Background(), streamId) if err != nil { log.Fatalln(err) } if metaData.IsEmpty() { log.Fatalln("Stream must have metadata") } if metaData.GetMetaStreamRevision() != 0 { log.Fatalln("Metadata must be at index 0 in stream's metadata stream") } if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) { log.Fatalln("Metadata received must be the same as the metadata written") } }
Output:
func (*Client) GetStreamReader ¶
func (client *Client) GetStreamReader( ctx context.Context, streamID string, direction ReadDirection, revision stream_revision.IsReadStreamRevision, count uint64, resolveLinks bool, ) (StreamReader, errors.Error)
GetStreamReader returns a stream reader for a stream which will read events from a given revision towards a given direction.
For example, you can read events from the end towards the start of a stream by setting revision to ReadStreamRevisionEnd and direction to ReadDirectionBackward.
Use count to specify how many events you want to be able to read through a reader. Maximum number of events to read is ReadCountMax.
func (*Client) GetStreamReaderForStreamAll ¶ added in v0.11.0
func (client *Client) GetStreamReaderForStreamAll( ctx context.Context, direction ReadDirection, position stream_revision.IsReadPositionAll, count uint64, resolveLinks bool, ) (StreamReader, errors.Error)
GetStreamReaderForStreamAll returns a reader for a stream $all which will read events from a given position towards a given direction.
For example, you can read events from the end towards the start of a stream $all by setting revision to ReadPositionAllEnd and direction to ReadDirectionBackward.
Use count to specify how many events you want to be able to read through a reader. Maximum number of events to read is ReadCountMax.
func (*Client) ReadEventsFromStreamAll ¶ added in v0.11.0
func (client *Client) ReadEventsFromStreamAll( ctx context.Context, direction ReadDirection, position stream_revision.IsReadPositionAll, count uint64, resolveLinks bool, ) (ResolvedEventList, errors.Error)
ReadEventsFromStreamAll reads events from stream $all.
Read is performed by starting from a position and reading all events towards a given direction.
For example, you can read events from the end towards the start of a stream $all by setting revision to ReadPositionAllEnd and direction to ReadDirectionBackward.
Use count to specify how many events you want to read. Maximum number of events read is ReadCountMax.
Example (ReadEventsBackwardsFromEnd) ¶
Example of reading events backwards from the end of a stream $all. We will append some user defined events to demonstrate that user events will be read from the end of stream $all in reversed order.
That does not guarantee that system events are not going to be appended after user events if some system operation is triggered.
package main import ( "context" "fmt" "log" "reflect" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" // create 10 events to write (EventId must be unique) eventsToWrite := make(event_streams.ProposedEventList, 10) for i := uint32(0); i < 10; i++ { eventsToWrite[i] = event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } } // create a stream with 10 events _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, eventsToWrite) if err != nil { log.Fatalln(err) } count := uint64(len(eventsToWrite)) readEvents, err := client.ReadEventsFromStreamAll(context.Background(), event_streams.ReadDirectionForward, stream_revision.ReadPositionAllStart{}, count, false) if err != nil { log.Fatalln(err) } // Number of events read must equal to count if uint64(len(readEvents)) != count { log.Fatalln(`Number of events read from stream $all must be greater than number of user defined events`) } // since events are read backwards from the end they are received in reversed order if !reflect.DeepEqual(eventsToWrite, readEvents.Reverse().ToProposedEvents()) { log.Fatalln("Events read from the end must match user defined events") } }
Output:
Example (ReadEventsFromStart) ¶
Example of reading events from start of stream $all. At the beginning of stream $all we can find system events.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) // read events from stream $all // at the beginning of stream $all are some system events // as admin user we can access them too readEvents, err := client.ReadEventsFromStreamAll(context.Background(), event_streams.ReadDirectionForward, stream_revision.ReadPositionAllStart{}, 5, false) if err != nil { log.Fatalln(err) } if len(readEvents) < 5 { log.Fatalln("Not enough system events read from stream $all") } }
Output:
func (*Client) ReadStreamEvents ¶
func (client *Client) ReadStreamEvents( ctx context.Context, streamID string, direction ReadDirection, revision stream_revision.IsReadStreamRevision, count uint64, resolveLinks bool, ) (ResolvedEventList, errors.Error)
ReadStreamEvents reads events from a given stream.
Read is performed by starting from a revision and reading all events towards a given direction.
For example, you can read events from the end towards the start of a stream by setting revision to ReadStreamRevisionEnd and direction to ReadDirectionBackward.
Use count to specify how many events you want to read. Maximum number of events read is ReadCountMax.
Example (ReadEventsBackwardsFromEnd) ¶
Example of reading events backwards from the end of a stream.
package main import ( "context" "fmt" "log" "reflect" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" // create 10 events to write (EventId must be unique) eventsToWrite := make(event_streams.ProposedEventList, 10) for i := uint32(0); i < 10; i++ { eventsToWrite[i] = event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } } // create a stream with 10 events _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, eventsToWrite) if err != nil { log.Fatalln(err) } // read events from existing stream readEvents, err := client.ReadStreamEvents(context.Background(), streamId, event_streams.ReadDirectionBackward, stream_revision.ReadStreamRevisionEnd{}, 10, // set to be bigger than current number of events in a stream false) if err != nil { log.Fatalln(err) } // Event read must be in reversed order // since readEvents are of type ResolvedEvent we must convert them to slice of ProposedEvents if !reflect.DeepEqual(eventsToWrite, readEvents.Reverse().ToProposedEvents()) { log.Fatalln("Events read from a stream must match") } }
Output:
Example (ReadEventsFromStart) ¶
Example of reading events from the start of a stream.
package main import ( "context" "fmt" "log" "reflect" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } eventsToWrite := []event_streams.ProposedEvent{proposedEvent} // create a stream with one event _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, eventsToWrite) if err != nil { log.Fatalln(err) } // read events from existing stream readEvents, err := client.ReadStreamEvents(context.Background(), streamId, event_streams.ReadDirectionForward, stream_revision.ReadStreamRevisionStart{}, 10, // set to be bigger than current number of events in a stream false) if err != nil { log.Fatalln(err) } // since readEvents are of type ResolvedEvent we must convert them to slice of ProposedEvents if !reflect.DeepEqual(eventsToWrite, readEvents.ToProposedEvents()) { log.Fatalln("Events read from a stream must match") } }
Output:
Example (StreamDoesNotExist) ¶
Example of trying to read events from a stream which does not exist.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/errors" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, stdErr := connection.ParseConnectionString(clientURI) if stdErr != nil { log.Fatalln(stdErr) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" _, err := client.ReadStreamEvents(context.Background(), streamId, event_streams.ReadDirectionBackward, stream_revision.ReadStreamRevisionEnd{}, 1, false) if err.Code() != errors.StreamNotFoundErr { log.Fatalln("Stream must not exist") } }
Output:
Example (StreamIsSoftDeleted) ¶
Example of trying to read events from a stream which is soft-deleted.
package main import ( "context" "fmt" "log" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/errors" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, stdErr := connection.ParseConnectionString(clientURI) if stdErr != nil { log.Fatalln(stdErr) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream with one event writeResult, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } // soft-delete a stream _, err = client.DeleteStream(context.Background(), streamId, stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()}) if err != nil { log.Fatalln(err) } // reading a soft-deleted stream fails with error code StreamNotFoundErr _, err = client.ReadStreamEvents(context.Background(), streamId, event_streams.ReadDirectionBackward, stream_revision.ReadStreamRevisionEnd{}, event_streams.ReadCountMax, false) if err.Code() != errors.StreamNotFoundErr { log.Fatalln("Stream must not exist") } }
Output:
func (*Client) SetStreamMetadata ¶
func (client *Client) SetStreamMetadata( ctx context.Context, streamID string, expectedStreamRevision stream_revision.IsWriteStreamRevision, metadata StreamMetadata) (AppendResponse, errors.Error)
SetStreamMetadata writes stream's metadata. Streams metadata are a series of events, each event represented by StreamMetadata.
Stream's metadata are kept in a separate stream which begins with a prefix $$.
For example: for stream my_card, it's metadata stream will be $my_card.
Example (OnNonExistingStream) ¶
Example of setting metadata for a stream which does not exist.
package main import ( "context" "fmt" "log" "reflect" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/ptr" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" expectedStreamMetadata := event_streams.StreamMetadata{ MaxCount: ptr.Int(17), TruncateBefore: ptr.UInt64(10), CacheControlInSeconds: ptr.UInt64(17), MaxAgeInSeconds: ptr.UInt64(15), } // write metadata for a stream _, err = client.SetStreamMetadata(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, expectedStreamMetadata, ) if err != nil { log.Fatalln(err) } // read metadata for a stream metaData, err := client.GetStreamMetadata(context.Background(), streamId) if err != nil { log.Fatalln(err) } if metaData.IsEmpty() { log.Fatalln("Stream must have metadata") } if metaData.GetMetaStreamRevision() != 0 { log.Fatalln("Metadata must be at index 0 in stream's metadata stream") } if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) { log.Fatalln("Metadata received must be the same as the metadata written") } }
Output:
Example (WhenStreamExists) ¶
Example of setting metadata for an existing stream.
package main import ( "context" "fmt" "log" "reflect" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/ptr" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream with one event _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } expectedStreamMetadata := event_streams.StreamMetadata{ MaxCount: ptr.Int(17), TruncateBefore: ptr.UInt64(10), CacheControlInSeconds: ptr.UInt64(17), MaxAgeInSeconds: ptr.UInt64(15), } // write metadata for a stream _, err = client.SetStreamMetadata(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, expectedStreamMetadata, ) if err != nil { log.Fatalln(err) } // read stream's metadata metaData, err := client.GetStreamMetadata(context.Background(), streamId) if err != nil { log.Fatalln(err) } if metaData.IsEmpty() { log.Fatalln("Stream must have metadata") } if metaData.GetMetaStreamRevision() != 0 { log.Fatalln("Metadata must be at index 0 in stream's metadata stream") } if !reflect.DeepEqual(expectedStreamMetadata, metaData.GetStreamMetadata()) { log.Fatalln("Metadata received must be the same as the metadata written") } }
Output:
func (*Client) SubscribeToFilteredStreamAll ¶ added in v0.11.0
func (client *Client) SubscribeToFilteredStreamAll( ctx context.Context, position stream_revision.IsReadPositionAll, resolveLinks bool, filter Filter, ) (StreamReader, errors.Error)
SubscribeToFilteredStreamAll subscribes to stream $all using a filter and receives content from it.
Filter is used to filter by event's type or by a stream ID. Both can be filtered using a set of prefixes or by a regex.
Revision indicates from which point in a stream we want to receive content. Content can be received from the beginning of a stream, the end of a stream of from other specific point.
If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.
If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.
If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached.
If you only want to receive new content from stream $all, set revision to ReadPositionAllEnd.
Example ¶
Example demonstrates how to subscribe to stream $all with filter.
We create three streams and write events to them. Subscription to stream $all with a filter which will filter only content from two of the three streams. Content is filtered by prefix of the stream's ID.
package main import ( "context" "fmt" "log" "reflect" "sync" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/core/systemmetadata" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, stdErr := connection.ParseConnectionString(clientURI) if stdErr != nil { log.Fatalln(stdErr) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) prefix1 := "my_first_prefix" prefix2 := "my_second_prefix" otherStream := "read_all_existing_and_new_ones_otherStream" prefixStream := prefix1 + "_stream" newPrefixStream := prefix2 + "_stream" createEvents := func(count uint32) event_streams.ProposedEventList { result := make(event_streams.ProposedEventList, 10) for i := uint32(0); i < count; i++ { result[i] = event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } } return result } otherStreamEvents := createEvents(10) prefixStreamEvents := createEvents(10) newPrefixStreamEvents := createEvents(10) // create other stream with 10 events _, err := client.AppendToStream(context.Background(), otherStream, stream_revision.WriteStreamRevisionNoStream{}, otherStreamEvents) if err != nil { log.Fatalln(err) } // create first stream which content we will read _, err = client.AppendToStream(context.Background(), prefixStream, stream_revision.WriteStreamRevisionNoStream{}, prefixStreamEvents) if err != nil { log.Fatalln(err) } // subscribe to stream $all and filter only events written to // streams with prefix my_first_prefix and my_second_prefix streamReader, err := client.SubscribeToFilteredStreamAll(context.Background(), stream_revision.ReadPositionAllStart{}, false, event_streams.Filter{ FilterBy: event_streams.FilterByStreamId{ Matcher: event_streams.PrefixFilterMatcher{ PrefixList: []string{prefix1, prefix2}, }, }, Window: event_streams.FilterNoWindow{}, CheckpointIntervalMultiplier: 5, }) if err != nil { log.Fatalln(err) } waitForReadingFirstEvents := sync.WaitGroup{} waitForReadingFirstEvents.Add(1) // read events written to a stream with prefix my_first_prefix go func() { defer waitForReadingFirstEvents.Done() var result event_streams.ProposedEventList readResult, err := streamReader.ReadOne() if err != nil { log.Fatalln(err) } if event, isEvent := readResult.GetEvent(); isEvent { result = append(result, event.ToProposedEvent()) } if reflect.DeepEqual(prefixStreamEvents, result) { return } }() waitForNewEventsAppend := sync.WaitGroup{} waitForNewEventsAppend.Add(1) // after events from stream with prefix my_first_prefix are read // create stream with prefix my_second_prefix go func() { defer waitForNewEventsAppend.Done() waitForReadingFirstEvents.Wait() // wait until all events from stream with prefix my_first_prefix are read // create stream with prefix my_second_prefix with 10 events in it _, err = client.AppendToStream(context.Background(), newPrefixStream, stream_revision.WriteStreamRevisionNoStream{}, newPrefixStreamEvents) if err != nil { log.Fatalln(err) } }() waitForReadingNewEvents := sync.WaitGroup{} waitForReadingNewEvents.Add(1) // read events written to a stream with prefix my_second_prefix go func() { defer waitForReadingNewEvents.Done() waitForNewEventsAppend.Wait() // wait until stream my_second_prefix created var result event_streams.ProposedEventList readResult, err := streamReader.ReadOne() if err != nil { log.Fatalln(err) } if event, isEvent := readResult.GetEvent(); isEvent { if !systemmetadata.IsSystemStream(event.Event.StreamId) { result = append(result, event.ToProposedEvent()) } } // we have finished reading if reflect.DeepEqual(newPrefixStreamEvents, result) { return } }() // wait for reader to receive new events waitForReadingNewEvents.Wait() }
Output:
func (*Client) SubscribeToStream ¶
func (client *Client) SubscribeToStream( ctx context.Context, streamID string, revision stream_revision.IsReadStreamRevision, resolveLinks bool, ) (StreamReader, errors.Error)
SubscribeToStream subscribes to a stream in a form of a live subscription, starting from a given revision.
Revision indicates from which point in a stream we want to receive content. Content can be received from the beginning of a stream, the end of a stream of from other specific revision.
If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.
If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.
If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached. For example: If we subscribe from revision 5 and stream currently contains only one event, then our subscription will receive content, only after 4 events have been written to a stream. That means that our subscription will receive the 6th event written to a stream and all content written to a stream after it.
If you only want to receive new content, set revision to ReadStreamRevisionEnd.
Example (CatchesDeletion) ¶
Example demonstrates that subscription will receive StreamDeleted error once stream has been deleted.
package main import ( "context" "fmt" "log" "sync" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/errors" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" wg := sync.WaitGroup{} wg.Add(1) ctx := context.Background() // create a subscription to a stream, from start of the stream streamReader, err := client.SubscribeToStream(ctx, streamId, stream_revision.ReadStreamRevisionStart{}, false) if err != nil { log.Fatalln(err) } go func() { defer wg.Done() _, err := streamReader.ReadOne() // reads content of a stream until StreamDeletedErr is received if err.Code() != errors.StreamDeletedErr { log.Fatalln("Unexpected error received") } }() _, err = client.TombstoneStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}) if err != nil { log.Fatalln(err) } // wait for reader to receive StreamDeleted wg.Wait() }
Output:
Example (ReadOldAndNewContentFromStream) ¶
Example shows that subscription from start of the stream will receive all old events written to it, as well as new events written to a stream after a subscription was created.
package main import ( "context" "fmt" "log" "reflect" "sync" "time" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/errors" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, stdErr := connection.ParseConnectionString(clientURI) if stdErr != nil { log.Fatalln(stdErr) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" readerWait := sync.WaitGroup{} readerWait.Add(1) cancelWait := sync.WaitGroup{} cancelWait.Add(1) ctx := context.Background() ctx, cancelFunc := context.WithTimeout(ctx, 20*time.Second) createEvents := func(eventCount uint32) event_streams.ProposedEventList { result := make(event_streams.ProposedEventList, eventCount) for i := uint32(0); i < eventCount; i++ { result = append(result, event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), }) } return result } beforeEvents := createEvents(3) afterEvents := createEvents(2) totalEvents := append(beforeEvents, afterEvents...) // create a stream with 3 events in it _, err := client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, beforeEvents) if err != nil { log.Fatalln(err) } // create a stream subscription from start of the stream streamReader, err := client.SubscribeToStream(ctx, streamId, stream_revision.ReadStreamRevisionStart{}, false) if err != nil { log.Fatalln(err) } go func() { defer readerWait.Done() var result event_streams.ProposedEventList for { response, err := streamReader.ReadOne() // read event one by-one if err != nil { if err.Code() == errors.CanceledErr { // we have received cancellation of a subscription break } cancelWait.Done() // must never be reached, some other error occur log.Fatalln("Unexpected error received") } event, isEvent := response.GetEvent() if !isEvent { log.Fatalln("Must have read an event") } result = append(result, event.ToProposedEvent()) if len(result) == len(totalEvents) { cancelWait.Done() // we have read all events, signal to main thread that it can cancel subscription } } if !reflect.DeepEqual(totalEvents, result) { log.Fatalln("Not all events have been read from the stream") } }() _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionAny{}, afterEvents) cancelWait.Wait() // wait until subscription receives all events cancelFunc() // cancel subscription readerWait.Wait() // wait for subscription go routine to exit }
Output:
Example (StreamDoesNotExist) ¶
Example show how to subscribe to a stream which does not exist and wait for an event from it. We can subscribe to a non-existing stream. ReadOne method of StreamReader will block until stream with content is created.
package main import ( "context" "fmt" "log" "reflect" "sync" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" wg := sync.WaitGroup{} wg.Add(1) // subscribe to a stream // we can subscribe to a stream which does not exist // as soon as stream is created we will start to receive content from it. streamReader, err := client.SubscribeToStream(context.Background(), streamId, stream_revision.ReadStreamRevisionStart{}, false) if err != nil { log.Fatalln(err) } proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // we will wait for events in a separate go routine go func() { defer wg.Done() response, err := streamReader.ReadOne() // read blocks until event is written to a stream if err != nil { log.Fatalln(err) } event, isEvent := response.GetEvent() if !isEvent { log.Fatalln("Must have received an event") } if !reflect.DeepEqual(proposedEvent, event.ToProposedEvent()) { log.Fatalln("Must receive an event we have written to a stream") } }() // create a stream with one event written to it _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, event_streams.ProposedEventList{proposedEvent}) wg.Wait() }
Output:
Example (StreamExists) ¶
Example shows how to subscribe to an existing stream from start.
package main import ( "context" "fmt" "log" "reflect" "sync" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamId := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } _, err = client.AppendToStream(context.Background(), streamId, stream_revision.WriteStreamRevisionNoStream{}, event_streams.ProposedEventList{proposedEvent}) if err != nil { log.Fatalln(err) } // subscribe to a stream fro start streamReader, err := client.SubscribeToStream(context.Background(), streamId, stream_revision.ReadStreamRevisionStart{}, false) if err != nil { log.Fatalln(err) } wg := sync.WaitGroup{} wg.Add(1) // we will wait for an event in a separate go routine go func() { defer wg.Done() response, err := streamReader.ReadOne() // read an event written to a stream if err != nil { log.Fatalln(err) } event, isEvent := response.GetEvent() if !isEvent { log.Fatalln("Must have received an event") } if !reflect.DeepEqual(proposedEvent, event.ToProposedEvent()) { log.Fatalln("Must receive an event we have written to a stream") } }() wg.Wait() }
Output:
func (*Client) SubscribeToStreamAll ¶ added in v0.11.0
func (client *Client) SubscribeToStreamAll( ctx context.Context, position stream_revision.IsReadPositionAll, resolveLinks bool, ) (StreamReader, errors.Error)
SubscribeToStreamAll subscribes to stream $all and receives content from it. Content is not filtered.
Revision indicates from which point in a stream we want to receive content. Content can be received from the beginning of a stream, the end of a stream of from other specific position.
If we opt to receive content from start of the stream we will receive all content for the stream, eventually, unless we cancel our subscription.
If we opt to receive content from the end of a stream then we will receive only content written to a stream after our subscription was created.
If we set a specific point from which we want to start to receive content of the stream, then we will start to receive content only when that point (index) is reached.
If you only want to receive new content, set revision to ReadPositionAllEnd.
Example ¶
Example demonstrates how to subscribe to stream $all without a filter.
We create two streams and write events to them. Subscription to stream $all must catch all events written to those two streams.
package main import ( "context" "fmt" "log" "reflect" "sync" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/core/systemmetadata" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, stdErr := connection.ParseConnectionString(clientURI) if stdErr != nil { log.Fatalln(stdErr) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) createEvents := func(count uint32) event_streams.ProposedEventList { result := make(event_streams.ProposedEventList, 10) for i := uint32(0); i < count; i++ { result[i] = event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } } return result } wg := sync.WaitGroup{} wg.Add(1) firstStream := "firstStream" secondStream := "secondStream" beforeEvents := createEvents(10) afterEvents := createEvents(10) allUserEvents := append(beforeEvents, afterEvents...) // create a stream with some events in it _, err := client.AppendToStream(context.Background(), firstStream, stream_revision.WriteStreamRevisionNoStream{}, beforeEvents) if err != nil { log.Fatalln(err) } // create a subscription to a stream streamReader, err := client.SubscribeToStreamAll(context.Background(), stream_revision.ReadPositionAllStart{}, false) if err != nil { log.Fatalln(err) } go func() { defer wg.Done() var resultsRead event_streams.ProposedEventList readResult, err := streamReader.ReadOne() if err != nil { log.Fatalln(err) } if event, isEvent := readResult.GetEvent(); isEvent { if !systemmetadata.IsSystemStream(event.Event.StreamId) { resultsRead = append(resultsRead, event.ToProposedEvent()) } } // if we have read all user defined event stop listening for events and return from go routine if reflect.DeepEqual(allUserEvents, resultsRead) { streamReader.Close() return } }() // append some events to a stream after a listening go routine has started _, err = client.AppendToStream(context.Background(), secondStream, stream_revision.WriteStreamRevisionNoStream{}, afterEvents) if err != nil { log.Fatalln(err) } // wait for subscription to receive all events wg.Wait() }
Output:
func (*Client) TombstoneStream ¶
func (client *Client) TombstoneStream( ctx context.Context, streamID string, revision stream_revision.IsWriteStreamRevision, ) (TombstoneResponse, errors.Error)
TombstoneStream performs a hard-delete on a stream.
After performing a hard-delete events cannot be written or read from a stream.
Example (StreamDoesNotExist) ¶
Example of trying to put a tombstone on a stream which does not exist.
package main import ( "context" "fmt" "log" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamName := "some_stream" // tombstone a non-existing stream tombstoneResult, err := client.TombstoneStream(context.Background(), streamName, stream_revision.WriteStreamRevisionNoStream{}) tombstonePosition, isPosition := tombstoneResult.GetPosition() // result of a hard-delete must be a position if !isPosition { log.Fatalln("Must be a position") } // position returned by hard-delete must not be zero if tombstonePosition.CommitPosition == 0 || tombstonePosition.PreparePosition == 0 { log.Fatalln("Commit and Prepare position must not be zero") } }
Output:
Example (StreamExists) ¶
Example of putting a tombstone on an existing stream.
package main import ( "context" "fmt" "log" "github.com/google/uuid" "github.com/pivonroll/EventStore-Client-Go/core/connection" "github.com/pivonroll/EventStore-Client-Go/core/stream_revision" "github.com/pivonroll/EventStore-Client-Go/event_streams" ) func main() { username := "admin" password := "changeit" eventStoreEndpoint := "localhost:2113" // assuming that EventStoreDB is running on port 2113 clientURI := fmt.Sprintf("esdb://%s:%s@%s", username, password, eventStoreEndpoint) config, err := connection.ParseConnectionString(clientURI) if err != nil { log.Fatalln(err) } grpcClient := connection.NewGrpcClient(*config) client := event_streams.NewClient(grpcClient) streamName := "some_stream" proposedEvent := event_streams.ProposedEvent{ EventId: uuid.Must(uuid.NewRandom()), EventType: "TestEvent", ContentType: "application/octet-stream", UserMetadata: []byte{}, Data: []byte("some event data"), } // create a stream with one event writeResult, err := client.AppendToStream(context.Background(), streamName, stream_revision.WriteStreamRevisionNoStream{}, []event_streams.ProposedEvent{proposedEvent}) if err != nil { log.Fatalln(err) } // put tombstone on a stream tombstoneResult, err := client.TombstoneStream(context.Background(), streamName, stream_revision.WriteStreamRevision{Revision: writeResult.GetCurrentRevision()}) tombstonePosition, isPosition := tombstoneResult.GetPosition() if !isPosition { log.Fatalln("Must be a position") } writePosition, _ := writeResult.GetPosition() // position returned after we put a tombstone on a stream must be greater // than the one of the last event if !tombstonePosition.GreaterThan(writePosition) { log.Fatalln("Delete position must be greater than last event's write position") } }
Output:
type ContentType ¶
type ContentType string
ContentType represents the content type of the events stored in EventStoreDB. EventStoreDB can store events in a json format or in octet-stream format.
const ( ContentTypeJson ContentType = "application/json" ContentTypeOctetStream ContentType = "application/octet-stream" )
type CustomMetadataType ¶
type CustomMetadataType map[string]interface{}
CustomMetadataType is shorthand type for user defined metadata of a stream.
type DeleteResponse ¶
type DeleteResponse struct {
// contains filtered or unexported fields
}
DeleteResponse is response received when stream is soft-deleted by using Client.DeleteStream.
func (DeleteResponse) GetPosition ¶
func (response DeleteResponse) GetPosition() (Position, bool)
GetPosition returns a position at which stream was soft-deleted. If position was received it will also return a true as a second return value. If position does not exist a zero initialized Position and a false will be returned. Position may not exist if an empty stream was soft-deleted.
type ErrorDetails ¶ added in v0.9.2
ErrorDetails represents various details about an error EventStore might sent to us.
type Filter ¶ added in v0.11.0
type Filter struct { // FilterByEventType // FilterByStreamId FilterBy isFilterBy // FilterWindowMax // FilterNoWindow Window isFilterWindow CheckpointIntervalMultiplier uint32 }
Filter is used to specify a filter when reading events from stream $all. Events can be filtered by event type or by a stream ID. Reading window can be specified to a maximum number of events to read by setting Window to FilterWindowMax. If you do not want to specify a maximum number of events to read set Window to FilterNoWindow. CheckpointIntervalMultiplier must be greater than zero. Default is 1 as specified in DefaultCheckpointIntervalMultiplier.
type FilterByEventType ¶ added in v0.11.0
type FilterByEventType struct { // PrefixFilterMatcher // RegexFilterMatcher Matcher filterMatcher }
FilterByEventType sets a filter matcher for stream $all to match by event type. Matching can be done by a set of prefixes or a regex.
type FilterByStreamId ¶ added in v0.11.0
type FilterByStreamId struct { // PrefixFilterMatcher // RegexFilterMatcher Matcher filterMatcher }
FilterByStreamId sets a filter matcher for stream $all to match by stream ID. Matching can be done by a set of prefixes or a regex.
type FilterNoWindow ¶ added in v0.11.0
type FilterNoWindow struct{}
FilterNoWindow is used when we do not want to set the maximum number of events to receive as a result.
type FilterWindowMax ¶ added in v0.11.0
type FilterWindowMax struct {
Max uint32
}
FilterWindowMax is a maximum number of events that we want to receive as a result.
func DefaultFilterWindowMax ¶ added in v0.11.0
func DefaultFilterWindowMax() FilterWindowMax
DefaultFilterWindowMax returns a default value for a maximum number of events to read with filter from streams $all.
type Position ¶
Position represents event's position in stream $all.
func (Position) GreaterThan ¶
GreaterThan returns true if receiver's position is greater than provided position. Position is greater if commit position is greater than argument's commit position or if commit position is the same then it returns true if prepare position is greater than arguments prepare position.
func (Position) LessThan ¶
LessThan returns true if receiver's position is less than provided position. Position is less than if commit position is less than argument's commit position or if commit position is the same then it returns true if prepare position is less than argument's prepare position.
type PrefixFilterMatcher ¶ added in v0.11.0
type PrefixFilterMatcher struct {
PrefixList []string
}
PrefixFilterMatcher represents a list of prefixes used to match against event type or a stream ID.
type ProposedEvent ¶
type ProposedEvent struct { EventId uuid.UUID EventType string ContentType ContentType Data []byte UserMetadata []byte }
ProposedEvent represents an event we want to append to a stream. EventId is a unique id of an event. EventType field is a user defined event type. ContentType will tell EventStoreDB how to store this event. Event can be stored as json or as octet-stream. Data are user defined data to be stored in an event. UserMetadata holds user defined metadata for an event.
type ProposedEventList ¶ added in v0.9.2
type ProposedEventList []ProposedEvent
ProposedEventList represents a slice of events.
type ReadDirection ¶ added in v0.11.0
type ReadDirection string
ReadDirection specifies a direction in which a stream will be read. Direction can be either forward or backward.
const ( ReadDirectionForward ReadDirection = "ReadDirectionForward" ReadDirectionBackward ReadDirection = "ReadDirectionBackward" )
type ReadResponse ¶
type ReadResponse struct {
// contains filtered or unexported fields
}
ReadResponse represents a response received when reading a stream. When reading a stream we can receive either an event or a checkpoint. Use #GetEvent and GetCheckpoint to determine the result stored in a response.
func (ReadResponse) GetCheckpoint ¶
func (response ReadResponse) GetCheckpoint() (ReadResponseCheckpoint, bool)
GetCheckpoint returns a checkpoint and a boolean which indicates if received value was actually a checkpoint. If returned boolean is false then returned checkpoint is a zero initialized ReadResponseCheckpoint.
func (ReadResponse) GetEvent ¶
func (response ReadResponse) GetEvent() (ResolvedEvent, bool)
GetEvent returns an event and a boolean which indicates if received value was actually an event. If returned boolean is false then returned event is a zero initialized ResolvedEvent.
type ReadResponseCheckpoint ¶
ReadResponseCheckpoint represents a checkpoint stored in a stream. Checkpoints are used to mark certain positions of interest in a stream.
type RecordedEvent ¶
type RecordedEvent struct { EventID uuid.UUID // ID of an event. Event's ID is provided by user when event is appended to a stream EventType string // user defined event type ContentType ContentType // content type used to store event in EventStoreDB. Supported types are ContentTypeJson and ContentTypeOctetStream StreamId string // stream identifier of a stream on which this event is stored EventNumber uint64 // index number of an event in a stream Position position.Position // event's position in stream $all CreatedDateTime time.Time // a date and time when event was stored in a stream Data []byte // user data stored in an event SystemMetadata map[string]string // EventStoreDB's metadata set for an event UserMetadata []byte // user defined metadata }
RecordedEvent represents an event recorded in the EventStoreDB.
type RegexFilterMatcher ¶ added in v0.11.0
type RegexFilterMatcher struct {
Regex string
}
RegexFilterMatcher represents a regex used to match event type or a stream identifier.
type ResolvedEvent ¶
type ResolvedEvent struct { Link *RecordedEvent Event *RecordedEvent CommitPosition *uint64 // nil if no position }
ResolvedEvent is an event received from a stream. Each event has either Event or Link set. If event has no commit position CommitPosition will be nil.
func (ResolvedEvent) GetOriginalEvent ¶
func (resolved ResolvedEvent) GetOriginalEvent() *RecordedEvent
GetOriginalEvent returns an original event. It chooses between Link and Event fields. Link field has precedence over Event field.
func (ResolvedEvent) ToProposedEvent ¶ added in v0.11.0
func (this ResolvedEvent) ToProposedEvent() ProposedEvent
ToProposedEvent returns event converted to ProposedEvent.
type ResolvedEventList ¶ added in v0.11.0
type ResolvedEventList []ResolvedEvent
ResolvedEventList is a shorthand type for slice of events.
func (ResolvedEventList) Reverse ¶ added in v0.11.0
func (list ResolvedEventList) Reverse() ResolvedEventList
Reverse returns a reversed slice of events.
func (ResolvedEventList) ToProposedEvents ¶ added in v0.11.0
func (list ResolvedEventList) ToProposedEvents() ProposedEventList
ToProposedEvents returns a slice of events, where each event is converted to ProposedEvent.
type StreamAcl ¶
type StreamAcl struct { // ReadRoles is a list of users which can read from a stream. // If ReadRoles is empty that means that any user can read from a stream. ReadRoles []string `json:"$r"` // WriteRoles is a list of users which can write events to a stream. // If WriteRoles is empty that means that any user can write events to a stream. WriteRoles []string `json:"$w"` // DeleteRoles is a list of users which can perform soft and hard delete of a stream. // If DeleteRoles is empty that means that any user can perform soft and hard delete of a stream. DeleteRoles []string `json:"$d"` // MetaReadRoles is a list of users which can read stream's metadata. // If MetaReadRoles is empty that means that any user can read stream's metadata. MetaReadRoles []string `json:"$mr"` // MetaWriteRoles is a list of users which can write stream's metadata. // If MetaWriteRoles is empty that means that any user can write stream's metadata. MetaWriteRoles []string `json:"$mw"` }
StreamAcl represents an access control list for a stream. It is set through stream's metadata with Client.SetStreamMetadata. User must have a write access to a stream's metadata stream in order to be able to set access control list. Read more about stream ACL at https://developers.eventstore.com/server/v21.6/security/acl.html#stream-acl.
type StreamMetadata ¶
type StreamMetadata struct { // MaxAgeInSeconds Sets a sliding window based on dates. // When data reaches a certain age it disappears automatically from the stream and is // considered eligible for scavenging. // This value is set as an integer representing the number of seconds. This value must be >= 1. MaxAgeInSeconds *uint64 `json:"$maxAge"` // TruncateBefore indicates a stream's revision before all events in a stream are truncated from // stream read operations. // If TruncateBefore is 4 that means that all events before revision 4 are truncated from stream // read operation. // Truncation naturally occurs when a soft-delete is performed on a stream with Client.DeleteStream. TruncateBefore *uint64 `json:"$tb"` // This controls the cache of the head of a stream. // Most URIs in a stream are infinitely cacheable but the head by default will not cache. // It may be preferable in some situations to set a small amount of caching on the head to // allow intermediaries to handle polls (say 10 seconds). // The argument is an integer representing the seconds to cache. This value must be >= 1. CacheControlInSeconds *uint64 `json:"$cacheControl"` // Access Control List for a stream. Acl *StreamAcl `json:"$acl"` // Sets a sliding window based on the number of items in the stream. // When data reaches a certain length it disappears automatically from the stream and is // considered eligible for scavenging. // This value is set as an integer representing the count of items. This value must be >= 1. MaxCount *int `json:"$maxCount"` // User defined metadata for a stream. CustomMetadata CustomMetadataType }
StreamMetadata is the metadata of a stream. You can read more about stream metadata at https://developers.eventstore.com/server/v21.6/streams/metadata-and-reserved-names.html#reserved-names
func (StreamMetadata) MarshalJSON ¶
func (b StreamMetadata) MarshalJSON() ([]byte, error)
MarshalJSON implements JSON marshaller interface
func (*StreamMetadata) UnmarshalJSON ¶
func (b *StreamMetadata) UnmarshalJSON(data []byte) error
UnmarshalJSON implements JSON marshaller interface
type StreamMetadataResult ¶
type StreamMetadataResult struct {
// contains filtered or unexported fields
}
StreamMetadataResult is stream's metadata read by Client.GetStreamMetadata.
Streams metadata hold information about a stream. Some of that information is an access control list (acl) for a stream. See StreamMetadata for more info.
Stream does not need to have metadata set. If stream has no metadata set than IsEmpty returns false.
func (StreamMetadataResult) GetMetaStreamRevision ¶
func (result StreamMetadataResult) GetMetaStreamRevision() uint64
GetMetaStreamRevision returns a current revision of stream's metadata stream if stream has metadata set. If stream has no metadata set it returns 0. Use IsEmpty to determine if stream has any metadata set.
func (StreamMetadataResult) GetStreamId ¶
func (result StreamMetadataResult) GetStreamId() string
GetStreamId returns a stream's identifier to which metadata relates to.
func (StreamMetadataResult) GetStreamMetadata ¶
func (result StreamMetadataResult) GetStreamMetadata() StreamMetadata
GetStreamMetadata returns stream's latest metadata. If stream has no metadata set it returns a zero initialized StreamMetadata.
func (StreamMetadataResult) IsEmpty ¶ added in v0.11.0
func (result StreamMetadataResult) IsEmpty() bool
IsEmpty returns true if stream's metadata stream has nothing stored in it.
type StreamNotFoundError ¶ added in v0.11.0
type StreamNotFoundError struct {
// contains filtered or unexported fields
}
StreamNotFoundError is an error returned if we tried to read a stream which does not exist.
func (StreamNotFoundError) Code ¶ added in v0.11.0
func (streamNotFound StreamNotFoundError) Code() errors.ErrorCode
Code returns a code of an error. Use this to determine the error type.
func (StreamNotFoundError) Error ¶ added in v0.11.0
func (streamNotFound StreamNotFoundError) Error() string
Error returns a string representation of an error.
func (StreamNotFoundError) GetStreamId ¶ added in v0.11.0
func (streamNotFound StreamNotFoundError) GetStreamId() string
GetStreamId returns a stream identifier of a stream which does not exist. This identifier was set in a read request set to EventStoreDB.
type StreamReader ¶
type StreamReader interface { // ReadOne reads one message from a stream. // Message can be read after a successful read/subscription is // established with EventStoreDB's event stream. // Message must contain either an event or a checkpoint. // If message contains subscription confirmation or a stream-not-found this method must panic. ReadOne() (ReadResponse, errors.Error) // Close closes a protobuf stream used to read or subscribe to stream's events. Close() }
StreamReader is an interface which represents a reader of a stream. Implementation of this interface should read a stream in a dedicated go routine to avoid issues which can occur when multiple go routines are trying to read data from protobuf stream.
type TombstoneResponse ¶
type TombstoneResponse struct {
// contains filtered or unexported fields
}
TombstoneResponse is response received when stream is hard-deleted by using Client.TombstoneStream.
func (TombstoneResponse) GetPosition ¶
func (response TombstoneResponse) GetPosition() (Position, bool)
GetPosition returns a position at which stream was hard-deleted. If position was received it will also return a true as a second return value. If position does not exist a zero initialized Position and a false will be returned. Position may not exist if an empty stream was hard-deleted.
type WrongExpectedVersion ¶ added in v0.10.0
type WrongExpectedVersion struct {
// contains filtered or unexported fields
}
WrongExpectedVersion is an error returned when writing events to a stream fails gracefully. That means that if grpc connection is broken, WrongExpectedVersion error will not be returned, but a more general error of type errors.Error will be returned. Writing of events can fail gracefully if client sends a wrong expected revision.
func (WrongExpectedVersion) Code ¶ added in v0.10.0
func (exception WrongExpectedVersion) Code() errors.ErrorCode
Code returns a code of an error received. Currently, only WrongExpectedVersionErr code will be returned.
func (WrongExpectedVersion) Error ¶ added in v0.10.0
func (exception WrongExpectedVersion) Error() string
Error returns a string representation of an error
func (WrongExpectedVersion) GetCurrentRevision ¶ added in v0.10.0
func (exception WrongExpectedVersion) GetCurrentRevision() (uint64, bool)
GetCurrentRevision returns an actual current revision of a stream and a true value if current revision was received. If no current revision was received a zero is returned together with a false value.
func (WrongExpectedVersion) GetExpectedRevision ¶ added in v0.10.0
func (exception WrongExpectedVersion) GetExpectedRevision() uint64
GetExpectedRevision returns a finite expected revision of a stream which client has sent when it wanted to open a protobuf stream to write events. It panics if client did not send a finite expected revision. Use IsExpectedRevisionFinite to check if expected revision is finite.
func (WrongExpectedVersion) IsCurrentRevisionNoStream ¶ added in v0.10.0
func (exception WrongExpectedVersion) IsCurrentRevisionNoStream() bool
IsCurrentRevisionNoStream returns true if client has sent WriteStreamRevisionNoStream when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.
func (WrongExpectedVersion) IsExpectedRevisionAny ¶ added in v0.10.0
func (exception WrongExpectedVersion) IsExpectedRevisionAny() bool
IsExpectedRevisionAny returns true if client has sent WriteStreamRevisionAny when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.
func (WrongExpectedVersion) IsExpectedRevisionFinite ¶ added in v0.10.0
func (exception WrongExpectedVersion) IsExpectedRevisionFinite() bool
IsExpectedRevisionFinite returns true if client has sent a finite expected revision of a stream.
func (WrongExpectedVersion) IsExpectedRevisionNoStream ¶ added in v0.10.0
func (exception WrongExpectedVersion) IsExpectedRevisionNoStream() bool
IsExpectedRevisionNoStream returns true if client has sent WriteStreamRevisionNoStream when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.
func (WrongExpectedVersion) IsExpectedRevisionStreamExists ¶ added in v0.10.0
func (exception WrongExpectedVersion) IsExpectedRevisionStreamExists() bool
IsExpectedRevisionStreamExists returns true if client has sent WriteStreamRevisionStreamExists when it wanted to open a protobuf stream to write events to EventStoreDB's event stream.
Source Files
¶
- append_request_types.go
- append_response_errors.go
- append_response_types.go
- batch_append_request.go
- batch_append_response.go
- client_impl.go
- delete_request_types.go
- delete_response_types.go
- direction.go
- filter.go
- metadata.go
- position.go
- proposed_event.go
- read_request_types.go
- read_response_types.go
- resolved_event.go
- stream_acl.go
- stream_reader.go
- stream_reader_impl.go
- subscribe_to_stream_request_types.go
- tombstone_request_types.go
- tombstone_response_types.go