From cce1631a5546528058b63c1524a048aaea9f7cf0 Mon Sep 17 00:00:00 2001 From: Rainer Weikusat Date: Wed, 21 May 2025 09:15:50 +0100 Subject: [PATCH 1/2] async connect changes as single commit --- Pg.h | 2 + Pg.pm | 35 ++++++++- Pg.xs | 11 +++ dbdimp.c | 194 +++++++++++++++++++++++++++++++++++++------------ dbdimp.h | 3 + t/03dbmethod.t | 56 ++++++++++++++ 6 files changed, 253 insertions(+), 48 deletions(-) diff --git a/Pg.h b/Pg.h index 250ffabf..27b9bcea 100644 --- a/Pg.h +++ b/Pg.h @@ -121,6 +121,8 @@ DBISTATE_DECLARE; #define TRACE_PQCMDSTATUS TRACE_XX "%sPQcmdStatus\n", THEADER_slow) #define TRACE_PQCMDTUPLES TRACE_XX "%sPQcmdTuples\n", THEADER_slow) #define TRACE_PQCONNECTDB TRACE_XX "%sPQconnectdb\n", THEADER_slow) +#define TRACE_PQCONNECTSTART TRACE_XX "%sPQconnectStart\n", THEADER_slow) +#define TRACE_PQCONNECTPOLL TRACE_XX "%sPQconnectPoll\n", THEADER_slow) #define TRACE_PQCONSUMEINPUT TRACE_XX "%sPQconsumeInput\n", THEADER_slow) #define TRACE_PQDB TRACE_XX "%sPQdb\n", THEADER_slow) #define TRACE_PQENDCOPY TRACE_XX "%sPQendcopy\n", THEADER_slow) diff --git a/Pg.pm b/Pg.pm index da9d0776..cf6800ac 100644 --- a/Pg.pm +++ b/Pg.pm @@ -41,7 +41,7 @@ use 5.008001; our %EXPORT_TAGS = ( - async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL PG_OLDQUERY_WAIT)], + async => [qw($DBDPG_DEFAULT PG_ASYNC PG_OLDQUERY_CANCEL PG_OLDQUERY_WAIT PG_ASYNC_CONN_READ PG_ASYNC_CONN_WRITE)], pg_limits => [qw($DBDPG_DEFAULT PG_MIN_SMALLINT PG_MAX_SMALLINT PG_MIN_INTEGER PG_MAX_INTEGER PG_MAX_BIGINT PG_MIN_BIGINT PG_MIN_SMALLSERIAL PG_MAX_SMALLSERIAL PG_MIN_SERIAL PG_MAX_SERIAL PG_MIN_BIGSERIAL PG_MAX_BIGSERIAL)], @@ -151,6 +151,7 @@ use 5.008001; # uncoverable branch false if (!$methods_are_installed) { DBD::Pg::db->install_method('pg_cancel'); + DBD::Pg::db->install_method('pg_continue_connect'); DBD::Pg::db->install_method('pg_endcopy'); DBD::Pg::db->install_method('pg_error_field'); DBD::Pg::db->install_method('pg_getline'); @@ -3223,6 +3224,25 @@ the L method. Creates a copy of the database handle by connecting with the same parameters as the original handle, then trying to merge the attributes. See the DBI documentation for complete usage. +=head3 B + + $rc = $dbh->pg_continue_connect(); + +Continues an asychronous connect operation. See B below. After an asychronous connect was initiated, this +method must be called in a loop for as long as it returns either 1 or +2, indicating a desire to read or write data, +respectively. Afterwards, the next call to pg_continue_connect must +not take place until an indication that data can either be +read or written on the current pg_socket was obtained, eg, via +select. + +The method returns -1 if no asynchronous connect was in progress, -2 to +indicate that an asynchronous connect failed and 0 if the connection +was successfully established. + +The socket may have changed after each call to the method. + =head2 Database Handle Attributes =head3 B (boolean) @@ -4290,6 +4310,19 @@ as you don't need it anymore. $count = $sth2->fetchall_arrayref()->[0][0]; } +=head3 Asynchronous Connect + +Passing a true value for the attribute pg_async_connect to the DBI +connect method, eg, + + $dbh = DBI->connect('dbi:Pg:...', $username, $password, + { pg_async_connect => 1 }); + +starts an asynchronous connect. The B method must +be used afterwards to complete the connection establishment process. If +the attribute is present but its value is false, an ordinarty +synchronous connect will be done instead. + =head2 Array support DBD::Pg allows arrays (as arrayrefs) to be passed in to both diff --git a/Pg.xs b/Pg.xs index db0c10e8..2769f9b2 100644 --- a/Pg.xs +++ b/Pg.xs @@ -223,6 +223,9 @@ constant(name=Nullch) PG_OLDQUERY_CANCEL = 2 PG_OLDQUERY_WAIT = 4 + PG_ASYNC_CONN_READ = 1 + PG_ASYNC_CONN_WRITE = 2 + CODE: if (0==ix) { if (!name) { @@ -847,6 +850,14 @@ _pg_type_info (type_sv=Nullsv) ST(0) = sv_2mortal( newSViv( type_num ) ); } +int +pg_continue_connect(dbh) + SV* dbh + CODE: + RETVAL = pg_db_continue_connect(dbh); + OUTPUT: + RETVAL + void pg_result(dbh) SV * dbh diff --git a/dbdimp.c b/dbdimp.c index ae5186d9..28246088 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -68,6 +68,14 @@ typedef enum (SvROK(h) && SvTYPE(SvRV(h)) == SVt_PVHV && \ SvRMAGICAL(SvRV(h)) && (SvMAGIC(SvRV(h)))->mg_type == 'P') +enum { + DBH_ASYNC_CANCELLED = -1, + DBH_NO_ASYNC, + DBH_ASYNC, + DBH_ASYNC_CONNECT, + DBH_ASYNC_CONNECT_POLL +}; + static void pg_error(pTHX_ SV *h, int error_num, const char *error_msg); static void pg_warn (void * arg, const char * message); static ExecStatusType _result(pTHX_ imp_dbh_t *imp_dbh, const char *sql); @@ -92,9 +100,62 @@ void dbd_init (dbistate_t *dbistate) /* ================================================================== */ -int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, char * pwd, SV *attr) +static int want_async_connect(pTHX_ SV *attrs) { + HV *hv; + SV **psv, *sv; + + return + attrs + && (psv = hv_fetchs((HV *)SvRV(attrs), "pg_async_connect", 0)) + && (sv = *psv) + && SvTRUE(sv); +} + +static void after_connect_init(pTHX_ SV *dbh, imp_dbh_t * imp_dbh) +{ + /* Figure out what protocol this server is using (most likely 3) */ + TRACE_PQPROTOCOLVERSION; + imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn); + + /* Figure out this particular backend's version */ + TRACE_PQSERVERVERSION; + imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn); + + if (imp_dbh->pg_server_version < 80000) { + /* + Special workaround for PgBouncer, which has the unfortunate habit of modifying 'server_version', + something it should never do. If we think this is the case for the version failure, we + simply allow things to continue with a faked version. See github issue #47 + */ + if (NULL != strstr(PQparameterStatus(imp_dbh->conn, "server_version"), "bouncer")) { + imp_dbh->pg_server_version = 90600; + } + else { + TRACE_PQERRORMESSAGE; + strncpy(imp_dbh->sqlstate, "08001", 6); /* sqlclient_unable_to_establish_sqlconnection */ + pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required"); + TRACE_PQFINISH; + PQfinish(imp_dbh->conn); + sv_free((SV *)imp_dbh->savepoints); + if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", THEADER_slow); + return; + } + } + + pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh); + /* If the client_encoding is UTF8, flip the utf8 flag until convinced otherwise */ + imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8; + + /* Tell DBI that we should call destroy when the handle dies */ + DBIc_IMPSET_on(imp_dbh); + /* Tell DBI that we should call disconnect when the handle dies */ + DBIc_ACTIVE_on(imp_dbh); +} + +int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, char * pwd, SV *attr) +{ dTHR; dTHX; char * conn_str; @@ -102,8 +163,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha bool inquote = DBDPG_FALSE; STRLEN connect_string_size; ConnStatusType connstatus; + int async_connect; - if (TSTART_slow) TRC(DBILOGFP, "%sBegin dbd_db_login\n", THEADER_slow); + async_connect = want_async_connect(aTHX_ attr); + + if (TSTART_slow) { + TRC(DBILOGFP, "%sBegin dbd_db_login6\n", THEADER_slow); + if (async_connect) + TRC(DBILOGFP, "%sAsync connect requested\n", THEADER_slow); + } /* DBD::Pg syntax: 'dbname=dbname;host=host;port=port', 'User', 'Pass' */ /* libpq syntax: 'dbname=dbname host=host port=port user=uid password=pwd' */ @@ -172,12 +240,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha TRACE_PQFINISH; PQfinish(imp_dbh->conn); } - + /* Attempt the connection to the database */ if (TLOGIN_slow) TRC(DBILOGFP, "%sLogin connection string: (%s)\n", THEADER_slow, conn_str); - TRACE_PQCONNECTDB; - imp_dbh->conn = PQconnectdb(conn_str); - if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n", THEADER_slow); + if (async_connect) { + TRACE_PQCONNECTSTART; + imp_dbh->conn = PQconnectStart(conn_str); + if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection started\n", THEADER_slow); + } else { + TRACE_PQCONNECTDB; + imp_dbh->conn = PQconnectdb(conn_str); + if (TLOGIN_slow) TRC(DBILOGFP, "%sConnection complete\n", THEADER_slow); + } Safefree(conn_str); /* Set the initial sqlstate */ @@ -187,7 +261,8 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha /* Check to see that the backend connection was successfully made */ TRACE_PQSTATUS; connstatus = PQstatus(imp_dbh->conn); - if (CONNECTION_OK != connstatus) { + switch (connstatus) { + case CONNECTION_BAD: TRACE_PQERRORMESSAGE; strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */ pg_error(aTHX_ dbh, connstatus, PQerrorMessage(imp_dbh->conn)); @@ -196,46 +271,15 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha sv_free((SV *)imp_dbh->savepoints); if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", THEADER_slow); return 0; + + case CONNECTION_OK: + async_connect = 0; } /* Call the pg_warn function anytime this connection raises a notice */ TRACE_PQSETNOTICEPROCESSOR; (void)PQsetNoticeProcessor(imp_dbh->conn, pg_warn, (void *)SvRV(dbh)); - /* Figure out what protocol this server is using (most likely 3) */ - TRACE_PQPROTOCOLVERSION; - imp_dbh->pg_protocol = PQprotocolVersion(imp_dbh->conn); - - /* Figure out this particular backend's version */ - TRACE_PQSERVERVERSION; - imp_dbh->pg_server_version = PQserverVersion(imp_dbh->conn); - - if (imp_dbh->pg_server_version < 80000) { - /* - Special workaround for PgBouncer, which has the unfortunate habit of modifying 'server_version', - something it should never do. If we think this is the case for the version failure, we - simply allow things to continue with a faked version. See github issue #47 - */ - if (NULL != strstr(PQparameterStatus(imp_dbh->conn, "server_version"), "bouncer")) { - imp_dbh->pg_server_version = 90600; - } - else { - TRACE_PQERRORMESSAGE; - strncpy(imp_dbh->sqlstate, "08001", 6); /* sqlclient_unable_to_establish_sqlconnection */ - pg_error(aTHX_ dbh, CONNECTION_BAD, "Server version 8.0 required"); - TRACE_PQFINISH; - PQfinish(imp_dbh->conn); - sv_free((SV *)imp_dbh->savepoints); - if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login (error)\n", THEADER_slow); - return 0; - } - } - - pg_db_detect_client_encoding_utf8(aTHX_ imp_dbh); - - /* If the client_encoding is UTF8, flip the utf8 flag until convinced otherwise */ - imp_dbh->pg_utf8_flag = imp_dbh->client_encoding_utf8; - imp_dbh->pg_enable_utf8 = -1; imp_dbh->prepare_now = DBDPG_FALSE; @@ -252,18 +296,18 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha imp_dbh->copystate = 0; imp_dbh->copybinary = DBDPG_FALSE; imp_dbh->pg_errorlevel = 1; /* Default */ - imp_dbh->async_status = 0; + imp_dbh->async_status = DBH_NO_ASYNC; imp_dbh->async_sth = NULL; imp_dbh->last_result = NULL; /* NULL or the last PGresult returned by a database or statement handle */ imp_dbh->result_clearable = DBDPG_TRUE; imp_dbh->pg_int8_as_string = DBDPG_FALSE; imp_dbh->skip_deallocate = DBDPG_FALSE; - /* Tell DBI that we should call destroy when the handle dies */ - DBIc_IMPSET_on(imp_dbh); - - /* Tell DBI that we should call disconnect when the handle dies */ - DBIc_ACTIVE_on(imp_dbh); + /* if not connecting asynchronously, do after connect init */ + imp_dbh->pg_protocol = -1; + imp_dbh->pg_server_version = -1; + if (async_connect) imp_dbh->async_status = DBH_ASYNC_CONNECT; + else after_connect_init(aTHX_ dbh, imp_dbh); if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_db_login\n", THEADER_slow); @@ -271,6 +315,62 @@ int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, cha } /* end of dbd_db_login */ +int pg_db_continue_connect(SV *dbh) +{ + dTHX; + D_imp_dbh(dbh); + int status; + + if (TSTART_slow) + TRC(DBILOGFP, "%sBegin pg_db_continue_connect\n", THEADER_slow); + + switch (imp_dbh->async_status) { + default: + pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, "No async connect in progress\n"); + status = -1; + break; + + case DBH_ASYNC_CONNECT: + imp_dbh->async_status = DBH_ASYNC_CONNECT_POLL; + status = PGRES_POLLING_WRITING; + break; + + case DBH_ASYNC_CONNECT_POLL: + TRACE_PQCONNECTPOLL; + status = PQconnectPoll(imp_dbh->conn); + if (TRACE5_slow) TRC(DBILOGFP, "%sPQconnectPoll returned %d\n", THEADER_slow, status); + + switch (status) { + case PGRES_POLLING_READING: + case PGRES_POLLING_WRITING: + break; + + case PGRES_POLLING_OK: + if (TLOGIN_slow) TRC(DBILOGFP, "%sconnection established\n", THEADER_slow); + + imp_dbh->async_status = DBH_NO_ASYNC; + after_connect_init(aTHX_ dbh, imp_dbh); + + status = 0; + break; + + case PGRES_POLLING_FAILED: + TRACE_PQERRORMESSAGE; + strncpy(imp_dbh->sqlstate, "08006", 6); /* "CONNECTION FAILURE" */ + pg_error(aTHX_ dbh, PQstatus(imp_dbh->conn), PQerrorMessage(imp_dbh->conn)); + TRACE_PQFINISH; + PQfinish(imp_dbh->conn); + imp_dbh->conn = NULL; + + imp_dbh->async_status = DBH_NO_ASYNC; + + status = -2; + } + } + + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_continue_connect\n", THEADER_slow); + return status; +} /* ================================================================== */ /* diff --git a/dbdimp.h b/dbdimp.h index de74bde5..9dc4d080 100644 --- a/dbdimp.h +++ b/dbdimp.h @@ -144,6 +144,9 @@ extern void dbd_init (dbistate_t *dbistate); #define dbd_db_login6 pg_db_login6 int dbd_db_login6 (SV * dbh, imp_dbh_t * imp_dbh, char * dbname, char * uid, char * pwd, SV *attr); +#define dbd_db_continue_connect pg_db_continue_connect +int dbd_db_continue_connect(SV *h); + #define dbd_db_ping pg_db_ping int dbd_db_ping(SV *dbh); diff --git a/t/03dbmethod.t b/t/03dbmethod.t index b551672e..c6d3a104 100644 --- a/t/03dbmethod.t +++ b/t/03dbmethod.t @@ -2532,6 +2532,62 @@ $t=q{DB handle method "pg_type_info" returns 12 for type 123 (PrintError on)}; is ($dbh->pg_type_info(123), 12, $t); $dbh->{PrintError} = 0; +# +# Test async connect +# +ASYNC_CONNECT: { + my ($dsn, $user) = get_test_settings(); + my ($dbh, $rc); + + # + # test sync connect when pfg_async_connect is false + # + $dbh = DBI->connect($dsn, $user, '', { + RaiseError => 0, + PrintError => 0, + pg_async_connect => 0 }); + unless ($dbh) { + fail('failed to create dbh for sync connect test'); + last; + } + + $rc = $dbh->ping(); + ok($rc == 1, 'pg_ascync_connect false connects synchronously'); + $dbh->disconnect(); + + # + # test async connect + # + $dbh = DBI->connect($dsn, $user, '', { + RaiseError => 0, + PrintError => 0, + pg_async_connect => 1 }); + unless ($dbh) { + fail('failed to create async_connect dbh'); + last; + } + + while ($rc = $dbh->pg_continue_connect(), $rc > 0) { + my ($rin, $win, $ref); + + unless ($rc == 1 || $rc == 2) { + fail('pg_continue_connect return value > 0 but neither 1 nor 2'); + last ASYNC_CONNECT; + } + + $ref = $rc == 1 ? \$rin : \$win; + vec($$ref, $$dbh{pg_socket}, 1) = 1; + $rc = select($rin, $win, undef, undef); + } + ok($rc == 0 || $rc == -2, 'pg_continue_connect loop ended with success or failure return value'); + + # + # test pg_continue_connect ret value when connected + # + $rc = $dbh->pg_continue_connect(); + ok($rc == -1, 'pg_continue_connect returned -1 when async connect not in progress'); +} + done_testing(); exit; From 375cca011a4e101d41f6069d9a8ab6ad7b15ac67 Mon Sep 17 00:00:00 2001 From: Rainer Weikusat Date: Fri, 23 May 2025 12:39:27 +0100 Subject: [PATCH 2/2] some more async connect related changes which accidentally stayed in the async-prepare branch --- dbdimp.c | 64 +++++++++++++++++++++++++++++++++++++++----------------- 1 file changed, 45 insertions(+), 19 deletions(-) diff --git a/dbdimp.c b/dbdimp.c index 28246088..dc415952 100644 --- a/dbdimp.c +++ b/dbdimp.c @@ -3146,7 +3146,17 @@ long pg_quickexec (SV * dbh, const char * sql, const int asyncflag) } /* If we are still waiting on an async, handle it */ - if (imp_dbh->async_status) { + switch (imp_dbh->async_status) { + case DBH_NO_ASYNC: + break; + + case DBH_ASYNC_CONNECT: + case DBH_ASYNC_CONNECT_POLL: + if (TRACE5_slow) TRC(DBILOGFP, "%snot yet connected\n", THEADER_slow); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_quickexec (async rows: %d)\n", THEADER_slow, rows); + return -1; + + default: if (TRACE5_slow) TRC(DBILOGFP, "%shandling old async\n", THEADER_slow); rows = handle_old_async(aTHX_ dbh, imp_dbh, asyncflag); if (rows) { @@ -3195,7 +3205,7 @@ long pg_quickexec (SV * dbh, const char * sql, const int asyncflag) if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_quickexec (error: async do failed)\n", THEADER_slow); return -2; } - imp_dbh->async_status = 1; + imp_dbh->async_status = DBH_ASYNC; imp_dbh->async_sth = NULL; /* Needed? */ if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_quickexec (async)\n", THEADER_slow); @@ -3347,7 +3357,17 @@ long dbd_st_execute (SV * sth, imp_sth_t * imp_sth) } /* Check for old async transactions */ - if (imp_dbh->async_status) { + switch (imp_dbh->async_status) { + case DBH_NO_ASYNC: + break; + + case DBH_ASYNC_CONNECT: + case DBH_ASYNC_CONNECT_POLL: + if (TRACE5_slow) TRC(DBILOGFP, "%snot yet connected\n", THEADER_slow); + if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_execute\n", THEADER_slow); + return -2; + + default: if (TRACE7_slow) TRC(DBILOGFP, "%sAttempting to handle existing async transaction\n", THEADER_slow); ret = handle_old_async(aTHX_ sth, imp_dbh, imp_sth->async_flag); if (ret) { @@ -3748,7 +3768,7 @@ long dbd_st_execute (SV * sth, imp_sth_t * imp_sth) if (TRACEWARN_slow) TRC(DBILOGFP, "%sEarly return for async query", THEADER_slow); imp_sth->async_status = 1; imp_dbh->async_sth = imp_sth; - imp_dbh->async_status = 1; + imp_dbh->async_status = DBH_ASYNC; if (TEND_slow) TRC(DBILOGFP, "%sEnd dbd_st_execute (async)\n", THEADER_slow); return 0; } @@ -5370,11 +5390,11 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_result\n", THEADER_slow); - if (1 != imp_dbh->async_status) { + if (DBH_ASYNC != imp_dbh->async_status) { pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running\n"); if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (error: no async)\n", THEADER_slow); return -2; - } + } imp_dbh->copystate = 0; /* Assume not in copy mode until told otherwise */ @@ -5473,10 +5493,9 @@ long pg_db_result (SV *h, imp_dbh_t *imp_dbh) imp_dbh->async_sth->rows = rows; imp_dbh->async_sth->async_status = 0; } - imp_dbh->async_status = 0; + imp_dbh->async_status = DBH_NO_ASYNC; if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_result (rows: %ld)\n", THEADER_slow, rows); return rows; - } /* end of pg_db_result */ @@ -5497,11 +5516,18 @@ int pg_db_ready(SV *h, imp_dbh_t *imp_dbh) if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_ready (async status: %d)\n", THEADER_slow, imp_dbh->async_status); - if (0 == imp_dbh->async_status) { + switch (imp_dbh->async_status) { + case DBH_NO_ASYNC: pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running\n"); if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_ready (error: no async)\n", THEADER_slow); return -1; - } + + case DBH_ASYNC_CONNECT: + case DBH_ASYNC_CONNECT_POLL: + if (TRACE5_slow) TRC(DBILOGFP, "%snot yet connected\n", THEADER_slow); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_ready (error: not connected)\n", THEADER_slow); + return -1; + } TRACE_PQCONSUMEINPUT; if (!PQconsumeInput(imp_dbh->conn)) { @@ -5536,17 +5562,17 @@ int pg_db_cancel(SV *h, imp_dbh_t *imp_dbh) ExecStatusType status; if (TSTART_slow) TRC(DBILOGFP, "%sBegin pg_db_cancel (async status: %d)\n", - THEADER_slow, imp_dbh->async_status); + THEADER_slow, imp_dbh->async_status); - if (0 == imp_dbh->async_status) { - pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running"); - if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: no async)\n", THEADER_slow); - return DBDPG_FALSE; - } + if (DBH_ASYNC != imp_dbh->async_status) { + if (DBH_ASYNC_CANCELLED == imp_dbh->async_status) { + pg_error(aTHX_ h, PGRES_FATAL_ERROR, + "Asychronous query has already been cancelled"); + } else { + pg_error(aTHX_ h, PGRES_FATAL_ERROR, "No asynchronous query is running"); + } - if (-1 == imp_dbh->async_status) { - pg_error(aTHX_ h, PGRES_FATAL_ERROR, "Asychronous query has already been cancelled"); - if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: async cancelled)\n", THEADER_slow); + if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_cancel (error: no async)\n", THEADER_slow); return DBDPG_FALSE; }