dovecot-2.2: lib-dict: Added memcached_ascii backend.

dovecot at dovecot.org dovecot at dovecot.org
Wed Aug 15 16:56:30 EEST 2012


details:   http://hg.dovecot.org/dovecot-2.2/rev/96a9a086c052
changeset: 14906:96a9a086c052
user:      Timo Sirainen <tss at iki.fi>
date:      Wed Aug 15 16:56:17 2012 +0300
description:
lib-dict: Added memcached_ascii backend.

diffstat:

 src/lib-dict/Makefile.am            |    1 +
 src/lib-dict/dict-memcached-ascii.c |  636 ++++++++++++++++++++++++++++++++++++
 src/lib-dict/dict-private.h         |    1 +
 src/lib-dict/dict.c                 |    2 +
 src/lib-dict/test-dict.c            |    1 +
 5 files changed, 641 insertions(+), 0 deletions(-)

diffs (truncated from 692 to 300 lines):

diff -r e266c31ebd02 -r 96a9a086c052 src/lib-dict/Makefile.am
--- a/src/lib-dict/Makefile.am	Wed Aug 15 16:55:44 2012 +0300
+++ b/src/lib-dict/Makefile.am	Wed Aug 15 16:56:17 2012 +0300
@@ -15,6 +15,7 @@
 	dict-client.c \
 	dict-file.c \
 	dict-memcached.c \
+	dict-memcached-ascii.c \
 	dict-redis.c \
 	dict-transaction-memory.c
 
