Group
Extension

WWW-Shopify/lib/WWW/Shopify/GraphQL.pm

package WWW::Shopify::GraphQL;

use strict;
use warnings; 

use JSON qw(from_json decode_json encode_json);
use Encode;
use Scalar::Util qw(blessed);
use Data::Dumper;

sub new {
	my ($package, $sa) = @_;
	return bless {
		_sa => $sa
	}, $package;
}

sub sa { return $_[0]->{_sa}; }

sub encode_hash {
	my ($self, $object) = @_;
	return $object if !ref($object);
	return join(" ", map { $self->encode_hash($_) } @$object) if ref($object) eq 'ARRAY';
	return "" if int(keys(%$object)) == 0;
	die new WWW::Shopify::Exception("Invalid GraphQL hash: " . int(keys(%$object)) . " keys present.") unless int(keys(%$object)) == 1;
	my ($key) = keys(%$object);
	return "$key { " . $self->encode_hash($object->{$key}) . " }";
}

sub internal_query {
	my ($self, $hash, $json) = @_;
	my $request = HTTP::Request->new(POST => $self->sa->encode_url('/admin/api/' . $self->sa->api_version . '/graphql.json'));
	my $content = ref($hash) ? encode("UTF-8", $self->encode_hash({ query => $hash })) : encode("UTF-8", $hash);
	my $body = encode_json({ "query" => $content, "variables" => ($json || {}) });
	$request->content($body);
	$request->header('Content-Length' => length($body));
	$request->header('Accept' => 'application/json');
	$request->header('Content-Type' => 'application/json; charset=UTF-8');
	my $response = $self->sa->ua->request($request);
	print STDERR "POST " . $response->request->uri . "\n" if $ENV{'SHOPIFY_LOG'} && $ENV{'SHOPIFY_LOG'} == 1;
	print STDERR Dumper($response) if $ENV{'SHOPIFY_LOG'} && $ENV{'SHOPIFY_LOG'} > 1;
	# This is explicitly decode_json here. If not, the thing doesn't decode properly.
	$content = eval { decode_json($response->decoded_content) };
	if (!$response->is_success) {
		print STDERR Dumper($response) if $ENV{'SHOPIFY_LOG'} && $ENV{'SHOPIFY_LOG'} > 1;
		die WWW::Shopify::Exception::CallLimit->new($response) if $response->code() == 429;
		die WWW::Shopify::Exception::InvalidKey->new($response) if $response->code() == 401;
		die WWW::Shopify::Exception::NotFound->new($response) if $response->code() == 404;
		# I honestly can't believe this.
		die WWW::Shopify::Exception::QueryParamTooLong->new($response) if $content->{errors} && $content->{errors} =~ m/query param length is too/i;
		die WWW::Shopify::Exception->new($response);
	}
	if ($content && $content->{errors}) {		
		die WWW::Shopify::Exception::CallLimit->new($response) if int(grep { $_->{message} eq "Throttled" } @{$content->{errors}}) > 0;
		die WWW::Shopify::Exception::ExceededCost->new($response) if int(grep { $_->{extensions} && $_->{extensions}->{code} && $_->{extensions}->{code} eq 'MAX_COST_EXCEEDED' } @{$content->{errors}}) > 0;
		# Adding this in the hope tht they have reasonable error handling someday.
		die WWW::Shopify::Exception::QueryParamTooLong->new($response) if int(grep { $_->{message} =~ m/query param length is too/i } @{$content->{errors}}) > 0;
		die WWW::Shopify::Exception->new($response);
	}
	return $content->{data} unless wantarray;
	return ($response, $content->{data});
}

sub query {
	my ($self, $hash, $variables) = @_;
	
	my @result;
	while (int(@result) == 0) {
		eval {
			@result = $self->internal_query($hash, $variables);
		};
		if (my $exp = $@) {
			if ($self->sa->sleep_for_limit && blessed($exp) && $exp->isa('WWW::Shopify::Exception::CallLimit')) {
				sleep(1);
				next;
			}
			die $exp;
		}
	};
	return $result[1] unless wantarray;
	return @result;
}

use File::Temp qw(tempfile);
use List::Util qw(min max);
use IO::Handle;
use Fcntl qw(SEEK_SET SEEK_END SEEK_CUR);

