Documentation ¶
Index ¶
- Constants
- type Consumer
- type ConsumerStats
- type LogsServer
- type LogsService
- func (ls *LogsService) AddAdapter(a adapter.Adapter)
- func (ls *LogsService) GetConsumersStats(ctx context.Context) (stats ConsumerStats)
- func (ls *LogsService) Run(ctx context.Context) (err error)
- func (ls *LogsService) RunGRPCServer(ctx context.Context) error
- func (ls *LogsService) RunHealthCheckHandler(ctx context.Context) error
- func (ls *LogsService) Shutdown(ctx context.Context) (err error)
- func (ls *LogsService) WithGrpcAddress(address string) *LogsService
- func (ls *LogsService) WithHttpAddress(address string) *LogsService
- func (ls *LogsService) WithLogsRepositoryFactory(f repository.Factory) *LogsService
- func (ls *LogsService) WithRandomPort() *LogsService
- func (ls *LogsService) WithStopWaitTime(duration time.Duration) *LogsService
Constants ¶
View Source
const ( StatusPending byte = 1 StatusFinished byte = 2 StreamPrefix = "log" StartSubject = "events.logs.start" StopSubject = "events.logs.stop" StartQueue = "logsstart" StopQueue = "logsstop" )
View Source
const ( DefaultHttpAddress = ":8080" DefaultGrpcAddress = ":9090" DefaultStopWaitTime = 60 * time.Second // when stop event is faster than first message arrived )
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Consumer ¶
type Consumer struct { // Context is a consumer context you can call Stop() method on it when no more messages are expected Context jetstream.ConsumeContext // Instance is a NATS consumer instance Instance jetstream.Consumer }
type ConsumerStats ¶
type LogsServer ¶
type LogsServer struct { pb.UnimplementedLogsServiceServer // contains filtered or unexported fields }
func NewLogsServer ¶
func NewLogsServer(repo repository.Factory, state state.Interface) *LogsServer
func (LogsServer) Logs ¶
func (s LogsServer) Logs(req *pb.LogRequest, stream pb.LogsService_LogsServer) error
type LogsService ¶
type LogsService struct { Ready chan struct{} // contains filtered or unexported fields }
func NewLogsService ¶
func NewLogsService(nats *nats.Conn, js jetstream.JetStream, state state.Interface) *LogsService
func (*LogsService) AddAdapter ¶
func (ls *LogsService) AddAdapter(a adapter.Adapter)
AddAdapter adds new adapter to logs service adapters will be configred based on given mode e.g. cloud mode will get cloud adapter to store logs directly on the cloud
func (*LogsService) GetConsumersStats ¶
func (ls *LogsService) GetConsumersStats(ctx context.Context) (stats ConsumerStats)
func (*LogsService) RunGRPCServer ¶
func (ls *LogsService) RunGRPCServer(ctx context.Context) error
TODO handle TLS
func (*LogsService) RunHealthCheckHandler ¶
func (ls *LogsService) RunHealthCheckHandler(ctx context.Context) error
RunHealthCheckHandler is a handler for health check events we need HTTP as GRPC probes starts from Kubernetes 1.25
func (*LogsService) WithGrpcAddress ¶
func (ls *LogsService) WithGrpcAddress(address string) *LogsService
func (*LogsService) WithHttpAddress ¶
func (ls *LogsService) WithHttpAddress(address string) *LogsService
func (*LogsService) WithLogsRepositoryFactory ¶
func (ls *LogsService) WithLogsRepositoryFactory(f repository.Factory) *LogsService
func (*LogsService) WithRandomPort ¶
func (ls *LogsService) WithRandomPort() *LogsService
func (*LogsService) WithStopWaitTime ¶ added in v1.16.25
func (ls *LogsService) WithStopWaitTime(duration time.Duration) *LogsService
Click to show internal directories.
Click to hide internal directories.