dovecot-2.2: dict: Use the new async APIs for everything.

dovecot at dovecot.org dovecot at dovecot.org
Wed Sep 2 14:37:48 UTC 2015


details:   http://hg.dovecot.org/dovecot-2.2/rev/3de8de46f4a8
changeset: 19065:3de8de46f4a8
user:      Timo Sirainen <tss at iki.fi>
date:      Wed Sep 02 17:36:47 2015 +0300
description:
dict: Use the new async APIs for everything.
If the dict backend supports async operations, this means that the dict
service can now be configured with client_count>1.

diffstat:

 src/dict/dict-commands.c   |  366 ++++++++++++++++++++++++++++----------------
 src/dict/dict-commands.h   |    3 +
 src/dict/dict-connection.c |   52 +++++-
 src/dict/dict-connection.h |    7 +-
 4 files changed, 286 insertions(+), 142 deletions(-)

diffs (truncated from 683 to 300 lines):

diff -r d40d7f24ffcf -r 3de8de46f4a8 src/dict/dict-commands.c
--- a/src/dict/dict-commands.c	Wed Sep 02 17:34:43 2015 +0300
+++ b/src/dict/dict-commands.c	Wed Sep 02 17:36:47 2015 +0300
@@ -14,87 +14,159 @@
 
 #define DICT_OUTPUT_OPTIMAL_SIZE 1024
 
