withttp

package module
v0.7.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 15, 2023 License: MIT Imports: 19 Imported by: 0

README

build

withttp

Build http requests and parse their responses with fluent syntax and wit. This package aims to quickly configure http roundtrips by covering common scenarios, while leaving all details of http requests and responses open for developers to allow maximum flexibility.

Supported underlying http implementations are:

Query Restful endpoints
type GithubRepoInfo struct {
  ID  int    `json:"id"`
  URL string `json:"html_url"`
}

func GetRepoInfo(user, repo string) (GithubRepoInfo, error) {

  call := withttp.NewCall[GithubRepoInfo](withttp.WithFasthttp()).
    WithURL(fmt.Sprintf("https://api.github.com/repos/%s/%s", user, repo)).
    WithMethod(http.MethodGet).
    WithHeader("User-Agent", "withttp/0.5.1 See https://github.com/sonirico/withttp", false).
    WithParseJSON().
    WithExpectedStatusCodes(http.StatusOK)

  err := call.Call(context.Background())

  return call.BodyParsed, err
}

func main() {
  info, _ := GetRepoInfo("sonirico", "withttp")
  log.Println(info)
}
Stream data to server (from a slice)

See full example

type metric struct {
  Time time.Time `json:"t"`
  Temp float32   `json:"T"`
}

func CreateStream() error {
  points := []metric{
    {
      Time: time.Unix(time.Now().Unix()-1, 0),
      Temp: 39,
    },
    {
      Time: time.Now(),
      Temp: 40,
    },
  }

  stream := withttp.Slice[metric](points)

  testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
    Request(
      withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
    )

  call := withttp.NewCall[any](withttp.WithFasthttp()).
    WithMethod(http.MethodPost).
    WithContentType(withttp.ContentTypeJSONEachRow).
    WithRequestSniffed(func(data []byte, err error) {
      fmt.Printf("recv: '%s', err: %v", string(data), err)
    }).
    WithRequestStreamBody(
      withttp.WithRequestStreamBody[any, metric](stream),
    ).
    WithExpectedStatusCodes(http.StatusOK)

  return call.CallEndpoint(context.Background(), testEndpoint)
}
Stream data to server (from a channel)

See full example

func CreateStreamChannel() error {
  points := make(chan metric, 2)

  go func() {
    points <- metric{
      Time: time.Unix(time.Now().Unix()-1, 0),
      Temp: 39,
    }

    points <- metric{
      Time: time.Now(),
      Temp: 40,
    }

    close(points)
  }()

  stream := withttp.Channel[metric](points)

  testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
    Request(
      withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
    )

  call := withttp.NewCall[any](withttp.WithFasthttp()).
    WithMethod(http.MethodPost).
    WithContentType(withttp.ContentTypeJSONEachRow).
    WithRequestSniffed(func(data []byte, err error) {
      fmt.Printf("recv: '%s', err: %v", string(data), err)
    }).
    WithRequestStreamBody(
      withttp.WithRequestStreamBody[any, metric](stream),
    ).
    WithExpectedStatusCodes(http.StatusOK)

  return call.CallEndpoint(context.Background(), testEndpoint)
}
Stream data to server (from a reader)

See full example

func CreateStreamReader() error {
  buf := bytes.NewBuffer(nil)

  go func() {
    buf.WriteString("{\"t\":\"2022-09-01T00:58:15+02:00\"")
    buf.WriteString(",\"T\":39}\n{\"t\":\"2022-09-01T00:59:15+02:00\",\"T\":40}\n")
  }()

  streamFactory := withttp.NewProxyStreamFactory(1 << 10)

  stream := withttp.NewStreamFromReader(buf, streamFactory)

  testEndpoint := withttp.NewEndpoint("webhook-site-request-stream-example").
    Request(
      withttp.WithBaseURL("https://webhook.site/24e84e8f-75cf-4239-828e-8bed244c0afb"),
    )

  call := withttp.NewCall[any](withttp.WithNetHttp()).
    WithMethod(http.MethodPost).
    WithRequestSniffed(func(data []byte, err error) {
      fmt.Printf("recv: '%s', err: %v", string(data), err)
    }).
    WithContentType(withttp.ContentTypeJSONEachRow).
    WithRequestStreamBody(
      withttp.WithRequestStreamBody[any, []byte](stream),
    ).
    WithExpectedStatusCodes(http.StatusOK)

  return call.CallEndpoint(context.Background(), testEndpoint)
}
Several endpoints

