Documentation ¶
Index ¶
- type NullEventSpool
- type Publisher
- func (p *Publisher) Connect() chan<- []*core.EventDescriptor
- func (p *Publisher) OnAck(endpoint *endpoint.Endpoint, pendingPayload *payload.Payload, firstAck bool, ...)
- func (p *Publisher) OnFail(endpoint *endpoint.Endpoint)
- func (p *Publisher) OnFinish(endpoint *endpoint.Endpoint) bool
- func (p *Publisher) OnPong(endpoint *endpoint.Endpoint)
- func (p *Publisher) OnStarted(endpoint *endpoint.Endpoint)
- func (p *Publisher) Run()
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type NullEventSpool ¶
type NullEventSpool struct { }
NullEventSpool is a dummy registrar used by publisher when there is no registrar to send acknowledgements to for persistence. It simply discards all acknowledgement requests it is given. Currently used during stdin pipe processing where persistence becomes irrelevant.
func (*NullEventSpool) Add ¶
func (s *NullEventSpool) Add(event registrar.EventProcessor)
Add does nothing - it's a dummy registrar
func (*NullEventSpool) Close ¶
func (s *NullEventSpool) Close()
Close does nothing - it's a dummy registrar
func (*NullEventSpool) Send ¶
func (s *NullEventSpool) Send()
Send does nothing - it's a dummy registrar
type Publisher ¶
type Publisher struct { core.PipelineSegment core.PipelineConfigReceiver // contains filtered or unexported fields }
Publisher handles payloads and is responsible for passing ordered acknowledgements to the Registrar It makes all the load balancing and distribution decisions, leaving transport state management to the EndpointSink We have always used a Push mechanism for load balancing, in the sense that the Publisher will push out events to transports and potentially pull them back if it deems there's a problem, rather than letting transports pull the events from the Publisher and then the transport making decisions on whether there is a problem. This pattern continues that tradition but with there now potentially being multiple transports rather than just one TODO: Extrapolate the load balance / failover logic to other interfaces?
I'm thinking not, as the difference is very little
func NewPublisher ¶
func NewPublisher(pipeline *core.Pipeline, config *config.Config, registrar registrar.Registrator) *Publisher
NewPublisher creates a new publisher instance on the given pipeline
func (*Publisher) Connect ¶
func (p *Publisher) Connect() chan<- []*core.EventDescriptor
Connect is used by Spooler TODO: Spooler doesn't need to know of publisher, only of events
func (*Publisher) OnAck ¶
func (p *Publisher) OnAck(endpoint *endpoint.Endpoint, pendingPayload *payload.Payload, firstAck bool, lineCount int)
OnAck handles acknowledgements from endpoints It keeps track of how many out of sync acknowldgements have been made so shutdown can be postponed if we've received Acks for newer events before older events. It also serialises the Ack offsets for correct registrar storage to ensure the registrar offsets are always sequential
func (*Publisher) OnFinish ¶
OnFinish handles when endpoints are finished Should return false if the endpoint is not to be reinitialised, such as when shutting down