Documentation ¶
Index ¶
- Constants
- func RegisterStore(name string, factory func() StreamStore)
- type AzureStreamStore
- func (s *AzureStreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
- func (s *AzureStreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
- func (s *AzureStreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
- func (s *AzureStreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
- func (s *AzureStreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
- func (s *AzureStreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
- func (s *AzureStreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
- func (s *AzureStreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
- func (s *AzureStreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
- func (s *AzureStreamStore) Scavenge(ctx context.Context) error
- func (s *AzureStreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
- func (s *AzureStreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, ...) (*models.Page, error)
- type FileSystemStore
- func (fs *FileSystemStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
- func (fs *FileSystemStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
- func (fs *FileSystemStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
- func (fs *FileSystemStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
- func (fs *FileSystemStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
- func (fs *FileSystemStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
- func (fs *FileSystemStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
- func (fs *FileSystemStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
- func (fs *FileSystemStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
- func (fs *FileSystemStore) Scavenge(ctx context.Context) error
- func (fs *FileSystemStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
- func (fs *FileSystemStore) WritePage(ctx context.Context, args *models.WritePageArgs, ...) (*models.Page, error)
- type GoogleStreamStore
- func (s *GoogleStreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
- func (s *GoogleStreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
- func (s *GoogleStreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
- func (s *GoogleStreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
- func (s *GoogleStreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
- func (s *GoogleStreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
- func (s *GoogleStreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
- func (s *GoogleStreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
- func (s *GoogleStreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
- func (s *GoogleStreamStore) Scavenge(ctx context.Context) error
- func (s *GoogleStreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
- func (s *GoogleStreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, ...) (*models.Page, error)
- type S3StreamStore
- func (s *S3StreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
- func (s *S3StreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
- func (s *S3StreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
- func (s *S3StreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
- func (s *S3StreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
- func (s *S3StreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
- func (s *S3StreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
- func (s *S3StreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
- func (s *S3StreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
- func (s *S3StreamStore) Scavenge(ctx context.Context) error
- func (s *S3StreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
- func (s *S3StreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, ...) (*models.Page, error)
- type StreamStore
Constants ¶
const AWS = "S3"
const AZURE = "AZURE"
const FILE_SYSTEM = "FILE_SYSTEM"
const GOOGLE = "GOOGLE"
Variables ¶
This section is empty.
Functions ¶
func RegisterStore ¶
func RegisterStore(name string, factory func() StreamStore)
Types ¶
type AzureStreamStore ¶
type AzureStreamStore struct { }
AzureStreamStore is the Azure implementation of StreamStore.
func (*AzureStreamStore) CreateTier ¶
func (s *AzureStreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
CreateTier implements StreamStore.
func (*AzureStreamStore) DeletePage ¶
func (s *AzureStreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
DeletePage implements StreamStore.
func (*AzureStreamStore) DeletePartition ¶
func (s *AzureStreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
DeletePartition implements StreamStore.
func (*AzureStreamStore) DeleteSpace ¶
func (s *AzureStreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
DeleteSpace implements StreamStore.
func (*AzureStreamStore) GetPages ¶
func (s *AzureStreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
GetPages implements StreamStore.
func (*AzureStreamStore) GetPartitions ¶
func (s *AzureStreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
GetPartitions implements StreamStore.
func (*AzureStreamStore) GetSpaces ¶
func (s *AzureStreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
GetSpaces implements StreamStore.
func (*AzureStreamStore) ReadManifest ¶
func (s *AzureStreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
ReadManifest implements StreamStore.
func (*AzureStreamStore) ReadPage ¶
func (s *AzureStreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
ReadPage implements StreamStore.
func (*AzureStreamStore) Scavenge ¶ added in v0.0.8
func (s *AzureStreamStore) Scavenge(ctx context.Context) error
Scavenge implements StreamStore.
func (*AzureStreamStore) WriteManifest ¶
func (s *AzureStreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
WriteManifest implements StreamStore.
func (*AzureStreamStore) WritePage ¶
func (s *AzureStreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, entries enumerators.Enumerator[*models.Entry]) (*models.Page, error)
WritePage implements StreamStore.
type FileSystemStore ¶ added in v0.0.8
type FileSystemStore struct {
// contains filtered or unexported fields
}
FileSystemStore represents a store for file-based streams
func (*FileSystemStore) CreateTier ¶ added in v0.0.8
func (fs *FileSystemStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
func (*FileSystemStore) DeletePage ¶ added in v0.0.8
func (fs *FileSystemStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
DeletePage deletes a page file
func (*FileSystemStore) DeletePartition ¶ added in v0.0.8
func (fs *FileSystemStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
DeletePartition deletes the entire stream directory
func (*FileSystemStore) DeleteSpace ¶ added in v0.0.8
func (fs *FileSystemStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
DeleteSpace deletes a space from storage.
func (*FileSystemStore) GetPages ¶ added in v0.0.8
func (fs *FileSystemStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
func (*FileSystemStore) GetPartitions ¶ added in v0.0.8
func (fs *FileSystemStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
func (*FileSystemStore) GetSpaces ¶ added in v0.0.8
func (fs *FileSystemStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
func (*FileSystemStore) ReadManifest ¶ added in v0.0.8
func (fs *FileSystemStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
ReadManifest gets the manifest for a stream
func (*FileSystemStore) ReadPage ¶ added in v0.0.8
func (fs *FileSystemStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
func (*FileSystemStore) Scavenge ¶ added in v0.0.8
func (fs *FileSystemStore) Scavenge(ctx context.Context) error
func (*FileSystemStore) WriteManifest ¶ added in v0.0.8
func (fs *FileSystemStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
WriteManifest writes the manifest for a stream
func (*FileSystemStore) WritePage ¶ added in v0.0.8
func (fs *FileSystemStore) WritePage(ctx context.Context, args *models.WritePageArgs, entries enumerators.Enumerator[*models.Entry]) (*models.Page, error)
WritePage writes entries to a page file
type GoogleStreamStore ¶
type GoogleStreamStore struct { }
GoogleStreamStore is the Google implementation of StreamStore.
func (*GoogleStreamStore) CreateTier ¶
func (s *GoogleStreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
CreateTier implements StreamStore.
func (*GoogleStreamStore) DeletePage ¶
func (s *GoogleStreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
DeletePage implements StreamStore.
func (*GoogleStreamStore) DeletePartition ¶
func (s *GoogleStreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
DeletePartition implements StreamStore.
func (*GoogleStreamStore) DeleteSpace ¶
func (s *GoogleStreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
DeleteSpace implements StreamStore.
func (*GoogleStreamStore) GetPages ¶
func (s *GoogleStreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
GetPages implements StreamStore.
func (*GoogleStreamStore) GetPartitions ¶
func (s *GoogleStreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
GetPartitions implements StreamStore.
func (*GoogleStreamStore) GetSpaces ¶
func (s *GoogleStreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
GetSpaces implements StreamStore.
func (*GoogleStreamStore) ReadManifest ¶
func (s *GoogleStreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
ReadManifest implements StreamStore.
func (*GoogleStreamStore) ReadPage ¶
func (s *GoogleStreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
ReadPage implements StreamStore.
func (*GoogleStreamStore) Scavenge ¶ added in v0.0.8
func (s *GoogleStreamStore) Scavenge(ctx context.Context) error
Scavenge implements StreamStore.
func (*GoogleStreamStore) WriteManifest ¶
func (s *GoogleStreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
WriteManifest implements StreamStore.
func (*GoogleStreamStore) WritePage ¶
func (s *GoogleStreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, entries enumerators.Enumerator[*models.Entry]) (*models.Page, error)
WritePage implements StreamStore.
type S3StreamStore ¶
type S3StreamStore struct { }
S3StreamStore is the S3 implementation of StreamStore.
func (*S3StreamStore) CreateTier ¶
func (s *S3StreamStore) CreateTier(ctx context.Context, args *models.CreateTierArgs) error
CreateTier implements StreamStore.
func (*S3StreamStore) DeletePage ¶
func (s *S3StreamStore) DeletePage(ctx context.Context, args *models.DeletePageArgs) error
DeletePage implements StreamStore.
func (*S3StreamStore) DeletePartition ¶
func (s *S3StreamStore) DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error
DeletePartition implements StreamStore.
func (*S3StreamStore) DeleteSpace ¶
func (s *S3StreamStore) DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error
DeleteSpace implements StreamStore.
func (*S3StreamStore) GetPages ¶
func (s *S3StreamStore) GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32]
GetPages implements StreamStore.
func (*S3StreamStore) GetPartitions ¶
func (s *S3StreamStore) GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string]
GetPartitions implements StreamStore.
func (*S3StreamStore) GetSpaces ¶
func (s *S3StreamStore) GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string]
GetSpaces implements StreamStore.
func (*S3StreamStore) ReadManifest ¶
func (s *S3StreamStore) ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error)
ReadManifest implements StreamStore.
func (*S3StreamStore) ReadPage ¶
func (s *S3StreamStore) ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry]
ReadPage implements StreamStore.
func (*S3StreamStore) Scavenge ¶ added in v0.0.8
func (s *S3StreamStore) Scavenge(ctx context.Context) error
Scavenge implements StreamStore.
func (*S3StreamStore) WriteManifest ¶
func (s *S3StreamStore) WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error)
WriteManifest implements StreamStore.
func (*S3StreamStore) WritePage ¶
func (s *S3StreamStore) WritePage(ctx context.Context, args *models.WritePageArgs, entries enumerators.Enumerator[*models.Entry]) (*models.Page, error)
WritePage implements StreamStore.
type StreamStore ¶
type StreamStore interface { // CreateTier creates a new tier based on the provided arguments. // Arguments: CreateTierArgs - the parameters for creating the tier. // Returns: error - an error if tier creation fails, nil if successful. CreateTier(ctx context.Context, args *models.CreateTierArgs) error // ReadManifest reads the manifest associated with a specific stream or space. // Arguments: ReadManifestArgs - the parameters to locate the manifest. // Returns: ManifestWrapper - the wrapped manifest data, or an error if reading fails. ReadManifest(ctx context.Context, args *models.ReadManifestArgs) (*models.ManifestWrapper, error) // WriteManifest writes a manifest to storage. // Arguments: WriteManifestArgs - contains details for writing the manifest. // Returns: ConcurrencyTag - the concurrency tag representing the write operation's version. // error - any error encountered during the write process. WriteManifest(ctx context.Context, args *models.WriteManifestArgs) (models.ConcurrencyTag, error) // ReadPage retrieves entries from a specific page. // Arguments: ReadPageArgs - contains the parameters needed to locate the page. // Returns: Enumerator[*models.Entry] - a lazy enumerator for the entries within the page. // The entries are retrieved on demand as the enumerator is iterated over. ReadPage(ctx context.Context, args *models.ReadPageArgs) enumerators.Enumerator[*models.Entry] // WritePage writes a page containing entries to storage. // Arguments: WritePageArgs - parameters for creating the page and its entries. // Returns: Page - the page that was written, // and an error if there was a failure in writing. WritePage(ctx context.Context, args *models.WritePageArgs, entries enumerators.Enumerator[*models.Entry]) (*models.Page, error) // DeletePage deletes a specific page from storage. // Arguments: DeletePageArgs - parameters identifying the page to be deleted. 
// Returns: error - any error encountered during the deletion of the page. DeletePage(ctx context.Context, args *models.DeletePageArgs) error // GetPages retrieves a list of page numbers. // Arguments: GetPagesArgs - parameters for retrieving the page information. // Returns: Enumerator[int32] - a lazy enumerator of page numbers. // The page numbers are returned one by one as the enumerator is iterated over. GetPages(ctx context.Context, args *models.GetPagesArgs) enumerators.Enumerator[int32] // GetSpaces retrieves a list of space keys. // Arguments: GetSpacesArgs - parameters for retrieving space information. // Returns: Enumerator[string] - a lazy enumerator of space keys. // The space keys are returned one by one as the enumerator is iterated over. GetSpaces(ctx context.Context, args *models.GetSpacesArgs) enumerators.Enumerator[string] // GetPartitions retrieves a list of stream keys. // Arguments: GetPartitionsArgs - parameters for retrieving stream information. // Returns: Enumerator[string] - a lazy enumerator of stream keys. // The stream keys are returned one by one as the enumerator is iterated over. GetPartitions(ctx context.Context, args *models.GetPartitionsArgs) enumerators.Enumerator[string] // DeleteSpace deletes a space from storage. // Arguments: DeleteSpaceArgs - parameters identifying the space to be deleted. // Returns: error - any error encountered during the deletion of the space. DeleteSpace(ctx context.Context, args *models.DeleteSpaceArgs) error // DeletePartition deletes a stream from storage. // Arguments: DeletePartitionArgs - parameters identifying the stream to be deleted. // Returns: error - any error encountered during the deletion of the stream. DeletePartition(ctx context.Context, args *models.DeletePartitionArgs) error // Scavenge deletes obsolete pages. Scavenge(ctx context.Context) error }
StreamStore defines the interface for operations on stream data, including stream creation, reading and writing manifests, managing pages, and handling spaces and streams.
func NewAzureStore ¶
func NewAzureStore() StreamStore
NewAzureStore creates a new instance of AzureStreamStore
func NewFileSystemStore ¶
func NewFileSystemStore() StreamStore
NewFileSystemStore creates a new instance of FileSystemStore
func NewGoogleStore ¶
func NewGoogleStore() StreamStore
NewGoogleStore creates a new instance of GoogleStreamStore
func NewS3Store ¶
func NewS3Store() StreamStore
NewS3Store creates a new instance of S3StreamStore
func NewStore ¶
func NewStore(name string) StreamStore