From b27cfe0b8273bc714b4e083d1f343cb2751dba77 Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Mon, 7 Apr 2025 16:13:30 +0800 Subject: [PATCH 1/2] fix: cursor name should not be schema-bound Signed-off-by: Bugen Zhao --- risingwave/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/risingwave/core.py b/risingwave/core.py index 5e23a8a..d5e3bef 100644 --- a/risingwave/core.py +++ b/risingwave/core.py @@ -554,7 +554,7 @@ def _run( wait_interval_ms: int = DEFAULT_CURSOR_IDLE_INTERVAL_MS, cursor_name: str = "default", ): - cursor_name = f"{self.schema_name}.risingwave_py_cursor_{cursor_name}_{self.sub_name}" + cursor_name = f"risingwave_py_cursor_{cursor_name}_{self.schema_name}_{self.sub_name}" fully_qual_sub_name = f"{self.schema_name}.{self.sub_name}" if self.persist_progress: From 94217c6f0f83bf56c008d0808296b3fb5c162ace Mon Sep 17 00:00:00 2001 From: Bugen Zhao Date: Tue, 8 Apr 2025 14:59:38 +0800 Subject: [PATCH 2/2] determine based on version Signed-off-by: Bugen Zhao --- risingwave/core.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/risingwave/core.py b/risingwave/core.py index d5e3bef..4fd1dca 100644 --- a/risingwave/core.py +++ b/risingwave/core.py @@ -31,7 +31,9 @@ def _retry(f, interval_ms: int, times: int): return f() except Exception as e: ee = e - logging.warning(f"retrying function, exception: {e}, {traceback.format_exc()}") + logging.warning( + f"retrying function, exception: {e}, {traceback.format_exc()}" + ) cnt += 1 time.sleep(interval_ms / 1000) raise RuntimeError( @@ -153,12 +155,12 @@ def from_connection_info( user: Username for authentication password: Password for authentication database: Name of the database to connect to - ssl: SSL mode for connection. Valid values are "disable", "allow", + ssl: SSL mode for connection. Valid values are "disable", "allow", "prefer", "require", "verify-ca", "verify-full" **extra_params: Additional connection parameters to be included in the URL Returns: - RisingWaveConnOptions: A connection options instance configured with the + RisingWaveConnOptions: A connection options instance configured with the provided parameters Examples: @@ -225,7 +227,7 @@ def fetch(self, sql: str, format=OutputFormat.RAW, *args): *args: Additional arguments to be passed to the SQL query. Returns: - The fetched result. + The fetched result. If `format` is set to `OutputFormat.DATAFRAME`, the result is returned as a pandas DataFrame. Otherwise, the result is returned as a list of tuples. @@ -257,7 +259,7 @@ def fetchone(self, sql: str, format=OutputFormat.RAW, *args): *args: Additional arguments to be passed to the SQL query. Returns: - The first row of the result set or None if the result set is empty. + The first row of the result set or None if the result set is empty. If format is set to OutputFormat.DATAFRAME, it returns a pandas DataFrame with the result. Otherwise, it returns a tuple. @@ -381,7 +383,7 @@ def check_exist(self, name: str, schema_name: str = "public"): Returns: bool: True if the table exists, False otherwise. """ - + result = self.fetch( f"SELECT * FROM information_schema.tables WHERE table_name = '{name}' and table_schema = '{schema_name}'" ) @@ -554,7 +556,13 @@ def _run( wait_interval_ms: int = DEFAULT_CURSOR_IDLE_INTERVAL_MS, cursor_name: str = "default", ): - cursor_name = f"risingwave_py_cursor_{cursor_name}_{self.schema_name}_{self.sub_name}" + cursor_name = ( + f"risingwave_py_cursor_{cursor_name}_{self.schema_name}_{self.sub_name}" + # https://github.com/risingwavelabs/risingwave/pull/20221 + if self.conn.rw_version >= "2.3.0" + else f"{self.schema_name}.risingwave_py_cursor_{cursor_name}_{self.sub_name}" + ) + print(cursor_name) fully_qual_sub_name = f"{self.schema_name}.{self.sub_name}" if self.persist_progress: