dovecot-2.2: cassandra: Added read/write/delete_fallback_consist...

dovecot at dovecot.org dovecot at dovecot.org
Wed Oct 14 11:08:20 UTC 2015


details:   http://hg.dovecot.org/dovecot-2.2/rev/8f7a0201ebe3
changeset: 19304:8f7a0201ebe3
user:      Timo Sirainen <tss at iki.fi>
date:      Wed Oct 14 14:06:35 2015 +0300
description:
cassandra: Added read/write/delete_fallback_consistency settings.
The fallback is attempted if the primary consistency can't be satisfied.
One useful use case for this is to have:

write_consistency=each-quorum
write_fallback_consistency=local-quorum

This means that during regular operation all writes go to all data centers
before they are finished, but if one of the data centers goes down, we'll
switch to just waiting for local data center writes to finish.

diffstat:

 src/lib-sql/driver-cassandra.c |  126 ++++++++++++++++++++++++++++++++++++++--
 1 files changed, 118 insertions(+), 8 deletions(-)

diffs (242 lines):

diff -r a5e47d9637c3 -r 8f7a0201ebe3 src/lib-sql/driver-cassandra.c
--- a/src/lib-sql/driver-cassandra.c	Wed Oct 14 13:32:02 2015 +0300
+++ b/src/lib-sql/driver-cassandra.c	Wed Oct 14 14:06:35 2015 +0300
@@ -17,6 +17,10 @@
 	((db)->api.state != SQL_DB_STATE_DISCONNECTED && \
 	 (db)->api.state != SQL_DB_STATE_CONNECTING)
 
+#define CASSANDRA_FALLBACK_WARN_INTERVAL_SECS 60
+#define CASSANDRA_FALLBACK_FIRST_RETRY_MSECS 50
+#define CASSANDRA_FALLBACK_MAX_RETRY_MSECS (1000*60)
+
 typedef void driver_cassandra_callback_t(CassFuture *future, void *context);
 
 enum cassandra_query_type {
@@ -24,6 +28,11 @@
 	CASSANDRA_QUERY_TYPE_WRITE,
 	CASSANDRA_QUERY_TYPE_DELETE
 };
+#define CASSANDRA_QUERY_TYPE_COUNT 3
+
+static const char *cassandra_query_type_names[CASSANDRA_QUERY_TYPE_COUNT] = {
+	"read", "write", "delete"
+};
 
 struct cassandra_callback {
 	unsigned int id;
@@ -38,6 +47,7 @@
 
 	char *hosts, *keyspace;
 	CassConsistency read_consistency, write_consistency, delete_consistency;
+	CassConsistency read_fallback_consistency, write_fallback_consistency, delete_fallback_consistency;
 	CassLogLevel log_level;
 	unsigned int protocol_version;
 
@@ -51,6 +61,10 @@
 	ARRAY(struct cassandra_result *) results;
 	unsigned int callback_ids;
 
+	struct timeval first_fallback_sent[CASSANDRA_QUERY_TYPE_COUNT];
+	time_t last_fallback_warning[CASSANDRA_QUERY_TYPE_COUNT];
+	unsigned int fallback_failures[CASSANDRA_QUERY_TYPE_COUNT];
+
 	/* for synchronous queries: */
 	struct ioloop *ioloop, *orig_ioloop;
 	struct sql_result *sync_result;
@@ -65,6 +79,7 @@
 	CassIterator *iterator;
 	char *query;
 	char *error;
+	CassConsistency consistency, fallback_consistency;
 	enum cassandra_query_type query_type;
 	struct timeval start_time, finish_time;
 	unsigned int row_count;
@@ -129,6 +144,7 @@
 	{ CASS_LOG_TRACE, "trace" }
 };
 
+static void driver_cassandra_result_send_query(struct cassandra_result *result);
 static void driver_cassandra_send_queries(struct cassandra_db *db);
 static void result_finish(struct cassandra_result *result);
 
