Group
Extension

Neo4j-Driver/lib/Neo4j/Driver/Result/Jolt.pm

use v5.12;
use warnings;

package Neo4j::Driver::Result::Jolt 1.02;
# ABSTRACT: Jolt result handler


# This package is not part of the public Neo4j::Driver API.


use parent 'Neo4j::Driver::Result';

use Carp qw(croak);
our @CARP_NOT = qw(Neo4j::Driver::Net::HTTP Neo4j::Driver::Result);
use JSON::MaybeXS 1.002004 ();

use Neo4j::Driver::Type::Bytes;
use Neo4j::Driver::Type::DateTime;
use Neo4j::Driver::Type::Duration;
use Neo4j::Driver::Type::Node;
use Neo4j::Driver::Type::Path;
use Neo4j::Driver::Type::Point;
use Neo4j::Driver::Type::Relationship;
use Neo4j::Driver::Type::V1::Node;
use Neo4j::Driver::Type::V1::Relationship;
use Neo4j::Error;

# Boolean values shared with the parent class; _deep_bless() returns
# these for Jolt booleans (both the sparse and the strict encoding).
my ($FALSE, $TRUE) = Neo4j::Driver::Result->_bool_values;

# Media type strings. _accept_header() returns one of the $ACCEPT_HEADER*
# values depending on the requested Jolt flavour; new() and _acceptable()
# match response content types against $MEDIA_TYPE.
my $MEDIA_TYPE = "application/vnd.neo4j.jolt";
my $ACCEPT_HEADER = "$MEDIA_TYPE-v2+json-seq";
my $ACCEPT_HEADER_V1 = "$MEDIA_TYPE+json-seq";
my $ACCEPT_HEADER_STRICT = "$MEDIA_TYPE+json-seq;strict=true";
my $ACCEPT_HEADER_SPARSE = "$MEDIA_TYPE+json-seq;strict=false";
my $ACCEPT_HEADER_NDJSON = "$MEDIA_TYPE";

# Classes used by _deep_bless() for nodes and relationships. The list is
# indexed by the result's jolt_v2 flag: a false flag selects the first
# entry, a true flag (1) selects the second.
my @CYPHER_TYPES = (
	{  # Types with legacy numeric ID (Jolt v1)
		node => 'Neo4j::Driver::Type::V1::Node',
		relationship => 'Neo4j::Driver::Type::V1::Relationship',
	},
	{  # Types with element ID (Jolt v2)
		node => 'Neo4j::Driver::Type::Node',
		relationship => 'Neo4j::Driver::Type::Relationship',
	},
);


# When this is false, new() dies with "Unimplemented".
our $gather_results = 1;  # 1: detach from the stream immediately (yields JSON-style result; used for testing)


# Create a result handler for a Jolt response described by $params and
# hand over to _gather_results(), which reads the whole event stream.
# Dies with "Unimplemented" if $gather_results has been switched off.
sub new {
	# uncoverable pod (private method)
	my $class = shift;
	my ($params) = @_;
	
	# The "-v2" media type suffix tells us which node/relationship
	# classes _deep_bless() has to use later on.
	my $content_type = $params->{http_header}->{content_type};
	my $is_v2 = $content_type =~ m/^\Q$MEDIA_TYPE\E-v2\b/i;
	
	my $agent = $params->{http_agent};
	my %state = (
		attached => 1,   # 1: unbuffered records may exist on the stream
		exhausted => 0,  # 1: all records read by the client; fetch() will fail
		buffer => [],
		server_info => $params->{server_info},
		json_coder => $agent->json_coder,
		http_agent => $agent,
		jolt_v2 => $is_v2,
	);
	my $self = bless \%state, $class;
	
	die "Unimplemented" unless $gather_results;  # $gather_results 0
	
	return $self->_gather_results($params);
}