diff -r e266c31ebd02 -r 96a9a086c052 src/lib-dict/dict-memcached-ascii.c
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/lib-dict/dict-memcached-ascii.c	Wed Aug 15 16:56:17 2012 +0300
@@ -0,0 +1,636 @@
+/* Copyright (c) 2008-2012 Dovecot authors, see the included COPYING file */
+
+#include "lib.h"
+#include "array.h"
+#include "str.h"
+#include "istream.h"
+#include "ostream.h"
+#include "connection.h"
+#include "dict-transaction-memory.h"
+#include "dict-private.h"
+
+/* Default memcached port number. */
+#define MEMCACHED_DEFAULT_PORT 11211
+/* Default lookup timeout in milliseconds: 1000*30 = 30 seconds. */
+#define MEMCACHED_DEFAULT_LOOKUP_TIMEOUT_MSECS (1000*30)
+/* Separator character; presumably placed between a username and the key
+   (not referenced in the visible part of this file - TODO confirm). */
+#define DICT_USERNAME_SEPARATOR '/'
+
+/* Kind of reply that is expected next from the server. The dict keeps a
+   queue of these (input_states); the reply parser dispatches on the first
+   queued entry and that entry is removed once its reply has been fully
+   read. */
+enum memcached_ascii_input_state {
+	/* GET: expecting VALUE or END */
+	MEMCACHED_INPUT_STATE_GET,
+	/* SET/(APPEND+ADD): expecting STORED / NOT_STORED */
+	MEMCACHED_INPUT_STATE_STORED,
+	/* DELETE: expecting DELETED */
+	MEMCACHED_INPUT_STATE_DELETED,
+	/* (INCR+ADD)/DECR: expecting number / NOT_FOUND / STORED / NOT_STORED */
+	MEMCACHED_INPUT_STATE_INCRDECR
+};
+
+/* Connection state for one memcached server connection. */
+struct memcached_ascii_connection {
+	/* Generic connection. Must stay the first member: the connection
+	   handlers cast a struct connection pointer back to this struct. */
+	struct connection conn;
+	/* Dict that owns this connection */
+	struct memcached_ascii_dict *dict;
+
+	/* Buffer where the bytes of a VALUE reply are accumulated */
+	string_t *reply_str;
+	/* Number of value bytes still to be read, including the trailing
+	   CRLF. 0 when no value is being read. */
+	unsigned int reply_bytes_left;
+	/* TRUE once a complete value has been read into reply_str; reset to
+	   FALSE when the parser starts reading a fresh reply */
+	bool value_received;
+	/* Set after a value has been fully read, i.e. the line following it
+	   is still to be parsed; cleared on a later parser entry */
+	bool value_waiting_end;
+};
+
+/* One queued entry describing a group of server replies that are still
+   being waited for. */
+struct memcached_ascii_dict_reply {
+	/* Number of server replies still expected for this entry; decremented
+	   each time a reply is finished */
+	unsigned int reply_count;
+	/* Optional (may be NULL). Called with 1 when reply_count reaches 0,
+	   or with -1 if the connection is destroyed first. */
+	dict_transaction_commit_callback_t *callback;
+	/* Opaque pointer passed to callback */
+	void *context;
+};
+
+/* State used while committing a transaction.
+   NOTE(review): this struct is not used in the visible part of the file;
+   the meaning of the fields below is inferred from their names only. */
+struct dict_memcached_ascii_commit_ctx {
+	struct memcached_ascii_dict *dict;
+	struct dict_transaction_memory_context *memctx;
+	/* presumably a scratch buffer for building commands - TODO confirm */
+	string_t *str;
+
+	dict_transaction_commit_callback_t *callback;
+	void *context;
+};
+
+/* memcached (ASCII protocol) dict backend instance. */
+struct memcached_ascii_dict {
+	struct dict dict;
+	/* presumably the server address and port - not used in the visible
+	   part of the file, TODO confirm */
+	struct ip_addr ip;
+	char *username, *key_prefix;
+	unsigned int port;
+	/* How long to wait for the connection / replies before the request
+	   is timed out and the connection is destroyed */
+	unsigned int timeout_msecs;
+
+	/* ioloop that is run while synchronously waiting for input; it is
+	   stopped by the input handler and on connection destroy */
+	struct ioloop *ioloop;
+	/* Request timeout; non-NULL only while waiting */
+	struct timeout *to;
+	struct memcached_ascii_connection conn;
+
+	/* Queue of expected reply types, one per reply the server still has
+	   to send. The first entry describes the next reply. */
+	ARRAY_DEFINE(input_states, enum memcached_ascii_input_state);
+	/* Queue of reply groups with their completion callbacks. The first
+	   entry is completed by the next finished replies. */
+	ARRAY_DEFINE(replies, struct memcached_ascii_dict_reply);
+};
+
+/* Connection list for memcached_ascii connections.
+   NOTE(review): it is not initialized or used in the visible part of the
+   file - confirm where it is created and destroyed. */
+static struct connection_list *memcached_ascii_connections;
+
+/* Tear down the connection: disconnect it, stop the dict's ioloop if there
+   is one, call every pending reply callback with -1 and then forget all
+   queued replies and expected input states. */
+static void memcached_ascii_conn_destroy(struct connection *_conn)
+{
+	struct memcached_ascii_connection *mconn =
+		(struct memcached_ascii_connection *)_conn;
+	struct memcached_ascii_dict *dict = mconn->dict;
+	const struct memcached_ascii_dict_reply *pending;
+
+	connection_disconnect(_conn);
+	if (dict->ioloop != NULL)
+		io_loop_stop(dict->ioloop);
+
+	/* report failure to everybody still waiting for a reply */
+	array_foreach(&dict->replies, pending) {
+		if (pending->callback == NULL)
+			continue;
+		pending->callback(-1, pending->context);
+	}
+	array_clear(&dict->input_states);
+	array_clear(&dict->replies);
+	mconn->reply_bytes_left = 0;
+}
+
+/* Move as much of the pending value as is currently buffered from the input
+   stream into conn->reply_str.
+
+   Returns FALSE if more value bytes are still missing. Returns TRUE once
+   all reply_bytes_left bytes were consumed; the trailing CRLF is then
+   removed from reply_str and value_received is set. */
+static bool memcached_ascii_input_value(struct memcached_ascii_connection *conn)
+{
+	const unsigned char *buf;
+	size_t avail, take;
+
+	buf = i_stream_get_data(conn->conn.input, &avail);
+	/* never consume past the end of the value */
+	take = avail;
+	if (take > conn->reply_bytes_left)
+		take = conn->reply_bytes_left;
+
+	str_append_n(conn->reply_str, buf, take);
+	i_stream_skip(conn->conn.input, take);
+	conn->reply_bytes_left -= take;
+	if (conn->reply_bytes_left != 0)
+		return FALSE;
+
+	/* whole value is in the buffer now: strip the CRLF that follows it */
+	str_truncate(conn->reply_str, str_len(conn->reply_str) - 2);
+	conn->value_received = TRUE;
+	return TRUE;
+}
+
+/* Try to read one complete server reply for the first queued input state.
+
+   Returns 1 if a full reply was read, 0 if more input is needed, and -1
+   (after logging an error) if the input was unexpected for the current
+   state or no reply was expected at all. For GET replies the value bytes
+   end up in conn->reply_str. */
+static int memcached_ascii_input_reply_read(struct memcached_ascii_dict *dict)
+{
+	struct memcached_ascii_connection *conn = &dict->conn;
+	const enum memcached_ascii_input_state *states;
+	const char *line, *p;
+	unsigned int count;
+	long long num;
+
+	if (conn->reply_bytes_left > 0) {
+		/* continue reading bulk reply */
+		if (!memcached_ascii_input_value(conn))
+			return 0;
+		conn->value_waiting_end = TRUE;
+	} else if (conn->value_waiting_end) {
+		/* NOTE(review): the flag is cleared here without resetting
+		   reply_str/value_received - confirm this is the intended
+		   behavior when the line after a value arrives late */
+		conn->value_waiting_end = FALSE;
+	} else {
+		/* starting a fresh reply: forget any previous value */
+		str_truncate(conn->reply_str, 0);
+		conn->value_received = FALSE;
+	}
+
+	line = i_stream_next_line(conn->conn.input);
+	if (line == NULL)
+		return 0;
+
+	states = array_get(&dict->input_states, &count);
+	if (count == 0) {
+		i_error("memcached_ascii: Unexpected input (expected nothing): %s",
+			line);
+		return -1;
+	}
+	switch (states[0]) {
+	case MEMCACHED_INPUT_STATE_GET:
+		/* VALUE <key> <flags> <bytes>
+		   END */
+		if (strncmp(line, "VALUE ", 6) == 0) {
+			/* the prefix match guarantees at least one space, so
+			   p is never NULL here */
+			p = strrchr(line, ' ');
+			if (str_to_uint(p+1, &conn->reply_bytes_left) < 0)
+				break;
+			/* NOTE(review): this addition is not checked for
+			   unsigned wrap-around with a huge <bytes> value */
+			conn->reply_bytes_left += 2; /* CRLF */
+			/* recurse to read the value and the line after it */
+			return memcached_ascii_input_reply_read(dict);
+		} else if (strcmp(line, "END") == 0)
+			return 1;
+		break;
+	case MEMCACHED_INPUT_STATE_STORED:
+		if (strcmp(line, "STORED") != 0 &&
+		    strcmp(line, "NOT_STORED") != 0)
+			break;
+		return 1;
+	case MEMCACHED_INPUT_STATE_DELETED:
+		if (strcmp(line, "DELETED") != 0)
+			break;
+		return 1;
+	case MEMCACHED_INPUT_STATE_INCRDECR:
+		/* anything other than the three keywords must be a number */
+		if (strcmp(line, "NOT_FOUND") != 0 &&
+		    strcmp(line, "STORED") != 0 &&
+		    strcmp(line, "NOT_STORED") != 0 &&
+		    str_to_llong(line, &num) < 0)
+			break;
+		return 1;
+	}
+	/* every "break" above ends up here: the line didn't match the state */
+	i_error("memcached_ascii: Unexpected input (state=%d): %s",
+		states[0], line);
+	return -1;
+}
+
+/* Read one reply and account for it.
+
+   Returns the result of memcached_ascii_input_reply_read() unchanged when
+   it is <= 0 (more input needed / error). Otherwise the first input state
+   is removed, the first queued reply's counter is decremented and, when it
+   reaches zero, its callback (if any) is called with 1 and the entry is
+   removed. Returns 1 in that case. */
+static int memcached_ascii_input_reply(struct memcached_ascii_dict *dict)
+{
+	struct memcached_ascii_dict_reply *head;
+	unsigned int pending;
+	int ret;
+
+	ret = memcached_ascii_input_reply_read(dict);
+	if (ret <= 0)
+		return ret;
+
+	/* the reply for the first expected state is complete */
+	array_delete(&dict->input_states, 0, 1);
+
+	head = array_get_modifiable(&dict->replies, &pending);
+	i_assert(pending > 0);
+	i_assert(head->reply_count > 0);
+	head->reply_count--;
+	if (head->reply_count != 0)
+		return 1;
+
+	/* that was the last reply this entry was waiting for */
+	if (head->callback != NULL)
+		head->callback(1, head->context);
+	array_delete(&dict->replies, 0, 1);
+	return 1;
+}
+
+/* Connection input handler.
+
+   Reads more data into the input stream and processes every reply that is
+   completely available. A read result of 0 returns immediately; -1 destroys
+   the connection. A reply parsing error also destroys the connection.
+   Finally the dict's ioloop is stopped so that a synchronous waiter gets
+   control back. */
+static void memcached_ascii_conn_input(struct connection *_conn)
+{
+	struct memcached_ascii_connection *conn =
+		(struct memcached_ascii_connection *)_conn;
+	int ret;
+
+	switch (i_stream_read(_conn->input)) {
+	case 0:
+		return;
+	case -1:
+		memcached_ascii_conn_destroy(_conn);
+		return;
+	default:
+		break;
+	}
+
+	while (array_count(&conn->dict->input_states) > 0) {
+		ret = memcached_ascii_input_reply(conn->dict);
+		if (ret < 0) {
+			memcached_ascii_conn_destroy(_conn);
+			break;
+		}
+		if (ret == 0) {
+			/* The reply is still incomplete. Nothing in this
+			   loop reads more data, so retrying here would spin
+			   forever; wait for the next input event instead. */
+			break;
+		}
+	}
+	io_loop_stop(conn->dict->ioloop);
+}
+
+/* Move the dict's pending timeout (if any) with io_loop_move_timeout() and
+   switch the connection with connection_switch_ioloop(). The caller sets
+   current_ioloop before calling this. */
+static void memcached_ascii_switch_ioloop(struct memcached_ascii_dict *dict)
+{
+	if (dict->to != NULL)
+		dict->to = io_loop_move_timeout(&dict->to);
+	connection_switch_ioloop(&dict->conn.conn);
+}
+
+/* Run the dict's own ioloop until something stops it, then switch back to
+   the ioloop that was current on entry.
+
+   Returns -1 if the connection has no input fd afterwards (fd_in == -1),
+   otherwise 0. */
+static int memcached_ascii_input_wait(struct memcached_ascii_dict *dict)
+{
+	struct ioloop *prev_ioloop = current_ioloop;
+
+	/* wait inside the dict's ioloop */
+	current_ioloop = dict->ioloop;
+	memcached_ascii_switch_ioloop(dict);
+	io_loop_run(dict->ioloop);
+
+	/* restore the previous ioloop */
+	current_ioloop = prev_ioloop;
+	memcached_ascii_switch_ioloop(dict);
+
+	if (dict->conn.conn.fd_in == -1)
+		return -1;
+	return 0;
+}
+
+/* Timeout handler: log how long the request was allowed to take and destroy
+   the connection. */
+static void memcached_ascii_input_timeout(struct memcached_ascii_dict *dict)
+{
+	unsigned int secs = dict->timeout_msecs / 1000;
+	unsigned int msecs = dict->timeout_msecs % 1000;
+
+	i_error("memcached_ascii: Request timed out in %u.%03u secs",
+		secs, msecs);
+	memcached_ascii_conn_destroy(&dict->conn.conn);
+}
+
+/* Wait until every expected reply has been read.
+
+   A timeout of dict->timeout_msecs is active while waiting. Already
+   buffered replies are processed first; when a reply is incomplete the
+   dict's ioloop is run to wait for more input.
+
+   Returns 0 when no expected replies are left, -1 if reading a reply failed
+   (the connection is destroyed) or waiting for input failed. */
+static int memcached_ascii_wait_replies(struct memcached_ascii_dict *dict)
+{
+	/* must be initialized: the loop body doesn't run at all when there
+	   is nothing to wait for */
+	int ret = 0;
+
+	dict->to = timeout_add(dict->timeout_msecs,
+			       memcached_ascii_input_timeout, dict);
+	while (array_count(&dict->input_states) > 0) {
+		i_assert(array_count(&dict->replies) > 0);
+
+		ret = memcached_ascii_input_reply(dict);
+		if (ret < 0) {
+			memcached_ascii_conn_destroy(&dict->conn.conn);
+			break;
+		}
+		if (ret > 0) {
+			/* One reply was consumed, but more may be expected.
+			   Keep going: on success the caller asserts that no
+			   input states are left. */
+			continue;
+		}
+		/* reply is incomplete - wait for more input */
+		ret = memcached_ascii_input_wait(dict);
+		if (ret != 0)
+			break;
+	}
+
+	timeout_remove(&dict->to);
+	return ret < 0 ? -1 : 0;
+}
+
+static int memcached_ascii_wait(struct memcached_ascii_dict *dict)
+{
+	int ret;
+
+	i_assert(dict->conn.conn.fd_in != -1);
+
+	if (dict->conn.conn.input == NULL) {
+		/* waiting for connection to finish */
+		dict->to = timeout_add(dict->timeout_msecs,
+				       memcached_ascii_input_timeout, dict);
+		ret = memcached_ascii_input_wait(dict);
+		timeout_remove(&dict->to);
+		if (ret < 0)
+			return -1;
+	}
+	if (memcached_ascii_wait_replies(dict) < 0)
+		return -1;
+	i_assert(array_count(&dict->input_states) == 0);


More information about the dovecot-cvs mailing list