dovecot-2.2: lmtp: Added lmtp_user_concurrency_limit setting.

dovecot at dovecot.org dovecot at dovecot.org
Fri Mar 6 00:10:42 UTC 2015


details:   http://hg.dovecot.org/dovecot-2.2/rev/0e657cfb3f98
changeset: 18297:0e657cfb3f98
user:      Timo Sirainen <tss at iki.fi>
date:      Fri Mar 06 02:10:02 2015 +0200
description:
lmtp: Added lmtp_user_concurrency_limit setting.
This allows limiting the number of concurrent deliveries to a single user.
Useful in case one user is receiving a lot of mails and causing other
deliveries to be delayed. If the limit is reached, Dovecot returns temporary
failure error at DATA stage.

diffstat:

 src/lmtp/client.c        |   6 +++-
 src/lmtp/client.h        |   7 ++++
 src/lmtp/commands.c      |  75 ++++++++++++++++++++++++++++++++++++++++++++---
 src/lmtp/lmtp-settings.c |   2 +
 src/lmtp/lmtp-settings.h |   1 +
 src/lmtp/main.c          |   4 ++
 src/lmtp/main.h          |   1 +
 7 files changed, 89 insertions(+), 7 deletions(-)

diffs (284 lines):

diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/client.c
--- a/src/lmtp/client.c	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/client.c	Fri Mar 06 02:10:02 2015 +0200
@@ -11,6 +11,7 @@
 #include "process-title.h"
 #include "var-expand.h"
 #include "settings-parser.h"
+#include "anvil-client.h"
 #include "master-service.h"
 #include "master-service-ssl.h"
 #include "master-service-settings.h"
@@ -355,8 +356,11 @@
 		lmtp_proxy_deinit(&client->proxy);
 
 	if (array_is_created(&client->state.rcpt_to)) {
-		array_foreach_modifiable(&client->state.rcpt_to, rcptp)
+		array_foreach_modifiable(&client->state.rcpt_to, rcptp) {
+			if ((*rcptp)->anvil_query != NULL)
+				anvil_client_query_abort(anvil, &(*rcptp)->anvil_query);
 			mail_storage_service_user_free(&(*rcptp)->service_user);
+		}
 	}
 
 	if (client->state.raw_mail != NULL) {
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/client.h
--- a/src/lmtp/client.h	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/client.h	Fri Mar 06 02:10:02 2015 +0200
@@ -7,11 +7,15 @@
 #define CLIENT_MAIL_DATA_MAX_INMEMORY_SIZE (1024*128)
 
 struct mail_recipient {
+	struct client *client;
+
 	const char *address;
 	const char *detail; /* +detail part is also in address */
 	struct lmtp_recipient_params params;
 
+	struct anvil_query *anvil_query;
 	struct mail_storage_service_user *service_user;
+	unsigned int parallel_count;
 };
 
 struct client_state {
@@ -21,6 +25,9 @@
 	ARRAY(struct mail_recipient *) rcpt_to;
 	unsigned int rcpt_idx;
 
+	unsigned int anvil_queries;
+	bool anvil_pending_data_write;
+
 	unsigned int data_end_idx;
 
 	/* Initially we start writing to mail_data. If it grows too large,
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/commands.c
--- a/src/lmtp/commands.c	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/commands.c	Fri Mar 06 02:10:02 2015 +0200
@@ -15,6 +15,7 @@
 #include "var-expand.h"
 #include "restrict-access.h"
 #include "settings-parser.h"
+#include "anvil-client.h"
 #include "master-service.h"
 #include "master-service-ssl.h"
 #include "iostream-ssl.h"
@@ -41,6 +42,8 @@
 
 #define LMTP_PROXY_DEFAULT_TIMEOUT_MSECS (1000*30)
 
+static void client_input_data_write(struct client *client);
+
 int cmd_lhlo(struct client *client, const char *args)
 {
 	struct rfc822_parser_context parser;
@@ -563,6 +566,34 @@
 	return ret;
 }
 
+static void rcpt_anvil_lookup_callback(const char *reply, void *context)
+{
+	struct mail_recipient *rcpt = context;
+
+	i_assert(rcpt->client->state.anvil_queries > 0);
+
+	rcpt->anvil_query = NULL;
+	if (reply == NULL) {
+		/* lookup failed */
+	} else if (str_to_uint(reply, &rcpt->parallel_count) < 0) {
+		i_error("Invalid reply from anvil: %s", reply);
+	}
+	if (--rcpt->client->state.anvil_queries == 0 &&
+	    rcpt->client->state.anvil_pending_data_write) {
+		/* DATA command was finished, but we were still waiting on
+		   anvil before handling any users */
+		client_input_data_write(rcpt->client);
+	}
+}
+
+static void lmtp_anvil_init(void)
+{
+	if (anvil == NULL) {
+		const char *path = t_strdup_printf("%s/anvil", base_dir);
+		anvil = anvil_client_init(path, NULL, 0);
+	}
+}
+
 int cmd_rcpt(struct client *client, const char *args)
 {
 	struct mail_recipient *rcpt;
@@ -584,6 +615,7 @@
 	}
 
 	rcpt = p_new(client->state_pool, struct mail_recipient, 1);
