dovecot-2.2: cassandra: Split consistency setting to read_consis...

dovecot at dovecot.org dovecot at dovecot.org
Mon Aug 31 18:32:43 UTC 2015


details:   http://hg.dovecot.org/dovecot-2.2/rev/c33868915764
changeset: 19047:c33868915764
user:      Timo Sirainen <tss at iki.fi>
date:      Mon Aug 31 21:31:46 2015 +0300
description:
cassandra: Split consistency setting to read_consistency and write_consistency.

diffstat:

 src/lib-sql/driver-cassandra.c |  100 ++++++++++++++++------------------------
 1 files changed, 40 insertions(+), 60 deletions(-)

diffs (181 lines):

diff -r 7316c0b61cb9 -r c33868915764 src/lib-sql/driver-cassandra.c
--- a/src/lib-sql/driver-cassandra.c	Mon Aug 31 20:23:32 2015 +0300
+++ b/src/lib-sql/driver-cassandra.c	Mon Aug 31 21:31:46 2015 +0300
@@ -29,7 +29,7 @@
 	struct sql_db api;
 
 	char *hosts, *keyspace;
-	CassConsistency consistency;
+	CassConsistency read_consistency, write_consistency;
 	CassLogLevel log_level;
 
 	CassCluster *cluster;
@@ -46,7 +46,8 @@
 
 	char *error;
 
-	unsigned int set_consistency:1;
+	unsigned int set_read_consistency:1;
+	unsigned int set_write_consistency:1;
 };
 
 struct cassandra_result {
@@ -64,6 +65,7 @@
 	void *context;
 
 	unsigned int finished:1;
+	unsigned int write_query:1;
 };
 
 struct cassandra_transaction_context {
@@ -352,10 +354,14 @@
 			   strcmp(key, "keyspace") == 0) {
 			i_free(db->keyspace);
 			db->keyspace = i_strdup(value);
-		} else if (strcmp(key, "consistency") == 0) {
-			if (consistency_parse(value, &db->consistency) < 0)
-				i_fatal("cassandra: Unknown consistency: %s", value);
-			db->set_consistency = TRUE;
+		} else if (strcmp(key, "read_consistency") == 0) {
+			if (consistency_parse(value, &db->read_consistency) < 0)
+				i_fatal("cassandra: Unknown read_consistency: %s", value);
+			db->set_read_consistency = TRUE;
+		} else if (strcmp(key, "write_consistency") == 0) {
+			if (consistency_parse(value, &db->write_consistency) < 0)
+				i_fatal("cassandra: Unknown write_consistency: %s", value);
+			db->set_write_consistency = TRUE;
 		} else if (strcmp(key, "log_level") == 0) {
 			if (log_level_parse(value, &db->log_level) < 0)
 				i_fatal("cassandra: Unknown log_level: %s", value);
@@ -518,8 +524,13 @@
 	result->query = i_strdup(query);
 	result->row_pool = pool_alloconly_create("cassandra result", 512);
 	result->statement = cass_statement_new(query, 0);
-	if (db->set_consistency)
-		cass_statement_set_consistency(result->statement, db->consistency);
+	if (result->write_query) {
+		if (db->set_write_consistency)
+			cass_statement_set_consistency(result->statement, db->write_consistency);
+	} else {
+		if (db->set_read_consistency)
+			cass_statement_set_consistency(result->statement, db->read_consistency);
+	}
 	future = cass_session_execute(db->session, result->statement);
 	driver_cassandra_set_callback(future, db, query_callback, result);
 }
@@ -529,20 +540,9 @@
 {
 }
 
-static void driver_cassandra_exec(struct sql_db *db, const char *query)
-{
-	struct cassandra_result *result;
-
-	result = i_new(struct cassandra_result, 1);
-	result->api = driver_cassandra_result;
-	result->api.db = db;
-	result->api.refcount = 1;
-	result->callback = exec_callback;
-	do_query(result, query);
-}
-
-static void driver_cassandra_query(struct sql_db *db, const char *query,
-				   sql_query_callback_t *callback, void *context)
+static void
+driver_cassandra_query_full(struct sql_db *db, const char *query, bool write_query,
+			    sql_query_callback_t *callback, void *context)
 {
 	struct cassandra_result *result;
 
@@ -552,9 +552,21 @@
 	result->api.refcount = 1;
 	result->callback = callback;
 	result->context = context;
+	result->write_query = write_query;
 	do_query(result, query);
 }
 
+static void driver_cassandra_exec(struct sql_db *db, const char *query)
+{
+	driver_cassandra_query_full(db, query, TRUE, exec_callback, NULL);
+}
+
+static void driver_cassandra_query(struct sql_db *db, const char *query,
+				   sql_query_callback_t *callback, void *context)
+{
+	driver_cassandra_query_full(db, query, FALSE, callback, context);
+}
+
 static void cassandra_query_s_callback(struct sql_result *result, void *context)
 {
         struct cassandra_db *db = context;
@@ -797,22 +809,10 @@
 }
 
 static void
-transaction_begin_callback(struct sql_result *result,
-			   struct cassandra_transaction_context *ctx)
+transaction_commit_callback(struct sql_result *result, void *context)
 {
-	if (sql_result_next_row(result) < 0) {
-		ctx->begin_failed = TRUE;
-		transaction_set_failed(ctx, sql_result_get_error(result));
-	} else {
-		ctx->begin_succeeded = TRUE;
-	}
-	driver_cassandra_transaction_unref(&ctx);
-}
+	struct cassandra_transaction_context *ctx = context;
 
-static void
-transaction_commit_callback(struct sql_result *result,
-			    struct cassandra_transaction_context *ctx)
-{
 	if (sql_result_next_row(result) < 0)
 		ctx->callback(sql_result_get_error(result), ctx->context);
 	else
@@ -821,20 +821,8 @@
 }
 
 static void
-transaction_update_callback(struct sql_result *result,
-			    struct sql_transaction_query *query)
-{
-	struct cassandra_transaction_context *ctx =
-		(struct cassandra_transaction_context *)query->trans;
-
-	if (sql_result_next_row(result) < 0)
-		transaction_set_failed(ctx, sql_result_get_error(result));
-	driver_cassandra_transaction_unref(&ctx);
-}
-
-static void
 driver_cassandra_transaction_commit(struct sql_transaction_context *_ctx,
-				sql_commit_callback_t *callback, void *context)
+				    sql_commit_callback_t *callback, void *context)
 {
 	struct cassandra_transaction_context *ctx =
 		(struct cassandra_transaction_context *)_ctx;
@@ -847,19 +835,11 @@
 		driver_cassandra_transaction_unref(&ctx);
 	} else if (_ctx->head->next == NULL) {
 		/* just a single query, send it */
-		sql_query(_ctx->db, _ctx->head->query,
+		driver_cassandra_query_full(_ctx->db, _ctx->head->query, TRUE,
 			  transaction_commit_callback, ctx);
 	} else {
-		/* multiple queries, use a transaction */
-		ctx->refcount++;
-		sql_query(_ctx->db, "BEGIN", transaction_begin_callback, ctx);
-		while (_ctx->head != NULL) {
-			ctx->refcount++;
-			sql_query(_ctx->db, _ctx->head->query,
-				  transaction_update_callback, _ctx->head);
-			_ctx->head = _ctx->head->next;
-		}
-		sql_query(_ctx->db, "COMMIT", transaction_commit_callback, ctx);
+		/* multiple queries - we don't actually have a transaction though */
+		callback("Multiple changes in transaction not supported", context);
 	}
 }
 


More information about the dovecot-cvs mailing list