# Read the complete Jolt event stream via _next_event() and turn it into
# fully buffered results.
#
# The order of events is checked with a small state machine ($state):
#    0  nothing read yet
#    1  header event read (a statement has started)
#    2  data event read for the current statement
#    3  summary event read (the statement has ended)
#    4  error event read
#  +10  info event read; the stream is only accepted as complete
#       once this has happened
# $prev describes the preceding event and is appended to error messages.
#
# Server failures and HTTP failures are collected in a Neo4j::Error chain
# that is stored in $self->{info}->{_error}; protocol violations croak.
#
# Returns the fully buffered result if the stream contained exactly one
# statement result. Otherwise returns $self with all statement results
# (if any) available in $self->{result_list}.
sub _gather_results {
	my ($self, $params) = @_;
	
	my $error = 'Neo4j::Error';  # class name until the first error is appended
	my @results = ();
	my $columns = undef;
	my @data = ();
	$self->{result} = {};
	my ($state, $prev) = (0, 'in first place');
	my ($type, $event);
	while ( ($type, $event) = $self->_next_event ) {
		if ($type eq 'header') {  # StatementStartEvent
			croak "Jolt error: unexpected header event $prev" unless $state == 0 || $state == 3;
			croak "Jolt error: expected reference to HASH, received " . (ref $event ? "reference to " . ref $event : "scalar") . " in $type event $prev" unless ref $event eq 'HASH';
			$state = 1;
			$columns = $event->{fields};
		}
		elsif ($type eq 'data') {  # RecordEvent
			croak "Jolt error: unexpected data event $prev" unless $state == 1 || $state == 2;
			croak "Jolt error: expected reference to ARRAY, received " . (ref $event ? "reference to " . ref $event : "scalar") . " in $type event $prev" unless ref $event eq 'ARRAY';
			$state = 2;
			push @data, { row => $event };
		}
		elsif ($type eq 'summary') {  # StatementEndEvent
			croak "Jolt error: unexpected summary event $prev" unless $state == 1 || $state == 2;
			croak "Jolt error: expected reference to HASH, received " . (ref $event ? "reference to " . ref $event : "scalar") . " in $type event $prev" unless ref $event eq 'HASH';
			$state = 3;
			push @results, {
				data => [@data],
				stats => $event->{stats},
				plan => $event->{plan},
				columns => $columns,
			};
			@data = ();
			$columns = undef;
		}
		elsif ($type eq 'info') {  # TransactionInfoEvent
			croak "Jolt error: unexpected info event $prev" unless $state == 0 || $state == 3 || $state == 4;
			croak "Jolt error: expected reference to HASH, received " . (ref $event ? "reference to " . ref $event : "scalar") . " in $type event $prev" unless ref $event eq 'HASH';
			$state += 10;
			$self->{info} = $event;
			$self->{notifications} = $event->{notifications};
		}
		elsif ($type eq 'error') {  # FailureEvent
			# If a rollback caused by a failure fails as well,
			# two failure events may appear on the Jolt stream.
			# Otherwise, there is always one at most.
			croak "Jolt error: expected reference to HASH, received " . (ref $event ? "reference to " . ref $event : "scalar") . " in $type event $prev" unless ref $event eq 'HASH';
			$state = 4;
			# A missing or malformed error list is treated like an
			# empty one, so that it is reported as an internal error
			# instead of dying on the dereference under strict refs.
			my $errors = $event->{errors};
			$errors = [] unless ref $errors eq 'ARRAY';
			$error = $error->append_new(Internal => "Jolt error: Jolt $type event with 0 errors $prev") unless @$errors;
			$error = $error->append_new(Server => $_) for @$errors;
		}
		else {
			croak "Jolt error: unsupported $type event $prev";
		}
		$prev = "after $type event";
	}
	croak "Jolt error: unexpected end of event stream $prev" unless $state >= 10;
	
	if (! $params->{http_header}->{success}) {
		$error = $error->append_new(Network => {
			code => $params->{http_header}->{status},
			as_string => sprintf("HTTP error: %s %s on %s to %s", $params->{http_header}->{status}, $params->{http_agent}->http_reason, $params->{http_method}, $params->{http_path}),
		});
	}
	
	$self->{info}->{_error} = $error if ref $error;
	$self->{http_agent} = undef;  # the stream has been read completely
	
	if (@results == 1) {
		$self->{result} = $results[0];
		$self->{query} = $params->{queries}->[0];
		return $self->_as_fully_buffered;
	}
	
	# If the number of Cypher queries run wasn't exactly one, provide a list
	# of all results so that callers get a uniform interface for all of them.
	# _new_result() reads the Jolt version from its params, so the flag
	# determined for this response has to be handed down explicitly;
	# otherwise the sub-results would bless nodes and relationships with
	# the Jolt v1 classes even on a Jolt v2 stream.
	my %result_params = (
		%$params,
		jolt_v2 => $params->{jolt_v2} // $self->{jolt_v2},
	);
	@results = map { __PACKAGE__->_new_result($_, undef, \%result_params) } @results;
	$results[$_]->{query} = $params->{queries}->[$_] for (0 .. $#results);
	$self->{attached} = 0;
	$self->{exhausted} = 1;
	$self->{result_list} = \@results if @results;
	return $self;
}


# Wrap a single, already gathered statement result in a new object of
# the given class and return it fully buffered. The $json argument is
# accepted but not used here. The Jolt version flag and the server info
# are taken from $params.
sub _new_result {
	my ($class, $result, $json, $params) = @_;
	
	my %fields = (
		attached => 0,   # 1: unbuffered records may exist on the stream
		exhausted => 0,  # 1: all records read by the client; fetch() will fail
		result => $result,
		buffer => [],
		field_names_cache => undef,
		summary => undef,
		server_info => $params->{server_info},
		jolt_v2 => $params->{jolt_v2},
	);
	my $object = bless \%fields, $class;
	
	return $object->_as_fully_buffered;
}


# Fetch the next event from the HTTP agent and decode it.
#
# Returns a two-element list (event type, event payload), or an empty
# list once the agent yields no further (non-empty) event. Croaks if the
# decoded JSON value is not an object with exactly one member.
sub _next_event {
	my ($self) = @_;
	
	my $line = $self->{http_agent}->fetch_event;
	return unless length $line;
	
	my $json = $self->{json_coder}->decode($line);
	
	# Anything but a JSON object can't be a Jolt event. Report that as a
	# Jolt error rather than dying on the hash dereference below.
	croak "Jolt error: expected reference to HASH, received " . (ref $json ? "reference to " . ref $json : "scalar") . " in event stream" unless ref $json eq 'HASH';
	
	my @events = keys %$json;
	croak "Jolt error: expected exactly 1 event, received " . scalar @events unless @events == 1;
	
	return ( $events[0], $json->{$events[0]} );
}


# Return every result represented by this object: the entries of the
# result list if there is one, or else just this object itself.
sub _results {
	my $self = shift;
	
	my $list = $self->{result_list};
	return $list ? @$list : ($self);
}


# Accessor for the transaction status information stored under the
# "info" key; yields undef if none is available.
sub _info {
	my $self = shift;
	
	return $self->{info};
}


# Turn the given hash reference into a Neo4j::Driver::Record: share the
# field names cache with it, convert the Jolt values of its row, and
# bless it. Returns the record.
sub _init_record {
	my $self = shift;
	my ($record) = @_;
	
	$record->{field_names_cache} = $self->{field_names_cache};
	
	# The return value is not needed here; the row is converted in place.
	$self->_deep_bless( $record->{row} );
	
	bless $record, 'Neo4j::Driver::Record';
	return $record;
}


# Recursively convert a decoded Jolt value into its Perl representation.
#
# Sparse values (JSON booleans, arrays and plain scalars) are handled
# first. Everything else must be a hash with exactly one key, the Jolt
# sigil, which decides how the value stored under it is converted.
# Lists and maps are converted in place. Dies with an "Assertion failed"
# message on input that doesn't have the expected shape.
sub _deep_bless {
	my ($self, $data) = @_;
	
	# Boolean (sparse)
	return $data ? $TRUE : $FALSE if JSON::MaybeXS::is_bool($data);
	
	my $kind = ref $data;
	if ($kind eq 'ARRAY') {  # List (sparse)
		for my $element (@$data) {
			$element = $self->_deep_bless($element);
		}
		return $data;
	}
	return $data if $kind eq '';  # Null or Integer (sparse) or String (sparse)
	
	my @sigils = keys %$data;
	die "Assertion failed: sigil count: " . scalar @sigils if @sigils != 1;
	my ($tag) = @sigils;
	my $payload = $data->{$tag};
	
	if ($tag eq '?') {  # Boolean (strict)
		return $TRUE  if $payload eq 'true';
		return $FALSE if $payload eq 'false';
		die "Assertion failed: unexpected bool value: " . $payload;
	}
	elsif ($tag eq 'Z' || $tag eq 'R') {  # Integer (strict) or Float
		return 0 + $payload;
	}
	elsif ($tag eq 'U') {  # String (strict)
		return $payload;
	}
	elsif ($tag eq '[]') {  # List (strict)
		for my $element (@$payload) {
			$element = $self->_deep_bless($element);
		}
		return $payload;
	}
	elsif ($tag eq '{}') {  # Map
		for my $entry (values %$payload) {
			$entry = $self->_deep_bless($entry);
		}
		return $payload;
	}
	elsif ($tag eq '()') {  # Node
		die "Assertion failed: unexpected node fields: " . scalar @$payload unless @$payload == 3;
		my $properties = $payload->[2];
		for my $property (values %$properties) {
			$property = $self->_deep_bless($property);
		}
		my $node_class = $CYPHER_TYPES[ $self->{jolt_v2} ]->{node};
		return bless $payload, $node_class;
	}
	elsif ($tag eq '->' || $tag eq '<-') {  # Relationship
		die "Assertion failed: unexpected rel fields: " . scalar @$payload unless @$payload == 5;
		my $properties = $payload->[4];
		for my $property (values %$properties) {
			$property = $self->_deep_bless($property);
		}
		if ($tag eq '<-') {
			# For the reversed arrow, exchange fields 1 and 3 so that
			# both directions end up with the same field order.
			@$payload[ 1, 3 ] = @$payload[ 3, 1 ];
		}
		my $rel_class = $CYPHER_TYPES[ $self->{jolt_v2} ]->{relationship};
		return bless $payload, $rel_class;
	}
	elsif ($tag eq '..') {  # Path
		# The number of path elements must be odd.
		die "Assertion failed: unexpected path fields: " . scalar @$payload unless @$payload & 1;
		for my $element (@$payload) {
			$element = $self->_deep_bless($element);
		}
		return bless $data, 'Neo4j::Driver::Type::Path';
	}
	elsif ($tag eq '@') {  # Spatial
		return bless $data, 'Neo4j::Driver::Type::Point';
	}
	elsif ($tag eq 'T') {  # Temporal
		# Values starting with "P" or "-P" become durations.
		my $temporal_class = $payload =~ m/^-?P/
			? 'Neo4j::Driver::Type::Duration'
			: 'Neo4j::Driver::Type::DateTime';
		return bless $data, $temporal_class;
	}
	elsif ($tag eq '#') {  # Bytes
		$payload =~ tr/ //d;  # spaces were allowed in the Jolt draft, but aren't actually implemented in Neo4j 4.2's jolt.JoltModule
		$payload = pack 'H*', $payload;  # see neo4j#12660
		return bless \$payload, 'Neo4j::Driver::Type::Bytes';
	}
	
	die "Assertion failed: unexpected sigil: " . $tag;
	
}


# Return the list of media types to send in the Accept header.
#
# Nothing is returned for methods other than POST, or when $want_jolt
# is defined but false. The values 'v1', 'strict', 'sparse' and 'ndjson'
# select a specific Jolt flavour; anything else yields the default.
sub _accept_header {
	my (undef, $want_jolt, $method) = @_;
	
	return if $method ne 'POST';  # work around Neo4j HTTP Content Negotiation bug #12644
	
	return ($ACCEPT_HEADER) unless defined $want_jolt;
	return unless $want_jolt;
	
	my %header_for = (
		v1     => $ACCEPT_HEADER_V1,
		strict => $ACCEPT_HEADER_STRICT,
		sparse => $ACCEPT_HEADER_SPARSE,
		ndjson => $ACCEPT_HEADER_NDJSON,
	);
	return ( $header_for{$want_jolt} // $ACCEPT_HEADER );
}


# Report whether the given content type starts with the Jolt media type
# (compared case-insensitively). The invocant is ignored.
sub _acceptable {
	my $content_type = $_[1];
	
	return $content_type =~ m/^\Q$MEDIA_TYPE\E\b/i;
}


1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.