dovecot-2.0-sslstream: lmtp proxy fixes.

dovecot at dovecot.org dovecot at dovecot.org
Sat Feb 13 02:56:46 EET 2010


details:   http://hg.dovecot.org/dovecot-2.0-sslstream/rev/3662241f75f2
changeset: 10416:3662241f75f2
user:      Timo Sirainen <tss at iki.fi>
date:      Mon Dec 07 13:34:35 2009 -0500
description:
lmtp proxy fixes.

diffstat:

3 files changed, 120 insertions(+), 58 deletions(-)
src/lib-lda/lmtp-client.c |   19 ++++-
src/lib-lda/lmtp-client.h |    5 +
src/lmtp/lmtp-proxy.c     |  154 ++++++++++++++++++++++++++++-----------------

diffs (truncated from 382 to 300 lines):

diff -r ea17056d2df9 -r 3662241f75f2 src/lib-lda/lmtp-client.c
--- a/src/lib-lda/lmtp-client.c	Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lib-lda/lmtp-client.c	Mon Dec 07 13:34:35 2009 -0500
@@ -48,6 +48,9 @@ struct lmtp_client {
 	struct ostream *output;
 	struct io *io;
 	int fd;
+
+	void (*data_output_callback)(void *);
+	void *data_output_context;
 
 	lmtp_finish_callback_t *finish_callback;
 	void *finish_context;
@@ -228,8 +231,7 @@ lmtp_client_data_next(struct lmtp_client
 
 		client->rcpt_next_data_idx = i + 1;
 		rcpt[i].failed = line[0] != '2';
-		rcpt[i].data_callback(!rcpt[i].failed, line,
-				      rcpt[i].context);
+		rcpt[i].data_callback(!rcpt[i].failed, line, rcpt[i].context);
 		if (client->protocol == LMTP_CLIENT_PROTOCOL_LMTP)
 			break;
 	}
@@ -245,6 +247,7 @@ static void lmtp_client_send_data(struct
 	const unsigned char *data;
 	unsigned char add;
 	size_t i, size;
+	bool sent_bytes;
 	int ret;
 
 	if (client->output_finished)
@@ -275,6 +278,7 @@ static void lmtp_client_send_data(struct
 				break;
 			client->output_last = data[i-1];
 			i_stream_skip(client->data_input, i);
+			sent_bytes = TRUE;
 		}
 
 		if (o_stream_get_buffer_used_size(client->output) >= 4096) {
@@ -293,6 +297,9 @@ static void lmtp_client_send_data(struct
 			client->output_last = add;
 		}
 	}
+	if (sent_bytes && client->data_output_callback != NULL)
+		client->data_output_callback(client->data_output_context);
+
 	if (ret == 0 || ret == -2) {
 		/* -2 can happen with tee istreams */
 		return;
@@ -542,3 +549,11 @@ void lmtp_client_send_more(struct lmtp_c
 	if (client->input_state == LMTP_INPUT_STATE_DATA)
 		lmtp_client_send_data(client);
 }
+
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+					  void (*callback)(void *),
+					  void *context)
+{
+	client->data_output_callback = callback;
+	client->data_output_context = context;
+}
diff -r ea17056d2df9 -r 3662241f75f2 src/lib-lda/lmtp-client.h
--- a/src/lib-lda/lmtp-client.h	Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lib-lda/lmtp-client.h	Mon Dec 07 13:34:35 2009 -0500
@@ -46,5 +46,10 @@ void lmtp_client_fail(struct lmtp_client
 void lmtp_client_fail(struct lmtp_client *client, const char *line);
 /* Return the state (command reply) the client is currently waiting for. */
 const char *lmtp_client_state_to_string(struct lmtp_client *client);
+/* Call the given callback whenever the LMTP client manages to send some more
+   DATA output to the server it is connected to. */
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+					  void (*callback)(void *),
+					  void *context);
 
 #endif
diff -r ea17056d2df9 -r 3662241f75f2 src/lmtp/lmtp-proxy.c
--- a/src/lmtp/lmtp-proxy.c	Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lmtp/lmtp-proxy.c	Mon Dec 07 13:34:35 2009 -0500
@@ -27,6 +27,8 @@ struct lmtp_proxy_connection {
 
 	struct lmtp_client *client;
 	struct istream *data_input;
+
+	unsigned int finished:1;
 	unsigned int failed:1;
 };
 
@@ -34,10 +36,10 @@ struct lmtp_proxy {
 	pool_t pool;
 	const char *mail_from, *my_hostname;
 	ARRAY_DEFINE(connections, struct lmtp_proxy_connection *);
-	ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient);
+	ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
 	unsigned int next_data_reply_idx;
 
-	struct timeout *to, *to_data_idle;
+	struct timeout *to, *to_data_idle, *to_finish;
 	struct io *io;
 	struct istream *data_input, *orig_data_input;
 	struct ostream *client_output;
@@ -50,6 +52,7 @@ struct lmtp_proxy {
 
 	unsigned int finished:1;
 	unsigned int input_timeout:1;
+	unsigned int handling_data_input:1;
 };
 
 static void lmtp_conn_finish(void *context);
@@ -80,7 +83,6 @@ static void lmtp_proxy_connections_deini
 	array_foreach(&proxy->connections, conns) {
 		struct lmtp_proxy_connection *conn = *conns;
 
-		lmtp_client_fail(conn->client, "451 4.3.0 Aborting");
 		lmtp_client_deinit(&conn->client);
 	}
 }
@@ -147,16 +149,16 @@ lmtp_proxy_get_connection(struct lmtp_pr
 
 static bool lmtp_proxy_send_data_replies(struct lmtp_proxy *proxy)
 {
-	const struct lmtp_proxy_recipient *rcpt;
+	struct lmtp_proxy_recipient *const *rcpt;
 	unsigned int i, count;
 
 	o_stream_cork(proxy->client_output);
 	rcpt = array_get(&proxy->rcpt_to, &count);
 	for (i = proxy->next_data_reply_idx; i < count; i++) {
-		if (!(rcpt[i].rcpt_to_failed || rcpt[i].data_reply_received))
+		if (!(rcpt[i]->rcpt_to_failed || rcpt[i]->data_reply_received))
 			break;
 		o_stream_send_str(proxy->client_output,
-				  t_strconcat(rcpt[i].reply, "\r\n", NULL));
+				  t_strconcat(rcpt[i]->reply, "\r\n", NULL));
 	}
 	o_stream_uncork(proxy->client_output);
 	proxy->next_data_reply_idx = i;
@@ -164,12 +166,26 @@ static bool lmtp_proxy_send_data_replies
 	return i == count;
 }
 
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_finish_timeout(struct lmtp_proxy *proxy)
 {
 	i_assert(!proxy->finished);
 
+	timeout_remove(&proxy->to_finish);
 	proxy->finished = TRUE;
 	proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+}
+
+static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+{
+	/* do the actual finishing in a timeout handler, since the finish
+	   callback causes the proxy to be destroyed and the code leading up
+	   to this function can be called from many different places. it's
+	   easier this way rather than having all the callers check if the
+	   proxy was already destroyed. */
+	if (proxy->to_finish == NULL) {
+		proxy->to_finish = timeout_add(0, lmtp_proxy_finish_timeout,
+					       proxy);
+	}
 }
 
 static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
@@ -184,6 +200,7 @@ static void lmtp_conn_finish(void *conte
 {
 	struct lmtp_proxy_connection *conn = context;
 
+	conn->finished = TRUE;
 	if (conn->data_input != NULL)
 		i_stream_unref(&conn->data_input);
 	lmtp_proxy_try_finish(conn->proxy);
@@ -195,20 +212,22 @@ static void lmtp_proxy_fail_all(struct l
 	unsigned int i, count;
 	const char *line;
 
-	pool_ref(proxy->pool);
 	conns = array_get(&proxy->connections, &count);
 	for (i = 0; i < count; i++) {
 		line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
 				" (%s while waiting for reply to %s)", reason,
 				lmtp_client_state_to_string(conns[i]->client));
 		lmtp_client_fail(conns[i]->client, line);
-
-		if (!array_is_created(&proxy->connections))
-			break;
-	}
-	pool_unref(&proxy->pool);
-	/* either the whole proxy is destroyed now, or we still have some
-	   DATA input to read. */
+	}
+
+	if (proxy->to_finish == NULL) {
+		/* we still have some DATA input to read */
+		if (proxy->io == NULL) {
+			proxy->io = io_add(i_stream_get_fd(proxy->data_input),
+					   IO_READ,
+					   lmtp_proxy_data_input, proxy);
+		}
+	}
 }
 
 static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
@@ -219,18 +238,17 @@ static void lmtp_proxy_data_input_timeou
 	proxy->input_timeout = TRUE;
 	i_stream_close(proxy->orig_data_input);
 
-	pool_ref(proxy->pool);
 	conns = array_get(&proxy->connections, &count);
 	for (i = 0; i < count; i++) {
 		lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
 				 " (timeout in DATA input)");
-		if (!array_is_created(&proxy->connections)) {
-			pool_unref(&proxy->pool);
-			return;
-		}
-	}
-	/* last client failure should have caused the proxy to be destroyed */
-	i_unreached();
+	}
+	if (proxy->to_finish == NULL) {
+		/* we had earlier failed all clients already and were just
+		   waiting for DATA input to finish, but DATA input also failed
+		   with a timeout. */
+		lmtp_proxy_finish(proxy);
+	}
 }
 
 static void
