Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion LiteCore/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ add_executable(
${TOP}Replicator/tests/ReplicatorSGTest.cc
${TOP}Replicator/tests/ReplicatorCollectionTest.cc
${TOP}Replicator/tests/ReplicatorCollectionSGTest.cc
${TOP}Replicator/tests/ReplicatorSG30Test.cc
${TOP}Replicator/tests/ReplicatorVVUpgradeTest.cc
${TOP}Replicator/tests/SG.cc
${TOP}Replicator/tests/SGTestUser.cc
Expand Down
85 changes: 8 additions & 77 deletions Replicator/Replicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,14 @@ namespace litecore::repl {

if ( !_options->isActive() ) { return; }

_setMsgHandlerFor3_0_ClientDone = true; // only needed for passive replicators
_findExistingConflicts();

bool goOn = true;
for ( CollectionIndex i = 0; goOn && i < _subRepls.size(); ++i ) {
// if any getLocalCheckpoint fails, the replicator would already be stopped.
goOn = goOn && getLocalCheckpoint(reset, i);
}
if ( goOn ) {
if ( _options->collectionAware() ) {
getCollections();
} else {
getRemoteCheckpoint(false, 0);
}
}
if ( goOn ) { getCollections(); }
} catch ( ... ) {
C4Error err = C4Error::fromCurrentException();
logError("Failed to start replicator: %s", err.description().c_str());
Expand Down Expand Up @@ -646,13 +639,7 @@ namespace litecore::repl {
if ( _connectionState != Connection::kClosing ) {
// skip this if stop() already called
_connectionState = Connection::kConnected;
if ( _options->isActive() ) {
if ( _options->collectionAware() ) {
getCollections();
} else {
getRemoteCheckpoint(false, 0);
}
}
if ( _options->isActive() ) { getCollections(); }
}
}

Expand Down Expand Up @@ -751,12 +738,8 @@ namespace litecore::repl {
if ( !sub.remoteCheckpointDocID ) sub.remoteCheckpointDocID = sub.checkpointer->initialCheckpointID();
if ( !sub.remoteCheckpointDocID || _connectionState != Connection::kConnected ) return; // not ready yet

if ( !_options->collectionAware() ) {
logVerbose("Requesting remote checkpoint '%.*s' of the default collection",
SPLAT(sub.remoteCheckpointDocID));
} else {
cLogVerbose(coll, "Requesting remote checkpoint '%.*s'", SPLAT(sub.remoteCheckpointDocID));
}
cLogVerbose(coll, "Requesting remote checkpoint '%.*s'", SPLAT(sub.remoteCheckpointDocID));

MessageBuilder msg("getCheckpoint"_sl);
msg["client"_sl] = sub.remoteCheckpointDocID;
assignCollectionToMsg(msg, coll);
Expand All @@ -771,22 +754,14 @@ namespace litecore::repl {
if ( response->isError() ) {
auto err = response->getError();
if ( !(err.domain == "HTTP"_sl && err.code == 404) ) return gotError(response);
if ( !_options->collectionAware() ) {
logInfo("No remote checkpoint '%.*s' of the default collection", SPLAT(sub.remoteCheckpointDocID));
} else {
cLogInfo(coll, "No remote checkpoint '%.*s'", SPLAT(sub.remoteCheckpointRevID));
}
cLogInfo(coll, "No remote checkpoint '%.*s'", SPLAT(sub.remoteCheckpointRevID));

sub.remoteCheckpointRevID.reset();
} else {
remoteCheckpoint.readJSON(response->body());
sub.remoteCheckpointRevID = response->property("rev"_sl);
if ( !_options->collectionAware() ) {
logInfo("Received remote checkpoint (rev='%.*s'): %.*s of the default collection",
SPLAT(sub.remoteCheckpointRevID), SPLAT(response->body()));
} else {
cLogInfo(coll, "Received remote checkpoint (rev='%.*s'): %.*s", SPLAT(sub.remoteCheckpointRevID),
SPLAT(response->body()));
}
cLogInfo(coll, "Received remote checkpoint (rev='%.*s'): %.*s", SPLAT(sub.remoteCheckpointRevID),
SPLAT(response->body()));
}
sub.remoteCheckpointReceived = true;

Expand Down Expand Up @@ -1058,10 +1033,6 @@ namespace litecore::repl {

// Handles a "getCheckpoint" request by looking up a peer checkpoint.
void Replicator::handleGetCheckpoint(Retained<MessageIn> request) {
setMsgHandlerFor3_0_Client(request);
// The above method may already responded with error.
if ( request->responded() ) return;

slice checkpointID = getPeerCheckpointDocID(request, "get");
if ( !checkpointID ) return;

Expand Down Expand Up @@ -1098,10 +1069,6 @@ namespace litecore::repl {

// Handles a "setCheckpoint" request by storing a peer checkpoint.
void Replicator::handleSetCheckpoint(Retained<MessageIn> request) {
setMsgHandlerFor3_0_Client(request);
// The above method may already responded with error.
if ( request->responded() ) return;

slice checkpointID = getPeerCheckpointDocID(request, "set");
if ( !checkpointID ) return;

Expand Down Expand Up @@ -1314,10 +1281,6 @@ namespace litecore::repl {

void Replicator::delegateCollectionSpecificMessageToWorker(Retained<blip::MessageIn> request) {
// This method is NOT called on the Replicator's queue/thread! It must be thread-safe.
if ( !_setMsgHandlerFor3_0_ClientDone ) {
enqueue(FUNCTION_TO_QUEUE(Replicator::setMsgHandlerFor3_0_Client), request);
waitTillCaughtUp();
}

slice profile = request->property("Profile"_sl);
Assert(profile);
Expand Down Expand Up @@ -1356,38 +1319,6 @@ namespace litecore::repl {
}
}

// This method is to properly initialize the passive replicator to get ready to
// serve 3.0 replicator, which is unaware of the collection.
// It is a no-op if
// 1. it is working in the active mode, or
// 2. the incoming meesage includes explicit "collection" property, or
// 3. the second time and after that this method is called.
void Replicator::setMsgHandlerFor3_0_Client(Retained<blip::MessageIn> request) {
if ( _setMsgHandlerFor3_0_ClientDone ) { return; }

if ( !_options->isActive()
&& request->intProperty(kCollectionProperty, kNotCollectionIndex) == kNotCollectionIndex ) {
// At this point, we are dealing with a 3.0 style replicator which can only have exactly
// one collection, which is the default one. If the default collection is not specified in
// the passive config, rearrangeCollectionsFor3_0_Client() will put a null collection path
// at the place of index 0, and then we return an error here.
// (If the collection does not exist in the database, prepareWorkers() will fail and an
// error will be returned from there.)

_options->rearrangeCollectionsFor3_0_Client();
DebugAssert(!_options->collectionAware());
DebugAssert(_options->workingCollectionCount() == 1);
if ( !_options->collectionPath(0) ) {
logVerbose("Client is legacy 3.0, but the default collection is not in the config of this 3.1 "
"replicator.");
request->respondWithError({"BLIP"_sl, 400, "This server is not configured for 3.0 client support"_sl});
} else {
prepareWorkers();
}
}
_setMsgHandlerFor3_0_ClientDone = true;
}

string Replicator::loggingKeyValuePairs() const {
string kv = Worker::loggingKeyValuePairs();
if ( _correlationID ) {
Expand Down
2 changes: 0 additions & 2 deletions Replicator/Replicator.hh
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,6 @@ namespace litecore::repl {

using ReplicatedRevBatcher = actor::ActorBatcher<Replicator, ReplicatedRev>;

void setMsgHandlerFor3_0_Client(Retained<blip::MessageIn>);

Delegate* _delegate; // Delegate whom I report progress/errors to
blip::Connection::State _connectionState; // Current BLIP connection state

Expand Down
27 changes: 0 additions & 27 deletions Replicator/ReplicatorOptions.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ namespace litecore::repl {
void* callbackContext{nullptr};
std::atomic<C4ReplicatorProgressLevel> progressLevel{kC4ReplProgressOverall};

bool collectionAware() const { return _mutables._collectionAware; }

bool isActive() const { return _mutables._isActive; }

#ifdef LITECORE_CPPTEST
Expand All @@ -53,8 +51,6 @@ namespace litecore::repl {
void setDisableReplacementRevs(const bool disable) { _disableReplacementRevs = disable; }

bool disableReplacementRevs() const { return _disableReplacementRevs; }

static bool inline sActiveIsCollectionAware = false;
#endif

const std::unordered_map<C4CollectionSpec, size_t>& collectionSpecToIndex() const {
Expand Down Expand Up @@ -345,12 +341,6 @@ namespace litecore::repl {
}
}

void rearrangeCollectionsFor3_0_Client() const {
_mutables._collectionAware = false;
std::vector<C4CollectionSpec> activeCollections{kC4DefaultCollectionSpec};
rearrangeCollections(activeCollections);
}

static const std::unordered_set<slice> kWhiteListOfKeysToLog;

private:
Expand All @@ -361,7 +351,6 @@ namespace litecore::repl {

struct Mutables {
mutable std::vector<CollectionOptions> _workingCollections;
mutable bool _collectionAware{true};
mutable bool _isActive{true};
mutable std::unordered_map<C4CollectionSpec, size_t> _collectionSpecToIndex;
};
Expand Down Expand Up @@ -465,22 +454,6 @@ namespace litecore::repl {
}
}
}

// For the passive replicator, rearrangeCollectionsFor3_0_Client() will set
// collectionAware to false
if ( _mutables._isActive && collectionOpts.size() == 1 ) {
auto spec = collectionOpts[0].collectionSpec;
if ( spec == kC4DefaultCollectionSpec ) {
#ifndef LITECORE_CPPTEST
_mutables._collectionAware = false;
#else
// For the purpose to test clients not derived from Replicator
// that use 3.1 collection aware protocol even if the only collection
// is the default collection.
if ( !sActiveIsCollectionAware ) _mutables._collectionAware = false;
#endif
}
}
}

// Post-conditions:
Expand Down
10 changes: 1 addition & 9 deletions Replicator/Worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -309,15 +309,7 @@ namespace litecore::repl {
constexpr static slice kErrorIndexOutOfRange = "the collection property is out of range."_sl;

slice err = nullslice;
if ( _options->collectionAware() ) {
if ( collIn == kNotCollectionIndex ) { err = kErrorIndexInappropriateUse; }
} else {
if ( collIn != kNotCollectionIndex ) {
err = kErrorIndexInappropriateUse;
} else {
collIn = 0;
}
}
if ( collIn == kNotCollectionIndex ) { err = kErrorIndexInappropriateUse; }

if ( !err && collIn >= _options->workingCollectionCount() ) { err = kErrorIndexOutOfRange; }

Expand Down
4 changes: 1 addition & 3 deletions Replicator/Worker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,7 @@ namespace litecore::repl {
return fleece::narrow_cast<CollectionIndex>(msgIn.intProperty(kCollectionProperty, kNotCollectionIndex));
}

void assignCollectionToMsg(blip::MessageBuilder& msg, CollectionIndex i) const {
if ( _options->collectionAware() ) { msg[kCollectionProperty] = i; }
}
void assignCollectionToMsg(blip::MessageBuilder& msg, CollectionIndex i) const { msg[kCollectionProperty] = i; }

// This method does two things. First it fetches the collectino index from 'msg';
// then, it returns a pair of {collectionIndex, errorSlice} with the post-conditions:
Expand Down
37 changes: 0 additions & 37 deletions Replicator/tests/ReplicatorCollectionTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,33 +252,6 @@ N_WAY_TEST_CASE_METHOD(ReplicatorCollectionTest, "Use Unmatched Collections", "[
runPushPullReplication({Roses, Lavenders}, {Tulips, Lavenders});
}

// CBL-7181.
// The problem scenario:
// 1. active replicator has the default collection as the only one in its collection set
// 2. passive replicator does not have the default collection in its collection set
// In this case, the active replicator will use 3.0 protocol in order to connect to 3.0 server, as well
// as 3.1 server (passive replicator).
N_WAY_TEST_CASE_METHOD(ReplicatorCollectionTest, "3.0 Active vs 3.1 Passive", "[Push][Pull]") {
// When the collection set of the active replicator has only one default collection,
// it will act like a 3.0 client (3.0 protocols)

SECTION("Passive replicator has default collection") {
runPushPullReplication({Default}, {Tulips, Lavenders, Default});
}

SECTION("Passive replicator does not have default collection") {
// Before the CBL ticket is resolved, the following test
// would hit an assertion.
// Several different errors can be triggered by this protocol mismatch concurretly,
// and actual error before Stop is somewhat random. We check the error manually at
// the end.
_expectedError = {LiteCoreDomain, kC4ErrorRemoteError};
runPushPullReplication({Default}, {Tulips, Lavenders});
alloc_slice msg = c4error_getMessage(_statusReceived.error);
CHECK(msg.asString() == "This server is not configured for 3.0 client support");
}
}

N_WAY_TEST_CASE_METHOD(ReplicatorCollectionTest, "Use Zero Collections", "[Push][Pull]") {
ExpectingExceptions x;
_expectedError = {LiteCoreDomain, kC4ErrorInvalidParameter};
Expand Down Expand Up @@ -339,16 +312,6 @@ struct CheckDBEntries {
};

N_WAY_TEST_CASE_METHOD(ReplicatorCollectionTest, "Sync with Default Collection", "[Push][Pull]") {
#ifdef LITECORE_CPPTEST
bool collectionAwareActive = GENERATE(false, true);
bool collectionAwareOnEntry = repl::Options::sActiveIsCollectionAware;
if ( collectionAwareActive ) {
repl::Options::sActiveIsCollectionAware = true;
std::cerr << " Active Replicator is collection-aware" << std::endl;
}
DEFER { repl::Options::sActiveIsCollectionAware = collectionAwareOnEntry; };
#endif

addDocs(db, Default, 10);
addDocs(db2, Default, 10);

Expand Down
Loading