@@ -350,6 +366,7 @@
 {
 	const char *const *args, *key, *value;
 	string_t *hosts = t_str_new(64);
+	bool read_fallback_set = FALSE, write_fallback_set = FALSE, delete_fallback_set = FALSE;
 
 	db->log_level = CASS_LOG_WARN;
 	db->read_consistency = CASS_CONSISTENCY_LOCAL_QUORUM;
@@ -376,12 +393,24 @@
 		} else if (strcmp(key, "read_consistency") == 0) {
 			if (consistency_parse(value, &db->read_consistency) < 0)
 				i_fatal("cassandra: Unknown read_consistency: %s", value);
+		} else if (strcmp(key, "read_fallback_consistency") == 0) {
+			if (consistency_parse(value, &db->read_fallback_consistency) < 0)
+				i_fatal("cassandra: Unknown read_fallback_consistency: %s", value);
+			read_fallback_set = TRUE;
 		} else if (strcmp(key, "write_consistency") == 0) {
 			if (consistency_parse(value, &db->write_consistency) < 0)
 				i_fatal("cassandra: Unknown write_consistency: %s", value);
+		} else if (strcmp(key, "write_fallback_consistency") == 0) {
+			if (consistency_parse(value, &db->write_fallback_consistency) < 0)
+				i_fatal("cassandra: Unknown write_fallback_consistency: %s", value);
+			write_fallback_set = TRUE;
 		} else if (strcmp(key, "delete_consistency") == 0) {
 			if (consistency_parse(value, &db->delete_consistency) < 0)
 				i_fatal("cassandra: Unknown delete_consistency: %s", value);
+		} else if (strcmp(key, "delete_fallback_consistency") == 0) {
+			if (consistency_parse(value, &db->delete_fallback_consistency) < 0)
+				i_fatal("cassandra: Unknown delete_fallback_consistency: %s", value);
+			delete_fallback_set = TRUE;
 		} else if (strcmp(key, "log_level") == 0) {
 			if (log_level_parse(value, &db->log_level) < 0)
 				i_fatal("cassandra: Unknown log_level: %s", value);
@@ -393,6 +422,13 @@
 		}
 	}
 
+	if (!read_fallback_set)
+		db->read_fallback_consistency = db->read_consistency;
+	if (!write_fallback_set)
+		db->write_fallback_consistency = db->write_consistency;
+	if (!delete_fallback_set)
+		db->delete_fallback_consistency = db->delete_consistency;
+
 	if (str_len(hosts) == 0)
 		i_fatal("cassandra: No hosts given in connect string");
 	if (db->keyspace == NULL)
@@ -526,11 +562,34 @@
 		sql_result_unref(&result->api);
 }
 
