IGNITE-27460 Add schema compatibility validation for full commands#7598
rpuch wants to merge 9 commits into apache:main
There was a problem hiding this comment.
Pull request overview
This pull request implements schema compatibility validation for full (1PC) transaction commands in the Raft replication layer. The approach validates commands after safe time assignment but before they are added to the Raft log on the leader, preventing inconsistencies that could arise from forward-incompatible schema changes.
Changes:
- Introduces a SafeTimeValidator extension point in Raft that validates commands before replication, with support for both temporary retry (EBUSY) and permanent rejection (EREJECTED_BY_USER_LOGIC)
- Implements PartitionSafeTimeValidator to validate full update commands against schema compatibility rules, ensuring commit timestamps are valid for the schema versions used
- Updates PartitionReplicaListener to handle validation failures by converting EREJECTED_BY_USER_LOGIC errors to IncompatibleSchemaVersionException
- Adds comprehensive integration tests verifying that schema version consistency is maintained across nodes during concurrent schema changes and writes
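A minimal sketch of the three-way validation outcome listed above (valid, retry, rejected). Every name here (`SafeTimeValidationSketch`, `Outcome`, `validate`, `metadataAvailableUpTo`) is hypothetical and timestamps are simplified to plain `long` values; the actual `SafeTimeValidator` and `SafeTimeValidationResult` signatures in this PR may well differ.

```java
import java.util.function.LongPredicate;

/** Hypothetical sketch: a leader-side check that runs after safe time assignment and before the log append. */
final class SafeTimeValidationSketch {
    /** The three possible outcomes: valid, retry (temporary), rejected (permanent). */
    enum Outcome { VALID, RETRY, REJECTED }

    /**
     * @param safeTime safe time assigned to the command (it becomes the commit timestamp for full updates).
     * @param metadataAvailableUpTo latest timestamp for which this node has schema metadata (assumed input).
     * @param compatibleAt hypothetical check telling whether a commit timestamp is schema-compatible.
     */
    static Outcome validate(long safeTime, long metadataAvailableUpTo, LongPredicate compatibleAt) {
        if (safeTime > metadataAvailableUpTo) {
            // Temporary condition: the node lacks schema information, so the client should retry (EBUSY).
            return Outcome.RETRY;
        }

        // Permanent condition if the check fails: the command will never become valid (EREJECTED_BY_USER_LOGIC).
        return compatibleAt.test(safeTime) ? Outcome.VALID : Outcome.REJECTED;
    }

    public static void main(String[] args) {
        LongPredicate compatibleBefore100 = ts -> ts < 100;

        System.out.println(validate(50, 200, compatibleBefore100));
        System.out.println(validate(150, 200, compatibleBefore100));
        System.out.println(validate(250, 200, compatibleBefore100));
    }
}
```

The metadata check comes first on purpose: without sufficient schema information the node cannot decide whether the rejection would be permanent.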
Reviewed changes
Copilot reviewed 44 out of 44 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidator.java | New interface for safe time validation in Raft |
| modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/SafeTimeValidationResult.java | New class representing validation results (valid, retry, or rejected) |
| modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/PermissiveSafeTimeValidator.java | Default no-op validator implementation |
| modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java | Integration of validation in Raft leader's executeApplyingTasks |
| modules/raft/src/main/java/org/apache/ignite/raft/jraft/error/RaftError.java | New EREJECTED_BY_USER_LOGIC error code for permanent rejections |
| modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionSafeTimeValidator.java | Partition-specific validator implementation |
| modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java | Error handling for validation failures |
| modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/network/command/UpdateCommandBase.java | New base interface for update commands |
| modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/schemacompat/CompatValidationResult.java | Added validationFailedMessage() method |
| modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItSchemaForwardCompatibilityConsistencyTest.java | New integration test verifying schema consistency |
| modules/runner/src/integrationTest/java/org/apache/ignite/internal/schemasync/ItBlockedSchemaSyncAndRaftCommandExecutionTest.java | Test verifying node stop behavior with blocked schema sync |
Comments suppressed due to low confidence (1)
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/MetadataSufficiency.java:44
- The JavaDoc for isMetadataAvailableForTimestamp incorrectly says "Determines whether the local Catalog version is sufficient" when it should say something like "Determines whether the local schema metadata is sufficient for the given timestamp".
* Introduce a mechanism to validate command+safeTime in Raft
  - this happens after we assign a safeTime to a command, but before it gets saved to log on the leader (and hence before it gets replicated)
  - this allows to request a retry of the same command by the Raft client (if the failure is temporary, like insufficient schemas information on the node) or instruct it to fail the command if it will never become valid
* Introduce full commands validation wrt schema compatibility
  - do the validation in the Raft extension point introduced above (as safeTime becomes commitTs for full updates)
  - if the node lacks schema information, fail the validation requesting a retry from the Raft client
  - if the commitTs is invalid, return the result to Replica listener
  - in Replica listener, handle the failure by retrying the replica request (using updated schema)
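The last step of the commit message (retrying the replica request with an updated schema after a rejection) could look roughly like the sketch below. It is an illustration under assumptions, not the PR's code: `ReplicaRetrySketch`, `SchemaRejectedException`, `executeWithSchemaRetry` and the bounded attempt count are all hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;

/** Hypothetical sketch of "retry the replica request using updated schema". */
final class ReplicaRetrySketch {
    /** Stand-in for the exception raised when the leader permanently rejects a full command. */
    static final class SchemaRejectedException extends RuntimeException {
        SchemaRejectedException(String message) {
            super(message);
        }
    }

    /** Re-reads the schema version before each attempt and retries only on schema rejection. */
    static <T> T executeWithSchemaRetry(IntSupplier latestSchemaVersion, IntFunction<T> request, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be positive");
        }

        SchemaRejectedException last = null;

        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int schemaVersion = latestSchemaVersion.getAsInt();

            try {
                return request.apply(schemaVersion);
            } catch (SchemaRejectedException e) {
                last = e; // The next attempt picks up whatever schema version is current by then.
            }
        }

        throw last;
    }

    /** Simulates a node that learns about schema version 2 after the first rejection. */
    static String demo() {
        AtomicInteger localSchemaVersion = new AtomicInteger(1);

        return executeWithSchemaRetry(
                localSchemaVersion::get,
                version -> {
                    if (version < 2) {
                        localSchemaVersion.set(2);
                        throw new SchemaRejectedException("commitTs is not valid for schema version " + version);
                    }
                    return "applied with schema version " + version;
                },
                3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```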
There was a problem hiding this comment.
| private static CommandApplicationResult throwIfFullTxCommitSchemaValidationFailedInRaft(CommandApplicationResult res, Throwable ex) { | |
| private static CommandApplicationResult throwIfFullTxCommitSchemaValidationFailedDuringReplication(CommandApplicationResult res, Throwable ex) { |
| // To prevent safe timestamp values from becoming stale, we must assign them under a valid leader lock. | ||
| safeTs = tryAssignSafeTimestamp(task, safeTs); | ||
|
|
||
| if (rejectCommandIfSafeTimeIsNotAcceptable(safeTs, task)) { |
There was a problem hiding this comment.
are these entries retried?
There was a problem hiding this comment.
Please don't confuse ActionRequest (which brings a command to Raft group via its leader) and AppendEntriesRequest (which replicates N commands from the leader to other members of the group). Here, it's about ActionRequest only. If we return EBUSY, the command will be retried by the Raft client; if we return the new Raft error (about user logic), it will not be retried
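The retry semantics described in this reply can be sketched as a client-side loop. This is a simplified, hypothetical model (the `ActionRequestRetrySketch`, `ScriptedLeader` and `submit` names and the attempt limit are assumptions), not the actual Raft client code.

```java
import java.util.function.Supplier;

/** Hypothetical sketch: only EBUSY makes the client resubmit an ActionRequest. */
final class ActionRequestRetrySketch {
    enum Status { OK, EBUSY, EREJECTED_BY_USER_LOGIC }

    /** Fake leader that answers with a scripted, non-empty sequence of statuses and counts the calls. */
    static final class ScriptedLeader implements Supplier<Status> {
        private final Status[] script;
        int calls;

        ScriptedLeader(Status... script) {
            this.script = script;
        }

        @Override
        public Status get() {
            // Once the script is exhausted, keep repeating its last status.
            Status status = script[Math.min(calls, script.length - 1)];
            calls++;
            return status;
        }
    }

    /** Resubmits while the leader answers EBUSY; any other status ends the loop immediately. */
    static Status submit(Supplier<Status> leader, int maxAttempts) {
        Status status = Status.EBUSY;

        for (int attempt = 0; attempt < maxAttempts && status == Status.EBUSY; attempt++) {
            status = leader.get();
        }

        return status;
    }

    public static void main(String[] args) {
        ScriptedLeader temporarilyBusy = new ScriptedLeader(Status.EBUSY, Status.EBUSY, Status.OK);
        System.out.println(submit(temporarilyBusy, 5) + " after " + temporarilyBusy.calls + " call(s)");

        ScriptedLeader rejecting = new ScriptedLeader(Status.EREJECTED_BY_USER_LOGIC);
        System.out.println(submit(rejecting, 5) + " after " + rejecting.calls + " call(s)");
    }
}
```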
|
|
||
| package org.apache.ignite.internal.partition.replicator.schemacompat; | ||
|
|
||
| import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; |
There was a problem hiding this comment.
I genuinely feel we need to stick either to our formatter or to the built-in one. I can see usages of both spread throughout the code base.
There was a problem hiding this comment.
The upside of using the built-in one: the IDE will highlight it if you mess up the parameters.
There was a problem hiding this comment.
I switched to ours in 3 places as it's supported for both string formatting and for logging
There was a problem hiding this comment.
Shouldn't requiredLevel be safeTime?
There was a problem hiding this comment.
In that case I would use our formatter instead of the Java one and share the message between these 2 invocations, just to avoid any errors in the message.
| */ | ||
| public class CatalogVersionSufficiency { | ||
| private CatalogVersionSufficiency() { | ||
| public class MetadataSufficiency { |
There was a problem hiding this comment.
I would remove this class altogether.
There was a problem hiding this comment.
Its only method is used in 2 places and it serves a purpose of explaining its intent, so I think it's useful
There was a problem hiding this comment.
This method is not used anywhere
| // TODO: IGNITE-20298 - throttle logging. | ||
| LOG.warn( | ||
| "Metadata not yet available, rejecting ActionRequest with EBUSY [group={}, requiredLevel={}].", | ||
| "Metadata not yet available by catalog version, rejecting ActionRequest with EBUSY [group={}, requiredLevel={}].", |
There was a problem hiding this comment.
The same thing about the message and the formatter here.
There was a problem hiding this comment.
The same thing about the message and the formatter here.
| return safeTs; | ||
| } | ||
|
|
||
| private boolean rejectCommandIfSafeTimeIsNotAcceptable(@Nullable HybridTimestamp safeTs, LogEntryAndClosure task) { |
There was a problem hiding this comment.
isSafeTimeCommandIsNotAcceptable feels better to me. Reject implies void action.
There was a problem hiding this comment.
What you suggested implies there are no side effects, but there are, and they are a lot more important than the returned flag that tells whether the command should not be replicated. That's why I think it's better to call it 'reject...If...': it highlights that we are mostly interested in the side effect.
| @PropertyName("tablePartitionId") | ||
| ReplicationGroupIdMessage commitPartitionId(); | ||
|
|
||
| public interface UpdateAllCommand extends UpdateCommandBase { |
There was a problem hiding this comment.
I believe the current solution is too complex. Instead, we can pass the safe timestamp validator as a volatile command field.
- Introduce validation closure on WriteCommand:
    public interface WriteCommand extends Command {
        /**
         * Holds request's initiator timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
         *
         * @return The timestamp.
         */
        default @Nullable HybridTimestamp initiatorTime() {
            return null;
        }
        /**
         * Gets safe timestamp. TODO IGNITE-24143 Move to SafeTimePropagatingCommand.
         *
         * @return The timestamp.
         */
        default @Nullable HybridTimestamp safeTime() {
            return null;
        }
        default @Nullable Predicate<HybridTimestamp> safeTimeValidator() {
            return null;
        }
    }
- Set it in the PartitionReplicaListener (only for full operations)
    UpdateCommandV2Builder bldr = PARTITION_REPLICATION_MESSAGES_FACTORY.updateCommandV2()
            .tableId(tableId)
            .commitPartitionId(replicationGroupIdMessage(commitPartitionId))
            .rowUuid(rowUuid)
            .txId(txId)
            .full(full)
            .initiatorTime(initiatorTime)
            .txCoordinatorId(txCoordinatorId)
            .requiredCatalogVersion(catalogVersion)
            .leaseStartTime(leaseStartTime)
            .safeTimeValidator(this::validateTimestamp);
- Apply validation:
    node.apply(new Task(wrapper, new LocalAwareWriteCommandClosure() {
        private HybridTimestamp safeTs;
        @Override
        public void result(Serializable res) {
            sendResponse(res, rpcCtx);
        }
        @Override
        public WriteCommand command() {
            return command;
        }
        @Override
        public @Nullable HybridTimestamp safeTimestamp() {
            return safeTs;
        }
        @Override
        public void run(Status status) {
            assert !status.isOk() : status;
            sendRaftError(rpcCtx, status, node);
        }
        @Override
        public void safeTimestamp(HybridTimestamp safeTs) {
            assert this.safeTs == null : "Safe time can be set only once";
            // Apply binary patch.
            node.getOptions().getCommandsMarshaller().patch(wrapper, safeTs);
            // Avoid modifying WriteCommand object, because it's shared between raft pipeline threads.
            if (command.safeTimeValidator() != null) {
                if (!command.safeTimeValidator().test(safeTs)) {
                    throw new ValidationException();
                }
            }
            this.safeTs = safeTs;
        }
    }));
There was a problem hiding this comment.
Command is built on the primary and the validator gets executed on the leader; those are not necessarily colocated, so applying your suggestion literally doesn't seem to be possible.
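The colocation concern can be illustrated with a small sketch. It uses plain Java serialization as a stand-in for the real command marshaller (an assumption: Ignite uses its own marshalling), and the `TransientValidatorSketch`, `Command` and `roundTrip` names are hypothetical. The point is only that a non-persisted closure field set on one node does not arrive on another node.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.LongPredicate;

/** Hypothetical sketch: a closure kept in a non-serialized field is lost when the command crosses the wire. */
final class TransientValidatorSketch {
    static final class Command implements Serializable {
        private static final long serialVersionUID = 1L;

        final long txId;

        /** Set by the node that builds the command (the primary); never written to the wire. */
        transient LongPredicate safeTimeValidator;

        Command(long txId, LongPredicate safeTimeValidator) {
            this.txId = txId;
            this.safeTimeValidator = safeTimeValidator;
        }
    }

    /** Simulates sending the command from the primary to a leader that lives on another node. */
    static Command roundTrip(Command command) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(command);
            }

            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Command) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Command onPrimary = new Command(42L, safeTime -> safeTime > 0);
        Command onLeader = roundTrip(onPrimary);

        System.out.println("txId on leader: " + onLeader.txId);
        System.out.println("validator present on leader: " + (onLeader.safeTimeValidator != null));
    }
}
```

When the primary and the leader happen to be the same node, the closure would still be reachable, which is why the reply says the two are "not necessarily" colocated rather than never.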
There was a problem hiding this comment.
Please avoid using fully-qualified names in javadocs where possible, IDEA does allow you to add this class to imports right from the javadoc itself
There was a problem hiding this comment.
I don't think I understand why these lines changed their positions. I'm clearly missing something, please help me to realize it
There was a problem hiding this comment.
It was an artifact of changing the approach. I reverted the changes that are not needed.
There was a problem hiding this comment.
I don't think that this null-check is necessary
There was a problem hiding this comment.
Apparently you could write Duration.ofSeconds(10) instead
| } | ||
|
|
||
| private CompletableFuture<SchemaSyncInhibitor> startInhibitingSchemaSyncWhenUpdateCommandArrives() { | ||
| AtomicBoolean startedInhibiting = new AtomicBoolean(); |
There was a problem hiding this comment.
Is it enough to inhibit it on a single node only? It's not clear to me why. Of course, I'm not as familiar with the scenario as you are.
There was a problem hiding this comment.
We are only inhibiting it on the leader, this effectively denies the leader any possibility to execute the command, so the put future will hang as desired
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
|
|
||
| throw new RuntimeException(e); |
There was a problem hiding this comment.
Why can't we throw unchecked exceptions here? I don't think that such a limitation makes sense, given that we allow throwing exceptions at all in this context
There was a problem hiding this comment.
But we are throwing unchecked exceptions here. If you wanted to ask 'why can't we throw checked exceptions', I have two points:
- If we declare the method to throw a checked exception, the caller will have to handle it itself, but the caller is a closure passed to dropMessages(), and this would make its code heavier and less readable without a good reason
- We actually don't care about any of those exceptions here: if something gets thrown, we can just let it fly
If you actually wrote what you meant, then I don't understand. Could you please elaborate/rephrase?
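The first point can be sketched as follows. This is a hypothetical illustration: `UncheckedInterruptSketch` and `awaitUnchecked` are made-up names, and a `BooleanSupplier` stands in for the closure passed to dropMessages(). A helper that restores the interrupt flag and rethrows unchecked keeps the closure free of try/catch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch: keeping a closure lean by converting InterruptedException to an unchecked one. */
final class UncheckedInterruptSketch {
    /** Waits on the latch; on interruption restores the interrupt flag and rethrows unchecked. */
    static void awaitUnchecked(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();

            throw new RuntimeException(e);
        }
    }

    /** Returns true if the interruption surfaced as a RuntimeException and the flag was restored. */
    static boolean interruptIsWrappedAndFlagRestored() {
        CountDownLatch neverReleased = new CountDownLatch(1);

        // The closure needs no try/catch and no 'throws' clause.
        BooleanSupplier closure = () -> {
            awaitUnchecked(neverReleased);
            return true;
        };

        // With the flag already set, await() throws InterruptedException right away.
        Thread.currentThread().interrupt();

        try {
            closure.getAsBoolean();
            return false;
        } catch (RuntimeException e) {
            // Thread.interrupted() reads the flag and clears it, so the caller is left in a clean state.
            boolean flagRestored = Thread.interrupted();

            return flagRestored && e.getCause() instanceof InterruptedException;
        }
    }

    public static void main(String[] args) {
        System.out.println(interruptIsWrappedAndFlagRestored());
    }
}
```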
| private static void waitTillCommandStartsExecutionAndBlocksOnSchemaSync() throws InterruptedException { | ||
| // Current implementation doesn't actually block any threads, but we still give it a chance to get stuck if the implementation | ||
| // gets changed. | ||
| Thread.sleep(1000); |
There was a problem hiding this comment.
Why 1000? Seems arbitrary, but I'm sure that it's not.
Are these tests stable?
There was a problem hiding this comment.
They are stable.
This comes from the original PR number 1 (#7500) where the execution would actually block inside the state machine thread. I did not find any way to detect that it already 'blocked', but waiting for 1 second was enough. In the worst case, we would sometimes have a false positive (it did not block, the test passed because of this and not because the underlying code is correct), but most of the time it would actually block, so the test would pass for the correct reason. And the test would catch (and it was catching) situations when the command was blocked and the production code worked INcorrectly.
But in this PR it does not block in the state machine thread, so this wait is 'just in case' to try to catch a wrong behavior if we accidentally return to it in the production code. This is a poor justification for a 1 second wait. If you believe this should be removed, I'll remove it.
https://issues.apache.org/jira/browse/IGNITE-27460
What I tried before
In this PR (approach number 3), the commands are unconditional as before, and we make the decision 'to replicate the command or not' on just one node (on the leader) once.