Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@

package org.springframework.cloud.schema.registry.controllers;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.springframework.cloud.schema.registry.config.SchemaServerProperties;
import org.springframework.cloud.schema.registry.model.Schema;
Expand All @@ -38,6 +41,7 @@
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
Expand All @@ -63,6 +67,10 @@ public class ServerController {

private final SchemaServerProperties schemaServerProperties;

private final String schemaReferenceRegex = "(?<subject>.*)\\+v(?<version>\\d+)";

private final Pattern schemaReferencePattern = Pattern.compile(schemaReferenceRegex);

public ServerController(SchemaRepository repository,
Map<String, SchemaValidator> validators,
SchemaServerProperties schemaServerProperties) {
Expand All @@ -74,7 +82,9 @@ public ServerController(SchemaRepository repository,
}

@RequestMapping(method = RequestMethod.POST, path = "/", consumes = "application/json", produces = "application/json")
public synchronized ResponseEntity<Schema> register(@RequestBody Schema schema, UriComponentsBuilder builder) {
public synchronized ResponseEntity<Schema> register(
@RequestHeader(value = "Schema-Reference", required = false) String schemaReferenceHeader,
@RequestBody Schema schema, UriComponentsBuilder builder) {

SchemaValidator validator = this.validators.get(schema.getFormat());

Expand All @@ -84,7 +94,14 @@ public synchronized ResponseEntity<Schema> register(@RequestBody Schema schema,
.collectionToCommaDelimitedString(this.validators.keySet())));
}

validator.validate(schema.getDefinition());
List<Schema> schemaReferences = null;

if (schemaReferenceHeader != null) {
schemaReferences = schemaReferenceResolver(schemaReferenceHeader, schema.getFormat());
schema.setReferences(schemaReferences);
}

validator.validate(schema.getDefinition(), schemaReferences);

Schema result;
List<Schema> registeredEntities = this.repository
Expand All @@ -95,7 +112,7 @@ public synchronized ResponseEntity<Schema> register(@RequestBody Schema schema,
result = this.repository.save(schema);
}
else {
result = validator.match(registeredEntities, schema.getDefinition());
result = validator.match(registeredEntities, schema.getDefinition(), schemaReferences);
if (result == null) {
schema.setVersion(
registeredEntities.get(registeredEntities.size() - 1).getVersion()
Expand Down Expand Up @@ -262,6 +279,13 @@ public String onInvalidSchema(InvalidSchemaException e) {
return errorMessage("Invalid Schema", e);
}

@ExceptionHandler(NumberFormatException.class)
@ResponseStatus(HttpStatus.BAD_REQUEST)
@ResponseBody
public String onInvalidVersion(NumberFormatException ex) {
return errorMessage("Version should be a numeric value", ex);
}

@ExceptionHandler(SchemaNotFoundException.class)
@ResponseStatus(HttpStatus.NOT_FOUND)
@ResponseBody
Expand All @@ -276,6 +300,39 @@ public String schemaDeletionNotPermitted(SchemaDeletionNotAllowedException ex) {
return errorMessage("Schema deletion is not permitted", ex);
}

private List<Schema> schemaReferenceResolver(String schemaReferenceHeader, String format) {
List<Schema> schemaReferences = new ArrayList<>();
Schema schemaReference;
String subject;
Integer version;
Matcher schemaReferenceMatcher;
for (String schemaReferenceEntry : schemaReferenceHeader.split(";")) {
Copy link
Copy Markdown
Author

@yeralin yeralin Mar 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Method needs some refactoring.

Can you advice any "springy" way of doing schema reference resolution (header parsing, schema fetching, etc)?

schemaReferenceMatcher = schemaReferencePattern.matcher(schemaReferenceEntry);
if (schemaReferenceMatcher.find()) {
subject = schemaReferenceMatcher.group("subject");
version = Integer.parseInt(schemaReferenceMatcher.group("version"));
schemaReference = this.repository.findOneBySubjectAndFormatAndVersion(subject, format, version);
if (schemaReference == null) {
throw new SchemaNotFoundException(
String.format("Could not find Schema by subject: %s, format: %s, version %s",
subject, format, version));
}
}
else {
subject = schemaReferenceEntry;
List<Schema> registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion(subject, format);
if (registeredEntities.isEmpty()) {
throw new SchemaNotFoundException(
String.format("Could not find Schema by subject: %s, format: %s",
subject, format));
}
schemaReference = registeredEntities.get(registeredEntities.size() - 1);
}
schemaReferences.add(schemaReference);
}
return schemaReferences;
}

private String errorMessage(String prefix, Throwable e) {
return prefix + (StringUtils.hasText(e.getMessage()) ? ": " + e.getMessage() : "");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@

package org.springframework.cloud.schema.registry.model;

import java.util.ArrayList;
import java.util.List;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.ManyToMany;
import javax.persistence.Table;

import com.fasterxml.jackson.annotation.JsonIdentityInfo;
import com.fasterxml.jackson.annotation.JsonIdentityReference;
import com.fasterxml.jackson.annotation.ObjectIdGenerators;

/**
* @author Vinicius Carvalho
*
Expand All @@ -46,8 +53,12 @@ public class Schema {
@Column(name = "FORMAT", nullable = false)
private String format;

@Lob
@Column(name = "DEFINITION", nullable = false, length = 8192)
@JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "subject")
@JsonIdentityReference(alwaysAsId = true)
@ManyToMany
private List<Schema> references = new ArrayList<>();
Copy link
Copy Markdown
Author

@yeralin yeralin Mar 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another issue I stumbled upon, since now instead of returning all Schema references, I only return their subject names i.e.:

{
    "subject": "test",
    "id": 1,
    "version": 1,
    "format": "avro",
    "references": [
        "subject.reference"
    ],
    "definition": "..."
}

as opposed to:

{
    "subject": "test",
    "id": 1,
    "version": 1,
    "format": "avro",
    "references": [
        {
                "subject": "subject.reference",
                "id": 2,
                 ...
        }
    ],
    "definition": "..."
}

This however, now breaking tests since MappingJackson2HttpMessageConverter's object mapper is trying to map REST's response payload directly to Schema.class where Schema.references is List<Schema> not List<String>.

I am not entirely sure how to solve this for now.

  1. Write an interceptor to fetch referenced schemas during serialization
  2. Map REST's response payload to String, then perform manual parsing
  3. Somehow deactivate @Json* annotations under Schema.class during tests, so that the latter payload format gets returned

What do you think?


@Column(name = "DEFINITION", columnDefinition = "text", nullable = false, length = 8192)
Copy link
Copy Markdown
Author

@yeralin yeralin Mar 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed @Lob annotation as it was creating problems while fetching references field during schema validation by throwing:

org.postgresql.util.PSQLException: Large Objects may not be used in auto-commit mode.

when using PostgreSQL.

It was replaced by columnDefinition = "test" and effectively creates the same type of column for the definition field.

References:
https://shred.zone/cilla/page/299/string-lobs-on-postgresql-with-hibernate-36.html

private String definition;

public Integer getId() {
Expand Down Expand Up @@ -82,6 +93,22 @@ public void setFormat(String format) {
this.format = format;
}

public List<Schema> getReferences() {
return this.references;
}

public void setReferences(List<Schema> references) {
this.references = references;
}

public void addReference(Schema schemaReference) {
this.references.add(schemaReference);
}

public void removeReference(Schema schemaReference) {
this.references.remove(schemaReference);
}

public String getDefinition() {
return this.definition;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.List;

import org.apache.avro.Schema.Parser;
import org.apache.avro.SchemaParseException;

import org.springframework.cloud.schema.registry.model.Compatibility;
Expand All @@ -34,11 +35,25 @@ public class AvroSchemaValidator implements SchemaValidator {
*/
public static final String AVRO_FORMAT = "avro";

private Parser parseReferences(Parser parser, List<Schema> schemaReferences) {
for (Schema schemaReference : schemaReferences) {
if (!schemaReference.getReferences().isEmpty()) {
parser = parseReferences(parser, schemaReference.getReferences());
}
parser.parse(schemaReference.getDefinition());
}
return parser;
}

@Override
public boolean isValid(String definition) {
public boolean isValid(String definition, List<Schema> schemaReferences) {
boolean result = true;
try {
new org.apache.avro.Schema.Parser().parse(definition);
Parser avroParser = new Parser();
if (schemaReferences != null) {
avroParser = parseReferences(avroParser, schemaReferences);
}
avroParser.parse(definition);
}
catch (SchemaParseException ex) {
result = false;
Expand All @@ -47,9 +62,13 @@ public boolean isValid(String definition) {
}

@Override
public void validate(String definition) {
public void validate(String definition, List<Schema> schemaReferences) {
try {
new org.apache.avro.Schema.Parser().parse(definition);
Parser avroParser = new Parser();
if (schemaReferences != null) {
avroParser = parseReferences(avroParser, schemaReferences);
}
avroParser.parse(definition);
}
catch (SchemaParseException ex) {
throw new InvalidSchemaException((ex.getMessage()));
Expand All @@ -62,11 +81,19 @@ public Compatibility compatibilityCheck(String source, String other) {
}

@Override
public Schema match(List<Schema> schemas, String definition) {
public Schema match(List<Schema> schemas, String definition, List<Schema> schemaReferences) {
Schema result = null;
org.apache.avro.Schema source = new org.apache.avro.Schema.Parser().parse(definition);
Parser avroParser = new Parser();
if (schemaReferences != null) {
avroParser = parseReferences(avroParser, schemaReferences);
}
org.apache.avro.Schema source = avroParser.parse(definition);
for (Schema s : schemas) {
org.apache.avro.Schema target = new org.apache.avro.Schema.Parser().parse(s.getDefinition());
avroParser = new Parser();
if (!s.getReferences().isEmpty()) {
avroParser = parseReferences(avroParser, s.getReferences());
}
org.apache.avro.Schema target = avroParser.parse(s.getDefinition());
if (target.equals(source)) {
result = s;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,19 @@ public interface SchemaValidator {
/**
* Verifies if a definition is a valid schema.
* @param definition - The textual representation of the schema file
* @param schemaReferences - list of schemas that are referenced inside the definition
* @return true if valid, false otherwise
*/
boolean isValid(String definition);
boolean isValid(String definition, List<Schema> schemaReferences);

/**
* Validates a schema definition and throws an {@link InvalidSchemaException} when the schema is invalid.
* The exception is expected to have the violation description.
* @param definition - The textual representation of the schema file
* @param schemaReferences - list of schemas that are referenced inside the definition
*/
default void validate(String definition) {
if (!this.isValid(definition)) {
default void validate(String definition, List<Schema> schemaReferences) {
if (!this.isValid(definition, schemaReferences)) {
throw new InvalidSchemaException("Invalid Schema");
}
}
Expand All @@ -61,9 +63,10 @@ default void validate(String definition) {
* Return the Schema that is represented by the definition.
* @param schemas List of schemas to be tested
* @param definition Textual representation of the schema
* @param schemaReferences List of schemas that are referenced inside the definition
* @return A full Schema object with identifier and subject properties
*/
Schema match(List<Schema> schemas, String definition);
Schema match(List<Schema> schemas, String definition, List<Schema> schemaReferences);

String getFormat();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.springframework.cloud.schema.registry.support.SchemaNotFoundException;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.io.DefaultResourceLoader;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
Expand Down Expand Up @@ -84,14 +85,16 @@ public class SchemaRegistryServerAvroTests {
private static final org.apache.avro.Schema AVRO_USER_AVRO_SCHEMA_V2 = new Parser()
.parse(resourceToString("classpath:/avro_user_definition_schema_v2.json"));

private static final String AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1.getName()
private static final String AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1
.getName()
.toLowerCase();


private static final String AVRO_USER_SCHEMA_QUALIFED_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1
.getFullName()
.toLowerCase();

private static final String AVRO_REFERENCE_SCHEMA_QUALIFIED_NAME_STRATEGY_SUBJECT = "reference";

private static final Schema AVRO_USER_REGISTRY_SCHEMA_V1 = toSchema(
AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT,
AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V1.toString());
Expand All @@ -100,6 +103,11 @@ public class SchemaRegistryServerAvroTests {
AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT,
AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V2.toString());

// Does not use Parser because otherwise will unwrap user reference
private static final Schema AVRO_REFERENCE_REGISTRY_SCHEMA = toSchema(
AVRO_REFERENCE_SCHEMA_QUALIFIED_NAME_STRATEGY_SUBJECT,
AVRO_FORMAT_NAME, resourceToString("classpath:/avro_reference_definition_schema.json"));

private static final Schema AAVRO_USER_REGISTRY_SCHEMA_V1_WITH_QUAL_SUBJECT = toSchema(
AVRO_USER_SCHEMA_QUALIFED_NAME_STRATEGY_SUBJECT,
AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V1.toString());
Expand Down Expand Up @@ -234,6 +242,36 @@ public void testUserSchemaV2() {
registerSchemasAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, AVRO_USER_REGISTRY_SCHEMA_V2);
}

@Test
public void testReferenceSchemaWithoutVersion() {
registerSchemaAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, 1, 1);
HttpHeaders headers = new HttpHeaders();
headers.add("Schema-Reference", AVRO_USER_REGISTRY_SCHEMA_V1.getSubject());
registerSchemaAndAssertSuccess(AVRO_REFERENCE_REGISTRY_SCHEMA, 1, 2, headers);
}

@Test
public void testReferenceSchemaWithVersion() {
registerSchemasAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, AVRO_USER_REGISTRY_SCHEMA_V2);
HttpHeaders headers = new HttpHeaders();
headers.add("Schema-Reference",
String.format("%s+v%d", AVRO_USER_REGISTRY_SCHEMA_V2.getSubject(), 2));
registerSchemaAndAssertSuccess(AVRO_REFERENCE_REGISTRY_SCHEMA, 1, 3, headers);
}

@Test
public void testUndefinedSchemaReference() {
try {
this.client.postForEntity(this.serverControllerUri, AVRO_REFERENCE_REGISTRY_SCHEMA, Schema.class);
fail("Expects: " + HttpStatus.BAD_REQUEST + " error");
}
catch (HttpClientErrorException.BadRequest badRequest) {
assertThat(badRequest.getMessage())
.isEqualTo("400 : [Invalid Schema: \"example.avro.User\" is not a defined name. The type of the \"user\" field must be a defined name or a {\"type\": ...} expression.]");
}

}

@Test
public void testIdempotentRegistration() {

Expand Down Expand Up @@ -557,9 +595,19 @@ private Map<String, Map<String, List<ResponseEntity<Schema>>>> registerSchemasAn
private ResponseEntity<Schema> registerSchemaAndAssertSuccess(@NonNull Schema schema,
@Nullable Integer expectedVersion,
@Nullable Integer expectedId) {
return registerSchemaAndAssertSuccess(schema, expectedVersion, expectedId, new HttpHeaders());
}

@NonNull
private ResponseEntity<Schema> registerSchemaAndAssertSuccess(@NonNull Schema schema,
@Nullable Integer expectedVersion,
@Nullable Integer expectedId,
@Nullable HttpHeaders requestHeaders) {

HttpEntity<Schema> httpEntity = new HttpEntity<>(schema, requestHeaders);

ResponseEntity<Schema> registerReponse = this.client
.postForEntity(this.serverControllerUri, schema, Schema.class);
.postForEntity(this.serverControllerUri, httpEntity, Schema.class);

HttpStatus statusCode = registerReponse.getStatusCode();
assertThat(statusCode.is2xxSuccessful()).isTrue();
Expand Down
Loading