Group
Extension

AC-Yenta/lib/AC/Yenta/Status.pm

# -*- perl -*-

# Copyright (c) 2009 AdCopy
# Author: Jeff Weisberg
# Created: 2009-Apr-02 11:16 (EDT)
# Function: track status of peers
#
# $Id$

package AC::Yenta::Status;
use AC::Yenta::Kibitz::Status;
use AC::Yenta::Debug 'status';
use AC::Yenta::Config;
use AC::Yenta::MySelf;
use AC::Dumper;
use AC::Misc;
use Sys::Hostname;
use JSON;
use Socket;
require 'AC/protobuf/yenta_status.pl';
use strict;

# Retention limits, in seconds. They are compared against $^T, which this
# module treats as "now".
# NOTE(review): Perl sets $^T at program start; presumably the event loop
# refreshes it -- confirm.
my $KEEPDOWN = 30 * 60;  # keep data about down servers this long
my $KEEPLOST = 10 * 60;  # keep data about servers we have not heard about this long
my $SAVEMAX  = 30 * 60;  # do not write entries whose lastup is older than this

# our own status port, recorded by init()
my $PORT;

# Shared peer tables. allpeer holds one status record per peer id; the
# remaining hashes are reverse indexes keyed as noted.
our $DATA = bless(
    {
        allpeer    => {},   # {id}  => status record
        sceptical  => {},   # {id}  => status record, from update_sceptical
        mappeer    => {},   # {map} => { id => id }
        peermap    => {},   # {id}  => [ map, ... ]
        datacenter => {},   # {dc}  => { id => id }
        peertype   => {},   # {ss}  => { id => id }
    },
    __PACKAGE__,
);

# Record our status port and register the two periodic jobs:
# peer status exchange (time_status_kibitz) and save-file writing
# (time_status_save). Both default to 5 when not configured.
sub init {
    ($PORT) = @_;

    # exchange status with a peer
    AC::DC::Sched->new(
        info => 'kibitz status',
        freq => conf_value('time_status_kibitz') || 5,
        func => \&periodic,
    );

    # rewrite the peer save file(s)
    return AC::DC::Sched->new(
        info => 'save status',
        freq => conf_value('time_status_save') || 5,
        func => \&save_status,
    );
}

# Scheduled job (see init): first drop peers that are not healthy, then
# start one status-exchange client against a peer picked by _random_peer().
# A peer counts as healthy when its status is 200 and it was heard from
# within $KEEPLOST; any other peer is offered to _maybe_remove.
sub periodic {

    for my $pid ( keys %{$DATA->{allpeer}} ){
        my $info = $DATA->{allpeer}{$pid};
        # an earlier removal in this pass may already have dropped it
        next if !$info;

        my $healthy = $info->{status} == 200 && $info->{timestamp} > $^T - $KEEPLOST;
        _maybe_remove( $pid ) if !$healthy;
    }

    my($peer_id, $addr, $peer_port) = _random_peer();
    return if !$peer_id;

    debug("starting status kibitz client to $peer_id");

    my $client = AC::Yenta::Kibitz::Status::Client->new(
        $addr, $peer_port,
        info        => "status client: $peer_id",
        status_peer => $peer_id,
    );

    # could not even create a client: treat the peer as down
    if( !$client ){
        return __PACKAGE__->isdown($peer_id);
    }

    $client->start();
}

# Pick the next peer to exchange status with.
# Returns (id, ip, port), or an empty list when there is nobody to talk to.
#   known peer: ($peer->{server_id}, $peer->{ip}, undef)
#   seed peer : ("seed/ip:port", ip, port), taken from config 'seedpeer'
#               (entries "ip" or "ip:port"; port defaults to my_port())
# A seed that is one of our own addresses on our own port is skipped.
sub _random_peer {

    my $here  = my_datacenter();

    # peers known only through update_sceptical
    my @scept = values %{$DATA->{sceptical}};

    # known yenta peers; skip index entries whose allpeer record is gone
    my @all   = grep { $_ } map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}};
    my @old   = grep { $_->{timestamp} < $^T - $KEEPLOST *.75 } @all;
    my @local = grep { $_->{datacenter} eq $here } @all;	# this datacenter
    my @away  = grep { $_->{datacenter} ne $here } @all;	# not this datacenter

    # first check anything sceptical
    my @peer  = @scept;

    # then (1 time in 5) something about to expire
    @peer = @old  unless @peer || int rand(5);

    # then (1 time in 5) something far away
    @peer = @away unless @peer || int rand(5);

    # then something local
    @peer = @local unless @peer;

    # last resort
    @peer = @all unless @peer;

    # usually use one of those; with probability 1/(@all+1) fall through to a
    # seed peer instead, in case there was a network split
    if( @peer && int(rand(@all+1)) ){
        my $p = $peer[ rand(@peer) ];
        debug("using peer $p->{server_id}");
        return ($p->{server_id}, $p->{ip}, undef);
    }

    # seed peer - nothing to do if no seeds are configured
    # (the old code died dereferencing undef here)
    my $seed = conf_value('seedpeer');
    return unless $seed && @$seed;

    my $p = $seed->[ rand(@$seed) ];
    my ($ip, $port) = split /:/, $p;
    $port ||= my_port();

    # don't talk to self. any of my addrs.
    my $ipinfo = my_network_info();
    for my $i (@$ipinfo){
        return if $ip eq $i->{ipa} && $port == $PORT;
    }

    return("seed/$ip:$port", $ip, $port);
}

