client

package
v0.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 1, 2019 License: MIT Imports: 20 Imported by: 10

Documentation

Index

Constants

View Source
const (
	DefaultMaxQueueSize        int = 5000
	DefaultMaxConcurrentItems  int = 5000
	DefaultMaxOperationRetries int = 10
	DefaultMaxReconnections    int = 10

	DefaultRequireMaster bool = true

	DefaultReconnectionDelay           time.Duration = 100 * time.Millisecond
	DefaultOperationTimeout            time.Duration = 7 * time.Second
	DefaultOperationTimeoutCheckPeriod time.Duration = time.Second

	TimerPeriod                           time.Duration = 200 * time.Millisecond
	MaxReadSize                           int           = 4096
	DefaultMaxClusterDiscoverAttempts     int           = 10
	DefaultClusterManagerExternalHttpPort int           = 30778

	CatchUpDefaultReadBatchSize    int = 500
	CatchUpDefaultMaxPushQueueSize int = 10000
)
View Source
const (
	ExpectedVersion_Any         = -2
	ExpectedVersion_NoStream    = -1
	ExpectedVersion_EmptyStream = -1
)
View Source
const (
	PackageCommandOffset     = 0
	PackageFlagsOffset       = PackageCommandOffset + 1
	PackageCorrelationOffset = PackageFlagsOffset + 1
	PackageAuthOffset        = PackageCorrelationOffset + 16
	PackageMandatorySize     = PackageAuthOffset
)
View Source
const (
	PersistentSubscriptionCreateStatus_Success  = 0
	PersistentSubscriptionCreateStatus_NotFound = 1
	PersistentSubscriptionCreateStatus_Failure  = 2
)
View Source
const (
	PersistentSubscriptionDeleteStatus_Success = 0
	PersistentSubscriptionDeleteStatus_Failure = 1
)
View Source
const (
	PersistentSubscriptionUpdateStatus_Success      = 0
	PersistentSubscriptionUpdateStatus_NotFound     = 1
	PersistentSubscriptionUpdateStatus_Failure      = 2
	PersistentSubscriptionUpdateStatus_AccessDenied = 3
)

Variables

View Source
var (
	WrongExpectedVersion = errors.New("Wrong expected version")
	StreamDeleted        = errors.New("Stream deleted")
	InvalidTransaction   = errors.New("Invalid transaction")
	AccessDenied         = errors.New("Access denied")
	AuthenticationError  = errors.New("Authentication error")
	BadRequest           = errors.New("Bad request")
)

TODO review those

View Source
var (
	CannotCommitRolledBackTransaction = errors.New("cannot commit a rolled back transaction")
	TransactionIsAlreadyCommitted     = errors.New("transaction is already committed")
)
View Source
var DefaultConnectionSettings = CreateConnectionSettings().Build()
View Source
var DefaultPersistentSubscriptionSettings = NewPersistentSubscriptionSettings(false, -1, false, 30*time.Second,
	500, 500, 10, 20, 2*time.Second, 10, 1000, 0, common.SystemConsumerStrategies_RoundRobin)
View Source
var PersistentSubscriptionNakEventAction_names = []string{
	"Unknown",
	"Park",
	"Retry",
	"Skip",
	"Stop",
}
View Source
var Position_End = &Position{-1, -1}
View Source
var Position_Start = &Position{0, 0}
View Source
var SubscriptionDropReason_name = map[int]string{
	0:   "UserInitiated",
	1:   "NotAuthenticated",
	2:   "AccessDenied",
	3:   "SubscriptionError",
	4:   "ServerError",
	5:   "ConnectionClosed",
	6:   "CatchUpError",
	7:   "ProcessingQueueOverflow",
	8:   "EventHandlerException",
	9:   "MaxSubscriberReached",
	10:  "PersistentSubscriptionDeleted",
	11:  "NotFound",
	100: "Unknown",
}

Functions

func NewEventStoreSubscription

func NewEventStoreSubscription(
	streamId string,
	lastCommitPosition int64,
	lastEventNumber *int,
	unsubscribe func() error,
) *eventStoreSubscription

func NewNotModified

func NewNotModified(stream string) error

func NewServerError

func NewServerError(msg string) error

Types

type AllEventsSlice

type AllEventsSlice struct {
	// contains filtered or unexported fields
}

func NewAllEventsSlice

func NewAllEventsSlice(
	readDirection ReadDirection,
	fromPosition *Position,
	nextPosition *Position,
	resolvedEvents []*messages.ResolvedEvent,
) *AllEventsSlice

func (*AllEventsSlice) GetEvents

func (s *AllEventsSlice) GetEvents() []*ResolvedEvent

func (*AllEventsSlice) GetFromPosition

func (s *AllEventsSlice) GetFromPosition() *Position

func (*AllEventsSlice) GetNextPosition

func (s *AllEventsSlice) GetNextPosition() *Position

func (*AllEventsSlice) GetReadDirection

func (s *AllEventsSlice) GetReadDirection() ReadDirection

func (*AllEventsSlice) IsEndOfStream

func (s *AllEventsSlice) IsEndOfStream() bool

func (*AllEventsSlice) String

func (s *AllEventsSlice) String() string

type CatchUpEventAppearedHandler

type CatchUpEventAppearedHandler func(s CatchUpSubscription, r *ResolvedEvent) error

type CatchUpSubscription

type CatchUpSubscription interface {
	Stop(timeout ...time.Duration) error
}

type CatchUpSubscriptionDroppedHandler

type CatchUpSubscriptionDroppedHandler func(s CatchUpSubscription, dr SubscriptionDropReason, err error) error

type CatchUpSubscriptionSettings

type CatchUpSubscriptionSettings struct {
	// contains filtered or unexported fields
}

func NewCatchUpSubscriptionSettings

func NewCatchUpSubscriptionSettings(
	maxLiveQueueSize int,
	readBatchSize int,
	verboseLogging bool,
	resolveLinkTos bool,
) *CatchUpSubscriptionSettings

func (*CatchUpSubscriptionSettings) MaxLiveQueueSize

