Group
Extension

Message-Passing-Output-MongoDB/lib/Message/Passing/Output/MongoDB.pm

package Message::Passing::Output::MongoDB;

# ABSTRACT: Module for Message::Passing to send log to mongodb

use Moose;
use MongoDB;
use AnyEvent;
use Scalar::Util qw/ weaken /;
use MooseX::Types::Moose qw/ ArrayRef HashRef Str Bool Int Num /;
use Moose::Util::TypeConstraints;
use aliased 'DateTime' => 'DT';
use MooseX::Types::ISO8601 qw/ ISO8601DateTimeStr /;
use Data::Dumper;
use Tie::IxHash;
use namespace::autoclean;

# Module version. The string eval turns the quoted version into a plain
# number (the usual idiom for keeping underscore/dev versions numeric).
our $VERSION = '0.052';
$VERSION = eval $VERSION;

# Compose the Message::Passing output role and the role that presumably
# provides the username/password attributes overridden below with
# has '+username' / has '+password'.
with qw/
    Message::Passing::Role::Output
    Message::Passing::Role::HasUsernameAndPassword
/;

# Required hash reference handed unchanged to MongoDB::MongoClient->new
# when the client is first built.
has connection_options => (
    is => 'ro',
    isa => 'HashRef',
    required => 1,
);

# Relax the inherited password attribute so that it may be omitted.
has '+password' => (
    required => 0,
);

# Relax the inherited username attribute so that it may be omitted;
# authentication is only attempted when a username is defined.
has '+username' => (
    required => 0,
);

# Required name of the MongoDB database that receives the log collections.
has database => (
    isa => Str,
    is => 'ro',
    required => 1,
);

# Driver client, constructed on first access from the user-supplied
# connection_options hash reference.
has _client => (
    is      => 'ro',
    isa     => 'MongoDB::MongoClient',
    lazy    => 1,
    default => sub {
        my ($self) = @_;
        my $options = $self->connection_options;
        return MongoDB::MongoClient->new($options);
    },
);

# Database handle, resolved on first access. When a username is configured
# the client authenticates against the target database before the handle
# is fetched.
has _db => (
    is      => 'ro',
    isa     => 'MongoDB::Database',
    lazy    => 1,
    default => sub {
        my ($self) = @_;
        my $mongo   = $self->_client;
        my $db_name = $self->database;

        if (defined $self->username) {
            # NOTE(review): this relies on authenticate() returning a false
            # value on failure -- confirm against the installed driver.
            $mongo->authenticate($db_name, $self->username, $self->password)
                or die "MongoDB authentication failure";
        }

        return $mongo->get_database($db_name);
    },
);

# Required collection name prefix; the collections actually written to are
# named "<collection>_<YYYYMMDD>".
has collection => (
    isa => Str,
    is => 'ro',
    required => 1,
);

# Handle of the per-day collection currently receiving batches. Built lazily
# for the current date and replaced by _get_collection_by_date whenever the
# message date maps to a different collection name.
has _collection_of_day => (
    is       => 'rw',
    isa      => 'MongoDB::Collection',
    lazy => 1,
    builder => '_build_collection_of_day',
);

# Builder for _collection_of_day.
#
# Returns the handle of the collection named after the configured prefix,
# an underscore and the date given by DateTime->now formatted as YYYYMMDD.
sub _build_collection_of_day {
    my $self = shift;

    my $today = DateTime->now;
    my $name  = $self->collection . '_' . $today->strftime('%Y%m%d');

    return $self->_db->get_collection($name);
}

# Applies every configured index specification to a collection.
#
# Parameters:
#   $collection - collection handle; defaults to the current per-day
#                 collection when undefined.
# Returns the collection handle that was used.
sub _ensure_collection_indexes {
    my ($self, $collection) = @_;
    $collection = $self->_collection_of_day unless defined $collection;

    # Nothing to do when the indexes attribute was never supplied.
    return $collection unless $self->_has_indexes;

    for my $spec (@{ $self->indexes }) {
        # Each spec is the argument list for ensure_index.
        $collection->ensure_index(@{$spec});
        next unless $self->verbose;
        warn("ensure index " . Dumper($spec));
    }

    return $collection;
}

# Name of the per-day collection most recently selected by
# _get_collection_by_date; undefined until the first message is consumed.
has _collection_of_day_name  => (
    is => 'rw',
    isa => Str,
);

# Makes the per-day collection matching a message date the current one.
#
# Parameters:
#   $dt - object providing strftime (a DateTime in practice).
# Returns the collection NAME ("<collection>_<YYYYMMDD>"), not the handle.
#
# When the name differs from the one currently selected, pending messages
# are flushed to the old collection first, indexes are ensured on the new
# one, the expired collection is removed and the new handle and name are
# stored.
sub _get_collection_by_date {
    my ($self, $dt) = @_;

    my $wanted_name  = $self->collection . '_' . $dt->strftime('%Y%m%d');
    my $current_name = $self->_collection_of_day_name;

    # Same day as before: nothing to switch.
    return $wanted_name
        if defined $current_name && $current_name eq $wanted_name;

    # Flush before switching so queued messages land in the old collection.
    $self->_flush;

    my $fresh_collection = $self->_db->get_collection($wanted_name);
    $self->_ensure_collection_indexes($fresh_collection);

    if ($self->_collection_of_day) {
        $self->_remove_expired_collection();
    }

    $self->_collection_of_day($fresh_collection);
    $self->_collection_of_day_name($wanted_name);

    return $wanted_name;
}

# Drops the dated collection that has just fallen out of the retention window.
#
# The collection dropped is "<collection>_<YYYYMMDD>" for the date exactly
# `retention` days before now. A retention of 0 is documented as "always keep
# log", so nothing is dropped in that case: without this guard zero days
# would be subtracted and the collection for today's date would be dropped.
# Non-positive values are treated the same way.
#
# Returns whatever the driver's drop returns, or nothing when no collection
# was dropped.
sub _remove_expired_collection{
    my ($self) = @_;

    my $retention = $self->retention;
    return unless defined $retention && $retention > 0;

    my $retention_date = DT->now()->subtract(days => $retention);
    my $expired_collection_name = $self->collection .'_'. $retention_date->strftime('%Y%m%d');

    return $self->_db->get_collection($expired_collection_name)->drop;
}

# Returns the port number 27017. Not referenced elsewhere in this file;
# presumably consulted by a host/port role -- verify before removing.
sub _default_port {
    return 27017;
}

# Number of messages consumed so far; only incremented (through
# _inc_log_counter) and reported when verbose is enabled.
has _log_counter => (
    traits  => ['Counter'],
    is => 'rw',
    isa => Int,
    default => sub {0},
    handles => { _inc_log_counter => 'inc', },
);

# Enables progress warnings. Defaults to the result of the -t test on STDIN,
# i.e. on when STDIN is attached to a terminal.
has verbose => (
    isa => 'Bool',
    is => 'ro',
    default => sub {
        -t STDIN
    },
);

# Consumes one message (a hash reference) and queues it for insertion.
#
# The message date selects the per-day collection that becomes current:
#   * "epochtime", when present and true, wins and is read as epoch seconds;
#   * otherwise "date" is used when it is an ISO8601 date-time string;
#   * otherwise the current time is used.
# Both keys are removed from the message before it is queued. The queue is
# flushed as soon as it holds more than 1000 messages; smaller batches are
# written by the periodic flush timer.
#
# Parameters:
#   $data - hash reference holding the message; false values are ignored.
sub consume {
    my ($self, $data) = @_;
    return unless $data;

    my $date;
    if (my $epochtime = delete($data->{epochtime})) {
        $date = DT->from_epoch(epoch => $epochtime);
        delete($data->{date});
    }
    elsif (my $try_date = delete($data->{date})) {
        # to_DateTime() was never imported into this package, so calling it
        # died at runtime; parse the validated string locally instead.
        $date = $self->_parse_iso8601_datetime($try_date);
    }
    $date ||= DT->from_epoch(epoch => time());

    # Called for its side effects: switches (and, if needed, flushes) the
    # current per-day collection. The returned name is not needed here.
    $self->_get_collection_by_date($date);

    push @{ $self->queue }, $data;
    $self->_flush if scalar(@{ $self->queue }) > 1000;

    if ($self->verbose) {
        $self->_inc_log_counter;
        warn("Total " . $self->_log_counter . " records inserted in MongoDB\n");
    }

    return;
}

# Parses a string accepted by ISO8601DateTimeStr (for example
# "2013-02-01T12:30:45Z") into a DateTime object.
#
# The value is interpreted as UTC, in line with the DateTime objects built
# from epoch values in consume. Fractional seconds are ignored because only
# the calendar date is used to pick the collection.
#
# Returns undef when the string is not an ISO8601 date-time or names an
# impossible date, so the caller can fall back to the current time.
sub _parse_iso8601_datetime {
    my ($self, $string) = @_;

    return unless defined $string && is_ISO8601DateTimeStr($string);
    return unless $string =~ m{
        \A (\d{4}) - (\d{2}) - (\d{2})
        T  (\d{2}) : (\d{2}) (?: : (\d{2}) )?
    }x;
    my ($year, $month, $day, $hour, $minute, $second) = ($1, $2, $3, $4, $5, $6);

    # DateTime->new throws on out-of-range fields (month 13, hour 25, ...).
    my $parsed = eval {
        DT->new(
            year      => $year,
            month     => $month,
            day       => $day,
            hour      => $hour,
            minute    => $minute,
            second    => $second // 0,
            time_zone => 'UTC',
        );
    };

    return $parsed;
}

# Writes all queued messages to the current per-day collection in one batch.
#
# Does nothing while another flush is in progress or when the queue is empty.
# Delivery is best effort: when the insert fails the error is reported with
# warn, a reconnect is attempted and the batch is discarded. The in-progress
# flag is always reset, even when the reconnect attempt itself throws;
# otherwise one failed reconnect would disable every later flush and let the
# queue grow without bound.
sub _flush {
    my ($self) = @_;

    return if $self->_am_flushing;

    my $queue = $self->queue;
    return unless scalar @$queue;

    $self->_am_flushing(1);

    my $inserted = eval {
        $self->_collection_of_day->batch_insert($queue);
        1;
    };

    if (!$inserted) {
        my $insert_error = $@ || 'unknown error';
        chomp $insert_error;
        warn("Failed to do the insertion of logs: $insert_error\n");

        # The reconnect may throw as well; it must not escape, or the flag
        # below would never be cleared.
        my $reconnected = eval { $self->_client->connect(); 1 };
        if (!$reconnected) {
            my $connect_error = $@ || 'unknown error';
            chomp $connect_error;
            warn("Failed to reconnect to MongoDB: $connect_error\n");
        }
    }

    $self->_clear_queue;
    $self->_am_flushing(0);

    return;
}

# Messages waiting to be written. Cannot be set through the constructor
# (init_arg is undef); _clear_queue drops the current array so that the next
# access lazily builds a fresh empty one.
has queue => (
    is => 'ro',
    isa => ArrayRef,
    default => sub { [] },
    init_arg => undef,
    lazy => 1,
    clearer => '_clear_queue',
);

# True while _flush is running; _flush returns immediately when it is set.
has _am_flushing => (
    isa => Bool,
    is => 'rw',
    default => 0,
);

# Holds the value returned by AnyEvent->timer for the periodic flush;
# instantiated from BUILD.
has _flush_timer => (
    is => 'ro',
    lazy => 1,
    builder => '_build_flush_time',
);

# Builder for _flush_timer: creates a timer that first fires after 10 seconds
# and then every 10 seconds, calling _flush each time.
sub _build_flush_time {
    my ($self) = @_;

    # The callback must only hold a weak reference, otherwise the object and
    # the timer stored on it would keep each other alive.
    weaken($self);
    my $flush_queue = sub { $self->_flush };

    return AnyEvent->timer(
        after    => 10,
        interval => 10,
        cb       => $flush_queue,
    );
}

# Optional list of index specifications. Each inner array is passed as the
# argument list of ensure_index on every newly selected collection.
has indexes => (
    isa => ArrayRef[ArrayRef[HashRef]],
    is => 'ro',
    predicate => '_has_indexes',
);

# Retention period in days; _remove_expired_collection subtracts this many
# days from now to find the dated collection to drop.
has retention => (
    is => 'ro',
    isa => Num,
    default => 7, # days
    documentation => 'Int, days to retent log, set 0 to always keep log',
);

# When true, BUILD instantiates the _observer timer.
has collect_fields => (
    isa => 'Bool',
    is => 'ro',
    default => 0,
);

# Holds the value returned by AnyEvent->timer in _build_observer; only
# instantiated when collect_fields is true.
has _observer => (
    is => 'ro',
    lazy => 1,
    builder => '_build_observer'
);

# Builder for _observer: creates a timer that first fires 30 seconds after
# construction and then once every 24 hours.
#
# The callback is an empty placeholder, so the timer currently performs no
# work. The retention date that used to be computed here was never used and
# has been removed.
sub _build_observer {
    my ($self) = @_;

    # Kept weak so that a future callback can capture $self without creating
    # a reference cycle with the timer stored on the object.
    weaken($self);

    return AnyEvent->timer(
        after    => 30,
        interval => 24*3600,
        cb       => sub {
            # Intentionally empty: field collection is not implemented.
        },
    );
}

# Moose construction hook: instantiates the lazy periodic flush timer and,
# when collect_fields is enabled, the daily observer timer as well.
sub BUILD {
    my $self = shift;

    $self->_flush_timer;

    if ($self->collect_fields) {
        $self->_observer;
    }
}

1;

=head1 NAME

Message::Passing::Output::MongoDB - Message::Passing output to MongoDB

=head1 SYNOPSIS

    message-pass --input STDIN \
      --output MongoDB --output_options '{ "connection_options":{"host":"mongodb://localhost:27017"}, "database":"log_database", "collection":"logs"}'
    
    {"foo":"bar"}

=head1 DESCRIPTION

Module for L<Message::Passing>, send output to MongoDB

=head1 METHODS

=over

=item consume

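Consumes a message (a hash reference) by queueing it. Queued messages are
batch-inserted into the per-day MongoDB collection as soon as more than 1000
are queued, and otherwise every 10 seconds.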

=back

=head1 ATTRIBUTES

=over

=item database

Required, Str, the database to use.

=item collection

Required, Str, the collection name prefix. Messages are stored in one
collection per day, named C<< <collection>_<YYYYMMDD> >>.

=item connection_options

Required, HashRef, passed unchanged to MongoDB::MongoClient->new(\%options),
so it accepts any option that constructor does.

=item username

Str, mongodb authentication user

=item password

Str, mongodb authentication password

=item indexes

ArrayRef[ArrayRef[HashRef]], mongodb indexes

    ...
    indexes => [
        [{"foo" => 1, "bar" => -1}, { unique => true }],
        [{"foo" => 1}],
    ]
    ...

=item collect_fields

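Bool, defaults to 0. When set to 1, a timer is started that fires 30 seconds
after start-up and then once per day; it is meant to collect the field keys and
insert them into the collection $self->collection . "_keys". Note that the
timer callback is still empty in this version, so no keys are collected yet.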

=item retention

Num, number of days to keep logs; set it to 0 to keep them permanently. The
default is 7 (one week).

=item verbose

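Bool, enables progress output through warn (index creation and the running
count of consumed messages). Defaults to true when STDIN is a terminal.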

=back

=head1 SEE ALSO

L<Message::Passing>

=head1 SPONSORSHIP

This module exists due to the wonderful people at Suretec Systems Ltd.
<http://www.suretecsystems.com/> who sponsored its development for its
VoIP division called SureVoIP <http://www.surevoip.co.uk/> for use with
the SureVoIP API - 
<http://www.surevoip.co.uk/support/wiki/api_documentation>

=head1 AUTHOR, COPYRIGHT AND LICENSE

See L<Message::Passing>.

=cut



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.