bus

package
v0.9.13 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 30, 2019 License: Apache-2.0 Imports: 24 Imported by: 10

Documentation

Index

Constants

View Source
const (
	// TopicOutgoing is topic for external calls
	TopicOutgoing = "TopicOutgoing"

	// TopicIncoming is topic for incoming calls
	TopicIncoming = "TopicIncoming"

	// TopicIncomingRequestResponse is topic for handling incoming RequestResponse messages
	TopicIncomingRequestResults = "TopicIncomingRequestResults"
)

Variables

This section is empty.

Functions

func ReplyAsMessage

func ReplyAsMessage(ctx context.Context, rep insolar.Reply) *message.Message

func ReplyError

func ReplyError(ctx context.Context, sender Sender, meta payload.Meta, err error)

Types

type Bus

type Bus struct {
	// contains filtered or unexported fields
}

Bus is component that sends messages and gives access to replies for them.

func NewBus

NewBus creates Bus instance with provided values.

func (*Bus) IncomingMessageRouter

func (b *Bus) IncomingMessageRouter(handle message.HandlerFunc) message.HandlerFunc

IncomingMessageRouter is watermill middleware for incoming messages - it decides, how to handle it: as request or as reply.

func (*Bus) Reply

func (b *Bus) Reply(ctx context.Context, origin payload.Meta, reply *message.Message)

Reply sends message in response to another message.

func (*Bus) SendRole

func (b *Bus) SendRole(
	ctx context.Context, msg *message.Message, role insolar.DynamicRole, object insolar.Reference,
) (<-chan *message.Message, func())

SendRole sends message to specified role. Node will be calculated automatically for the latest pulse. Use this method unless you need to send a message to a pre-calculated node. Replies will be written to the returned channel. Always read from the channel using multiple assignment (rep, ok := <-ch) because the channel will be closed on timeout.

func (*Bus) SendTarget

func (b *Bus) SendTarget(
	ctx context.Context, msg *message.Message, target insolar.Reference,
) (<-chan *message.Message, func())

SendTarget sends message to a specific node. If you don't know the exact node, use SendRole. Replies will be written to the returned channel. Always read from the channel using multiple assignment (rep, ok := <-ch) because the channel will be closed on timeout.

type RetrySender

type RetrySender struct {
	// contains filtered or unexported fields
}

RetrySender allows to send messaged via provided Sender with retries.

func NewRetrySender

func NewRetrySender(sender Sender, pulseAccessor pulse.Accessor, retries uint, responseCount uint) *RetrySender

NewRetrySender creates RetrySender instance with provided values.

func (*RetrySender) Reply

func (r *RetrySender) Reply(ctx context.Context, origin payload.Meta, reply *message.Message)

func (*RetrySender) SendRole

func (r *RetrySender) SendRole(
	ctx context.Context, msg *message.Message, role insolar.DynamicRole, ref insolar.Reference,
) (<-chan *message.Message, func())

SendRole sends message to specified role, using provided Sender.SendRole. If error with CodeFlowCanceled was received, it retries request after pulse on current node will be changed. Replies will be written to the returned channel. Always read from the channel using multiple assignment (rep, ok := <-ch) because the channel will be closed on timeout.

func (*RetrySender) SendTarget

func (r *RetrySender) SendTarget(ctx context.Context, msg *message.Message, target insolar.Reference) (<-chan *message.Message, func())

type Sender

type Sender interface {
	// SendRole sends message to specified role. Node will be calculated automatically for the latest pulse. Use this
	// method unless you need to send a message to a pre-calculated node.
	// Replies will be written to the returned channel. Always read from the channel using multiple assignment
	// (rep, ok := <-ch) because the channel will be closed on timeout.
	SendRole(
		ctx context.Context, msg *message.Message, role insolar.DynamicRole, object insolar.Reference,
	) (<-chan *message.Message, func())
	// SendTarget sends message to a specific node. If you don't know the exact node, use SendRole.
	// Replies will be written to the returned channel. Always read from the channel using multiple assignment
	// (rep, ok := <-ch) because the channel will be closed on timeout.
	SendTarget(ctx context.Context, msg *message.Message, target insolar.Reference) (<-chan *message.Message, func())
	// Reply sends message in response to another message.
	Reply(ctx context.Context, origin payload.Meta, reply *message.Message)
}

Sender interface sends messages by watermill.

type SenderMock

type SenderMock struct {
	ReplyMock mSenderMockReply

	SendRoleMock mSenderMockSendRole

	SendTargetMock mSenderMockSendTarget
	// contains filtered or unexported fields
}

SenderMock implements Sender

func NewSenderMock

func NewSenderMock(t minimock.Tester) *SenderMock

NewSenderMock returns a mock for Sender

func (*SenderMock) MinimockFinish

func (m *SenderMock) MinimockFinish()

MinimockFinish checks that all mocked methods have been called the expected number of times

func (*SenderMock) MinimockReplyDone

func (m *SenderMock) MinimockReplyDone() bool

MinimockReplyDone returns true if the count of the Reply invocations corresponds the number of defined expectations

func (*SenderMock) MinimockReplyInspect

func (m *SenderMock) MinimockReplyInspect()

MinimockReplyInspect logs each unmet expectation

func (*SenderMock) MinimockSendRoleDone

func (m *SenderMock) MinimockSendRoleDone() bool

MinimockSendRoleDone returns true if the count of the SendRole invocations corresponds the number of defined expectations

func (*SenderMock) MinimockSendRoleInspect

func (m *SenderMock) MinimockSendRoleInspect()

MinimockSendRoleInspect logs each unmet expectation

func (*SenderMock) MinimockSendTargetDone

func (m *SenderMock) MinimockSendTargetDone() bool