func (s *CatchUpSubscriptionSettings) MaxLiveQueueSize() int

func (*CatchUpSubscriptionSettings) ReadBatchSize

func (s *CatchUpSubscriptionSettings) ReadBatchSize() int

func (*CatchUpSubscriptionSettings) ResolveLinkTos

func (s *CatchUpSubscriptionSettings) ResolveLinkTos() bool

func (*CatchUpSubscriptionSettings) VerboseLogging

func (s *CatchUpSubscriptionSettings) VerboseLogging() bool

type ClientAuthenticationFailedEventArgs

type ClientAuthenticationFailedEventArgs struct {
	// contains filtered or unexported fields
}

func NewClientAuthenticationFailedEventArgs

func NewClientAuthenticationFailedEventArgs(
	reason string,
	connection Connection,
) *ClientAuthenticationFailedEventArgs

func (*ClientAuthenticationFailedEventArgs) Connection

func (*ClientAuthenticationFailedEventArgs) Reason

func (*ClientAuthenticationFailedEventArgs) String

type ClientClosedEventArgs

type ClientClosedEventArgs struct {
	// contains filtered or unexported fields
}

func NewClientClosedEventArgs

func NewClientClosedEventArgs(
	reason string,
	connection Connection,
) *ClientClosedEventArgs

func (*ClientClosedEventArgs) Connection

func (a *ClientClosedEventArgs) Connection() Connection

func (*ClientClosedEventArgs) Reason

func (a *ClientClosedEventArgs) Reason() string

func (*ClientClosedEventArgs) String

func (a *ClientClosedEventArgs) String() string

type ClientConnectionEventArgs

type ClientConnectionEventArgs struct {
	// contains filtered or unexported fields
}

func NewClientConnectionEventArgs

func NewClientConnectionEventArgs(
	remoteEndpoint net.Addr,
	connection Connection,

) *ClientConnectionEventArgs

func (*ClientConnectionEventArgs) Connection

func (a *ClientConnectionEventArgs) Connection() Connection

func (*ClientConnectionEventArgs) RemoteEndpoint

func (a *ClientConnectionEventArgs) RemoteEndpoint() net.Addr

func (*ClientConnectionEventArgs) String

func (a *ClientConnectionEventArgs) String() string

type ClientErrorEventArgs

type ClientErrorEventArgs struct {
	// contains filtered or unexported fields
}

func NewClientErrorEventArgs

func NewClientErrorEventArgs(
	err error,
	connection Connection,
) *ClientErrorEventArgs

func (*ClientErrorEventArgs) Connection

func (a *ClientErrorEventArgs) Connection() Connection

func (*ClientErrorEventArgs) Error

func (a *ClientErrorEventArgs) Error() error

func (*ClientErrorEventArgs) String

func (a *ClientErrorEventArgs) String() string

type ClientReconnectingEventArgs

type ClientReconnectingEventArgs struct {
	// contains filtered or unexported fields
}

func NewClientReconnectingEventArgs

func NewClientReconnectingEventArgs(
	connection Connection,

) *ClientReconnectingEventArgs

func (*ClientReconnectingEventArgs) Connection

func (a *ClientReconnectingEventArgs) Connection() Connection

func (*ClientReconnectingEventArgs) String

func (a *ClientReconnectingEventArgs) String() string

type ClusterSettings

type ClusterSettings struct {
	// contains filtered or unexported fields
}

func NewClusterSettings

func NewClusterSettings(
	clusterDns string,
	maxDiscoverAttempts int,
	externalGossipPort int,
	gossipSeeds []*GossipSeed,
	gossipTimeout time.Duration,
) *ClusterSettings

func (*ClusterSettings) ClusterDns

func (cs *ClusterSettings) ClusterDns() string

func (*ClusterSettings) ExternalGossipPort

func (cs *ClusterSettings) ExternalGossipPort() int

func (*ClusterSettings) GossipSeeds

func (cs *ClusterSettings) GossipSeeds() []*GossipSeed

func (*ClusterSettings) GossipTimeout

func (cs *ClusterSettings) GossipTimeout() time.Duration

func (*ClusterSettings) MaxDiscoverAttempts

func (cs *ClusterSettings) MaxDiscoverAttempts() int

type Command

type Command byte
const (
	Command_HeartbeatRequestCommand  Command = 0x01
	Command_HeartbeatResponseCommand Command = 0x02

	Command_Ping Command = 0x03
	Command_Pong Command = 0x04

	Command_PrepareAck Command = 0x05
	Command_CommitAck  Command = 0x06

	Command_SlaveAssignment Command = 0x07
	Command_CloneAssignment Command = 0x08

	Command_SubscribeReplica         Command = 0x10
	Command_ReplicaLogPositionAck    Command = 0x11
	Command_CreateChunk              Command = 0x12
	Command_RawChunkBulk             Command = 0x13
	Command_DataChunkBulk            Command = 0x14
	Command_ReplicaSubscriptionRetry Command = 0x15
	Command_ReplicaSubscribed        Command = 0x16

	Command_WriteEvents          Command = 0x82
	Command_WriteEventsCompleted Command = 0x83

	Command_TransactionStart           Command = 0x84
	Command_TransactionStartCompleted  Command = 0x85
	Command_TransactionWrite           Command = 0x86
	Command_TransactionWriteCompleted  Command = 0x87
	Command_TransactionCommit          Command = 0x88
	Command_TransactionCommitCompleted Command = 0x89

	Command_DeleteStream          Command = 0x8A
	Command_DeleteStreamCompleted Command = 0x8B

	Command_ReadEvent                         Command = 0xB0
	Command_ReadEventCompleted                Command = 0xB1
	Command_ReadStreamEventsForward           Command = 0xB2
	Command_ReadStreamEventsForwardCompleted  Command = 0xB3
	Command_ReadStreamEventsBackward          Command = 0xB4
	Command_ReadStreamEventsBackwardCompleted Command = 0xB5
	Command_ReadAllEventsForward              Command = 0xB6
	Command_ReadAllEventsForwardCompleted     Command = 0xB7
	Command_ReadAllEventsBackward             Command = 0xB8
	Command_ReadAllEventsBackwardCompleted    Command = 0xB9

	Command_SubscribeToStream                         Command = 0xC0
	Command_SubscriptionConfirmation                  Command = 0xC1
	Command_StreamEventAppeared                       Command = 0xC2
	Command_UnsubscribeFromStream                     Command = 0xC3
	Command_SubscriptionDropped                       Command = 0xC4
	Command_ConnectToPersistentSubscription           Command = 0xC5
	Command_PersistentSubscriptionConfirmation        Command = 0xC6
	Command_PersistentSubscriptionStreamEventAppeared Command = 0xC7
	Command_CreatePersistentSubscription              Command = 0xC8
	Command_CreatePersistentSubscriptionCompleted     Command = 0xC9
	Command_DeletePersistentSubscription              Command = 0xCA
	Command_DeletePersistentSubscriptionCompleted     Command = 0xCB
	Command_PersistentSubscriptionAckEvents           Command = 0xCC
	Command_PersistentSubscriptionNakEvents           Command = 0xCD
	Command_UpdatePersistentSubscription              Command = 0xCE
	Command_UpdatePersistentSubscriptionCompleted     Command = 0xCF

	Command_ScavengeDatabase          Command = 0xD0
	Command_ScavengeDatabaseCompleted Command = 0xD1

	Command_BadRequest       Command = 0xF0
	Command_NotHandled       Command = 0xF1
	Command_Authenticate     Command = 0xF2
	Command_Authenticated    Command = 0xF3
	Command_NotAuthenticated Command = 0xF4
)

