Group
Extension

Lim-Plugin-Zonalizer/lib/Lim/Plugin/Zonalizer/Collector.pm

package Lim::Plugin::Zonalizer::Collector;

use common::sense;

use Carp;
use Scalar::Util qw(blessed);
use Moose;
with 'MooseX::Getopt';

use Zonemaster;
use Zonemaster::Util ();
use Zonemaster::Logger::Entry;

use Net::LDNS;
use JSON::XS;

# Threading is optional: probe for threads and Thread::Queue at load time and
# remember whether both could be loaded. The flag is true only when both
# modules load; any failure is swallowed by the eval and leaves it undef.
my $can_use_threads = eval {
    require threads;
    threads->import;
    require Thread::Queue;
    Thread::Queue->import;
    1;
};

=encoding utf8

=head1 NAME

Lim::Plugin::Zonalizer::Collector - Collector that runs Zonemaster in multiple
threads.

=head1 VERSION

See L<Lim::Plugin::Zonalizer> for version.

=cut

# Version is mirrored from the parent plugin package.
# NOTE(review): no `use Lim::Plugin::Zonalizer` is visible in this file, so
# this is presumably undef unless that package was loaded first -- confirm.
our $VERSION = $Lim::Plugin::Zonalizer::VERSION;

# Lookup table built from Zonemaster::Logger::Entry->levels. process() indexes
# it by upper-cased level name and compares the values numerically.
our %numeric = Zonemaster::Logger::Entry->levels;

=head1 SYNOPSIS

  use Lim::Plugin::Zonalizer::Collector;

  Lim::Plugin::Zonalizer::Collector->new_with_options->run;

=head1 METHODS

=over 4

=item config

Name of configuration file to load.

=cut

# Optional path handed to Zonemaster->config->load_config_file by run().
has config => (
    isa           => 'Str',
    is            => 'ro',
    documentation => 'Name of configuration file to load.',
    required      => 0,
);

=item policy

Name of policy file to load.

=cut

# Optional path handed to Zonemaster->config->load_policy_file by run().
has policy => (
    isa           => 'Str',
    is            => 'ro',
    documentation => 'Name of policy file to load.',
    required      => 0,
);

=item sourceaddr

Local IP address that the test engine should try to send its requests from.

=cut

# Optional source address; when set, run() passes it to
# Zonemaster->config->resolver_source. Defaults to undef (not set).
has sourceaddr => (
    isa           => 'Maybe[Str]',
    is            => 'ro',
    default       => undef,
    documentation => 'Local IP address that the test engine should try to send its requests from.',
    required      => 0,
);

=item threads

Number of threads to start.

=cut

# How many worker threads run() spawns when thread support is available.
has threads => (
    isa           => 'Num',
    is            => 'ro',
    default       => 5,
    documentation => 'Number of threads to start.',
    required      => 0,
);

=item debug

Send debug information to STDERR.

=cut

# When true, progress messages are written to STDERR by run() and process().
has debug => (
    isa           => 'Bool',
    is            => 'ro',
    documentation => 'Send debug information to STDERR.',
    required      => 0,
);

=item run

Starts the collector, sets up the thread queues and creates all the threads
needed.

=cut

