Documentation ¶
Overview ¶
Go-flowfile is a lightweight connection-handling tool for sending and receiving FlowFiles over HTTP or HTTPS. When HTTPS is used, the client MUST also present a valid client certificate.
About FlowFiles ¶
FlowFiles are at the heart of Apache NiFi and its flow-based design. A FlowFile is a data record, which consists of a pointer to its content (payload) and attributes to support the content, that is associated with one or more provenance events. The attributes are key/value pairs that act as the metadata for the FlowFile, such as the FlowFile filename. The content is the actual data or the payload of the file.
More info: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html
What is a FlowFile ¶
A FlowFile is a logical notion that correlates a piece of data with a set of Attributes about that data. Such attributes include a FlowFile's unique identifier, as well as its name, size, and any number of other flow-specific values. While the contents and attributes of a FlowFile can change, the FlowFile object is immutable. Modifications to a FlowFile are made possible by the ProcessSession.
The core attributes for FlowFiles are defined in the org.apache.nifi.flowfile.attributes.CoreAttributes enum. The most common attributes you'll see are filename, path and uuid. The string in quotes is the value of the attribute within the CoreAttributes enum.
- Filename ("filename"): The filename of the FlowFile. The filename should not contain any directory structure.
- UUID ("uuid"): A universally unique identifier (UUID) assigned to this FlowFile.
- Path ("path"): The FlowFile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename.
- Absolute Path ("absolute.path"): The FlowFile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename.
- Priority ("priority"): A numeric value indicating the FlowFile priority.
- MIME Type ("mime.type"): The MIME Type of this FlowFile.
- Discard Reason ("discard.reason"): Specifies the reason that a FlowFile is being discarded.
- Alternative Identifier ("alternate.identifier"): Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.
More info: https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.0.3/bk_developer-guide/content/flowfile.html
Implementations ¶
The `File` entity represents the FlowFile. The FlowFile attributes can be inspected on the wire before the payload is consumed, so switching logic and rules can be applied before the payload has been brought in.
This design of acting on the header alone is comparable to a network switch or router, which inspects the Layer 2/3 packet header before the entire packet is consumed to decide which path the bytes should take. Because the logic does not have to wait for the entire file, pipes can be connected together and memory use can be kept to a minimum.
In typical use, several utilities doing complex routing of FlowFiles have an overall memory footprint of around 10-20 MB. By comparison, a standard NiFi package takes approximately 800 MB at a minimum.
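The header-first approach can be sketched with the standard library alone. The following is a minimal illustration, not the package's implementation: the wire layout (magic string, 2-byte attribute count, 2-byte length-prefixed strings, 8-byte payload size) is inferred from the example bytes shown elsewhere in this document, and the names `readField`, `readHeader`, `route` and the routing rule are hypothetical.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"strings"
)

// sample is one encoded FlowFile: header, 8-byte payload size, payload.
const sample = "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh" +
	"\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile"

// readField reads one string prefixed by a 2-byte big-endian length.
func readField(r io.Reader) (string, error) {
	var n uint16
	if err := binary.Read(r, binary.BigEndian, &n); err != nil {
		return "", err
	}
	buf := make([]byte, n)
	_, err := io.ReadFull(r, buf)
	return string(buf), err
}

// readHeader consumes the attributes and the payload size, nothing more.
func readHeader(r io.Reader) (map[string]string, int64, error) {
	magic := make([]byte, 7)
	if _, err := io.ReadFull(r, magic); err != nil {
		return nil, 0, err
	}
	if string(magic) != "NiFiFF3" {
		return nil, 0, errors.New("no NiFiFF3 header found")
	}
	var count uint16
	if err := binary.Read(r, binary.BigEndian, &count); err != nil {
		return nil, 0, err
	}
	attrs := make(map[string]string, count)
	for i := 0; i < int(count); i++ {
		name, err := readField(r)
		if err != nil {
			return nil, 0, err
		}
		value, err := readField(r)
		if err != nil {
			return nil, 0, err
		}
		attrs[name] = value
	}
	var size int64
	if err := binary.Read(r, binary.BigEndian, &size); err != nil {
		return nil, 0, err
	}
	return attrs, size, nil
}

// route picks a destination from the attributes alone and only then
// streams the payload to it. The routing rule is a hypothetical example.
func route(wire string) (dest, payload string, err error) {
	in := strings.NewReader(wire)
	attrs, size, err := readHeader(in)
	if err != nil {
		return "", "", err
	}
	dest = "remote"
	if attrs["path"] == "./" {
		dest = "local"
	}
	var out strings.Builder // stands in for the chosen destination
	_, err = io.CopyN(&out, in, size)
	return dest, out.String(), err
}

func main() {
	dest, payload, err := route(sample)
	fmt.Println(dest, len(payload), err) // prints: local 36 <nil>
}
```

Because the decision is made before `io.CopyN` runs, the payload is never held in memory as a whole; it is copied through in small chunks.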
Index ¶
- Constants
- Variables
- type Attribute
- type Attributes
- func (h Attributes) Clone() Attributes
- func (h *Attributes) CustodyChainAddHTTP(r *http.Request)
- func (h *Attributes) CustodyChainAddListen(listen string)
- func (h *Attributes) CustodyChainShift()
- func (h *Attributes) GenerateUUID() string
- func (h *Attributes) Get(name string) string
- func (h Attributes) MarshalBinary() ([]byte, error)
- func (h Attributes) MarshalJSON() ([]byte, error)
- func (h Attributes) NewChecksumHash() hash.Hash
- func (h *Attributes) ReadFrom(in io.Reader) (err error)
- func (h *Attributes) Set(name, val string) *Attributes
- func (h *Attributes) Sort()
- func (h Attributes) String() string
- func (h *Attributes) UnmarshalBinary(in []byte) (err error)
- func (h *Attributes) UnmarshalJSON(in []byte) error
- func (h *Attributes) Unset(name string) (ok bool)
- func (h *Attributes) WriteTo(out io.Writer) (err error)
- type File
- func (f *File) AddChecksum(cksum string) error
- func (l *File) AddChecksumFromVerify() error
- func (f *File) BufferFile(buf *bytes.Buffer) (err error)
- func (l *File) ChecksumInit() error
- func (l *File) Close() (err error)
- func (f *File) EncodedReader() (rdr io.Reader)
- func (f File) FilePath() string
- func (f File) HeaderSize() (n int)
- func (f *File) MarshalBinary() (dat []byte, err error)
- func (l *File) Read(p []byte) (n int, err error)
- func (f *File) Reset() error
- func (f *File) Save(baseDir string) (outputFile string, err error)
- func (f *File) UnmarshalBinary(dat []byte) (err error)
- func (l *File) Verify() error
- func (l *File) VerifyDetails() string
- func (l *File) VerifyHash(h hash.Hash) error
- func (l *File) VerifyParent(fp string) error
- type HTTPPostWriter
- type HTTPReceiver
- type HTTPTransaction
- type Metrics
- type Scanner
- type Writer
Examples ¶
Constants ¶
const (
	FlowFile3Header = "NiFiFF3"
	FlowFileEOF     = "NiFiEOF"
)
Variables ¶
var (
	ErrorNoFlowFileHeader      = errors.New("No NiFiFF3 header found")
	ErrorInvalidFlowFileHeader = errors.New("Invalid of incomplete FlowFile header")
)
var (
	ErrorChecksumMismatch = errors.New("Mismatching checksum")
	ErrorChecksumMissing  = errors.New("Missing checksum")
	ErrorChecksumNoInit   = errors.New("Checksum was not initialized")
)
var (
	UserAgent   = "NiFi FlowFile Client (github.com/pschou/go-flowfile)"
	AboutString = "NiFi FlowFile Server (github.com/pschou/go-flowfile)"
	Debug       = false
)
var ErrorInconsistantSize = errors.New("Inconsistant flowfile size")
var ErrorUnmarshallingAttributes = errors.New("Error unmarshalling attributes")
Functions ¶
This section is empty.
Types ¶
type Attribute ¶
type Attribute struct {
Name, Value string
}
A single attribute in a FlowFile header
type Attributes ¶
type Attributes []Attribute
A set of attributes in a FlowFile header
func (Attributes) Clone ¶
func (h Attributes) Clone() Attributes
Clone the attributes for ease of duplication
func (*Attributes) CustodyChainAddHTTP ¶
func (h *Attributes) CustodyChainAddHTTP(r *http.Request)
Add attributes related to an http request, such as remote host, request URI, and TLS details.
func (*Attributes) CustodyChainAddListen ¶
func (h *Attributes) CustodyChainAddListen(listen string)
func (*Attributes) CustodyChainShift ¶
func (h *Attributes) CustodyChainShift()
Update the custodyChain attributes by incrementing the index of every existing entry by one, then add a new entry with the current time and hostname.
func (*Attributes) GenerateUUID ¶
func (h *Attributes) GenerateUUID() string
Set a new UUID value for a FlowFile
func (*Attributes) Get ¶
func (h *Attributes) Get(name string) string
Returns the value of the first attribute with the specified name.
Example ¶
This shows how to get an individual attribute:
var a flowfile.Attributes
a.Set("path", "./")
fmt.Println("attribute:", a.Get("path"))
Output:
attribute: ./
func (Attributes) MarshalBinary ¶
func (h Attributes) MarshalBinary() ([]byte, error)
Marshal the FlowFile attributes into a binary slice.
Example ¶
This shows how to encode the attributes into a header for sending:
var a flowfile.Attributes
a.Set("path", "./")
a.Set("filename", "abcd-efgh")
b, _ := a.MarshalBinary()
fmt.Printf("attributes: %q\n", b)
Output:
attributes: "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"
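The bytes in the output above can be read as: the magic string `NiFiFF3`, a 2-byte big-endian attribute count, then each name and value as a 2-byte big-endian length followed by the string. The sketch below rebuilds that header with the standard library to make the layout concrete. It is an illustration inferred from the example bytes, not the package's encoder; the `attr` type and the `writeField` and `encodeHeader` names are hypothetical, and it only handles names and values shorter than 65535 bytes.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// attr is a hypothetical stand-in for a name/value attribute pair.
type attr struct{ name, value string }

// writeField appends a 2-byte big-endian length followed by the string.
// Writes to a bytes.Buffer cannot fail, so the error is not checked.
func writeField(buf *bytes.Buffer, s string) {
	binary.Write(buf, binary.BigEndian, uint16(len(s)))
	buf.WriteString(s)
}

// encodeHeader emits the magic string, the attribute count, and then
// every attribute in the order given.
func encodeHeader(attrs []attr) []byte {
	var buf bytes.Buffer
	buf.WriteString("NiFiFF3")
	binary.Write(&buf, binary.BigEndian, uint16(len(attrs)))
	for _, a := range attrs {
		writeField(&buf, a.name)
		writeField(&buf, a.value)
	}
	return buf.Bytes()
}

func main() {
	hdr := encodeHeader([]attr{{"path", "./"}, {"filename", "abcd-efgh"}})
	// Prints the same quoted bytes as the MarshalBinary example output.
	fmt.Printf("attributes: %q\n", hdr)
}
```

Note that `%q` renders byte 0x08 as `\b` and byte 0x09 as `\t`, which is why the lengths of "filename" (8) and "abcd-efgh" (9) appear that way in the output.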
func (Attributes) MarshalJSON ¶
func (h Attributes) MarshalJSON() ([]byte, error)
Provides a MarshalJSON interface
func (Attributes) NewChecksumHash ¶
func (h Attributes) NewChecksumHash() hash.Hash
Create a new checksum for verifying payload.
func (*Attributes) ReadFrom ¶
func (h *Attributes) ReadFrom(in io.Reader) (err error)
Parse the FlowFile attributes from a binary Reader.
Example ¶
This shows how to decode the attributes from a header for parsing:
var a flowfile.Attributes
wire := bytes.NewBuffer([]byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"))
a.ReadFrom(wire)
fmt.Printf("attributes: %v\n", a)
Output:
attributes: {"path":"./","filename":"abcd-efgh"}
func (*Attributes) Set ¶
func (h *Attributes) Set(name, val string) *Attributes
Sets an attribute to the given value. It takes two inputs: the first is the attribute name and the second is the attribute value. It returns the attributes so that calls can be chained.
Example ¶
This shows how to set an individual attribute:
var a flowfile.Attributes
fmt.Printf("attributes: %v\n", a)
a.Set("path", "./")
fmt.Printf("attributes: %v\n", a)
Output:
attributes: {}
attributes: {"path":"./"}
func (Attributes) String ¶
func (h Attributes) String() string
func (*Attributes) UnmarshalBinary ¶
func (h *Attributes) UnmarshalBinary(in []byte) (err error)
Parse the FlowFile attributes from a binary slice.
Example ¶
This shows how to decode the attributes from a header for parsing:
var a flowfile.Attributes
buf := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh")
err := a.UnmarshalBinary(buf)
if err != nil {
	log.Fatal("Error unmarshalling attributes:", err)
}
fmt.Printf("attributes: %v\n", a)
Output:
attributes: {"path":"./","filename":"abcd-efgh"}
func (*Attributes) UnmarshalJSON ¶
func (h *Attributes) UnmarshalJSON(in []byte) error
func (*Attributes) Unset ¶
func (h *Attributes) Unset(name string) (ok bool)
Removes the attribute with the specified name. The boolean result indicates whether an attribute was removed.
Example ¶
This shows how to unset an individual attribute:
var a flowfile.Attributes
a.Set("path", "./")
a.Set("junk", "cars")
a.Set("filename", "abcd-efgh")
a.Unset("junk")
fmt.Printf("attributes: %v\n", a)
Output:
attributes: {"path":"./","filename":"abcd-efgh"}
func (*Attributes) WriteTo ¶
func (h *Attributes) WriteTo(out io.Writer) (err error)
Write the FlowFile attributes in binary form to a writer.
Example ¶
This shows how to encode the attributes into a header for sending:
var a flowfile.Attributes
a.Set("path", "./")
a.Set("filename", "abcd-efgh")
buf := bytes.NewBuffer([]byte{})
a.WriteTo(buf)
fmt.Printf("raw: %q\n", buf)
Output:
raw: "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"
type File ¶
type File struct {
	Attrs Attributes
	Size  int64 // total size
	// contains filtered or unexported fields
}
A File is a handle for either an incoming or an outgoing data feed of the contents of a file over a connection. One can either pass it as a Reader to a flowfile sender or read from the File directly, as it implements the io.Reader interface. Neither the reader nor the counts are exported, to avoid accidental over-reads of the underlying reader.
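To illustrate why over-reads matter: when several payloads share one underlying stream, reading past the end of one payload would swallow the start of the next. A minimal standard-library sketch of bounding the read to a known size follows. It uses `io.LimitReader` as a stand-in; how File tracks its counts internally is not shown in this document, and the helper names `readPayload` and `splitStream` are hypothetical.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// readPayload reads at most size bytes from stream and leaves the rest
// of the stream untouched for the next reader.
func readPayload(stream io.Reader, size int64) (string, error) {
	b, err := io.ReadAll(io.LimitReader(stream, size))
	return string(b), err
}

// splitStream reads a first payload of the given size, then returns
// whatever is still left in the stream.
func splitStream(data string, size int64) (first, rest string, err error) {
	stream := strings.NewReader(data)
	if first, err = readPayload(stream, size); err != nil {
		return "", "", err
	}
	b, err := io.ReadAll(stream)
	return first, string(b), err
}

func main() {
	// Two 5-byte payloads back to back on one stream.
	first, rest, err := splitStream("test1test2", 5)
	fmt.Println(first, rest, err) // prints: test1 test2 <nil>
}
```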
func New ¶
Create a new File struct from an io.Reader with size. One should add attributes before writing it to a stream.
Example ¶
A calling method should do the due diligence of closing the inner reader after the flowfile is done being used. A good way to do this is something like:
dir, filename := "./", "myfile.dat"
fh, err := os.Open(filename)
if err != nil {
	log.Fatal(err)
}
defer fh.Close() // Ensure the file is closed when the function exits
fileInfo, _ := fh.Stat()
f := flowfile.New(fh, fileInfo.Size()) // Construct a flowfile with size
f.Attrs.Set("path", dir)               // Specify the path for the file
f.Attrs.Set("filename", filename)      // Give the filename
f.Attrs.GenerateUUID()                 // Set a unique identifier to this file
Output:
func NewFromDisk ¶
NewFromDisk creates a new File struct from a file on disk. One should add attributes before writing it to a stream.
Note that the file is not opened to keep the opened file pointers on the system at a minimum. However, once a file is used, the file handle remains open until Close() is called. It is recommended that a checksum is done on the file before sending.
func Segment ¶
Splits up a flowfile into count number of segments. The intended purpose here is to enable larger files to be sent in smaller chunks so as to avoid having to replay sending a whole file in case a connection gets dropped.
func SegmentBySize ¶
Splits up a flowfile into a number of segments with segmentSize. The intended purpose here is to enable larger files to be sent in smaller chunks so as to avoid having to replay sending a whole file in case a connection gets dropped.
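The arithmetic behind segmenting can be sketched as computing (offset, length) ranges over a source that supports random access. This is an illustration of the idea only; the `segment` type and both helper functions are hypothetical, and the attributes the package attaches to each segment are not shown here. With rounding up, splitting by count can yield fewer segments than requested for very small inputs.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// segment is a hypothetical byte range within a larger payload.
type segment struct{ offset, length int64 }

// segmentBySize cuts total bytes into ranges of at most segmentSize bytes.
func segmentBySize(total, segmentSize int64) []segment {
	if total <= 0 || segmentSize <= 0 {
		return nil
	}
	var out []segment
	for off := int64(0); off < total; off += segmentSize {
		n := segmentSize
		if total-off < n {
			n = total - off // the final segment may be shorter
		}
		out = append(out, segment{off, n})
	}
	return out
}

// segmentByCount cuts total bytes into at most count ranges by rounding
// the per-segment size up.
func segmentByCount(total int64, count int) []segment {
	if count <= 0 {
		return nil
	}
	size := (total + int64(count) - 1) / int64(count)
	return segmentBySize(total, size)
}

func main() {
	src := strings.NewReader("0123456789")
	for _, s := range segmentBySize(src.Size(), 4) {
		// io.NewSectionReader gives an independent view of one range.
		part, _ := io.ReadAll(io.NewSectionReader(src, s.offset, s.length))
		fmt.Printf("offset=%d length=%d data=%q\n", s.offset, s.length, part)
	}
}
```

If a connection drops, only the range that failed needs to be sent again, since each section reader starts from its own offset.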
func (*File) AddChecksum ¶
Add checksum to flowfile, requires a ReadAt interface in the flowfile context.
Note: A checksum cannot be added to a streamed File (a plain io.Reader). The checksum belongs in the header, and by the time a streamed payload has been read to compute it, the header has already been sent on the wire. The content must therefore be read once to build the checksum and then read again to send it, which is why the io.ReaderAt interface is important.
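The two-pass requirement can be sketched with the standard library: one pass over an `io.ReaderAt` to compute the digest, then a second pass that emits the header (now including the digest) followed by the payload. This is a simplified illustration and not the package's code. SHA-256, the text header line, and the `checksum`, `send`, and `sendString` names are all assumptions made for the example.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"strings"
)

// checksum makes a first pass over the content to compute a digest.
func checksum(src io.ReaderAt, size int64) (string, error) {
	h := sha256.New()
	if _, err := io.Copy(h, io.NewSectionReader(src, 0, size)); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// send writes a header line carrying the digest and then makes a second
// pass over the same content. The "checksum=" line is a hypothetical
// stand-in for a real FlowFile header.
func send(out io.Writer, src io.ReaderAt, size int64) error {
	sum, err := checksum(src, size)
	if err != nil {
		return err
	}
	if _, err := fmt.Fprintf(out, "checksum=%s\n", sum); err != nil {
		return err
	}
	_, err = io.Copy(out, io.NewSectionReader(src, 0, size))
	return err
}

// sendString runs the sketch on in-memory data.
func sendString(s string) (string, error) {
	var out strings.Builder
	err := send(&out, strings.NewReader(s), int64(len(s)))
	return out.String(), err
}

func main() {
	wire, err := sendString("test1")
	fmt.Printf("%q %v\n", wire, err)
}
```

A plain `io.Reader` cannot support this, because the first pass would consume the only copy of the data.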
func (*File) AddChecksumFromVerify ¶
AddChecksumFromVerify will take the checksum computed in the verify step and set the checksum attribute to match. This effectively makes a FlowFile pass what may otherwise be a failed verification. Useful for updating the checksum of an existing flowfile after it has been fully read in.
func (*File) BufferFile ¶
Read the entire payload into a buffer, so as to complete the checksum and enable the ability to reset the File for multiple reads.
Note: This could create memory bloat if the buffers are not able to be cleared out due to the runtime keeping an unused pointer or the buffer isn't returned to a Pool.
func (*File) ChecksumInit ¶
Function called before a file is read for setting up the hashing function.
func (*File) Close ¶
Close the flowfile construct. Generally the FlowFile is acted upon in a streaming context, moving a file from one place to another. So, in this understanding, the action of closing a flowfile is effectively removing the current payload from consideration and moving the reader pointer forward, making the next flowfile available for reading.
func (*File) EncodedReader ¶
Returns an io.Reader that yields the encoded flowfile.
func (File) HeaderSize ¶
Return the size of the header for computations of the total flow file size.
Total Size = Header + Data
func (*File) MarshalBinary ¶
Marshal a FlowFile into a byte slice.
Note: This is not preferred as it can cause memory bloat.
func (*File) Reset ¶
If the flowfile has a ReaderAt interface, one can reset the reader to the start for reading again
func (*File) Save ¶
Save will save the flowfile to a given directory, reconstructing the original directory tree with files in it while doing checksums on each file as it is laid down. It is up to the calling function to determine whether to delete or keep the file after an unsuccessful send.
func (*File) UnmarshalBinary ¶
Unmarshal parses a FlowFile formatted byte slice into a File struct for processing.
Note: This is not preferred as it can cause memory bloat.
Example ¶
Parses a FlowFile from a byte slice, then prints its attributes and reads out its content.
dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")
var f flowfile.File
err := f.UnmarshalBinary(dat)
if err != nil {
	fmt.Println("Error unmarshalling:", err)
}
fmt.Printf("Attrs: %v\n", f.Attrs)
buf := bytes.NewBuffer([]byte{})
buf.ReadFrom(&f)
fmt.Printf("content: %q\n", buf.String())
Output:
Attrs: {"path":"./","filename":"abcd-efgh"}
content: "this is a custom string for flowfile"
func (*File) VerifyDetails ¶
VerifyDetails describes why a match was successful or failed
func (*File) VerifyHash ¶
Verify a given hash against the file sent, to ensure a complete and accurate payload.
func (*File) VerifyParent ¶
Verify the file sent was complete and accurate
type HTTPPostWriter ¶
type HTTPPostWriter struct {
	Header        http.Header
	FlushInterval time.Duration
	Sent          int64
	Response      *http.Response
	// contains filtered or unexported fields
}
HTTPPostWriter encapsulates the ability to write one or more flow files in one POST request. It must be closed after the last File has been sent.
One needs to first create an HTTPTransaction before one can create an HTTPPostWriter, so the process looks like:
ff1 := flowfile.New(strings.NewReader("test1"), 5)
ff2 := flowfile.New(strings.NewReader("test2"), 5)
ht, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}
w := ht.NewHTTPPostWriter() // Create the POST to the NiFi endpoint
w.Write(ff1)
w.Write(ff2)
err = w.Close() // Finalize the POST
Example ¶
// Build two small files
ff1 := flowfile.New(strings.NewReader("test1"), 5)
ff2 := flowfile.New(strings.NewReader("test2"), 5)

// Prepare an HTTP transaction
ht, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

// Post the files to the endpoint
w := ht.NewHTTPPostWriter()
w.Write(ff1)
w.Write(ff2)
err = w.Close() // Finalize the POST
Output:
Example (SendWithCustomHeader) ¶
// Create a new HTTPTransaction, used for sending batches of flowfiles
hs, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")
var ff flowfile.File
err = ff.UnmarshalBinary(dat)

hp := hs.NewHTTPPostWriter()
defer hp.Close()

hp.Header.Set("X-Forwarded-For", "1.2.3.4:5678")
_, err = hp.Write(&ff)
Output:
func (*HTTPPostWriter) Close ¶
func (hw *HTTPPostWriter) Close() (err error)
Close the HTTPPostWriter and flush the data to the stream
func (*HTTPPostWriter) Terminate ¶
func (hw *HTTPPostWriter) Terminate()
Terminate the HTTPPostWriter
func (*HTTPPostWriter) Write ¶
func (hw *HTTPPostWriter) Write(f *File) (n int64, err error)
Write a flow file to the remote server and return any errors back. One cannot determine if there has been a successful send until the HTTPPostWriter is closed. Then the Response.StatusCode will be set with the reply from the server.
type HTTPReceiver ¶
type HTTPReceiver struct {
	Server           string
	MaxPartitionSize int64
	MaxConnections   int
	Metrics          *Metrics
	// contains filtered or unexported fields
}
Implements http.Handler and can be used with Go's built-in net/http package:
https://pkg.go.dev/net/http#Handler
func NewHTTPFileReceiver ¶
func NewHTTPFileReceiver(handler func(*File, http.ResponseWriter, *http.Request) error) *HTTPReceiver
NewHTTPFileReceiver interfaces with the built-in HTTP Handler and parses out the individual FlowFiles from a stream and sends them to a FlowFile handler.
Example ¶
ffReceiver := flowfile.NewHTTPFileReceiver(func(f *flowfile.File, w http.ResponseWriter, r *http.Request) error {
	log.Println("Got file", f.Attrs.Get("filename"))
	// do stuff with file
	return nil
})

// Add this receiver to the path
http.Handle("/contentListener", ffReceiver)

// Start accepting files
http.ListenAndServe(":8080", nil)
Output:
func NewHTTPReceiver ¶
func NewHTTPReceiver(handler func(*Scanner, http.ResponseWriter, *http.Request)) *HTTPReceiver
NewHTTPReceiver interfaces with the built-in HTTP Handler, parses out the FlowFile stream, and provides a FlowFile scanner to a FlowFile handler.
Example ¶
ffReceiver := flowfile.NewHTTPReceiver(func(fs *flowfile.Scanner, w http.ResponseWriter, r *http.Request) {
	// Loop over all the files in the post payload
	count := 0
	for fs.Scan() {
		count++
		f := fs.File()
		log.Println("Got file", f.Attrs.Get("filename"))
		// do stuff with file
	}
	if err := fs.Err(); err != nil {
		log.Println("Error:", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	log.Println(count, "file(s) in POST payload")
	w.WriteHeader(http.StatusOK)
})
http.Handle("/contentListener", ffReceiver) // Add this receiver to the path
http.ListenAndServe(":8080", nil)           // Start accepting files
Output:
func (*HTTPReceiver) MetricsHandler ¶
func (hr *HTTPReceiver) MetricsHandler() http.Handler
func (*HTTPReceiver) ServeHTTP ¶
func (f *HTTPReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request)
Handler for accepting flow files through an HTTP web server. It is intended to be registered as a listen handler so as to make building out all the web endpoints seamless.
// post is a func(*flowfile.Scanner, http.ResponseWriter, *http.Request)
ffReceiver := flowfile.NewHTTPReceiver(post)
http.Handle("/contentListener", ffReceiver)
log.Fatal(http.ListenAndServe(":8080", nil))
type HTTPTransaction ¶
type HTTPTransaction struct {
	Server        string
	TransactionID string

	RetryCount int // When using a ReadAt reader, attempt multiple retries
	RetryDelay time.Duration
	OnRetry    func(ff []*File, retry int, err error)

	// Non-standard NiFi entities supported by this library
	MaxPartitionSize int64  // Maximum partition size for partitioned file
	CheckSumType     string // What kind of CheckSum to use for sent files

	MetricsHandshakeLatency time.Duration
	// contains filtered or unexported fields
}
The HTTP Sender will establish a NiFi handshake and ensure that the remote endpoint is listening and compatible with the current flow file format.
func NewHTTPTransaction ¶
func NewHTTPTransaction(url string, cfg *tls.Config) (*HTTPTransaction, error)
Create the HTTP sender and verify that the remote side is listening.
Example ¶
// Create a new HTTPTransaction, used for sending batches of flowfiles
hs, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")
var ff flowfile.File
err = ff.UnmarshalBinary(dat)

err = hs.Send(&ff)
Output:
func NewHTTPTransactionNoHandshake ¶
func NewHTTPTransactionNoHandshake(url string, cfg *tls.Config) *HTTPTransaction
Create the HTTP sender without verifying remote is listening
func NewHTTPTransactionWithTransport ¶
func NewHTTPTransactionWithTransport(url string, cfg *http.Transport) (*HTTPTransaction, error)
Create the HTTP sender and verify that the remote side is listening.
func (*HTTPTransaction) Handshake ¶
func (hs *HTTPTransaction) Handshake() error
Establishes or re-establishes a transaction id with NiFi to begin the process of transferring flowfiles. This is a blocking call so no new files will be sent until this is completed.
func (*HTTPTransaction) NewHTTPBufferedPostWriter ¶
func (hs *HTTPTransaction) NewHTTPBufferedPostWriter() (httpWriter *HTTPPostWriter)
NewHTTPBufferedPostWriter creates a POST to a NiFi listening endpoint and allows multiple files to be written to the endpoint at one time. This reduces additional overhead (with fewer HTTP responses) and decreases latency. Additionally, the added buffering helps with constructing larger packets, thus further reducing TCP overhead.
However, HTTPPostWriter increases the chances of failures, as all the sent files will be marked as failed if the HTTP POST is not a success.
func (*HTTPTransaction) NewHTTPPostWriter ¶
func (hs *HTTPTransaction) NewHTTPPostWriter() (httpWriter *HTTPPostWriter)
NewHTTPPostWriter creates a POST to a NiFi listening endpoint and allows multiple files to be written to the endpoint at one time. This reduces additional overhead (with fewer HTTP responses) and decreases latency (by instead putting pressure on TCP with smaller payload sizes).
However, HTTPPostWriter increases the chances of failures, as all the sent files will be marked as failed if the HTTP POST is not a success.
func (*HTTPTransaction) Send ¶
func (hs *HTTPTransaction) Send(ff ...*File) (err error)
Send one or more flow files to the remote server and return any errors back. A nil return for error is a successful send.
A failed send will be retried if HTTPTransaction.RetryCount is set and the File uses a ReadAt reader. Up to (1 + RetryCount) attempts will be made, with a delay of HTTPTransaction.RetryDelay between attempts.
// With one or more files:
err = hs.Send(file1)
err = hs.Send(file1, file2) // or more
// A slice of files:
err = hs.Send(files...)
This method of sending will make one POST-per-file which is not recommended for small files. To increase throughput on smaller files one should consider using either NewHTTPPostWriter or NewHTTPBufferedPostWriter.
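The retry behaviour described for Send, one initial attempt plus up to RetryCount retries with a delay in between, can be sketched as a plain loop. This is an illustration of the pattern and not the package's implementation; `sendWithRetry`, the `attempt` callback, and `errDropped` are hypothetical names. In a real sender each attempt would need to re-read the payload from its start, which is what a ReadAt reader makes possible.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errDropped is a stand-in for a transient network failure.
var errDropped = errors.New("connection dropped")

// sendWithRetry calls attempt once and then up to retries more times,
// sleeping for delay before each retry. It returns the number of
// attempts made and the last error, which is nil on success.
func sendWithRetry(retries int, delay time.Duration, attempt func(try int) error) (tries int, err error) {
	if retries < 0 {
		retries = 0 // always make at least one attempt
	}
	for try := 0; try <= retries; try++ {
		if try > 0 {
			time.Sleep(delay)
		}
		tries++
		if err = attempt(try); err == nil {
			return tries, nil
		}
	}
	return tries, err
}

func main() {
	// Simulate a send that fails twice and then succeeds.
	tries, err := sendWithRetry(3, 10*time.Millisecond, func(try int) error {
		if try < 2 {
			return errDropped
		}
		return nil
	})
	fmt.Println(tries, err) // prints: 3 <nil>
}
```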
type Metrics ¶
type Metrics struct {
	// Custom buckets can be defined by setting new buckets before ingesting data
	// Note the BucketValues is always N+1 sized, as the last is overflow
	MetricsFlowFileTransferredBuckets      []int64
	MetricsFlowFileTransferredBucketValues []int64
	MetricsFlowFileTransferredSum          int64
	MetricsFlowFileTransferredCount        int64

	//MetricsFlowFileReceivedSum *int64
	//MetricsFlowFileReceivedCount *int64
	MetricsThreadsActive     int64
	MetricsThreadsTerminated int64
	MetricsThreadsQueued     int64
	// contains filtered or unexported fields
}
func NewMetrics ¶
func NewMetrics() *Metrics
func (*Metrics) BucketCounter ¶
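The comment in the Metrics struct notes that the bucket values slice is always N+1 long for N buckets, with the last entry counting overflow. A minimal sketch of that layout follows. It is not the package's BucketCounter: the `histogram` type and its methods are hypothetical, and treating each bound as an inclusive upper limit with non-cumulative counts is an assumption.

```go
package main

import "fmt"

// histogram has one counter per upper bound plus a final overflow counter.
type histogram struct {
	bounds []int64 // ascending upper bounds, N entries
	counts []int64 // N+1 entries; the last one counts overflow
}

// newHistogram allocates the counters for the given ascending bounds.
func newHistogram(bounds ...int64) *histogram {
	return &histogram{bounds: bounds, counts: make([]int64, len(bounds)+1)}
}

// observe increments the first bucket whose bound is >= v (the inclusive
// bound is an assumption), or the overflow bucket when v exceeds them all.
func (h *histogram) observe(v int64) {
	i := 0
	for i < len(h.bounds) && v > h.bounds[i] {
		i++
	}
	h.counts[i]++
}

func main() {
	h := newHistogram(1024, 1048576) // 1 KiB and 1 MiB bounds
	for _, size := range []int64{5, 1024, 2000, 5 << 20} {
		h.observe(size)
	}
	fmt.Println(h.counts) // prints: [2 1 1]
}
```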
type Scanner ¶
type Scanner struct {
// contains filtered or unexported fields
}
A wrapper around an io.Reader which parses out the flow files.
func NewScanner ¶
Create a new FlowFile reader, wrapping io.Reader for reading consecutive FlowFiles from a stream.
Example ¶
This example shows how to write a FlowFile and then read in a stream to make a flowfile
wire := bytes.NewBuffer([]byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile"))

s := flowfile.NewScanner(wire)
for s.Scan() { // Scan for another FlowFile in the stream
	f := s.File()
	fmt.Printf("attributes: %v\n", f.Attrs)
	buf := bytes.NewBuffer([]byte{})
	buf.ReadFrom(f)
	fmt.Printf("content: %q\n", buf.String())
}
fmt.Println("Check for errors:", s.Err())
Output:
attributes: {"path":"./","filename":"abcd-efgh"}
content: "this is a custom string for flowfile"
Check for errors: <nil>
func NewScannerChan ¶
Create a new FlowFile reader, using a (chan *File) for reading consecutive FlowFiles from a channel.
func NewScannerSlice ¶
Create a new FlowFile reader, for reading from a slice of FlowFiles.
func (*Scanner) Scan ¶
Scan advances the Scanner to the next token, which will then be available through the File method. It returns false when the scan stops, either by reaching the end of the input or an error. After Scan returns false, the Err method will return any error that occurred during scanning, except that if it was io.EOF, Err will return nil.
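The Scan and Err contract described above (false at end of input, with io.EOF reported as a nil error) can be sketched over a stream of consecutive FlowFiles. The following is a simplified standard-library illustration, not the package's Scanner. The `scanner` type, its fields, and the `scanNames` helper are hypothetical, and the frame layout is inferred from the example bytes in this document. Any payload left unread is skipped before the next header is parsed.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
	"io"
	"strings"
)

// oneFile is a single encoded FlowFile taken from the examples in this document.
const oneFile = "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh" +
	"\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile"

type scanner struct {
	r       io.Reader
	attrs   map[string]string
	payload *io.LimitedReader // bounded view of the current payload
	err     error
}

// fail records an error; an EOF in the middle of a FlowFile is unexpected.
func (s *scanner) fail(err error) bool {
	if err == io.EOF {
		err = io.ErrUnexpectedEOF
	}
	s.err = err
	return false
}

// field reads one string prefixed by a 2-byte big-endian length.
func (s *scanner) field() (string, error) {
	var n uint16
	if err := binary.Read(s.r, binary.BigEndian, &n); err != nil {
		return "", err
	}
	buf := make([]byte, n)
	_, err := io.ReadFull(s.r, buf)
	return string(buf), err
}

// Scan skips any unread payload, then parses the next header.
func (s *scanner) Scan() bool {
	if s.err != nil {
		return false
	}
	if s.payload != nil {
		if _, err := io.Copy(io.Discard, s.payload); err != nil {
			return s.fail(err)
		}
		if s.payload.N != 0 {
			return s.fail(io.EOF) // payload shorter than declared
		}
	}
	magic := make([]byte, 7)
	if _, err := io.ReadFull(s.r, magic); err == io.EOF {
		return false // clean end of stream: the error stays nil
	} else if err != nil {
		return s.fail(err)
	}
	if string(magic) != "NiFiFF3" {
		return s.fail(errors.New("no NiFiFF3 header found"))
	}
	var count uint16
	if err := binary.Read(s.r, binary.BigEndian, &count); err != nil {
		return s.fail(err)
	}
	s.attrs = make(map[string]string, count)
	for i := 0; i < int(count); i++ {
		name, err := s.field()
		if err == nil {
			s.attrs[name], err = s.field()
		}
		if err != nil {
			return s.fail(err)
		}
	}
	var size int64
	if err := binary.Read(s.r, binary.BigEndian, &size); err != nil {
		return s.fail(err)
	}
	s.payload = &io.LimitedReader{R: s.r, N: size}
	return true
}

// scanNames lists the filename of every FlowFile without reading payloads.
func scanNames(wire string) ([]string, error) {
	s := &scanner{r: strings.NewReader(wire)}
	var names []string
	for s.Scan() {
		names = append(names, s.attrs["filename"])
	}
	return names, s.err
}

func main() {
	fmt.Println(scanNames(oneFile + oneFile)) // prints: [abcd-efgh abcd-efgh] <nil>
}
```

A stream that ends exactly on a FlowFile boundary finishes with a nil error, while one that is cut off inside a header or payload reports io.ErrUnexpectedEOF.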