Flow is an advanced, flexible data processing library for PHP.
Besides typical ETL use cases (Extract, Transform, Load), Flow can also be used for memory-safe data analysis.
composer require flow-php/etl:1.x@dev
Until the project gets its first stable release, it's recommended to lock it to a specific commit in your composer.lock file.
- Sync data from external systems (API)
- File processing
- Pushing data to external systems
- Data migrations
Using this library makes sense when we need to move data from one place to another, doing some transformations in between.
For example, let's say we must periodically synchronize data from an external API, transform it into our internal data structure, filter out whatever didn't change, and load it in bulk into the database.
This is a perfect scenario for ETL.
Examples:
<?php
use Flow\ETL\DSL\From;
use Flow\ETL\DSL\Transform;
use Flow\ETL\DSL\To;
use Flow\ETL\ETL;
use Flow\ETL\Flow;
use Flow\ETL\Memory\ArrayMemory;
use Flow\ETL\Row\Sort;
use Flow\ETL\Rows;
$array = new ArrayMemory();
(new Flow())
    ->read(From::rows(new Rows()))
    ->rows(Transform::keep(['id', 'name', 'status']))
    ->sortBy(Sort::desc('status'))
    ->write(To::memory($array))
    ->run();
Flow comes with a rich DSL that makes code more readable and easier to understand. Due to language limitations, the DSL is available through static methods on dedicated classes, such as From, Transform, To, and Entry.
Adapters should deliver their own DSL; for example, the Json Adapter should create custom ToJson and FromJson classes with their own methods.
By design, all methods in the DSL are marked as final (they are not extension points), but the classes themselves can be extended in case you would like to add your own custom elements.
Some configuration options can be passed through environment variables, listed below:
- FLOW_LOCAL_FILESYSTEM_CACHE_DIR - location of default local filesystem cache, default: \sys_get_temp_dir()
- FLOW_EXTERNAL_SORT_MAX_MEMORY - amount of memory to use for sorting, default: 200M
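As a rough sketch of how these variables and their documented defaults interact, the snippet below resolves both values in plain PHP. The helper `envOrDefault` is hypothetical and not part of Flow; how Flow itself reads these variables may differ.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Flow): returns the value of an
 * environment variable, or the given default when it is unset or empty.
 */
function envOrDefault(string $name, string $default): string
{
    $value = getenv($name);

    return ($value === false || $value === '') ? $default : $value;
}

// Override the sort memory limit for the current process only.
putenv('FLOW_EXTERNAL_SORT_MAX_MEMORY=512M');

$cacheDir = envOrDefault('FLOW_LOCAL_FILESYSTEM_CACHE_DIR', sys_get_temp_dir());
$sortMemory = envOrDefault('FLOW_EXTERNAL_SORT_MAX_MEMORY', '200M');

echo $cacheDir, PHP_EOL;
echo $sortMemory, PHP_EOL; // 512M
```

In practice the variables would usually be exported by the shell or the process manager rather than set with `putenv()` inside the script.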
To get more control over how Flow processes data frames, please use Config, which can be created through ConfigBuilder.
<?php
$config = Config::builder()
->cache(new MyCustomCache());
Flow::setUp($config)
->extract($extractor)
->transform($transformer)
->write($loader)
->run();
Configuration makes it possible to set up the following options:
- id - unique identifier of ETL instance
- cache - implementation of Cache interface
- external sort - implementation of External Sort interface
- serializer - implementation of Serializer interface
- pipeline - implementation of Pipeline interface
- Constant memory consumption even when processing millions of records
- Type safe Rows/Row/Entry abstractions
- Filtering
- Sorting
- Caching
- Built in Rows objects comparison
- Rich collection of entry types and generic transformers
- Out of the box support for popular data sources/sinks
- Simple API
- Schema Definition
All entries are available through DSL\Entry
- array
- boolean
- collection
- datetime
- enum
- float
- integer
- json
- list - strongly typed array
- null
- object
- string
- structure
When adding a new entry type, please follow the checklist.
Entry names are case-sensitive: entry is not the same as Entry.
All generic extractors are available through DSL\From
In most cases Extractors (Readers) should be provided by Adapters, which you can find below. However, there are a few generic readers, listed below.
Please read tests to find examples of usage.
- buffer - tests
- cache - tests
- chain - tests
- chunk - tests
- memory - tests
- pipeline - tests
- process - tests
All generic transformers are available through DSL\Transform
Transformers can be registered in the pipeline through the following methods:
DataFrame::transformer(Transformer $transformer) : DataFrame
DataFrame::rows(Transformer $transformer) : DataFrame
Below is a set of generic ETL Transformers; for detailed usage instructions please look into the tests. Adapters might also define some custom transformers.
- Generic
  - cast - tests
  - chain - tests
  - clone entry - tests
  - conditional - tests
  - dynamic entry - tests
  - entry name style converter - tests
  - filter rows - tests
  - hash - non-cryptographic - tests
  - group to array - tests
  - keep entries - tests
  - math operation - tests
  - math value operation - tests
  - remove entries - tests
  - rename entries - tests
  - static entry - tests
- Array
- Object
- String
- Callback - Might come with performance degradation
Some transformers come with complex configuration, please find more details here.
Flow makes it possible to create data transformation pipelines with low memory consumption. To achieve this, we need to be able to read/write files in chunks, in a linear way.
This repository defines a thin abstraction that unifies streaming.
FileStream is an interface with following implementations:
Those are just value holders that are later used by Handler to open a resource, which can be used directly or passed to the underlying implementation.
In order to fully support concurrency, we can't write to a single stream; instead, each concurrent execution needs to work with its own stream.
DSL for Loaders/Extractors should be prepared to take the following input types:
- FileStream - if remote files are not supported then it should be narrowed to LocalFile
- array<FileStream>
By design Flow should use StreamWrappers with a custom protocol that starts with flow-, for example flow-aws-s3 or flow-azure-blob.
StreamWrapper should be used to register custom wrappers.
flow-php/etl-adapter-streams provides support for popular remote filesystems.
In order to allow serialization of callable-based transformers, please add the opis/closure library to your dependencies:
{
"require": {
"opis/closure": "^3.5"
}
}
If possible, it's recommended to avoid writing custom transformers. Official transformers are optimized against internal mechanisms, which you might not be able to achieve in your custom code.
Custom transformers only need to implement the Transformer interface:
Example:
<?php
use Flow\ETL\Row;
use Flow\ETL\Rows;
use Flow\ETL\Transformer;
class NotNorbertTransformer implements Transformer
{
    public function transform(Rows $rows) : Rows
    {
        // Keep only rows whose "name" entry is not "Norbert".
        return $rows->filter(fn (Row $row) => $row->get('name')->value() !== "Norbert");
    }
}
When working on datasets that require complex transformations, it might be helpful to divide the pipeline into smaller, testable pieces.
Transformation is an interface that takes and returns DataFrame. It can be used like in the following example:
<?php
use Flow\ETL\Transformation;
use Flow\ETL\DataFrame;
final class MyTransformation implements Transformation
{
public function transform(DataFrame $dataFrame) : DataFrame
{
return $dataFrame->rows(Transform::abc())
->rows(Transform::abc())
->rows(Transform::foo())
->rows(Transform::bar())
->rows(Transform::xyz())
->rows(Transform::zzz())
->rows(Transform::baz());
}
}
(new Flow)
->read(From::source())
->rows(new MyTransformation())
->write(To::sink())
->run();
All generic loaders are available through DSL\To
In most cases Loaders (Writers) should be provided by Adapters, which you can find below. However, there are a few generic loaders, listed below.
Please read tests to find examples of usage.
If a Pipe (Loader or Transformer) implements the Closure interface, an extra closure(Rows $rows) method will be executed with the last Rows from the Extractor.
This can be handy for things like a buffer loader that maintains state but also needs to clean up that state at the last Rows.
Adapters connect ETL with existing data sources/storages and sometimes include custom data entries.
Name | Extractor (read) | Loader (write) |
---|---|---|
Doctrine - DB | ✅ | ✅ |
Elasticsearch | N/A | ✅ |
Text | ✅ | ✅ |
CSV | ✅ | ✅ |
JSON | ✅ | ✅ |
Parquet | ✅ | ✅ |
Avro | ✅ | ✅ |
XML | ✅ | N/A |
HTTP | ✅ | N/A |
Excel | N/A | N/A |
Logger | 🚫 | ✅ |
- ✅ - at least one implementation is available
- 🚫 - implementation not possible
- N/A - not available yet
❗ If the adapter you are looking for is not available yet, and you are willing to work on one, feel free to create one as a standalone repository.
Well designed and documented adapters can be pulled into the flow-php organization, which will give them maintenance and security support.
Currently Flow supports only local multiprocess asynchronous processing.
In order to process data asynchronously one of the following adapters must be first installed:
Code example:
<?php
(Flow::setUp(Config::builder()))
->read(new CSVExtractor($path = __DIR__ . '/data/dataset.csv', 10_000, 0))
->pipeline(
new LocalSocketPipeline(
SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-amp", $logger),
$workers = 8
)
)
->rows(Transform::array_unpack('row'))
->drop('row')
->rows(Transform::to_integer("id"))
->rows(Transform::string_concat(['name', 'last_name'], ' ', 'name'))
->drop('last_name')
->load(new DbalLoader($tableName, $chunkSize = 1000, $dbConnectionParams))
->run();
The following illustration presents the current state and future plans of asynchronous processing in Flow.
Sometimes you might already have Rows prepared; in that case, instead of going through Extractors, just use Flow::process(Rows $rows) : DataFrame.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->transform($transformer1)
->transform($transformer2)
->transform($transformer3)
->transform($transformer4)
->write($loader)
->run();
In order to quickly filter Rows, the DataFrame::filter shortcut function can be used.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->filter(fn (Row $row) => $row->valueOf('id') % 2 === 0)
->write($loader)
->run();
Flow allows grouping that is similar to the one known from database engines.
All aggregations are null safe, meaning that if an aggregation or grouping entry is missing it will be skipped or grouped into a null entry.
- avg - arithmetic mean algorithm
- count
- max
- min
- sum
<?php
$flow = new Flow();
$rows = $flow->process(
new Rows(
Row::create(Entry::integer('id', 1), Entry::string('country', 'PL'), Entry::integer('age', 20)),
Row::create(Entry::integer('id', 2), Entry::string('country', 'PL'), Entry::integer('age', 20)),
Row::create(Entry::integer('id', 3), Entry::string('country', 'PL'), Entry::integer('age', 25)),
Row::create(Entry::integer('id', 4), Entry::string('country', 'PL'), Entry::integer('age', 30)),
Row::create(Entry::integer('id', 5), Entry::string('country', 'US'), Entry::integer('age', 40)),
Row::create(Entry::integer('id', 6), Entry::string('country', 'US'), Entry::integer('age', 40)),
Row::create(Entry::integer('id', 7), Entry::string('country', 'US'), Entry::integer('age', 45)),
Row::create(Entry::integer('id', 9), Entry::string('country', 'US'), Entry::integer('age', 50)),
)
)
->groupBy('country')
->aggregate(Aggregation::avg('age'))
->fetch();
$this->assertEquals(
new Rows(
Row::create(Entry::string('country', 'PL'), Entry::float('age_avg', 23.75)),
Row::create(Entry::string('country', 'US'), Entry::float('age_avg', 43.75)),
),
$rows
);
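The expected averages in the example above can be checked by hand: PL is (20 + 20 + 25 + 30) / 4 = 23.75 and US is (40 + 40 + 45 + 50) / 4 = 43.75. The following sketch reproduces that arithmetic with plain PHP arrays; it only illustrates the grouping and averaging logic and is not how Flow implements aggregations.

```php
<?php

declare(strict_types=1);

// Same country/age pairs as in the example above, as plain arrays.
$rows = [
    ['country' => 'PL', 'age' => 20],
    ['country' => 'PL', 'age' => 20],
    ['country' => 'PL', 'age' => 25],
    ['country' => 'PL', 'age' => 30],
    ['country' => 'US', 'age' => 40],
    ['country' => 'US', 'age' => 40],
    ['country' => 'US', 'age' => 45],
    ['country' => 'US', 'age' => 50],
];

$sums = [];
$counts = [];

// Group by country, accumulating the sum and the number of rows per group.
foreach ($rows as $row) {
    $key = $row['country'];
    $sums[$key] = ($sums[$key] ?? 0) + $row['age'];
    $counts[$key] = ($counts[$key] ?? 0) + 1;
}

// Arithmetic mean per group.
$averages = [];
foreach ($sums as $key => $sum) {
    $averages[$key] = $sum / $counts[$key];
}

print_r($averages); // PL => 23.75, US => 43.75
```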
In order to quickly select only relevant entries, use DataFrame::select.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->select("id", "name")
->write($loader)
->run();
This function internally uses the keep entries transformer.
In order to quickly drop irrelevant entries, use DataFrame::drop.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->drop("_tags")
->write($loader)
->run();
In order to quickly rename entries, use DataFrame::rename.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->rename("old_name", "new_name")
->write($loader)
->run();
This function internally uses the rename entries transformer.
Quick Row transformations are available through the DataFrame::map function.
<?php
$flow = new Flow();
$flow->process(new Rows(...))
->map(fn (Row $row) => $row->add(new BooleanEntry('odd', $row->valueOf('id') % 2 !== 0)))
->write($loader)
->run();
This function is internally using filter transformer.
Reading from the source, transforming data, and even loading to a sink are executed only by one of the following trigger methods, each of which immediately runs the pipeline.
DataFrame::run()
DataFrame::fetch()
DataFrame::display()
DataFrame::cache()
DataFrame::sortBy()
DataFrame::collect()
DataFrame::parallelize()
It is important to be aware of this, especially when using methods like DataFrame::limit(), which must be placed before the first trigger method to take effect.
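To illustrate why nothing happens before a trigger method is called, here is a minimal sketch of lazy evaluation using plain PHP generators. The functions `source` and `limit` are hypothetical stand-ins for an extractor and a limit step; this is not Flow's implementation.

```php
<?php

declare(strict_types=1);

$log = [];

/** Lazily yields numbers 1..$max, recording every read in $log. */
function source(int $max, array &$log): Generator
{
    for ($i = 1; $i <= $max; $i++) {
        $log[] = "read {$i}";
        yield $i;
    }
}

/** Lazily passes items through and stops after $limit of them. */
function limit(iterable $items, int $limit): Generator
{
    if ($limit <= 0) {
        return;
    }

    $taken = 0;
    foreach ($items as $item) {
        yield $item;
        if (++$taken >= $limit) {
            return;
        }
    }
}

// Building the pipeline does not read anything yet.
$pipeline = limit(source(1000, $log), 2);
$before = count($log); // 0

// Consuming the generator plays the role of a trigger method.
$result = iterator_to_array($pipeline, false);

print_r($result); // [1, 2]
print_r($log);    // ["read 1", "read 2"]
```

Because the limit is part of the pipeline before it is consumed, the source is asked for only two items instead of all 1000.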
Sometimes you might want to process only the first few rows, for example for debugging purposes.
In this example, the Pipeline will take only 5 rows from the Extractor, passing them through all transformers.
<?php
$flow = new Flow();
$flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->transform($transformer3)
->transform($transformer4)
->write($loader)
->limit(5)
->run();
Join two data frames, left and right, using one of the following types of join:
- left
- right
- inner
There are two available join methods:
- DataFrame::join - right side is static for each left Rows set.
- DataFrame::joinEach - right side dynamically generated for each left Rows set.
In the example below we are joining two lazily evaluated data frames on a condition saying that the country entry from the $countries (left) data frame matches the code entry from the $names (right) data frame.
In this case, the right data frame will be fetched entirely into memory, which might be a limitation when joining large data sets (in such a case use DataFrame::joinEach instead).
<?php
$countries = new Rows(
Row::create(Entry::integer('id', 1), Entry::string('country', 'PL')),
Row::create(Entry::integer('id', 2), Entry::string('country', 'PL')),
Row::create(Entry::integer('id', 3), Entry::string('country', 'PL')),
Row::create(Entry::integer('id', 4), Entry::string('country', 'PL')),
Row::create(Entry::integer('id', 5), Entry::string('country', 'US')),
Row::create(Entry::integer('id', 6), Entry::string('country', 'US')),
Row::create(Entry::integer('id', 7), Entry::string('country', 'US')),
Row::create(Entry::integer('id', 9), Entry::string('country', 'US')),
);
$names = (new Flow())->process(
new Rows(
Row::create(Entry::string('code', 'PL'), Entry::string('name', 'Poland')),
Row::create(Entry::string('code', 'US'), Entry::string('name', 'United States')),
)
);
$countriesWithNames = (new Flow())
->process($countries)
->join($names, Condition::on(['country' => 'code']), $type = "left")
->fetch();
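The idea behind a left join with a static right side can be sketched in plain PHP as a lookup table built once from the right side. This is only an illustration on arrays: the `FR` row is added here to show an unmatched left row, and the exact shape of the rows Flow returns (entry names, handling of the `code` entry) is not specified above and may differ.

```php
<?php

declare(strict_types=1);

$countries = [
    ['id' => 1, 'country' => 'PL'],
    ['id' => 5, 'country' => 'US'],
    ['id' => 10, 'country' => 'FR'], // hypothetical row without a match
];

$names = [
    ['code' => 'PL', 'name' => 'Poland'],
    ['code' => 'US', 'name' => 'United States'],
];

// The right side is static, so it is indexed by the join key only once.
$namesByCode = array_column($names, null, 'code');

$joined = [];
foreach ($countries as $left) {
    // Left join: every left row is kept, unmatched rows get null values.
    $right = $namesByCode[$left['country']] ?? ['code' => null, 'name' => null];
    $joined[] = $left + $right;
}

print_r($joined);
```

Keeping the whole right side in a lookup table is what makes this approach memory-bound for large right-hand data sets.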
Loaders are a great way to load Rows into specific data sinks; however, sometimes you want to simply grab Rows and do something with them.
<?php
$flow = new Flow();
$rows = $flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->transform($transformer3)
->transform($transformer4)
->fetch();
If the limit argument of DataFrame::fetch(int $limit = 0) : Rows is different from 0, fetch will return no more rows than requested.
Display is probably the easiest way to debug ETLs. It grabs a selected number of rows (20 by default).
<?php
$flow = new Flow();
$output = $flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->transform($transformer3)
->transform($transformer4)
->display($limit = 5, $truncate = 0);
echo $output;
Output:
+------+--------+---------+---------------------------+-------+------------------------------+--------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+
| id | price | deleted | created-at | phase | items | tags | object |
+------+--------+---------+---------------------------+-------+------------------------------+--------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+
| 1234 | 123.45 | false | 2020-07-13T15:00:00+00:00 | null | {"item-id":"1","name":"one"} | [{"item-id":"1","name":"one"},{"item-id":"2","name":"two"},{"item-id":"3","name":"three"}] | ArrayIterator Object( [storage:ArrayIterator:private] => Array ( [0] => 1 [1] => 2 [2] => 3 )) |
| 1234 | 123.45 | false | 2020-07-13T15:00:00+00:00 | null | {"item-id":"1","name":"one"} | [{"item-id":"1","name":"one"},{"item-id":"2","name":"two"},{"item-id":"3","name":"three"}] | ArrayIterator Object( [storage:ArrayIterator:private] => Array ( [0] => 1 [1] => 2 [2] => 3 )) |
| 1234 | 123.45 | false | 2020-07-13T15:00:00+00:00 | null | {"item-id":"1","name":"one"} | [{"item-id":"1","name":"one"},{"item-id":"2","name":"two"},{"item-id":"3","name":"three"}] | ArrayIterator Object( [storage:ArrayIterator:private] => Array ( [0] => 1 [1] => 2 [2] => 3 )) |
| 1234 | 123.45 | false | 2020-07-13T15:00:00+00:00 | null | {"item-id":"1","name":"one"} | [{"item-id":"1","name":"one"},{"item-id":"2","name":"two"},{"item-id":"3","name":"three"}] | ArrayIterator Object( [storage:ArrayIterator:private] => Array ( [0] => 1 [1] => 2 [2] => 3 )) |
| 1234 | 123.45 | false | 2020-07-13T15:00:00+00:00 | null | {"item-id":"1","name":"one"} | [{"item-id":"1","name":"one"},{"item-id":"2","name":"two"},{"item-id":"3","name":"three"}] | ArrayIterator Object( [storage:ArrayIterator:private] => Array ( [0] => 1 [1] => 2 [2] => 3 )) |
+------+--------+---------+---------------------------+-------+------------------------------+--------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------+
5 rows
Another way to display Rows without breaking execution is to use the stream loader.
$flow = new Flow();
$output = $flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->transform(Transform::output()) // display rows in stdout stream.
->transform($transformer3)
->transform($transformer4)
->write($loader)
->run();
Void makes it possible to process Rows only up to a given point in a pipeline, which is mostly useful during debugging.
<?php
$flow = new Flow();
$rows = $flow->read($extractor) // extract non empty rows
->transform($transformer1) // non empty rows
->transform($transformer2) // non empty rows
->void()
->transform($transformer3) // empty rows
->transform($transformer4) // empty rows
->fetch();
// $rows is an empty instance of Rows
Before loading data to a sink it might be a good idea to validate it against the schema. Row Schema is built from Entry Definitions, and each definition is created from:
- entry - name of entry
- type - type of entry (class string)
- nullable - if true, NullEntry with matching name will also pass the validation regardless of the type
- constraint - additional, flexible validation. Useful for checking if entry value is, for example, one of the expected values
- metadata - additional key-value collection that can carry additional context for the definition
Example:
<?php
$flow = new Flow();
$flow->read($from)
->rows($transform)
->validate(
new Schema(
Schema\Definition::integer('id', $nullable = false),
Schema\Definition::string('name', $nullable = true),
Schema\Definition::boolean('active', $nullable = false, new SameAs(true), Metadata::empty()->add('key', 'value')),
)
)
->write($to)
->run();
There is more than one way to validate the schema; the built-in strategies are listed below:
- StrictValidator - each row must exactly match the schema, extra entries will fail validation
- SelectiveValidator - only entries defined in the schema must match, any extra entry in the row will be ignored
By default, ETL initializes StrictValidator, but it's possible to override it by passing a second argument to the DataFrame::validate() method.
Example:
<?php
$flow = new Flow();
$flow->read($from)
->rows($transform)
->validate(
new Schema(
Schema\Definition::integer('id', $nullable = false),
Schema\Definition::string('name', $nullable = true),
Schema\Definition::boolean('active', $nullable = false),
),
new SelectiveValidator()
)
->write($to)
->run();
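The difference between the two strategies can be sketched on plain arrays. The functions `validateSelective` and `validateStrict` below are hypothetical stand-ins, not Flow's validators, and they ignore nullability, constraints and metadata for brevity.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical selective check: every entry named in the schema must exist
 * in the row with the expected type; extra entries are ignored.
 * $schema maps entry name => type name as returned by gettype().
 */
function validateSelective(array $schema, array $row): bool
{
    foreach ($schema as $name => $type) {
        if (!array_key_exists($name, $row) || gettype($row[$name]) !== $type) {
            return false;
        }
    }

    return true;
}

/** Hypothetical strict check: selective check plus no undefined entries. */
function validateStrict(array $schema, array $row): bool
{
    return array_diff_key($row, $schema) === [] && validateSelective($schema, $row);
}

$schema = ['id' => 'integer', 'name' => 'string'];
$row = ['id' => 1, 'name' => 'Norbert', '_tags' => 'extra'];

var_dump(validateSelective($schema, $row)); // bool(true)
var_dump(validateStrict($schema, $row));    // bool(false), "_tags" is not in the schema
```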
In case of any exception in the transform/load steps, the ETL process will break. To change that behavior, please set a custom ErrorHandler.
Error Handler defines 3 behaviors using 2 methods.
ErrorHandler::throw(\Throwable $error, Rows $rows) : bool
ErrorHandler::skipRows(\Throwable $error, Rows $rows) : bool
If throw returns true, ETL will simply throw the error.
If skipRows returns true, ETL will stop processing the given rows and try to move to the next batch.
If both methods return false, ETL will continue processing Rows using the next transformers/loaders.
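The three behaviors can be summarized as a small decision function. `resolveErrorBehavior` is a hypothetical helper written only for illustration; in particular, checking throw before skipRows when both return true is an assumption, since the order of precedence is not stated above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: maps the answers of ErrorHandler::throw() and
 * ErrorHandler::skipRows() to the behavior described above.
 * Assumption: throw takes precedence when both answers are true.
 */
function resolveErrorBehavior(bool $throw, bool $skipRows): string
{
    if ($throw) {
        return 'throw the error';
    }

    if ($skipRows) {
        return 'skip current rows and move to the next batch';
    }

    return 'continue with the next transformers/loaders';
}

echo resolveErrorBehavior(true, false), PHP_EOL;
echo resolveErrorBehavior(false, true), PHP_EOL;
echo resolveErrorBehavior(false, false), PHP_EOL;
```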
There are 3 built-in ErrorHandlers (look for more in adapters):
Error Handling can be set directly at ETL:
<?php
$flow = new Flow();
$flow->read($extractor)
->onError(new IgnoreError())
->transform($transformer)
->write($loader)
->run();
Thanks to the implementation of the External Sort algorithm, sorting, like everything else, is memory-safe by default. This means that sorting even a 10 GB file is doable with just a few megabytes of RAM.
<?php
$flow = new Flow();
$rows = $flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->sortBy(Sort::desc('price'))
->fetch();
Please remember that sorting is an expensive operation; usually datasets are either loaded into destination storages or reduced by filtering/grouping. Sorting needs to go through the entire dataset and sort all Rows regardless of how big the dataset is compared to available memory. In order to achieve that, External Sort uses a cache that relies on I/O, which might become a bottleneck.
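For readers unfamiliar with the technique, the following is a minimal sketch of a generic external merge sort in plain PHP: sorted runs are written to temporary files and then merged with a min-heap. It sorts integers in ascending order and uses a chunk size as a stand-in for a memory limit; it is not Flow's implementation, and the function name `externalSort` is hypothetical.

```php
<?php

declare(strict_types=1);

/**
 * Sorts integers while holding at most $chunkSize input values in memory.
 * Yields the values in ascending order.
 */
function externalSort(iterable $input, int $chunkSize): Generator
{
    $files = [];
    $buffer = [];

    // Phase 1: sort each chunk in memory and write it to a temporary file.
    $flush = static function () use (&$buffer, &$files): void {
        if ($buffer === []) {
            return;
        }
        sort($buffer);
        $file = tmpfile();
        fwrite($file, implode("\n", $buffer) . "\n");
        rewind($file);
        $files[] = $file;
        $buffer = [];
    };

    foreach ($input as $value) {
        $buffer[] = $value;
        if (count($buffer) >= $chunkSize) {
            $flush();
        }
    }
    $flush();

    // Phase 2: k-way merge, keeping only one value per run in the heap.
    $heap = new SplMinHeap();
    foreach ($files as $index => $file) {
        if (($line = fgets($file)) !== false) {
            $heap->insert([(int) $line, $index]);
        }
    }

    while (!$heap->isEmpty()) {
        [$value, $index] = $heap->extract();
        yield $value;
        if (($line = fgets($files[$index])) !== false) {
            $heap->insert([(int) $line, $index]);
        }
    }

    foreach ($files as $file) {
        fclose($file);
    }
}

$sorted = iterator_to_array(externalSort([5, 3, 9, 1, 8, 2, 7], 3), false);

print_r($sorted); // [1, 2, 3, 5, 7, 8, 9]
```

The sketch makes the I/O cost visible: every value is written to disk once and read back once, which is why the cache can become the bottleneck.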
Alternatively you can also fetch Rows from ETL, sort them and then process again. That way, sorting will happen in memory so make sure you have enough.
$flow = new Flow();
$rows = $flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->fetch();
$flow->process($rows->sortBy(Sort::desc('price')))
->write($loader)
->run();
The goal of the cache is to serialize and save on disk (or in another location defined by the Cache implementation) an already transformed dataset.
Cache will run a pipeline, catching each Rows and saving them into the cache, from where those rows can be extracted later.
This is useful for operations that require full transformation of the dataset before moving forward, such as sorting.
Another interesting use case for caching is sharing the dataset between multiple ETLs. Instead of going to the data source multiple times and repeating all transformations, only one ETL does the whole job and the others benefit from the final form of the dataset in a memory-safe way.
<?php
$flow = new Flow();
$flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->cache()
->transform($transformer3)
->write($loader)
->run();
<?php
$flow = new Flow();
$flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->collect()
->write($loader)
->run();
Flow PHP ETL is designed to keep memory consumption constant. This is achieved by processing only one chunk of data at a time.
It's the Extractor's responsibility to define how big those chunks are. For example, when processing a CSV file with 10k lines, the extractor might want to read only 1k lines at once.
Those 1k lines will be represented as an instance of Rows. This means that through the ETL pipeline we are going to push 10 Rows instances, 1k rows each.
The main purpose of the methods DataFrame::collect() and DataFrame::parallelize() is to adjust the number of rows in the middle of processing.
This means that the Extractor can still extract 1k rows at once, but before using a loader we can use DataFrame::collect, which will wait for all rows to get extracted, then merge them and pass a total of 10k rows into the Loader.
The parallelize method does exactly the opposite: it will not wait for all Rows in order to collect them. Instead, it will take any incoming Rows instance and split it into smaller chunks according to the chunks argument of DataFrame::parallelize(int $chunks).
<?php
$flow = new Flow();
$flow->read($extractor)
->transform($transformer1)
->transform($transformer2)
->write($loader1)
->parallelize(20)
->transform($transformer3)
->transform($transformer4)
->write($loader2)
->run();
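The effect of both methods on chunk sizes can be sketched with plain arrays. This is an illustration only, not Flow's implementation, and it assumes that the argument of `parallelize` is the size of each resulting chunk, which the description above does not state explicitly.

```php
<?php

declare(strict_types=1);

/** Yields the input in chunks of $size, like an Extractor yielding Rows. */
function extractInChunks(array $lines, int $size): Generator
{
    foreach (array_chunk($lines, $size) as $chunk) {
        yield $chunk;
    }
}

$lines = range(1, 10000);

// Extractor: 10 chunks, 1k lines each.
$chunks = iterator_to_array(extractInChunks($lines, 1000), false);

// collect(): wait for every chunk, then merge them into a single big one.
$collected = array_merge(...$chunks);

// parallelize(20): split each incoming chunk into smaller ones.
// Assumption: 20 is the size of each resulting chunk.
$parallelized = [];
foreach ($chunks as $chunk) {
    foreach (array_chunk($chunk, 20) as $small) {
        $parallelized[] = $small;
    }
}

echo count($chunks), PHP_EOL;       // 10
echo count($collected), PHP_EOL;    // 10000
echo count($parallelized), PHP_EOL; // 500
```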
The most important thing to remember about performance is that creating custom Loaders/Transformers might have a negative impact on processing performance.
Using collect on a large number of rows might end with an out-of-memory exception, but it can also significantly reduce the time needed to load data into a data sink, since one big insert might be cheaper than multiple smaller inserts.
Even though sortBy is memory-efficient thanks to the External Sort algorithm, it still might become a time bottleneck. In many cases sorting is redundant, since data sinks like databases can deal with it far more efficiently. If sorting can't be avoided, the best practice is to reduce the dataset by filtering as much as possible.
In order to install dependencies, please launch the following command:
composer install
In order to execute the full test suite, please launch the following command:
composer build
It's recommended to use pcov for code coverage; however, you can also use xdebug by setting the XDEBUG_MODE=coverage env variable.