Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions risingwave/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def _retry(f, interval_ms: int, times: int):
return f()
except Exception as e:
ee = e
logging.warning(f"retrying function, exception: {e}, {traceback.format_exc()}")
logging.warning(
f"retrying function, exception: {e}, {traceback.format_exc()}"
)
cnt += 1
time.sleep(interval_ms / 1000)
raise RuntimeError(
Expand Down Expand Up @@ -153,12 +155,12 @@ def from_connection_info(
user: Username for authentication
password: Password for authentication
database: Name of the database to connect to
ssl: SSL mode for connection. Valid values are "disable", "allow",
ssl: SSL mode for connection. Valid values are "disable", "allow",
"prefer", "require", "verify-ca", "verify-full"
**extra_params: Additional connection parameters to be included in the URL

Returns:
RisingWaveConnOptions: A connection options instance configured with the
RisingWaveConnOptions: A connection options instance configured with the
provided parameters

Examples:
Expand Down Expand Up @@ -225,7 +227,7 @@ def fetch(self, sql: str, format=OutputFormat.RAW, *args):
*args: Additional arguments to be passed to the SQL query.

Returns:
The fetched result.
The fetched result.
If `format` is set to `OutputFormat.DATAFRAME`, the result is returned as a pandas DataFrame.
Otherwise, the result is returned as a list of tuples.

Expand Down Expand Up @@ -257,7 +259,7 @@ def fetchone(self, sql: str, format=OutputFormat.RAW, *args):
*args: Additional arguments to be passed to the SQL query.

Returns:
The first row of the result set or None if the result set is empty.
The first row of the result set or None if the result set is empty.
If format is set to OutputFormat.DATAFRAME, it returns a pandas DataFrame with the result.
Otherwise, it returns a tuple.

Expand Down Expand Up @@ -381,7 +383,7 @@ def check_exist(self, name: str, schema_name: str = "public"):
Returns:
bool: True if the table exists, False otherwise.
"""

result = self.fetch(
f"SELECT * FROM information_schema.tables WHERE table_name = '{name}' and table_schema = '{schema_name}'"
)
Expand Down Expand Up @@ -554,7 +556,13 @@ def _run(
wait_interval_ms: int = DEFAULT_CURSOR_IDLE_INTERVAL_MS,
cursor_name: str = "default",
):
cursor_name = f"{self.schema_name}.risingwave_py_cursor_{cursor_name}_{self.sub_name}"
cursor_name = (
f"risingwave_py_cursor_{cursor_name}_{self.schema_name}_{self.sub_name}"
# https://github.com/risingwavelabs/risingwave/pull/20221
if self.conn.rw_version >= "2.3.0"
else f"{self.schema_name}.risingwave_py_cursor_{cursor_name}_{self.sub_name}"
)
print(cursor_name)
fully_qual_sub_name = f"{self.schema_name}.{self.sub_name}"

if self.persist_progress:
Expand Down