dovecot-2.2: cassandra: Handle async queries internally - don't ...

dovecot at dovecot.org dovecot at dovecot.org
Thu Sep 3 18:38:10 UTC 2015


details:   http://hg.dovecot.org/dovecot-2.2/rev/5d0bcb628c88
changeset: 19083:5d0bcb628c88
user:      Timo Sirainen <tss at iki.fi>
date:      Thu Sep 03 21:37:09 2015 +0300
description:
cassandra: Handle async queries internally - don't use sql pooling code.
There's no need to create multiple Cassandra instances, since the single
instance is capable of doing multiple asynchronous requests in parallel.

diffstat:

 src/lib-sql/driver-cassandra.c |  105 ++++++++++++++++++++++++++--------------
 1 files changed, 69 insertions(+), 36 deletions(-)

diffs (269 lines):

diff -r edc402c28fe7 -r 5d0bcb628c88 src/lib-sql/driver-cassandra.c
--- a/src/lib-sql/driver-cassandra.c	Thu Sep 03 20:55:18 2015 +0300
+++ b/src/lib-sql/driver-cassandra.c	Thu Sep 03 21:37:09 2015 +0300
@@ -39,9 +39,10 @@
 	int fd_pipe[2];
 	struct io *io_pipe;
 	ARRAY(struct cassandra_callback *) callbacks;
+	ARRAY(struct cassandra_result *) results;
 	unsigned int callback_ids;
 
-	struct cassandra_result *cur_result;
+	/* for synchronous queries: */
 	struct ioloop *ioloop, *orig_ioloop;
 	struct sql_result *sync_result;
 
@@ -63,6 +64,7 @@
 	sql_query_callback_t *callback;
 	void *context;
 
+	unsigned int query_sent:1;
 	unsigned int finished:1;
 	unsigned int write_query:1;
 };
@@ -116,6 +118,7 @@
 	{ CASS_LOG_TRACE, "trace" }
 };
 
+static void driver_cassandra_send_queries(struct cassandra_db *db);
 static void result_finish(struct cassandra_result *result);
 
 static int consistency_parse(const char *str, CassConsistency *consistency_r)
@@ -146,8 +149,6 @@
 
 static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
 {
-	i_assert(state == SQL_DB_STATE_BUSY || db->cur_result == NULL);
-
 	/* switch back to original ioloop in case the caller wants to
 	   add/remove timeouts */
 	if (db->ioloop != NULL)
@@ -157,8 +158,10 @@
 		io_loop_set_current(db->ioloop);
 }
 
-static void driver_cassandra_close(struct cassandra_db *db)
+static void driver_cassandra_close(struct cassandra_db *db, const char *error)
 {
+	struct cassandra_result *const *resultp;
+
 	if (db->io_pipe != NULL)
 		io_remove(&db->io_pipe);
 	if (db->fd_pipe[0] != -1) {
@@ -167,6 +170,13 @@
 	}
 	driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
 
+	while (array_count(&db->results) > 0) {
+		resultp = array_idx(&db->results, 0);
+		if ((*resultp)->error == NULL)
+			(*resultp)->error = i_strdup(error);
+		result_finish(*resultp);
+	}
+
 	if (db->ioloop != NULL) {
 		/* running a sync query, stop it */
 		io_loop_stop(db->ioloop);
@@ -238,7 +248,7 @@
 			driver_cassandra_input_id(db, ids[i]);
 		return;
 	}
-	driver_cassandra_close(db);
+	driver_cassandra_close(db, "IPC pipe closed");
 }
 
 static void
@@ -267,7 +277,7 @@
 	if ((rc = cass_future_error_code(future)) != CASS_OK) {
 		driver_cassandra_log_error(future,
 					   "Couldn't connect to Cassandra");
-		driver_cassandra_close(db);
+		driver_cassandra_close(db, "Couldn't connect to Cassandra");
 		return;
 	}
 	driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
@@ -276,6 +286,7 @@
 		   finish */
 		io_loop_stop(db->ioloop);
 	}
+	driver_cassandra_send_queries(db);
 }
 
 static int driver_cassandra_connect(struct sql_db *_db)
@@ -302,12 +313,7 @@
 {
 	struct cassandra_db *db = (struct cassandra_db *)_db;
 
-	if (db->cur_result != NULL && !db->cur_result->finished) {
-		if (db->cur_result->error == NULL)
-			db->cur_result->error = i_strdup("disconnecting");
-		result_finish(db->cur_result);
-	}
-	driver_cassandra_close(db);
+	driver_cassandra_close(db, "Disconnected");
 }
 
 static const char *
@@ -394,6 +400,7 @@
 	cass_cluster_set_request_timeout(db->cluster, SQL_QUERY_TIMEOUT_SECS * 1000);
 	cass_cluster_set_contact_points(db->cluster, db->hosts);
 	db->session = cass_session_new();
+	i_array_init(&db->results, 16);
 	i_array_init(&db->callbacks, 16);
 	return &db->api;
 }
