Data-Tubes/lib/Data/Tubes/Manual.pod
=pod
=for vim
vim: tw=72 ts=3 sts=3 sw=3 et ai :
=encoding utf8
=head1 Data::Tubes - The Missing Manual
L<Data::Tubes> helps you manage transformations steps on a stream of
data that can be though as a sequence of I<records>. It does it by
passing I<records> through I<tubes>, usually a sequence of them.
There's an extensive documentation about the modules, here's a list. But
if you need a gentle introduction... read on the following sections!
=over
=item *
L<Data::Tubes>
=item *
L<Data::Tubes::Plugin::Parser>
=item *
L<Data::Tubes::Plugin::Plumbing>
=item *
L<Data::Tubes::Plugin::Reader>
=item *
L<Data::Tubes::Plugin::Renderer>
=item *
L<Data::Tubes::Plugin::Source>
=item *
L<Data::Tubes::Plugin::Util>
=item *
L<Data::Tubes::Plugin::Writer>
=item *
L<Data::Tubes::Util>
=item *
L<Data::Tubes::Util::Output>
=back
=head2 A Few Definitions
A I<record> can be whatever scalar you can think of. I might even be
C<undef>. What's in a record, and what it does mean, is completely up to
you --and, of course, to the tube that is going to manage that record.
A I<record> might even evolve through the pipeline to become something
different, e.g. multiple records. In other words, some tubes might take
an input record of some nature, and emit output record(s) of a
completely different one. Again, it's up to the tube to decide this,
which means that, eventually, it's up to you.
So, a I<tube> is a transformation function, that turns one single
I<input record> into zero, one or more I<output records>, according to
the model drawn below:
\________/
--| nothing
input --\ --> one output record
record --/ ==> some output records (array reference)
_________ ==> some output records (iterator)
/ \
In more formal terms, a I<tube> is a sub reference that:
=over
=item *
is able to take exactly one input argument (the I<input record>),
although it can receive more or less in different contexts (i.e. when
called outside of the I<tube> definition);
=item *
at each call in I<list context>, returns one of the following (every
call is a story apart, of course):
=over
=item *
I<nothing> (i.e. the empty list);
=item *
I<exactly one scalar>, representing the I<output record>;
=item *
the string C<records> followed by an I<array reference>, containing the
sequence of I<output records>;
=item *
the string I<iterator> followed by a I<sub reference>, from where you
can draw the I<output records>.
=back
=back
The iterator has some additional constraints:
=over
=item *
when called in I<list context>, returns the empty list or exactly one
scalar value;
=item *
if it returns the empty list, it means that the iterator is exhausted
and it cannot emit any more I<output records>;
=item *
otherwise, it returns exactly one scalar, representing the I<next output
record>.
=back
So far so good with definitions, but let's recap: a I<record> is a
scalar, a I<tube> is a sub reference.
=head2 Typical Use Case
The typical use case for transforming data (e.g. in some ETL) can be
summarized as follows, at least for me:
=over
=item *
I<manage sources>
=item *
I<read> input data, chunking them into raw textual/binary
representations of a single record;
=item *
I<parse> read data, hoping it adheres to some structure, and generating
a structured version of it (typically a Perl hash or array reference,
holding the parsed data);
=item *
I<render> the output based on the input record, most of the time filling
a template with values read from the input;
=item *
I<write> the rendered text/binary data to some output channel.
=back
All the above operations have to be performed in sequence, and repeated
until the sources cannot emit any more data to be read.
L<Data::Tubes> helps you with managing the sequencing, make sure that
you exhaust your sources, and also provides tools for addressing each
step.
Note that your steps might be different. For example, you might want to
introduce an intermediate step between the parsing and the rendering, to
perform some data validation. You might want to perform a different
rendering based on what has been parsed, or validated. You might want to
select a different output channel based on what was parsed. So, although
it's a sequence, you might actually think your pipeline as possibly
dividing into multiple alternatives at some steps, using data that come
from any of the previous steps. L<Data::Tubes> tools help you with this
too!
In the following sections, we will gradually introduce the different
tools available from L<Data::Tubes> and show you how they can help you
address the use case above, or the variant you might have.
=head2 What You Need 99% Of The Times
In most of the cases, you will only need to use function C<pipeline>
from the main module L<Data::Tubes>. It allows you to define a sequence
of tubes, or things that can be turned into tubes, with minimal hassle.
To show you how to use it, we will replicate the behaviour of the
following simple program:
my @names = qw< Foo Bar Baz >;
for my $name (@names) {
print "Hey, $name!\n";
}
We can re-implement it with L<Data::Tubes> using much, much more code!
Here's some way to do it:
use Data::Tubes qw< pipeline >;
my @names = qw< Foo Bar Baz >;
pipeline(
sub { return records => $_[0] }, # will iterate over items
sub { return "Hey, $_[0]!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->(\@names);
Does not seem to be very exciting, huh? Whatever, it allows us to get
our feet wet with C<pipeline>:
=over
=item *
it returns a sub reference that behaves like a tube itself. The C<tap>
indication in the final options hash reference says that the output
should be thrown in the C<sink>, so this tube will always return
nothing, but it's ok;
=item *
it makes sure to get the output from a tube, and feed the following tube
accordingly. In particular, if a tube returns multiple records (e.g. the
first tube does this, as it returns C<records> and a reference to an
array), it takes care to iterate over all of them and feed them to the
following tube one by one;
=item *
it does not make any assumption as to the nature of the I<record> at
each step
=item *
thanks to the specific setting for C<tap>, it takes care to exhaust all
inputs and possible intermediate records that are generated.
=back
Of course, we might have decided that the rendering step was not needed
in our case, so we might have done something like this:
use Data::Tubes qw< pipeline >;
my @names = qw< Foo Bar Baz >;
pipeline(
sub { return records => $_[0] }, # will iterate over items
sub { print "Hey, $_[0]!\n" },
{ tap => 'sink' }, # makes sure the input is drained
)->(\@names);
It really depends on what you want to do. In general terms, it's still
useful to think the pipeline in terms of the L</Typical Use Case>,
because the toolset provided by L<Data::Tubes>' plugins usually provide
you only one of those steps but, again, it's up to you.
=head2 Managing Sources
The initial tube of a sequence is, arguably, your source. From a
phylosophical point of view, you can view it as a transformation of the
inputs you will eventually provide (as in the example we saw in the last
section), or as a self-contained tube that is able to conjure things out
of nothing. You decide.
If you happen to take your inputs from files, the toolbox provides you
with a few tools inside L<Data::Tubes::Plugin::Source>. One, in
particular, will be your friend most of the times: C<iterate_files>.
This function is a I<factory>, in the sense that you give it some
arguments and it returns you a tube-compliant sub reference. All it does
is to transform an input array reference containing file names into a
data structure that contains a filehandle suitable for reading from
those files.
Suppose you have your names in two files instead of an array:
$ cat mydata-01.txt
Foo
Bar
$ cat mydata-02.txt
Baz
you can do like this:
use Data::Tubes qw< pipeline >;
pipeline(
'Source::iterate_files',
sub {
my $fh = $_[0]->{source}{fh};
chomp(my @names = <$fh>);
return records => \@names;
} # read records from one source
sub { return "Hey, $_[0]!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);
In other terms, you have substituted the input gathering process with
different tubes, while keeping the rest of the pipeline as it was
before.
We can notice one interesting thing about C<pipeline>: in addition to
I<real> tubes, i.e. sub references, it can accept simple strings as
well, that it will take care to automatically transform into tubes. In
this case, it first turns C<Source::iterate_files> into
C<Data::Tubes::Plugin::Source::iterate_files>, then loads the function
C<iterate_files> from the relevant module and used it as a factory to
generate the real tube. We will see later how we can pass additional
parameters to this factory functions.
There are also a few things that are interesting about C<iterate_files>
(you're encouraged to read the docs in L<Data::Tubes::Plugin::Source>,
of course):
=over
=item *
it gets as input array references, containing lists of file names. The
list might be empty or contain any number of files, of course
=item *
it takes care to open them for you, returning a series of records with
details about the file;
=item *
each output record is a hash reference, containing a sub-hash associated
to the key C<source>, that holds data about the file (like the filename,
a conventional name for the channel, and of course the filehandle in key
C<fh>).
=back
=head2 What Is A Record, Toolkit Style
The representation of the output record from C<iterate_files> explained
in the previous section does not come out of the blue: the whole plugins
toolkit, with very few exceptions, works under the assumption that a
record, in each step, is a hash reference where each tube takes and adds
data. In particular, The different components in the toolkit make the
following assumptions:
=over
=item *
L<Data::Tubes::Plugin::Source> takes input from an array reference, and
emits hash references with a sub-hash pointed by key C<source>;
=item *
L<Data::Tubes::Plugin::Reader> expect to receive a hash reference with
key C<source>, from where they take the C<fh> field, and populate field
C<raw> with whatever they read;
=item *
L<Data::Tubes::Plugin::Parser> receive a hash reference with key C<raw>,
and populate key C<structured>;
=item *
L<Data::Tubes::Plugin::Renderer> receive a hash reference with key
C<structured>, and populate key C<rendered>;
=item *
L<Data::Tubes::Plugin::Writer> receive a hash reference with key
C<rendered>, and do not populate anything more (although they pass the
record along as output).
=back
So, basically, whatever is emitted by one plugin type is good as input
for the I<following> plugin type:
Source Reader Parser Renderer Writer
| ^ | ^ | ^ | ^
| | | | | | | |
+-> source -+ +-> raw -+ +- structured -+ +- rendered -+
with the notable exception that each step actually receives I<all> the
sub-fields populated by the previous tubes, which can be used to
customize the behaviour depending on your actual use case.
This sequence can also be useful for you to know if you want to insert
some behaviour in between. We will see some examples later.
=head2 Reading
Remember the last example from L</Managing Sources>? Here's a refresher:
use Data::Tubes qw< pipeline >;
pipeline(
'Source::iterate_files',
sub {
my $fh = $_[0]->{source}{fh};
chomp(my @names = <$fh>);
return records => \@names;
} # read records from one source
sub { return "Hey, $_[0]!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);
It turns out that you actually don't need to do the reading of the file
line by line yourself, because there's a plugin to do that. The only
thing that we have to consider is that the read line will be put in the
C<raw> field of a hash-based record:
use Data::Tubes qw< pipeline >;
pipeline(
'Source::iterate_files',
'Reader::by_line',
sub { return "Hey, $_[0]->{raw}!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);
Your eagle eye will surely have noticed that we got rid of the initial
plus sign before the name of the plugin. If your plugin lives directly
under the C<Data::Tubes::Plugin> namespace, this is fine (although, if
you go deeper, like C<Data::Tubes::Plugin::Foo::Bar::Baz>, you will need
to put the plus sign to get rid of the initial part until C<Plugin>
included).
A plugin to read by line might seem overkill, but it already started
sparing us a few lines of code, and I guess there are plenty of
occasions where your input records are line-based. You're invited to
take a look at L<Data::Tubes::Plugin::Reader>, where you might find
C<by_paragraph>, that reads... by the paragraph, and other reading
functions.
=head2 Passing Options To Plugins
All of a sudden, your greetings applications starts to choke and you
eventually figure that it depends on the encoding of the input file. In
particular, you discover that C<iterate_files> defaults on opening files
as C<UTF-8>, which is fine per-se, but when you print things out you get
strange messages and unfortunately your boss stops you from setting the
same encoding on STDOUT.
Don't despair! You have a few arrows available. The first one is to just
turn the input filehandle back to C<:raw>, like this:
use Data::Tubes qw< pipeline >;
pipeline(
'Source::iterate_files',
sub { binmode $_[0]->{source}{fh}, ':raw'; return $_[0]; },
'Reader::by_line',
sub { return "Hey, $_[0]->{raw}!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);
The second one is to avoid setting the encoding in C<iterate_files> in
the first place, which can be obtained by passing options to the
factory. This is obtained by substituting the simple string with an
array reference, where the first item is the same as the string (i.e. a
I<locator> for the factory function), and the following ones are
arguments for the factory itself:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
sub { return "Hey, $_[0]->{raw}!\n" }, # returns the string
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-01.txt mydata-02.txt >]);
=head2 Parsing
So far, we relied upon the assumption that the whole input line is what
we are really after, and we don't need to parse it any more. Alas, this
is not the case most of the times.
If you have some complicated format, your best option is to just code a
tube to deal with it. For example, the following code would turn a
paragraph with HTTP headers into a hash of arrays:
sub parse_HTTP_headers {
my $headers = shift;
$headers =~ s{\n\s+}{ }gmxs; # remove multi-lines
my %retval;
for my $line (split /\n/mxs, $headers) {
my ($name, $value) = split /\s*:\s*/, $line, 2;
s{\A\s+|\s+\z}mxs for $name, $value; # lead/trail spaces
if (! exists $retval{$name}) {
$retval{$name} = $value;
}
else {
$retval{$name} = [$retval{$name}] # turn into array ref
unless ref $retval{$name}; # if necessary
push @{$retval{$name}}, $value;
}
}
return \%retval;
}
Now, suppose your input changes to a sequence of header groups, divided
into paragraphs, where you look for header C<X-Name>:
$ cat mydata-03.txt
X-Name: Foo
Host: example.com
X-Name: Bar
Barious
Date: 1970-01-01
Adapting to this input is quite easy now:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_paragraph', # change how we read!
sub { parse_HTTP_Headers($_[0]->{raw}) } # wrap parse_HTTP_headers
sub { return "Hey, $_[0]->{'X-Name'}!\n" }, # use new field
sub { print $_[0]; return }, # prints it, returns nothing
{ tap => 'sink' }, # makes sure the input is drained
)->([qw< mydata-03.txt >]);
From now on, anyway, we will stick to the convention described in
I<What Is A Record, Toolkit Style> about what's available at the
different stages, which allows us have stable inputs for tubes without
having to worry too much about what we have before (within certain
limits). Hence, here's how our example transforms:
use Data::Tubes qw< pipeline >;
pipeline(
# Source management
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
# Reading, gets `source`, puts `raw`
'Reader::by_paragraph',
# Parsing, gets `raw`, puts `structured`
sub {
my $record = shift;
$record->{structured} = parse_HTTP_Headers($record->{raw});
return $record;
}
# Rendering, gets `structured`, puts `rendered`
sub {
my $record = shift;
$record->{rendered} =
"Hey, $record->{structured}{'X-Name'}!\n";
return $record;
},
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-03.txt >]);
As anticipated, L<Data::Tubes::Plugin::Parser> contains some pre-canned
factories for generating common parsers, mostly geared to line-oriented
inputs (which can be quite common, anyway). So, if your input is as
simple as a sequence of fields separated by a character, without
anything fancy like quoting or escaping, you can simply rely on a
I<format>. For example, suppose you have the following data, with a name
(that we will assume does not contain a semicolon inside), the nickname
(ditto) and the age:
$ cat mydata-04.txt
Foo;foorious;32
Bar;barious;19
Baz;bazzecolas;44
You might describe each line as being C<name;nick;age>, and this is
exactly what's needed to use C<by_format>:
use Data::Tubes qw< pipeline >;
pipeline(
# Source management
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
# Reading, gets `source`, puts `raw`
'Reader::by_line',
# Parsing, gets `raw`, puts `structured`
['Parser::by_format', format => 'name;nick;age'],
# Rendering, gets `structured`, puts `rendered`
sub {
my $record = shift;
my $v = $record->{structured};
$record->{rendered} =
"Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
return $record;
},
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
Actually, any sequence of non-word characters (Perl-wise) is considered
a separator in the format, and any sequence of word characters is
considered the name of a field.
Another useful pre-canned parser in the toolkit is C<hashy>, that allows
you to handle something more complicated like sequences of key-value
pairs. The assumption here is that there are two separators: one for
separating key-value pairs, one for separating the key from the value.
Here's another example, assuming that the pipe character separates
pairs, and the equal sign separates the key from the value:
$ cat mydata-05.txt
name=Foo Foorious|nick=foorious|age=32
name=Bar Barious|age=19|nick=barious
age=44|nick=bazzecolas|name=Baz Bazzecolas
As you see, explicit naming of fields allows you to put them in any
order inside the input:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
# Parsing, gets `raw`, puts `structured`
['Parser::hashy', chunks_separator => '|',
key_value_separator = '='],
# Rendering, gets `structured`, puts `rendered`
sub {
my $record = shift;
my $v = $record->{structured};
$record->{rendered} =
"Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
return $record;
},
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-05.txt >]);
C<hashy> also allows setting a I<default key>, in case none is found for
a pair, so that you can have something like this if your lines are
I<indexed> by nick:
$ cat mydata-06.txt
foorious|name=Foo Foorious|age=32
barious|name=Bar Barious|age=19
bazzecolas|age=44|name=Baz Bazzecolas
Ordering in this case is purely incidental, again the un-keyed element
can occur anywhere. The transformation is easy:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_paragraph',
# Parsing, gets `raw`, puts `structured`
['Parser::hashy', default_key => 'nick',
chunks_separator => '|', key_value_separator = '='],
# Rendering, gets `structured`, puts `rendered`
sub {
my $record = shift;
my $v = $record->{structured};
$record->{rendered} =
"Hey, $v->{name} (alias $v->{nick}), it's $v->{age}!\n";
return $record;
},
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-06.txt >]);
There are more to discover, take a look at
L<Data::Tubes::Plugin::Parser>.
=head2 Rendering
If you have to render a very simple string like the salutation we saw so
far, the simple system we used so far is quite effective. If your output
gets any more complicated, chances are you can benefit from using a
template. The plugin L<Data::Tubes::Plugin::Renderer> provides you a
factory to use templates built for L<Template::Perlish>, let's see how.
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
# Rendering, gets `structured`, puts `rendered`
['Renderer::with_template_perlish', format => <<'END' ],
Hey [% name %]!
... or should I call you [% nick %]?
It's your birthday, you're [% age %] now!
END
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
As long as you only have I<simple> variables, L<Template::Perlish>
behaves much like the famous L<Template> toolkit. Anything more
complicated leads you to using Perl, anyway.
The same sequence can of course be used to render the input data in some
other format, e.g. as YAML as in the following example (we're ignoring
the need to do any escaping, of course):
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
# Rendering, gets `structured`, puts `rendered`
['Renderer::with_template_perlish', format => <<'END' ],
-
name: [% name %]
nick: [% nick %]
age: [% age %]
END
# Printing, gets `rendered`, returns input unchanged
sub { print $_[0]{rendered}; return $_[0]; },
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
We're putting the object element inside an array, so that the sequence
will print out smoothly as an overall YAML file.
=head2 Writing
The last step in our typical pipeline is writing out stuff. So far, we
just printed things out on STDOUT, but by no means we're limited to
this! Let's take a look at L<Data::Tubes::Plugin::Writer>.
The first tool that can help us is C<write_to_files>, that allows us to
transform our pipeline like this (without changing the behaviour):
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format => <<'END' ],
-
name: [% name %]
nick: [% nick %]
age: [% age %]
END
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => \*STDOUT],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
From here, it's easy to deduce that you can pass other things as
C<filename>, for example... a filename!
=head3 Framing records
The writer tools rely upon L<Data::Tubes::Util::Output>, that is a smart
wrapper that allows you to handle multiple cases. For example, suppose
that instead of YAML you want to output JSON; you might start with this:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format => <<'END' ],
{"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}
END
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => \*STDOUT],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
There's a problem though: for more than one input record, the output is
not valid JSON:
{"name":"Foo","nick":"foorious","age":32}
{"name":"Bar","nick":"barious","age":19}
{"name":"Baz","nick":"bazzecolas","age":44}
We should put this in an array, and we should separate the objects with
a comma. The first thing might be naively solved like this:
use Data::Tubes qw< pipeline >;
# DO NOT USE THIS SOLUTION!
print "[\n";
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format => <<'END' ],
{"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}
END
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => \*STDOUT],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
# DO NOT USE THIS SOLUTION!
print "]\n"
This has two problems:
=over
=item *
first of all, it does not allow you to change the output channel. What
if you are writing to a file instead?
=item *
you still have to figure out how to print out the separator comma!
=back
Unfortunately, JSON is quite picky in the presence of separator commas,
in that it does not allow you to have a trailing one, so the following
would be incorrect:
[
{"name":"Foo","nick":"foorious","age":32},
{"name":"Bar","nick":"barious","age":19},
{"name":"Baz","nick":"bazzecolas","age":44},
]
because the last comma would be out of place.
Fortunately, C<to_files> can help you:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format => <<'END' ],
{"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}
END
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => \*STDOUT,
header => "[\n", footer => "]\n", interlude => ','
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
The above code produces:
[
{"name":"Foo","nick":"foorious","age":32}
,{"name":"Bar","nick":"barious","age":19}
,{"name":"Baz","nick":"bazzecolas","age":44}
]
that is probably not very good-looking, but at least correct and also
working correctly whatever the output filename you set. You can change
things to make it better looking, though; just get rid of the newline in
the template, add it after the comma and put some indentation:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => \*STDOUT,
header => "[\n", footer => "\n]\n", interlude => ",\n"
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
This now prints:
[
{"name":"Foo","nick":"foorious","age":32},
{"name":"Bar","nick":"barious","age":19},
{"name":"Baz","nick":"bazzecolas","age":44}
]
which you might like more.
=head3 Segmenting the output
If you're handling I<a lot> of input records, you might want to segment
the output in order to distribute the output records into multiple
files, instead of having only one single file. It turns out that
C<to_files> gets you covered also in this case!
The basic thing that you can do is to set a I<policy> object, where you
can set two keys: C<records_threshold> and C<characters_threshold>. They
set a threshold that will close the output channel when overcome, and
open a new one. We will assume that we're writing to files here:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => 'output-01.json',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
The example above produces two output files:
$ cat output-01.json
[
{"name":"Foo","nick":"foorious","age":32},
{"name":"Bar","nick":"barious","age":19}
]
$ cat output-01.json_1
[
{"name":"Baz","nick":"bazzecolas","age":44}
]
Again, valid JSON files with the correct number of (maximum) records.
You might have some issues with the file names, though, especially if
you rely on the file extension to do... anything.
C<to_files> allows you to set a filename template, using C<sprintf>-like
sequences using C<%n>, like this:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => 'output-02-%03n.json',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
The example above produces two output files:
$ cat output-02-000.json
[
{"name":"Foo","nick":"foorious","age":32},
{"name":"Bar","nick":"barious","age":19}
]
$ cat output-02-001.json
[
{"name":"Baz","nick":"bazzecolas","age":44}
]
This should make you happy, at least!
=head3 Output encoding
Last thing you need to know about C<to_files> is that you can set the
encoding too, just set a C<CORE::binmode> compatible string using the
C<binmode> argument, that is set to C<:encoding(UTF-8)> by default.
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
['Writer::to_files', filename => 'output-02-%03n.json',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
binmode => ':raw',
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
Now, you have full control over your input. Or have you?
=head2 Writing, Reloaded
In the previous section about L</Writing> we saw that there's a very
flexible tool C<to_files>. Is it sufficient to get you covered in all
cases? Arguably not.
Suppose that you want to divide your outputs in two groups, one with
people with nicknames starting with letter C<a> to C<m>, another with
the rest. How do you do this?
=head3 Dispatching manually
One interesting thing about the toolkit is that you can use its function
outside of C<pipeline>, if you need to. The C<summon> function helps you
import the right function with minimal hassle:
use Data::Tubes qw< pipeline summon >;
my $writer_factory = summon('Writer::to_files');
Now, for example, you can do like this:
use Data::Tubes qw< pipeline summon >;
# pre-define two output channels, for lower and other initial chars
summon('Writer::to_files');
my $lower = to_files(
filename => 'output-lower-%02d.json',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
binmode => ':raw',
);
my $other = to_files(
filename => 'output-other-%02d.json',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
binmode => ':raw',
);
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
sub { # wrapper!
my $record = shift;
my $first_char = substr $record->{structured}{nick}, 0, 1;
return $lower->($record) if $first_char =~ m{[a-m]}mxs;
return $other->($record);
},
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
If you have some complicated business logic... you can always use this
technique! But there's more... read on.
=head3 Dispatching, the movie
Considering that dispatching can be quite common, you can guess that
there's something in the toolkit to get you covered. You guessed right!
L<Data::Tubes::Plugin::Writer> provides you C<dispatch_to_files>, that
helps you streamline what we saw in the previous section. Here's how:
use Data::Tubes qw< pipeline >;
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
# Printing, gets `rendered`, returns input unchanged
['Writer::dispatch_to_files',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
filename_template => 'output-[% key %]-%03n.json',
selector => sub {
my $record = shift;
my $first_char = substr $record->{structured}{nick}, 0, 1;
return 'lower' if $first_char =~ m{[a-m]}mxs;
return 'other';
},
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
Most parameters are the same as C<to_files>, so we already know about
them. We have two new ones, though: C<filename_template> and
C<selector>.
The latter (C<selector>) is a sub reference that receives the record as
input, and is supposed to provide a I<key> back. Whenever this key is
the same, the output channel chosen by the dispatcher will be the same.
In this case, we are outputting two strings, namely C<lower> and
C<other>.
The C<filename_template> is an extension on parameter C<filename>, that
allows you to put additional things in the C<filename> that is
eventually passed to C<to_files>. As you might have guessed already,
it's a L<Template::Perlish>-compatible template, where you can expand
the variable C<key>. So:
=over
=item *
if the C<selector> returns C<lower>, the C<filename_template> is
expanded into C<output-lower-%03n.json>;
=item *
if the C<selector> returns C<other>, the C<filename_template> is
expanded into C<output-other-%03n.json>;
=back
Nifty, huh?
=head2 Dispatching, Reloaded
In L</Dispatching, the movie> we saw that you can dispatch to the right
output channel depending on what's inside each single record. The
dispatching technique can be applied to other stages, though, if you are
brave enough to look at C<dispatch> in L<Data::Tubes::Plugin::Plumbing>.
=head3 General dispatching
For example, suppose that you want to divide your input records stream
into two different flows, one for records that are I<good>, one for the
I<bad> (e.g. records that do not parse correctly, or do not adhere to
some additional validation rules).
In the example below, we will assume that nicknames starting with any
letter are good, and bad otherwise. We still want to do some rendering
for the bad ones, though, because we want to write out an error file.
use Data::Tubes qw< pipeline summon >;
summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],);
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(format =>
' {"nick":"[% nick %]";"error":"invalid"}'],);
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
# let's put an intermediate step to "classify" the record
sub {
my $record;
my $first_char = substr $record->{structured}{nick}, 0, 1;
$record->{class} = ($first_char =~ m{[a-m]}mxs) ? 'lower'
:($first_char =~ m{[a-z]}imxs) ? 'other'
: 'error';
return $record;
}
# Rendering is wrapped by dispatch here
['Plumbing::dispatch', key => 'class',
factory => sub {
my $key = shift;
return $render_bad if $key eq 'error';
return $render_good;
}
]
# Printing, gets `rendered`, returns input unchanged
['Writer::dispatch_to_files',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
filename_template => 'output-[% key %]-%03n.json',
key => 'class',
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
The C<dispatcher> is based on two steps: one is the I<selection>, the
other one is the I<generation>.
As we saw previously, the I<selection> process is about getting a key
that allows the dispatcher to figure out what channel to use. In
L</Dispatching, the movie> we saw that we can put a C<selector> key in
the arguments, but if you already have your key in the record you can
just set a C<key> argument. In this example, we're doing this
classification immediately after the parse phase, so from that point on
we have a C<class> key inside the record, that we can use (and we do,
both in C<dispatch> and in C<dispatch_to_files>).
In case the dispatcher does not (yet) know which tube is associated to a
given string returned by the I<selector>, it's time for some
I<generation>. C<dispatch_to_files> already knows how to generate files
(although you can override this), and is fine with C<filename_template>;
on the other hand, the generic C<dispatch> needs to know something more,
which is why we're using C<factory> here.
The C<factory> in a dispatcher allows you to receive the key returned by
the selector (and also the whole record, should you need it) and return
a I<tube> back. Guess what? That tube will be associated to that key
from now on!
=head3 Pre-loading the cache
If you already have your downstream tubes available (as in our case),
you can pre-load the cache and avoid coding the factory completely:
use Data::Tubes qw< pipeline summon >;
summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],);
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(format =>
' {"nick":"[% nick %]";"error":"invalid"}'],);
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
# let's put an intermediate step to "classify" the record
sub {
my $record;
my $first_char = substr $record->{structured}{nick}, 0, 1;
$record->{class} = ($first_char =~ m{[a-m]}mxs) ? 'lower'
:($first_char =~ m{[a-z]}imxs) ? 'other'
: 'error';
return $record;
}
# Rendering is wrapped by dispatch here
['Plumbing::dispatch', key => 'class',
handlers => {
lower => $render_good,
other => $render_good,
error => $render_bad,
},
]
# Printing, gets `rendered`, returns input unchanged
['Writer::dispatch_to_files',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
filename_template => 'output-[% key %]-%03n.json',
key => 'class',
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
=head3 Dispatching, TIMTOWTDI
One drawback of the technique we saw in the previous sections about
dispatching is that, as a matter of fact, we have two dispatching
happening at two different times, i.e. at rendering and at writing. Many
times this might be what you actually need, but in our example it
actually limited us a bit, because it's somehow assuming that we want to
report errors as JSON structures, which is a bit overkill.
One alternative is to realize that the dispatcher's factory, and the
C<filename_template> expansion is no exception, also receives the whole
record in addition to the key. Hence, we might modify the pipeline as
follows:
use Data::Tubes qw< pipeline summon >;
summon('Renderer::with_template_perlish');
my $render_good = with_template_perlish(format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}');
# the "bad" renderer just complains about the nickname
my $render_bad = with_template_perlish(format =>
' {"nick":"[% nick %]";"error":"invalid"}');
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
# let's put an intermediate step to "classify" the record
sub {
my $record;
my $first_char = substr $record->{structured}{nick}, 0, 1;
$record->{class} = ($first_char =~ m{[a-m]}mxs) ? 'lower'
:($first_char =~ m{[a-z]}imxs) ? 'other'
: 'error';
$record->{format} =
($record->{class} eq 'error') ? 'txt' : 'json';
return $record;
}
# Rendering is wrapped by dispatch here
['Plumbing::dispatch', key => 'class',
handlers => {
lower => $render_good,
other => $render_good,
error => $render_bad,
},
]
# Printing, gets `rendered`, returns input unchanged
['Writer::dispatch_to_files',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
filename_template =>
'output-[% key %]-%03n.[% record.format %]',
key => 'class',
],
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
=head3 Sequence
In the previous section we solved our problem, but the solution might
still be considered a bit clunky. What if we need to have different
intermediate processing steps, depending on the specific record? For
example, we might want to avoid processing wrong records too much, but
do some additional mangling on good ones.
Enter C<sequence> from C<Data::Tubes::Plugin::Plumbing>. This function
is similar to C<pipeline> --as a matter of fact, C<pipeline> uses it
behind the scenes, and returns it if you don't provide any C<tap>.
Hence, you can use dispatch to divide your flow across different
sequences, each with its own processing. Let's see how.
use Data::Tubes qw< pipeline >;
my $good = pipeline(
['Renderer::with_template_perlish', format =>
' {"name":"[% name %];"nick":"[% nick %]";"age":[% age %]}'],
['Writer::dispatch_to_files',
header => "[\n", footer => "\n]\n", interlude => ",\n",
policy => {records_threshold => 2},
filename_template => 'output-[% key %]-%03n.json',
key => 'class',
],
); # note... no tap here!
my $bad = pipeline(
sub { $_[0]{error}{message} = $_[0]{raw} },
['Renderer::with_template_perlish', format => "[% message %]\n",
input => 'error',
],
['Writer:to_files', filename => 'ouput-exception-%t.txt']
); # note.. no tap here!
pipeline(
['Source::iterate_files', open_file_args => {binmode => ':raw'}],
'Reader::by_line',
['Parser::by_format', format => 'name;nick;age'],
sub { # classification of input record
my $record;
my $first_char = substr $record->{structured}{nick}, 0, 1;
$record->{class} = ($first_char =~ m{[a-m]}mxs) ? 'lower'
:($first_char =~ m{[a-z]}imxs) ? 'other'
: 'error';
return $record;
}
# Further processing depends on class
['Plumbing::dispatch', key => 'class',
handlers => {
lower => $good,
other => $good,
error => $bad,
},
]
# Options, just flush the output to the sink
{ tap => 'sink' },
)->([qw< mydata-04.txt >]);
There are a few things going on here, let's take a look.
The first thing that pops out is that the renderer in C<$bad> is using a
new parameter I<input>, set to C<error>. Quite suspiciously, the
renderer is preceded by a small tube that populates an C<error> field in
the record... is this a coincidence?
It turns out that L</What Is A Record, Toolkit Style> did not tell us
the whole story: all the tools in the toolkit can take a different
I<input> and produce a different I<output>, all you have to do is to
specify the relevant key in the arguments to the factory function. So,
L</What Is A Record, Toolkit Style> actually describes the default
values for these parameters.
Of course, we might have just added C<message> to the sub-hash
C<structured>, but that would have been sort of cheating, wouldn't it?
The second thing is that, as anticipated, we managed to create two
different tracks for the input records, where the C<Plumbing::dispatch>
does the split of the records stream across them. This allows each of
the sub-tube to be independent of each other (there are two here, they
might be many more of course). Note that C<$good> and C<$bad> are
created using C<pipeline> (so that we avoid C<summon>ing
C<Plumbing::sequence> and shave off a few characters), taking care to
I<avoid> setting a C<tap>, otherwise we wouldn't get a tube back!
=head2 Process In Peace
Alas, we have come to the end of our journey through L<Data::Tubes>.
There's much more to discover in the manual pages listed at the
beginning, so be sure to check them out.
If you want to contribute, L<Data::Tubes> is on GitHub at
L<https://github.com/polettix/Data-Tubes>. One way to contribute might
be releasing your own plugins... e.g. if you prefer to use L<Template>
instead of L<Template::Perlish>!
=cut