dovecot-2.0: dict: Added support for async commits. Changed dict_atomic_inc() behavior.

dovecot at dovecot.org dovecot at dovecot.org
Mon Sep 7 03:55:59 EEST 2009


details:   http://hg.dovecot.org/dovecot-2.0/rev/0f7b25f3e2ce
changeset: 9889:0f7b25f3e2ce
user:      Timo Sirainen <tss at iki.fi>
date:      Sun Sep 06 20:51:25 2009 -0400
description:
dict: Added support for async commits. Changed dict_atomic_inc() behavior.

diffstat:

10 files changed, 353 insertions(+), 105 deletions(-)
src/dict/dict-commands.c    |   73 +++++++++++---
src/dict/dict-connection.h  |    1 
src/lib-dict/dict-client.c  |  211 +++++++++++++++++++++++++++++++++++--------
src/lib-dict/dict-client.h  |    4 
src/lib-dict/dict-db.c      |   15 ++-
src/lib-dict/dict-file.c    |   29 ++++-
src/lib-dict/dict-private.h |    5 -
src/lib-dict/dict-sql.c     |   87 +++++++++++------
src/lib-dict/dict.c         |   13 ++
src/lib-dict/dict.h         |   20 +++-

diffs (truncated from 860 to 300 lines):

diff -r 551c273f4844 -r 0f7b25f3e2ce src/dict/dict-commands.c
--- a/src/dict/dict-commands.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-commands.c	Sun Sep 06 20:51:25 2009 -0400
@@ -54,7 +54,8 @@ static int cmd_iterate_flush(struct dict
 	o_stream_cork(conn->output);
 	while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
 		str_truncate(str, 0);
-		str_printfa(str, "%s\t%s\n", key, value);
+		str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+			    key, value);
 		o_stream_send(conn->output, str_data(str), str_len(str));
 
 		if (o_stream_get_buffer_used_size(conn->output) >
@@ -154,6 +155,7 @@ static int cmd_begin(struct dict_connect
 	/* <id> */
 	trans = array_append_space(&conn->transactions);
 	trans->id = id;
+	trans->conn = conn;
 	trans->ctx = dict_transaction_begin(conn->dict);
 	return 0;
 }
@@ -182,23 +184,56 @@ static int cmd_commit(struct dict_connec
 static int cmd_commit(struct dict_connection *conn, const char *line)
 {
 	struct dict_connection_transaction *trans;
+	char chr;
+	int ret;
+
+	if (conn->iter_ctx != NULL) {
+		i_error("dict client: COMMIT: Can't commit while iterating");
+		return -1;
+	}
+
+	if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
+		return -1;
+
+	ret = dict_transaction_commit(&trans->ctx);
+	switch (ret) {
+	case 1:
+		chr = DICT_PROTOCOL_REPLY_OK;
+		break;
+	case 0:
+		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+		break;
+	default:
+		chr = DICT_PROTOCOL_REPLY_FAIL;
+		break;
+	}
+	o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
+	dict_connection_transaction_array_remove(conn, trans);
+	return 0;
+}
+
+static void cmd_commit_async_callback(int ret, void *context)
+{
+	struct dict_connection_transaction *trans = context;
 	const char *reply;
-	int ret;
-
-	if (conn->iter_ctx != NULL) {
-		i_error("dict client: COMMIT: Can't commit while iterating");
-		return -1;
-	}
-
-	if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
-		return -1;
-
-	ret = dict_transaction_commit(&trans->ctx);
-	reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
-				DICT_PROTOCOL_REPLY_FAIL);
-	o_stream_send_str(conn->output, reply);
-	dict_connection_transaction_array_remove(conn, trans);
-	return 0;
+	char chr;
+
+	switch (ret) {
+	case 1:
+		chr = DICT_PROTOCOL_REPLY_OK;
+		break;
+	case 0:
+		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+		break;
+	default:
+		chr = DICT_PROTOCOL_REPLY_FAIL;
+		break;
+	}
+	reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+				chr, trans->id);
+	o_stream_send_str(trans->conn->output, reply);
+
+	dict_connection_transaction_array_remove(trans->conn, trans);
 }
 
 static int
@@ -214,8 +249,8 @@ cmd_commit_async(struct dict_connection 
 	if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
 		return -1;
 
-	dict_transaction_commit_async(&trans->ctx);
-	dict_connection_transaction_array_remove(conn, trans);
+	dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+				      trans);
 	return 0;
 }
 
diff -r 551c273f4844 -r 0f7b25f3e2ce src/dict/dict-connection.h
--- a/src/dict/dict-connection.h	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-connection.h	Sun Sep 06 20:51:25 2009 -0400
@@ -5,6 +5,7 @@
 
 struct dict_connection_transaction {
 	unsigned int id;
+	struct dict_connection *conn;
 	struct dict_transaction_context *ctx;
 };
 