# Server list for the save file.
# $type is a subsystem name, optionally followed by "/where". The "/where"
# part is no longer used and is ignored.
# Returns the allpeer records of that subsystem whose lastup is within
# $SAVEMAX, or an empty list.
sub server_list {
    my $type = shift;

    ($type) = split m|/|, $type;

    # read-only lookups: do not autovivify index buckets or peer records
    my $ids = $DATA->{peertype}{$type};
    return unless $ids && %$ids;

    # nothing too old; also skip ids whose allpeer record is already gone
    my $cutoff = $^T - $SAVEMAX;
    my @peer = grep { $_ && $_->{lastup} > $cutoff }
               map  { $DATA->{allpeer}{$_} } keys %$ids;
    return unless @peer;

    return @peer;
}

# Save a list of peers, in case I crash, and for others to use.
# For each entry of config 'savestatus' ({file}, {type} => [types]), write one
# JSON object per line to "$file.tmp" and then rename it over $file. The lines
# cover server_list(type) plus any locally monitored service of that subsystem.
# A file whose write fails is reported and left untouched. The remaining files
# are still processed.
sub save_status {

    my $save = conf_value('savestatus');
    my $here = my_datacenter();

    # also save locally running services
    my @mon  = AC::Yenta::Monitor::export();

    for my $s ( @$save ){
        my $file  = $s->{file};
        my $types = $s->{type};
        my $tmp   = "$file.tmp";

        my @peer;
        for my $type (@$types){
            push @peer, server_list($type);
            push @peer, grep { $_->{subsystem} eq $type } @mon;
        }

        next unless @peer;

        debug("saving peer status file");
        my $fh;
        unless( open($fh, '>', $tmp) ){
            problem("cannot open save file '$tmp': $!");
            next;	# one bad path should not stop the other files
        }

        my $ok = 1;
        for my $pd (@peer){
            # only save best addr in save file
            my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} );

            my $data = {
                id		=> $pd->{server_id},
                addr		=> $ip,
                port		=> int($port),
                status		=> int($pd->{status}),
                subsystem	=> $pd->{subsystem},
                environment	=> $pd->{environment},
                sort_metric	=> int($pd->{sort_metric}),
                capacity_metric => int($pd->{capacity_metric}),
                datacenter	=> $pd->{datacenter},
                is_local	=> ($here eq $pd->{datacenter} ? 1 : 0),
            };
            if( $pd->{subsystem} eq 'yenta' ){
                $data->{map} = $pd->{map};
            }

            $ok = 0 unless print {$fh} encode_json( $data ), "\n";
        }

        # buffered write errors surface at close; never replace a good
        # file with a truncated one
        unless( close($fh) && $ok ){
            problem("cannot write save file '$tmp': $!");
            unlink $tmp;
            next;
        }

        unless( rename($tmp, $file) ){
            problem("cannot rename save file '$file': $!");
        }
    }
}

################################################################
# diagnostic reports
# One line per entry (ourselves, monitored services, every known peer):
# padded server id, then hostname, datacenter, subsystem, environment,
# status and integer sort metric, tab separated.
# Returns the text, or undef if there were no entries.
sub report {

    my @line;
    for my $entry (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){
        my @col = (
            $entry->{hostname},  $entry->{datacenter}, $entry->{subsystem},
            $entry->{environment}, $entry->{status},  int( $entry->{sort_metric} ),
        );
        push @line, sprintf('%-28s', $entry->{server_id}) . ' ' . join("\t", @col) . "\n";
    }

    return @line ? join('', @line) : undef;
}

