-
Notifications
You must be signed in to change notification settings - Fork 1.8k
feat: add RESET statement for configuration variabless #18408
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Weijun-H
wants to merge
16
commits into
apache:main
Choose a base branch
from
Weijun-H:18384-reset-config
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
16 commits
Select commit
Hold shift + click to select a range
fa26326
feat: add RESET statement for configuration variabless
Weijun-H d654b69
chore: Update runtime desult
Weijun-H ef91aa1
chore
Weijun-H c4be208
feat: add reset functionality to configuration options
Weijun-H 3180c5a
chore
Weijun-H 0708177
chore: Added tests related to reset function
Weijun-H 2759aa6
chore: Clean up
Weijun-H f143b32
chore: Change return type
Weijun-H 5a0a1e4
chore: Move reset to trait
Weijun-H 1a87db8
chore: Improve error handling for config field resets
Weijun-H b884030
chore: Fix test
Weijun-H 134138f
Update datafusion/core/src/execution/context/mod.rs
Weijun-H 50c4b1f
chore: Export default cache and directory size constants public
Weijun-H 6cf60ff
chore: Added tests
Weijun-H 139b14a
chore: Added tests
Weijun-H 66d1b48
chore
Weijun-H File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -185,6 +185,31 @@ macro_rules! config_namespace { | |
| self.$field_name.visit(v, key.as_str(), desc); | ||
| )* | ||
| } | ||
|
|
||
| fn reset(&mut self, key: &str) -> $crate::error::Result<()> { | ||
| let (key, rem) = key.split_once('.').unwrap_or((key, "")); | ||
| match key { | ||
| $( | ||
| stringify!($field_name) => { | ||
| #[allow(deprecated)] | ||
| { | ||
| if rem.is_empty() { | ||
| let default_value: $field_type = $default; | ||
| self.$field_name = default_value; | ||
| Ok(()) | ||
| } else { | ||
| self.$field_name.reset(rem) | ||
| } | ||
| } | ||
| }, | ||
| )* | ||
| _ => $crate::error::_config_err!( | ||
| "Config value \"{}\" not found on {}", | ||
| key, | ||
| stringify!($struct_name) | ||
| ), | ||
| } | ||
| } | ||
| } | ||
| impl Default for $struct_name { | ||
| fn default() -> Self { | ||
|
|
@@ -1169,6 +1194,45 @@ impl ConfigField for ConfigOptions { | |
| _ => _config_err!("Config value \"{key}\" not found on ConfigOptions"), | ||
| } | ||
| } | ||
|
|
||
| /// Reset a configuration option back to its default value | ||
| fn reset(&mut self, key: &str) -> Result<()> { | ||
| let Some((prefix, rest)) = key.split_once('.') else { | ||
| return _config_err!("could not find config namespace for key \"{key}\""); | ||
| }; | ||
|
|
||
| if prefix != "datafusion" { | ||
| return _config_err!("Could not find config namespace \"{prefix}\""); | ||
| } | ||
|
|
||
| let (section, rem) = rest.split_once('.').unwrap_or((rest, "")); | ||
| if rem.is_empty() { | ||
| return _config_err!("could not find config field for key \"{key}\""); | ||
| } | ||
|
|
||
| match section { | ||
| "catalog" => self.catalog.reset(rem), | ||
| "execution" => self.execution.reset(rem), | ||
| "optimizer" => { | ||
| if rem == "enable_dynamic_filter_pushdown" { | ||
| let defaults = OptimizerOptions::default(); | ||
| self.optimizer.enable_dynamic_filter_pushdown = | ||
| defaults.enable_dynamic_filter_pushdown; | ||
| self.optimizer.enable_topk_dynamic_filter_pushdown = | ||
| defaults.enable_topk_dynamic_filter_pushdown; | ||
| self.optimizer.enable_join_dynamic_filter_pushdown = | ||
| defaults.enable_join_dynamic_filter_pushdown; | ||
| Ok(()) | ||
| } else { | ||
| self.optimizer.reset(rem) | ||
| } | ||
| } | ||
| "explain" => self.explain.reset(rem), | ||
| "sql_parser" => self.sql_parser.reset(rem), | ||
| "format" => self.format.reset(rem), | ||
| other => _config_err!("Config value \"{other}\" not found on ConfigOptions"), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl ConfigOptions { | ||
|
|
@@ -1477,6 +1541,10 @@ pub trait ConfigField { | |
| fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str); | ||
|
|
||
| fn set(&mut self, key: &str, value: &str) -> Result<()>; | ||
|
|
||
| fn reset(&mut self, key: &str) -> Result<()> { | ||
| _config_err!("Reset is not supported for this config field, key: {}", key) | ||
| } | ||
| } | ||
|
|
||
| impl<F: ConfigField + Default> ConfigField for Option<F> { | ||
|
|
@@ -1490,6 +1558,15 @@ impl<F: ConfigField + Default> ConfigField for Option<F> { | |
| fn set(&mut self, key: &str, value: &str) -> Result<()> { | ||
| self.get_or_insert_with(Default::default).set(key, value) | ||
| } | ||
|
|
||
| fn reset(&mut self, key: &str) -> Result<()> { | ||
| if key.is_empty() { | ||
| *self = Default::default(); | ||
| Ok(()) | ||
| } else { | ||
| self.get_or_insert_with(Default::default).reset(key) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Default transformation to parse a [`ConfigField`] for a string. | ||
|
|
@@ -1554,6 +1631,19 @@ macro_rules! config_field { | |
| *self = $transform; | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn reset(&mut self, key: &str) -> $crate::error::Result<()> { | ||
| if key.is_empty() { | ||
| *self = <$t as Default>::default(); | ||
| Ok(()) | ||
| } else { | ||
| $crate::error::_config_err!( | ||
| "Config field is a scalar {} and does not have nested field \"{}\"", | ||
| stringify!($t), | ||
| key | ||
| ) | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| } | ||
|
|
@@ -2562,7 +2652,7 @@ impl ConfigField for ConfigFileDecryptionProperties { | |
| self.footer_signature_verification.set(rem, value.as_ref()) | ||
| } | ||
| _ => _config_err!( | ||
| "Config value \"{}\" not found on ConfigFileEncryptionProperties", | ||
| "Config value \"{}\" not found on ConfigFileDecryptionProperties", | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice catch |
||
| key | ||
| ), | ||
| } | ||
|
|
@@ -2876,7 +2966,6 @@ mod tests { | |
| }; | ||
| use std::any::Any; | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
|
|
||
| #[derive(Default, Debug, Clone)] | ||
| pub struct TestExtensionConfig { | ||
|
|
@@ -2991,6 +3080,19 @@ mod tests { | |
| assert_eq!(COUNT.load(std::sync::atomic::Ordering::Relaxed), 1); | ||
| } | ||
|
|
||
| #[test] | ||
| fn reset_nested_scalar_reports_helpful_error() { | ||
| let mut value = true; | ||
| let err = <bool as ConfigField>::reset(&mut value, "nested").unwrap_err(); | ||
| let message = err.to_string(); | ||
| assert!( | ||
| message.starts_with( | ||
| "Invalid or Unsupported Configuration: Config field is a scalar bool and does not have nested field \"nested\"" | ||
| ), | ||
| "unexpected error message: {message}" | ||
| ); | ||
| } | ||
|
|
||
| #[cfg(feature = "parquet")] | ||
| #[test] | ||
| fn parquet_table_options() { | ||
|
|
@@ -3013,6 +3115,7 @@ mod tests { | |
| }; | ||
| use parquet::encryption::decrypt::FileDecryptionProperties; | ||
| use parquet::encryption::encrypt::FileEncryptionProperties; | ||
| use std::sync::Arc; | ||
|
|
||
| let footer_key = b"0123456789012345".to_vec(); // 128bit/16 | ||
| let column_names = vec!["double_field", "float_field"]; | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -45,8 +45,8 @@ use crate::{ | |
| logical_expr::{ | ||
| CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateFunction, | ||
| CreateMemoryTable, CreateView, DropCatalogSchema, DropFunction, DropTable, | ||
| DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, SetVariable, | ||
| TableType, UNNAMED_TABLE, | ||
| DropView, Execute, LogicalPlan, LogicalPlanBuilder, Prepare, ResetVariable, | ||
| SetVariable, TableType, UNNAMED_TABLE, | ||
| }, | ||
| physical_expr::PhysicalExpr, | ||
| physical_plan::ExecutionPlan, | ||
|
|
@@ -63,7 +63,7 @@ use datafusion_catalog::MemoryCatalogProvider; | |
| use datafusion_catalog::{ | ||
| DynamicFileCatalog, TableFunction, TableFunctionImpl, UrlTableFactory, | ||
| }; | ||
| use datafusion_common::config::ConfigOptions; | ||
| use datafusion_common::config::{ConfigField, ConfigOptions}; | ||
| use datafusion_common::metadata::ScalarAndMetadata; | ||
| use datafusion_common::{ | ||
| config::{ConfigExtension, TableOptions}, | ||
|
|
@@ -72,7 +72,11 @@ use datafusion_common::{ | |
| tree_node::{TreeNodeRecursion, TreeNodeVisitor}, | ||
| DFSchema, DataFusionError, ParamValues, SchemaReference, TableReference, | ||
| }; | ||
| use datafusion_execution::cache::cache_manager::DEFAULT_METADATA_CACHE_LIMIT; | ||
| pub use datafusion_execution::config::SessionConfig; | ||
| use datafusion_execution::disk_manager::{ | ||
| DiskManagerBuilder, DEFAULT_MAX_TEMP_DIRECTORY_SIZE, | ||
| }; | ||
| use datafusion_execution::registry::SerializerRegistry; | ||
| pub use datafusion_execution::TaskContext; | ||
| pub use datafusion_expr::execution_props::ExecutionProps; | ||
|
|
@@ -711,7 +715,12 @@ impl SessionContext { | |
| } | ||
| // TODO what about the other statements (like TransactionStart and TransactionEnd) | ||
| LogicalPlan::Statement(Statement::SetVariable(stmt)) => { | ||
| self.set_variable(stmt).await | ||
| self.set_variable(stmt).await?; | ||
| self.return_empty_dataframe() | ||
| } | ||
| LogicalPlan::Statement(Statement::ResetVariable(stmt)) => { | ||
| self.reset_variable(stmt).await?; | ||
| self.return_empty_dataframe() | ||
| } | ||
| LogicalPlan::Statement(Statement::Prepare(Prepare { | ||
| name, | ||
|
|
@@ -1069,7 +1078,7 @@ impl SessionContext { | |
| exec_err!("Schema '{schemaref}' doesn't exist.") | ||
| } | ||
|
|
||
| async fn set_variable(&self, stmt: SetVariable) -> Result<DataFrame> { | ||
| async fn set_variable(&self, stmt: SetVariable) -> Result<()> { | ||
| let SetVariable { | ||
| variable, value, .. | ||
| } = stmt; | ||
|
|
@@ -1099,11 +1108,37 @@ impl SessionContext { | |
| for udf in udfs_to_update { | ||
| state.register_udf(udf)?; | ||
| } | ||
| } | ||
|
|
||
| drop(state); | ||
| Ok(()) | ||
| } | ||
|
|
||
| async fn reset_variable(&self, stmt: ResetVariable) -> Result<()> { | ||
| let variable = stmt.variable; | ||
| if variable.starts_with("datafusion.runtime.") { | ||
| return self.reset_runtime_variable(&variable); | ||
| } | ||
|
|
||
| self.return_empty_dataframe() | ||
| let mut state = self.state.write(); | ||
| state.config_mut().options_mut().reset(&variable)?; | ||
|
|
||
| // Refresh UDFs to ensure configuration-dependent behavior updates | ||
| let config_options = state.config().options(); | ||
| let udfs_to_update: Vec<_> = state | ||
| .scalar_functions() | ||
| .values() | ||
| .filter_map(|udf| { | ||
| udf.inner() | ||
| .with_updated_config(config_options) | ||
| .map(Arc::new) | ||
| }) | ||
| .collect(); | ||
|
|
||
| for udf in udfs_to_update { | ||
| state.register_udf(udf)?; | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn set_runtime_variable(&self, variable: &str, value: &str) -> Result<()> { | ||
|
|
@@ -1136,6 +1171,36 @@ impl SessionContext { | |
| Ok(()) | ||
| } | ||
|
|
||
| fn reset_runtime_variable(&self, variable: &str) -> Result<()> { | ||
| let key = variable.strip_prefix("datafusion.runtime.").unwrap(); | ||
|
|
||
| let mut state = self.state.write(); | ||
|
|
||
| let mut builder = RuntimeEnvBuilder::from_runtime_env(state.runtime_env()); | ||
| match key { | ||
| "memory_limit" => { | ||
| builder.memory_pool = None; | ||
| } | ||
| "max_temp_directory_size" => { | ||
| builder = | ||
| builder.with_max_temp_directory_size(DEFAULT_MAX_TEMP_DIRECTORY_SIZE); | ||
| } | ||
| "temp_directory" => { | ||
| builder.disk_manager_builder = Some(DiskManagerBuilder::default()); | ||
| } | ||
| "metadata_cache_limit" => { | ||
| builder = builder.with_metadata_cache_limit(DEFAULT_METADATA_CACHE_LIMIT); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here in cache_manager.rs |
||
| } | ||
| _ => return plan_err!("Unknown runtime configuration: {variable}"), | ||
| }; | ||
|
|
||
| *state = SessionStateBuilder::from(state.clone()) | ||
| .with_runtime_env(Arc::new(builder.build()?)) | ||
| .build(); | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| /// Parse memory limit from string to number of bytes | ||
| /// Supports formats like '1.5G', '100M', '512K' | ||
| /// | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems off. Why error if there is a key but don't if there isn't? Seems backwards?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For scalar config fields the caller already strips off the key for the field before invoking reset. At that point key only contains a nested suffix (if any).