Documentation ¶
Overview ¶
Package http provides HTTP data processing and handling.
Index ¶
- Variables
- func Get(url string) (int, error)
- func GetByte(url string) ([]byte, int, error)
- func Post(address string, e cloudevents.Event) error
- type Protocol
- type Server
- func (h *Server) ClientID() uuid.UUID
- func (h *Server) DeleteSender(key uuid.UUID)
- func (h *Server) GetSender(key uuid.UUID, servicePath ServiceResourcePath) *Protocol
- func (h *Server) GetSenderMap(key uuid.UUID) map[ServiceResourcePath]*Protocol
- func (h *Server) HTTPProcessor(wg *sync.WaitGroup)
- func (h *Server) NewClient(host string, connOption []httpP.Option) (httpClient.Client, error)
- func (h *Server) NewSender(clientID uuid.UUID, address string) error
- func (h *Server) RegisterPublishers(publisherURL ...*types.URI)
- func (h *Server) ReloadSubsFromStore()
- func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, resourceAddress string, ...)
- func (h *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error)
- func (h *Server) SetProcessEventFn(fn func(e interface{}) error)
- func (h *Server) SetSender(key uuid.UUID, val map[ServiceResourcePath]*Protocol)
- func (h *Server) Shutdown()
- func (h *Server) Start(wg *sync.WaitGroup) error
- func (h *Server) UnRegisterPublishers(publisherURL *types.URI)
- type ServiceResourcePath
Constants ¶
This section is empty.
Variables ¶
var (
RequestReadHeaderTimeout = 2 * time.Second
)
Functions ¶
Types ¶
type Server ¶
type Server struct { sync.RWMutex Sender map[uuid.UUID]map[ServiceResourcePath]*Protocol Publishers []*types.URI ServiceName string Port int DataIn <-chan *channel.DataChan DataOut chan<- *channel.DataChan Client httpClient.Client //close on true CloseCh <-chan struct{} // contains filtered or unexported fields }
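Server ... holds the HTTP server state: the sender map, the registered publishers, the service name and port, the data in/out channels, the HTTP client, and the close channel.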
func InitServer ¶
func InitServer(serviceName string, port int, storePath string, dataIn <-chan *channel.DataChan, dataOut chan<- *channel.DataChan, closeCh <-chan struct{}, onStatusReceiveOverrideFn func(e cloudevents.Event, dataChan *channel.DataChan) error, processEventFn func(e interface{}) error) (*Server, error)
InitServer initializes the HTTP configuration.
func (*Server) DeleteSender ¶
DeleteSender ... deletes the sender (listener) stored under the given key.
func (*Server) GetSender ¶
func (h *Server) GetSender(key uuid.UUID, servicePath ServiceResourcePath) *Protocol
GetSender is a wrapper for getting the value of a key in the underlying map
func (*Server) GetSenderMap ¶
func (h *Server) GetSenderMap(key uuid.UUID) map[ServiceResourcePath]*Protocol
GetSenderMap is a wrapper for getting the value of a key in the underlying map
func (*Server) HTTPProcessor ¶
HTTPProcessor ... the web server listens for incoming data and either creates subscribers or acts as a publisher
//create a status ping
in <- &channel.DataChan{ Address: addr, Type: channel.STATUS, Status: channel.NEW, OnReceiveOverrideFn: func(e cloudevents.Event) error {}, ProcessOutChDataFn: func(e event.Event) error {},
}
// create a subscriber
in <- &channel.DataChan{ ID: subscriptionOneID, Address: subscriptionOne.Resource, Type: channel.SUBSCRIBER, }
// send data
in <- &channel.DataChan{ Address: addr, Data: &event, Status: channel.NEW, Type: channel.EVENT, }
func (*Server) RegisterPublishers ¶
RegisterPublishers registers the given publishers.
func (*Server) ReloadSubsFromStore ¶
func (h *Server) ReloadSubsFromStore()
ReloadSubsFromStore creates senders for subscribers restored from persistent store
func (*Server) SendTo ¶
func (h *Server) SendTo(wg *sync.WaitGroup, clientID uuid.UUID, clientAddress, resourceAddress string, e *cloudevents.Event, eventType channel.Type)
SendTo sends events to the address specified
func (*Server) SetOnStatusReceiveOverrideFn ¶
func (h *Server) SetOnStatusReceiveOverrideFn(fn func(e cloudevents.Event, dataChan *channel.DataChan) error)
SetOnStatusReceiveOverrideFn ... sets receiver function
func (*Server) SetProcessEventFn ¶
SetProcessEventFn ... sets the process-event function
func (*Server) SetSender ¶
func (h *Server) SetSender(key uuid.UUID, val map[ServiceResourcePath]*Protocol)
SetSender is a wrapper for setting the value of a key in the underlying map
func (*Server) Shutdown ¶
func (h *Server) Shutdown()
Shutdown ... shuts down the REST service API, but it will not close until the close channel is signaled
func (*Server) UnRegisterPublishers ¶
UnRegisterPublishers unregisters the given publisher.
type ServiceResourcePath ¶
type ServiceResourcePath string
const ( DEFAULT ServiceResourcePath = "" HEALTH ServiceResourcePath = "/health" EVENT ServiceResourcePath = "/event" SUBSCRIPTION ServiceResourcePath = "/subscription" )