In case of a wide range catalog of endpoints, predefined parameters and behaviours can be defined by employing an endpoint definition.

var (
  githubApi = withttp.NewEndpoint("GithubAPI").
    Request(withttp.WithBaseURL("https://api.github.com/"))
)

type GithubRepoInfo struct {
  ID  int    `json:"id"`
  URL string `json:"html_url"`
}

func GetRepoInfo(user, repo string) (GithubRepoInfo, error) {
  call := withttp.NewCall[GithubRepoInfo](withttp.WithFasthttp()).
    WithURI(fmt.Sprintf("repos/%s/%s", user, repo)).
    WithMethod(http.MethodGet).
    WithHeader("User-Agent", "withttp/0.5.1 See https://github.com/sonirico/withttp", false).
    WithHeaderFunc(func() (key, value string, override bool) {
      key = "X-Date"
      value = time.Now().String()
      override = true
      return
    }).
    WithParseJSON().
    WithExpectedStatusCodes(http.StatusOK)

  err := call.CallEndpoint(context.Background(), githubApi)

  return call.BodyParsed, err
}

type GithubCreateIssueResponse struct {
  ID  int    `json:"id"`
  URL string `json:"url"`
}

func CreateRepoIssue(user, repo, title, body, assignee string) (GithubCreateIssueResponse, error) {
  type payload struct {
    Title    string `json:"title"`
    Body     string `json:"body"`
    Assignee string `json:"assignee"`
  }

  p := payload{
    Title:    title,
    Body:     body,
    Assignee: assignee,
  }

  call := withttp.NewCall[GithubCreateIssueResponse](
    withttp.WithFasthttp(),
  ).
    WithURI(fmt.Sprintf("repos/%s/%s/issues", user, repo)).
    WithMethod(http.MethodPost).
    WithContentType("application/vnd+github+json").
    WithBody(p).
    WithHeaderFunc(func() (key, value string, override bool) {
      key = "Authorization"
      value = fmt.Sprintf("Bearer %s", "S3cret")
      override = true
      return
    }).
    WithExpectedStatusCodes(http.StatusCreated)

  err := call.CallEndpoint(context.Background(), githubApi)

  log.Println("req body", string(call.Req.Body()))

  return call.BodyParsed, err
}

func main() {
  // Fetch repo info
  info, _ := GetRepoInfo("sonirico", "withttp")
  log.Println(info)

  // Create an issue
  res, err := CreateRepoIssue("sonirico", "withttp", "test",
    "This is a test", "sonirico")
  log.Println(res, err)
}
Test your calls again a mock endpoint

Quickly test your calls by creating a mock endpoint

var (
  exchangeListOrders = withttp.NewEndpoint("ListOrders").
        Request(withttp.WithBaseURL("http://example.com")).
        Response(
      withttp.WithMockedRes(func(res withttp.Response) {
        res.SetBody(io.NopCloser(bytes.NewReader(mockResponse)))
        res.SetStatus(http.StatusOK)
      }),
    )
  mockResponse = []byte(strings.TrimSpace(`
    {"amount": 234, "pair": "BTC/USDT"}
    {"amount": 123, "pair": "ETH/USDT"}`))
)

func main() {
  type Order struct {
    Amount float64 `json:"amount"`
    Pair   string  `json:"pair"`
  }

  res := make(chan Order)

  call := withttp.NewCall[Order](withttp.WithFasthttp()).
    WithURL("https://github.com/").
    WithMethod(http.MethodGet).
    WithHeader("User-Agent", "withttp/0.5.1 See https://github.com/sonirico/withttp", false).
    WithParseJSONEachRowChan(res).
    WithExpectedStatusCodes(http.StatusOK)

  go func() {
    for order := range res {
      log.Println(order)
    }
  }()

  err := call.CallEndpoint(context.Background(), exchangeListOrders)

  if err != nil {
    panic(err)
  }
}
TODO:
  • Form-data content type codecs
  • More quality-of-life methods for auth
  • Able to parse more content types:
    • tabular separated
    • xml
    • gRPC

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrAssertion            = errors.New("assertion was unmet")
	ErrUnexpectedStatusCode = errors.Wrap(ErrAssertion, "unexpected status code")
	ErrInsufficientParams   = errors.New("insufficient params")
)
View Source
var (
	ErrUnknownContentType = errors.New("unknown content type")
)

