Documentation
¶
Index ¶
- Variables
- func MakeGcsRetrierOptions(initialIntervalInSeconds int, maxIntervalInSeconds int, multiplier float64, ...) []storage.RetryOption
- func MustNewParallel(clientGateway GcsClientGateway, contentType string, chunkSize int, ...) (pipeline.Sink, func() error)
- type GcsClientGateway
- type SinkMessage
Constants ¶
This section is empty.
Variables ¶
View Source
var ( // CodeWrongTypeSinkMessage is returned when sink message is not a gcssink.SinkMessage CodeWrongTypeSinkMessage = errors.Code("WRONG_TYPE_SINK_MESSAGE") // CodeEmptyDataSinkMessage is returned when sink message data is empty CodeEmptyDataSinkMessage = errors.Code("EMPTY_DATA_MESSAGE") // CodeFailedToWriteAtBucket is returned when a error occurs while writing to gcs CodeFailedToWriteAtBucket = errors.Code("FAILED_TO_WRITE_AT_BUCKET") // CodeFailedToCloseBucket is returned when a error occurs while closing gcs bucket CodeFailedToCloseBucket = errors.Code("FAILED_TO_CLOSE_BUCKET") // CodePanic is returned when a panic occurs CodePanic = errors.Code("PANIC_TO_WRITE_AT_BUCKET") // ErrFailedToCloseSink is returned when any error occurs while gcsClientGateway is writing ErrFailedToStoreMessages = errors.New("failed to store messages") // ErrInvalidMessage is returned when sink message is invalid ErrInvalidSinkMessage = errors.New("invalid sink message") //ErrPanic is returned when a panic occurs ErrPanic = errors.New("panic occurred while storing to gcs") )
Functions ¶
func MakeGcsRetrierOptions ¶ added in v0.14.4
func MakeGcsRetrierOptions( initialIntervalInSeconds int, maxIntervalInSeconds int, multiplier float64, policy storage.RetryPolicy, ) []storage.RetryOption
MakeRetrierOptions returns a []storage.RetryOption to allows users to configure non-default retry behavior for API calls made to GCS.
func MustNewParallel ¶
func MustNewParallel( clientGateway GcsClientGateway, contentType string, chunkSize int, retrierOption []storage.RetryOption, ) (pipeline.Sink, func() error)
MustNew creates a new pipeline sink that write messages to GCS. It panics if GcsClientGateway, contentType or chunkSize are nil or invalid. Order of the messages is not guaranteed.
Types ¶
type GcsClientGateway ¶
type GcsClientGateway interface { // GetWriter returns a writer to a GCS object with the given bucket, object, contentType, chunkSize and retrierOption. // If retrierOption is not empty, the writer will be configured with the given retry options. GetWriter(ctx context.Context, bucket, object, contentType string, chunkSize int, retrierOption ...storage.RetryOption) *storage.Writer // Write writes the given message to the given writer. Write(writer *storage.Writer, message SinkMessage) error // Close closes the GCS client Close() error }
GcsClientGateway represents a gateway to GCS client
func NewGcsGateway ¶
func NewGcsGateway(client *storage.Client) GcsClientGateway
NewGcsClientGateway creates a new GcsClientGateway
Click to show internal directories.
Click to hide internal directories.