@@ -31,7 +31,9 @@ def _retry(f, interval_ms: int, times: int):
3131 return f ()
3232 except Exception as e :
3333 ee = e
34- logging .warning (f"retrying function, exception: { e } , { traceback .format_exc ()} " )
34+ logging .warning (
35+ f"retrying function, exception: { e } , { traceback .format_exc ()} "
36+ )
3537 cnt += 1
3638 time .sleep (interval_ms / 1000 )
3739 raise RuntimeError (
@@ -153,12 +155,12 @@ def from_connection_info(
153155 user: Username for authentication
154156 password: Password for authentication
155157 database: Name of the database to connect to
156- ssl: SSL mode for connection. Valid values are "disable", "allow",
158+ ssl: SSL mode for connection. Valid values are "disable", "allow",
157159 "prefer", "require", "verify-ca", "verify-full"
158160 **extra_params: Additional connection parameters to be included in the URL
159161
160162 Returns:
161- RisingWaveConnOptions: A connection options instance configured with the
163+ RisingWaveConnOptions: A connection options instance configured with the
162164 provided parameters
163165
164166 Examples:
@@ -225,7 +227,7 @@ def fetch(self, sql: str, format=OutputFormat.RAW, *args):
225227 *args: Additional arguments to be passed to the SQL query.
226228
227229 Returns:
228- The fetched result.
230+ The fetched result.
229231 If `format` is set to `OutputFormat.DATAFRAME`, the result is returned as a pandas DataFrame.
230232 Otherwise, the result is returned as a list of tuples.
231233
@@ -257,7 +259,7 @@ def fetchone(self, sql: str, format=OutputFormat.RAW, *args):
257259 *args: Additional arguments to be passed to the SQL query.
258260
259261 Returns:
260- The first row of the result set or None if the result set is empty.
262+ The first row of the result set or None if the result set is empty.
261263 If format is set to OutputFormat.DATAFRAME, it returns a pandas DataFrame with the result.
262264 Otherwise, it returns a tuple.
263265
@@ -381,7 +383,7 @@ def check_exist(self, name: str, schema_name: str = "public"):
381383 Returns:
382384 bool: True if the table exists, False otherwise.
383385 """
384-
386+
385387 result = self .fetch (
386388 f"SELECT * FROM information_schema.tables WHERE table_name = '{ name } ' and table_schema = '{ schema_name } '"
387389 )
@@ -554,7 +556,13 @@ def _run(
554556 wait_interval_ms : int = DEFAULT_CURSOR_IDLE_INTERVAL_MS ,
555557 cursor_name : str = "default" ,
556558 ):
557- cursor_name = f"{ self .schema_name } .risingwave_py_cursor_{ cursor_name } _{ self .sub_name } "
559+ cursor_name = (
560+ f"risingwave_py_cursor_{ cursor_name } _{ self .schema_name } _{ self .sub_name } "
561+ # https://github.com/risingwavelabs/risingwave/pull/20221
562+ if self .conn .rw_version >= "2.3.0"
563+ else f"{ self .schema_name } .risingwave_py_cursor_{ cursor_name } _{ self .sub_name } "
564+ )
565+ print (cursor_name )
558566 fully_qual_sub_name = f"{ self .schema_name } .{ self .sub_name } "
559567
560568 if self .persist_progress :
0 commit comments