Functions

func B2S added in v0.5.0

func B2S(data []byte) string

func BtsIsset added in v0.2.0

func BtsIsset(bts []byte) bool

func BytesEquals added in v0.3.0

func BytesEquals(a, b []byte) bool

func ConfigureHeader

func ConfigureHeader(req Request, key, value string, override bool) error

func CreateAuthorizationHeader added in v0.5.0

func CreateAuthorizationHeader(kind authHeaderKind, user, pass string) (string, error)

func EncodeBody added in v0.2.0

func EncodeBody(payload any, contentType ContentType) (bts []byte, err error)

func EncodeStream added in v0.3.0

func EncodeStream[T any](
	ctx context.Context,
	r rangeable[T],
	req Request,
	encoder codec.Encoder,
	sniffer func([]byte, error),
) (err error)

func ReadJSON

func ReadJSON[T any](rc io.ReadCloser) (res T, err error)

func ReadStream

func ReadStream[T any](rc io.ReadCloser, factory StreamFactory[T], fn func(T) bool) (err error)

func ReadStreamChan

func ReadStreamChan[T any](rc io.ReadCloser, factory StreamFactory[T], out chan<- T) (err error)

func S2B added in v0.5.0

func S2B(s string) (bts []byte)

func StrIsset added in v0.2.0

func StrIsset(s string) bool

Types

type CSVStream added in v0.6.0

type CSVStream[T any] struct {
	// contains filtered or unexported fields
}

func (*CSVStream[T]) Data added in v0.6.0

func (s *CSVStream[T]) Data() T

func (*CSVStream[T]) Err added in v0.6.0

func (s *CSVStream[T]) Err() error

func (*CSVStream[T]) Next added in v0.6.0

func (s *CSVStream[T]) Next(ctx context.Context) bool

type Call

type Call[T any] struct {
	Req Request
	Res Response

	BodyRaw    []byte
	BodyParsed T

	ReqContentType ContentType
	ReqBodyRaw     []byte
	ReqIsStream    bool

	ReqStreamWriter  func(ctx context.Context, c *Call[T], res Request, wg *sync.WaitGroup) error
	ReqStreamSniffer func([]byte, error)
	ReqShouldSniff   bool
	// contains filtered or unexported fields
}

func NewCall

func NewCall[T any](client Client) *Call[T]

func (*Call[T]) Call

func (c *Call[T]) Call(ctx context.Context) (err error)

func (*Call[T]) CallEndpoint added in v0.2.0

func (c *Call[T]) CallEndpoint(ctx context.Context, e *Endpoint) (err error)

func (*Call[T]) Log added in v0.7.0

func (c *Call[T]) Log(w io.Writer)

func (*Call[T]) Request

func (c *Call[T]) Request(opts ...ReqOption) *Call[T]

func (*Call[T]) Response

func (c *Call[T]) Response(opts ...ResOption) *Call[T]

func (*Call[T]) WithAssert

func (c *Call[T]) WithAssert(fn func(req Response) error) *Call[T]

func (*Call[T]) WithBasicAuth added in v0.5.0

func (c *Call[T]) WithBasicAuth(user, pass string) *Call[T]

func (*Call[T]) WithBody added in v0.2.0

func (c *Call[T]) WithBody(payload any) *Call[T]

func (*Call[T]) WithBodyStream added in v0.2.0

func (c *Call[T]) WithBodyStream(rc io.ReadWriteCloser, bodySize int) *Call[T]

WithBodyStream receives a stream of data to set on the request. Second parameter `bodySize` indicates the estimated content-length of this stream. Required when employing fasthttp http client.

func (*Call[T]) WithContentLength added in v0.2.0

func (c *Call[T]) WithContentLength(length int) *Call[T]

func (*Call[T]) WithContentType added in v0.2.0

func (c *Call[T]) WithContentType(ct ContentType) *Call[T]

func (*Call[T]) WithExpectedStatusCodes

func (c *Call[T]) WithExpectedStatusCodes(states ...int) *Call[T]

func (*Call[T]) WithHeader

func (c *Call[T]) WithHeader(key, value string, override bool) *Call[T]

func (*Call[T]) WithHeaderFunc

func (c *Call[T]) WithHeaderFunc(fn func() (key, value string, override bool)) *Call[T]

func (*Call[T]) WithIgnoreResponseBody added in v0.3.0

func (c *Call[T]) WithIgnoreResponseBody() *Call[T]

