Group
Extension

REST-Neo4p/lib/REST/Neo4p/Agent/Neo4j/Driver.pm

package REST::Neo4p::Agent::Neo4j::Driver;
use v5.10;
use lib '../../../../../lib'; # testing
use base qw/REST::Neo4p::Agent/;
use Neo4j::Driver 0.1803;
use JSON::ize;
use REST::Neo4p::Agent::Neo4j::DriverActions;
use REST::Neo4p::Exceptions;
use Try::Tiny;
use URI;
# use MIME::Base64;
use Carp qw/carp/;
use HTTP::Response;
use strict;
use warnings;
BEGIN {
  $REST::Neo4p::Agent::Neo4j::Driver::VERSION = '0.4003';
}
my $WARN_ON_ERROR;

BEGIN {
}

my %REST_ACTIONS = (
  all_relationships            => 'relationships/all',
  all_typed_relationships      => 'relationships/all/{-list|&|types}',
  create_relationship          => 'relationships',
  incoming_relationships       => 'relationships/in',
  incoming_typed_relationships => 'relationships/in/{-list|&|types}',
  labels                       => 'labels',
  outgoing_relationships       => 'relationships/out',
  outgoing_typed_relationships => 'relationships/out/{-list|&|types}',
  paged_traverse               => 'paged/traverse/{returnType}{?pageSize,leaseTime}',
  properties                   => 'properties',
  property                     => 'properties/{key}',
  traverse                     => 'traverse/{returnType}',
);

sub new {
  my ($class, @args) = @_;
  my $self = {
    _args => (@args ? \@args : undef), # pass args through to driver
   };
  return bless $self, $class;
}

sub credentials  {
  my $self = shift;
  my ($srv, $realm, $user, $pwd) = @_;
  $self->{_user} = $user;
  $self->{_pwd} = $pwd;
  $self->{_userinfo} = "$user:$pwd" if ($user && $pwd);
  $self->{_realm} = $realm;
  return;
}

sub user { shift->{_user} }
sub pwd { shift->{_pwd} }
sub server_uri { shift->{_server_uri} }
sub last_result { shift->{_last_result} }
sub last_errors { shift->{_last_errors} }
sub ssl_opts {
  my $self = shift;
  my %args = @_;
  if (%args) {
    return $self->{_ssl_opts} = \%args;
  }
  else {
    return %{$self->{_ssl_opts}};
  }
}
sub driver { shift->{__driver} }

# these are no-ops
sub default_header { return }
sub add_header { return }
sub remove_header { return }

sub agent {
  my $self = shift;
  return $_[0] ? $self->{_agent} = $_[0] : $self->{_agent};
}

# TODO: pass stream info along to Neo4j::Driver object

sub stream {
  my $self = shift;
  # do sth
}

sub no_stream {
  my $self = shift;
  # do sth
}

# http, https, bolt (if Neo4j::Bolt)...
sub protocols_allowed {
  my $self = shift;
  my ($protocols) = @_;
  push @{$self->{_protocols_allowed}}, @$protocols;
  return;
}

sub timeout {
  my $self=shift;
  return ($_[0] ? $self->{_timeout} = $_[0] : $self->{_timeout});
}

sub tls {
  my $self=shift;
  return ($_[0] ? $self->{_tls} = $_[0] : $self->{_tls});
}

sub tls_ca {
  my $self = shift;
  return ($_[0] ? $self->{_tls_ca} = $_[0] : $self->{_tls_ca});
}

sub database {
  my $self = shift;
  my ($db) = @_;
  if (defined $db) {
    return $self->{_database} = $db;
  }
  else {
    # Neo4j::Driver defaults to Neo v3 endpoints, but switches to v4 endpoints if 'database' is set.
    # so... don't set the attribute if unset (=> v3)
    return $self->{_database};
  }
}

# subclass override 
sub batch_mode {
  return 0; # batch mode not available
}

# subclass override 
sub batch_length {
  REST::Neo4p::LocalException->throw("Batch mode not available with Neo4j::Driver as agent\n");
}
sub execute_batch {
  REST::Neo4p::LocalException->throw("Batch mode not available with Neo4j::Driver as agent\n");
}

# subclass override
# $agent->connect($url [, $dbname])

sub connect {
  my $self = shift;
  my ($server, $dbname) = @_;
  my ($drv, $uri);
  if (defined $server) {
    $uri = $self->{_server_uri} = URI->new($server);
    if ($uri->userinfo) {
      my ($u,$p) = split(/:/,$uri->userinfo);
      $self->credentials($uri->host,'',$u,$p);
    }
    $self->server_url($uri->scheme."://".$uri->host.':'.$uri->port);
  }
  if (defined $dbname) {
    $self->database($dbname);
  }
  unless ($self->server_url) {
    REST::Neo4p::Exception->throw("Server not set\n");
  }
  try {
    $drv = Neo4j::Driver->new($self->server_url);
    $drv->config( @{$self->{_args}} ) if $self->{_args};
  } catch {
    REST::Neo4p::LocalException->throw("Problem creating new Neo4j::Driver: $_");
  };
  if ($self->user || $self->pwd) {
    $drv->basic_auth($self->user, $self->pwd);
  }
  $self->{__driver} = $drv;
  for (my $i = $REST::Neo4p::Agent::RQ_RETRIES; $i>0; $i--) {
    my $f;
    try {
      my $version = $drv->session->server->version;
      $version =~ s|^\S+/||;  # server version strings look like "Neo4j/3.2.1"
      $self->{_actions}{neo4j_version} = $version or
        die "Can't find neo4j_version from server";
      $f=1;
    } catch {
      if ($i > 1) {
	sleep $REST::Neo4p::Agent::RETRY_WAIT;
      }
      else {
	REST::Neo4p::CommException->throw(message => "$_ (after $REST::Neo4p::Agent::RQ_RETRIES retries)");
      }
    };
    last if $f;
  }
  # set actions
  $self->{_actions}{$_} = $REST_ACTIONS{$_} for keys %REST_ACTIONS;
  return 1;
}

sub session {
  my $self = shift;
  unless ($self->driver) {
    REST::Neo4p::LocalException->throw("No driver connection; can't create session ( try \$agent->connect() )\n");
  }
  my $session = $self->driver->session( $self->database ? (database => $self->database) : () );
  if ($self->server_uri->scheme =~ /^http/) {
    if (my $client = $session->{net}{http_agent}{client}) {
      $client->setTimeout($self->timeout);
      $client->setCa($self->tls_ca);
      if ($self->{_ssl_opts}) {
	$client->getUseragent->ssl_opts($self->ssl_opts);
      }
    }
  }
  elsif ($self->server_uri->scheme =~ /^bolt/) {
    1;
  }
  return $session;
}

# run_in_session( $query_string, { parm => value, ... } )
# run_in_transaction( $driver_txn, $query_string, { parm => value, ... } )
# these throw REST::Neo4p::Exceptions on Neo4j errors
# and otherwise return a Neo4j::Driver::StatementResult

sub run_in_session {
  my $self = shift;
  my ($qry, $params) = @_;
  $self->{_last_result} = $self->{_last_errors} = undef;
  $params = {} unless defined $params;
  try {
    $self->{_last_result} = $self->session->run($qry, $params);
  } catch {
    $self->{_last_errors} = $_;
  };
  $self->maybe_throw_neo4p_error;
  return $self->{_last_result} // 1;
}

sub run_in_transaction {
  my $self = shift;
  my ($tx, $qry, $params) = @_;
  $self->{_last_result} = $self->{_last_errors} = undef;
  $params = {} unless defined $params;
  try {
    $self->{_last_result} = $tx->run($qry, $params);
  } catch {
    $self->{_last_errors} = $_;
  };
  $self->maybe_throw_neo4p_error;
  return $self->{_last_result} // 1;
}

sub maybe_throw_neo4p_error {
  my $self = shift;
  return unless $self->last_errors;
  for ($self->last_errors) {
    /neo4j enterprise/i && do {
      REST::Neo4p::Neo4jTightwadException->throw( code=>599, message => "You must spend thousands of dollars a year to use this feature; see agent->last_errors()");
    };
    /SchemaRuleAlreadyExists/ && do {
      REST::Neo4p::IndexExistsException->throw( code=>599, neo4j_message => $self->last_errors );
    };
    /ConstraintAlreadyExists/ && do {
      REST::Neo4p::SchemaConstraintExistsException->throw( code=>599, neo4j_message => $self->last_errors );
    };
    /ConstraintValidationFailed/ && do {
      REST::Neo4p::ConflictException->throw( code => 409,
					     neo4j_message => $self->last_errors);
    };
    /NotFound/ && do {
      REST::Neo4p::NotFoundException->throw( code => 404,
					     neo4j_message => $self->last_errors );
    };
    /SyntaxError/ && do {
      REST::Neo4p::QuerySyntaxException->throw( code => 400,
						neo4j_message => $self->last_errors);
    };
    do {
      REST::Neo4p::Neo4jException->throw( code => 599, neo4j_message => $self->last_errors );
    };
  }
}


# $rq : [get|post|put|delete]
# $action : {neo4j REST endpt action}
# @args : depends on REST rq
# get|delete : my @url_components = @args;
# post|put : my ($url_components, $content, $addl_headers) = @args;

# emulate rest calls with appropriate queries

1;



Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.