dovecot-1.2: dict: Added support for async commits. Changed dict_atomic_inc() behavior.
dovecot at dovecot.org
dovecot at dovecot.org
Mon Sep 7 03:45:26 EEST 2009
details: http://hg.dovecot.org/dovecot-1.2/rev/a1b92a251bb9
changeset: 9361:a1b92a251bb9
user: Timo Sirainen <tss at iki.fi>
date: Sun Sep 06 20:44:00 2009 -0400
description:
dict: Added support for async commits. Changed dict_atomic_inc() behavior.
diffstat:
9 files changed, 341 insertions(+), 93 deletions(-)
src/dict/dict-server.c | 50 ++++++++--
src/lib-dict/dict-client.c | 211 +++++++++++++++++++++++++++++++++++--------
src/lib-dict/dict-client.h | 4
src/lib-dict/dict-db.c | 15 ++-
src/lib-dict/dict-file.c | 29 ++++-
src/lib-dict/dict-private.h | 5 -
src/lib-dict/dict-sql.c | 87 +++++++++++------
src/lib-dict/dict.c | 13 ++
src/lib-dict/dict.h | 20 +++-
diffs (truncated from 841 to 300 lines):
diff -r 4530228c8993 -r a1b92a251bb9 src/dict/dict-server.c
--- a/src/dict/dict-server.c Sun Sep 06 20:42:42 2009 -0400
+++ b/src/dict/dict-server.c Sun Sep 06 20:44:00 2009 -0400
@@ -18,6 +18,7 @@
struct dict_server_transaction {
unsigned int id;
+ struct dict_client_connection *conn;
struct dict_transaction_context *ctx;
};
@@ -93,7 +94,8 @@ static int cmd_iterate_flush(struct dict
o_stream_cork(conn->output);
while ((ret = dict_iterate(conn->iter_ctx, &key, &value)) > 0) {
str_truncate(str, 0);
- str_printfa(str, "%s\t%s\n", key, value);
+ str_printfa(str, "%c%s\t%s\n", DICT_PROTOCOL_REPLY_OK,
+ key, value);
o_stream_send(conn->output, str_data(str), str_len(str));
if (o_stream_get_buffer_used_size(conn->output) >
@@ -193,6 +195,7 @@ static int cmd_begin(struct dict_client_
/* <id> */
trans = array_append_space(&conn->transactions);
trans->id = id;
+ trans->conn = conn;
trans->ctx = dict_transaction_begin(conn->dict);
return 0;
}
@@ -221,7 +224,7 @@ static int cmd_commit(struct dict_client
static int cmd_commit(struct dict_client_connection *conn, const char *line)
{
struct dict_server_transaction *trans;
- const char *reply;
+ char chr;
int ret;
if (conn->iter_ctx != NULL) {
@@ -233,11 +236,44 @@ static int cmd_commit(struct dict_client
return -1;
ret = dict_transaction_commit(&trans->ctx);
- reply = t_strdup_printf("%c\n", ret == 0 ? DICT_PROTOCOL_REPLY_OK :
- DICT_PROTOCOL_REPLY_FAIL);
- o_stream_send_str(conn->output, reply);
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ o_stream_send_str(conn->output, t_strdup_printf("%c\n", chr));
dict_server_transaction_array_remove(conn, trans);
return 0;
+}
+
+static void cmd_commit_async_callback(int ret, void *context)
+{
+ struct dict_server_transaction *trans = context;
+ const char *reply;
+ char chr;
+
+ switch (ret) {
+ case 1:
+ chr = DICT_PROTOCOL_REPLY_OK;
+ break;
+ case 0:
+ chr = DICT_PROTOCOL_REPLY_NOTFOUND;
+ break;
+ default:
+ chr = DICT_PROTOCOL_REPLY_FAIL;
+ break;
+ }
+ reply = t_strdup_printf("%c%c%u\n", DICT_PROTOCOL_REPLY_ASYNC_COMMIT,
+ chr, trans->id);
+ o_stream_send_str(trans->conn->output, reply);
+
+ dict_server_transaction_array_remove(trans->conn, trans);
}
static int
@@ -253,8 +289,8 @@ cmd_commit_async(struct dict_client_conn
if (dict_server_transaction_lookup_parse(conn, line, &trans) < 0)
return -1;
- dict_transaction_commit_async(&trans->ctx);
- dict_server_transaction_array_remove(conn, trans);
+ dict_transaction_commit_async(&trans->ctx, cmd_commit_async_callback,
+ trans);
return 0;
}
diff -r 4530228c8993 -r a1b92a251bb9 src/lib-dict/dict-client.c
--- a/src/lib-dict/dict-client.c Sun Sep 06 20:42:42 2009 -0400
+++ b/src/lib-dict/dict-client.c Sun Sep 06 20:44:00 2009 -0400
@@ -1,6 +1,7 @@
/* Copyright (c) 2005-2009 Dovecot authors, see the included COPYING file */
#include "lib.h"
+#include "llist.h"
#include "str.h"
#include "network.h"
#include "istream.h"
@@ -8,6 +9,7 @@
#include "dict-private.h"
#include "dict-client.h"
+#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
@@ -24,9 +26,13 @@ struct client_dict {
time_t last_connect_try;
struct istream *input;
struct ostream *output;
+ struct io *io;
+
+ struct client_dict_transaction_context *transactions;
unsigned int connect_counter;
unsigned int transaction_id_counter;
+ unsigned int async_commits;
unsigned int in_iteration:1;
unsigned int handshaked:1;
@@ -41,6 +47,11 @@ struct client_dict_iterate_context {
struct client_dict_transaction_context {
struct dict_transaction_context ctx;
+ struct client_dict_transaction_context *prev, *next;
+
+ /* for async commits */
+ dict_transaction_commit_callback_t *callback;
+ void *context;
unsigned int id;
unsigned int connect_counter;
@@ -213,29 +224,97 @@ client_dict_send_transaction_query(struc
return 0;
}
-static char *client_dict_read_line(struct client_dict *dict)
-{
+static struct client_dict_transaction_context *
+client_dict_transaction_find(struct client_dict *dict, unsigned int id)
+{
+ struct client_dict_transaction_context *ctx;
+
+ for (ctx = dict->transactions; ctx != NULL; ctx = ctx->next) {
+ if (ctx->id == id)
+ return ctx;
+ }
+ return NULL;
+}
+
+static void
+client_dict_finish_transaction(struct client_dict *dict,
+ unsigned int id, int ret)
+{
+ struct client_dict_transaction_context *ctx;
+
+ ctx = client_dict_transaction_find(dict, id);
+ if (ctx == NULL) {
+ i_error("dict-client: Unknown transaction id %u", id);
+ return;
+ }
+ if (ctx->callback != NULL)
+ ctx->callback(ret, ctx->context);
+
+ DLLIST_REMOVE(&dict->transactions, ctx);
+ i_free(ctx);
+
+ i_assert(dict->async_commits > 0);
+ if (--dict->async_commits == 0)
+ io_remove(&dict->io);
+}
+
+static int client_dict_read_one_line(struct client_dict *dict, char **line_r)
+{
+ unsigned int id;
char *line;
int ret;
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
-
- while ((ret = i_stream_read(dict->input)) > 0) {
- line = i_stream_next_line(dict->input);
- if (line != NULL)
- return line;
- }
- i_assert(ret < 0);
-
- if (ret == -2)
- i_error("read(%s) returned too much data", dict->path);
- else if (dict->input->stream_errno == 0)
- i_error("read(%s) failed: Remote disconnected", dict->path);
- else
- i_error("read(%s) failed: %m", dict->path);
- return NULL;
+ *line_r = NULL;
+ while ((line = i_stream_next_line(dict->input)) == NULL) {
+ ret = i_stream_read(dict->input);
+ switch (ret) {
+ case -1:
+ if (dict->input->stream_errno != 0)
+ i_error("read(%s) failed: %m", dict->path);
+ else {
+ i_error("read(%s) failed: Remote disconnected",
+ dict->path);
+ }
+ return -1;
+ case -2:
+ i_error("read(%s) returned too much data", dict->path);
+ return -1;
+ default:
+ i_assert(ret > 0);
+ break;
+ }
+ }
+ if (*line == DICT_PROTOCOL_REPLY_ASYNC_COMMIT) {
+ switch (line[1]) {
+ case DICT_PROTOCOL_REPLY_OK:
+ ret = 1;
+ break;
+ case DICT_PROTOCOL_REPLY_NOTFOUND:
+ ret = 0;
+ break;
+ case DICT_PROTOCOL_REPLY_FAIL:
+ ret = -1;
+ break;
+ default:
+ i_error("dict-client: Invalid async commit line: %s",
+ line);
+ return 0;
+ }
+ id = strtoul(line+2, NULL, 10);
+ client_dict_finish_transaction(dict, id, ret);
+ return 0;
+ }
+ *line_r = line;
+ return 1;
+}
+
+static char *client_dict_read_line(struct client_dict *dict)
+{
+ char *line;
+
+ while (client_dict_read_one_line(dict, &line) == 0)
+ ;
+ return line;
}
static int client_dict_connect(struct client_dict *dict)
@@ -263,6 +342,7 @@ static int client_dict_connect(struct cl
dict->input->blocking = TRUE;
dict->output = o_stream_create_fd(dict->fd, 4096, FALSE);
dict->transaction_id_counter = 0;
+ dict->async_commits = 0;
query = t_strdup_printf("%c%u\t%u\t%d\t%s\t%s\n",
DICT_PROTOCOL_CMD_HELLO,
@@ -283,6 +363,8 @@ static void client_dict_disconnect(struc
dict->connect_counter++;
dict->handshaked = FALSE;
+ if (dict->io != NULL)
+ io_remove(&dict->io);
if (dict->input != NULL)
i_stream_destroy(&dict->input);
if (dict->output != NULL)
@@ -339,6 +421,21 @@ static void client_dict_deinit(struct di
pool_unref(&dict->pool);
}
+static int client_dict_wait(struct dict *_dict)
+{
+ struct client_dict *dict = (struct client_dict *)_dict;
+ char *line;
+ int ret = 0;
+
+ while (dict->async_commits > 0) {
+ if (client_dict_read_one_line(dict, &line) < 0) {
+ ret = -1;
+ break;
+ }
+ }
+ return ret;
+}
+
static int client_dict_lookup(struct dict *_dict, pool_t pool,
const char *key, const char **value_r)
{
@@ -420,7 +517,10 @@ static int client_dict_iterate(struct di
More information about the dovecot-cvs
mailing list