1717
1818#include " cloud/cloud_schema_change_job.h"
1919
20+ #include < gen_cpp/Types_types.h>
2021#include < gen_cpp/cloud.pb.h>
2122
2223#include < algorithm>
3233#include " olap/delete_handler.h"
3334#include " olap/olap_define.h"
3435#include " olap/rowset/beta_rowset.h"
36+ #include " olap/rowset/rowset.h"
3537#include " olap/rowset/rowset_factory.h"
3638#include " olap/rowset/segment_v2/inverted_index_desc.h"
3739#include " olap/storage_engine.h"
@@ -217,6 +219,20 @@ Status CloudSchemaChangeJob::process_alter_tablet(const TAlterTabletReqV2& reque
217219
218220 SchemaChangeParams sc_params;
219221
222+ // cache schema change output to file cache
223+ std::vector<RowsetSharedPtr> rowsets;
224+ rowsets.resize (rs_splits.size ());
225+ std::transform (rs_splits.begin (), rs_splits.end (), rowsets.begin (),
226+ [](RowSetSplits& split) { return split.rs_reader ->rowset (); });
227+ sc_params.output_to_file_cache = _should_cache_sc_output (rowsets);
228+ if (request.__isset .query_globals && request.__isset .query_options ) {
229+ sc_params.runtime_state =
230+ std::make_shared<RuntimeState>(request.query_options , request.query_globals );
231+ } else {
232+ // for old version request compatibility
233+ sc_params.runtime_state = std::make_shared<RuntimeState>();
234+ }
235+
220236 RETURN_IF_ERROR (DescriptorTbl::create (&sc_params.pool , request.desc_tbl , &sc_params.desc_tbl ));
221237 sc_params.ref_rowset_readers .reserve (rs_splits.size ());
222238 for (RowSetSplits& split : rs_splits) {
@@ -269,7 +285,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
269285
270286 // Add filter information in change, and filter column information will be set in _parse_request
271287 // And filter some data every time the row block changes
272- BlockChanger changer (_new_tablet->tablet_schema (), *sc_params.desc_tbl );
288+ BlockChanger changer (_new_tablet->tablet_schema (), *sc_params.desc_tbl ,
289+ sc_params.runtime_state );
273290
274291 bool sc_sorting = false ;
275292 bool sc_directly = false ;
@@ -309,6 +326,8 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
309326 context.tablet_schema = _new_tablet->tablet_schema ();
310327 context.newest_write_timestamp = rs_reader->newest_write_timestamp ();
311328 context.storage_resource = _cloud_storage_engine.get_storage_resource (sc_params.vault_id );
329+ context.write_file_cache = sc_params.output_to_file_cache ;
330+ context.tablet = _new_tablet;
312331 if (!context.storage_resource ) {
313332 return Status::InternalError (" vault id not found, maybe not sync, vault id {}" ,
314333 sc_params.vault_id );
@@ -467,7 +486,7 @@ Status CloudSchemaChangeJob::_convert_historical_rowsets(const SchemaChangeParam
467486 // during double write phase by `CloudMetaMgr::sync_tablet_rowsets` in another thread
468487 std::unique_lock lock {_new_tablet->get_sync_meta_lock ()};
469488 std::unique_lock wlock (_new_tablet->get_header_lock ());
470- _new_tablet->add_rowsets (std::move (_output_rowsets), true , wlock);
489+ _new_tablet->add_rowsets (std::move (_output_rowsets), true , wlock, false );
471490 _new_tablet->set_cumulative_layer_point (_output_cumulative_point);
472491 _new_tablet->reset_approximate_stats (stats.num_rowsets (), stats.num_segments (),
473492 stats.num_rows (), stats.data_size ());
@@ -503,7 +522,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
503522 std::make_shared<CloudTablet>(_cloud_storage_engine, tmp_meta);
504523 {
505524 std::unique_lock wlock (tmp_tablet->get_header_lock ());
506- tmp_tablet->add_rowsets (_output_rowsets, true , wlock);
525+ tmp_tablet->add_rowsets (_output_rowsets, true , wlock, false );
507526 // Set alter version to let the tmp_tablet can fill hole rowset greater than alter_version
508527 tmp_tablet->set_alter_version (alter_version);
509528 }
@@ -521,7 +540,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
521540 DBUG_BLOCK);
522541 {
523542 std::unique_lock wlock (tmp_tablet->get_header_lock ());
524- tmp_tablet->add_rowsets (_output_rowsets, true , wlock);
543+ tmp_tablet->add_rowsets (_output_rowsets, true , wlock, false );
525544 }
526545 for (auto rowset : ret.rowsets ) {
527546 RETURN_IF_ERROR (CloudTablet::update_delete_bitmap_without_lock (tmp_tablet, rowset));
@@ -544,7 +563,7 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version,
544563 {max_version + 1 , new_max_version}, CaptureRowsetOps {}));
545564 {
546565 std::unique_lock wlock (tmp_tablet->get_header_lock ());
547- tmp_tablet->add_rowsets (_output_rowsets, true , wlock);
566+ tmp_tablet->add_rowsets (_output_rowsets, true , wlock, false );
548567 }
549568 for (auto rowset : ret.rowsets ) {
550569 RETURN_IF_ERROR (CloudTablet::update_delete_bitmap_without_lock (tmp_tablet, rowset));
@@ -595,4 +614,34 @@ void CloudSchemaChangeJob::clean_up_on_failure() {
595614 }
596615}
597616
617+ bool CloudSchemaChangeJob::_should_cache_sc_output (
618+ const std::vector<RowsetSharedPtr>& input_rowsets) {
619+ int64_t total_size = 0 ;
620+ int64_t cached_index_size = 0 ;
621+ int64_t cached_data_size = 0 ;
622+
623+ for (const auto & rs : input_rowsets) {
624+ const RowsetMetaSharedPtr& rs_meta = rs->rowset_meta ();
625+ total_size += rs_meta->total_disk_size ();
626+ cached_index_size += rs->approximate_cache_index_size ();
627+ cached_data_size += rs->approximate_cached_data_size ();
628+ }
629+
630+ double input_hit_rate = static_cast <double >(cached_index_size + cached_data_size) / total_size;
631+
632+ LOG (INFO) << " CloudSchemaChangeJob check cache sc output strategy. "
633+ << " job_id=" << _job_id << " , input_rowsets_count=" << input_rowsets.size ()
634+ << " , total_size=" << total_size << " , cached_index_size=" << cached_index_size
635+ << " , cached_data_size=" << cached_data_size << " , input_hit_rate=" << input_hit_rate
636+ << " , min_hit_ratio_threshold="
637+ << config::file_cache_keep_schema_change_output_min_hit_ratio << " , should_cache="
638+ << (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio);
639+
640+ if (input_hit_rate > config::file_cache_keep_schema_change_output_min_hit_ratio) {
641+ return true ;
642+ }
643+
644+ return false ;
645+ }
646+
598647} // namespace doris
0 commit comments