@@ -402,15 +409,12 @@
 {
 	struct cassandra_db *db = (struct cassandra_db *)_db;
 
-	if (db->cur_result != NULL && !db->cur_result->finished) {
-		if (db->cur_result->error == NULL)
-			db->cur_result->error = i_strdup("deinitializing");
-		result_finish(db->cur_result);
-	}
-        driver_cassandra_close(db);
+        driver_cassandra_close(db, "Deinitialized");
 
 	i_assert(array_count(&db->callbacks) == 0);
 	array_free(&db->callbacks);
+	i_assert(array_count(&db->results) == 0);
+	array_free(&db->results);
 
 	cass_session_free(db->session);
 	cass_cluster_free(db->cluster);
@@ -421,11 +425,20 @@
 	i_free(db);
 }
 
-static void driver_cassandra_set_idle(struct cassandra_db *db)
+static void driver_cassandra_result_unlink(struct cassandra_db *db,
+					   struct cassandra_result *result)
 {
-	i_assert(db->api.state == SQL_DB_STATE_BUSY);
+	struct cassandra_result *const *results;
+	unsigned int i, count;
 
-	driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
+	results = array_get(&db->results, &count);
+	for (i = 0; i < count; i++) {
+		if (results[i] == result) {
+			array_delete(&db->results, i, 1);
+			return;
+		}
+	}
+	i_unreached();
 }
 
 static void driver_cassandra_result_free(struct sql_result *_result)
@@ -434,14 +447,11 @@
         struct cassandra_result *result = (struct cassandra_result *)_result;
 
 	i_assert(!result->api.callback);
-	i_assert(db->cur_result == result);
 	i_assert(result->callback == NULL);
 
 	if (_result == db->sync_result)
 		db->sync_result = NULL;
-	db->cur_result = NULL;
 
-	driver_cassandra_set_idle(db);
 	if (result->result != NULL)
 		cass_result_free(result->result);
 	if (result->iterator != NULL)
@@ -459,6 +469,7 @@
 	bool free_result = TRUE;
 
 	result->finished = TRUE;
+	driver_cassandra_result_unlink(db, result);
 
 	if (db->log_level >= CASS_LOG_DEBUG) {
 		i_debug("cassandra: Finished query '%s': %s", result->query,
@@ -504,26 +515,44 @@
 	result_finish(result);
 }
 
-static void do_query(struct cassandra_result *result, const char *query)
+static int driver_cassandra_send_query(struct cassandra_result *result)
 {
         struct cassandra_db *db = (struct cassandra_db *)result->api.db;
 	CassFuture *future;
+	int ret;
 
-	i_assert(SQL_DB_IS_READY(&db->api));
-	i_assert(db->cur_result == NULL);
+	if (!SQL_DB_IS_READY(&db->api)) {
+		if ((ret = sql_connect(&db->api)) <= 0) {
+			if (ret < 0)
+				driver_cassandra_close(db, "Couldn't connect to Cassandra");
+			return ret;
+		}
+	}
 
-	driver_cassandra_set_state(db, SQL_DB_STATE_BUSY);
-	db->cur_result = result;
-
-	result->query = i_strdup(query);
 	result->row_pool = pool_alloconly_create("cassandra result", 512);
-	result->statement = cass_statement_new(query, 0);
+	result->statement = cass_statement_new(result->query, 0);
 	if (result->write_query)
 		cass_statement_set_consistency(result->statement, db->write_consistency);
 	else
 		cass_statement_set_consistency(result->statement, db->read_consistency);
 	future = cass_session_execute(db->session, result->statement);
 	driver_cassandra_set_callback(future, db, query_callback, result);
+	result->query_sent = TRUE;
+	return 1;
+}
+
+static void driver_cassandra_send_queries(struct cassandra_db *db)
+{
+	struct cassandra_result *const *results;
+	unsigned int i, count;
+
+	results = array_get(&db->results, &count);
+	for (i = 0; i < count; i++) {
+		if (!results[i]->query_sent) {
+			if (driver_cassandra_send_query(results[i]) <= 0)
+				break;
+		}
+	}
 }
 
 static void exec_callback(struct sql_result *_result ATTR_UNUSED,
@@ -532,19 +561,23 @@
 }
 
 static void
-driver_cassandra_query_full(struct sql_db *db, const char *query, bool write_query,
+driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query,
 			    sql_query_callback_t *callback, void *context)
 {
+        struct cassandra_db *db = (struct cassandra_db *)_db;
 	struct cassandra_result *result;
 
 	result = i_new(struct cassandra_result, 1);
 	result->api = driver_cassandra_result;
-	result->api.db = db;
+	result->api.db = _db;
 	result->api.refcount = 1;
 	result->callback = callback;
 	result->context = context;
 	result->write_query = write_query;
-	do_query(result, query);
+	result->query = i_strdup(query);
+	array_append(&db->results, &result, 1);
+
+	(void)driver_cassandra_send_query(result);
 }
 
 static void driver_cassandra_exec(struct sql_db *db, const char *query)
@@ -971,7 +1004,7 @@
 
 const struct sql_db driver_cassandra_db = {
 	.name = "cassandra",
-	.flags = SQL_DB_FLAG_POOLED,
+	.flags = 0,
 
 	.v = {
 		driver_cassandra_init_v,


More information about the dovecot-cvs mailing list