diff --git a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/controllers/ServerController.java b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/controllers/ServerController.java index 82e8c05..fb8e95c 100644 --- a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/controllers/ServerController.java +++ b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/controllers/ServerController.java @@ -16,9 +16,12 @@ package org.springframework.cloud.schema.registry.controllers; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.springframework.cloud.schema.registry.config.SchemaServerProperties; import org.springframework.cloud.schema.registry.model.Schema; @@ -38,6 +41,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody; @@ -63,6 +67,10 @@ public class ServerController { private final SchemaServerProperties schemaServerProperties; + private final String schemaReferenceRegex = "(?.*)\\+v(?\\d+)"; + + private final Pattern schemaReferencePattern = Pattern.compile(schemaReferenceRegex); + public ServerController(SchemaRepository repository, Map validators, SchemaServerProperties schemaServerProperties) { @@ -74,7 +82,9 @@ public ServerController(SchemaRepository repository, } @RequestMapping(method = RequestMethod.POST, path = "/", consumes = "application/json", produces = "application/json") - public synchronized ResponseEntity register(@RequestBody Schema schema, UriComponentsBuilder builder) { + public synchronized ResponseEntity register( + @RequestHeader(value = "Schema-Reference", required = false) String schemaReferenceHeader, + @RequestBody Schema schema, UriComponentsBuilder builder) { SchemaValidator validator = this.validators.get(schema.getFormat()); @@ -84,7 +94,14 @@ public synchronized ResponseEntity register(@RequestBody Schema schema, .collectionToCommaDelimitedString(this.validators.keySet()))); } - validator.validate(schema.getDefinition()); + List schemaReferences = null; + + if (schemaReferenceHeader != null) { + schemaReferences = schemaReferenceResolver(schemaReferenceHeader, schema.getFormat()); + schema.setReferences(schemaReferences); + } + + validator.validate(schema.getDefinition(), schemaReferences); Schema result; List registeredEntities = this.repository @@ -95,7 +112,7 @@ public synchronized ResponseEntity register(@RequestBody Schema schema, result = this.repository.save(schema); } else { - result = validator.match(registeredEntities, schema.getDefinition()); + result = validator.match(registeredEntities, schema.getDefinition(), schemaReferences); if (result == null) { schema.setVersion( registeredEntities.get(registeredEntities.size() - 1).getVersion() @@ -262,6 +279,13 @@ public String onInvalidSchema(InvalidSchemaException e) { return errorMessage("Invalid Schema", e); } + @ExceptionHandler(NumberFormatException.class) + @ResponseStatus(HttpStatus.BAD_REQUEST) + @ResponseBody + public String onInvalidVersion(NumberFormatException ex) { + return errorMessage("Version should be a numeric value", ex); + } + @ExceptionHandler(SchemaNotFoundException.class) @ResponseStatus(HttpStatus.NOT_FOUND) @ResponseBody @@ -276,6 +300,39 @@ public String schemaDeletionNotPermitted(SchemaDeletionNotAllowedException ex) { return errorMessage("Schema deletion is not permitted", ex); } + private List schemaReferenceResolver(String schemaReferenceHeader, String format) { + List schemaReferences = new ArrayList<>(); + Schema schemaReference; + String subject; + Integer version; + Matcher schemaReferenceMatcher; + for (String schemaReferenceEntry : schemaReferenceHeader.split(";")) { + schemaReferenceMatcher = schemaReferencePattern.matcher(schemaReferenceEntry); + if (schemaReferenceMatcher.find()) { + subject = schemaReferenceMatcher.group("subject"); + version = Integer.parseInt(schemaReferenceMatcher.group("version")); + schemaReference = this.repository.findOneBySubjectAndFormatAndVersion(subject, format, version); + if (schemaReference == null) { + throw new SchemaNotFoundException( + String.format("Could not find Schema by subject: %s, format: %s, version %s", + subject, format, version)); + } + } + else { + subject = schemaReferenceEntry; + List registeredEntities = this.repository.findBySubjectAndFormatOrderByVersion(subject, format); + if (registeredEntities.isEmpty()) { + throw new SchemaNotFoundException( + String.format("Could not find Schema by subject: %s, format: %s", + subject, format)); + } + schemaReference = registeredEntities.get(registeredEntities.size() - 1); + } + schemaReferences.add(schemaReference); + } + return schemaReferences; + } + private String errorMessage(String prefix, Throwable e) { return prefix + (StringUtils.hasText(e.getMessage()) ? ": " + e.getMessage() : ""); } diff --git a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/model/Schema.java b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/model/Schema.java index 4a4c47c..d56ee26 100644 --- a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/model/Schema.java +++ b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/model/Schema.java @@ -16,13 +16,20 @@ package org.springframework.cloud.schema.registry.model; +import java.util.ArrayList; +import java.util.List; + import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.Id; -import javax.persistence.Lob; +import javax.persistence.ManyToMany; import javax.persistence.Table; +import com.fasterxml.jackson.annotation.JsonIdentityInfo; +import com.fasterxml.jackson.annotation.JsonIdentityReference; +import com.fasterxml.jackson.annotation.ObjectIdGenerators; + /** * @author Vinicius Carvalho * @@ -46,8 +53,12 @@ public class Schema { @Column(name = "FORMAT", nullable = false) private String format; - @Lob - @Column(name = "DEFINITION", nullable = false, length = 8192) + @JsonIdentityInfo(generator = ObjectIdGenerators.PropertyGenerator.class, property = "subject") + @JsonIdentityReference(alwaysAsId = true) + @ManyToMany + private List references = new ArrayList<>(); + + @Column(name = "DEFINITION", columnDefinition = "text", nullable = false, length = 8192) private String definition; public Integer getId() { @@ -82,6 +93,22 @@ public void setFormat(String format) { this.format = format; } + public List getReferences() { + return this.references; + } + + public void setReferences(List references) { + this.references = references; + } + + public void addReference(Schema schemaReference) { + this.references.add(schemaReference); + } + + public void removeReference(Schema schemaReference) { + this.references.remove(schemaReference); + } + public String getDefinition() { return this.definition; } diff --git a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/AvroSchemaValidator.java b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/AvroSchemaValidator.java index 18f7267..c2c9b93 100644 --- a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/AvroSchemaValidator.java +++ b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/AvroSchemaValidator.java @@ -18,6 +18,7 @@ import java.util.List; +import org.apache.avro.Schema.Parser; import org.apache.avro.SchemaParseException; import org.springframework.cloud.schema.registry.model.Compatibility; @@ -34,11 +35,25 @@ public class AvroSchemaValidator implements SchemaValidator { */ public static final String AVRO_FORMAT = "avro"; + private Parser parseReferences(Parser parser, List schemaReferences) { + for (Schema schemaReference : schemaReferences) { + if (!schemaReference.getReferences().isEmpty()) { + parser = parseReferences(parser, schemaReference.getReferences()); + } + parser.parse(schemaReference.getDefinition()); + } + return parser; + } + @Override - public boolean isValid(String definition) { + public boolean isValid(String definition, List schemaReferences) { boolean result = true; try { - new org.apache.avro.Schema.Parser().parse(definition); + Parser avroParser = new Parser(); + if (schemaReferences != null) { + avroParser = parseReferences(avroParser, schemaReferences); + } + avroParser.parse(definition); } catch (SchemaParseException ex) { result = false; @@ -47,9 +62,13 @@ public boolean isValid(String definition) { } @Override - public void validate(String definition) { + public void validate(String definition, List schemaReferences) { try { - new org.apache.avro.Schema.Parser().parse(definition); + Parser avroParser = new Parser(); + if (schemaReferences != null) { + avroParser = parseReferences(avroParser, schemaReferences); + } + avroParser.parse(definition); } catch (SchemaParseException ex) { throw new InvalidSchemaException((ex.getMessage())); @@ -62,11 +81,19 @@ public Compatibility compatibilityCheck(String source, String other) { } @Override - public Schema match(List schemas, String definition) { + public Schema match(List schemas, String definition, List schemaReferences) { Schema result = null; - org.apache.avro.Schema source = new org.apache.avro.Schema.Parser().parse(definition); + Parser avroParser = new Parser(); + if (schemaReferences != null) { + avroParser = parseReferences(avroParser, schemaReferences); + } + org.apache.avro.Schema source = avroParser.parse(definition); for (Schema s : schemas) { - org.apache.avro.Schema target = new org.apache.avro.Schema.Parser().parse(s.getDefinition()); + avroParser = new Parser(); + if (!s.getReferences().isEmpty()) { + avroParser = parseReferences(avroParser, s.getReferences()); + } + org.apache.avro.Schema target = avroParser.parse(s.getDefinition()); if (target.equals(source)) { result = s; break; diff --git a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/SchemaValidator.java b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/SchemaValidator.java index d53b540..485b3cd 100644 --- a/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/SchemaValidator.java +++ b/spring-cloud-schema-registry-core/src/main/java/org/springframework/cloud/schema/registry/support/SchemaValidator.java @@ -33,17 +33,19 @@ public interface SchemaValidator { /** * Verifies if a definition is a valid schema. * @param definition - The textual representation of the schema file + * @param schemaReferences - list of schemas that are referenced inside the definition * @return true if valid, false otherwise */ - boolean isValid(String definition); + boolean isValid(String definition, List schemaReferences); /** * Validates a schema definition and throws an {@link InvalidSchemaException} when the schema is invalid. * The exception is expected to have the violation description. * @param definition - The textual representation of the schema file + * @param schemaReferences - list of schemas that are referenced inside the definition */ - default void validate(String definition) { - if (!this.isValid(definition)) { + default void validate(String definition, List schemaReferences) { + if (!this.isValid(definition, schemaReferences)) { throw new InvalidSchemaException("Invalid Schema"); } } @@ -61,9 +63,10 @@ default void validate(String definition) { * Return the Schema that is represented by the definition. * @param schemas List of schemas to be tested * @param definition Textual representation of the schema + * @param schemaReferences List of schemas that are referenced inside the definition * @return A full Schema object with identifier and subject properties */ - Schema match(List schemas, String definition); + Schema match(List schemas, String definition, List schemaReferences); String getFormat(); diff --git a/spring-cloud-schema-registry-server/src/test/java/org/springframework/cloud/schema/registry/server/SchemaRegistryServerAvroTests.java b/spring-cloud-schema-registry-server/src/test/java/org/springframework/cloud/schema/registry/server/SchemaRegistryServerAvroTests.java index 59de67b..ba9be8a 100644 --- a/spring-cloud-schema-registry-server/src/test/java/org/springframework/cloud/schema/registry/server/SchemaRegistryServerAvroTests.java +++ b/spring-cloud-schema-registry-server/src/test/java/org/springframework/cloud/schema/registry/server/SchemaRegistryServerAvroTests.java @@ -44,6 +44,7 @@ import org.springframework.cloud.schema.registry.support.SchemaNotFoundException; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.DefaultResourceLoader; +import org.springframework.http.HttpEntity; import org.springframework.http.HttpHeaders; import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; @@ -84,14 +85,16 @@ public class SchemaRegistryServerAvroTests { private static final org.apache.avro.Schema AVRO_USER_AVRO_SCHEMA_V2 = new Parser() .parse(resourceToString("classpath:/avro_user_definition_schema_v2.json")); - private static final String AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1.getName() + private static final String AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1 + .getName() .toLowerCase(); - private static final String AVRO_USER_SCHEMA_QUALIFED_NAME_STRATEGY_SUBJECT = AVRO_USER_AVRO_SCHEMA_V1 .getFullName() .toLowerCase(); + private static final String AVRO_REFERENCE_SCHEMA_QUALIFIED_NAME_STRATEGY_SUBJECT = "reference"; + private static final Schema AVRO_USER_REGISTRY_SCHEMA_V1 = toSchema( AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT, AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V1.toString()); @@ -100,6 +103,11 @@ public class SchemaRegistryServerAvroTests { AVRO_USER_SCHEMA_DEFAULT_NAME_STRATEGY_SUBJECT, AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V2.toString()); + // Does not use Parser because otherwise will unwrap user reference + private static final Schema AVRO_REFERENCE_REGISTRY_SCHEMA = toSchema( + AVRO_REFERENCE_SCHEMA_QUALIFIED_NAME_STRATEGY_SUBJECT, + AVRO_FORMAT_NAME, resourceToString("classpath:/avro_reference_definition_schema.json")); + private static final Schema AAVRO_USER_REGISTRY_SCHEMA_V1_WITH_QUAL_SUBJECT = toSchema( AVRO_USER_SCHEMA_QUALIFED_NAME_STRATEGY_SUBJECT, AVRO_FORMAT_NAME, AVRO_USER_AVRO_SCHEMA_V1.toString()); @@ -234,6 +242,36 @@ public void testUserSchemaV2() { registerSchemasAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, AVRO_USER_REGISTRY_SCHEMA_V2); } + @Test + public void testReferenceSchemaWithoutVersion() { + registerSchemaAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, 1, 1); + HttpHeaders headers = new HttpHeaders(); + headers.add("Schema-Reference", AVRO_USER_REGISTRY_SCHEMA_V1.getSubject()); + registerSchemaAndAssertSuccess(AVRO_REFERENCE_REGISTRY_SCHEMA, 1, 2, headers); + } + + @Test + public void testReferenceSchemaWithVersion() { + registerSchemasAndAssertSuccess(AVRO_USER_REGISTRY_SCHEMA_V1, AVRO_USER_REGISTRY_SCHEMA_V2); + HttpHeaders headers = new HttpHeaders(); + headers.add("Schema-Reference", + String.format("%s+v%d", AVRO_USER_REGISTRY_SCHEMA_V2.getSubject(), 2)); + registerSchemaAndAssertSuccess(AVRO_REFERENCE_REGISTRY_SCHEMA, 1, 3, headers); + } + + @Test + public void testUndefinedSchemaReference() { + try { + this.client.postForEntity(this.serverControllerUri, AVRO_REFERENCE_REGISTRY_SCHEMA, Schema.class); + fail("Expects: " + HttpStatus.BAD_REQUEST + " error"); + } + catch (HttpClientErrorException.BadRequest badRequest) { + assertThat(badRequest.getMessage()) + .isEqualTo("400 : [Invalid Schema: \"example.avro.User\" is not a defined name. The type of the \"user\" field must be a defined name or a {\"type\": ...} expression.]"); + } + + } + @Test public void testIdempotentRegistration() { @@ -557,9 +595,19 @@ private Map>>> registerSchemasAn private ResponseEntity registerSchemaAndAssertSuccess(@NonNull Schema schema, @Nullable Integer expectedVersion, @Nullable Integer expectedId) { + return registerSchemaAndAssertSuccess(schema, expectedVersion, expectedId, new HttpHeaders()); + } + + @NonNull + private ResponseEntity registerSchemaAndAssertSuccess(@NonNull Schema schema, + @Nullable Integer expectedVersion, + @Nullable Integer expectedId, + @Nullable HttpHeaders requestHeaders) { + + HttpEntity httpEntity = new HttpEntity<>(schema, requestHeaders); ResponseEntity registerReponse = this.client - .postForEntity(this.serverControllerUri, schema, Schema.class); + .postForEntity(this.serverControllerUri, httpEntity, Schema.class); HttpStatus statusCode = registerReponse.getStatusCode(); assertThat(statusCode.is2xxSuccessful()).isTrue(); diff --git a/spring-cloud-schema-registry-server/src/test/resources/avro_reference_definition_schema.json b/spring-cloud-schema-registry-server/src/test/resources/avro_reference_definition_schema.json new file mode 100644 index 0000000..837dff5 --- /dev/null +++ b/spring-cloud-schema-registry-server/src/test/resources/avro_reference_definition_schema.json @@ -0,0 +1,11 @@ +{ + "namespace": "example.avro", + "type": "record", + "name": "Reference", + "fields": [ + { + "name": "user", + "type": "example.avro.User" + } + ] +}