sub cancel_bulk_operation {
	my ($self, $id) = @_;
	my ($response, $result) = $self->query('mutation bulkOperationCancel($id: ID!) { bulkOperationCancel(id: $id) { bulkOperation { status } userErrors { field message } } }', { id => $id });
	die WWW::Shopify::Exception->new($response) if int(@{$result->{bulkOperationCancel}->{userErrors}}) > 0;
	return 1;
}

sub current_bulk_operation {
	my ($self) = @_;
	my ($response, $result) = $self->query('query { currentBulkOperation { id status errorCode createdAt completedAt objectCount fileSize url partialDataUrl } }');
	return undef if !$result->{currentBulkOperation} || !$result->{currentBulkOperation}->{id};
	return $result->{currentBulkOperation};
}

sub bulk_operation_query {
	my ($self, $hash, $callback, $properties) = @_;
	my $content = ref($hash) ? $self->encode_hash({ query => $hash }) : $hash;
	my $query = "mutation { bulkOperationRunQuery(query: \"\"\"$content\"\"\") { bulkOperation { id status } userErrors { field message } } }";
	my ($response, $result) = $self->query($query);
	
	if ($properties && $properties->{cancel_running_operation} && $result->{bulkOperationRunQuery} && $result->{bulkOperationRunQuery}->{userErrors} && int(@{$result->{bulkOperationRunQuery}->{userErrors}}) > 0 && 
		$result->{bulkOperationRunQuery}->{userErrors}->[0]->{message} =~ m/already in progress/) {
		my $bulk_operation = $self->current_bulk_operation;
		die WWW::Shopify::Exception->new($response) unless $bulk_operation && $bulk_operation->{status} eq "RUNNING";
		$self->cancel_bulk_operation($bulk_operation->{id});
		my $max_wait = 10*60;
		my $start = DateTime->now;
		while ($bulk_operation->{status} ne "CANCELED" && DateTime->now->epoch - $start->epoch < $max_wait) {
			$bulk_operation = $self->current_bulk_operation;
			sleep(5);
		}
		($response, $result) = $self->query($query);
	}
	die WWW::Shopify::Exception->new($response) if !$result->{bulkOperationRunQuery} || !$result->{bulkOperationRunQuery}->{bulkOperation}->{status} || $result->{bulkOperationRunQuery}->{bulkOperation}->{status} ne "CREATED";
	while (1) {
		my ($response, $result) = $self->query('query { currentBulkOperation { id status errorCode createdAt completedAt objectCount fileSize url partialDataUrl } }');
		die WWW::Shopify::Exception->new($response) if $result->{currentBulkOperation}->{errorCode};
		if ($result->{currentBulkOperation}->{status} eq "COMPLETED") {
			my $fh = tempfile();
			$self->sa->ua->set_my_handler(response_data => sub {
				 my($response, $ua, $handler, $data) = @_;
				 print $fh $data;
			});
			my $response = $self->sa->ua->request(HTTP::Request->new(GET => $result->{currentBulkOperation}->{url}));
			$fh->flush;
			$self->sa->ua->set_my_handler(reponse_data => undef);
			my $chunk_size = 10*1024;
			my $buffer = "";
			my $chunk;
			seek($fh, -1, SEEK_END);
			my $cursor_position = tell($fh);
			while ($cursor_position > 0) {
				my $to_read = $chunk_size;
				if ($cursor_position >= $chunk_size) {
					$cursor_position -= $chunk_size;
				} else {
					$to_read = $cursor_position;
					$cursor_position = 0;
				}
				seek($fh, $cursor_position, SEEK_SET);
				my $size_read = read($fh, $chunk, $to_read);
				$buffer = $chunk . $buffer;
				while ((my $index = rindex($buffer, "\n")) != -1) {
					my $line = substr($buffer, $index, length($buffer) - $index + 1, "");
					$callback->(substr($line, 1)) if length($line) > 1;
				}
			}
			$callback->($buffer) if length($buffer) > 0;
			last;
		} else {
			die WWW::Shopify::Exception->new($response) if $result->{currentBulkOperation}->{status} !~ /(RUNNING|CREATED)/;
			sleep(1);
		}
	}
}

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.