MinimockSendTargetDone returns true if the count of the SendTarget invocations corresponds the number of defined expectations

func (*SenderMock) MinimockSendTargetInspect

func (m *SenderMock) MinimockSendTargetInspect()

MinimockSendTargetInspect logs each unmet expectation

func (*SenderMock) MinimockWait

func (m *SenderMock) MinimockWait(timeout mm_time.Duration)

MinimockWait waits for all mocked methods to be called the expected number of times

func (*SenderMock) Reply

func (mmReply *SenderMock) Reply(ctx context.Context, origin payload.Meta, reply *message.Message)

Reply implements Sender

func (*SenderMock) ReplyAfterCounter

func (mmReply *SenderMock) ReplyAfterCounter() uint64

ReplyAfterCounter returns a count of finished SenderMock.Reply invocations

func (*SenderMock) ReplyBeforeCounter

func (mmReply *SenderMock) ReplyBeforeCounter() uint64

ReplyBeforeCounter returns a count of SenderMock.Reply invocations

func (*SenderMock) SendRole

func (mmSendRole *SenderMock) SendRole(ctx context.Context, msg *message.Message, role insolar.DynamicRole, object insolar.Reference) (ch1 <-chan *message.Message, f1 func())

SendRole implements Sender

func (*SenderMock) SendRoleAfterCounter

func (mmSendRole *SenderMock) SendRoleAfterCounter() uint64

SendRoleAfterCounter returns a count of finished SenderMock.SendRole invocations

func (*SenderMock) SendRoleBeforeCounter

func (mmSendRole *SenderMock) SendRoleBeforeCounter() uint64

SendRoleBeforeCounter returns a count of SenderMock.SendRole invocations

func (*SenderMock) SendTarget

func (mmSendTarget *SenderMock) SendTarget(ctx context.Context, msg *message.Message, target insolar.Reference) (ch1 <-chan *message.Message, f1 func())

SendTarget implements Sender

func (*SenderMock) SendTargetAfterCounter

func (mmSendTarget *SenderMock) SendTargetAfterCounter() uint64

SendTargetAfterCounter returns a count of finished SenderMock.SendTarget invocations

func (*SenderMock) SendTargetBeforeCounter

func (mmSendTarget *SenderMock) SendTargetBeforeCounter() uint64

SendTargetBeforeCounter returns a count of SenderMock.SendTarget invocations

type SenderMockReplyExpectation

type SenderMockReplyExpectation struct {
	Counter uint64
	// contains filtered or unexported fields
}

SenderMockReplyExpectation specifies expectation struct of the Sender.Reply

type SenderMockReplyParams

type SenderMockReplyParams struct {
	// contains filtered or unexported fields
}

SenderMockReplyParams contains parameters of the Sender.Reply

type SenderMockSendRoleExpectation

type SenderMockSendRoleExpectation struct {
	Counter uint64
	// contains filtered or unexported fields
}

SenderMockSendRoleExpectation specifies expectation struct of the Sender.SendRole

func (*SenderMockSendRoleExpectation) Then

func (e *SenderMockSendRoleExpectation) Then(ch1 <-chan *message.Message, f1 func()) *SenderMock

Then sets up Sender.SendRole return parameters for the expectation previously defined by the When method

type SenderMockSendRoleParams

type SenderMockSendRoleParams struct {
	// contains filtered or unexported fields
}

SenderMockSendRoleParams contains parameters of the Sender.SendRole

type SenderMockSendRoleResults

type SenderMockSendRoleResults struct {
	// contains filtered or unexported fields
}

SenderMockSendRoleResults contains results of the Sender.SendRole

type SenderMockSendTargetExpectation

type SenderMockSendTargetExpectation struct {
	Counter uint64
	// contains filtered or unexported fields
}

SenderMockSendTargetExpectation specifies expectation struct of the Sender.SendTarget

func (*SenderMockSendTargetExpectation) Then

func (e *SenderMockSendTargetExpectation) Then(ch1 <-chan *message.Message, f1 func()) *SenderMock

Then sets up Sender.SendTarget return parameters for the expectation previously defined by the When method

type SenderMockSendTargetParams

type SenderMockSendTargetParams struct {
	// contains filtered or unexported fields
}

SenderMockSendTargetParams contains parameters of the Sender.SendTarget

type SenderMockSendTargetResults

type SenderMockSendTargetResults struct {
	// contains filtered or unexported fields
}

SenderMockSendTargetResults contains results of the Sender.SendTarget

type WaitOKSender

type WaitOKSender struct {
	// contains filtered or unexported fields
}

WaitOKSender allows to send messaged via provided Sender and wait for reply.OK.

func NewWaitOKSender

func NewWaitOKSender(sender Sender) *WaitOKSender

NewWaitOKSender creates WaitOKSender instance with provided values.

func NewWaitOKWithRetrySender

func NewWaitOKWithRetrySender(sender Sender, pulseAccessor pulse.Accessor, retries uint) *WaitOKSender

NewWaitOKWithRetrySender creates WaitOKSender instance with RetrySender as Sender.

func (*WaitOKSender) SendRole

func (c *WaitOKSender) SendRole(
	ctx context.Context, msg *message.Message, role insolar.DynamicRole, ref insolar.Reference,
)

SendRole sends message to specified role, using provided Sender.SendRole. It waiting for one reply and close replies channel after getting it. If reply is not reply.OK, it logs error message.

func (*WaitOKSender) SendTarget

func (c *WaitOKSender) SendTarget(
	ctx context.Context, msg *message.Message, target insolar.Reference)

SendTarget sends message to specified target, using provided Sender.SendTarget. It waiting for one reply and close replies channel after getting it. If reply is not reply.OK, it logs error message.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL