Skip to content

Commit a56419b

Browse files
authored
[feat](cloud) Recycle operation logs according the snapshot keys (#55678)
1 parent 21993a2 commit a56419b

File tree

8 files changed

+333
-22
lines changed

8 files changed

+333
-22
lines changed

cloud/src/meta-service/meta_service_job.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1706,6 +1706,7 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
17061706
index_size_remove_rowsets += rs.index_disk_size();
17071707
segment_size_remove_rowsets += rs.data_disk_size();
17081708

1709+
int64_t start_version = rs.start_version(), end_version = rs.end_version();
17091710
auto recycle_key = recycle_rowset_key({instance_id, new_tablet_id, rs.rowset_id_v2()});
17101711
RecycleRowsetPB recycle_rowset;
17111712
recycle_rowset.set_creation_time(now);
@@ -1717,8 +1718,8 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str
17171718
auto recycle_val = recycle_rowset.SerializeAsString();
17181719
txn->put(recycle_key, recycle_val);
17191720
}
1720-
INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" << new_tablet_id
1721-
<< " key=" << hex(recycle_key);
1721+
INSTANCE_LOG(INFO) << "put recycle rowset, new_tablet_id=" << new_tablet_id << " version=["
1722+
<< start_version << "-" << end_version << "] key=" << hex(recycle_key);
17221723
};
17231724

17241725
if (!is_versioned_read) {

cloud/src/meta-store/mem_txn_kv.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ class MemTxnKv : public TxnKv, public std::enable_shared_from_this<MemTxnKv> {
6464
return mem_kv_.size();
6565
}
6666

67+
void update_commit_version(int64_t version) {
68+
std::lock_guard<std::mutex> l(lock_);
69+
committed_version_ = std::max(committed_version_, version);
70+
read_version_ = std::max(committed_version_, read_version_);
71+
}
72+
6773
int64_t get_bytes_ {};
6874
int64_t put_bytes_ {};
6975
int64_t del_bytes_ {};

cloud/src/meta-store/meta_reader.cpp

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
#include "common/logging.h"
2626
#include "common/util.h"
27+
#include "meta-store/codec.h"
2728
#include "meta-store/document_message.h"
2829
#include "meta-store/document_message_get_range.h"
2930
#include "meta-store/keys.h"
@@ -904,4 +905,57 @@ TxnErrorCode MetaReader::is_partition_exists(Transaction* txn, int64_t partition
904905
return TxnErrorCode::TXN_OK;
905906
}
906907

908+
TxnErrorCode MetaReader::get_snapshots(
909+
Transaction* txn, std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) {
910+
std::string snapshot_key = versioned::snapshot_full_key({instance_id_});
911+
std::string snapshot_start_key = encode_versioned_key(snapshot_key, Versionstamp::min());
912+
std::string snapshot_end_key = encode_versioned_key(snapshot_key, Versionstamp::max());
913+
914+
FullRangeGetOptions range_options;
915+
range_options.prefetch = true;
916+
auto it = txn->full_range_get(snapshot_start_key, snapshot_end_key, range_options);
917+
for (auto&& kvp = it->next(); kvp.has_value(); kvp = it->next()) {
918+
auto&& [key, snapshot_value] = *kvp;
919+
920+
Versionstamp version;
921+
std::string_view key_view(key);
922+
if (decode_tailing_versionstamp_end(&key_view) ||
923+
decode_tailing_versionstamp(&key_view, &version)) {
924+
LOG_WARNING("failed to decode versionstamp from snapshot full key")
925+
.tag("instance_id", instance_id_)
926+
.tag("key", hex(key));
927+
return TxnErrorCode::TXN_INVALID_DATA;
928+
}
929+
930+
SnapshotPB snapshot;
931+
if (!snapshot.ParseFromArray(snapshot_value.data(), snapshot_value.size())) {
932+
LOG_ERROR("Failed to parse SnapshotPB")
933+
.tag("instance_id", instance_id_)
934+
.tag("key", hex(key));
935+
return TxnErrorCode::TXN_INVALID_DATA;
936+
}
937+
938+
snapshots->emplace_back(std::move(snapshot), version);
939+
}
940+
941+
if (!it->is_valid()) {
942+
LOG_ERROR("failed to get snapshots")
943+
.tag("instance_id", instance_id_)
944+
.tag("error_code", it->error_code());
945+
return it->error_code();
946+
}
947+
948+
return TxnErrorCode::TXN_OK;
949+
}
950+
951+
TxnErrorCode MetaReader::get_snapshots(
952+
std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots) {
953+
std::unique_ptr<Transaction> txn;
954+
TxnErrorCode err = txn_kv_->create_txn(&txn);
955+
if (err != TxnErrorCode::TXN_OK) {
956+
return err;
957+
}
958+
return get_snapshots(txn.get(), snapshots);
959+
}
960+
907961
} // namespace doris::cloud

cloud/src/meta-store/meta_reader.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ class MetaReader {
224224
TxnErrorCode is_partition_exists(int64_t partition_id, bool snapshot = false);
225225
TxnErrorCode is_partition_exists(Transaction* txn, int64_t partition_id, bool snapshot = false);
226226

227+
// Get the snapshots.
228+
TxnErrorCode get_snapshots(Transaction* txn,
229+
std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots);
230+
TxnErrorCode get_snapshots(std::vector<std::pair<SnapshotPB, Versionstamp>>* snapshots);
231+
227232
private:
228233
const std::string_view instance_id_;
229234
const Versionstamp snapshot_version_;

cloud/src/recycler/recycler.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,4 +439,26 @@ class InstanceRecycler {
439439
SegmentRecyclerMetricsContext segment_metrics_context_;
440440
};
441441

442+
// Helper class to check if operation logs can be recycled based on snapshots and versionstamps
443+
class OperationLogRecycleChecker {
444+
public:
445+
OperationLogRecycleChecker(std::string_view instance_id, TxnKv* txn_kv)
446+
: instance_id_(instance_id), txn_kv_(txn_kv) {}
447+
448+
// Initialize the checker by loading snapshots and setting max version stamp
449+
int init();
450+
451+
// Check if an operation log can be recycled
452+
bool can_recycle(const Versionstamp& log_versionstamp, int64_t log_min_timestamp) const;
453+
454+
Versionstamp max_versionstamp() const { return max_versionstamp_; }
455+
456+
private:
457+
std::string_view instance_id_;
458+
TxnKv* txn_kv_;
459+
Versionstamp max_versionstamp_;
460+
std::map<Versionstamp, size_t> snapshot_indexes_;
461+
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots_;
462+
};
463+
442464
} // namespace doris::cloud

cloud/src/recycler/recycler_operation_log.cpp

Lines changed: 69 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include <algorithm>
2727
#include <cstddef>
2828
#include <cstdint>
29+
#include <ranges>
2930
#include <string>
3031
#include <string_view>
3132
#include <utility>
@@ -53,6 +54,56 @@ namespace doris::cloud {
5354

5455
using namespace std::chrono;
5556

57+
int OperationLogRecycleChecker::init() {
58+
std::unique_ptr<Transaction> txn;
59+
TxnErrorCode err = txn_kv_->create_txn(&txn);
60+
if (err != TxnErrorCode::TXN_OK) {
61+
LOG_WARNING("failed to create txn").tag("err", err);
62+
return -1;
63+
}
64+
65+
snapshots_.clear();
66+
snapshot_indexes_.clear();
67+
MetaReader reader(instance_id_);
68+
err = reader.get_snapshots(txn.get(), &snapshots_);
69+
if (err != TxnErrorCode::TXN_OK) {
70+
LOG_WARNING("failed to get snapshots").tag("err", err);
71+
return -1;
72+
}
73+
74+
int64_t read_version = -1;
75+
err = txn->get_read_version(&read_version);
76+
if (err != TxnErrorCode::TXN_OK) {
77+
LOG_WARNING("failed to get the read version").tag("err", err);
78+
return -1;
79+
}
80+
81+
max_versionstamp_ = Versionstamp(read_version, 0);
82+
for (size_t i = 0; i < snapshots_.size(); ++i) {
83+
auto&& [snapshot, versionstamp] = snapshots_[i];
84+
snapshot_indexes_.insert(std::make_pair(versionstamp, i));
85+
}
86+
87+
return 0;
88+
}
89+
90+
bool OperationLogRecycleChecker::can_recycle(const Versionstamp& log_versionstamp,
91+
int64_t log_min_timestamp) const {
92+
Versionstamp log_min_read_timestamp(log_min_timestamp, 0);
93+
if (log_versionstamp > max_versionstamp_) {
94+
// Not recycleable.
95+
return false;
96+
}
97+
98+
auto it = snapshot_indexes_.lower_bound(log_min_read_timestamp);
99+
if (it != snapshot_indexes_.end() && snapshots_[it->second].second < log_versionstamp) {
100+
// in [log_min_read_timestmap, log_versionstamp)
101+
return false;
102+
}
103+
104+
return true;
105+
}
106+
56107
// A recycler for operation logs.
57108
class OperationLogRecycler {
58109
public:
@@ -144,6 +195,9 @@ int OperationLogRecycler::recycle_drop_index_log(const DropIndexLogPB& drop_inde
144195
return -1;
145196
}
146197
std::string recycle_key = recycle_index_key({instance_id_, index_id});
198+
LOG_INFO("put recycle index key")
199+
.tag("recycle_key", hex(recycle_key))
200+
.tag("index_id", index_id);
147201
kvs_.emplace_back(std::move(recycle_key), std::move(recycle_index_value));
148202
}
149203
return 0;
@@ -557,11 +611,18 @@ int InstanceRecycler::recycle_operation_logs() {
557611
.tag("recycled_operation_log_data_size", recycled_operation_log_data_size);
558612
};
559613

614+
OperationLogRecycleChecker recycle_checker(instance_id_, txn_kv_.get());
615+
int init_res = recycle_checker.init();
616+
if (init_res != 0) {
617+
LOG_WARNING("failed to initialize recycle checker").tag("error_code", init_res);
618+
return init_res;
619+
}
620+
560621
auto scan_and_recycle_operation_log = [&](const std::string_view& key,
561622
const std::string_view& value) {
562623
std::string_view log_key(key);
563-
Versionstamp versionstamp;
564-
if (!decode_versioned_key(&log_key, &versionstamp)) {
624+
Versionstamp log_versionstamp;
625+
if (!decode_versioned_key(&log_key, &log_versionstamp)) {
565626
LOG_WARNING("failed to decode versionstamp from operation log key")
566627
.tag("key", hex(key));
567628
return -1;
@@ -577,15 +638,15 @@ int InstanceRecycler::recycle_operation_logs() {
577638
if (!operation_log.has_min_timestamp()) {
578639
LOG_WARNING("operation log has not set the min_timestamp")
579640
.tag("key", hex(key))
580-
.tag("version", versionstamp.version())
581-
.tag("order", versionstamp.order())
641+
.tag("version", log_versionstamp.version())
642+
.tag("order", log_versionstamp.order())
582643
.tag("log", operation_log.ShortDebugString());
644+
return 0;
583645
}
584646

585-
bool need_recycle = true; // Always recycle operation logs for now
586-
if (need_recycle) {
587-
AnnotateTag tag("log_key", hex(log_key));
588-
int res = recycle_operation_log(versionstamp, std::move(operation_log));
647+
if (recycle_checker.can_recycle(log_versionstamp, operation_log.min_timestamp())) {
648+
AnnotateTag tag("log_key", hex(key));
649+
int res = recycle_operation_log(log_versionstamp, std::move(operation_log));
589650
if (res != 0) {
590651
LOG_WARNING("failed to recycle operation log").tag("error_code", res);
591652
return res;

cloud/test/meta_reader_test.cpp

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2050,3 +2050,58 @@ TEST(MetaReaderTest, BatchGetTabletCompactStats) {
20502050
ASSERT_EQ(versionstamps.size(), tablet_ids.size());
20512051
}
20522052
}
2053+
2054+
TEST(MetaReaderTest, GetSnapshots) {
2055+
auto txn_kv = std::make_shared<MemTxnKv>();
2056+
ASSERT_EQ(txn_kv->init(), 0);
2057+
2058+
std::string instance_id = "test_instance";
2059+
2060+
{
2061+
// Test empty result when no snapshots exist
2062+
MetaReader meta_reader(instance_id, txn_kv.get());
2063+
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
2064+
TxnErrorCode err = meta_reader.get_snapshots(&snapshots);
2065+
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
2066+
ASSERT_TRUE(snapshots.empty());
2067+
}
2068+
2069+
// Create some snapshots
2070+
std::vector<Versionstamp> expected_versionstamps;
2071+
{
2072+
// Create multiple snapshots with different timestamps
2073+
for (int i = 1; i <= 3; ++i) {
2074+
std::unique_ptr<Transaction> txn;
2075+
ASSERT_EQ(txn_kv->create_txn(&txn), TxnErrorCode::TXN_OK);
2076+
2077+
std::string snapshot_key = versioned::snapshot_full_key({instance_id});
2078+
SnapshotPB snapshot_pb;
2079+
snapshot_pb.set_label(fmt::format("snapshot_{}", i));
2080+
snapshot_pb.set_instance_id(instance_id);
2081+
2082+
std::string snapshot_value = snapshot_pb.SerializeAsString();
2083+
versioned_put(txn.get(), snapshot_key, snapshot_value);
2084+
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
2085+
}
2086+
}
2087+
2088+
{
2089+
// Test getting snapshots with created transaction
2090+
MetaReader meta_reader(instance_id, txn_kv.get());
2091+
std::vector<std::pair<SnapshotPB, Versionstamp>> snapshots;
2092+
TxnErrorCode err = meta_reader.get_snapshots(&snapshots);
2093+
ASSERT_EQ(err, TxnErrorCode::TXN_OK);
2094+
ASSERT_EQ(snapshots.size(), 3);
2095+
2096+
// Verify snapshots are returned (order may vary due to versionstamp ordering)
2097+
std::set<std::string> snapshot_ids;
2098+
for (const auto& [snapshot_pb, versionstamp] : snapshots) {
2099+
snapshot_ids.insert(snapshot_pb.label());
2100+
ASSERT_EQ(snapshot_pb.instance_id(), instance_id);
2101+
}
2102+
ASSERT_EQ(snapshot_ids.size(), 3);
2103+
ASSERT_TRUE(snapshot_ids.count("snapshot_1"));
2104+
ASSERT_TRUE(snapshot_ids.count("snapshot_2"));
2105+
ASSERT_TRUE(snapshot_ids.count("snapshot_3"));
2106+
}
2107+
}

0 commit comments

Comments
 (0)