+	rcpt->client = client;
 	address = lmtp_unescape_address(address);
 
 	argv = t_strsplit(params, " ");
@@ -652,6 +684,17 @@
 		array_append(&client->state.rcpt_to, &rcpt, 1);
 		client_send_line(client, "250 2.1.5 OK");
 	}
+
+	if (client->lmtp_set->lmtp_user_concurrency_limit > 0) {
+		const char *query = t_strconcat("LOOKUP\t",
+			master_service_get_name(master_service),
+			"/", str_tabescape(username), NULL);
+		lmtp_anvil_init();
+		rcpt->anvil_query = anvil_client_query(anvil, query,
+					rcpt_anvil_lookup_callback, rcpt);
+		if (rcpt->anvil_query != NULL)
+			client->state.anvil_queries++;
+	}
 	return 0;
 }
 
@@ -710,9 +753,18 @@
 	enum mail_error mail_error;
 	int ret;
 
+	i_assert(client->state.anvil_queries == 0);
+
 	input = mail_storage_service_user_get_input(rcpt->service_user);
 	username = t_strdup(input->username);
 
+	if (rcpt->parallel_count >= client->lmtp_set->lmtp_user_concurrency_limit) {
+		client_send_line(client, ERRSTR_TEMP_USERDB_FAIL_PREFIX
+				 "Too many concurrent deliveries for user",
+				 rcpt->address);
+		return -1;
+	}
+
 	mail_set = mail_storage_service_user_get_mail_set(rcpt->service_user);
 	set_parser = mail_storage_service_user_get_settings_parser(rcpt->service_user);
 	if (client->proxy_timeout_secs > 0 &&
@@ -776,6 +828,11 @@
 	dctx.save_dest_mail = array_count(&client->state.rcpt_to) > 1 &&
 		client->state.first_saved_mail == NULL;
 
+	if (client->lmtp_set->lmtp_user_concurrency_limit > 0) {
+		master_service_anvil_send(master_service, t_strconcat(
+			"CONNECT\t", my_pid, "\t", master_service_get_name(master_service),
+			"/", username, "\n", NULL));
+	}
 	if (mail_deliver(&dctx, &storage) == 0) {
 		if (dctx.dest_mail != NULL) {
 			i_assert(client->state.first_saved_mail == NULL);
@@ -807,6 +864,11 @@
 				 rcpt->address);
 		ret = -1;
 	}
+	if (client->lmtp_set->lmtp_user_concurrency_limit > 0) {
+		master_service_anvil_send(master_service, t_strconcat(
+			"DISCONNECT\t", my_pid, "\t", master_service_get_name(master_service),
+			"/", username, "\n", NULL));
+	}
 	return ret;
 }
 
@@ -1019,10 +1081,9 @@
 	return str_c(str);
 }
 