diff -r 551c273f4844 -r 0f7b25f3e2ce src/lib-dict/dict-client.c
--- a/src/lib-dict/dict-client.c	Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.c	Sun Sep 06 20:51:25 2009 -0400
@@ -1,6 +1,7 @@
 /* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
 
 #include "lib.h"
+#include "llist.h"
 #include "str.h"
 #include "network.h"
 #include "istream.h"
@@ -8,6 +9,7 @@
 #include "dict-private.h"
 #include "dict-client.h"
 
+#include <stdlib.h>
 #include <unistd.h>
 #include <fcntl.h>
 
@@ -24,9 +26,13 @@ struct client_dict {
 	time_t last_connect_try;
 	struct istream *input;
 	struct ostream *output;
+	struct io *io;
+
+	struct client_dict_transaction_context *transactions;
 
 	unsigned int connect_counter;
 	unsigned int transaction_id_counter;
+	unsigned int async_commits;
 
 	unsigned int in_iteration:1;
 	unsigned int handshaked:1;
@@ -41,6 +47,11 @@ struct client_dict_iterate_context {
 
 struct client_dict_transaction_context {
 	struct dict_transaction_context ctx;
+	struct client_dict_transaction_context *prev, *next;
+
+	/* for async commits */
+	dict_transaction_commit_callback_t *callback;
+	void *context;
 
 	unsigned int id;
 	unsigned int connect_counter;
@@ -213,29 +224,97 @@ client_dict_send_transaction_query(struc
 	return 0;
 }
 
-static char *client_dict_read_line(struct client_dict *dict)
-{
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+	struct client_dict_transaction_context *ctx;
+
+	for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+		if (ctx->id == id)
+			return ctx;
+	}
+	return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+			       unsigned int id, int ret)
+{
+	struct client_dict_transaction_context *ctx;
+
+	ctx = client_dict_transaction_find(dict, id);
+	if (ctx == NULL) {
+		i_error("dict-client: Unknown transaction id %u", id);
+		return;
+	}
+	if (ctx->callback != NULL)
+		ctx->callback(ret, ctx->context);
+
+	DLLIST_REMOVE(&dict->transactions, ctx);
+	i_free(ctx);
+
+	i_assert(dict->async_commits > 0);
+	if (--dict->async_commits == 0)
+		io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+{
+	unsigned int id;
 	char *line;
 	int ret;
 
-	line = i_stream_next_line(dict->input);
-	if (line != NULL)
-		return line;
-
-	while ((ret = i_stream_read(dict->input)) > 0) {
-		line = i_stream_next_line(dict->input);
-		if (line != NULL)
-			return line;
-	}
-	i_assert(ret < 0);
-
-	if (ret == -2)
-		i_error("read(%s) returned too much data", dict->path);
-	else if (dict->input->stream_errno == 0)
-		i_error("read(%s) failed: Remote disconnected", dict->path);
-	else
-		i_error("read(%s) failed: %m", dict->path);
-	return NULL;
+	*line_r = NULL;
+	while ((line = i_stream_next_line(dict->input)) == NULL) {
+		ret = i_stream_read(dict->input);
+		switch (ret) {
+		case -1:
+			if (dict->input->stream_errno != 0)
+				i_error("read(%s) failed: %m", dict->path);
+			else {
+				i_error("read(%s) failed: Remote disconnected",
+					dict->path);
+			}
+			return -1;
+		case -2:
+			i_error("read(%s) returned too much data", dict->path);
+			return -1;
+		default:
+			i_assert(ret > 0);
+			break;
+		}
+	}
+	if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+		switch (line[1]) {
+		case DICT_PROTOCOL_REPLY_OK:
+			ret = 1;
+			break;
+		case DICT_PROTOCOL_REPLY_NOTFOUND:
+			ret = 0;
+			break;
+		case DICT_PROTOCOL_REPLY_FAIL:
+			ret = -1;
+			break;
+		default:
+			i_error("dict-client: Invalid async commit line: %s",
+				line);
+			return 0;
+		}
+		id = strtoul(line+2, NULL, 10);
+		client_dict_finish_transaction(dict, id, ret);
+		return 0;
+	}
+	*line_r = line;
+	return 1;
+}
+
+static char *client_dict_read_line(struct client_dict *dict)
+{
+	char *line;
+
+	while (client_dict_read_one_line(dict, &line) == 0)
+		;
+	return line;
 }
 
 static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@ static int client_dict_connect(struct cl
 	dict->input->blocking = TRUE;
 	dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
 	dict->transaction_id_counter = 0;
+	dict->async_commits = 0;
 
 	query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
 				DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@ static void client_dict_disconnect(struc
 	dict->connect_counter++;
 	dict->handshaked = FALSE;
 
+	if (dict->io != NULL)
+		io_remove(&dict->io);
 	if (dict->input != NULL)
 		i_stream_destroy(&dict->input);
 	if (dict->output != NULL)
@@ -339,6 +421,21 @@ static void client_dict_deinit(struct di
 	pool_unref(&dict->pool);
 }
 


More information about the dovecot-cvs mailing list