@@ -268,51 +286,48 @@ int lmtp_proxy_add_rcpt(struct lmtp_prox
 	if (conn->failed)
 		return -1;
 
-	rcpt = array_append_space(&proxy->rcpt_to);
+	rcpt = p_new(proxy->pool, struct lmtp_proxy_recipient, 1);
 	rcpt->conn = conn;
 	rcpt->address = p_strdup(proxy->pool, address);
+	array_append(&proxy->rcpt_to, &rcpt, 1);
 
 	lmtp_client_add_rcpt(conn->client, address, lmtp_proxy_conn_rcpt_to,
 			     lmtp_proxy_conn_data, rcpt);
 	return 0;
 }
 
-static size_t lmtp_proxy_find_max_data_input_size(struct lmtp_proxy *proxy)
-{
-	struct lmtp_proxy_connection *const *conns;
-	unsigned int i, count;
-	size_t size, max_size = 0;
-
-	conns = array_get(&proxy->connections, &count);
-	for (i = 0; i < count; i++) {
-		if (conns[i]->data_input == NULL)
-			continue;
-		(void)i_stream_get_data(conns[i]->data_input, &size);
-		if (max_size < size)
-			max_size = size;
-	}
-	return max_size;
+static size_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
+{
+	struct lmtp_proxy_connection *const *conns;
+	uoff_t min_offset = (uoff_t)-1;
+
+	array_foreach(&proxy->connections, conns) {
+		struct lmtp_proxy_connection *conn = *conns;
+
+		if (conn->data_input != NULL &&
+		    min_offset > conn->data_input->v_offset)
+			min_offset = conn->data_input->v_offset;
+	}
+	return min_offset;
 }
 
 static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
 {
 	struct lmtp_proxy_connection *const *conns;
-	unsigned int i, count;
-	size_t size, max_size;
-
-	max_size = lmtp_proxy_find_max_data_input_size(proxy);
-	if (max_size == 0)
+	uoff_t min_offset;
+
+	min_offset = lmtp_proxy_find_lowest_offset(proxy);
+	if (min_offset == (uoff_t)-1)
 		return FALSE;
 
 	/* disconnect all connections that are keeping us from reading
 	   more input. */
-	conns = array_get(&proxy->connections, &count);
-	for (i = 0; i < count; i++) {
-		if (conns[i]->data_input == NULL)
-			continue;
-		(void)i_stream_get_data(conns[i]->data_input, &size);
-		if (size == max_size) {
-			lmtp_client_fail(conns[i]->client,
+	array_foreach(&proxy->connections, conns) {
+		struct lmtp_proxy_connection *conn = *conns;
+


More information about the dovecot-cvs mailing list