-static bool client_input_data_write(struct client *client)
+static void client_input_data_write(struct client *client)
 {
 	struct istream *input;
-	bool ret = TRUE;
 
 	/* stop handling client input until saving/proxying is finished */
 	if (client->to_idle != NULL)
@@ -1037,10 +1098,10 @@
 		client_state_set(client, "DATA", "proxying");
 		lmtp_proxy_start(client->proxy, input,
 				 client_proxy_finish, client);
-		ret = FALSE;
+	} else {
+		client_input_data_finish(client);
 	}
 	i_stream_unref(&input);
-	return ret;
 }
 
 static int client_input_add_file(struct client *client,
@@ -1127,8 +1188,10 @@
 		return;
 	}
 
-	if (client_input_data_write(client))
-		client_input_data_finish(client);
+	if (client->state.anvil_queries == 0)
+		client_input_data_write(client);
+	else
+		client->state.anvil_pending_data_write = TRUE;
 }
 
 static void client_input_data(struct client *client)
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/lmtp-settings.c
--- a/src/lmtp/lmtp-settings.c	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/lmtp-settings.c	Fri Mar 06 02:10:02 2015 +0200
@@ -60,6 +60,7 @@
 	DEF(SET_BOOL, lmtp_proxy),
 	DEF(SET_BOOL, lmtp_save_to_detail_mailbox),
 	DEF(SET_BOOL, lmtp_rcpt_check_quota),
+	DEF(SET_UINT, lmtp_user_concurrency_limit),
 	DEF(SET_STR, lmtp_address_translate),
 	DEF(SET_STR_VARS, login_greeting),
 	DEF(SET_STR, login_trusted_networks),
@@ -71,6 +72,7 @@
 	.lmtp_proxy = FALSE,
 	.lmtp_save_to_detail_mailbox = FALSE,
 	.lmtp_rcpt_check_quota = FALSE,
+	.lmtp_user_concurrency_limit = 0,
 	.lmtp_address_translate = "",
 	.login_greeting = PACKAGE_NAME" ready.",
 	.login_trusted_networks = ""
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/lmtp-settings.h
--- a/src/lmtp/lmtp-settings.h	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/lmtp-settings.h	Fri Mar 06 02:10:02 2015 +0200
@@ -8,6 +8,7 @@
 	bool lmtp_proxy;
 	bool lmtp_save_to_detail_mailbox;
 	bool lmtp_rcpt_check_quota;
+	unsigned int lmtp_user_concurrency_limit;
 	const char *lmtp_address_translate;
 	const char *login_greeting;
 	const char *login_trusted_networks;
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/main.c
--- a/src/lmtp/main.c	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/main.c	Fri Mar 06 02:10:02 2015 +0200
@@ -7,6 +7,7 @@
 #include "abspath.h"
 #include "restrict-access.h"
 #include "fd-close-on-exec.h"
+#include "anvil-client.h"
 #include "master-service.h"
 #include "master-service-settings.h"
 #include "master-interface.h"
@@ -27,6 +28,7 @@
 
 const char *dns_client_socket_path, *base_dir;
 struct mail_storage_service_ctx *storage_service;
+struct anvil_client *anvil;
 
 static void client_connected(struct master_service_connection *conn)
 {
@@ -69,6 +71,8 @@
 static void main_deinit(void)
 {
 	clients_destroy();
+	if (anvil != NULL)
+		anvil_client_deinit(&anvil);
 }
 
 int main(int argc, char *argv[])
diff -r c1403c1eb3e9 -r 0e657cfb3f98 src/lmtp/main.h
--- a/src/lmtp/main.h	Fri Mar 06 02:06:10 2015 +0200
+++ b/src/lmtp/main.h	Fri Mar 06 02:10:02 2015 +0200
@@ -3,6 +3,7 @@
 
 extern const char *dns_client_socket_path, *base_dir;
 extern struct mail_storage_service_ctx *storage_service;
+extern struct anvil_client *anvil;
 
 void listener_client_destroyed(void);
 


More information about the dovecot-cvs mailing list