Group
Extension

App-RecordStream/lib/App/RecordStream/InputStream.pm

package App::RecordStream::InputStream;

=head1 NAME

App::RecordStream::InputStream

=head1 AUTHOR

Benjamin Bernard <perlhacker@benjaminbernard.com>
Keith Amling <keith.amling@gmail.com>

=head1 DESCRIPTION

This module will generate an stream of App::RecordStream::Record objects for given inputs.

=head1 SYNOPSIS

  use App::RecordStream::InputStream;
  my $stream = App::RecordStream::InputStream(STRING => $recs_string);

  while ( my $record = $stream->get_record() ) {
    ... do stuff ...
  }

=head1 CONSTRUCTOR

=over 4

=item my $in = App::RecordStream::InputStream->new(OPTIONS);

The input stream takes named paramters, it will take one of: FILE, STRING, or FH
(a file handle).

  FILE   - Name of a file, must be readable
  STRING - String of new line separated records
  FH     - File handle to a stream of data

Optionally, it wil take a NEXT argument.  The NEXT argument should be another
InputStream object.  Once the returned object reaches the end of its string, it
will get records from the NEXT App::RecordStream::InputStream.  In this manner, InputStream
objects can be chained

returns an instance of InputStream

=item my $in = App::RecordStream::InputStream->new_magic()

Provides GNU-style input semantics for scripts.  If there are arguments left in
@ARGV, it will assume those are file names and make a set of chained streams
for those files, returning the first stream.  If no files are specified, will
open an InputStream on STDIN

=item my $in = App::RecordStream::InpustStream->new_from_files(FILES)

Takes an array of FILES and constructs a set of chained streams for those
files.  Returns the first stream

=back


=head1 PUBLIC METHODS

=over 4

=item my $record = $this->get_record();

Retrieve the next L<App::RecordStream::Record> from the stream.  Will return a false value
if no records are available.  If this stream has a NEXT stream specified in the
constructor, this will continue to return Record objects until all chained
streams are exhausted

=back

=cut

our $VERSION = "3.4";

use strict;
use warnings;

use IO::String;
use JSON qw(decode_json);

use App::RecordStream::Record;
require App::RecordStream::Operation;

my $ONE_OF = [qw(FH STRING FILE)];

my $ARGUMENTS = {
  FH     => 0,
  STRING => 0,
  FILE   => 0,
  NEXT   => 0,
};

sub new_magic {
  my $class = shift;
  my $files = shift || \@ARGV;

  if ( scalar @$files > 0 ) {
    return $class->new_from_files($files);
  }

  return $class->new(FH => \*STDIN);
}

sub new_from_files {
  my $class = shift;
  my $files = shift;

  my $last_stream;

  foreach my $file ( reverse @$files )  {
    unless ( -e $file && -r $file ) {
      die "File does not exist or is not readable: $file\n";
    }

    my $new_stream = $class->new(FILE => $file, NEXT => $last_stream);
    $last_stream   = $new_stream;
  }

  return $last_stream;
}

sub new {
  my $class = shift;
  my %args  = @_;

  my $this = {};

  foreach my $key (keys %$ARGUMENTS) {
    my $value = $args{$key};
    $this->{$key} = $value;

    if ( $ARGUMENTS->{$key} ) {
      die "Did not supply required argument: $key" unless ( $value );
    }
  }

  bless $this, $class;

  $this->_init();
  return $this;
}

sub _init {
  my $this = shift;

  my $found = {};

  foreach my $arg (@$ONE_OF) {
    if ( $this->{$arg} ) {
      $found->{$arg} = $this->{$arg};
    }
  }

  if ( scalar keys %$found > 1 ) {
    die "Must specify only one of " . join(' ', keys %$found);
  }

  unless ( scalar keys %$found == 1 ) {
    die "Must specify one of " . join(' ', @$ONE_OF);
  }

  if ( $this->get_string() ) {
    $this->{'FH'} = IO::String->new($this->get_string());
  }

  my $file = $this->get_file();
  if ( $file ) {
    open(my $fh, '<', $file) or die "Cannot open $file: $!";
    $this->{'FH'} = $fh;
  }
}

sub get_file {
  my $this = shift;
  return $this->{'FILE'};
}

sub get_string {
  my $this = shift;
  return $this->{'STRING'};
}

# Performance! :(
sub get_fh {
  return $_[0]->{'FH'};
}

sub get_record {
  my $this = shift;

  if ( $this->is_done() ) {
    return $this->call_next_record();
  }

  my $fh   = $this->get_fh();

  my $line   = <$fh>;

  if ( ! $line ) {
    close $fh;
    $this->set_done();

    # This is ugly, reaching into the other class
    App::RecordStream::Operation::set_current_filename($this->get_filename());

    return $this->call_next_record();
  }

  # Direct bless done in the name of performance
  my $record = decode_json($line);
  bless $record, 'App::RecordStream::Record';

  return $record;
}

sub call_next_record {
  my $this = shift;

  my $next = $this->get_next();

  unless ( $next ) {
    return undef;
  }

  # Prevent a deep recursion with many passed files
  if ( $next && $next->is_done() ) {
    $next = $next->get_next();
    $this->{'NEXT'} = $next;
  }

  return $next->get_record();
}

sub get_filename {
  my $this = shift;

  if ( ! $this->is_done() ) {
    return $this->get_file() if ( $this->get_file() );
    return 'STRING_INPUT' if ( $this->get_string() );
    return 'STREAM_INPUT' if ( $this->get_fh() );
    return 'UNKNOWN';
  }
  elsif ( $this->get_next() ) {
    return $this->get_next()->get_filename();
  }

}

sub get_next {
  my $this = shift;
  return $this->{'NEXT'};
}

sub is_done {
  return $_[0]->{'DONE'};
}

sub set_done {
  my $this = shift;
  $this->{'DONE'} = 1;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.