# Entry point of the collector.
#
# Applies the optional source address, policy file and configuration file to
# Zonemaster, then reads JSON request objects from STDIN until end of input.
# Each request that passes validate() is handed to process().
#
# With thread support, requests are JSON-encoded onto an input queue consumed
# by the worker threads, and a single writer thread prints everything the
# workers put on the output queue. Without thread support, requests are
# processed inline and process() prints the results itself.
#
# Dies (via confess) if threading is available and the configured thread
# count is less than 1. Returns nothing.
sub run {
    my ( $self ) = @_;

    $self->debug and say STDERR 'start';

    if ( $self->sourceaddr ) {
        Zonemaster->config->resolver_source( $self->sourceaddr );
    }

    if ( $self->policy ) {
        Zonemaster->config->load_policy_file( $self->policy );
    }

    if ( $self->config ) {
        Zonemaster->config->load_config_file( $self->config );
    }

    my ( $in_q, $out_q, $writer, @workers );

    if ( $can_use_threads ) {
        $in_q  = Thread::Queue->new;
        $out_q = Thread::Queue->new;

        # Truncate to a whole number: a fractional count would never reach
        # exactly zero when counted down and would spawn threads forever.
        my $threads = int( $self->threads );

        unless ( $threads > 0 ) {
            confess 'threads must be at least 1';
        }

        for ( 1 .. $threads ) {
            $self->debug and say STDERR 'start thread';

            push @workers, threads->create(
                sub {
                    my $json = JSON::XS->new->allow_blessed->convert_blessed->canonical;

                    $self->debug and say STDERR 'thread started';

                    # dequeue returns undef once the queue is ended and empty.
                    while ( defined( my $in = $in_q->dequeue ) ) {
                        $self->debug and say STDERR 'dequeued in';
                        $self->process( $json->decode( $in ), $out_q );
                    }
                }
            );
        }

        # The writer is kept joinable (not detached) so that shutdown can wait
        # until every queued result has actually been printed.
        $writer = threads->create(
            sub {
                while ( defined( my $out = $out_q->dequeue ) ) {
                    $self->debug and say STDERR 'dequeued out';
                    say $out;
                }
            }
        );
    }

    {
        my $json = JSON::XS->new->allow_blessed->convert_blessed->canonical;

        $self->debug and say STDERR 'waiting';
        while ( my $line = <STDIN> ) {
            $self->debug and say STDERR 'read: ', $line;

            # The incremental parser returns every object completed so far, so
            # a request may span several lines and a line may hold several.
            foreach my $in ( $json->incr_parse( $line ) ) {
                unless ( $self->validate( $in ) ) {
                    $self->debug and say STDERR 'invalid in';
                    next;
                }

                $self->debug and say STDERR 'queued in';

                if ( $can_use_threads ) {
                    $in_q->enqueue( $json->encode( $in ) );
                }
                else {
                    $self->process( $in );
                }
            }
        }
    }

    $self->debug and say STDERR 'end';

    if ( $can_use_threads ) {
        # Shut down in dependency order: stop feeding the workers, wait for
        # them to finish producing results, and only then close the output
        # queue and wait for the writer to drain it.
        $in_q->end;
        $_->join for @workers;

        $out_q->end;
        $writer->join;
    }

    return;
}

=item process

Process an analyze request.

=cut

# Runs a single analysis request through Zonemaster.
#
# $in is the request hash (see validate() for the fields checked beforehand)
# and $out_q is the Thread::Queue that results are pushed to when thread
# support is available; without threads results are printed directly.
#
# Every log entry at DEBUG level or above is serialised as canonical JSON
# carrying the request id, timestamp, module, tag, level and, when present,
# the entry arguments. Dies (via confess) when $in is not a hash reference or,
# in threaded mode, when $out_q is not a Thread::Queue. Returns nothing.
sub process {
    my ( $self, $in, $out_q ) = @_;

    ref( $in ) eq 'HASH' or confess;
    if ( $can_use_threads ) {
        ( blessed $out_q and $out_q->isa( 'Thread::Queue' ) ) or confess;
    }

    my $encoder = JSON::XS->new->allow_blessed->convert_blessed->canonical;

    # Start from a clean engine state for every request.
    Zonemaster->reset;

    Zonemaster->config->ipv4_ok( $in->{ipv4} ? 1 : 0 );
    Zonemaster->config->ipv6_ok( $in->{ipv6} ? 1 : 0 );

    # Logger hook: forwards each sufficiently severe entry as one JSON line.
    my $forward = sub {
        my ( $entry ) = @_;

        if ( $numeric{ uc $entry->level } >= $numeric{DEBUG} ) {
            $self->debug and say STDERR 'queued out';

            my %record = (
                _id       => $in->{id},
                timestamp => $entry->timestamp,
                module    => $entry->module,
                tag       => $entry->tag,
                level     => $entry->level,
                $entry->args ? ( args => $entry->args ) : (),
            );
            my $line = $encoder->encode( \%record );

            if ( $can_use_threads ) {
                $out_q->enqueue( $line );
            }
            else {
                say $line;
            }
        }
    };
    Zonemaster->logger->callback( $forward );

    # Announce the version of the Basic module, then of every other module.
    Zonemaster::Util::info(
        MODULE_VERSION => {
            module  => 'Zonemaster::Test::Basic',
            version => Zonemaster::Test::Basic->version
        }
    );
    for my $class ( map { 'Zonemaster::Test::' . $_ } Zonemaster::Test->modules ) {
        Zonemaster::Util::info(
            MODULE_VERSION => {
                module  => $class,
                version => $class->version
            }
        );
    }

    my $domain = $self->to_idn( $in->{fqdn} );

    if ( exists $in->{ns} ) {
        my %ns;
        for my $server ( @{ $in->{ns} } ) {
            my $name = $self->to_idn( $server->{fqdn} );

            # Use the supplied address, or look the name up when none is given.
            push @{ $ns{$name} },
              $server->{ip}
              ? $server->{ip}
              : Net::LDNS->new->name2addr( $name );
        }
        Zonemaster->add_fake_delegation( $domain => \%ns ) if scalar %ns;
    }

    Zonemaster->add_fake_ds( $domain => $in->{ds} ) if exists $in->{ds};

    if ( exists $in->{test} ) {
        # Run only the requested modules / methods.
        for my $spec ( @{ $in->{test} } ) {
            if ( $spec->{method} ) {
                Zonemaster->test_method( $spec->{module}, $spec->{method}, Zonemaster->zone( $domain ) );
            }
            else {
                Zonemaster->test_module( $spec->{module}, $domain );
            }
        }
    }
    else {
        Zonemaster->test_zone( $domain );
    }

    Zonemaster::Util::info( MODULE_END => { module => 'Lim::Plugin::Zonalizer::Collector' } );

    return;
}