-struct dict_client_cmd {
-	int cmd;
-	int (*func)(struct dict_connection *conn, const char *line);
+struct dict_cmd_func {
+	enum dict_protocol_cmd cmd;
+	int (*func)(struct dict_connection_cmd *cmd, const char *line);
 };
 
-static int cmd_lookup(struct dict_connection *conn, const char *line)
+struct dict_connection_cmd {
+	const struct dict_cmd_func *cmd;
+	struct dict_connection *conn;
+	char *reply;
+
+	struct dict_iterate_context *iter;
+	enum dict_iterate_flags iter_flags;
+
+	struct dict_connection_transaction *trans;
+};
+
+static void dict_connection_cmd_output_more(struct dict_connection_cmd *cmd);
+
+static void dict_connection_cmd_free(struct dict_connection_cmd *cmd)
 {
-	const char *reply;
-	const char *value;
-	int ret;
+	if (cmd->iter != NULL)
+		(void)dict_iterate_deinit(&cmd->iter);
+	array_delete(&cmd->conn->cmds, 0, 1);
+	i_free(cmd->reply);
 
-	if (conn->iter_ctx != NULL) {
-		i_error("dict client: LOOKUP: Can't lookup while iterating");
-		return -1;
-	}
-
-	/* <key> */
-	ret = dict_lookup(conn->dict, pool_datastack_create(), line, &value);
-	if (ret > 0) {
-		reply = t_strdup_printf("%c%s\n",
-			DICT_PROTOCOL_REPLY_OK, str_tabescape(value));
-		o_stream_nsend_str(conn->output, reply);
-	} else {
-		reply = t_strdup_printf("%c\n", ret == 0 ?
-					DICT_PROTOCOL_REPLY_NOTFOUND :
-					DICT_PROTOCOL_REPLY_FAIL);
-		o_stream_nsend_str(conn->output, reply);
-	}
-	return 0;
+	dict_connection_continue_input(cmd->conn);
+	i_free(cmd);
 }
 
-static int cmd_iterate_flush(struct dict_connection *conn)
+static void dict_connection_cmd_remove(struct dict_connection_cmd *cmd)
+{
+	struct dict_connection_cmd *const *cmds;
+	unsigned int i, count;
+
+	cmds = array_get(&cmd->conn->cmds, &count);
+	for (i = 0; i < count; i++) {
+		if (cmds[i] == cmd) {
+			dict_connection_cmd_free(cmd);
+			return;
+		}
+	}
+	i_unreached();
+}
+
+static void dict_connection_cmds_flush(struct dict_connection *conn)
+{
+	struct dict_connection_cmd *cmd, *const *first_cmdp;
+
+	while (array_count(&conn->cmds) > 0) {
+		first_cmdp = array_idx(&conn->cmds, 0);
+		cmd = *first_cmdp;
+
+		if (cmd->reply == NULL) {
+			/* command not finished yet */
+			break;
+		}
+
+		o_stream_nsend_str(conn->output, cmd->reply);
+		dict_connection_cmd_remove(cmd);
+	}
+}
+
+void dict_connection_cmds_free(struct dict_connection *conn)
+{
+	struct dict_connection_cmd *const *first_cmdp;
+
+	while (array_count(&conn->cmds) > 0) {
+		first_cmdp = array_idx(&conn->cmds, 0);
+		dict_connection_cmd_remove(*first_cmdp);
+	}
+}
+
+static void
+cmd_lookup_callback(const struct dict_lookup_result *result, void *context)
+{
+	struct dict_connection_cmd *cmd = context;
+
+	if (result->ret > 0) {
+		cmd->reply = i_strdup_printf("%c%s\n",
+			DICT_PROTOCOL_REPLY_OK, str_tabescape(result->value));
+	} else if (result->ret == 0) {
+		cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_NOTFOUND);
+	} else {
+		i_error("%s", result->error);
+		cmd->reply = i_strdup_printf("%c\n", DICT_PROTOCOL_REPLY_FAIL);
+	}
+	dict_connection_cmds_flush(cmd->conn);
+}
+
+static int cmd_lookup(struct dict_connection_cmd *cmd, const char *line)
+{
+	/* <key> */
+	dict_lookup_async(cmd->conn->dict, line, cmd_lookup_callback, cmd);
+	return 1;
+}
+
+static int cmd_iterate_flush(struct dict_connection_cmd *cmd)
 {
 	string_t *str;
 	const char *key, *value;
 
 	str = t_str_new(256);
-	o_stream_cork(conn->output);
-	while (dict_iterate(conn->iter_ctx, &key, &value)) {
+	o_stream_cork(cmd->conn->output);
+	while (dict_iterate(cmd->iter, &key, &value)) {
 		str_truncate(str, 0);
 		str_append_c(str, DICT_PROTOCOL_REPLY_OK);
 		str_append_tabescaped(str, key);
 		str_append_c(str, '\t');
-		if ((conn->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
+		if ((cmd->iter_flags & DICT_ITERATE_FLAG_NO_VALUE) == 0)
 			str_append_tabescaped(str, value);
 		str_append_c(str, '\n');
-		o_stream_nsend(conn->output, str_data(str), str_len(str));
+		o_stream_nsend(cmd->conn->output, str_data(str), str_len(str));
 
-		if (o_stream_get_buffer_used_size(conn->output) >
+		if (o_stream_get_buffer_used_size(cmd->conn->output) >
 		    DICT_OUTPUT_OPTIMAL_SIZE) {
-			if (o_stream_flush(conn->output) <= 0) {
-				/* continue later */
-				o_stream_uncork(conn->output);
+			if (o_stream_flush(cmd->conn->output) <= 0) {
+				/* continue later when there's more space
+				   in output buffer */
+				o_stream_uncork(cmd->conn->output);
+				o_stream_set_flush_pending(cmd->conn->output, TRUE);
 				return 0;
 			}
 			/* flushed everything, continue */
 		}
 	}
-
-	/* finished iterating */
-	o_stream_unset_flush_callback(conn->output);
+	if (dict_iterate_has_more(cmd->iter)) {
+		/* wait for the next iteration callback */
+		return 0;
+	}
 
 	str_truncate(str, 0);
-	if (dict_iterate_deinit(&conn->iter_ctx) < 0)
+	if (dict_iterate_deinit(&cmd->iter) < 0)
 		str_append_c(str, DICT_PROTOCOL_REPLY_FAIL);
 	str_append_c(str, '\n');
-	o_stream_nsend(conn->output, str_data(str), str_len(str));
-	o_stream_uncork(conn->output);
+	o_stream_uncork(cmd->conn->output);
+
+	cmd->reply = i_strdup(str_c(str));
+	dict_connection_cmds_flush(cmd->conn);
 	return 1;
 }
 
-static int cmd_iterate(struct dict_connection *conn, const char *line)
+static void cmd_iterate_callback(void *context)
+{
+	struct dict_connection_cmd *cmd = context;
+
+	dict_connection_cmd_output_more(cmd);
+}
+
+static int cmd_iterate(struct dict_connection_cmd *cmd, const char *line)
 {
 	const char *const *args;
 	unsigned int flags;
 
-	if (conn->iter_ctx != NULL) {
-		i_error("dict client: ITERATE: Already iterating");
-		return -1;
-	}
-
 	args = t_strsplit_tabescaped(line);
 	if (str_array_length(args) < 2 ||
 	    str_to_uint(args[0], &flags) < 0) {
@@ -103,12 +175,12 @@
 	}
 
 	/* <flags> <path> */
-	conn->iter_ctx = dict_iterate_init_multiple(conn->dict, args+1, flags);
-	conn->iter_flags = flags;
-
-	o_stream_set_flush_callback(conn->output, cmd_iterate_flush, conn);
-	(void)cmd_iterate_flush(conn);
-	return 0;
+	flags |= DICT_ITERATE_FLAG_ASYNC;
+	cmd->iter = dict_iterate_init_multiple(cmd->conn->dict, args+1, flags);
+	cmd->iter_flags = flags;
+	dict_iterate_set_async_callback(cmd->iter, cmd_iterate_callback, cmd);
+	dict_connection_cmd_output_more(cmd);
+	return 1;
 }
 
 static struct dict_connection_transaction *
@@ -134,6 +206,8 @@
 	const struct dict_connection_transaction *transactions;
 	unsigned int i, count;
 
+	i_assert(trans->ctx == NULL);
+
 	transactions = array_get(&conn->transactions, &count);
 	for (i = 0; i < count; i++) {
 		if (&transactions[i] == trans) {
@@ -143,7 +217,7 @@
 	}
 }
 
-static int cmd_begin(struct dict_connection *conn, const char *line)
+static int cmd_begin(struct dict_connection_cmd *cmd, const char *line)
 {
 	struct dict_connection_transaction *trans;
 	unsigned int id;
@@ -152,19 +226,19 @@
 		i_error("dict client: Invalid transaction ID %s", line);
 		return -1;
 	}
-	if (dict_connection_transaction_lookup(conn, id) != NULL) {
+	if (dict_connection_transaction_lookup(cmd->conn, id) != NULL) {
 		i_error("dict client: Transaction ID %u already exists", id);
 		return -1;
 	}
 
-	if (!array_is_created(&conn->transactions))
-		i_array_init(&conn->transactions, 4);
+	if (!array_is_created(&cmd->conn->transactions))
+		i_array_init(&cmd->conn->transactions, 4);
 
 	/* <id> */
-	trans = array_append_space(&conn->transactions);
+	trans = array_append_space(&cmd->conn->transactions);
 	trans->id = id;
-	trans->conn = conn;
-	trans->ctx = dict_transaction_begin(conn->dict);
+	trans->conn = cmd->conn;
+	trans->ctx = dict_transaction_begin(cmd->conn->dict);
 	return 0;
 }
 
@@ -187,41 +261,9 @@
 	return 0;
 }
 
-static int cmd_commit(struct dict_connection *conn, const char *line)
+static void
+cmd_commit_finish(struct dict_connection_cmd *cmd, int ret, bool async)
 {
-	struct dict_connection_transaction *trans;
-	char chr;
-	int ret;
-
-	if (conn->iter_ctx != NULL) {
-		i_error("dict client: COMMIT: Can't commit while iterating");
-		return -1;
-	}
-
-	if (dict_connection_transaction_lookup_parse(conn, line, &trans) < 0)
-		return -1;
-
-	ret = dict_transaction_commit(&trans->ctx);
-	switch (ret) {
-	case 1:
-		chr = DICT_PROTOCOL_REPLY_OK;
-		break;
-	case 0:
-		chr = DICT_PROTOCOL_REPLY_NOTFOUND;
-		break;


More information about the dovecot-cvs mailing list