# Full dump (via dumper) of ourselves, monitored services and every known
# peer, separated by blank lines. Returns undef if there were no entries.
sub report_long {

    my @chunk = map { dumper( $_ ) . "\n\n" }
        AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}};

    return @chunk ? join('', @chunk) : undef;
}
################################################################

# our status port, as recorded by init()
sub my_port {
    return $PORT;
}


# Server id plus "/" and the process id as 4+ hex digits,
# e.g. "<server_id>/1a2b".
sub my_instance_id {
    return sprintf '%s/%04x', scalar( my_server_id() ), $$;
}

# Class method: the allpeer record for $id, or undef if unknown.
sub peer {
    my( undef, $id ) = @_;

    return $DATA->{allpeer}{$id};
}

# Class method: all known peer records, excluding those with a false status.
sub allpeers {
    my $class = shift;

    # isdown() marks a peer down by setting its status to 0; leave those out
    my @live = grep { $_->{status} } values %{$DATA->{allpeer}};
    return @live;
}

# Class method: ids of the peers that carry map $map.
sub mappeers {
    my( undef, $map ) = @_;

    return keys %{ $DATA->{mappeer}{$map} };
}

# Class method: the live datacenter index, {dc} => { id => id }.
# Callers receive the internal hash itself, not a copy.
sub datacenters {
    return $DATA->{datacenter};
}
################################################################

# Forget peer $id entirely. This drops it from allpeer, from the subsystem and
# datacenter indexes and from the map indexes. Then, recursively, it removes
# every allpeer entry whose {via} is $id (its monitored items).
sub _remove {
    my $id = shift;

    my $pd = $DATA->{allpeer}{$id};
    if( $pd ){
        my $ss = $pd->{subsystem};
        delete $DATA->{peertype}{$ss}{$id} if $ss;

        my $dc = $pd->{datacenter};
        delete $DATA->{datacenter}{$dc}{$id} if $dc;
    }

    verbose("deleting peer: $id");
    delete $DATA->{allpeer}{$id};

    # remove map info
    for my $map ( @{ $DATA->{peermap}{$id} || [] } ){
        delete $DATA->{mappeer}{$map}{$id};
    }
    delete $DATA->{peermap}{$id};

    # delete its monitored items. The key list is taken up front and a nested
    # _remove may delete later keys, so skip entries that are already gone
    # (reading ->{via} on them would autovivify an empty peer record).
    for my $p (keys %{$DATA->{allpeer}}){
        my $item = $DATA->{allpeer}{$p};
        next unless $item && defined $item->{via} && $item->{via} eq $id;
        _remove($p);
    }
}

# Remove peer $id (via _remove) if its lastup is more than $KEEPDOWN ago or
# its timestamp is more than $KEEPLOST ago. A missing lastup/timestamp counts
# as 0 and so triggers removal.
sub _maybe_remove {
    my $id   = shift;
    my $info = $DATA->{allpeer}{$id};

    my $down_too_long    = $^T - $info->{lastup}    > $KEEPDOWN;
    my $unheard_too_long = $^T - $info->{timestamp} > $KEEPLOST;

    if( $down_too_long || $unheard_too_long ){
        _remove($id);
    }
}

# Class method: mark peer $id as down. This sets status 0, timestamp $^T and
# path = our server id, then lets _maybe_remove decide whether to drop it.
# A peer known only as sceptical is first moved into allpeer. Unknown ids are
# ignored.
# NOTE(review): update_sceptical deletes {lastup}, so a promoted sceptical
# entry always fails the KEEPDOWN test in _maybe_remove and is removed
# straight away -- confirm that is intended.
sub isdown {
    my( $class, $id ) = @_;

    debug("marking peer '$id' as down");

    my $peer = $DATA->{allpeer}{$id};
    if( !$peer ){
        $peer = $DATA->{sceptical}{$id};
        return if !$peer;
        # we know it is down, and want to kibitz this fact
        $DATA->{allpeer}{$id} = $peer;
    }

    delete $DATA->{sceptical}{$id};

    $peer->{timestamp} = $^T;
    $peer->{status}    = 0;
    $peer->{path}      = my_server_id();

    return _maybe_remove( $id );
}

################################################################

# Should an update about peer $id be accepted given its environment?
# The environment filter is currently disabled, so every update passes.
sub _env_ok {
    my( $class, $id, $up ) = @_;

    # former check, kept for reference:
    #   if( $up->{environment} ne conf_value('environment') ){
    #       verbose("ignoring update from $id - wrong env: $up->{environment}");
    #       return;
    #   }
    return 1;
}

# Class method: record a status update about a not-yet-known peer, as
# received over connection $io (its {peerip} is the sender's address).
# If $id is already in allpeer, any sceptical entry for it is dropped instead.
# The record is stored in $DATA->{sceptical} with {id} set and {lastup}
# removed. $up is modified in place.
sub update_sceptical {
    my $class = shift;
    my $id    = shift;	# ->server_id
    my $up    = shift;
    my $io    = shift;

    return unless $class->_env_ok($id, $up);

    if( $DATA->{allpeer}{$id} ){
        # already known
        delete $DATA->{sceptical}{$id};
        return;
    }

    debug("rcvd update (sceptical) about $id from $io->{peerip}");

    # The update is expected to come from the server itself (no 3rd party
    # updates), i.e. one of the ipv4 addresses it advertises should be the
    # address the connection came from. A mismatch is only reported; the
    # update is still recorded below. A missing ip list counts as a mismatch
    # (previously it died dereferencing undef).
    my $from = inet_atoi($io->{peerip});
    problem("server misconfigured $id != $io->{peerip}")
      unless grep { $from == $_->{ipv4} } @{ $up->{ip} || [] };

    $up->{id} = $id;
    delete $up->{lastup};
    $DATA->{sceptical}{$id} = $up;
}

# Class method: apply a status update about peer $id.
#   $id - the peer's server id
#   $up - the status record (status, timestamp, hostname, path, datacenter,
#         subsystem and map are read here); it is modified in place and
#         stored as the peer's allpeer entry.
# The update is ignored in three cases: it is older than $KEEPLOST, its
# status is false, or it is not newer than the record already held. A
# non-200 status is stored and handed to _maybe_remove. A 200 status also
# refreshes the datacenter, subsystem and map indexes.
sub update {
    my $class = shift;
    my $id    = shift;	# -> server_id
    my $up    = shift;

    return unless $class->_env_ok($id, $up);

    # only keep it if it is relatively fresh, and valid
    return unless $up->{timestamp} > $^T - $KEEPLOST;
    return unless $up->{status};

    delete $DATA->{sceptical}{$id};

    $up->{id} = $id;
    my $previnfo = $DATA->{allpeer}{$id};
    verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo;

    # only keep it if it is newer than what we have
    return if $previnfo && $up->{timestamp} <= $previnfo->{timestamp};

    # append our id to the update's path
    $up->{path} .= ' ' . my_server_id();

    debug("updating $id => $up->{status}");

    $DATA->{allpeer}{$id} = $up;

    if( $up->{status} != 200 ){
        _maybe_remove( $id );
        return ;
    }

    # keep the reverse indexes in step with the peer's current values
    my($pdc, $pss) = $previnfo ? @{$previnfo}{qw(datacenter subsystem)} : ();
    _reindex_peer( $DATA->{datacenter}, $id, $up->{datacenter}, $pdc );
    _reindex_peer( $DATA->{peertype},   $id, $up->{subsystem},  $pss );

    _update_peer_maps( $id, $up );
}

# File $id under bucket $new of reverse index $index ({bucket} => {id => id}),
# dropping it from bucket $old. Does nothing if it is already under $new.
sub _reindex_peer {
    my($index, $id, $new, $old) = @_;

    return if $index->{$new}{$id};
    delete $index->{$old}{$id} if $old;
    $index->{$new}{$id} = $id;
}

# Bring mappeer and peermap in line with the map list in $up->{map}
# (defaulted to [] and stored sorted in peermap).
sub _update_peer_maps {
    my($id, $up) = @_;

    $DATA->{peermap}{$id} ||= [];
    $up->{map} ||= [];
    my @curmap = @{$DATA->{peermap}{$id}};
    my @newmap = sort @{$up->{map}};

    return if "@curmap" eq "@newmap";		# unchanged

    # what do we need to add/remove
    my (%remove, %add);
    @remove{@curmap} = @curmap;
    @add{@newmap}    = @newmap;
    delete $remove{$_} for @newmap;
    delete $add{$_}    for @curmap;

    for my $map (keys %remove){
        debug("removing $map from $id");
        delete $DATA->{mappeer}{$map}{$id};
    }
    for my $map (keys %add){
        debug("adding $map to $id");
        $DATA->{mappeer}{$map}{$id} = $id;
    }
    $DATA->{peermap}{$id} = \@newmap;
}



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.