Documentation ¶
Overview ¶
Package gateway implements the full PubSub gateway features. It's the core of PubSub system.
Index ¶
- Constants
- Variables
- func AddTagToMessage(m *mpool.Message, tag string)
- func EnsureServerUlimit()
- func ExtractMessageTag(msg []byte) ([]string, int, error)
- func FastListener(gw *Gateway, l net.Listener) net.Listener
- func IsTaggedMessage(msg []byte) bool
- func LimitListener(name string, gw *Gateway, l net.Listener, n int) net.Listener
- func NewPubMetrics(gw *Gateway) *pubMetrics
- func NewServerMetrics(interval time.Duration, gw *Gateway) *serverMetrics
- func NewSubMetrics(gw *Gateway) *subMetrics
- func ParseFlags()
- func SetupLogging(logFile, level, crashLogFile string)
- func ShouldUseForwardedProto() bool
- func ValidateFlags()
- type AccessLogger
- type FramePolicy
- type Gateway
- type Message
- type SecurityConfig
- type SubStatus
- type WriterWrapper
Constants ¶
const ( HttpHeaderXForwardedFor = "X-Forwarded-For" HttpHeaderPartition = "X-Partition" HttpHeaderOffset = "X-Offset" HttpHeaderMsgBury = "X-Bury" HttpHeaderMsgKey = "X-Key" HttpHeaderMsgTag = "X-Tag" HttpHeaderJobId = "X-Job-Id" HttpHeaderAcceptEncoding = "Accept-Encoding" HttpHeaderContentEncoding = "Content-Encoding" HttpEncodingGzip = "gzip" UrlParamTopic = "topic" UrlParamVersion = "ver" UrlParamAppid = "appid" UrlParamGroup = "group" MaxPartitionKeyLen = 256 )
const ( TagMarkStart = byte(1) // FIXME conflicts with ProtocolBuffer TagMarkEnd = byte(2) TagSeperator = ";" // follow cookie rules a=b;c=d )
Variables ¶
var ( ResponseOk = []byte(`{"ok":1}`) HttpHeaderAppid = "Appid" HttpHeaderPubkey = "Pubkey" HttpHeaderSubkey = "Subkey" )
var ( ErrClientGone = errors.New("remote client gone") ErrTooBigMessage = errors.New("too big message") ErrTooSmallMessage = errors.New("too small message") ErrIllegalTaggedMessage = errors.New("illegal tagged message") ErrClientKilled = errors.New("client killed") ErrBadResponseWriter = errors.New("ResponseWriter Close not supported") ErrPartitionOutOfRange = errors.New("partition out of range") ErrOffsetOutOfRange = errors.New("offset out of range") )
var ( Options struct { Id string Zone string ConfigFile string PubHttpAddr string PubHttpsAddr string SubHttpAddr string SubHttpsAddr string ManHttpAddr string ManHttpsAddr string DebugHttpAddr string Store string JobStore string ManagerStore string PidFile string CertFile string KeyFile string LogFile string LogLevel string CrashLogFile string DummyCluster string InfluxServer string InfluxDbName string KillFile string HintedHandoffType string HintedHandoffDir string AllwaysHintedHandoff bool ShowVersion bool Ratelimit bool PermitStandbySub bool DisableMetrics bool EnableHintedHandoff bool HintedHandoffBufio bool FlushHintedOffOnly bool BadGroupRateLimit bool BadPubAppRateLimit bool RunSwaggerServer bool AuditPub bool AuditSub bool EnableGzip bool DryRun bool CpuAffinity bool EnableAccessLog bool EnableHttpPanicRecover bool GolangTrace bool PermitUnregisteredGroup bool UseCompress bool Debug bool EnableRegistry bool HttpHeaderMaxBytes int MaxPubSize int64 MaxJobSize int64 LogRotateSize int MaxMsgTagLen int MinPubSize int PubQpsLimit int64 MaxSubBatchSize int MaxClients int MaxRequestPerConn int // to make load balancer distribute request even for persistent conn PubPoolCapcity int AssignJobShardId int // how to assign shard id for new app PubPoolIdleTimeout time.Duration SubTimeout time.Duration OffsetCommitInterval time.Duration BadClientPunishDuration time.Duration InternalServerErrorBackoff time.Duration ReporterInterval time.Duration MetaRefresh time.Duration ManagerRefresh time.Duration HttpReadTimeout time.Duration HttpWriteTimeout time.Duration MaxWaitBeforeForceClose time.Duration } )
Functions ¶
func AddTagToMessage ¶
┌────────────────────────────┐ ┌────────┐ │TagMarkStart Tag TagMarkEnd │ │Message │ └────────────────────────────┘ └────────┘
func EnsureServerUlimit ¶
func EnsureServerUlimit()
func IsTaggedMessage ¶
func LimitListener ¶
LimitListener returns a Listener that accepts at most n simultaneous connections from the provided Listener.
func NewPubMetrics ¶
func NewPubMetrics(gw *Gateway) *pubMetrics
func NewServerMetrics ¶
func NewSubMetrics ¶
func NewSubMetrics(gw *Gateway) *subMetrics
func ParseFlags ¶
func ParseFlags()
func SetupLogging ¶
func SetupLogging(logFile, level, crashLogFile string)
func ShouldUseForwardedProto ¶
func ShouldUseForwardedProto() bool
ShouldUseForwardedProto returns whether to trust the X-Forwarded-Proto header field. DefaultConfig.HTTPSUseForwardedProto is initialized to this value.
This value depends on the particular environment where the package is built. It is currently true iff build constraint "heroku" is satisfied.
func ValidateFlags ¶
func ValidateFlags()
Types ¶
type AccessLogger ¶
type AccessLogger struct {
// contains filtered or unexported fields
}
AccessLogger is a daily rotating/unblocking logger to record access log.
func NewAccessLogger ¶
func NewAccessLogger(fn string, poolSize int) *AccessLogger
func (*AccessLogger) Discarded ¶
func (this *AccessLogger) Discarded() uint64
func (*AccessLogger) Log ¶
func (this *AccessLogger) Log(line []byte)
Caution: NEVER call Log after Stop is called.
func (*AccessLogger) Start ¶
func (this *AccessLogger) Start() error
func (*AccessLogger) Stop ¶
func (this *AccessLogger) Stop()
type FramePolicy ¶
type FramePolicy string
FramePolicy tells the browser under what circumstances to allow the response to be displayed inside an HTML frame. There are three options:
Deny do not permit display in a frame SameOrigin permit display in a frame from the same origin AllowFrom(url) permit display in a frame from the given url
const ( Deny FramePolicy = "DENY" SameOrigin FramePolicy = "SAMEORIGIN" )
func AllowFrom ¶
func AllowFrom(url string) FramePolicy
AllowFrom returns a FramePolicy specifying that the requested resource should be included in a frame from only the given url.
type Gateway ¶
type Gateway struct {
// contains filtered or unexported fields
}
Gateway is a distributed Pub/Sub HTTP endpoint.
Working with ehaproxy, it can form a Pub/Sub cluster system.
func (*Gateway) InstanceInfo ¶
func (*Gateway) ServeForever ¶
func (this *Gateway) ServeForever()
type Message ¶
func DecodeMessageSet ¶
type SecurityConfig ¶
type SecurityConfig struct { // If true, redirects any request with scheme http to the equivalent https URL. HTTPSRedirect bool HTTPSUseForwardedProto bool // Allow cleartext (non-HTTPS) HTTP connections to a loopback // address, even if HTTPSRedirect is true. PermitClearLoopback bool // If true, sets X-Content-Type-Options to "nosniff". ContentTypeOptions bool // If true, sets the HTTP Strict Transport Security header // field, which instructs browsers to send future requests // over HTTPS, even if the URL uses the unencrypted http // scheme. HSTS bool HSTSMaxAge time.Duration HSTSIncludeSubdomains bool // If true, sets X-Frame-Options, to control when the request // should be displayed inside an HTML frame. FrameOptions bool FrameOptionsPolicy FramePolicy // If true, sets X-XSS-Protection to "1", optionally with // "mode=block". See the official documentation, linked above, // for the meaning of these values. XSSProtection bool XSSProtectionBlock bool // Used by ServeHTTP, after setting any extra headers, to // reply to the request. Next is typically nil, in which case // http.DefaultServeMux is used instead. Next http.Handler }
SecurityConfig adds some HTTP header fields widely considered to improve safety of HTTP requests. These fields are documented as follows:
Strict Transport Security: https://tools.ietf.org/html/rfc6797 Frame Options: https://tools.ietf.org/html/draft-ietf-websec-x-frame-options-00 Cross Site Scripting: http://msdn.microsoft.com/en-us/library/dd565647%28v=vs.85%29.aspx Content Type Options: http://msdn.microsoft.com/en-us/library/ie/gg622941%28v=vs.85%29.aspx
func (*SecurityConfig) ServeHTTP ¶
func (c *SecurityConfig) ServeHTTP(w http.ResponseWriter, r *http.Request)
type SubStatus ¶
type SubStatus struct { Appid string `json:"appid,omitempty"` Group string `json:"group"` Topic string `json:"topic,omitempty"` Partition string `json:"partition"` ProducedOldest int64 `json:"pold"` ProducedNewest int64 `json:"pubd"` Consumed int64 `json:"subd"` ClientRealIP string `json:"realip"` }
type WriterWrapper ¶
type WriterWrapper interface { http.ResponseWriter // Status returns the HTTP status of the request, or 0 if one has not // yet been sent. Status() int // BytesWritten returns the total number of bytes sent to the client. BytesWritten() int }
func SniffWriter ¶
func SniffWriter(w http.ResponseWriter) WriterWrapper
Source Files ¶
- access_logger.go
- codec.go
- const.go
- doc.go
- errors.go
- gateway.go
- guard.go
- handler_job.go
- handler_man.go
- handler_man_pub.go
- handler_man_sub.go
- handler_metrics.go
- handler_pub.go
- handler_pub_raw.go
- handler_pub_ws.go
- handler_sub.go
- handler_sub_ack.go
- handler_sub_bury.go
- handler_sub_raw.go
- handler_sub_ws.go
- handler_token.go
- handler_xa.go
- healthchk.go
- hijack.go
- jwt.go
- listener.go
- log.go
- metrics_pub.go
- metrics_server.go
- metrics_sub.go
- middleware.go
- option.go
- response.go
- routing.go
- rpc.go
- security.go
- server.go
- server_man.go
- server_pub.go
- server_sub.go
- server_web.go
- tag.go
- ulimit_linux.go
- util.go
- writer_wrapper.go
- ws.go