Group
Extension

Confluent-SchemaRegistry/lib/Confluent/SchemaRegistry.pm

package Confluent::SchemaRegistry;

=head1 NAME

Confluent::SchemaRegistry - A simple client for interacting with B<Confluent Schema Registry>.

=head1 SYNOPSIS

 use Confluent::SchemaRegistry;

 my $sr = Confluent::SchemaRegistry->new( { host => 'https://my-schema-registry.org' } );

=head1 DESCRIPTION

C<Confluent::SchemaRegistry> provides a simple way to interact with B<Confluent Schema Registry>
(L<https://docs.confluent.io/current/schema-registry/docs/index.html>) enabling writing into
B<Apache Kafka> (L<https://kafka.apache.org/>) according to I<Apache Avro> schema specification
(L<https://avro.apache.org/>).

=head2 HEAD UP

=over 4

=item Confluent Schema Registry documentation

Full RESTful API documentation of B<Schema Registry> is available here: 
L<https://docs.confluent.io/current/schema-registry/docs/api.html?_ga=2.234767710.1188695207.1526911788-1213051144.1524553242#>

=item Avro package

B<Avro> package is a dependency of I<Confluent::SchemaRegistry> but is not available in CPAN index.
Perhaps you may find and download it directly from GitHub repository at L<https://github.com/apache/avro/tree/master/lang/perl>.
Please, refer its documentation for installation.

=back


=cut

use 5.010;
use strict;
use warnings;

use JSON::XS;
use REST::Client;
use HTTP::Status qw/:is/;
use Try::Tiny;
use Aspect;
use Avro::Schema;


use version 0.77; our $VERSION = version->declare('v1.0.0');

our $COMPATIBILITY_LEVELS = [ qw/NONE FULL FORWARD BACKWARD/ ];


=head1 INSTALL

Installation of C<Kafka::Consumer::Avro> is a canonical:

  perl Makefile.PL
  make
  make test
  make install

=head2 TEST NOTES

Tests expect that in the target host is available Schema Registry listening on C<http://localhost:8081>, otherwise most of the test are skipped.

You can alternatively set a different URL by exporting C<CONFLUENT_SCHEMA_REGISTY_URL> environment variable.

=head1 USAGE

=head2 Constructor

=head3 new( [%config] )

Construct a new C<Confluent::SchemaRegistry>. Takes an optional hash that provides
configuration flags for the L<REST::Client> internal object.

The config flags, according to C<REST::Client::new> specs, are:

=over 4

=item host

The host at which I<Schema Registry> is listening.

The default is L<http://localhost:8081>

=item timeout

A timeout in seconds for requests made with the client.  After the timeout the
client will return a 500.

The default is 5 minutes.

=item cert

The path to a X509 certificate file to be used for client authentication.

The default is to not use a certificate/key pair.

=item key

The path to a X509 key file to be used for client authentication.

The default is to not use a certificate/key pair.

=item ca

The path to a certificate authority file to be used to verify host
certificates.

The default is to not use a certificates authority.

=item pkcs12

The path to a PKCS12 certificate to be used for client authentication.

=item pkcs12password

The password for the PKCS12 certificate specified with 'pkcs12'.

=item follow

Boolean that determins whether REST::Client attempts to automatically follow
redirects/authentication.

The default is false.

=item useragent

An L<LWP::UserAgent> object, ready to make http requests.

REST::Client will provide a default for you if you do not set this.

=back

=cut

sub new {
    my $this  = shift;
    my $class = ref($this) || $this;
	my %config = @_;
	my $self = {};

	# Creazione client REST
	$config{host} = 'http://localhost:8081' unless defined $config{host};
	$self->{_CLIENT} = REST::Client->new( %config );
	$self->{_CLIENT}->addHeader('Content-Type', 'application/vnd.schemaregistry.v1+json');
	$self->{_CLIENT}->{_ERROR}    = undef; # will be set in case of unsuccessfully responses
	$self->{_CLIENT}->{_RESPONSE} = undef; # will be set with normalized response contents

	$self = bless($self, $class);

	# Recupero la configurazione globale del registry per testare se le coordinate fanno
	# effettivamente riferimento ad un Confluent Schema Registry
	my $res = $self->get_top_level_config();
	return undef
		unless defined($res) && grep(/^$res$/, @$COMPATIBILITY_LEVELS);
		
	return $self;
}


#
# BEGIN Using Aspect to simplify error handling
#
my $rest_client_calls = qr/^REST::Client::(GET|PUT|POST|DELETE)/;

# Clear internal error and response before every REST::Client call
before {
	$_->self->{_RESPONSE} = undef;
	$_->self->{_ERROR} = undef;
	#print STDERR 'Calling ' . $_->sub_name . ' : ';
} call $rest_client_calls;

# Verify if REST calls are successfull 
after {
	if (is_success($_->self->responseCode())) {
		$_->self->{_RESPONSE} = _normalize_content( $_->self->responseContent() );
		$_->return_value(1); # success
	} else {
		$_->self->{_ERROR} = _normalize_content( $_->self->responseContent() );
		$_->return_value(0); # failure
	}
	#print STDERR $_->self->responseCode() . "\n";
} call $rest_client_calls;

#
# END Aspect
#



##############################################################################################
# CLASS METHODS
#

sub _normalize_content { 
	my $res = shift;
	return undef
		unless defined($res);
	return $res
		if ref($res) eq 'HASH';
	return try {
		decode_json($res);
	} catch {
		$res;
	}
} 

sub _encode_error {
	{ 
		error_code => $_[0],
		message => $_[1]
	}
}

##############################################################################################
# PRIVATE METHODS
#
 
sub _client       { $_[0]->{_CLIENT}                                      } # RESTful client
sub _set_error    { $_[0]->_client->{_ERROR} = _normalize_content($_[1]); } # get internal error
sub _get_error    { $_[0]->_client->{_ERROR}                              } # get internal error
sub _get_response { $_[0]->_client->{_RESPONSE}                           } # return http response



##############################################################################################
# PUBLIC METHODS
#
 
=head2 METHODS

C<Confluent::SchemRegistry> exposes the following methods.

=cut


=head3 get_response_content()

Returns the body (content) of the last method call to Schema Registry.

=cut

sub get_response_content { $_[0]->_get_response() }
	

=head3 get_error()

Returns the error structure of the last method call to Schema Registry.

=cut

sub get_error { $_[0]->_get_error() }
	

=head3 add_schema( %params )

Registers a new schema version under a subject.

Returns the generated id for the new schema or C<undef>.

Params keys are:

=over 4

=item SUBJECT ($scalar)

the name of the Kafka topic

=item TYPE ($scalar)

the type of schema ("key" or "value")

=item SCHEMA ($hashref or $json)

the schema to add

=back

=cut

sub add_schema {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return undef
		unless	defined($params{SCHEMA});
	my $schema = _normalize_content($params{SCHEMA});
	$schema = encode_json({
		schema => encode_json($schema)
	});
	return $self->_get_response()->{id}
		if $self->_client()->POST('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions', $schema);
	return undef;
}


# List all the registered subjects
#
# Returns the list of subjects (ARRAY) or C<undef>
sub get_subjects {
	my $self = shift;
	$self->_client()->GET('/subjects');
	return $self->_get_response();
}


# Delete a subject
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
#
# Returns the list of versions for the deleted subject or C<undef>
sub delete_subject {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	$self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE});
	return $self->_get_response()
}


# List the schema versions registered under a subject
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
#
# Returns the list of schema versions (ARRAY)
sub get_schema_versions {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	$self->_client()->GET('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions');
	return $self->_get_response();
}


# Fetch a schema by globally unique id
#
# SCHEMA_ID...: the globally unique id of the schema
#
# Returns the schema in Avro::Schema format or C<undef>
sub get_schema_by_id {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SCHEMA_ID})
				&& $params{SCHEMA_ID} =~ m/^\d+$/;
	if ( $self->_client()->GET('/schemas/ids/' . $params{SCHEMA_ID})) {
		if (exists $self->_get_response()->{schema}) {
			my $avro_schema;
			try {
				$avro_schema = Avro::Schema->parse($self->_get_response()->{schema});
			} catch {
				$self->_set_error( _encode_error(-2, $_->{'-text'}) );
			};
			return $avro_schema;
		}
	}
	return undef;
}


