dovecot-2.2: cassandra: Handle async queries internally - don't use sql pooling code.
dovecot at dovecot.org
dovecot at dovecot.org
Thu Sep 3 18:38:10 UTC 2015
details: http://hg.dovecot.org/dovecot-2.2/rev/5d0bcb628c88
changeset: 19083:5d0bcb628c88
user: Timo Sirainen <tss at iki.fi>
date: Thu Sep 03 21:37:09 2015 +0300
description:
cassandra: Handle async queries internally - don't use sql pooling code.
There's no need to create multiple Cassandra instances, since the single
instance is capable of doing multiple asynchronous requests in parallel.
diffstat:
src/lib-sql/driver-cassandra.c | 105 ++++++++++++++++++++++++++--------------
1 files changed, 69 insertions(+), 36 deletions(-)
diffs (269 lines):
diff -r edc402c28fe7 -r 5d0bcb628c88 src/lib-sql/driver-cassandra.c
--- a/src/lib-sql/driver-cassandra.c Thu Sep 03 20:55:18 2015 +0300
+++ b/src/lib-sql/driver-cassandra.c Thu Sep 03 21:37:09 2015 +0300
@@ -39,9 +39,10 @@
int fd_pipe[2];
struct io *io_pipe;
ARRAY(struct cassandra_callback *) callbacks;
+ ARRAY(struct cassandra_result *) results;
unsigned int callback_ids;
- struct cassandra_result *cur_result;
+ /* for synchronous queries: */
struct ioloop *ioloop, *orig_ioloop;
struct sql_result *sync_result;
@@ -63,6 +64,7 @@
sql_query_callback_t *callback;
void *context;
+ unsigned int query_sent:1;
unsigned int finished:1;
unsigned int write_query:1;
};
@@ -116,6 +118,7 @@
{ CASS_LOG_TRACE, "trace" }
};
+static void driver_cassandra_send_queries(struct cassandra_db *db);
static void result_finish(struct cassandra_result *result);
static int consistency_parse(const char *str, CassConsistency *consistency_r)
@@ -146,8 +149,6 @@
static void driver_cassandra_set_state(struct cassandra_db *db, enum sql_db_state state)
{
- i_assert(state == SQL_DB_STATE_BUSY || db->cur_result == NULL);
-
/* switch back to original ioloop in case the caller wants to
add/remove timeouts */
if (db->ioloop != NULL)
@@ -157,8 +158,10 @@
io_loop_set_current(db->ioloop);
}
-static void driver_cassandra_close(struct cassandra_db *db)
+static void driver_cassandra_close(struct cassandra_db *db, const char *error)
{
+ struct cassandra_result *const *resultp;
+
if (db->io_pipe != NULL)
io_remove(&db->io_pipe);
if (db->fd_pipe[0] != -1) {
@@ -167,6 +170,13 @@
}
driver_cassandra_set_state(db, SQL_DB_STATE_DISCONNECTED);
+ while (array_count(&db->results) > 0) {
+ resultp = array_idx(&db->results, 0);
+ if ((*resultp)->error == NULL)
+ (*resultp)->error = i_strdup(error);
+ result_finish(*resultp);
+ }
+
if (db->ioloop != NULL) {
/* running a sync query, stop it */
io_loop_stop(db->ioloop);
@@ -238,7 +248,7 @@
driver_cassandra_input_id(db, ids[i]);
return;
}
- driver_cassandra_close(db);
+ driver_cassandra_close(db, "IPC pipe closed");
}
static void
@@ -267,7 +277,7 @@
if ((rc = cass_future_error_code(future)) != CASS_OK) {
driver_cassandra_log_error(future,
"Couldn't connect to Cassandra");
- driver_cassandra_close(db);
+ driver_cassandra_close(db, "Couldn't connect to Cassandra");
return;
}
driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
@@ -276,6 +286,7 @@
finish */
io_loop_stop(db->ioloop);
}
+ driver_cassandra_send_queries(db);
}
static int driver_cassandra_connect(struct sql_db *_db)
@@ -302,12 +313,7 @@
{
struct cassandra_db *db = (struct cassandra_db *)_db;
- if (db->cur_result != NULL && !db->cur_result->finished) {
- if (db->cur_result->error == NULL)
- db->cur_result->error = i_strdup("disconnecting");
- result_finish(db->cur_result);
- }
- driver_cassandra_close(db);
+ driver_cassandra_close(db, "Disconnected");
}
static const char *
@@ -394,6 +400,7 @@
cass_cluster_set_request_timeout(db->cluster, SQL_QUERY_TIMEOUT_SECS * 1000);
cass_cluster_set_contact_points(db->cluster, db->hosts);
db->session = cass_session_new();
+ i_array_init(&db->results, 16);
i_array_init(&db->callbacks, 16);
return &db->api;
}
@@ -402,15 +409,12 @@
{
struct cassandra_db *db = (struct cassandra_db *)_db;
- if (db->cur_result != NULL && !db->cur_result->finished) {
- if (db->cur_result->error == NULL)
- db->cur_result->error = i_strdup("deinitializing");
- result_finish(db->cur_result);
- }
- driver_cassandra_close(db);
+ driver_cassandra_close(db, "Deinitialized");
i_assert(array_count(&db->callbacks) == 0);
array_free(&db->callbacks);
+ i_assert(array_count(&db->results) == 0);
+ array_free(&db->results);
cass_session_free(db->session);
cass_cluster_free(db->cluster);
@@ -421,11 +425,20 @@
i_free(db);
}
-static void driver_cassandra_set_idle(struct cassandra_db *db)
+static void driver_cassandra_result_unlink(struct cassandra_db *db,
+ struct cassandra_result *result)
{
- i_assert(db->api.state == SQL_DB_STATE_BUSY);
+ struct cassandra_result *const *results;
+ unsigned int i, count;
- driver_cassandra_set_state(db, SQL_DB_STATE_IDLE);
+ results = array_get(&db->results, &count);
+ for (i = 0; i < count; i++) {
+ if (results[i] == result) {
+ array_delete(&db->results, i, 1);
+ return;
+ }
+ }
+ i_unreached();
}
static void driver_cassandra_result_free(struct sql_result *_result)
@@ -434,14 +447,11 @@
struct cassandra_result *result = (struct cassandra_result *)_result;
i_assert(!result->api.callback);
- i_assert(db->cur_result == result);
i_assert(result->callback == NULL);
if (_result == db->sync_result)
db->sync_result = NULL;
- db->cur_result = NULL;
- driver_cassandra_set_idle(db);
if (result->result != NULL)
cass_result_free(result->result);
if (result->iterator != NULL)
@@ -459,6 +469,7 @@
bool free_result = TRUE;
result->finished = TRUE;
+ driver_cassandra_result_unlink(db, result);
if (db->log_level >= CASS_LOG_DEBUG) {
i_debug("cassandra: Finished query '%s': %s", result->query,
@@ -504,26 +515,44 @@
result_finish(result);
}
-static void do_query(struct cassandra_result *result, const char *query)
+static int driver_cassandra_send_query(struct cassandra_result *result)
{
struct cassandra_db *db = (struct cassandra_db *)result->api.db;
CassFuture *future;
+ int ret;
- i_assert(SQL_DB_IS_READY(&db->api));
- i_assert(db->cur_result == NULL);
+ if (!SQL_DB_IS_READY(&db->api)) {
+ if ((ret = sql_connect(&db->api)) <= 0) {
+ if (ret < 0)
+ driver_cassandra_close(db, "Couldn't connect to Cassandra");
+ return ret;
+ }
+ }
- driver_cassandra_set_state(db, SQL_DB_STATE_BUSY);
- db->cur_result = result;
-
- result->query = i_strdup(query);
result->row_pool = pool_alloconly_create("cassandra result", 512);
- result->statement = cass_statement_new(query, 0);
+ result->statement = cass_statement_new(result->query, 0);
if (result->write_query)
cass_statement_set_consistency(result->statement, db->write_consistency);
else
cass_statement_set_consistency(result->statement, db->read_consistency);
future = cass_session_execute(db->session, result->statement);
driver_cassandra_set_callback(future, db, query_callback, result);
+ result->query_sent = TRUE;
+ return 1;
+}
+
+static void driver_cassandra_send_queries(struct cassandra_db *db)
+{
+ struct cassandra_result *const *results;
+ unsigned int i, count;
+
+ results = array_get(&db->results, &count);
+ for (i = 0; i < count; i++) {
+ if (!results[i]->query_sent) {
+ if (driver_cassandra_send_query(results[i]) <= 0)
+ break;
+ }
+ }
}
static void exec_callback(struct sql_result *_result ATTR_UNUSED,
@@ -532,19 +561,23 @@
}
static void
-driver_cassandra_query_full(struct sql_db *db, const char *query, bool write_query,
+driver_cassandra_query_full(struct sql_db *_db, const char *query, bool write_query,
sql_query_callback_t *callback, void *context)
{
+ struct cassandra_db *db = (struct cassandra_db *)_db;
struct cassandra_result *result;
result = i_new(struct cassandra_result, 1);
result->api = driver_cassandra_result;
- result->api.db = db;
+ result->api.db = _db;
result->api.refcount = 1;
result->callback = callback;
result->context = context;
result->write_query = write_query;
- do_query(result, query);
+ result->query = i_strdup(query);
+ array_append(&db->results, &result, 1);
+
+ (void)driver_cassandra_send_query(result);
}
static void driver_cassandra_exec(struct sql_db *db, const char *query)
@@ -971,7 +1004,7 @@
const struct sql_db driver_cassandra_db = {
.name = "cassandra",
- .flags = SQL_DB_FLAG_POOLED,
+ .flags = 0,
.v = {
driver_cassandra_init_v,
More information about the dovecot-cvs
mailing list