One of the things my employer uses dovecot for is as the mail download
server for an 'e-mail purification service' (AV/anti-spam) for
smartphones. The service itself presently runs on a rented server
somewhere in the UK, while the corresponding 'web service' front-end and
the user account/mail account database reside on a server in
Germany. The UK dovecot server uses the PostgreSQL server on the German
machine for user authentication. The latter is reachable via two
entirely different 'internet paths' and there was an outage of several
hours on one of them a couple of weeks ago. This prompted some frantic
network reconfiguration efforts in order to get the above-mentioned
service going again and resulted in the conviction that - ideally -
the dovecot server should be capable of using connections to multiple
PostgreSQL servers (or a single server reachable via several IPs)
simultaneously, distributing requests among them, and should be
capable of detecting a problem on one of the db server
connections and using the still functioning ones to continue operations.

After having spent about eight hours reading through the code of
the existing pgsql driver, I concluded that modifying it to
achieve the goals outlined above would require some major brain
surgery with a high chance of causing a lot of collateral damage,
and that writing a new driver providing the sought-after feature set
was the better idea. That new driver is the subject of this e-mail.

Example Usage
-------------

Multiple connections can be specified in an ordinary connect string by
separating the parameter sets with a ;;; sequence. For the
server described above, this looks like:

    connect = host=1.1.1.1 dbname=mailgate user=mailgate_user password=secret sslmode=require ;;; \
        host=2.2.2.2 dbname=mailgate user=mailgate_user password=secret sslmode=require

Leading and trailing whitespace in a connect sub-string is ignored.
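
The splitting rule described above can be sketched as follows. This is a
minimal illustration and not the code from the patch: the names
CONNECT_SEP, dup_trimmed and split_connect_string are made up for the
example, and plain malloc is used instead of dovecot's allocators.

```c
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

#define CONNECT_SEP ";;;"	/* separator between connection specifications */

/* Copy [s, e) without leading/trailing whitespace; the caller frees it. */
static char *dup_trimmed(char const *s, char const *e)
{
	char *r;
	size_t len;

	while (s < e && isspace((unsigned char)*s)) ++s;
	while (e > s && isspace((unsigned char)e[-1])) --e;

	len = e - s;
	r = malloc(len + 1);
	if (!r) return NULL;

	memcpy(r, s, len);
	r[len] = 0;
	return r;
}

/*
  Split conf at each ";;;" and store up to max trimmed
  sub-strings in out. Returns the number of sub-strings stored.
*/
static unsigned split_connect_string(char const *conf, char **out, unsigned max)
{
	char const *sep;
	unsigned n;

	n = 0;
	while (n < max) {
		sep = strstr(conf, CONNECT_SEP);
		if (!sep) sep = conf + strlen(conf);	/* last sub-string */

		out[n] = dup_trimmed(conf, sep);
		if (!out[n]) break;
		++n;

		if (!*sep) break;
		conf = sep + strlen(CONNECT_SEP);
	}

	return n;
}
```

Each sub-string obtained this way would then be handed to libpq as an
independent connect string.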

Basic Operation
---------------

After having split the connect string, the driver creates a connection
structure (struct multi_pgsql_pgc) for each connection specification
contained in it and initiates an asynchronous connect for each. As
the actual connections become ready, they are put onto a FIFO queue
(a linked list) of 'connections which are ready'. When a query is
submitted to the driver, the first connection structure is removed
from the ready list and used to execute the query. After the query has
finished, the connection is added to the end of the ready list. If no
connection is ready at the time a query is submitted, the query is put
on a FIFO queue of queries which will be executed as the resources to
do so become available.

Each query has a lifetime of 60 seconds (arbitrary). A query which
couldn't be started before its lifetime was over will be aborted with a
timeout error. A connection which is currently being used to execute a
query will be considered dead if no forward progress of any kind has
happened for 20 seconds (also arbitrary). If this happens (or if some
kind of I/O error occurs), the existing connection will be closed and
an asynchronous connect on a newly created one will be
started. Assuming the lifetime of the query isn't over yet, it will
either be started again on another available connection or pushed back
onto the front of the query queue, causing it to be started as soon as
a connection becomes available. Attempts to reestablish the failed
connection will continue in the background (with 5 seconds of delay
between each) until one succeeds.
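
The interplay of the two FIFO queues can be illustrated with a small,
self-contained sketch. It is not taken from the patch: struct conn,
struct query, struct db and all function names are hypothetical, the
current time is passed in explicitly to keep the example deterministic,
and handing a waiting query to a connection as soon as it becomes ready
is my reading of the description above.

```c
#include <stddef.h>
#include <time.h>

enum { QUERY_LIFETIME = 60 };	/* seconds */

struct query {
	struct query *p;	/* next query on the wait queue */
	time_t eol_at;		/* point in time when the query expires */
	int timed_out;
};

struct conn {
	struct conn *p;		/* next connection on the ready list */
	struct query *qry;	/* query being executed, if any */
};

struct db {
	struct conn *ready, **ready_link;
	struct query *waiting, **waiting_link;
};

static void init_db(struct db *db)
{
	db->ready = NULL;
	db->ready_link = &db->ready;
	db->waiting = NULL;
	db->waiting_link = &db->waiting;
}

/* Remove and return the first ready connection (NULL if none). */
static struct conn *get_ready_conn(struct db *db)
{
	struct conn *c;

	c = db->ready;
	if (!c) return NULL;

	db->ready = c->p;
	if (!db->ready) db->ready_link = &db->ready;
	c->p = NULL;
	return c;
}

/* Remove and return the first waiting query which hasn't expired. */
static struct query *get_waiting_query(struct db *db, time_t now)
{
	struct query *q;

	while ((q = db->waiting)) {
		db->waiting = q->p;
		if (!db->waiting) db->waiting_link = &db->waiting;
		q->p = NULL;

		if (q->eol_at > now) return q;
		q->timed_out = 1;	/* a real driver would abort it here */
	}

	return NULL;
}

/* Called when a connection became ready or finished its query. */
static void add_ready_conn(struct db *db, struct conn *c, time_t now)
{
	c->qry = get_waiting_query(db, now);
	if (c->qry) return;	/* reused at once for a waiting query */

	c->p = NULL;
	*db->ready_link = c;	/* append to the end of the ready list */
	db->ready_link = &c->p;
}

/* Run a query on the first ready connection or queue it. */
static void submit_query(struct db *db, struct query *q, time_t now)
{
	struct conn *c;

	q->p = NULL;
	q->timed_out = 0;
	q->eol_at = now + QUERY_LIFETIME;

	c = get_ready_conn(db);
	if (c) {
		c->qry = q;
		return;
	}

	*db->waiting_link = q;	/* append to the end of the wait queue */
	db->waiting_link = &q->p;
}
```

Keeping a pointer to the last link field (ready_link, waiting_link) makes
appending to either list a constant-time operation without a separate
special case for an empty list.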

Design Overview
---------------

struct multi_pgsql_db & the routines working on it provide both the
'simple' query interface and transaction support.

struct multi_pgsql_pgc represents a database connection.

The result interface is implemented as struct multi_pgsql_result &
assorted subroutines.

Transactions are provided by a struct multi_pgsql_transaction_context
which contains a linked list of transaction subqueries
(struct multi_pgsql_transaction_subquery).

A 'class hierarchy' of 'query classes' is used to provide the
different kinds of 'queries' supported by the driver (simple queries,
simple synchronous queries, transaction queries, synchronous
transaction queries). The complete hierarchy looks like this:

                                  multi_pgsql_query
                                 /                 \
                                /                   \
        multi_pgsql_simple_query                     multi_pgsql_transaction_query
            /              \                              /                 \
           /                \                            /                   \
multi_pgsql_user_query  multi_pgsql_sync_query          /                     \
                                                       /                       \
                              multi_pgsql_async_transaction_query   multi_pgsql_sync_transaction_query
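
For readers unfamiliar with this style of C, here is a much reduced
sketch of how such a hierarchy can be built from embedded structures and
a table of function pointers. The names below (struct query,
struct simple_query, struct transaction_query and so on) are shortened
stand-ins invented for the example; the real classes carry more methods
and state.

```c
struct query;

/* 'virtual methods' shared by all query classes */
struct query_vtable {
	char const *(*get_query)(struct query *);
};

/* base class */
struct query {
	struct query_vtable *vtable;
};

/*
  Derived classes embed the base class as first member so
  that a pointer to the object can be converted to a pointer
  to the base class and back.
*/
struct simple_query {
	struct query super;
	char const *text;
};

struct transaction_query {
	struct query super;
	char const **sub;	/* current subquery in a NULL-terminated list */
};

static char const *simple_query_get_query(struct query *q)
{
	return ((struct simple_query *)q)->text;
}

static char const *transaction_query_get_query(struct query *q)
{
	return *((struct transaction_query *)q)->sub;
}

static struct query_vtable simple_query_vtable = {
	simple_query_get_query
};

static struct query_vtable transaction_query_vtable = {
	transaction_query_get_query
};

static void init_simple_query(struct simple_query *qry, char const *text)
{
	qry->super.vtable = &simple_query_vtable;
	qry->text = text;
}

static void init_transaction_query(struct transaction_query *qry,
				   char const **subqueries)
{
	qry->super.vtable = &transaction_query_vtable;
	qry->sub = subqueries;
}

/* method invocation wrapper: works for any query class */
static char const *get_query(struct query *q)
{
	return q->vtable->get_query(q);
}
```

Code dealing with connections only ever sees a struct query pointer and
calls the wrapper, so it doesn't need to know which kind of query it is
currently executing.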

Notable Differences to the pgsql Driver
---------------------------------------

sql_exec
        This interface is not implemented. It is (as far as I can
        tell) not used anywhere except inside the sqlite driver. It
        cannot be used for queries which return information because
        there is no way to access this information, nor can it
        generally be used for queries which cause side effects, since
        it cannot be determined whether execution was successful (the
        only possible use I can think of is a rollback after a
        failed transaction).

sql_result_get_field_value_binary
        Not implemented. It appears to be unused and, the way I
        understand the libpq programming documentation, the only way
        to retrieve a binary result for a query is to use
        PQsendQueryParams or PQsendQueryPrepared and request that the
        result be returned in binary. Nothing like this is supported
        by the present abstract SQL database interface.

synchronous query execution
        Because background connection reestablishment might still be
        going on after a synchronous query has been executed, it is
        necessary to switch any timeouts/io operations registered with
        the ioloop used for synchronous query execution back to the
        main ioloop. The code implements this by using
        io_loop_set_current to temporarily restore the original
        ioloop, unregistering and re-registering any timeouts/
        io requests associated with one of the existing
        database connections, switching back to the synchronous
        execution ioloop afterwards and then destroying that via
        io_loop_destroy. This works but is probably not exactly how
        the existing interfaces were supposed to be used.

error messages for synchronously committed transactions
        These are put onto the data stack by the commit_s subroutine,
        which will hopefully be sufficient to enable them to survive
        until someone higher up in the call chain wants to look at
        them (judging from the dict code, this should work).

Remarks on the Patch
--------------------

Created against dovecot-1-2-cf3fe573a560. Applies cleanly to
1.2.11. Applies to 2.0.beta4 with a rejection because of a feature
which doesn't exist anymore. The result can be compiled but it hasn't
been tested.
----
Index: dovecot/configure.in
===================================================================
RCS file: /sysdata/cvs/dovecot/configure.in,v
retrieving revision 1.1.1.2
retrieving revision 1.1.1.2.6.9
diff -u -r1.1.1.2 -r1.1.1.2.6.9
--- dovecot/configure.in 15 Mar 2010 18:18:14 -0000 1.1.1.2
+++ dovecot/configure.in 5 Apr 2010 16:16:03 -0000 1.1.1.2.6.9
@@ -159,6 +159,11 @@
[ --with-sqlite Build with SQLite3 driver support],
TEST_WITH(sqlite, $withval),
want_sqlite=no)
+
+AC_ARG_WITH(multi-pgsql,
+[ --with-multi-pgsql Build with PostgreSQL driver supporting multiple connections],
+ TEST_WITH(multi-pgsql, $withval),
+ want_multi_pgsql=no)
AC_ARG_WITH(lucene,
[ --with-lucene Build with CLucene full text search support],
@@ -1969,7 +1974,12 @@
LIBS=$old_LIBS
fi
-if test $want_pgsql != no; then
+want_pgsql_driver=no
+test "$want_pgsql" != no && want_pgsql_driver=yes
+test "$want_multi_pgsql" != no && want_pgsql_driver=yes
+
+
+if test $want_pgsql_driver != no; then
AC_CHECK_PROG(PG_CONFIG, pg_config, YES, NO)
if test $PG_CONFIG = NO; then
# based on code from PHP
@@ -2014,19 +2024,20 @@
fi
PGSQL_LIBS="$PGSQL_LIBS -lpq"
AC_DEFINE(HAVE_PGSQL,, Build with PostgreSQL support)
- found_sql_drivers="$found_sql_drivers pgsql"
+ test "$want_pgsql" != no && found_sql_drivers="$found_sql_drivers pgsql"
+ test "$want_multi_pgsql" != no && found_sql_drivers="$found_sql_drivers multi_pgsql"
if test "$all_sql_drivers" = "yes"; then
- sql_drivers="$sql_drivers pgsql"
+ sql_drivers="$sql_drivers pgsql multi_pgsql"
fi
], [
- if test $want_pgsql = yes; then
+ if test $want_pgsql_driver = yes; then
AC_ERROR([Can't build with PostgreSQL support: libpq-fe.h not found])
fi
])
CPPFLAGS=$old_CPPFLAGS
], [
- if test $want_pgsql = yes; then
+ if test $want_pgsql_driver = yes; then
AC_ERROR([Can't build with PostgreSQL support: libpq not found])
fi
])
@@ -2323,6 +2334,7 @@
build_pgsql=no
build_mysql=no
build_sqlite=no
+build_multi_pgsql=no
for driver in $sql_drivers; do
if test "$driver" = "pgsql"; then
AC_DEFINE(BUILD_PGSQL,, Built-in PostgreSQL support)
@@ -2333,7 +2345,10 @@
elif test "$driver" = "sqlite"; then
AC_DEFINE(BUILD_SQLITE,, Built-in SQLite support)
build_sqlite=yes
- fi
+ elif test "$driver" = "multi_pgsql"; then
+ AC_DEFINE(BUILD_MULTI_PGSQL,, Built-in PostgreSQL support with multiple connection support)
+ build_multi_pgsql=yes
+ fi
done
if test $build_pgsql = no; then
not_sql_drivers="$not_sql_drivers pgsql"
@@ -2344,11 +2359,15 @@
if test $build_sqlite = no; then
not_sql_drivers="$not_sql_drivers sqlite"
fi
+if test $build_multi_pgsql = no; then
+ not_sql_drivers="$not_sql_drivers multi_pgsql"
+fi
AC_SUBST(sql_drivers)
AM_CONDITIONAL(BUILD_PGSQL, test "$build_pgsql" = "yes")
AM_CONDITIONAL(BUILD_MYSQL, test "$build_mysql" = "yes")
AM_CONDITIONAL(BUILD_SQLITE, test "$build_sqlite" = "yes")
+AM_CONDITIONAL(BUILD_MULTI_PGSQL, test "$build_multi_pgsql" = "yes")
AM_CONDITIONAL(SQL_PLUGINS, test "$want_sql" = "plugin")
dnl **
Index: dovecot/src/lib-sql/Makefile.am
===================================================================
RCS file: /sysdata/cvs/dovecot/src/lib-sql/Makefile.am,v
retrieving revision 1.1.1.1
retrieving revision 1.1.1.1.6.3
diff -u -r1.1.1.1 -r1.1.1.1.6.3
--- dovecot/src/lib-sql/Makefile.am 28 Dec 2009 13:52:04 -0000 1.1.1.1
+++ dovecot/src/lib-sql/Makefile.am 5 Apr 2010 15:05:54 -0000 1.1.1.1.6.3
@@ -15,11 +15,16 @@
SQLITE_LIB = libdriver_sqlite.la
SQL_DRIVER_PLUGINS += sqlite
endif
+if BUILD_MULTI_PGSQL
+MULTI_PGSQL_LIB = libdriver_multi_pgsql.la
+SQL_DRIVER_PLUGINS += multi_pgsql
+endif
sql_module_LTLIBRARIES = \
$(MYSQL_LIB) \
$(PGSQL_LIB) \
- $(SQLITE_LIB)
+ $(SQLITE_LIB) \
+ $(MULTI_PGSQL_LIB)
sql_moduledir = $(moduledir)/sql
endif
@@ -39,7 +44,8 @@
driver_sources = \
driver-mysql.c \
driver-pgsql.c \
- driver-sqlite.c
+ driver-sqlite.c \
+ driver-multi-pgsql.c
endif
libsql_a_SOURCES = \
@@ -62,6 +68,11 @@
libdriver_sqlite_la_LIBADD = $(SQLITE_LIBS)
libdriver_sqlite_la_CPPFLAGS = -I$(top_srcdir)/src/lib $(SQLITE_CFLAGS)
libdriver_sqlite_la_SOURCES = driver-sqlite.c
+
+libdriver_multi_pgsql_la_LDFLAGS = -module -avoid-version
+libdriver_multi_pgsql_la_LIBADD = $(PGSQL_LIBS)
+libdriver_multi_pgsql_la_CPPFLAGS = -I$(top_srcdir)/src/lib $(PGSQL_CFLAGS)
+libdriver_multi_pgsql_la_SOURCES = driver-multi-pgsql.c
endif
headers = \
Index: dovecot/src/lib-sql/driver-multi-pgsql.c
===================================================================
RCS file: dovecot/src/lib-sql/driver-multi-pgsql.c
diff -N dovecot/src/lib-sql/driver-multi-pgsql.c
--- /dev/null 1 Jan 1970 00:00:00 -0000
+++ dovecot/src/lib-sql/driver-multi-pgsql.c 14 Apr 2010 15:31:48 -0000 1.1.2.134
@@ -0,0 +1,1763 @@
+/*
+ Copyright (c) MAD Partners, Ltd 2010 (rweikusat@madpartnerltd.com)
+ Portions Copyright (c) 2004-2010 Dovecot authors, see the included COPYING file
+*/
+
+/* includes */
+#include "lib.h"
+#include "array.h"
+#include "ioloop.h"
+#include "ioloop-internal.h" /* kind of dirty, but it should be fine.. */
+#include "sql-api-private.h"
+
+#ifdef BUILD_MULTI_PGSQL
+#include
+#include
+#include
+#include
+
+/* macros */
+/* #define DEBUG */
+#ifdef DEBUG
+# define dprintf(args) i_info args
+#else
+# define dprintf(args)
+#endif
+
+#define DEINIT_ERROR "db driver deinit"
+#define TIMEOUT_ERROR "query timed out"
+
+/* constants */
+enum {
+ MULTI_PGSQL_POOL = 512,
+ MULTI_PGSQL_XACT_POOL = 1024,
+
+ TIMEOUT_UNIT = 1000,
+
+ RECONNECT_DELAY = 5 * TIMEOUT_UNIT,
+ USER_QUERY_LIFETIME = 60,
+
+ PGC_IO_TIMEOUT = 20,
+ MIN_QUERY_TIMEOUT = 10
+};
+
+/** general helper routines */
+static char *kill_pg_errmsg_newline(char *s)
+{
+ char *r;
+ unsigned c;
+
+ r = s;
+ while ((c = *r) && c != '\n') ++r;
+ if (c) *r = 0;
+
+ return s;
+}
+
+/** error result */
+struct multi_pgsql_error_result {
+ struct sql_result api;
+ char *msg;
+};
+
+static void error_result_free(struct sql_result *r)
+{
+ struct multi_pgsql_error_result *result;
+
+ result = (void *)r;
+ dprintf(("%s: %p", __func__, result));
+
+ i_free(result->msg);
+ i_free(result);
+}
+
+static void error_result_nop(struct sql_result *r)
+{
+ dprintf(("%s: %p", __func__, r));
+}
+
+static int error_result_next_row(struct sql_result *r)
+{
+ dprintf(("%s: %p", __func__, r));
+ return -1;
+}
+
+static char const *error_result_get_error(struct sql_result *r)
+{
+ struct multi_pgsql_error_result *result;
+
+ result = (void *)r;
+ dprintf(("%s: %p", __func__, result));
+
+ return result->msg;
+}
+
+static void init_error_result(struct multi_pgsql_error_result *result,
+ char const *msg)
+{
+ struct sql_result_vfuncs *v;
+
+ memset(&result->api, 0, sizeof(result->api));
+
+ v = &result->api.v;
+ v->free = error_result_nop;
+ v->next_row = error_result_next_row;
+ v->get_error = error_result_get_error;
+
+ result->msg = (char *)msg;
+
+ dprintf(("%s: %p", __func__, result));
+}
+
+/** multi_pgsql_result */
+struct multi_pgsql_result {
+ struct sql_result api;
+ struct multi_pgsql_pgc *pgc;
+
+ PGresult *pgr;
+ unsigned row, n_rows;
+ char const **values, *errmsg;
+};
+
+/*** methods/ subroutines */
+static void multi_pgsql_result_free(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+
+ result = (void *)r;
+
+ PQclear(result->pgr);
+ if (result->values) i_free(result->values);
+ i_free(result);
+}
+
+static void multi_pgsql_result_nop(struct sql_result *r)
+{
+ (void)r;
+}
+
+
+static int multi_pgsql_result_next_row(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+ unsigned row;
+
+ dprintf(("%s: %p", __func__, r));
+
+ result = (void *)r;
+ row = result->row;
+ if (++row < result->n_rows) return result->row = row;
+ return 0;
+}
+
+static int multi_pgsql_result_first_row(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+ PGresult *pgr;
+ ExecStatusType status;
+ unsigned n_rows;
+
+ result = (void *)r;
+ pgr = result->pgr;
+
+ status = PQresultStatus(pgr);
+ dprintf(("%s: %p: result status %s(%d)",
+ __func__, r, PQresStatus(status), status));
+
+ switch (status) {
+ case PGRES_COMMAND_OK:
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ n_rows = 0;
+ break;
+
+ case PGRES_TUPLES_OK:
+ n_rows = PQntuples(pgr);
+
+ dprintf(("%s: %p: %u rows", __func__, result, n_rows));
+ break;
+
+ default:
+ return -1;
+ }
+
+ result->n_rows = n_rows;
+ result->api.v.next_row = multi_pgsql_result_next_row;
+ return n_rows;
+}
+
+static unsigned multi_pgsql_result_get_fields_count(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+ unsigned count;
+
+ result = (void *)r;
+ count = PQnfields(result->pgr);
+
+ dprintf(("%s: %p: %u", __func__, result, count));
+ return count;
+}
+
+static char const *multi_pgsql_result_get_field_name(struct sql_result *r,
+ unsigned i)
+{
+ struct multi_pgsql_result *result;
+ char const *name;
+
+ result = (void *)r;
+ name = PQfname(result->pgr, i);
+
+ dprintf(("%s: %p: %u: '%s'", __func__, result, i, name));
+ return name;
+}
+
+static int multi_pgsql_result_find_field(struct sql_result *r,
+ char const *fname)
+{
+ struct multi_pgsql_result *result;
+ int i;
+
+ result = (void *)r;
+ i = PQfnumber(result->pgr, fname);
+
+ if (i == -1) {
+ i_info("%s: %p: no field named '%s'",
+ __func__, result, fname);
+ return -1;
+ }
+
+ dprintf(("%s: %p: '%s' <-> %d",
+ __func__, result, fname, i));
+ return i;
+}
+
+static char const *multi_pgsql_result_get_field_value(struct sql_result *r,
+ unsigned i)
+{
+ struct multi_pgsql_result *result;
+ char const *value;
+ unsigned row;
+
+ result = (void *)r;
+ row = result->row;
+ value = PQgetvalue(result->pgr, row, i);
+ if (!*value && PQgetisnull(result->pgr, row, i)) value = NULL;
+
+ dprintf(("%s: %p: %u,%u: '%s'",
+ __func__, result, row, i, value ? value : "NULL"));
+ return value;
+}
+
+static unsigned char const *
+multi_pgsql_result_get_field_value_binary(struct sql_result *r,
+ unsigned i, size_t *size)
+{
+ (void)i;
+ i_info("%s: %p: not implemented", __func__, r);
+
+ *size = 0;
+ return NULL;
+}
+
+static char const *multi_pgsql_result_find_field_value(struct sql_result *r,
+ char const *fname)
+{
+ int i;
+
+ dprintf(("%s: %p", __func__, r));
+
+ i = r->v.find_field(r, fname);
+ return i != -1 ? r->v.get_field_value(r, i) : NULL;
+}
+
+static char const * const *multi_pgsql_result_get_values(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+ char const **values;
+ unsigned n;
+
+ dprintf(("%s: %p", __func__, r));
+ result = (void *)r;
+
+ values = result->values;
+ if (values) return values;
+
+ n = r->v.get_fields_count(r);
+ values = result->values = i_new(char const *, n);
+ while (n) { /* also safe for results without fields */
+ --n;
+ values[n] = r->v.get_field_value(r, n);
+ }
+
+ return values;
+}
+
+static char const *
+multi_pgsql_result_get_error(struct sql_result *r)
+{
+ struct multi_pgsql_result *result;
+ char const *errmsg;
+
+ result = (void *)r;
+
+ errmsg = result->errmsg;
+ if (errmsg) return errmsg;
+
+ errmsg = result->errmsg =
+ kill_pg_errmsg_newline(PQresultErrorMessage(result->pgr));
+ return errmsg;
+}
+
+/*** vtable/ init */
+static struct sql_result multi_pgsql_result = {
+ MEMBER(v) {
+ multi_pgsql_result_nop,
+ multi_pgsql_result_first_row,
+ multi_pgsql_result_get_fields_count,
+ multi_pgsql_result_get_field_name,
+ multi_pgsql_result_find_field,
+ multi_pgsql_result_get_field_value,
+ multi_pgsql_result_get_field_value_binary,
+ multi_pgsql_result_find_field_value,
+ multi_pgsql_result_get_values,
+ multi_pgsql_result_get_error
+ }
+};
+
+static void init_multi_pgsql_result(struct multi_pgsql_result *result,
+ PGresult *pgr)
+{
+ result->api = multi_pgsql_result;
+ result->pgr = pgr;
+ result->row = 0;
+ result->values = NULL;
+}
+
+/** query classes */
+struct multi_pgsql_query {
+ struct multi_pgsql_query *p;
+ struct multi_pgsql_query_vtable *vtable;
+};
+
+struct multi_pgsql_query_vtable {
+ void (*start)(struct multi_pgsql_query *, struct multi_pgsql_pgc *);
+ void (*result)(struct multi_pgsql_query *, struct multi_pgsql_pgc *,
+ PGresult **);
+ void (*abort)(struct multi_pgsql_query *, struct sql_result *);
+ void (*dtor)(struct multi_pgsql_query *);
+ time_t (*eol)(struct multi_pgsql_query *);
+ void (*set_sync_ioloop)(struct multi_pgsql_query *, struct ioloop *);
+ char const *(*get_query)(struct multi_pgsql_query *);
+};
+
+static void start_new_query_on_pgc(struct multi_pgsql_query *, struct multi_pgsql_pgc *);
+static void start_query_on_pgc(struct multi_pgsql_pgc *);
+static void done_with_query(struct multi_pgsql_pgc *);
+
+/*** method invocation wrappers */
+#define the_qry(q) ((struct multi_pgsql_query *)q)
+
+static inline void start_query(void *q, struct multi_pgsql_pgc *pgc)
+{
+ dprintf(("%s: %p", __func__, q));
+ the_qry(q)->vtable->start(the_qry(q), pgc);
+}
+
+static inline void query_result(void *q, struct multi_pgsql_pgc *pgc,
+ PGresult **pgr)
+{
+ dprintf(("%s: %p", __func__, q));
+ the_qry(q)->vtable->result(the_qry(q), pgc, pgr);
+}
+
+static inline void abort_query(void *q, struct sql_result *r)
+{
+ dprintf(("%s: %p", __func__, q));
+
+ the_qry(q)->vtable->abort(the_qry(q), r);
+ the_qry(q)->vtable->dtor(the_qry(q));
+}
+
+static inline void destroy_query(void *q)
+{
+ dprintf(("%s: %p", __func__, q));
+ the_qry(q)->vtable->dtor(the_qry(q));
+}
+
+static inline time_t query_eol(void *q)
+{
+ dprintf(("%s: %p", __func__, q));
+ return the_qry(q)->vtable->eol(the_qry(q));
+}
+
+static inline void set_query_sync_ioloop(void *q, struct ioloop *ioloop)
+{
+ dprintf(("%s: %p", __func__, q));
+ the_qry(q)->vtable->set_sync_ioloop(the_qry(q), ioloop);
+}
+
+static inline char const *get_query(void *q)
+{
+ return
+ the_qry(q)->vtable->get_query(the_qry(q));
+}
+
+#undef the_qry
+
+/*** query */
+static void init_multi_pgsql_query(struct multi_pgsql_query *qry,
+ struct multi_pgsql_query_vtable *vtable)
+{
+ qry->p = NULL;
+ qry->vtable = vtable;
+}
+
+/*** simple_query */
+struct multi_pgsql_simple_query {
+ struct multi_pgsql_query super;
+
+ time_t eol_at;
+ char *query;
+};
+
+static void start_simple_query(struct multi_pgsql_query *q,
+ struct multi_pgsql_pgc *pgc)
+{
+ start_new_query_on_pgc(q, pgc);
+}
+
+static time_t simple_query_eol(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_simple_query *qry;
+
+ qry = (void *)q;
+ dprintf(("%s: %p", __func__, qry));
+ return qry->eol_at;
+}
+
+static char const *simple_query_get_query(struct multi_pgsql_query *q)
+{
+ return ((struct multi_pgsql_simple_query *)q)->query;
+}
+
+static void init_simple_query(struct multi_pgsql_simple_query *qry,
+ struct multi_pgsql_query_vtable *vtable,
+ unsigned query_lifetime, char const *query)
+{
+ init_multi_pgsql_query(&qry->super, vtable);
+
+ qry->query = (void *)query;
+ qry->eol_at = time(NULL) + query_lifetime;
+}
+
+/*** user_query */
+struct multi_pgsql_user_query {
+ struct multi_pgsql_simple_query super;
+
+ sql_query_callback_t *cb;
+ void *ctx;
+};
+
+static void abort_user_query(struct multi_pgsql_query *q,
+ struct sql_result *r)
+{
+ struct multi_pgsql_user_query *qry;
+
+ qry = (void *)q;
+
+ T_BEGIN {
+ qry->cb(r, qry->ctx);
+ } T_END;
+}
+
+static void user_query_result(struct multi_pgsql_query *q,
+ struct multi_pgsql_pgc *pgc,
+ PGresult **pgr)
+{
+ struct multi_pgsql_result result;
+ struct multi_pgsql_user_query *qry;
+
+ qry = (void *)q;
+ init_multi_pgsql_result(&result, *pgr);
+
+ T_BEGIN {
+ qry->cb(&result.api, qry->ctx);
+ } T_END;
+
+ if (result.values) i_free(result.values);
+
+ done_with_query(pgc);
+}
+
+static void user_query_dtor(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_user_query *qry;
+
+ qry = (void *)q;
+
+ i_free(qry->super.query);
+ i_free(qry);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_query_vtable user_query_vtable = {
+ start_simple_query,
+ user_query_result,
+ abort_user_query,
+ user_query_dtor,
+ simple_query_eol,
+ NULL,
+ simple_query_get_query
+};
+
+static struct multi_pgsql_query *create_user_query(char const *query,
+ sql_query_callback_t *cb, void *ctx)
+{
+ struct multi_pgsql_user_query *qry;
+
+ qry = i_new(struct multi_pgsql_user_query, 1);
+
+ init_simple_query(&qry->super, &user_query_vtable,
+ USER_QUERY_LIFETIME, i_strdup(query));
+
+ qry->cb = cb;
+ qry->ctx = ctx;
+ return (void *)qry;
+}
+
+/*** sync_query */
+struct multi_pgsql_sync_query {
+ struct multi_pgsql_simple_query super;
+
+ struct ioloop *ioloop;
+ PGresult *pgr;
+ char *errmsg;
+};
+
+static void sync_query_result(struct multi_pgsql_query *q,
+ struct multi_pgsql_pgc *pgc,
+ PGresult **pgr)
+{
+ struct multi_pgsql_sync_query *qry;
+
+ qry = (void *)q;
+
+ qry->errmsg = NULL;
+ qry->pgr = *pgr;
+ *pgr = NULL;
+
+ done_with_query(pgc);
+}
+
+static void abort_sync_query(struct multi_pgsql_query *q,
+ struct sql_result *r)
+{
+ struct multi_pgsql_sync_query *qry;
+
+ qry = (void *)q;
+
+ qry->pgr = NULL;
+ qry->errmsg = i_strdup(r->v.get_error(r));
+}
+
+static void sync_query_dtor(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_sync_query *qry;
+
+ qry = (void *)q;
+ io_loop_stop(qry->ioloop);
+}
+
+static void set_sync_query_sync_ioloop(struct multi_pgsql_query *q,
+ struct ioloop *ioloop)
+{
+ struct multi_pgsql_sync_query *qry;
+
+ qry = (void *)q;
+ qry->ioloop = ioloop;
+}
+
+static struct sql_result *
+result_from_sync_query(struct multi_pgsql_sync_query *qry)
+{
+ struct sql_result *result;
+ PGresult *pgr;
+
+ pgr = qry->pgr;
+
+ if (!pgr) {
+ result = (void *)i_new(struct multi_pgsql_error_result, 1);
+ init_error_result((void *)result, qry->errmsg);
+ result->v.free = error_result_free;
+
+ return result;
+ }
+
+ result = (void *)i_new(struct multi_pgsql_result, 1);
+ init_multi_pgsql_result((void *)result, pgr);
+ result->v.free = multi_pgsql_result_free;
+
+ return result;
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_query_vtable sync_query_vtable = {
+ start_simple_query,
+ sync_query_result,
+ abort_sync_query,
+ sync_query_dtor,
+ simple_query_eol,
+ set_sync_query_sync_ioloop,
+ simple_query_get_query
+};
+
+static inline void init_sync_query(struct multi_pgsql_sync_query *qry,
+ char const *query)
+{
+ init_simple_query(&qry->super, &sync_query_vtable,
+ USER_QUERY_LIFETIME, query);
+}
+
+/*** transaction query */
+struct multi_pgsql_transaction_subquery {
+ struct multi_pgsql_transaction_subquery *p;
+
+ char *query;
+ unsigned *affected_rows;
+};
+
+struct multi_pgsql_transaction_context {
+ struct sql_transaction_context super;
+ struct multi_pgsql_transaction_subquery *first, **link_to;
+ pool_t pool;
+};
+
+struct multi_pgsql_transaction_query {
+ struct multi_pgsql_query super;
+ struct multi_pgsql_transaction_context *x_ctx;
+
+ struct multi_pgsql_transaction_subquery *sub;
+ time_t cur_eol;
+
+ /*
+ If set, points to the PGresult corresponding
+ with the subquery which caused the transaction
+ to fail.
+ */
+ PGresult *pgr;
+};
+
+struct multi_pgsql_transaction_query_vtable {
+ struct multi_pgsql_query_vtable super;
+ void (*xact_done)(struct multi_pgsql_transaction_query *, char const *);
+};
+
+static inline void transaction_query_xact_done(void *q, char const *errmsg)
+{
+ struct multi_pgsql_transaction_query *qry;
+ struct multi_pgsql_transaction_query_vtable *vtable;
+
+ dprintf(("%s: %p", __func__, q));
+
+ qry = q;
+ vtable = (void *)qry->super.vtable;
+ vtable->xact_done(q, errmsg);
+}
+
+
+static void start_transaction_query(struct multi_pgsql_query *q,
+ struct multi_pgsql_pgc *pgc)
+{
+ struct multi_pgsql_transaction_query *qry;
+
+ qry = (void *)q;
+ qry->sub = qry->x_ctx->first;
+ start_new_query_on_pgc(q, pgc);
+}
+
+static void transaction_query_result(struct multi_pgsql_query *q,
+ struct multi_pgsql_pgc *pgc,
+ PGresult **ppgr)
+{
+ struct multi_pgsql_transaction_query *qry;
+ struct multi_pgsql_transaction_subquery *sub;
+ PGresult *pgr;
+ char const *errmsg;
+ ExecStatusType status;
+
+ qry = (void *)q;
+
+ pgr = qry->pgr;
+ if (pgr) {
+ errmsg = kill_pg_errmsg_newline(PQresultErrorMessage(pgr));
+
+ query_done:
+ transaction_query_xact_done(qry, errmsg);
+ done_with_query(pgc);
+ return;
+ }
+
+ sub = qry->sub;
+
+ pgr = *ppgr;
+ status = PQresultStatus(pgr);
+ dprintf(("%s: %p: status %s(%d)",
+ __func__, qry, PQresStatus(status), status));
+
+ switch (status) {
+ case PGRES_COMMAND_OK:
+ case PGRES_TUPLES_OK:
+ case PGRES_COPY_OUT:
+ case PGRES_COPY_IN:
+ if (sub->affected_rows)
+ *sub->affected_rows = atoi(PQcmdTuples(pgr));
+
+ sub = sub->p;
+ if (!sub) {
+ errmsg = NULL;
+ goto query_done;
+ }
+
+ qry->sub = sub;
+ qry->cur_eol = time(NULL) + USER_QUERY_LIFETIME;
+ break;
+
+ default:
+ qry->pgr = pgr;
+ *ppgr = NULL;
+
+ qry->cur_eol = 0; /* don't restart failed transactions */
+ sub->query = "rollback";
+ }
+
+ start_query_on_pgc(pgc);
+}
+
+static void abort_transaction_query(struct multi_pgsql_query *q,
+ struct sql_result *r)
+{
+ char const *errmsg;
+ PGresult *pgr;
+
+ pgr = ((struct multi_pgsql_transaction_query *)q)->pgr;
+ errmsg = pgr ?
+ kill_pg_errmsg_newline(PQresultErrorMessage(pgr)) : r->v.get_error(r);
+ transaction_query_xact_done(q, errmsg);
+}
+
+static time_t transaction_query_eol(struct multi_pgsql_query *q)
+{
+ return ((struct multi_pgsql_transaction_query *)q)->cur_eol;
+}
+
+static void transaction_query_dtor(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_transaction_query *qry;
+ PGresult *pgr;
+ pool_t pool;
+
+ qry = (void *)q;
+
+ pgr = qry->pgr;
+ if (pgr) PQclear(pgr);
+
+ pool = qry->x_ctx->pool;
+ pool_unref(&pool);
+}
+
+static char const *transaction_query_get_query(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_transaction_query *qry;
+
+ qry = (void *)q;
+ return qry->sub->query;
+}
+
+static void init_transaction_query(struct multi_pgsql_transaction_query *qry,
+ struct multi_pgsql_transaction_query_vtable *vtable,
+ struct multi_pgsql_transaction_context *x_ctx)
+{
+ init_multi_pgsql_query(&qry->super, &vtable->super);
+
+ qry->x_ctx = x_ctx;
+ qry->cur_eol = time(NULL) + USER_QUERY_LIFETIME;
+ qry->pgr = NULL;
+}
+
+/*** async transaction query */
+struct multi_pgsql_async_transaction_query {
+ struct multi_pgsql_transaction_query super;
+
+ sql_commit_callback_t *cb;
+ void *ctx;
+};
+
+static void async_transaction_query_xact_done(struct multi_pgsql_transaction_query *q,
+ char const *errmsg)
+{
+ struct multi_pgsql_async_transaction_query *qry;
+
+ qry = (void *)q;
+ qry->cb(errmsg, qry->ctx);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_transaction_query_vtable async_transaction_query_vtable = {
+ {
+ start_transaction_query,
+ transaction_query_result,
+ abort_transaction_query,
+ transaction_query_dtor,
+ transaction_query_eol,
+ NULL,
+ transaction_query_get_query },
+
+ async_transaction_query_xact_done
+};
+
+static void
+init_async_transaction_query(struct multi_pgsql_async_transaction_query *qry,
+ struct multi_pgsql_transaction_context *x_ctx,
+ sql_commit_callback_t *cb, void *cb_ctx)
+{
+ init_transaction_query(&qry->super,
+ &async_transaction_query_vtable, x_ctx);
+
+ qry->cb = cb;
+ qry->ctx = cb_ctx;
+}
+
+/*** sync transaction query */
+struct multi_pgsql_sync_transaction_query {
+ struct multi_pgsql_transaction_query super;
+
+ struct ioloop *ioloop;
+ char const *errmsg;
+};
+
+static void sync_transaction_query_dtor(struct multi_pgsql_query *q)
+{
+ struct multi_pgsql_sync_transaction_query *qry;
+
+ qry = (void *)q;
+ io_loop_stop(qry->ioloop);
+ transaction_query_dtor(q);
+}
+
+static void set_sync_transaction_query_sync_ioloop(struct multi_pgsql_query *q,
+ struct ioloop *ioloop)
+{
+ struct multi_pgsql_sync_transaction_query *qry;
+
+ qry = (void *)q;
+ qry->ioloop = ioloop;
+}
+
+static void sync_transaction_query_xact_done(struct multi_pgsql_transaction_query *q,
+ char const *errmsg)
+{
+ struct multi_pgsql_sync_transaction_query *qry;
+
+ qry = (void *)q;
+ qry->errmsg = i_strdup(errmsg);
+}
+
+/**** vtable/ init */
+static struct multi_pgsql_transaction_query_vtable sync_transaction_query_vtable = {
+ {
+ start_transaction_query,
+ transaction_query_result,
+ abort_transaction_query,
+ sync_transaction_query_dtor,
+ transaction_query_eol,
+ set_sync_transaction_query_sync_ioloop,
+ transaction_query_get_query },
+
+ sync_transaction_query_xact_done
+};
+
+static void init_sync_transaction_query(struct multi_pgsql_sync_transaction_query *qry,
+ struct multi_pgsql_transaction_context *x_ctx)
+{
+ init_transaction_query(&qry->super,
+ &sync_transaction_query_vtable, x_ctx);
+}
+
+/** pgc (connection) code */
+struct multi_pgsql_pgc {
+ struct multi_pgsql_pgc *p;
+
+ char *connect_string;
+ PGconn *pgc;
+ PGresult *pgr;
+ struct multi_pgsql_db *pgdb;
+ struct multi_pgsql_query *qry;
+ struct io *io;
+ struct timeout *timeout;
+};
+
+/*** misc */
+static void start_pgc_connect(struct multi_pgsql_pgc *);
+
+static void init_pgc(struct multi_pgsql_pgc *pgc,
+ struct multi_pgsql_db *pgdb, char *connect_string)
+{
+ pgc->connect_string = connect_string;
+ pgc->pgdb = pgdb;
+
+ dprintf(("%s: pgc %p, '%s'", __func__, pgc, connect_string));
+
+ start_pgc_connect(pgc);
+}
+
+static void reset_pgc(struct multi_pgsql_pgc *pgc)
+{
+ dprintf(("%s: %p", __func__, pgc));
+
+ if (pgc->io) io_remove(&pgc->io);
+ if (pgc->timeout) timeout_remove(&pgc->timeout);
+
+ if (pgc->pgr) {
+ PQclear(pgc->pgr);
+ pgc->pgr = NULL;
+ }
+
+ if (pgc->pgc) {
+ PQfinish(pgc->pgc);
+ pgc->pgc = NULL;
+ }
+}
+
+static void pgc_ioloop_switch(struct multi_pgsql_pgc *pgc)
+{
+ union {
+ struct io io;
+ struct timeout timeout;
+ } old;
+
+ if (pgc->io) {
+ old.io = *pgc->io;
+ io_remove(&pgc->io);
+
+ pgc->io = io_add(PQsocket(pgc->pgc),
+ old.io.condition, old.io.callback, old.io.context);
+ }
+
+ if (pgc->timeout) {
+ old.timeout = *pgc->timeout;
+ timeout_remove(&pgc->timeout);
+
+ pgc->timeout = timeout_add(old.timeout.msecs,
+ old.timeout.callback, old.timeout.context);
+ }
+}
+
+static void log_pg_error(struct multi_pgsql_pgc *pgc, char const *fnc, char const *pg_call)
+{
+ i_error("%s: %p: %s: %s",
+ fnc, pgc, pg_call, kill_pg_errmsg_newline(PQerrorMessage(pgc->pgc)));
+}
+
+/*** connection establishment */
+static void add_ready_pgc(struct multi_pgsql_db *, struct multi_pgsql_pgc *);
+
+static void restart_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+ reset_pgc(pgc); /* drops the timeout and any PGconn left over from a failed PQconnectStart */
+ start_pgc_connect(pgc);
+}
+
+static void continue_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+ char const *pg_call;
+ PostgresPollingStatusType status;
+ int io_dir, rc;
+
+ io_remove(&pgc->io);
+ the_pgc = pgc->pgc;
+
+ status = PQconnectPoll(the_pgc);
+ dprintf(("%s: %p: PQconnectPoll returned %d",
+ __func__, pgc, status));
+
+ switch (status) {
+ /*
+ According to the PostgreSQL source, PGRES_POLLING_ACTIVE
+ is an unused legacy constant. It is listed only to keep
+ gcc from complaining and falls through to the OK case.
+ */
+ case PGRES_POLLING_ACTIVE:
+ case PGRES_POLLING_OK:
+ rc = PQsetnonblocking(the_pgc, 1);
+ if (rc == -1) {
+ pg_call = "PQsetnonblocking";
+ goto error;
+ }
+
+ i_info("%s: %p: connected", __func__, pgc);
+ add_ready_pgc(pgc->pgdb, pgc);
+ return;
+
+ case PGRES_POLLING_FAILED:
+ pg_call = "PQconnectPoll";
+ goto error;
+
+ case PGRES_POLLING_READING:
+ io_dir = IO_READ;
+ break;
+
+ case PGRES_POLLING_WRITING:
+ io_dir = IO_WRITE;
+ }
+
+ pgc->io = io_add(PQsocket(the_pgc), io_dir, continue_pgc_connect, pgc);
+ return;
+
+error:
+ log_pg_error(pgc, __func__, pg_call);
+
+ PQfinish(the_pgc);
+ pgc->pgc = NULL;
+
+ pgc->timeout = timeout_add(RECONNECT_DELAY,
+ restart_pgc_connect, pgc);
+}
+
+static void start_pgc_connect(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+
+ dprintf(("%s: %p", __func__, pgc));
+
+ pgc->pgc = the_pgc = PQconnectStart(pgc->connect_string);
+ if (!the_pgc)
+ i_fatal("%s: %p: out of memory",
+ __func__, pgc);
+
+ if (PQstatus(the_pgc) == CONNECTION_BAD) {
+ log_pg_error(pgc, __func__, "PQconnectStart");
+ pgc->timeout = timeout_add(RECONNECT_DELAY,
+ restart_pgc_connect, pgc);
+ return;
+ }
+
+ pgc->io = io_add(PQsocket(the_pgc), IO_WRITE,
+ continue_pgc_connect, pgc);
+}
+
+/*** query processing */
+/**** error handling */
+static void requeue_query_to_pgdb(struct multi_pgsql_query *, struct multi_pgsql_db *);
+
+static void pgc_query_io_timeout(struct multi_pgsql_pgc *pgc)
+{
+ i_info("%s: %p", __func__, pgc);
+
+ requeue_query_to_pgdb(pgc->qry, pgc->pgdb);
+ pgc->qry = NULL;
+
+ reset_pgc(pgc);
+ start_pgc_connect(pgc);
+}
+
+static void pgc_query_processing_failure(struct multi_pgsql_pgc *pgc,
+ char const *fnc,
+ char const *pg_call)
+{
+ log_pg_error(pgc, fnc, pg_call);
+
+ requeue_query_to_pgdb(pgc->qry, pgc->pgdb);
+ pgc->qry = NULL;
+
+ reset_pgc(pgc);
+ start_pgc_connect(pgc);
+}
+
+/**** result processing */
+static void done_with_query(struct multi_pgsql_pgc *pgc)
+{
+ struct multi_pgsql_query *qry;
+
+ qry = pgc->qry;
+ pgc->qry = NULL;
+
+ destroy_query(qry);
+}
+
+static inline int got_result(PGconn *pgc)
+{
+ PGresult *pgr;
+
+ pgr = PQgetResult(pgc);
+ if (!pgr) return 0;
+
+ PQclear(pgr);
+ return 1;
+}
+
+static void eat_results(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+ PGresult *pgr;
+ int rc;
+
+ dprintf(("%s: %p", __func__, pgc));
+
+ the_pgc = pgc->pgc;
+ do {
+ rc = PQconsumeInput(the_pgc);
+ if (rc == 0) {
+ pgc_query_processing_failure(pgc, __func__,
+ "PQconsumeInput");
+ return;
+ }
+
+ if (PQisBusy(the_pgc)) {
+ timeout_reset(pgc->timeout);
+ return;
+ }
+ } while (got_result(the_pgc));
+
+ io_remove(&pgc->io);
+ timeout_remove(&pgc->timeout);
+
+ pgr = pgc->pgr;
+ pgc->pgr = NULL;
+ query_result(pgc->qry, pgc, &pgr);
+ if (pgr) PQclear(pgr);
+
+ if (!pgc->qry) add_ready_pgc(pgc->pgdb, pgc);
+}
+
+static void get_first_result(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+ PGresult *pgr;
+ int rc;
+
+ dprintf(("%s: %p", __func__, pgc));
+ the_pgc = pgc->pgc;
+
+ rc = PQconsumeInput(the_pgc);
+ if (rc == 0) {
+ pgc_query_processing_failure(pgc, __func__,
+ "PQconsumeInput");
+ return;
+ }
+
+ if (PQisBusy(the_pgc)) {
+ dprintf(("%s: %p: still busy", __func__, pgc));
+
+ timeout_reset(pgc->timeout);
+ return;
+ }
+
+ io_remove(&pgc->io);
+
+ pgr = pgc->pgr = PQgetResult(the_pgc);
+ if (pgr) {
+ pgc->io = io_add(PQsocket(the_pgc), IO_READ, eat_results, pgc);
+ timeout_reset(pgc->timeout);
+
+ eat_results(pgc);
+ return;
+ } else
+ timeout_remove(&pgc->timeout);
+
+
+ query_result(pgc->qry, pgc, NULL);
+ if (!pgc->qry) add_ready_pgc(pgc->pgdb, pgc);
+}
+
+/**** query transmission */
+static void flush_query(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+ int rc;
+
+ dprintf(("%s: %p", __func__, pgc));
+ the_pgc = pgc->pgc;
+
+ rc = PQflush(the_pgc);
+ switch (rc) {
+ case -1:
+ pgc_query_processing_failure(pgc, __func__,
+ "PQflush");
+ return;
+
+ case 0:
+ dprintf(("%s: %p: query sent", __func__, pgc));
+
+ io_remove(&pgc->io);
+ pgc->io = io_add(PQsocket(the_pgc), IO_READ, get_first_result, pgc);
+ }
+
+ timeout_reset(pgc->timeout);
+}
+
+static void start_query_on_pgc(struct multi_pgsql_pgc *pgc)
+{
+ PGconn *the_pgc;
+ char const *pg_call, *query;
+ void (*io_cb)(struct multi_pgsql_pgc *);
+ int rc, io_dir;
+
+ query = get_query(pgc->qry);
+ dprintf(("%s: %p: %s", __func__, pgc, query));
+
+ the_pgc = pgc->pgc;
+
+ rc = PQsendQuery(the_pgc, query);
+ if (rc == 0) {
+ pg_call = "PQsendQuery";
+ goto error;
+ }
+
+ rc = PQflush(the_pgc);
+ switch (rc) {
+ case -1:
+ pg_call = "PQflush";
+ goto error;
+
+ case 0:
+ io_cb = get_first_result;
+ io_dir = IO_READ;
+ break;
+
+ default:
+ io_cb = flush_query;
+ io_dir = IO_WRITE;
+ }
+
+ pgc->io = io_add(PQsocket(the_pgc), io_dir, io_cb, pgc);
+ pgc->timeout = timeout_add(PGC_IO_TIMEOUT * TIMEOUT_UNIT,
+ pgc_query_io_timeout, pgc);
+ return;
+
+error:
+ pgc_query_processing_failure(pgc, __func__, pg_call);
+}
+
+static void start_new_query_on_pgc(struct multi_pgsql_query *qry,
+ struct multi_pgsql_pgc *pgc)
+{
+ dprintf(("%s: %p -> %p", __func__, qry, pgc));
+
+ pgc->qry = qry;
+ start_query_on_pgc(pgc);
+}
+
+/** pgdb */
+struct multi_pgsql_db {
+ struct sql_db api;
+
+ unsigned n_pgcs;
+ struct multi_pgsql_pgc *pgcs;
+
+ struct {
+ struct multi_pgsql_pgc *first, **link_to;
+ } pgc_q;
+
+ struct {
+ struct timeout *timeout;
+ struct multi_pgsql_query *first, **link_to;
+ } query_q;
+
+ pool_t pool;
+};
+
+static struct multi_pgsql_query *dequeue_query_from_pgdb(struct multi_pgsql_db *);
+
+/*** pgc support code */
+static void do_init_pgcs(struct multi_pgsql_db *pgdb, char const *s, unsigned ofs)
+{
+ char const *r, *lws_start;
+ unsigned c, semi_count;
+
+ while ((c = (unsigned char)*s) && isspace(c)) ++s;
+
+ if (!c) {
+ if (!ofs) i_fatal("%s: empty connect string", __func__);
+
+ dprintf(("%s: end of connect_string, %u pgcs",
+ __func__, ofs));
+
+ pgdb->n_pgcs = ofs;
+ pgdb->pgcs = p_new(pgdb->pool, struct multi_pgsql_pgc, ofs);
+ return;
+ }
+
+ semi_count = 0;
+ lws_start = NULL;
+ r = s;
+ do {
+ if (c == ';') {
+ if (++semi_count == 3) break;
+ continue;
+ }
+
+ semi_count = 0;
+
+ if (isspace(c)) {
+ if (!lws_start) lws_start = r;
+ continue;
+ }
+
+ lws_start = NULL;
+ } while ((c = (unsigned char)*++r));
+
+ do_init_pgcs(pgdb, c ? r + 1 : r, ofs + 1);
+
+ if (lws_start) r = lws_start;
+ else if (c) r -= 2;
+
+ init_pgc(pgdb->pgcs + ofs, pgdb, p_strndup(pgdb->pool, s, r - s));
+}
+
+static inline void init_pgcs(struct multi_pgsql_db *pgdb, char const *connect_string)
+{
+ do_init_pgcs(pgdb, connect_string, 0);
+}
+
+static void add_ready_pgc(struct multi_pgsql_db *pgdb, struct multi_pgsql_pgc *pgc)
+{
+ struct multi_pgsql_query *qry;
+
+ if (!pgdb->pgc_q.first) {
+ qry = dequeue_query_from_pgdb(pgdb);
+
+ if (qry) {
+ start_query(qry, pgc);
+ return;
+ }
+ }
+
+ *pgdb->pgc_q.link_to = pgc;
+ pgdb->pgc_q.link_to = &pgc->p;
+
+ dprintf(("%s: %p -> %p", __func__, pgc, pgdb));
+}
+
+static struct multi_pgsql_pgc *get_ready_pgc(struct multi_pgsql_db *pgdb)
+{
+ struct multi_pgsql_pgc *pgc, *next_pgc;
+
+ pgc = pgdb->pgc_q.first;
+ if (!pgc) return NULL;
+
+ next_pgc = pgc->p;
+ if (next_pgc) pgc->p = NULL;
+ else pgdb->pgc_q.link_to = &pgdb->pgc_q.first;
+ pgdb->pgc_q.first = next_pgc;
+
+ dprintf(("%s: %p -> %p", __func__, pgdb, pgc));
+ return pgc;
+}
+
+static void switch_pgdb_pgcs(struct multi_pgsql_db *pgdb)
+{
+ struct multi_pgsql_pgc *pgcs;
+ unsigned n;
+
+ pgcs = pgdb->pgcs;
+ n = pgdb->n_pgcs;
+ do pgc_ioloop_switch(pgcs + --n); while (n);
+}
+
+/*** query timeout */
+static void start_pgdb_query_timeout(struct multi_pgsql_db *,
+ struct multi_pgsql_query *);
+
+static void pgdb_query_timeout(struct multi_pgsql_db *pgdb)
+{
+ struct multi_pgsql_query *qry, *next_qry;
+ struct multi_pgsql_error_result timeout;
+ time_t now;
+
+ dprintf(("%s: %p", __func__, pgdb));
+
+ timeout_remove(&pgdb->query_q.timeout);
+ qry = pgdb->query_q.first;
+ now = time(NULL);
+ init_error_result(&timeout, TIMEOUT_ERROR);
+ do {
+ next_qry = qry->p;
+ abort_query(qry, &timeout.api);
+ } while ((qry = next_qry) && now > query_eol(qry));
+
+ pgdb->query_q.first = next_qry;
+
+ if (next_qry) {
+ start_pgdb_query_timeout(pgdb, next_qry);
+ return;
+ }
+
+ pgdb->query_q.link_to = &pgdb->query_q.first;
+}
+
+static void start_pgdb_query_timeout(struct multi_pgsql_db *pgdb,
+ struct multi_pgsql_query *qry)
+{
+ time_t now, timeout, eol;
+
+ now = time(NULL);
+ eol = query_eol(qry);
+ timeout = now < eol ? eol - now : 0;
+ if (timeout < MIN_QUERY_TIMEOUT) timeout = MIN_QUERY_TIMEOUT;
+
+ pgdb->query_q.timeout = timeout_add(timeout * TIMEOUT_UNIT,
+ pgdb_query_timeout, pgdb);
+ dprintf(("%s: %p: timeout in %us", __func__, pgdb, (unsigned)timeout));
+}
+
+/*** query queueing */
+static void queue_query_to_pgdb(struct multi_pgsql_query *qry, struct multi_pgsql_db *pgdb)
+{
+ if (!pgdb->query_q.first) start_pgdb_query_timeout(pgdb, qry);
+
+ *pgdb->query_q.link_to = qry;
+ pgdb->query_q.link_to = &qry->p;
+}
+
+static struct multi_pgsql_query *dequeue_query_from_pgdb(struct multi_pgsql_db *pgdb)
+{
+ struct multi_pgsql_query *qry, *next_qry;
+
+ qry = pgdb->query_q.first;
+ if (!qry) return NULL;
+
+ timeout_remove(&pgdb->query_q.timeout);
+
+ next_qry = qry->p;
+ if (next_qry) start_pgdb_query_timeout(pgdb, next_qry);
+ else pgdb->query_q.link_to = &pgdb->query_q.first;
+ pgdb->query_q.first = next_qry;
+
+ qry->p = NULL;
+ return qry;
+}
+
+static void requeue_query_to_pgdb(struct multi_pgsql_query *qry,
+ struct multi_pgsql_db *pgdb)
+{
+ struct multi_pgsql_error_result timeout;
+ struct multi_pgsql_pgc *pgc;
+
+ if (time(NULL) >= query_eol(qry)) {
+ init_error_result(&timeout, TIMEOUT_ERROR);
+ abort_query(qry, &timeout.api);
+ return;
+ }
+
+ pgc = get_ready_pgc(pgdb);
+ if (pgc) {
+ start_query(qry, pgc);
+ return;
+ }
+
+ if (pgdb->query_q.timeout) {
+ timeout_remove(&pgdb->query_q.timeout);
+ qry->p = pgdb->query_q.first;
+ } else
+ pgdb->query_q.link_to = &qry->p;
+
+ pgdb->query_q.first = qry;
+ start_pgdb_query_timeout(pgdb, qry);
+}
+
+
+/*** query submission */
+static void pgdb_async_query(struct multi_pgsql_db *pgdb,
+ struct multi_pgsql_query *qry)
+{
+ struct multi_pgsql_pgc *pgc;
+
+ dprintf(("%s: %p: %p", __func__, pgdb, qry));
+
+ pgc = get_ready_pgc(pgdb);
+ if (pgc) {
+ start_query(qry, pgc);
+ return;
+ }
+
+ dprintf(("%s: %p: no pgcs ready, queueing", __func__, pgdb));
+ queue_query_to_pgdb(qry, pgdb);
+}
+
+
+static void pgdb_sync_query(struct multi_pgsql_db *pgdb,
+ struct multi_pgsql_query *qry)
+{
+ struct ioloop *old_ioloop, *sync_ioloop;
+
+ dprintf(("%s: %p: %p", __func__, pgdb, qry));
+
+ old_ioloop = current_ioloop;
+ sync_ioloop = io_loop_create();
+ switch_pgdb_pgcs(pgdb);
+
+ set_query_sync_ioloop(qry, sync_ioloop);
+ pgdb_async_query(pgdb, qry);
+
+ io_loop_run(sync_ioloop);
+
+ io_loop_set_current(old_ioloop);
+ switch_pgdb_pgcs(pgdb);
+ io_loop_set_current(sync_ioloop);
+ io_loop_destroy(&sync_ioloop);
+}
+
+/*** non-transaction SQL driver methods */
+extern struct sql_db driver_multi_pgsql_db;
+
+static struct sql_db *multi_pgsql_init_v(char const *connect_string)
+{
+ struct multi_pgsql_db *pgdb;
+ pool_t pool;
+
+ dprintf(("%s: %s", __func__, connect_string));
+
+ pool = pool_alloconly_create("multi_pgsql_pool", MULTI_PGSQL_POOL);
+
+ pgdb = p_new(pool, struct multi_pgsql_db, 1);
+ pgdb->api = driver_multi_pgsql_db;
+ pgdb->pool = pool;
+
+ pgdb->query_q.link_to = &pgdb->query_q.first;
+ pgdb->pgc_q.link_to = &pgdb->pgc_q.first;
+
+ init_pgcs(pgdb, connect_string);
+ return (void *)pgdb;
+}
+
+static void multi_pgsql_deinit_v(struct sql_db *db)
+{
+ struct multi_pgsql_error_result deinit;
+ struct multi_pgsql_db *pgdb;
+ struct multi_pgsql_pgc *pgc;
+ struct multi_pgsql_query *qry, *next_qry;
+ pool_t pool;
+ unsigned n;
+
+ dprintf(("%s: %p", __func__, db));
+
+ array_free(&db->module_contexts);
+
+ pgdb = (void *)db;
+ init_error_result(&deinit, DEINIT_ERROR);
+
+ n = pgdb->n_pgcs;
+ do {
+ pgc = pgdb->pgcs + --n;
+
+ qry = pgc->qry;
+ if (qry) abort_query(qry, &deinit.api);
+
+ reset_pgc(pgc);
+
+ } while (n);
+
+ qry = pgdb->query_q.first;
+ if (qry) {
+ timeout_remove(&pgdb->query_q.timeout);
+
+ do {
+ next_qry = qry->p;
+ abort_query(qry, &deinit.api);
+ } while ((qry = next_qry));
+ }
+
+ pool = pgdb->pool;
+ pool_unref(&pool);
+}
+
+static int multi_pgsql_connect(struct sql_db *db)
+{
+ dprintf(("%s: %p", __func__, db));
+ return 1;
+}
+
+static enum sql_db_flags
+multi_pgsql_get_flags(struct sql_db *db)
+{
+ dprintf(("%s: %p", __func__, db));
+ return 0;
+}
+
+static const char *
+multi_pgsql_escape_string(struct sql_db *db, const char *in)
+{
+ struct multi_pgsql_db *pgdb;
+ char *out;
+ size_t len;
+
+ pgdb = (void *)db;
+ len = strlen(in);
+ out = t_buffer_get(len * 2 + 1);
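+ len = pgdb->pgcs->pgc ? PQescapeStringConn(pgdb->pgcs->pgc, out, in, len, NULL) :
+ PQescapeString(out, in, len); /* first connection is down: escape without a PGconn */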
+ t_buffer_alloc(len + 1);
+
+ dprintf(("%s: %p: '%s' -> '%s'",
+ __func__, pgdb, in, out));
+ return out;
+}
+
+static void multi_pgsql_query(struct sql_db *db, const char *query,
+ sql_query_callback_t *cb, void *ctx)
+{
+ struct multi_pgsql_query *qry;
+
+ dprintf(("%s: %p: %s", __func__, db, query));
+
+ qry = create_user_query(query, cb, ctx);
+ pgdb_async_query((void *)db, qry);
+}
+
+static void multi_pgsql_exec(struct sql_db *db, const char *query)
+{
+ i_info("%s: %p: %s", __func__, db, query);
+ i_info("%s: not implemented", __func__);
+}
+
+static struct sql_result *
+multi_pgsql_query_s(struct sql_db *db, const char *query)
+{
+ struct multi_pgsql_db *pgdb;
+ struct multi_pgsql_sync_query sync_qry;
+
+ pgdb = (void *)db;
+ dprintf(("%s: %p: %s", __func__, pgdb, query));
+
+ init_sync_query(&sync_qry, query);
+ pgdb_sync_query(pgdb, (void *)&sync_qry);
+ return result_from_sync_query(&sync_qry);
+}
+
+/*** transaction support code */
+static void add_subquery_to(struct multi_pgsql_transaction_context *x_ctx,
+ char const *query, unsigned *affected_rows)
+
+{
+ struct multi_pgsql_transaction_subquery *sub;
+ pool_t pool;
+
+ pool = x_ctx->pool;
+
+ sub = p_new(pool, struct multi_pgsql_transaction_subquery, 1);
+ sub->query = p_strdup(pool, query);
+ sub->affected_rows = affected_rows;
+
+ *x_ctx->link_to = sub;
+ x_ctx->link_to = &sub->p;
+
+ dprintf(("%s: %p: '%s'", __func__, x_ctx, query));
+}
+
+static struct sql_transaction_context *
+multi_pgsql_transaction_begin(struct sql_db *db)
+{
+ struct multi_pgsql_transaction_context *x_ctx;
+ pool_t pool;
+
+ pool = pool_alloconly_create("multi_pgsql_xact_pool",
+ MULTI_PGSQL_XACT_POOL);
+ x_ctx = p_new(pool, struct multi_pgsql_transaction_context, 1);
+ x_ctx->super.db = db;
+ x_ctx->link_to = &x_ctx->first;
+ x_ctx->pool = pool;
+
+ add_subquery_to(x_ctx, "begin", NULL);
+
+ dprintf(("%s: db %p: %p", __func__, db, x_ctx));
+ return (void *)x_ctx;
+}
+
+static void
+multi_pgsql_transaction_commit(struct sql_transaction_context *ctx,
+ sql_commit_callback_t *cb, void *cb_ctx)
+{
+ struct multi_pgsql_transaction_context *x_ctx;
+ struct multi_pgsql_async_transaction_query *qry;
+
+ x_ctx = (void *)ctx;
+ dprintf(("%s: %p", __func__, x_ctx));
+
+ add_subquery_to(x_ctx, "commit", NULL);
+
+ qry = p_new(x_ctx->pool, struct multi_pgsql_async_transaction_query, 1);
+ init_async_transaction_query(qry, x_ctx, cb, cb_ctx);
+ pgdb_async_query((void *)ctx->db, (void *)qry);
+}
+
+static int
+multi_pgsql_transaction_commit_s(struct sql_transaction_context *ctx,
+ const char **error_r)
+{
+ struct multi_pgsql_transaction_context *x_ctx;
+ struct multi_pgsql_sync_transaction_query sync_qry;
+ char *errmsg;
+
+ x_ctx = (void *)ctx;
+ dprintf(("%s: %p", __func__, x_ctx));
+
+ add_subquery_to(x_ctx, "commit", NULL);
+
+ init_sync_transaction_query(&sync_qry, x_ctx);
+ pgdb_sync_query((void *)x_ctx->super.db, &sync_qry.super.super);
+
+ errmsg = (char *)sync_qry.errmsg;
+ if (errmsg) {
+ *error_r = t_strdup(errmsg);
+ i_free(errmsg);
+
+ return -1;
+ }
+
+ return 0;
+}
+
+static void
+multi_pgsql_transaction_rollback(struct sql_transaction_context *ctx)
+{
+ struct multi_pgsql_transaction_context *x_ctx;
+
+ x_ctx = (void *)ctx;
+ dprintf(("%s: %p", __func__, x_ctx));
+
+ pool_unref(&x_ctx->pool);
+}
+
+static void
+multi_pgsql_transaction_update(struct sql_transaction_context *ctx, const char *query,
+ unsigned int *affected_rows)
+{
+ struct multi_pgsql_transaction_context *x_ctx;
+
+ x_ctx = (void *)ctx;
+ add_subquery_to(x_ctx, query, affected_rows);
+}
+
+/*** db driver vtable */
+struct sql_db driver_multi_pgsql_db = {
+ "multi-pgsql",
+
+ MEMBER(v) {
+ multi_pgsql_init_v,
+ multi_pgsql_deinit_v,
+ multi_pgsql_get_flags,
+ multi_pgsql_connect,
+ multi_pgsql_escape_string,
+ multi_pgsql_exec,
+ multi_pgsql_query,
+ multi_pgsql_query_s,
+
+ multi_pgsql_transaction_begin,
+ multi_pgsql_transaction_commit,
+ multi_pgsql_transaction_commit_s,
+ multi_pgsql_transaction_rollback,
+ multi_pgsql_transaction_update
+ }
+};
+#endif