=back

=head1 PRIVATE METHODS

=over 4

=item to_idn

Converts input into an IDN string if it is not ASCII and Net::LDNS has support for
IDN.

=cut

# Converts a domain name to its IDN form when needed.
#
# Pure-ASCII input is returned unchanged, as is any input when Net::LDNS was
# built without IDN support. Otherwise the name is decoded to characters and
# passed to Net::LDNS::to_idn.
#
# The encoding is taken from $self->encoding when the object provides such a
# method and defaults to UTF-8 otherwise. If the input cannot be decoded (for
# example because it already is a character string), it is passed on as is.
sub to_idn {
    my ( $self, $str ) = @_;

    if ( $str =~ m/^[[:ascii:]]+$/ ) {
        return $str;
    }

    unless ( Net::LDNS::has_idn() ) {
        return $str;
    }

    # Loaded here because only this code path needs it.
    require Encode;

    my $encoding = ( blessed $self and $self->can( 'encoding' ) ) ? $self->encoding : 'UTF-8';

    # Strict decoding modifies its source argument, so work on a copy, and
    # treat a decoding failure as "input is already decoded".
    my $octets = $str;
    my $chars = eval { Encode::decode( $encoding, $octets, Encode::FB_CROAK() ) };

    return Net::LDNS::to_idn( defined $chars ? $chars : $str );
}

=item validate

Validates an input object.

=cut

# Checks the shape of a request object.
#
# Returns 1 when $in is a hash reference with defined id, fqdn, ipv4 and ipv6
# values and every optional section present (ns, ds, test) is an array of
# hashes whose required keys are defined and whose optional keys, when
# present, are defined too. Returns nothing (undef / empty list) otherwise.
sub validate {
    my ( $self, $in ) = @_;

    return unless ref( $in ) eq 'HASH';

    for my $field ( qw(id fqdn ipv4 ipv6) ) {
        return unless defined $in->{$field};
    }

    # Each rule: section key, keys that must be defined, keys that may be
    # absent but must not be present with an undefined value.
    my @rules = (
        [ ns   => [qw(fqdn)],                         [qw(ip)] ],
        [ ds   => [qw(keytag algorithm type digest)], [] ],
        [ test => [qw(module)],                       [qw(method)] ],
    );

    for my $rule ( @rules ) {
        my ( $section, $required, $optional ) = @{$rule};

        next unless exists $in->{$section};
        return unless ref( $in->{$section} ) eq 'ARRAY';

        for my $item ( @{ $in->{$section} } ) {
            return unless ref( $item ) eq 'HASH';

            for my $key ( @{$required} ) {
                return unless defined $item->{$key};
            }
            for my $key ( @{$optional} ) {
                return if exists $item->{$key} and !defined $item->{$key};
            }
        }
    }

    return 1;
}

=back

=head1 AUTHOR

Jerry Lundström, C<< <lundstrom.jerry@gmail.com> >>

=head1 BUGS

Please report any bugs or feature requests to L<https://github.com/jelu/lim-plugin-zonalizer/issues>.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc Lim::Plugin::Zonalizer::Collector

You can also look for information at:

=over 4

=item * Lim issue tracker (report bugs here)

L<https://github.com/jelu/lim-plugin-zonalizer/issues>

=back

=head1 ACKNOWLEDGEMENTS

=head1 LICENSE AND COPYRIGHT

Copyright 2015-2016 Jerry Lundström
Copyright 2015-2016 IIS (The Internet Foundation in Sweden)

This program is free software; you can redistribute it and/or modify it
under the terms of either: the GNU General Public License as published
by the Free Software Foundation; or the Artistic License.

See http://dev.perl.org/licenses/ for more information.


=cut

1;    # End of Lim::Plugin::Zonalizer::Collector


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.