transfer

module
v0.0.0-rc1
Published: Sep 16, 2024 License: Apache-2.0

README

Transfer: Cloud Native Ingestion engine

🦫 Introduction

Transfer, built in Go, is an open-source cloud native ingestion engine. Essentially we are building a no-code (or low-code) EL(T) service that can scale data pipelines from several megabytes of data to dozens of petabytes without hassle.

Transfer provides a convenient way to transfer data between DBMSes, object stores, message brokers or anything that stores data. Our ultimate mission is to help you move data from any source to any destination with a fast, effective and easy-to-use tool.

🚀 Try Transfer

1. Transfer Serverless Cloud

The fastest way to try Transfer is Double Cloud.

2. Using CLI

Build from sources:

make build

3. Using docker container

docker pull ghcr.io/doublecloud/transfer:dev

To run Transfer quickly:

docker run ghcr.io/doublecloud/transfer:dev activate --help

🚀 Getting Started

Ingestion from OLTP
Streaming Ingestion
CDC Streaming into Kafka
Semi-structured Ingestion
Airbyte compatibility
Transformers
Data parsers
Scaling Snapshot
Scaling Replication
Performance

🚀 Why Transfer

  • Cloud-Native: Single binary and cloud-native as heck, just drop it into your k8s cluster and be happy.

  • High Performance: Go-built, with cutting-edge, high-speed vectorized execution. 👉 Bench.

  • Data Simplification: Streamlines data ingestion, no code needed. 👉 Data Loading.

  • Schema inference: Automatically syncs not just data but also data schemas.

  • Format Flexibility: Supports multiple data formats and types, including JSON, CSV, Parquet, Proto, and more.

  • ACID Transactions: Ensures data integrity with atomic, consistent, isolated, and durable operations.

  • Schemafull: Type system enabling schema-full data storage with flexible data modeling.

  • Community-Driven: Join a welcoming community for a user-friendly cloud analytics experience.

⚡ Performance

Naive-s3-vs-airbyte

πŸ“ Architecture

transfer

Transfer is a pluggable Go package: each plugin is compiled into the transfer binary and registers itself with it. A transfer plugin can be one of:

  1. Storage - one-time data reader
  2. Sink - data writer
  3. Source - streaming data reader

A data pipeline is composed of two Endpoints: a Source and a Destination. Each data pipeline is essentially a link between a Source {Storage|Source} and a Destination {Sink}. Transfer is a LOGICAL data transfer service: the minimum unit of data is a logical ROW (object). The source and the target communicate via ChangeItems. These items are batched, and we may apply stateless Transformations to them. The pipeline as a whole is called a Transfer.
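The flow described above (row-level change items, batching, stateless transformations) can be sketched in a few lines of Go. This is an illustration only: the ChangeItem fields, the Transformer type and the Batches, Apply and RenameTable helpers are hypothetical names chosen for this example, not the types the Transfer code base actually exposes.

```go
package main

import "fmt"

// Kind is the type of a logical row change (hypothetical names).
type Kind string

const (
	Insert Kind = "insert"
	Update Kind = "update"
	Delete Kind = "delete"
)

// ChangeItem is an illustrative stand-in for the unit that travels
// between the source side and the sink: one logical row change.
type ChangeItem struct {
	Kind   Kind
	Table  string
	Values map[string]string
}

// Transformer is stateless: its result depends only on the item it receives.
type Transformer func(ChangeItem) ChangeItem

// Batches splits items into consecutive slices of at most size elements.
func Batches(items []ChangeItem, size int) [][]ChangeItem {
	var out [][]ChangeItem
	for size > 0 && len(items) > 0 {
		n := size
		if len(items) < n {
			n = len(items)
		}
		out = append(out, items[:n])
		items = items[n:]
	}
	return out
}

// Apply runs every transformer, in order, over each item of a batch
// and returns a new batch; the input slice is left untouched.
func Apply(batch []ChangeItem, ts ...Transformer) []ChangeItem {
	out := make([]ChangeItem, 0, len(batch))
	for _, item := range batch {
		for _, t := range ts {
			item = t(item)
		}
		out = append(out, item)
	}
	return out
}

// RenameTable is an example transformer that rewrites the table name.
func RenameTable(from, to string) Transformer {
	return func(ci ChangeItem) ChangeItem {
		if ci.Table == from {
			ci.Table = to
		}
		return ci
	}
}

func main() {
	items := []ChangeItem{
		{Kind: Insert, Table: "users", Values: map[string]string{"id": "1"}},
		{Kind: Update, Table: "users", Values: map[string]string{"id": "1"}},
		{Kind: Delete, Table: "orders", Values: map[string]string{"id": "7"}},
	}
	for _, batch := range Batches(items, 2) {
		for _, ci := range Apply(batch, RenameTable("users", "customers")) {
			fmt.Println(ci.Kind, ci.Table, ci.Values["id"])
		}
	}
}
```

Because each transformer sees one item at a time and keeps no state, batches can be processed independently of each other.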

We can compose these primitives into three types of connection:

  1. {Storage} + {Sink} = Snapshot
  2. {Source} + {Sink} = Replication
  3. {Storage} + {Source} + {Sink} = Snapshot and Replication
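The three combinations listed above can be expressed as a small decision function. The sketch below is only a model of the rule; Plugins, TransferType and Compose are names invented for this example and are not part of the Transfer API.

```go
package main

import (
	"errors"
	"fmt"
)

// TransferType names the kind of connection a set of plugins yields.
type TransferType string

const (
	Snapshot               TransferType = "Snapshot"
	Replication            TransferType = "Replication"
	SnapshotAndReplication TransferType = "Snapshot and Replication"
)

// Plugins records which primitives are available for a pipeline (hypothetical type).
type Plugins struct {
	Storage bool // one-time data reader
	Source  bool // streaming data reader
	Sink    bool // data writer
}

// Compose maps the available primitives to a transfer type.
func Compose(p Plugins) (TransferType, error) {
	switch {
	case !p.Sink:
		return "", errors.New("a transfer needs a Sink")
	case p.Storage && p.Source:
		return SnapshotAndReplication, nil
	case p.Storage:
		return Snapshot, nil
	case p.Source:
		return Replication, nil
	default:
		return "", errors.New("a transfer needs a Storage, a Source, or both")
	}
}

func main() {
	t, err := Compose(Plugins{Storage: true, Source: true, Sink: true})
	fmt.Println(t, err)
}
```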

These two directions, Snapshot and Replication, are conceptually different and place different requirements on specific storages. Snapshot and Replication threads can follow one another. Event channels are conceptually unaware of the underlying storage types they connect. We mainly build cross-system data connections (heterogeneous, or Hetero, replication), so we do not add storage-specific fine-tuning for them (type fitting or schema adjustment). For connections between storages of the same type, however, the system can tell the Source, Storage and Sink that they are homogeneous (Homo replication) so that they can make adjustments and fine-tune to improve accuracy. Apart from this, cross-database connections should NOT know what type of storage is on the other side.

Storage / SnapshotProvider

A primitive for reading data in large blocks. The resulting stream contains events of a single type: the insertion of a row. It can give different levels of read consistency guarantees, depending on the depth of integration into a particular database.

snapshot image

ROW level Guarantee

At the most primitive storage level, it is enough to implement reading all logical rows from the source. In this case, the unit of consistency is the row itself. For example, if one row is one file on disk, then reading the directory guarantees consistency only within each individual file.

Table level Guarantee

Rows are logically grouped into groups of homogeneous rows, usually tables. If the source is able to read a consistent snapshot of the rows of one table, then we can guarantee that the data is consistent at the entire table level. From the point of view of the contract, consistency at the table / row level is indistinguishable for us.

Whole Storage

Whole-storage consistency can be arranged if we can take a consistent snapshot and reuse it to read several tables (for example, by reading sequentially in one transaction or by having a transaction pool that shares one database state).

Point of replication (Replication Slot)

If the source can atomically take a snapshot / snapshot mark for reading and a mark for future replication, we can implement a consistent transition between the snapshot and the replica.
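A toy model may make the transition clearer. In the sketch below an in-memory Source hands out a copy of its rows together with the current log position under a single lock, and replication then replays only the events recorded after that position. The Source type, its methods and the use of a plain counter as the mark are assumptions made for this example; a real database would provide the mark through its own mechanism, such as a replication slot.

```go
package main

import (
	"fmt"
	"sync"
)

// Event is one logged row change with its position in the log.
type Event struct {
	LSN   uint64
	Key   string
	Value string
}

// Source is a toy database: current rows plus an append-only change log.
type Source struct {
	mu   sync.Mutex
	rows map[string]string
	log  []Event
	lsn  uint64
}

func NewSource() *Source { return &Source{rows: map[string]string{}} }

// Write changes a row and appends the change to the log.
func (s *Source) Write(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.lsn++
	s.rows[key] = value
	s.log = append(s.log, Event{LSN: s.lsn, Key: key, Value: value})
}

// SnapshotWithMark copies the rows and reads the log position under
// one lock, so the snapshot and the replication mark are taken atomically.
func (s *Source) SnapshotWithMark() (map[string]string, uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	snap := make(map[string]string, len(s.rows))
	for k, v := range s.rows {
		snap[k] = v
	}
	return snap, s.lsn
}

// EventsAfter returns the logged changes that are newer than the mark.
func (s *Source) EventsAfter(mark uint64) []Event {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []Event
	for _, e := range s.log {
		if e.LSN > mark {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	src := NewSource()
	src.Write("a", "1")

	target, mark := src.SnapshotWithMark() // snapshot phase

	src.Write("a", "2") // changes that arrive after the snapshot
	src.Write("b", "3")

	for _, e := range src.EventsAfter(mark) { // replication phase
		target[e.Key] = e.Value
	}
	fmt.Println(mark, target)
}
```

Since the mark and the snapshot describe the same instant, no change is lost and none is applied twice when the pipeline switches from snapshot to replication.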

Summary

From a contractual point of view, consistency at the table/row level is indistinguishable for us. We have no reliable indicator of the level of assurance with which we have read the data from the source.

Source / ReplicationProvider

A streaming primitive: an endless stream of CRUD events, row by row. In logical replication there are conceptually only 3 types of events: create / edit / delete. For editing and deleting, we need to identify the object on which we operate, so to support such events we expect the source itself to provide that identification.

tx-bounds

For some storages such events can be grouped into transactions.

replication-lag

Once we start the replication process, we apply this stream of actions to the target and try to minimize the data lag between the source database and the target.

At the replication source level, we maintain different levels of consistency:

Row

This is the most basic mechanism: if the source does not link rows to each other, the guarantee holds only at the row level. An example is MongoDB in FullDocument mode, where each event in the source is one row living in its own timeline. Events with this level of assurance carry no transaction tag or source logical timestamp (LSN), or are not strictly ordered.

Table

If the rows live in a single timeline, we can give consistency at the table level: applying the entire stream of events in the same order as we received them eventually gives us a consistent slice of the table. Events with this level of guarantee carry no transaction stamp, but they contain a source logical timestamp (LSN) and are strictly ordered.

Transaction

If the rows live in a single timeline and are attributed with transaction labels, as well as linearized in the transaction log (that is, there is a guarantee that all changes in one transaction are continuous and the transactions themselves are logically ordered) - we can give consistency at the table and transaction levels. Applying the entire stream of events in the same order with the same (or larger) batches of transactions, we will get a consistent slice of the table from the source at any moment in time.
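The requirement that changes of one transaction are continuous in the log is what makes the grouping below possible. This is a minimal sketch under that assumption; Event, GroupByTx and ApplyTx are hypothetical names, and a map stands in for the target table.

```go
package main

import "fmt"

// Event is one row change tagged with its transaction and log position.
type Event struct {
	LSN   uint64
	TxID  uint64
	Key   string
	Value string
}

// GroupByTx splits an ordered stream into transactions. It relies on the
// log being linearized: all events of one transaction are adjacent.
func GroupByTx(events []Event) [][]Event {
	var out [][]Event
	for _, e := range events {
		if n := len(out); n > 0 && out[n-1][0].TxID == e.TxID {
			out[n-1] = append(out[n-1], e)
			continue
		}
		out = append(out, []Event{e})
	}
	return out
}

// ApplyTx applies every change of one transaction to the table.
// A real sink would wrap this loop in a single database transaction.
func ApplyTx(table map[string]string, tx []Event) {
	for _, e := range tx {
		table[e.Key] = e.Value
	}
}

func main() {
	stream := []Event{
		{LSN: 1, TxID: 10, Key: "alice", Value: "90"},
		{LSN: 2, TxID: 10, Key: "bob", Value: "110"},
		{LSN: 3, TxID: 11, Key: "carol", Value: "5"},
	}
	table := map[string]string{}
	for _, tx := range GroupByTx(stream) {
		ApplyTx(table, tx)
		// Between transactions the table matches a state the source really had.
		fmt.Println("after tx", tx[0].TxID, table)
	}
}
```

Merging several adjacent transactions into one larger batch keeps the same property, which is why the text allows "the same (or larger) batches".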

Sink / Target

Each of our Targets is a simple thing that can consume a stream of events; at its level, the target can both support source guarantees and weaken them.

Primitive

At the most basic level, the target simply writes everything that comes in (the classic examples are a queue, a file system, or S3). At this level we guarantee nothing other than the fact that everything that comes in is written (records may be duplicated).

Unique Key deduplication

The Target can de-duplicate rows by the primary key, in which case we give an additional guarantee: there will be no key duplicates in the target.
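The difference between a primitive target and a key-deduplicating one shows up when the same batch is delivered twice. The sketch below contrasts the two; Row, AppendSink and KeyedSink are illustrative types invented for this example, with a slice and a map standing in for real storage.

```go
package main

import "fmt"

// Row is a logical row with a primary key.
type Row struct {
	ID    string
	Value string
}

// AppendSink models a primitive target: it writes whatever arrives.
type AppendSink struct {
	Rows []Row
}

func (s *AppendSink) Push(batch []Row) {
	s.Rows = append(s.Rows, batch...)
}

// KeyedSink models a target that de-duplicates by primary key:
// writing a row whose key already exists overwrites the old row.
type KeyedSink struct {
	Rows map[string]Row
}

func NewKeyedSink() *KeyedSink { return &KeyedSink{Rows: map[string]Row{}} }

func (s *KeyedSink) Push(batch []Row) {
	for _, r := range batch {
		s.Rows[r.ID] = r
	}
}

func main() {
	batch := []Row{{ID: "1", Value: "a"}, {ID: "2", Value: "b"}}

	primitive := &AppendSink{}
	keyed := NewKeyedSink()

	// Deliver the same batch twice, as can happen after a retry.
	for i := 0; i < 2; i++ {
		primitive.Push(batch)
		keyed.Push(batch)
	}
	fmt.Println(len(primitive.Rows), len(keyed.Rows))
}
```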

Logical clock deduplication

If the Target can write to 2 tables in a single transaction, we can transactionally store the source logical timestamp in a separate table and discard rows that have already been written. In this case, there will be no duplicates in the target, even for rows without keys.
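As a rough sketch of this idea, the sink below remembers the highest logical timestamp it has written and skips anything at or below it. It assumes rows arrive in LSN order; the Row and LSNSink types are hypothetical, and a single in-memory method stands in for the database transaction that would update the data table and the timestamp table together.

```go
package main

import "fmt"

// Row is a keyless row tagged with the source logical timestamp.
type Row struct {
	LSN     uint64
	Payload string
}

// LSNSink keeps the written rows and the last written timestamp.
// In a real target these would be two tables updated in one transaction.
type LSNSink struct {
	Rows    []string
	LastLSN uint64
}

// Push writes only rows newer than LastLSN and reports how many were written.
// It assumes rows within and across batches arrive in LSN order.
func (s *LSNSink) Push(batch []Row) int {
	written := 0
	for _, r := range batch {
		if r.LSN <= s.LastLSN {
			continue // already written by an earlier delivery
		}
		s.Rows = append(s.Rows, r.Payload)
		s.LastLSN = r.LSN
		written++
	}
	return written
}

func main() {
	sink := &LSNSink{}
	fmt.Println(sink.Push([]Row{{LSN: 1, Payload: "x"}, {LSN: 2, Payload: "y"}}))
	// A retry re-sends LSN 2 together with a new row.
	fmt.Println(sink.Push([]Row{{LSN: 2, Payload: "y"}, {LSN: 3, Payload: "z"}}))
	fmt.Println(sink.Rows, sink.LastLSN)
}
```

Unlike key-based de-duplication, this works for rows that have no primary key, because the decision depends only on the position in the source log.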

Transaction boundaries

If the receiver can hold transactions for an arbitrarily long time and apply transactions of an arbitrary size, we can implement saving transaction boundaries on writes. In this case, the sink will receive rows in the same or larger transactions, which will give an exact cut of the source at any point in time.

Summary

For maximum guarantees (an exact slice of the source at any point in time), both the source and the destination must provide their maximum guarantees.

For current storages, we have approximately the following matrix (S = Storage / snapshot guarantees, R = Source / replication guarantees, T = Sink / target guarantees):

Storage Type S/Row S/Table S/DB S/Slot R/Row R/Table R/TX T/Rows T/Keys T/LSN T/TX
PG + + + + + + + + + + +
Mysql + + + + + + + + + +
Mongodb + + + +
Clickhouse + + +
Greenplum + + + + + + +
YDB + + + +
YT + + + + +
Airbyte + +/- +/- + +/-
Kafka + + +
EventHub + + +
LogBroker + + + +

🤝 Contributing

Transfer thrives on community contributions! Whether it's through ideas, code, or documentation, every effort helps in enhancing our project. As a token of our appreciation, once your code is merged, your name will be eternally preserved in the system.contributors table.

Here are some resources to help you get started:

👥 Community

For guidance on using Transfer, we recommend starting with the official documentation. If you need further assistance, explore the following community channels:

  • Slack (For live discussion with the Community)
  • GitHub (Feature/Bug reports, Contributions)
  • Twitter (Get the news fast)

πŸ›£οΈ Roadmap

Stay updated with Transfer's development journey. Here are our roadmap milestones:

📜 License

Transfer is released under the Apache License 2.0.

For more information, see the LICENSE file and Licensing FAQs.

Directories

Path Synopsis
cloud
library
go/core/log/zap/asynczap
Package asynczap implements asynchronous core for zap.
go/core/metrics
Package metrics provides interface collecting performance metrics.
go/core/resource
Package resource provides integration with RESOURCE and RESOURCE_FILES macros.
go/core/xerrors
package xerrors is a drop in replacement for errors and golang.org/x/xerrors packages and functionally for github.com/pkg/errors.
go/test/recipe
Package recipe contains helper function for implementation of ya make recipes.
go/test/yatest
Package yatest provides access to testing context, when running under ya make -t.
transfer_manager
go/internal/metrics
Package metrics provides interface collecting performance metrics.
go/pkg/providers/clickhouse
Package ch cluster - it's like stand-alone cluster with multimaster []*SinkServer - masters (AltHosts).
go/pkg/providers/clickhouse/httpclient
Code generated by MockGen.
go/pkg/providers/kafka
Package kafka is a generated GoMock package.
go/pkg/providers/yt/sink
Used only in sorted_table
go/recipe/mongo
Basic instruction for Go recipes: https://docs.yandex-team.ru/devtools/test/environment#create-recipe