Documentation ¶
Overview ¶
Package transport provides streaming object-based transport over HTTP for continuous intra-cluster communications (see README for details and usage example).
- Copyright (c) 2018-2022, NVIDIA CORPORATION. All rights reserved.
Example (Headers) ¶
package main

import (
	"encoding/binary"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"sync"

	"github.com/artashesbalabekyan/aistore/api/apc"
	"github.com/artashesbalabekyan/aistore/cmn"
	"github.com/artashesbalabekyan/aistore/cmn/cos"
	"github.com/artashesbalabekyan/aistore/memsys"
	"github.com/artashesbalabekyan/aistore/transport"
)

const (
	lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
	duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
	et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
	temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet, ut et voluptates repudiandae sint et molestiae non-recusandae.`
)

func main() {
	f := func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			panic(err)
		}
		if len(body) == 0 {
			return
		}
		var (
			hdr       transport.ObjHdr
			hlen, off int
		)
		for {
			hlen = int(binary.BigEndian.Uint64(body[off:]))
			off += 16 // hlen and hlen-checksum
			hdr = transport.ExtObjHeader(body[off:], hlen)
			if transport.ReservedOpcode(hdr.Opcode) {
				break
			}
			fmt.Printf("%+v (%d)\n", hdr, hlen)
			off += hlen + int(hdr.ObjAttrs.Size)
		}
	}
	ts := httptest.NewServer(http.HandlerFunc(f))
	defer ts.Close()

	httpclient := transport.NewIntraDataClient()
	stream := transport.NewObjStream(httpclient, ts.URL, cos.GenTie(), nil)

	sendText(stream, lorem, duis)
	stream.Fin()
}

func sendText(stream *transport.Stream, txt1, txt2 string) {
	var wg sync.WaitGroup
	cb := func(transport.ObjHdr, io.ReadCloser, any, error) {
		wg.Done()
	}

	sgl1 := memsys.PageMM().NewSGL(0)
	sgl1.Write([]byte(txt1))
	hdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl1.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h1"),
			Ver:   "1",
		},
		Opaque: nil,
	}
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl1, Callback: cb})
	wg.Wait()

	sgl2 := memsys.PageMM().NewSGL(0)
	sgl2.Write([]byte(txt2))
	hdr = transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl2.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h2"),
			Ver:   "222222222222222222222222",
		},
		Opaque: []byte{'1', '2', '3'},
	}
	hdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl2, Callback: cb})
	wg.Wait()
}
Output:
{Bck:s3://@uuid#namespace/abc ObjName:X SID: Opaque:[] ObjAttrs:{Cksum:xxhash[h1] CustomMD:map[] Ver:1 Atime:663346294 Size:231} Opcode:0} (69)
{Bck:ais://abracadabra ObjName:p/q/s SID: Opaque:[49 50 51] ObjAttrs:{Cksum:xxhash[h2] CustomMD:map[xx:11 yy:22] Ver:222222222222222222222222 Atime:663346294 Size:213} Opcode:0} (110)
Example (Msg) ¶
receive := func(msg transport.Msg, err error) error {
	if !transport.ReservedOpcode(msg.Opcode) {
		fmt.Printf("%s...\n", string(msg.Body[:16]))
	}
	return nil
}

ts := httptest.NewServer(msgmux)
defer ts.Close()

trname := "dummy-msg"
err := transport.HandleMsgStream(trname, receive)
if err != nil {
	fmt.Println(err)
	return
}

httpclient := transport.NewIntraDataClient()
url := ts.URL + transport.MsgURLPath(trname)
stream := transport.NewMsgStream(httpclient, url, cos.GenTie())

stream.Send(&transport.Msg{Body: []byte(lorem)})
stream.Send(&transport.Msg{Body: []byte(duis)})
stream.Send(&transport.Msg{Body: []byte(et)})
stream.Send(&transport.Msg{Body: []byte(temporibus)})

stream.Fin()

Output:
Lorem ipsum dolo...
Duis aute irure ...
Et harum quidem ...
Temporibus autem...
Example (Obj) ¶
package main

import (
	"fmt"
	"io"
	"net/http/httptest"
	"sync"
	"time"

	"github.com/artashesbalabekyan/aistore/3rdparty/golang/mux"
	"github.com/artashesbalabekyan/aistore/api/apc"
	"github.com/artashesbalabekyan/aistore/cmn"
	"github.com/artashesbalabekyan/aistore/cmn/cos"
	"github.com/artashesbalabekyan/aistore/memsys"
	"github.com/artashesbalabekyan/aistore/transport"
)

const (
	lorem = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat.`
	duis = `Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.`
	et = `Et harum quidem rerum facilis est et expedita distinctio. Nam libero tempore, cum soluta nobis est eligendi optio, cumque nihil impedit, quo minus id, quod maxime placeat, facere possimus, omnis voluptas assumenda est, omnis dolor repellendus.`
	temporibus = `Temporibus autem quibusdam et aut officiis debitis aut rerum necessitatibus saepe eveniet, ut et voluptates repudiandae sint et molestiae non-recusandae.`
)

var objmux *mux.ServeMux

func sendText(stream *transport.Stream, txt1, txt2 string) {
	var wg sync.WaitGroup
	cb := func(transport.ObjHdr, io.ReadCloser, any, error) {
		wg.Done()
	}

	sgl1 := memsys.PageMM().NewSGL(0)
	sgl1.Write([]byte(txt1))
	hdr := transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abc",
			Provider: apc.AWS,
			Ns:       cmn.Ns{UUID: "uuid", Name: "namespace"},
		},
		ObjName: "X",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl1.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h1"),
			Ver:   "1",
		},
		Opaque: nil,
	}
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl1, Callback: cb})
	wg.Wait()

	sgl2 := memsys.PageMM().NewSGL(0)
	sgl2.Write([]byte(txt2))
	hdr = transport.ObjHdr{
		Bck: cmn.Bck{
			Name:     "abracadabra",
			Provider: apc.AIS,
			Ns:       cmn.NsGlobal,
		},
		ObjName: "p/q/s",
		ObjAttrs: cmn.ObjAttrs{
			Size:  sgl2.Size(),
			Atime: 663346294,
			Cksum: cos.NewCksum(cos.ChecksumXXHash, "h2"),
			Ver:   "222222222222222222222222",
		},
		Opaque: []byte{'1', '2', '3'},
	}
	hdr.ObjAttrs.SetCustomMD(cos.StrKVs{"xx": "11", "yy": "22"})
	wg.Add(1)
	stream.Send(&transport.Obj{Hdr: hdr, Reader: sgl2, Callback: cb})
	wg.Wait()
}

func main() {
	receive := func(hdr transport.ObjHdr, objReader io.Reader, err error) error {
		cos.Assert(err == nil)
		object, err := io.ReadAll(objReader)
		if err != nil {
			panic(err)
		}
		if int64(len(object)) != hdr.ObjAttrs.Size {
			panic(fmt.Sprintf("size %d != %d", len(object), hdr.ObjAttrs.Size))
		}
		fmt.Printf("%s...\n", string(object[:16]))
		return nil
	}
	ts := httptest.NewServer(objmux)
	defer ts.Close()

	trname := "dummy-obj"
	err := transport.HandleObjStream(trname, receive)
	if err != nil {
		fmt.Println(err)
		return
	}

	httpclient := transport.NewIntraDataClient()
	stream := transport.NewObjStream(httpclient, ts.URL+transport.ObjURLPath(trname), cos.GenTie(), nil)

	sendText(stream, lorem, duis)
	sendText(stream, et, temporibus)
	stream.Fin()
}

Output:
Lorem ipsum dolo...
Duis aute irure ...
Et harum quidem ...
Temporibus autem...
Index ¶
- Constants
- func DrainAndFreeReader(r io.Reader)
- func FreeRecv(object io.Reader)
- func GetStats() (netstats map[string]EndpointStats, err error)
- func HandleMsgStream(trname string, rxMsg RecvMsg) error
- func HandleObjStream(trname string, rxObj RecvObj) error
- func IsErrDuplicateTrname(e error) bool
- func MsgURLPath(trname string) string
- func ObjURLPath(trname string) string
- func ReservedOpcode(opc int) bool
- func RxAnyStream(w http.ResponseWriter, r *http.Request)
- func UID2SessID(uid uint64) (xxh, sessID uint64)
- func Unhandle(trname string) (err error)
- type Client
- type EndpointStats
- type ErrDuplicateTrname
- type Extra
- type Msg
- type MsgStream
- func (s *MsgStream) Abort()
- func (s *MsgStream) Fin()
- func (s *MsgStream) GetStats() (stats Stats)
- func (s *MsgStream) ID() (string, int64)
- func (s *MsgStream) IsTerminated() bool
- func (s *MsgStream) Read(b []byte) (n int, err error)
- func (s *MsgStream) Send(msg *Msg) (err error)
- func (s *MsgStream) Stop()
- func (s *MsgStream) String() string
- func (s *MsgStream) TermInfo() (reason string, err error)
- func (s *MsgStream) URL() string
- type Obj
- type ObjHdr
- type ObjSentCB
- type RecvMsg
- type RecvObj
- type Stats
- type Stream
- func (s *Stream) Abort()
- func (s *Stream) Fin()
- func (s *Stream) GetStats() (stats Stats)
- func (s *Stream) ID() (string, int64)
- func (s *Stream) IsTerminated() bool
- func (s *Stream) Read(b []byte) (n int, err error)
- func (s *Stream) Send(obj *Obj) (err error)
- func (s *Stream) Stop()
- func (s *Stream) String() string
- func (s *Stream) TermInfo() (reason string, err error)
- func (s *Stream) URL() string
- type StreamCollector
Constants ¶
const (
	OutObjCount = "stream.out.n"
	OutObjSize  = "stream.out.size"
	InObjCount  = "stream.in.n"
	InObjSize   = "stream.in.size"
)
const (
SizeUnknown = -1
)
Variables ¶
This section is empty.
Functions ¶
func DrainAndFreeReader ¶
func DrainAndFreeReader(r io.Reader)
DrainAndFreeReader:
1) reads and discards all the data from `r` (the `objReader`);
2) frees this objReader back to the `recvPool`.
As such, this function is intended for use only by `transport.RecvObj` implementations.
func GetStats ¶
func GetStats() (netstats map[string]EndpointStats, err error)
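func HandleMsgStream ¶
func HandleMsgStream(trname string, rxMsg RecvMsg) error
func HandleObjStream ¶
func HandleObjStream(trname string, rxObj RecvObj) error
func IsErrDuplicateTrname ¶
func IsErrDuplicateTrname(e error) bool
func MsgURLPath ¶
func MsgURLPath(trname string) string
func ObjURLPath ¶
func ObjURLPath(trname string) string
func ReservedOpcode ¶
func ReservedOpcode(opc int) bool
func UID2SessID ¶
func UID2SessID(uid uint64) (xxh, sessID uint64)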
Types ¶
type Client ¶
func NewIntraDataClient ¶
func NewIntraDataClient() Client
intra-cluster networking: fasthttp client
type EndpointStats ¶
type ErrDuplicateTrname ¶
type ErrDuplicateTrname struct {
// contains filtered or unexported fields
}
private types
func (*ErrDuplicateTrname) Error ¶
func (e *ErrDuplicateTrname) Error() string
type Extra ¶
type Extra struct {
	Callback     ObjSentCB     // typical usage: to free SGLs, close files, etc.
	MMSA         *memsys.MMSA  // compression-related buffering
	Config       *cmn.Config   // (to optimize-out GCO.Get())
	Compression  string        // see CompressAlways, etc. enum
	SenderID     string        // e.g., xaction ID (optional)
	IdleTeardown time.Duration // when exceeded, causes PUT to terminate (and to renew upon the very next send)
	SizePDU      int32         // NOTE: 0(zero): no PDUs; must be below maxSizePDU; unknown size _requires_ PDUs
	MaxHdrSize   int32         // overrides `dfltMaxHdr` if specified
}
advanced usage: additional stream control
func (*Extra) Compressed ¶
type Msg ¶
func (*Msg) IsHeaderOnly ¶
type MsgStream ¶
type MsgStream struct {
// contains filtered or unexported fields
}
message stream & private types
func NewMsgStream ¶
func (*MsgStream) IsTerminated ¶
func (s *MsgStream) IsTerminated() bool
type Obj ¶
type Obj struct {
	Reader   io.ReadCloser // reader (to read the object, and close when done)
	CmplArg  any           // optional context passed to the ObjSentCB callback
	Callback ObjSentCB     // called when the last byte is sent _or_ when the stream terminates (see term.reason)
	Hdr      ObjHdr
	// contains filtered or unexported fields
}
object to transmit
func (*Obj) IsHeaderOnly ¶
type ObjHdr ¶
type ObjHdr struct {
	Bck      cmn.Bck
	ObjName  string
	SID      string       // sender node ID
	Opaque   []byte       // custom control (optional)
	ObjAttrs cmn.ObjAttrs // attributes/metadata of the object that's being transmitted
	Opcode   int          // (see reserved range above)
}
object header
func ExtObjHeader ¶
func (*ObjHdr) IsHeaderOnly ¶
type ObjSentCB ¶
type ObjSentCB func(ObjHdr, io.ReadCloser, any, error)
An object-sent callback with this signature can optionally be defined:
a) on a per-stream basis (via the NewObjStream constructor; see the Extra struct above);
b) for a given object that is being sent (for instance, to support call-per-batch semantics).
Naturally, the object callback "overrides" the per-stream one: when the object callback is defined (i.e., non-nil), the stream callback is ignored/skipped.
NOTE: if defined, the callback executes asynchronously as far as the sending part is concerned.
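The override rule can be illustrated with a short sketch. The `sentCB` type and the `pickCallback` helper are hypothetical simplifications (the real `ObjSentCB` takes a header, a reader, an opaque argument, and an error), and this is not the package's internal code.

```go
package main

import "fmt"

// sentCB is a simplified, hypothetical stand-in for ObjSentCB.
type sentCB func(name string, err error)

// pickCallback applies the precedence described above:
// a non-nil per-object callback wins; otherwise the per-stream one (possibly nil) is used.
func pickCallback(objCB, streamCB sentCB) sentCB {
	if objCB != nil {
		return objCB
	}
	return streamCB
}

func main() {
	streamCB := func(name string, _ error) { fmt.Println("stream callback:", name) }
	objCB := func(name string, _ error) { fmt.Println("object callback:", name) }

	// Object without its own callback: the stream-level callback runs.
	if cb := pickCallback(nil, streamCB); cb != nil {
		cb("plain", nil)
	}
	// Object with its own callback: the stream-level callback is skipped.
	if cb := pickCallback(objCB, streamCB); cb != nil {
		cb("batched", nil)
	}
}
```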
type Stats ¶
type Stats struct {
	Num            atomic.Int64 // number of transferred objects including zero size (header-only) objects
	Size           atomic.Int64 // transferred object size (does not include transport headers)
	Offset         atomic.Int64 // stream offset, in bytes
	CompressedSize atomic.Int64 // compressed size (NOTE: converges to the actual compressed size over time)
}
stream stats
func (*Stats) CompressionRatio ¶
type Stream ¶
type Stream struct {
// contains filtered or unexported fields
}
object stream & private types
func NewObjStream ¶
func (*Stream) IsTerminated ¶
func (s *Stream) IsTerminated() bool
func (*Stream) Send ¶
func (s *Stream) Send(obj *Obj) (err error)
Asynchronously send an object (transport.Obj) defined by its header and its reader.
The sending pipeline is implemented as a pair (SQ, SCQ) where the former is a send queue realized as workCh, and the latter is a send completion queue (cmplCh). Together SQ and SCQ form a FIFO.
- Header-only objects are supported; when there is no data to send (that is, when the header's ObjAttrs.Size is zero), the reader is not required and Obj.Reader can be nil.
- The object reader is *always* closed, irrespective of whether Send() succeeds or fails. On success, if a send-completion (ObjSentCB) callback is provided (i.e., non-nil), the closing is done by doCmpl().
- Optional reference counting is also done by (and in) doCmpl(), so that the ObjSentCB gets called if and only if the refcount (if provided, i.e., non-nil) reaches zero.
- For every transmission of every object there is always a doCmpl() completion (with its refcounting and reader-closing). This holds true in all cases including network errors that may cause sudden and instant termination of the underlying stream(s).
type StreamCollector ¶
type StreamCollector struct{}
stream collector
func Init ¶
func Init(st cos.StatsUpdater, config *cmn.Config) *StreamCollector
func (*StreamCollector) Name ¶
func (*StreamCollector) Name() string
func (*StreamCollector) Run ¶
func (sc *StreamCollector) Run() (err error)
func (*StreamCollector) Stop ¶
func (sc *StreamCollector) Stop(err error)
Directories ¶
Path | Synopsis |
---|---|
bundle | Package bundle provides multi-streaming transport with the functionality to dynamically (un)register receive endpoints, establish long-lived flows, and more. |