func (*Call[T]) WithLogger added in v0.7.0

func (c *Call[T]) WithLogger(l logger) *Call[T]

func (*Call[T]) WithMethod

func (c *Call[T]) WithMethod(method string) *Call[T]

func (*Call[T]) WithParseCSV added in v0.6.0

func (c *Call[T]) WithParseCSV(
	ignoreLines int,
	parser csvparser.Parser[T],
	fn func(T) bool,
) *Call[T]

func (*Call[T]) WithParseJSON added in v0.2.0

func (c *Call[T]) WithParseJSON() *Call[T]

func (*Call[T]) WithParseJSONEachRow added in v0.3.0

func (c *Call[T]) WithParseJSONEachRow(fn func(T) bool) *Call[T]

func (*Call[T]) WithParseJSONEachRowChan added in v0.3.0

func (c *Call[T]) WithParseJSONEachRowChan(out chan<- T) *Call[T]

func (*Call[T]) WithParseStream added in v0.3.0

func (c *Call[T]) WithParseStream(factory StreamFactory[T], fn func(T) bool) *Call[T]

func (*Call[T]) WithParseStreamChan added in v0.3.0

func (c *Call[T]) WithParseStreamChan(factory StreamFactory[T], ch chan<- T) *Call[T]

func (*Call[T]) WithRawBody

func (c *Call[T]) WithRawBody(payload []byte) *Call[T]

func (*Call[T]) WithReadBody

func (c *Call[T]) WithReadBody() *Call[T]

func (*Call[T]) WithRequestSniffed added in v0.4.0

func (c *Call[T]) WithRequestSniffed(fn func([]byte, error)) *Call[T]

func (*Call[T]) WithRequestStreamBody added in v0.3.0

func (c *Call[T]) WithRequestStreamBody(opt StreamCallReqOptionFunc[T]) *Call[T]

func (*Call[T]) WithURI

func (c *Call[T]) WithURI(raw string) *Call[T]

func (*Call[T]) WithURL

func (c *Call[T]) WithURL(raw string) *Call[T]

type CallReqOption added in v0.3.0

type CallReqOption[T any] interface {
	Configure(c *Call[T], r Request) error
}

type CallReqOptionFunc added in v0.2.0

type CallReqOptionFunc[T any] func(c *Call[T], res Request) error

func WithBasicAuth added in v0.5.0

func WithBasicAuth[T any](user, pass string) CallReqOptionFunc[T]

func WithBody added in v0.2.0

func WithBody[T any](payload any) CallReqOptionFunc[T]

func WithBodyStream added in v0.2.0

func WithBodyStream[T any](rc io.ReadWriteCloser, bodySize int) CallReqOptionFunc[T]

func WithContentType added in v0.2.0

func WithContentType[T any](ct ContentType) CallReqOptionFunc[T]

func WithHeader added in v0.2.0

func WithHeader[T any](k, v string, override bool) CallReqOptionFunc[T]

func WithHeaderFunc added in v0.2.0

func WithHeaderFunc[T any](fn func() (string, string, bool)) CallReqOptionFunc[T]

func WithMethod added in v0.2.0

func WithMethod[T any](method string) CallReqOptionFunc[T]

func WithRawBody

func WithRawBody[T any](payload []byte) CallReqOptionFunc[T]

func WithRequestSniffer added in v0.4.0

func WithRequestSniffer[T any](fn func([]byte, error)) CallReqOptionFunc[T]

func WithURI

func WithURI[T any](raw string) CallReqOptionFunc[T]

func WithURL

func WithURL[T any](raw string) CallReqOptionFunc[T]

func (CallReqOptionFunc[T]) Configure added in v0.2.0

func (f CallReqOptionFunc[T]) Configure(c *Call[T], req Request) error

type CallResOption

type CallResOption[T any] interface {
	Parse(c *Call[T], r Response) error
}

type CallResOptionFunc

type CallResOptionFunc[T any] func(c *Call[T], res Response) error

func WithAssertion

func WithAssertion[T any](fn func(res Response) error) CallResOptionFunc[T]

func WithCloseBody

func WithCloseBody[T any]() CallResOptionFunc[T]

func WithExpectedStatusCodes

func WithExpectedStatusCodes[T any](states ...int) CallResOptionFunc[T]

func WithIgnoredBody

func WithIgnoredBody[T any]() CallResOptionFunc[T]

func WithParseBodyRaw added in v0.2.0

func WithParseBodyRaw[T any]() CallResOptionFunc[T]

func WithParseJSON added in v0.3.0

func WithParseJSON[T any]() CallResOptionFunc[T]

func WithParseStream added in v0.3.0

func WithParseStream[T any](factory StreamFactory[T], fn func(T) bool) CallResOptionFunc[T]

func WithParseStreamChan added in v0.3.0

func WithParseStreamChan[T any](factory StreamFactory[T], out chan<- T) CallResOptionFunc[T]

func (CallResOptionFunc[T]) Parse

func (f CallResOptionFunc[T]) Parse(c *Call[T], res Response) error

type Channel added in v0.4.0

type Channel[T any] chan T

func (Channel[T]) Range added in v0.4.0

func (c Channel[T]) Range(fn func(int, T) bool)

func (Channel[T]) Serialize added in v0.4.0

func (c Channel[T]) Serialize() bool

type Client added in v0.7.0

type Client interface {
	Request(ctx context.Context) (Request, error)
	Do(ctx context.Context, req Request) (Response, error)
}

type ContentType added in v0.2.0

type ContentType string
var (
	ContentTypeJSON        ContentType = "application/json"
	ContentTypeJSONEachRow ContentType = "application/jsoneachrow"
)

func (ContentType) Codec added in v0.2.0

func (c ContentType) Codec() (codec.Codec, error)

func (ContentType) String added in v0.2.0

func (c ContentType) String() string

type Endpoint

type Endpoint struct {
	// contains filtered or unexported fields
}

func NewEndpoint

func NewEndpoint(name string) *Endpoint

func (*Endpoint) Request

func (e *Endpoint) Request(opts ...ReqOption) *Endpoint

func (*Endpoint) Response

func (e *Endpoint) Response(opts ...ResOption) *Endpoint

type FastHttpHttpClientAdapter

type FastHttpHttpClientAdapter struct {
	// contains filtered or unexported fields
}

func WithFasthttp added in v0.5.0

func WithFasthttp() *FastHttpHttpClientAdapter

func WithFasthttpClient added in v0.5.0

func WithFasthttpClient(cli *fasthttp.Client) *FastHttpHttpClientAdapter

func (*FastHttpHttpClientAdapter) Do

func (*FastHttpHttpClientAdapter) Request

type Header interface {
	SetHeader(k, v string)
	AddHeader(k, v string)
	Header(k string) (string, bool)
	RangeHeaders(func(string, string))
}

type JSONEachRowStream

type JSONEachRowStream[T any] struct {
	// contains filtered or unexported fields
}

func (*JSONEachRowStream[T]) Data

func (s *JSONEachRowStream[T]) Data() T

func (*JSONEachRowStream[T]) Err

func (s *JSONEachRowStream[T]) Err() error

func (*JSONEachRowStream[T]) Next

func (s *JSONEachRowStream[T]) Next(ctx context.Context) bool

type MockEndpoint

type MockEndpoint struct {
	// contains filtered or unexported fields
}

type MockHttpClientAdapter

type MockHttpClientAdapter struct{}

func NewMockHttpClientAdapter

func NewMockHttpClientAdapter() *MockHttpClientAdapter

func (*MockHttpClientAdapter) Do

func (*MockHttpClientAdapter) Request

type NativeHttpClientAdapter

type NativeHttpClientAdapter struct {
	// contains filtered or unexported fields
}

func WithNetHttp added in v0.5.0

func WithNetHttp() *NativeHttpClientAdapter

func WithNetHttpClient added in v0.5.0

func WithNetHttpClient(cli *http.Client) *NativeHttpClientAdapter

func (*NativeHttpClientAdapter) Do

func (*NativeHttpClientAdapter) Request

type NewLineStream added in v0.4.0

type NewLineStream struct {
	// contains filtered or unexported fields
}

func (*NewLineStream) Data added in v0.4.0

func (s *NewLineStream) Data() []byte

func (*NewLineStream) Err added in v0.4.0

func (s *NewLineStream) Err() error

func (*NewLineStream) Next added in v0.4.0

func (s *NewLineStream) Next(_ context.Context) bool

type ProxyStream added in v0.4.0

type ProxyStream struct {
	// contains filtered or unexported fields
}

func (*ProxyStream) Data added in v0.4.0

func (s *ProxyStream) Data() []byte

func (*ProxyStream) Err added in v0.4.0