# Fetch a specific version of the schema registered under a subject
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
# VERSION...: the schema version to fetch; if omitted the latest version is fetched
#
# Returns the schema in Avro::Schema format or C<undef>
sub get_schema {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return undef
		if	defined($params{VERSION})
			&& $params{VERSION} !~ m/^\d+$/;
	$params{VERSION} = 'latest' unless defined($params{VERSION});
	if ($self->_client()->GET('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION})) {
		my $sv = $self->_get_response();
		if (exists $sv->{schema}) {
			try {
				$sv->{schema} = Avro::Schema->parse($sv->{schema});
			} catch {
				$self->_set_error( _encode_error(-2, $_->{'-text'}) );
				return undef;
			};
		}
		return $sv;
	}
	return undef;
}


# Delete a specific version of the schema registered under a subject
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
# VERSION...: the schema version to delete
#
# Returns the deleted version number (NUMBER) or C<undef>
sub delete_schema {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return undef
		unless	defined($params{VERSION})
				&& $params{VERSION} =~ m/^\d+$/;
	$self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION});
	return $self->_get_response();
}


# Delete all versions of the schema registered under subject "Kafka-value"
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
#
# Returns the list of deleted versions or C<undef>
sub delete_all_schemas {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	$self->_client()->DELETE('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE});
	return $self->_get_response();
}


# Check whether the schema $SCHEMA has been registered under subject "${SUBJECT}-${TYPE}"
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
# SCHEMA....: the schema (HASH or JSON) to check for
#
# If found, returns the schema info (HASH) otherwise C<undef>
sub check_schema {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return undef
		unless	defined($params{SCHEMA});
	my $schema = _normalize_content($params{SCHEMA});
	$schema = encode_json({
		schema => encode_json($schema)
	});
	$self->_client()->POST('/subjects/' . $params{SUBJECT} . '-' . $params{TYPE}, $schema);
	my $schema_info = $self->_get_response();
	return undef
		unless $schema_info;
	$schema_info->{schema} = Avro::Schema->parse($schema_info->{schema});
	return $schema_info;
}


# Test compatibility of the schema $SCHEMA with the version $VERSION of the schema under subject "${SUBJECT}-${TYPE}"
#
# SUBJECT...: the name of the Kafka topic
# TYPE......: the type of schema ("key" or "value")
# VERSION...: the schema version to test; if omitted latest version is used
# SCHEMA....: the schema (HASH or JSON) to check for
#
# returns TRUE if the providied schema is compatible with the latest one (BOOLEAN)
sub test_schema {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return undef
		if	defined($params{VERSION})
			&& $params{VERSION} !~ m/^\d+$/;
	$params{VERSION} = 'latest' unless defined($params{VERSION});
	return undef
		unless	defined($params{SCHEMA});
	my $schema = _normalize_content($params{SCHEMA});
	$schema = {
		schema => encode_json($schema)
	};
	$self->_client()->POST('/compatibility/subjects/' . $params{SUBJECT} . '-' . $params{TYPE} . '/versions/' . $params{VERSION}, encode_json($schema));
	return undef
		unless defined $self->_get_response();
	return $self->_get_response()->{is_compatible}
		if exists($self->_get_response()->{is_compatible});
	return undef;
}


# Get top level config
#
# Return top-level compatibility level or C<undef>
sub get_top_level_config {
	my $self = shift;
	return $self->_get_response()->{compatibilityLevel}
		if $self->_client()->GET('/config');
	return undef;
}


# Update compatibility requirements globally
# $ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
#     --data '{"compatibility": "NONE"}' \
#     http://localhost:8081/config
#   {"compatibility":"NONE"}
sub set_top_level_config {
	my $self = shift;
	my %params = @_;
	$self->_set_error( _encode_error(-1, 'Unexpected value for COMPATIBILITY_LEVEL param') )
		and return undef
			unless	defined($params{COMPATIBILITY_LEVEL})
				&& grep(/^$params{COMPATIBILITY_LEVEL}$/, @$COMPATIBILITY_LEVELS);
	$self->_client()->PUT('/config', encode_json( { compatibility => $params{COMPATIBILITY_LEVEL} } ));
	return $self->_get_response()->{compatibility}
		if defined $self->_get_response();
	return undef;
}


# Get compatibility requirements under the subject
# 
# Return compatibility level for the subject or C<undef>
sub get_config {
	my $self = shift;
	my %params = @_;
	return undef
		unless	defined($params{SUBJECT})
				&& defined($params{TYPE})
				&& $params{SUBJECT} =~ m/^.+$/
				&& $params{TYPE} =~ m/^key|value$/;
	return $self->_get_response()->{compatibilityLevel}
		if $self->_client()->GET('/config/' . $params{SUBJECT} . '-' . $params{TYPE});
	return undef;
}

# Update compatibility requirements under the subject
# 
# Return the new compatibility level for the subject or C<undef>
sub set_config {
	my $self = shift;
	my %params = @_;
	$self->_set_error( _encode_error(-1, 'Bad SUBJECT or TYPE parameter') )
		and return undef
			unless	defined($params{SUBJECT})
					&& defined($params{TYPE})
					&& $params{SUBJECT} =~ m/^.+$/
					&& $params{TYPE} =~ m/^key|value$/;
	$self->_set_error( _encode_error(-1, 'Unexpected value for COMPATIBILITY_LEVEL param') )
		and return undef
			unless	defined($params{COMPATIBILITY_LEVEL})
				&& grep(/^$params{COMPATIBILITY_LEVEL}$/, @$COMPATIBILITY_LEVELS);
	$self->_client()->PUT('/config/' . $params{SUBJECT} . '-' . $params{TYPE}, encode_json( { compatibility => $params{COMPATIBILITY_LEVEL} } ));
	return $self->_get_response()->{compatibility}
		if defined $self->_get_response();
	return undef;
}

=head1 TODO

...

=head1 AUTHOR

Alvaro Livraghi, E<lt>alvarol@cpan.orgE<gt>

=head1 CONTRIBUTE

L<https://github.com/alivraghi/Confluent-SchemaRegistry>

=head1 BUGS

Please use GitHub project link above to report problems or contact authors.

=head1 COPYRIGHT AND LICENSE

Copyright 2018 by Alvaro Livraghi

This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

=cut

1;


Powered by Groonga
Maintained by Kenichi Ishigaki <ishigaki@cpan.org>. If you find anything, submit it on GitHub.