+static void query_resend_with_fallback(struct cassandra_result *result)
+{
+	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+	time_t last_warning =
+		ioloop_time - db->last_fallback_warning[result->query_type];
+
+	if (last_warning >= CASSANDRA_FALLBACK_WARN_INTERVAL_SECS) {
+		i_warning("%s - retrying future %s queries with consistency %s (instead of %s)",
+			  result->error, cassandra_query_type_names[result->query_type],
+			  cass_consistency_string(result->fallback_consistency),
+			  cass_consistency_string(result->consistency));
+		db->last_fallback_warning[result->query_type] = ioloop_time;
+	}
+	i_free_and_null(result->error);
+	if (db->fallback_failures[result->query_type]++ == 0)
+		db->first_fallback_sent[result->query_type] = ioloop_timeval;
+
+	result->consistency = result->fallback_consistency;
+	driver_cassandra_result_send_query(result);
+}
+
 static void query_callback(CassFuture *future, void *context)
 {
 	struct cassandra_result *result = context;
+	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+	CassError error = cass_future_error_code(future);
 
-	if (cass_future_error_code(future) != CASS_OK) {
+	if (error != CASS_OK) {
 		const char *errmsg;
 		size_t errsize;
 
@@ -539,18 +598,64 @@
 		result->error = i_strdup_printf("Query '%s' failed: %.*s",
 						result->query,
 						(int)errsize, errmsg);
+		if (error == CASS_ERROR_SERVER_UNAVAILABLE &&
+		    result->fallback_consistency != result->consistency) {
+			/* retry with fallback consistency */
+			query_resend_with_fallback(result);
+			return;
+		}
 		result_finish(result);
 		return;
 	}
+
+	if (result->fallback_consistency != result->consistency) {
+		/* non-fallback query finished successfully. if there had been
+		   any fallbacks, reset them. */
+		db->fallback_failures[result->query_type] = 0;
+	}
+
 	result->result = cass_future_get_result(future);
 	result->iterator = cass_iterator_from_result(result->result);
 	result_finish(result);
 }
 
+static void driver_cassandra_result_send_query(struct cassandra_result *result)
+{
+	struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+	CassFuture *future;
+
+	result->statement = cass_statement_new(result->query, 0);
+	cass_statement_set_consistency(result->statement, result->consistency);
+
+	future = cass_session_execute(db->session, result->statement);
+	driver_cassandra_set_callback(future, db, query_callback, result);
+}
+
+static bool
+driver_cassandra_want_fallback_query(struct cassandra_result *result)
+{
+        struct cassandra_db *db = (struct cassandra_db *)result->api.db;
+	unsigned int failure_count = db->fallback_failures[result->query_type];
+	unsigned int i, msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
+	struct timeval tv;
+
+	if (failure_count == 0)
+		return FALSE;
+	tv = db->first_fallback_sent[result->query_type];
+	for (i = 1; i < failure_count; i++) {
+		msecs *= 2;
+		if (msecs >= CASSANDRA_FALLBACK_MAX_RETRY_MSECS) {
+			msecs = CASSANDRA_FALLBACK_FIRST_RETRY_MSECS;
+			break;
+		}
+	}
+	timeval_add_msecs(&tv, msecs);
+	return timeval_cmp(&ioloop_timeval, &tv) < 0;
+}
+
 static int driver_cassandra_send_query(struct cassandra_result *result)
 {
         struct cassandra_db *db = (struct cassandra_db *)result->api.db;
-	CassFuture *future;
 	int ret;
 
 	if (!SQL_DB_IS_READY(&db->api)) {
@@ -563,20 +668,25 @@
 
 	result->start_time = ioloop_timeval;
 	result->row_pool = pool_alloconly_create("cassandra result", 512);
-	result->statement = cass_statement_new(result->query, 0);
 	switch (result->query_type) {
 	case CASSANDRA_QUERY_TYPE_READ:
-		cass_statement_set_consistency(result->statement, db->read_consistency);
+		result->consistency = db->read_consistency;
+		result->fallback_consistency = db->read_fallback_consistency;
 		break;
 	case CASSANDRA_QUERY_TYPE_WRITE:
-		cass_statement_set_consistency(result->statement, db->write_consistency);
+		result->consistency = db->write_consistency;
+		result->fallback_consistency = db->write_fallback_consistency;
 		break;
 	case CASSANDRA_QUERY_TYPE_DELETE:
-		cass_statement_set_consistency(result->statement, db->delete_consistency);
+		result->consistency = db->delete_consistency;
+		result->fallback_consistency = db->delete_fallback_consistency;
 		break;
 	}
-	future = cass_session_execute(db->session, result->statement);
-	driver_cassandra_set_callback(future, db, query_callback, result);
+
+	if (driver_cassandra_want_fallback_query(result))
+		result->consistency = result->fallback_consistency;
+
+	driver_cassandra_result_send_query(result);
 	result->query_sent = TRUE;
 	return 1;
 }


More information about the dovecot-cvs mailing list