Group
Extension

AnyEvent-MQTT/misc/t/Tester.pm

package # Hide from PAUSE
        Tester;
use strict;
use warnings;
use constant {
  SERVER => $ENV{ANYEVENT_MQTT_SERVER} || 'localhost',
  JOBS => $ENV{ANYEVENT_MQTT_TESTER_JOBS} || 1,
  REPEAT => $ENV{ANYEVENT_MQTT_TESTER_REPEAT} || 1,
  TIMEOUT => $ENV{ANYEVENT_MQTT_TESTER_TIMEOUT} || 5,
  DIAG => $ENV{ANYEVENT_MQTT_TESTER_DIAG},
};

use Test::More;
use AnyEvent::MQTT;
use Net::MQTT::Constants;
use JSON;
use File::Slurp;
use Test::SharedFork;

sub run {
  my ($pkg, $file) = @_;

  my $json = JSON->new;
  my $data = read_file($file);
  $data = $json->decode($data);
  my $conf = $data->{config} || {};
  my $streams = $data->{streams} || [ $data->{stream} ];
  my $logs = $data->{logs} || [ $data->{log} ];
  $conf->{jobs} ||= JOBS;
  my $new = [];
  push @$new, @$streams foreach (1..$conf->{jobs});
  $streams = $new;
  $new = [];
  push @$new, @$logs foreach (1..$conf->{jobs});
  $logs = $new;

  $conf->{topic} ||= '/zqk/test';
  $conf->{host} ||= SERVER;
  $conf->{repeat} ||= REPEAT;
  $conf->{timeout} ||= TIMEOUT * $conf->{repeat} * $conf->{jobs};
  my ($test) = ($0 =~ m!([^/]+)\.t$!);
  $conf->{testname} ||= $test;

  my $timeout = AnyEvent->timer(after => $conf->{timeout},
                                cb => sub { die "timeout\n" });

  foreach my $n (0..($conf->{repeat}-1)) {

    my @pids;
    foreach my $i (0..(@$streams-1)) {
      my $pid = fork;
      die "Fork failed\n" unless (defined $pid);
      if ($pid) {
        push @pids, $pid;
        next;
      }
      #diag('child '.$i);
      my @log;
      $conf->{pid} = $i;
      $conf->{testname} .= '.'.$n.'.'.$i;
      $conf->{topicpid} = $conf->{topic}.'/'.$i;
      run_stream($conf, $streams->[$i], \@log);
      check_log($conf, $logs->[$i], \@log);
      #diag('child '.$i.' finished');
      exit;
    }

    foreach my $pid (@pids) {
      #diag('waiting for child '.$pid);
      waitpid($pid, 0);
      if ($?) {
        die "child died: ", ($?>>8), "\n";
      }
    }
  }
  done_testing();
}

sub run_stream {
  my ($conf, $stream, $log) = @_;
  my $cv;
  my $mqtt;
  my %cv = ();
  my %timer = ();
  my $index = 0;
  foreach my $index (0..-1+@$stream) {
    $conf->{index} = $index;
    my $rec = $stream->[$index];
    my $name =
      $rec->{name} || $index.':'.($rec->{action}||'item').'/'.$conf->{pid};
    my $args = $rec->{arguments} || {};
    $_ = replace_conf($_, $conf) foreach (values %$args);
    if ($rec->{action} eq 'connect') {
      $mqtt = AnyEvent::MQTT->new(host => $conf->{host},
                                  client_id => $conf->{testname},
                                  %$args,
                                  message_log_callback => sub {
                                    push @$log, $_[0].' '.$_[1]->string;
                                  },
                                  on_error => sub {
                                    warn $_[1], "\n";
                                    die "\n" if ($_[0]);
                                  },
                                 );
      ok($cv = $mqtt->connect, 'connect - '.$name);
      ok($cv->recv, '...connected - '.$name)
        or BAIL_OUT('connect failed');
    } elsif ($rec->{action} eq 'subscribe') {
      my $cvname = $rec->{cvname}||$name;
      $cv{$cvname} = AnyEvent->condvar;
      my %args =
        (
         topic => $conf->{topicpid},
         qos => MQTT_QOS_AT_MOST_ONCE,
         callback => sub { $cv{$cvname}->send($_[2]); },
         %$args,
        );

      ok($cv = $mqtt->subscribe(%args), '...subscribe - '.$name);
      is($cv->recv, $rec->{result}, '...subscribed - '.$name);
    } elsif ($rec->{action} eq 'unsubscribe') {
      my %args =
        (
         topic => $conf->{topicpid},
         %$args,
        );

      ok($cv = $mqtt->unsubscribe(%args), '...unsubscribe - '.$name);
      is($cv->recv, $rec->{result}, '...unsubscribed - '.$name);
    } elsif ($rec->{action} eq 'publish') {
      my %args =
        (
         topic => $conf->{topicpid},
         qos => MQTT_QOS_AT_MOST_ONCE,
         message => '',
         %$args,
        );
      ok($cv = $mqtt->publish(%args), '...publish - '.$name);
      ok($cv->recv, '...published - '.$name);
    } elsif ($rec->{action} eq 'wait') {
      my $msg = $cv{$rec->{for}}->recv;
      $cv{$rec->{for}} = AnyEvent->condvar;
      my $result = $rec->{result};
      if (ref $result) {
        foreach my $k (sort keys %$result) {
          is($msg->$k, replace_conf($result->{$k}, $conf),
             '...result '.$k.' - '.$name);
        }
      } else {
        is($msg, replace_conf($result, $conf),
           '...result '.$result.' - '.$name);
      }
    } elsif ($rec->{action} eq 'timeout') {
      my $cvname = $rec->{cvname}||$name;
      $cv{$cvname} = AnyEvent->condvar unless (exists $cv{$cvname});
      $timer{$name} = AnyEvent->timer(after => $rec->{timeout},
                      cb => sub { $cv{$cvname}->send("timeout") });
    } elsif ($rec->{action} eq 'send') {
      ok($cv = $mqtt->_send(%$args, cv => AnyEvent->condvar),
         '...send - '.$name);
      ok($cv->recv, '...sent - '.$name);
      my $cvname = $rec->{cvname}||$name;
      $cv{$cvname} = AnyEvent->condvar;
      my $callback = 'before_'.($rec->{response}||'msg').'_callback';
      $mqtt->{$callback} =
        sub {
          $cv{$cvname}->send($_[0]);
          delete $mqtt->{$callback};
        };
    } else {
      die "Invalid action: ", $rec->{action}, "\n";
    }
  }
}

sub check_log {
  my ($conf, $expected, $log) = @_;
  my $i = 0;
  while (my $m = shift @$expected) {
    my ($str) = ($m =~ m!^(.*?)/!);
    if (ref $m) {
      foreach my $alt (@$m) {
        my $re = $alt->{re};
        if (!defined $re || $log->[0] =~ m!$re!) {
          diag($alt->{diag}) if (DIAG && exists $alt->{diag});
          return check_log($conf, $alt->{log}, $log);
        }
      }
      die "Didn't match any alternative message log pattern\n";
    } else {
      my $got = shift @$log;
      is($got, replace_conf($m, $conf), 'message '.$i.' '.$str);
    }
  } continue {
    $i++;
  }
  is(@$log, 0, 'no extra messages') or
    diag("Got:\n  ", (join "\n  ", @$log), "\n");
}

sub replace_conf {
  my ($m, $conf) = @_;
  foreach my $k (keys %$conf) {
    $m =~ s/\%$k\%/$conf->{$k}/eg;
  }
  $m;
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.