func (Command) String

func (c Command) String() string

type ConnectToPersistentSubscriptions

type ConnectToPersistentSubscriptions interface {
	NotifyEventsProcessed(processedEvents []uuid.UUID) error
	NotifyEventsFailed(processedEvents []uuid.UUID, action PersistentSubscriptionNakEventAction, reason string) error
	Unsubscribe() error
}

type Connection

type Connection interface {
	Name() string

	// Use Task.Wait()
	ConnectAsync() *tasks.Task

	Close() error

	// Task.Result() returns *client.DeleteResult
	DeleteStreamAsync(stream string, expectedVersion int, hardDelete bool, userCredentials *UserCredentials) (
		*tasks.Task, error)

	// Task.Result() returns *client.WriteResult
	AppendToStreamAsync(stream string, expectedVersion int, events []*EventData, userCredentials *UserCredentials) (
		*tasks.Task, error)

	// Task.Result() returns *client.Transaction
	StartTransactionAsync(stream string, expectedVersion int, userCredentials *UserCredentials) (
		*tasks.Task, error)

	ContinueTransaction(transactionId int64, userCredentials *UserCredentials) *Transaction

	// Task.Result() returns *client.EventReadResult
	ReadEventAsync(stream string, eventNumber int, resolveTos bool, userCredentials *UserCredentials) (
		*tasks.Task, error)

	// Task.Result() returns *client.StreamEventsSlice
	ReadStreamEventsForwardAsync(stream string, start int, max int, resolveLinkTos bool,
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.StreamEventsSlice
	ReadStreamEventsBackwardAsync(stream string, start int, max int, resolveLinkTos bool,
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.AllEventsSlice
	ReadAllEventsForwardAsync(pos *Position, max int, resolveTos bool, userCredentials *UserCredentials) (
		*tasks.Task, error)

	// Task.Result() returns *client.AllEventsSlice
	ReadAllEventsBackwardAsync(pos *Position, max int, resolveTos bool, userCredentials *UserCredentials) (
		*tasks.Task, error)

	// Task.Result() returns client.EventStoreSubscription
	SubscribeToStreamAsync(
		stream string,
		resolveLinkTos bool,
		eventAppeared EventAppearedHandler,
		subscriptionDropped SubscriptionDroppedHandler,
		userCredentials *UserCredentials,
	) (*tasks.Task, error)

	SubscribeToStreamFrom(
		stream string,
		lastCheckpoint *int,
		catchupSubscriptionSettings *CatchUpSubscriptionSettings,
		eventAppeared CatchUpEventAppearedHandler,
		liveProcessingStarted LiveProcessingStartedHandler,
		subscriptionDropped CatchUpSubscriptionDroppedHandler,
		userCredentials *UserCredentials,
	) (CatchUpSubscription, error)

	// Task.Result() returns client.EventStoreSubscription
	SubscribeToAllAsync(
		resolveLinkTos bool,
		eventAppeared EventAppearedHandler,
		subscriptionDropped SubscriptionDroppedHandler,
		userCredentials *UserCredentials,
	) (*tasks.Task, error)

	// Task.Result() returns client.PersistentSubscription
	ConnectToPersistentSubscriptionAsync(
		stream string,
		groupName string,
		eventAppeared PersistentEventAppearedHandler,
		subscriptionDropped PersistentSubscriptionDroppedHandler,
		userCredentials *UserCredentials,
		bufferSize int,
		autoAck bool,
	) (*tasks.Task, error)

	SubscribeToAllFrom(
		lastCheckpoint *Position,
		settings *CatchUpSubscriptionSettings,
		eventAppeared CatchUpEventAppearedHandler,
		liveProcessingStarted LiveProcessingStartedHandler,
		subscriptionDropped CatchUpSubscriptionDroppedHandler,
		userCredentials *UserCredentials,
	) (CatchUpSubscription, error)

	// Task.Result() returns *client.PersistentSubscriptionUpdateResult
	UpdatePersistentSubscriptionAsync(stream string, groupName string, settings *PersistentSubscriptionSettings,
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.PersistentSubscriptionCreateResult
	CreatePersistentSubscriptionAsync(stream string, groupName string, settings *PersistentSubscriptionSettings,
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.PersistentSubscriptionDeleteResult
	DeletePersistentSubscriptionAsync(stream string, groupName string,
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.WriteResult
	SetStreamMetadataAsync(stream string, expectedMetastreamVersion int, metadata interface{},
		userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.StreamMetadataResult
	GetStreamMetadataAsync(stream string, userCredentials *UserCredentials) (*tasks.Task, error)

	// Task.Result() returns *client.WriteResult
	SetSystemSettings(settings *SystemSettings, userCredentials *UserCredentials) (*tasks.Task, error)

	Connected() EventHandlers

	Disconnected() EventHandlers

	Reconnecting() EventHandlers

	Closed() EventHandlers

	ErrorOccurred() EventHandlers

	AuthenticationFailed() EventHandlers

	Settings() *ConnectionSettings
}

type ConnectionSettings

type ConnectionSettings struct {
	DefaultUserCredentials *UserCredentials
	// contains filtered or unexported fields
}

func (*ConnectionSettings) ClientConnectionTimeout

func (cs *ConnectionSettings) ClientConnectionTimeout() time.Duration

func (*ConnectionSettings) ClusterDns

func (cs *ConnectionSettings) ClusterDns() string

func (*ConnectionSettings) ExternalGossipPort

func (cs *ConnectionSettings) ExternalGossipPort() int

func (*ConnectionSettings) FailOnNoServerResponse

func (cs *ConnectionSettings) FailOnNoServerResponse() bool

func (*ConnectionSettings) GossipSeeds

func (cs *ConnectionSettings) GossipSeeds() []*GossipSeed

func (*ConnectionSettings) GossipTimeout

func (cs *ConnectionSettings) GossipTimeout() time.Duration

func (*ConnectionSettings) HeartbeatInterval

func (cs *ConnectionSettings) HeartbeatInterval() time.Duration

func (*ConnectionSettings) HeartbeatTimeout

func (cs *ConnectionSettings) HeartbeatTimeout() time.Duration

func (*ConnectionSettings) MaxConcurrentItem

func (cs *ConnectionSettings) MaxConcurrentItem() int

func (*ConnectionSettings) MaxDiscoverAttempts

func (cs *ConnectionSettings) MaxDiscoverAttempts() int

func (*ConnectionSettings) MaxQueueSize

func (cs *ConnectionSettings) MaxQueueSize() int

func (*ConnectionSettings) MaxReconnections

func (cs *ConnectionSettings) MaxReconnections() int

func (*ConnectionSettings) MaxRetries

func (cs *ConnectionSettings) MaxRetries() int

func (*ConnectionSettings) OperationTimeout

func (cs *ConnectionSettings) OperationTimeout() time.Duration

func (*ConnectionSettings) OperationTimeoutCheckPeriod

func (cs *ConnectionSettings) OperationTimeoutCheckPeriod() time.Duration

func (*ConnectionSettings) ReconnectionDelay

func (cs *ConnectionSettings) ReconnectionDelay() time.Duration

func (*ConnectionSettings) RequireMaster

func (cs *ConnectionSettings) RequireMaster() bool

func (*ConnectionSettings) TargetHost

func (cs *ConnectionSettings) TargetHost() string

func (*ConnectionSettings) UseSslConnection

func (cs *ConnectionSettings) UseSslConnection() bool

func (*ConnectionSettings) ValidateService

func (cs *ConnectionSettings) ValidateService() bool

func (*ConnectionSettings) VerboseLogging

func (cs *ConnectionSettings) VerboseLogging() bool

type ConnectionSettingsBuilder

type ConnectionSettingsBuilder struct {
	// contains filtered or unexported fields
}

func ConnectionSettingsBuilderFrom added in v0.2.0

func ConnectionSettingsBuilderFrom(o *ConnectionSettings) *ConnectionSettingsBuilder

func CreateConnectionSettings

func CreateConnectionSettings() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) Build

func (*ConnectionSettingsBuilder) EnableVerboseLogging

func (csb *ConnectionSettingsBuilder) EnableVerboseLogging() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) FailOnNoServerResponse

func (csb *ConnectionSettingsBuilder) FailOnNoServerResponse() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) KeepReconnecting

func (csb *ConnectionSettingsBuilder) KeepReconnecting() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) KeepRetrying

func (*ConnectionSettingsBuilder) LimitAttemptsForOperationTo

func (csb *ConnectionSettingsBuilder) LimitAttemptsForOperationTo(limit int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) LimitConcurrentOperationsTo

func (csb *ConnectionSettingsBuilder) LimitConcurrentOperationsTo(limit int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) LimitOperationsQueueTo

func (csb *ConnectionSettingsBuilder) LimitOperationsQueueTo(limit int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) LimitReconnectionsTo

func (csb *ConnectionSettingsBuilder) LimitReconnectionsTo(limit int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) LimitRetriesForOperationTo

func (csb *ConnectionSettingsBuilder) LimitRetriesForOperationTo(limit int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) PerformOnAnyNode

func (csb *ConnectionSettingsBuilder) PerformOnAnyNode() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) PerformOnMasterOnly

func (csb *ConnectionSettingsBuilder) PerformOnMasterOnly() *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetClusterDns

func (csb *ConnectionSettingsBuilder) SetClusterDns(clusterDns string) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetClusterGossipPort

func (csb *ConnectionSettingsBuilder) SetClusterGossipPort(port int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetDefaultUserCredentials

func (csb *ConnectionSettingsBuilder) SetDefaultUserCredentials(userCredentials *UserCredentials) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetGossipSeedEndPoints

func (csb *ConnectionSettingsBuilder) SetGossipSeedEndPoints(endpoints []*net.TCPAddr) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetGossipSeeds

func (csb *ConnectionSettingsBuilder) SetGossipSeeds(gossipSeeds []*GossipSeed) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetGossipTimeout

func (csb *ConnectionSettingsBuilder) SetGossipTimeout(timeout time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetHeartbeatInterval

func (csb *ConnectionSettingsBuilder) SetHeartbeatInterval(interval time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetHeartbeatTimeout

func (csb *ConnectionSettingsBuilder) SetHeartbeatTimeout(timeout time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetMaxDiscoverAttempts

func (csb *ConnectionSettingsBuilder) SetMaxDiscoverAttempts(maxAttempts int) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetOperationTimeoutTo

func (csb *ConnectionSettingsBuilder) SetOperationTimeoutTo(delay time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetReconnectionDelayTo

func (csb *ConnectionSettingsBuilder) SetReconnectionDelayTo(delay time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) SetTimeoutCheckPeriodTo

func (csb *ConnectionSettingsBuilder) SetTimeoutCheckPeriodTo(period time.Duration) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) UseSslConnection

func (csb *ConnectionSettingsBuilder) UseSslConnection(targetHost string, validateServer bool) *ConnectionSettingsBuilder

func (*ConnectionSettingsBuilder) WithConnectionTimeoutOf

func (csb *ConnectionSettingsBuilder) WithConnectionTimeoutOf(timeout time.Duration) *ConnectionSettingsBuilder

type DeleteResult

type DeleteResult struct {
	// contains filtered or unexported fields
}

func NewDeleteResult

func NewDeleteResult(logPosition *Position) *DeleteResult

func (*DeleteResult) LogPosition

func (r *DeleteResult) LogPosition() *Position

func (*DeleteResult) String

func (r *DeleteResult) String() string

type Event

type Event interface{}

type EventAppearedHandler

type EventAppearedHandler func(s EventStoreSubscription, r *ResolvedEvent) error

type EventData

type EventData struct {
	// contains filtered or unexported fields
}

func NewEventData

func NewEventData(
	eventId uuid.UUID,
	typ string,
	isJson bool,
	data []byte,
	metadata []byte,
) *EventData

func (*EventData) Data

func (e *EventData) Data() []byte

func (*EventData) EventId

func (e *EventData) EventId() uuid.UUID

func (*EventData) IsJson

func (e *EventData) IsJson() bool

func (*EventData) Metadata

func (e *EventData) Metadata() []byte

func (*EventData) String

func (e *EventData) String() string

func (*EventData) ToNewEvent

func (e *EventData) ToNewEvent() *messages.NewEvent

func (*EventData) Type

func (e *EventData) Type() string

type EventHandler

type EventHandler func(evt Event) error

type EventHandlers

type EventHandlers interface {
	Add(EventHandler) error
	Remove(EventHandler) error
}

type EventReadResult

type EventReadResult struct {
	// contains filtered or unexported fields
}

func NewEventReadResult

func NewEventReadResult(
	status EventReadStatus,
	stream string,
	eventNumber int,
	event *messages.ResolvedIndexedEvent,
) *EventReadResult

func (*EventReadResult) Event

func (r *EventReadResult) Event() *ResolvedEvent

func (*EventReadResult) EventNumber

func (r *EventReadResult) EventNumber() int

func (*EventReadResult) Status

func (r *EventReadResult) Status() EventReadStatus

func (*EventReadResult) Stream

func (r *EventReadResult) Stream() string

func (*EventReadResult) String

func (r *EventReadResult) String() string

type EventReadStatus

type EventReadStatus int
const (
	EventReadStatus_Success       EventReadStatus = 0
	EventReadStatus_NotFound      EventReadStatus = 1
	EventReadStatus_NoStream      EventReadStatus = 2
	EventReadStatus_StreamDeleted EventReadStatus = 3
	EventReadStatus_Error         EventReadStatus = 4
	EventReadStatus_AccessDenied  EventReadStatus = 5
)

func (EventReadStatus) String

func (s EventReadStatus) String() string

type EventStoreSubscription

type EventStoreSubscription interface {
	IsSubscribedToAll() bool
	StreamId() string
	LastCommitPosition() int64
	LastEventNumber() *int
	Close() error
	Unsubscribe() error
}

type GossipSeed

type GossipSeed struct {
	// contains filtered or unexported fields
}

func NewGossipSeed

func NewGossipSeed(ipEndpoint *net.TCPAddr, hostHeader string) *GossipSeed

func (*GossipSeed) HostHeader

func (gs *GossipSeed) HostHeader() string

func (*GossipSeed) IpEndpoint

func (gs *GossipSeed) IpEndpoint() *net.TCPAddr

func (*GossipSeed) String

func (gs *GossipSeed) String() string

type InspectionDecision

type InspectionDecision int
const (
	InspectionDecision_DoNothing InspectionDecision = iota
	InspectionDecision_EndOperation
	InspectionDecision_Retry
	InspectionDecision_Reconnect
	InspectionDecision_Subscribed
)

func (InspectionDecision) String

func (d InspectionDecision) String() string

type InspectionResult

type InspectionResult struct {
	// contains filtered or unexported fields
}

func NewInspectionResult

func NewInspectionResult(
	decision InspectionDecision,
	description string,
	tcpEndpoint net.Addr,
	secureTcpEndpoint net.Addr,
) *InspectionResult

func (*InspectionResult) Decision

func (r *InspectionResult) Decision() InspectionDecision

func (*InspectionResult) Description

func (r *InspectionResult) Description() string

func (*InspectionResult) SecureTcpEndpoint

func (r *InspectionResult) SecureTcpEndpoint() net.Addr

func (*InspectionResult) TcpEndpoint

func (r *InspectionResult) TcpEndpoint() net.Addr

type LiveProcessingStartedHandler

type LiveProcessingStartedHandler func(s CatchUpSubscription) error

type NotModified

type NotModified struct {
	// contains filtered or unexported fields
}

func (*NotModified) Error

func (e *NotModified) Error() string

type Operation

type Operation interface {
	CreateNetworkPackage(correlationId uuid.UUID) (*Package, error)
	InspectPackage(p *Package) (*InspectionResult, error)
	Fail(err error) error
}

type Package

type Package struct {
	// contains filtered or unexported fields
}

func NewTcpPackage

func NewTcpPackage(
	cmd Command,
	flags TcpFlag,
	correlationId uuid.UUID,
	data []byte,
	userCredentials *UserCredentials,
) *Package

func TcpPacketFromBytes

func TcpPacketFromBytes(data []byte) (*Package, error)

func (*Package) Bytes

func (p *Package) Bytes() []byte

func (*Package) Command

func (p *Package) Command() Command

func (*Package) CorrelationId

func (p *Package) CorrelationId() uuid.UUID

func (*Package) Data

func (p *Package) Data() []byte

func (*Package) Flags

func (p *Package) Flags() TcpFlag

func (*Package) Password

func (p *Package) Password() string

func (*Package) Size

func (p *Package) Size() int32

func (*Package) String

func (p *Package) String() string

func (*Package) Username

func (p *Package) Username() string

type PackageConnection

type PackageConnection struct {
	// contains filtered or unexported fields
}

func NewPackageConnection

func NewPackageConnection(
	ipEndpoint net.Addr,
	connectionId uuid.UUID,
	ssl bool,
	targetHost string,
	validateServer bool,
	timeout time.Duration,
	packageHandler func(conn *PackageConnection, packet *Package),
	errorHandler func(conn *PackageConnection, err error),
	connectionEstablished func(conn *PackageConnection),
	connectionClosed func(conn *PackageConnection, err error),
) *PackageConnection

func (*PackageConnection) Close

func (c *PackageConnection) Close(reason string) error

func (*PackageConnection) ConnectionId

func (c *PackageConnection) ConnectionId() uuid.UUID

func (*PackageConnection) EnqueueSend

func (c *PackageConnection) EnqueueSend(p *Package) error

func (*PackageConnection) IsClosed

func (c *PackageConnection) IsClosed() bool

func (*PackageConnection) LocalEndpoint

func (c *PackageConnection) LocalEndpoint() net.Addr

func (*PackageConnection) RemoteEndpoint

func (c *PackageConnection) RemoteEndpoint() net.Addr

func (*PackageConnection) StartReceiving

func (c *PackageConnection) StartReceiving() error

type PersistentEventAppearedHandler

type PersistentEventAppearedHandler func(s PersistentSubscription, r *ResolvedEvent) error

type PersistentEventStoreSubscription

type PersistentEventStoreSubscription struct {
	EventStoreSubscription
	// contains filtered or unexported fields
}

func NewPersistentEventStoreSubscription

func NewPersistentEventStoreSubscription(
	subscriptionOperation ConnectToPersistentSubscriptions,
	streamId string,
	lastCommitPosition int64,
	lastEventNumber *int,
) *PersistentEventStoreSubscription

func (*PersistentEventStoreSubscription) NotifyEventsFailed

func (s *PersistentEventStoreSubscription) NotifyEventsFailed(
	processedEvents []uuid.UUID,
	action PersistentSubscriptionNakEventAction,
	reason string,
) error

func (*PersistentEventStoreSubscription) NotifyEventsProcessed

func (s *PersistentEventStoreSubscription) NotifyEventsProcessed(processedEvents []uuid.UUID) error

type PersistentSubscription

type PersistentSubscription interface {
	Acknowledge(events []ResolvedEvent) error
	Fail(events []ResolvedEvent, action PersistentSubscriptionNakEventAction, reason string) error
	Stop(timeout ...time.Duration) error
}

type PersistentSubscriptionCreateResult

type PersistentSubscriptionCreateResult struct {
	// contains filtered or unexported fields
}

func (*PersistentSubscriptionCreateResult) GetStatus

type PersistentSubscriptionCreateStatus

type PersistentSubscriptionCreateStatus int

func (PersistentSubscriptionCreateStatus) String

type PersistentSubscriptionDeleteResult

type PersistentSubscriptionDeleteResult struct {
	// contains filtered or unexported fields
}

func (*PersistentSubscriptionDeleteResult) GetStatus

type PersistentSubscriptionDeleteStatus

type PersistentSubscriptionDeleteStatus int

func (PersistentSubscriptionDeleteStatus) String

type PersistentSubscriptionDroppedHandler

type PersistentSubscriptionDroppedHandler func(s PersistentSubscription, dr SubscriptionDropReason, err error) error

type PersistentSubscriptionNakEventAction

type PersistentSubscriptionNakEventAction int
const (
	PersistentSubscriptionNakEventAction_Unknown PersistentSubscriptionNakEventAction = iota
	PersistentSubscriptionNakEventAction_Park
	PersistentSubscriptionNakEventAction_Retry
	PersistentSubscriptionNakEventAction_Skip
	PersistentSubscriptionNakEventAction_Stop
)

func (PersistentSubscriptionNakEventAction) String

type PersistentSubscriptionSettings

type PersistentSubscriptionSettings struct {
	MaxRetryCount     int32
	LiveBufferSize    int32
	ReadBatchSize     int32
	HistoryBufferSize int32

	NamedConsumerStrategy common.SystemConsumerStrategies
	// contains filtered or unexported fields
}

func NewPersistentSubscriptionSettings

func NewPersistentSubscriptionSettings(
	resolveLinkTos bool,
	startFrom int32,
	extraStatistics bool,
	messageTimeout time.Duration,
	maxRetryCount int32,
	liveBufferSize int32,
	readBatchSize int32,
	historyBufferSize int32,
	checkPointAfter time.Duration,
	minCheckPointCount int32,
	maxCheckPointCount int32,
	maxSubscriberCount int32,
	namedConsumerStrategy common.SystemConsumerStrategies,
) *PersistentSubscriptionSettings

func (*PersistentSubscriptionSettings) CheckPointAfter

func (s *PersistentSubscriptionSettings) CheckPointAfter() time.Duration

func (*PersistentSubscriptionSettings) ExtraStatistics

func (s *PersistentSubscriptionSettings) ExtraStatistics() bool

func (*PersistentSubscriptionSettings) MaxCheckPointCount

func (s *PersistentSubscriptionSettings) MaxCheckPointCount() int32

func (*PersistentSubscriptionSettings) MaxSubscriberCount

func (s *PersistentSubscriptionSettings) MaxSubscriberCount() int32

func (*PersistentSubscriptionSettings) MessageTimeout

func (s *PersistentSubscriptionSettings) MessageTimeout() time.Duration

func (*PersistentSubscriptionSettings) MinCheckPointCount

func (s *PersistentSubscriptionSettings) MinCheckPointCount() int32

func (*PersistentSubscriptionSettings) ResolveLinkTos

func (s *PersistentSubscriptionSettings) ResolveLinkTos() bool

func (*PersistentSubscriptionSettings) StartFrom

func (s *PersistentSubscriptionSettings) StartFrom() int32

type PersistentSubscriptionUpdateResult

type PersistentSubscriptionUpdateResult struct {
	// contains filtered or unexported fields
}

func (*PersistentSubscriptionUpdateResult) GetStatus

type PersistentSubscriptionUpdateStatus

type PersistentSubscriptionUpdateStatus int

func (PersistentSubscriptionUpdateStatus) String

type Position

type Position struct {
	// contains filtered or unexported fields
}

func NewPosition

func NewPosition(commitPosition int64, preparePosition int64) *Position

func (*Position) CommitPosition

func (p *Position) CommitPosition() int64

func (*Position) Equals

func (p *Position) Equals(p2 *Position) bool

func (*Position) GreaterThan

func (p *Position) GreaterThan(p2 *Position) bool

func (*Position) GreaterThanOrEquals

func (p *Position) GreaterThanOrEquals(p2 *Position) bool

func (*Position) PreparePosition

func (p *Position) PreparePosition() int64

type ReadDirection

type ReadDirection int
const (
	ReadDirection_Forward ReadDirection = iota
	ReadDirection_Backward
)

func (ReadDirection) String

func (x ReadDirection) String() string

type RecordedEvent

type RecordedEvent struct {
	// contains filtered or unexported fields
}

func (*RecordedEvent) Created

func (e *RecordedEvent) Created() time.Time

func (*RecordedEvent) CreatedEpoch

func (e *RecordedEvent) CreatedEpoch() time.Time

func (*RecordedEvent) Data

func (e *RecordedEvent) Data() []byte

func (*RecordedEvent) EventId

func (e *RecordedEvent) EventId() uuid.UUID

func (*RecordedEvent) EventNumber

func (e *RecordedEvent) EventNumber() int

func (*RecordedEvent) EventStreamId

func (e *RecordedEvent) EventStreamId() string

func (*RecordedEvent) EventType

func (e *RecordedEvent) EventType() string

func (*RecordedEvent) IsJson

func (e *RecordedEvent) IsJson() bool

func (*RecordedEvent) Metadata

func (e *RecordedEvent) Metadata() []byte

func (*RecordedEvent) String

func (e *RecordedEvent) String() string

type ResolvedEvent

type ResolvedEvent struct {
	// contains filtered or unexported fields
}

func NewResolvedEvent

func NewResolvedEvent(evt *messages.ResolvedIndexedEvent) *ResolvedEvent

func NewResolvedEventFrom

func NewResolvedEventFrom(evt *messages.ResolvedEvent) *ResolvedEvent

func (*ResolvedEvent) Event

func (e *ResolvedEvent) Event() *RecordedEvent

func (*ResolvedEvent) IsResolved

func (e *ResolvedEvent) IsResolved() bool
func (e *ResolvedEvent) Link() *RecordedEvent

func (*ResolvedEvent) OriginalEvent

func (e *ResolvedEvent) OriginalEvent() *RecordedEvent

func (*ResolvedEvent) OriginalEventNumber

func (e *ResolvedEvent) OriginalEventNumber() int

func (*ResolvedEvent) OriginalPosition

func (e *ResolvedEvent) OriginalPosition() *Position

func (*ResolvedEvent) OriginalStreamId

func (e *ResolvedEvent) OriginalStreamId() string

func (*ResolvedEvent) String

func (e *ResolvedEvent) String() string

type ServerError

type ServerError struct {
	// contains filtered or unexported fields
}

func (*ServerError) Error

func (e *ServerError) Error() string

type SliceReadStatus

type SliceReadStatus int
const (
	SliceReadStatus_Success        SliceReadStatus = 0
	SliceReadStatus_StreamNotFound SliceReadStatus = 1
	SliceReadStatus_StreamDeleted  SliceReadStatus = 2
	SliceReadStatus_NotModified    SliceReadStatus = 3
	SliceReadStatus_Error          SliceReadStatus = 4
	SliceReadStatus_AccessDenied   SliceReadStatus = 5
)

func (SliceReadStatus) String

func (s SliceReadStatus) String() string

type StreamAcl

type StreamAcl struct {
	// contains filtered or unexported fields
}

func NewStreamAcl

func NewStreamAcl(
	readRoles []string,
	writeRoles []string,
	deleteRoles []string,
	metaReadRoles []string,
	metaWriteRoles []string,
) *StreamAcl

func (*StreamAcl) DeleteRoles

func (x *StreamAcl) DeleteRoles() []string

func (*StreamAcl) MarshalJSON

func (x *StreamAcl) MarshalJSON() ([]byte, error)

func (*StreamAcl) MetaReadRoles

func (x *StreamAcl) MetaReadRoles() []string

func (*StreamAcl) MetaWriteRoles

func (x *StreamAcl) MetaWriteRoles() []string

func (*StreamAcl) ReadRoles

func (x *StreamAcl) ReadRoles() []string

func (*StreamAcl) UnmarshalJSON

func (x *StreamAcl) UnmarshalJSON(data []byte) error

func (*StreamAcl) WriteRoles

func (x *StreamAcl) WriteRoles() []string

type StreamEventsSlice

type StreamEventsSlice struct {
	// contains filtered or unexported fields
}

func NewStreamEventsSlice

func NewStreamEventsSlice(
	status SliceReadStatus,
	stream string,
	fromEventNumber int,
	readDirection ReadDirection,
	resolvedEvents []*messages.ResolvedIndexedEvent,
	nextEventNumber int,
	lastEventNumber int,
	isEndOfStream bool,
) *StreamEventsSlice

func (*StreamEventsSlice) Events

func (s *StreamEventsSlice) Events() []*ResolvedEvent

func (*StreamEventsSlice) FromEventNumber

func (s *StreamEventsSlice) FromEventNumber() int

func (*StreamEventsSlice) IsEndOfStream

func (s *StreamEventsSlice) IsEndOfStream() bool

func (*StreamEventsSlice) LastEventNumber

func (s *StreamEventsSlice) LastEventNumber() int

func (*StreamEventsSlice) NextEventNumber

func (s *StreamEventsSlice) NextEventNumber() int

func (*StreamEventsSlice) ReadDirection

func (s *StreamEventsSlice) ReadDirection() ReadDirection

func (*StreamEventsSlice) Status

func (s *StreamEventsSlice) Status() SliceReadStatus

func (*StreamEventsSlice) Stream

func (s *StreamEventsSlice) Stream() string

type StreamMetadata

type StreamMetadata map[string]interface{}

func CreateStreamMetadata

func CreateStreamMetadata(
	maxCount *int,
	maxAge *time.Duration,
	truncateBefore *int,
	cacheControl *time.Duration,
	acl *StreamAcl,
) StreamMetadata

func StreamMetadataFromJsonBytes

func StreamMetadataFromJsonBytes(data []byte) (StreamMetadata, error)

type StreamMetadataResult

type StreamMetadataResult struct {
	// contains filtered or unexported fields
}

func NewStreamMetadataResult

func NewStreamMetadataResult(
	stream string,
	isStreamDeleted bool,
	metastreamVersion int,
	streamMetadata StreamMetadata,
) *StreamMetadataResult

func (*StreamMetadataResult) IsStreamDeleted

func (r *StreamMetadataResult) IsStreamDeleted() bool

func (*StreamMetadataResult) MetastreamVersion

func (r *StreamMetadataResult) MetastreamVersion() int

func (*StreamMetadataResult) Stream

func (r *StreamMetadataResult) Stream() string

func (*StreamMetadataResult) StreamMetadata

func (r *StreamMetadataResult) StreamMetadata() StreamMetadata

func (*StreamMetadataResult) String

func (r *StreamMetadataResult) String() string

type SubscriptionDropReason

type SubscriptionDropReason int
const (
	SubscriptionDropReason_UserInitiated                 SubscriptionDropReason = 0
	SubscriptionDropReason_NotAuthenticated              SubscriptionDropReason = 1
	SubscriptionDropReason_AccessDenied                  SubscriptionDropReason = 2
	SubscriptionDropReason_SubscribingError              SubscriptionDropReason = 3
	SubscriptionDropReason_ServerError                   SubscriptionDropReason = 4
	SubscriptionDropReason_ConnectionClosed              SubscriptionDropReason = 5
	SubscriptionDropReason_CatchUpError                  SubscriptionDropReason = 6
	SubscriptionDropReason_ProcessingQueueOverflow       SubscriptionDropReason = 7
	SubscriptionDropReason_EventHandlerException         SubscriptionDropReason = 8
	SubscriptionDropReason_MaxSubscribersReached         SubscriptionDropReason = 9
	SubscriptionDropReason_PersistentSubscriptionDeleted SubscriptionDropReason = 10
	SubscriptionDropReason_NotFound                      SubscriptionDropReason = 11
	SubscriptionDropReason_Unknown                       SubscriptionDropReason = 100
)

func (SubscriptionDropReason) String

func (r SubscriptionDropReason) String() string

type SubscriptionDroppedHandler

type SubscriptionDroppedHandler func(s EventStoreSubscription, dr SubscriptionDropReason, err error) error

type SystemSettings

type SystemSettings struct {
	// contains filtered or unexported fields
}

func NewSystemSettings

func NewSystemSettings(
	userStreamAcl *StreamAcl,
	systemStreamAcl *StreamAcl,
) *SystemSettings

func SystemSettingsFromJsonBytes

func SystemSettingsFromJsonBytes(data []byte) (*SystemSettings, error)

func (*SystemSettings) MarshalJSON

func (s *SystemSettings) MarshalJSON() ([]byte, error)

func (*SystemSettings) String

func (s *SystemSettings) String() string

func (*SystemSettings) SystemStreamAcl

func (s *SystemSettings) SystemStreamAcl() *StreamAcl

func (*SystemSettings) UnmarshalJSON

func (s *SystemSettings) UnmarshalJSON(data []byte) error

func (*SystemSettings) UserStreamAcl

func (s *SystemSettings) UserStreamAcl() *StreamAcl

type TcpFlag

type TcpFlag byte
const (
	FlagsNone          TcpFlag = 0x00
	FlagsAuthenticated TcpFlag = 0x01
)

func (TcpFlag) String

func (f TcpFlag) String() string

type Transaction

type Transaction struct {
	// contains filtered or unexported fields
}

func NewTransaction

func NewTransaction(
	transactionId int64,
	userCredentials *UserCredentials,
	connection TransactionConnection,
) *Transaction

func (*Transaction) CommitAsync

func (t *Transaction) CommitAsync() (*tasks.Task, error)

Task.Result() returns *client.WriteResult

func (*Transaction) Rollback

func (t *Transaction) Rollback() error

func (*Transaction) TransactionId

func (t *Transaction) TransactionId() int64

func (*Transaction) WriteAsync

func (t *Transaction) WriteAsync(events []*EventData) (*tasks.Task, error)

type TransactionConnection

type TransactionConnection interface {
	TransactionalWriteAsync(*Transaction, []*EventData, *UserCredentials) (*tasks.Task, error)
	CommitTransactionAsync(*Transaction, *UserCredentials) (*tasks.Task, error)
}

type UserCredentials

type UserCredentials struct {
	// contains filtered or unexported fields
}

func NewUserCredentials

func NewUserCredentials(username string, password string) *UserCredentials

func (*UserCredentials) Password

func (uc *UserCredentials) Password() string

func (*UserCredentials) Username

func (uc *UserCredentials) Username() string

type WriteResult

type WriteResult struct {
	// contains filtered or unexported fields
}

func NewWriteResult

func NewWriteResult(nextExpectedVersion int, logPosition *Position) *WriteResult

func (*WriteResult) LogPosition

func (r *WriteResult) LogPosition() *Position

func (*WriteResult) NextExpectedVersion

func (r *WriteResult) NextExpectedVersion() int

func (*WriteResult) String

func (r *WriteResult) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL