Group
Extension

Mashtree/bin/mashtree_jackknife.pl

#!/usr/bin/env perl
# Author: Lee Katz <lkatz@cdc.gov>
# Uses Mash and BioPerl to create a NJ tree based on distances.
# Run this script with -h for help and usage.

use strict;
use warnings;
use Data::Dumper;
use Getopt::Long;
use File::Temp qw/tempdir tempfile/;
use File::Basename qw/basename dirname fileparse/;
use File::Copy qw/cp mv/;
use List::Util qw/shuffle/;
use List::MoreUtils qw/part/;
use POSIX qw/floor/;
# if JSON::XS is installed, it will be automatically used.
# Specify () so that no functions are exported. We will use OO.
use JSON (); 
use IO::Handle; # allows me to autoflush file handles

#use Fcntl qw/:flock LOCK_EX LOCK_UN/;

use threads;
use Thread::Queue;
use threads::shared;

use FindBin;
use lib "$FindBin::RealBin/../lib";
use Mashtree qw/logmsg @fastqExt @fastaExt createTreeFromPhylip mashDist/;
use Mashtree::Db;
use Bio::SeqIO;
use Bio::TreeIO;
use Bio::Tree::DistanceFactory;
use Bio::Tree::Statistics;

my $writeStick :shared;
my $readStick  :shared; # limit disk IO by reading some files one at a time

local $0=basename $0;

exit main();

sub main{
  my $settings={};
  my @wrapperOptions=qw(help file-of-files outmatrix=s tempdir=s reps=i numcpus=i);
  GetOptions($settings,@wrapperOptions) or die $!;
  $$settings{reps}//=1;
  $$settings{numcpus}||=1;
  die usage() if($$settings{help});
  die usage() if(@ARGV < 1);

  $$settings{tempdir}||=tempdir("MASHTREE_WRAPPER.XXXXXX",CLEANUP=>1,TMPDIR=>1);
  mkdir($$settings{tempdir}) if(!-d $$settings{tempdir});
  logmsg "Temporary directory will be $$settings{tempdir}";

  if($$settings{reps} < 1){
    die "ERROR: no reps were given!";
  }
  if($$settings{reps} < 100){
    logmsg "WARNING: You have very few reps planned on this mashtree run. Recommended reps are at least 100.";
  }
  # Give a warning if JSON is going to be slow later
  my $jsonTmp = JSON->new();
  if(! $jsonTmp->is_xs){
    logmsg "WARNING: the currently installed JSON module will make this script very slow when jack knifing. To avoid this error, install the JSON::XS module like so: `cpanm ~l ~ JSON::XS`.";
  }
  
  ## Catch some options that are not allowed to be passed
  # Tempdir: All mashtree temporary directories will be under the
  # wrapper's tempdir.
  if(grep(/^\-+tempdir$/,@ARGV) || grep(/^\-+t$/,@ARGV)){
    die "ERROR: tempdir was specified for mashtree but should be an option for $0";
  }
  # Numcpus: this needs to be specified in the wrapper and will
  # appropriately be transferred to the mashtree script
  if(grep(/^\-+numcpus$/,@ARGV) || grep(/^\-+n$/,@ARGV)){
    die "ERROR: numcpus was specified for mashtree but should be an option for $0";
  }
  # Outmatrix: the wrapper script needs to control where
  # the matrix goes because it can only have the outmatrix
  # for the observed run and not the replicates for speed's
  # sake.
  if(grep(/^\-+outmatrix$/,@ARGV) || grep(/^\-+o$/,@ARGV)){
    die "ERROR: outmatrix was specified for mashtree but should be an option for $0";
  }
  # --file-of-files: should be something to give to mashtree directly
  if(grep(/^\-+file-of-files$/,@ARGV) || grep(/^\-+f$/,@ARGV)){
    die "ERROR: file-of-files was specified for mashtree but should be an option for $0";
  }
  
  # Separate flagged options from reads in the mashtree options
  my @reads = ();
  my @mashOptions = ();
  for(my $i=0;$i<@ARGV;$i++){
    if(-e $ARGV[$i]){
      push(@reads, $ARGV[$i]);
    } else {
      push(@mashOptions, $ARGV[$i]);
    }
  }
  if($$settings{'file-of-files'}){
    push(@mashOptions, '--file-of-files');
  }
  my $mashOptions=join(" ",@mashOptions);
  my $reads = join(" ", @reads);
  
  # Some filenames we'll expect
  my $observeddir="$$settings{tempdir}/observed";
  my $obsDistances="$observeddir/distances.phylip";
  my $observedTree="$$settings{tempdir}/observed.dnd";
  my $outmatrix="$$settings{tempdir}/observeddistances.tsv";
  my $mshList = "$$settings{tempdir}/mash.list";
  my $mergedMash = "$$settings{tempdir}/merged.msh";
  my $mergedJSON = "$$settings{tempdir}/merged.msh.json.gz";

  # Make the observed directory and run Mash
  logmsg "Running mashtree on full data (".scalar(@reads)." targets)";
  mkdir($observeddir);
  system("$FindBin::RealBin/mashtree --outmatrix $outmatrix.tmp --tempdir $observeddir --numcpus $$settings{numcpus} $mashOptions $reads > $observedTree.tmp");
  die if $?;
  mv("$observedTree.tmp",$observedTree) or die $?;
  mv("$outmatrix.tmp",$outmatrix) or die $?;

  # Merge the mash files to make the threads go faster later
  my @msh = glob("$observeddir/*.msh");
  open(my $mshListFh, ">", $mshList) or die "ERROR writing to mash list $mshList: $!";
  for(@msh){
    print $mshListFh $_."\n";
  }
  close $mshListFh;
  unlink($mergedMash) if(-e $mergedMash); # mash complains about overwriting an existing file
  system("mash paste -l $mergedMash $mshList >&2"); die "ERROR merging mash files" if $?; 
  # max compression on the json file so that it can be a
  # smaller footprint and so that it can be read faster
  # in each thread.
  system("mash info -d $mergedMash | gzip -c9 > $mergedJSON");
  die "ERROR with mash info | gzip -c9" if $?;
  unlink($_) for(@msh); # remove redundant files to the merged msh
  
  # Round-robin sample the replicate number, so that it's
  # well-balanced.
  my @repNumber = (1..$$settings{reps});
  my @reps;
  for(my $i=0;$i<$$settings{reps};$i++){
    my $thrIndex = $i % $$settings{numcpus};
    push(@{$reps[$thrIndex]}, $i);
  }

  # Sample the mash sketches with replacement for rapid bootstrapping
  my @bsThread;
  for my $i(0..$$settings{numcpus}-1){
    my %settingsCopy = %$settings;
    $bsThread[$i] = threads->new(\&subsampleMashSketchesWorker, $mergedJSON, $reps[$i], \%settingsCopy);
  }

  my @bsTree;
  for my $thr(@bsThread){
    my $fileArr = $thr->join;
    if(ref($fileArr) ne 'ARRAY'){
      die "ERROR: one or more threads did not return an array of jack knife tree files as expected.";
    }
    for my $file(@$fileArr){
      my $treein = Bio::TreeIO->new(-file=>$file);
      while(my $tree=$treein->next_tree){
        push(@bsTree, $tree);
      }
    }
  }
  
  # Combine trees into a bootstrapped tree and write it 
  # to an output file. Then print it to stdout.
  logmsg "Adding bootstraps to tree";
  my $guideTree=Bio::TreeIO->new(-file=>"$observeddir/tree.dnd")->next_tree;
  my $bsTree=assess_bootstrap($guideTree,\@bsTree,$guideTree);
  for my $node($bsTree->get_nodes){
    next if($node->is_Leaf);
    my $id=$node->bootstrap||$node->id||0;
    $node->id($id);
  }
  open(my $treeFh,">","$$settings{tempdir}/bstree.dnd") or die "ERROR: could not write to $$settings{tempdir}/bstree.dnd: $!";
  print $treeFh $bsTree->as_text('newick');
  print $treeFh "\n";
  close $treeFh;

  system("cat $$settings{tempdir}/bstree.dnd"); die if $?;

  if($$settings{'outmatrix'}){
    cp($outmatrix,$$settings{'outmatrix'});
  }
  
  return 0;
}

sub subsampleMashSketchesWorker{
  my($mergedJSON, $reps, $settings) = @_;
  $reps //= [];

  my @treeFile;
  return \@treeFile if(@$reps <1);

  # Initialize our JSON reader
  my $json = JSON->new;
  $json->utf8;           # If we only expect characters 0..255. Makes it fast.
  $json->allow_nonref;   # can convert a non-reference into its corresponding string
  $json->allow_blessed;  # encode method will not barf when it encounters a blessed reference 
  $json->pretty;         # enables indent, space_before and space_after 
  # Only let one thread at a time read the large JSON file
  my $mashInfoStr=""; # must be declared outside of the block because we are parsing it after the block
  {
    lock($readStick);
    logmsg "Reading huge JSON file describing all mash distances, $mergedJSON";
    $mashInfoStr = `gzip -cd $mergedJSON`; 
    die "ERROR running gzip -cd $mergedJSON: $!" if $?;
  }
  my $mashInfoHash = $json->decode($mashInfoStr);
  $mashInfoStr=""; # clear some ram
  my $kmerlength = $$mashInfoHash{kmer}; # find the kmer length right away

  my $numReps = @$reps;
  my $numSketches = scalar(@{ $$mashInfoHash{sketches} });
  my $msg="Initializing thread with $numReps replicates: ";
  for my $i(0..2){
    my $repID = $$reps[$i] // "";
    $msg.="$repID, ";
  }
  $msg=~s/[ ,]+$//; # right trim
  if($numReps > 3){
    $msg.="...";
  }
  logmsg $msg;
  for(my $repI=0;$repI<@$reps;$repI++){
    my $rep = $$reps[$repI];

    my $subsampleDir = "$$settings{tempdir}/rep$rep";
    mkdir $subsampleDir;
    my $log = "$subsampleDir/jackknife.log";
    open(my $logFh, ">", $log) or die "ERROR: cannot write to log $log: $!";
    $logFh->autoflush();
    logmsg "rep$rep: $subsampleDir (".($repI+1)." out of $numReps in this thread)";

    my @name;
    # Start off a distances string for printing to file later
    my %dist = ();
    for(my $sketchCounter=0; $sketchCounter<$numSketches; $sketchCounter++){
      my $nameI = basename($$mashInfoHash{sketches}[$sketchCounter]{name},(@fastqExt,@fastaExt));
      print $logFh "Subsampling sketches from entry $sketchCounter, $nameI\n";

      # subsample the hashes of one genome at a time as compared
      # to the other genomes, to get a jack knife distance.
      my $numHashes = scalar(@{ $$mashInfoHash{sketches}[$sketchCounter]{hashes} });
      my $keepHashes = int($numHashes / 2); # based on half the hashes
      my @subsampleHash = @{ $$mashInfoHash{sketches}[$sketchCounter]{hashes} };
      @subsampleHash = (shuffle(@subsampleHash))[0..$keepHashes-1];
      @subsampleHash = sort{$a<=>$b} @subsampleHash;

      print $logFh "Distances between $nameI and other genomes";
      # Initialize the distances string with the query name.
      # Use this string as a buffer for the distances file
      # to help avoid too much disk IO.
      push(@name, $nameI);
      for(my $j=0; $j<$numSketches; $j++){
        my $nameJ = basename($$mashInfoHash{sketches}[$j]{name},(@fastqExt,@fastaExt));
        my $distance = mashDist(\@subsampleHash, $$mashInfoHash{sketches}[$j]{hashes}, $kmerlength);
        $dist{$nameI}{$nameJ} = $distance;
        print $logFh ".";
      }
      print $logFh "\n";
    }

    # Add distances to database
    print $logFh "Creating database, $subsampleDir/distances.db.tsv\n";
    my $mashtreeDb = Mashtree::Db->new("$subsampleDir/distances.db.tsv");
    $mashtreeDb->addDistancesFromHash(\%dist);
    # Convert to Phylip
    my $phylipFile = "$subsampleDir/distances.phylip";
    print $logFh "Creating phylip file, $phylipFile\n";
    open(my $phylipFh, ">", $phylipFile) or die "ERROR: could not write to $phylipFile: $!";
    print $phylipFh $mashtreeDb->toString(\@name, "phylip");
    close $phylipFh;

    print $logFh "Creating tree file, $subsampleDir/tree.dnd\n";
    my $treeObj = createTreeFromPhylip($phylipFile, $subsampleDir, $settings);
    push(@treeFile, "$subsampleDir/tree.dnd");

    print $logFh "DONE!\n";
    close $logFh;
  }

  return \@treeFile;
}

# Fixed bootstrapping function from bioperl
# https://github.com/bioperl/bioperl-live/pull/304
sub assess_bootstrap{
   my ($self,$bs_trees,$guide_tree) = @_;
   my @consensus;

   if(!defined($bs_trees) || ref($bs_trees) ne 'ARRAY'){
     die "ERROR: second parameter in assess_bootstrap() must be a list";
   }
   my $num_bs_trees = scalar(@$bs_trees);
   if($num_bs_trees < 1){
     die "ERROR: no bootstrap trees were passed to assess_bootstrap()";
   }

   # internal nodes are defined by their children

   my (%lookup,%internal);
   my $i = 0;
   for my $tree ( $guide_tree, @$bs_trees ) {
       # Do this as a top down approach, can probably be
       # improved by caching internal node states, but not going
       # to worry about it right now.

       my @allnodes = $tree->get_nodes;
       my @internalnodes = grep { ! $_->is_Leaf } @allnodes;
       for my $node ( @internalnodes ) {
           my @tips = sort map { $_->id } 
                      grep { $_->is_Leaf() } $node->get_all_Descendents;
           my $id = "(".join(",", @tips).")";
           if( $i == 0 ) {
               $internal{$id} = $node->internal_id;
           } else { 
               $lookup{$id}++;
           }
       }
       $i++;
   }
   #my @save; # unsure why this variable is needed
   for my $l ( keys %lookup ) {
       if( defined $internal{$l} ) {#&& $lookup{$l} > $min_seen ) {
           my $intnode = $guide_tree->find_node(-internal_id => $internal{$l});
           $intnode->bootstrap(sprintf("%d",100 * $lookup{$l} / $num_bs_trees));
       }
   }
   return $guide_tree;
}

#######
# Utils
#######

sub usage{
  my $usage="$0: a wrapper around mashtree.
  Usage: $0 [options] [-- mashtree options] *.fastq.gz *.fasta > tree.dnd
  --outmatrix          ''   Output file for distance matrix
  --reps               0    How many bootstrap repetitions to run;
                            If zero, no bootstrapping.
  --numcpus            1    This will be passed to mashtree and will
                            be used to multithread reps.
  --tempdir                 A directory path that will be created to
                            store temporary files
  --file-of-files           If specified, mashtree will try to read
                            filenames from each input file. The file of
                            files format is one filename per line. This
                            file of files cannot be compressed.
  
  --                        Used to separate options for $0 and mashtree
  MASHTREE OPTIONS:\n".
  # Print the mashtree options starting with numcpus,
  # skipping the tempdir option.
  `mashtree --help 2>&1 | grep -A 999 "TREE OPTIONS" | grep -v ^Stopped`;

  return $usage;
}



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.