dovecot-2.0: lmtp proxy fixes.
dovecot at dovecot.org
dovecot at dovecot.org
Mon Dec 7 20:34:40 EET 2009
details: http://hg.dovecot.org/dovecot-2.0/rev/3662241f75f2
changeset: 10415:3662241f75f2
user: Timo Sirainen <tss at iki.fi>
date: Mon Dec 07 13:34:35 2009 -0500
description:
lmtp proxy fixes.
diffstat:
3 files changed, 120 insertions(+), 58 deletions(-)
src/lib-lda/lmtp-client.c | 19 ++++-
src/lib-lda/lmtp-client.h | 5 +
src/lmtp/lmtp-proxy.c | 154 ++++++++++++++++++++++++++++-----------------
diffs (truncated from 382 to 300 lines):
diff -r ea17056d2df9 -r 3662241f75f2 src/lib-lda/lmtp-client.c
--- a/src/lib-lda/lmtp-client.c Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lib-lda/lmtp-client.c Mon Dec 07 13:34:35 2009 -0500
@@ -48,6 +48,9 @@ struct lmtp_client {
struct ostream *output;
struct io *io;
int fd;
+
+ void (*data_output_callback)(void *);
+ void *data_output_context;
lmtp_finish_callback_t *finish_callback;
void *finish_context;
@@ -228,8 +231,7 @@ lmtp_client_data_next(struct lmtp_client
client->rcpt_next_data_idx = i + 1;
rcpt[i].failed = line[0] != '2';
- rcpt[i].data_callback(!rcpt[i].failed, line,
- rcpt[i].context);
+ rcpt[i].data_callback(!rcpt[i].failed, line, rcpt[i].context);
if (client->protocol == LMTP_CLIENT_PROTOCOL_LMTP)
break;
}
@@ -245,6 +247,7 @@ static void lmtp_client_send_data(struct
const unsigned char *data;
unsigned char add;
size_t i, size;
+ bool sent_bytes;
int ret;
if (client->output_finished)
@@ -275,6 +278,7 @@ static void lmtp_client_send_data(struct
break;
client->output_last = data[i-1];
i_stream_skip(client->data_input, i);
+ sent_bytes = TRUE;
}
if (o_stream_get_buffer_used_size(client->output) >= 4096) {
@@ -293,6 +297,9 @@ static void lmtp_client_send_data(struct
client->output_last = add;
}
}
+ if (sent_bytes && client->data_output_callback != NULL)
+ client->data_output_callback(client->data_output_context);
+
if (ret == 0 || ret == -2) {
/* -2 can happen with tee istreams */
return;
@@ -542,3 +549,11 @@ void lmtp_client_send_more(struct lmtp_c
if (client->input_state == LMTP_INPUT_STATE_DATA)
lmtp_client_send_data(client);
}
+
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+ void (*callback)(void *),
+ void *context)
+{
+ client->data_output_callback = callback;
+ client->data_output_context = context;
+}
diff -r ea17056d2df9 -r 3662241f75f2 src/lib-lda/lmtp-client.h
--- a/src/lib-lda/lmtp-client.h Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lib-lda/lmtp-client.h Mon Dec 07 13:34:35 2009 -0500
@@ -46,5 +46,10 @@ void lmtp_client_fail(struct lmtp_client
void lmtp_client_fail(struct lmtp_client *client, const char *line);
/* Return the state (command reply) the client is currently waiting for. */
const char *lmtp_client_state_to_string(struct lmtp_client *client);
+/* Call the given callback whenever client manages to send some more DATA
+ output to client. */
+void lmtp_client_set_data_output_callback(struct lmtp_client *client,
+ void (*callback)(void *),
+ void *context);
#endif
diff -r ea17056d2df9 -r 3662241f75f2 src/lmtp/lmtp-proxy.c
--- a/src/lmtp/lmtp-proxy.c Mon Dec 07 11:40:40 2009 -0500
+++ b/src/lmtp/lmtp-proxy.c Mon Dec 07 13:34:35 2009 -0500
@@ -27,6 +27,8 @@ struct lmtp_proxy_connection {
struct lmtp_client *client;
struct istream *data_input;
+
+ unsigned int finished:1;
unsigned int failed:1;
};
@@ -34,10 +36,10 @@ struct lmtp_proxy {
pool_t pool;
const char *mail_from, *my_hostname;
ARRAY_DEFINE(connections, struct lmtp_proxy_connection *);
- ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient);
+ ARRAY_DEFINE(rcpt_to, struct lmtp_proxy_recipient *);
unsigned int next_data_reply_idx;
- struct timeout *to, *to_data_idle;
+ struct timeout *to, *to_data_idle, *to_finish;
struct io *io;
struct istream *data_input, *orig_data_input;
struct ostream *client_output;
@@ -50,6 +52,7 @@ struct lmtp_proxy {
unsigned int finished:1;
unsigned int input_timeout:1;
+ unsigned int handling_data_input:1;
};
static void lmtp_conn_finish(void *context);
@@ -80,7 +83,6 @@ static void lmtp_proxy_connections_deini
array_foreach(&proxy->connections, conns) {
struct lmtp_proxy_connection *conn = *conns;
- lmtp_client_fail(conn->client, "451 4.3.0 Aborting");
lmtp_client_deinit(&conn->client);
}
}
@@ -147,16 +149,16 @@ lmtp_proxy_get_connection(struct lmtp_pr
static bool lmtp_proxy_send_data_replies(struct lmtp_proxy *proxy)
{
- const struct lmtp_proxy_recipient *rcpt;
+ struct lmtp_proxy_recipient *const *rcpt;
unsigned int i, count;
o_stream_cork(proxy->client_output);
rcpt = array_get(&proxy->rcpt_to, &count);
for (i = proxy->next_data_reply_idx; i < count; i++) {
- if (!(rcpt[i].rcpt_to_failed || rcpt[i].data_reply_received))
+ if (!(rcpt[i]->rcpt_to_failed || rcpt[i]->data_reply_received))
break;
o_stream_send_str(proxy->client_output,
- t_strconcat(rcpt[i].reply, "\r\n", NULL));
+ t_strconcat(rcpt[i]->reply, "\r\n", NULL));
}
o_stream_uncork(proxy->client_output);
proxy->next_data_reply_idx = i;
@@ -164,12 +166,26 @@ static bool lmtp_proxy_send_data_replies
return i == count;
}
-static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+static void lmtp_proxy_finish_timeout(struct lmtp_proxy *proxy)
{
i_assert(!proxy->finished);
+ timeout_remove(&proxy->to_finish);
proxy->finished = TRUE;
proxy->finish_callback(proxy->input_timeout, proxy->finish_context);
+}
+
+static void lmtp_proxy_finish(struct lmtp_proxy *proxy)
+{
+ /* do the actual finishing in a timeout handler, since the finish
+ callback causes the proxy to be destroyed and the code leading up
+ to this function can be called from many different places. it's
+ easier this way rather than having all the callers check if the
+ proxy was already destroyed. */
+ if (proxy->to_finish == NULL) {
+ proxy->to_finish = timeout_add(0, lmtp_proxy_finish_timeout,
+ proxy);
+ }
}
static void lmtp_proxy_try_finish(struct lmtp_proxy *proxy)
@@ -184,6 +200,7 @@ static void lmtp_conn_finish(void *conte
{
struct lmtp_proxy_connection *conn = context;
+ conn->finished = TRUE;
if (conn->data_input != NULL)
i_stream_unref(&conn->data_input);
lmtp_proxy_try_finish(conn->proxy);
@@ -195,20 +212,22 @@ static void lmtp_proxy_fail_all(struct l
unsigned int i, count;
const char *line;
- pool_ref(proxy->pool);
conns = array_get(&proxy->connections, &count);
for (i = 0; i < count; i++) {
line = t_strdup_printf(ERRSTR_TEMP_REMOTE_FAILURE
" (%s while waiting for reply to %s)", reason,
lmtp_client_state_to_string(conns[i]->client));
lmtp_client_fail(conns[i]->client, line);
-
- if (!array_is_created(&proxy->connections))
- break;
- }
- pool_unref(&proxy->pool);
- /* either the whole proxy is destroyed now, or we still have some
- DATA input to read. */
+ }
+
+ if (proxy->to_finish == NULL) {
+ /* we still have some DATA input to read */
+ if (proxy->io == NULL) {
+ proxy->io = io_add(i_stream_get_fd(proxy->data_input),
+ IO_READ,
+ lmtp_proxy_data_input, proxy);
+ }
+ }
}
static void lmtp_proxy_data_input_timeout(struct lmtp_proxy *proxy)
@@ -219,18 +238,17 @@ static void lmtp_proxy_data_input_timeou
proxy->input_timeout = TRUE;
i_stream_close(proxy->orig_data_input);
- pool_ref(proxy->pool);
conns = array_get(&proxy->connections, &count);
for (i = 0; i < count; i++) {
lmtp_client_fail(conns[i]->client, ERRSTR_TEMP_REMOTE_FAILURE
" (timeout in DATA input)");
- if (!array_is_created(&proxy->connections)) {
- pool_unref(&proxy->pool);
- return;
- }
- }
- /* last client failure should have caused the proxy to be destroyed */
- i_unreached();
+ }
+ if (proxy->to_finish == NULL) {
+ /* we had earlier failed all clients already and were just
+ waiting for DATA input to finish, but DATA input also failed
+ with a timeout. */
+ lmtp_proxy_finish(proxy);
+ }
}
static void
@@ -268,51 +286,48 @@ int lmtp_proxy_add_rcpt(struct lmtp_prox
if (conn->failed)
return -1;
- rcpt = array_append_space(&proxy->rcpt_to);
+ rcpt = p_new(proxy->pool, struct lmtp_proxy_recipient, 1);
rcpt->conn = conn;
rcpt->address = p_strdup(proxy->pool, address);
+ array_append(&proxy->rcpt_to, &rcpt, 1);
lmtp_client_add_rcpt(conn->client, address, lmtp_proxy_conn_rcpt_to,
lmtp_proxy_conn_data, rcpt);
return 0;
}
-static size_t lmtp_proxy_find_max_data_input_size(struct lmtp_proxy *proxy)
-{
- struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- size_t size, max_size = 0;
-
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- if (conns[i]->data_input == NULL)
- continue;
- (void)i_stream_get_data(conns[i]->data_input, &size);
- if (max_size < size)
- max_size = size;
- }
- return max_size;
+static size_t lmtp_proxy_find_lowest_offset(struct lmtp_proxy *proxy)
+{
+ struct lmtp_proxy_connection *const *conns;
+ uoff_t min_offset = (uoff_t)-1;
+
+ array_foreach(&proxy->connections, conns) {
+ struct lmtp_proxy_connection *conn = *conns;
+
+ if (conn->data_input != NULL &&
+ min_offset > conn->data_input->v_offset)
+ min_offset = conn->data_input->v_offset;
+ }
+ return min_offset;
}
static bool lmtp_proxy_disconnect_hanging_output(struct lmtp_proxy *proxy)
{
struct lmtp_proxy_connection *const *conns;
- unsigned int i, count;
- size_t size, max_size;
-
- max_size = lmtp_proxy_find_max_data_input_size(proxy);
- if (max_size == 0)
+ uoff_t min_offset;
+
+ min_offset = lmtp_proxy_find_lowest_offset(proxy);
+ if (min_offset == (uoff_t)-1)
return FALSE;
/* disconnect all connections that are keeping us from reading
more input. */
- conns = array_get(&proxy->connections, &count);
- for (i = 0; i < count; i++) {
- if (conns[i]->data_input == NULL)
- continue;
- (void)i_stream_get_data(conns[i]->data_input, &size);
- if (size == max_size) {
- lmtp_client_fail(conns[i]->client,
+ array_foreach(&proxy->connections, conns) {
+ struct lmtp_proxy_connection *conn = *conns;
+
More information about the dovecot-cvs
mailing list