func (s *ProxyStream) Err() error

func (*ProxyStream) Next added in v0.4.0

func (s *ProxyStream) Next(_ context.Context) bool

type ReqOption

type ReqOption interface {
	Configure(r Request) error
}

func WithBaseURL added in v0.2.0

func WithBaseURL(raw string) ReqOption

type ReqOptionFunc

type ReqOptionFunc func(req Request) error

func (ReqOptionFunc) Configure

func (f ReqOptionFunc) Configure(req Request) error

type Request

type Request interface {
	Header

	Method() string
	SetMethod(string)

	SetURL(*url.URL)
	// SetBodyStream sets the stream of body data belonging to a request. bodySize parameter is needed
	// when using fasthttp implementation.
	SetBodyStream(rc io.ReadWriteCloser, bodySize int)
	SetBody([]byte)

	Body() []byte
	BodyStream() io.ReadWriteCloser

	URL() *url.URL
}

type ResOption

type ResOption interface {
	Parse(r Response) error
}

func WithMockedRes added in v0.2.0

func WithMockedRes(fn func(response Response)) ResOption

type ResOptionFunc

type ResOptionFunc func(res Response) error

func (ResOptionFunc) Parse

func (f ResOptionFunc) Parse(res Response) error

type Response

type Response interface {
	Header

	Status() int
	StatusText() string
	Body() io.ReadCloser

	SetBody(rc io.ReadCloser)
	SetStatus(status int)
}

type Slice added in v0.3.0

type Slice[T any] []T

func (Slice[T]) Range added in v0.3.0

func (s Slice[T]) Range(fn func(int, T) bool)

func (Slice[T]) Serialize added in v0.4.0

func (s Slice[T]) Serialize() bool

type Stream

type Stream[T any] interface {
	Next(ctx context.Context) bool
	Data() T
	Err() error
}

func NewCSVStream added in v0.6.0

func NewCSVStream[T any](r io.Reader, ignoreLines int, parser csvparser.Parser[T]) Stream[T]

func NewJSONEachRowStream

func NewJSONEachRowStream[T any](r io.Reader) Stream[T]

func NewNewLineStream added in v0.4.0

func NewNewLineStream(r io.Reader) Stream[[]byte]

func NewProxyStream added in v0.4.0

func NewProxyStream(r io.Reader, bufferSize int) Stream[[]byte]

type StreamCallReqOption added in v0.3.0

type StreamCallReqOption[T any] interface {
	CallReqOption[T]
	// contains filtered or unexported methods
}

type StreamCallReqOptionFunc added in v0.3.0

type StreamCallReqOptionFunc[T any] func(c *Call[T], req Request) error

func WithRequestStreamBody added in v0.3.0

func WithRequestStreamBody[T, U any](r rangeable[U]) StreamCallReqOptionFunc[T]

func (StreamCallReqOptionFunc[T]) Configure added in v0.3.0

func (s StreamCallReqOptionFunc[T]) Configure(c *Call[T], req Request) error

type StreamFactory

type StreamFactory[T any] interface {
	Get(r io.Reader) Stream[T]
}

func NewCSVStreamFactory added in v0.6.0

func NewCSVStreamFactory[T any](ignoreLines int, parser csvparser.Parser[T]) StreamFactory[T]

func NewJSONEachRowStreamFactory

func NewJSONEachRowStreamFactory[T any]() StreamFactory[T]

func NewNewLineStreamFactory added in v0.4.0

func NewNewLineStreamFactory() StreamFactory[[]byte]

func NewProxyStreamFactory added in v0.4.0

func NewProxyStreamFactory(bufferSize int) StreamFactory[[]byte]

type StreamFactoryFunc

type StreamFactoryFunc[T any] func(reader io.Reader) Stream[T]

func (StreamFactoryFunc[T]) Get

func (f StreamFactoryFunc[T]) Get(r io.Reader) Stream[T]

type StreamFromReader added in v0.4.0

type StreamFromReader struct {
	io.Reader
	// contains filtered or unexported fields
}

func NewStreamFromReader added in v0.4.0

func NewStreamFromReader(r io.Reader, sf StreamFactory[[]byte]) StreamFromReader

func (StreamFromReader) Range added in v0.4.0

func (r StreamFromReader) Range(fn func(int, []byte) bool)

func (StreamFromReader) Serialize added in v0.4.0

func (r StreamFromReader) Serialize() bool

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL