dovecot-2.0: dsync: Rewrote copying and uid conflict resolution ...

dovecot at dovecot.org dovecot at dovecot.org
Wed Jun 30 23:21:23 EEST 2010


details:   http://hg.dovecot.org/dovecot-2.0/rev/372af44dca85
changeset: 11664:372af44dca85
user:      Timo Sirainen <tss at iki.fi>
date:      Wed Jun 30 21:19:59 2010 +0100
description:
dsync: Rewrote copying and uid conflict resolution code. Added tons of comments.
The uid conflict resolution is now done using copy+expunge, without needing
a special "change uid" feature from lib-storage.

diffstat:

 src/dsync/dsync-brain-msgs-new.c |  218 ++++++++++++++++++++++---------------------
 src/dsync/dsync-brain-msgs.c     |  134 +++++++++++++++++++++-----
 src/dsync/dsync-brain-private.h  |    8 +-
 src/dsync/dsync-worker-local.c   |   29 ++++-
 src/dsync/test-dsync-brain.c     |    2 -
 5 files changed, 247 insertions(+), 144 deletions(-)

diffs (truncated from 652 to 300 lines):

diff -r b09638ebb87d -r 372af44dca85 src/dsync/dsync-brain-msgs-new.c
--- a/src/dsync/dsync-brain-msgs-new.c	Wed Jun 30 20:42:13 2010 +0100
+++ b/src/dsync/dsync-brain-msgs-new.c	Wed Jun 30 21:19:59 2010 +0100
@@ -1,5 +1,32 @@
 /* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */
 
+/*
+   This code implements step 6 as explained in dsync-brain-msgs.c:
+   It saves/copies new messages and gives new UIDs for conflicting messages.
+
+   The input is both workers' msg iterators' new_msgs and uid_conflicts
+   variables. They're first sorted by mailbox and secondarily by wanted
+   destination UID. Destination UIDs of conflicts should always be higher
+   than new messages'.
+
+   Mailboxes are handled one at a time:
+
+   1. Go through all saved messages. If we've already seen an instance of this
+      message, try to copy it. Otherwise save a new instance of it.
+   2. Some of the copies may fail because they're already expunged by that
+      time. A list of these failed copies is saved to copy_retry_indexes.
+   3. UID conflicts are resolved by assigning a new UID to the message.
+      To avoid delays with remote dsync, this is done via worker API.
+      Internally the local worker copies the message to its new UID and
+      once the copy succeeds, the old UID is expunged. If the copy fails, it's
+      either due to message already being expunged or something more fatal.
+   4. Once all messages are saved/copied, see if there are any failed copies.
+      If so, goto 1, but going through only the failed messages.
+   5. If there are more mailboxes left, go to next one and goto 1.
+
+   Step 4 may require waiting for remote worker to send all replies.
+*/
+
 #include "lib.h"
 #include "array.h"
 #include "istream.h"
@@ -51,31 +78,56 @@
 	}
 	if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
 		dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+	i_free(ctx);
+}
+
+static void
+dsync_brain_sync_remove_guid_instance(struct dsync_brain_msg_iter *iter,
+				      const struct dsync_brain_new_msg *msg)
+{
+	struct dsync_brain_guid_instance *inst;
+	void *orig_key, *orig_value;
+
+	if (!hash_table_lookup_full(iter->guid_hash, msg->msg->guid,
+				    &orig_key, &orig_value)) {
+		/* another failed copy already removed it */
+		return;
+	}
+	inst = orig_value;
+
+	if (inst->next == NULL)
+		hash_table_remove(iter->guid_hash, orig_key);
+	else
+		hash_table_update(iter->guid_hash, orig_key, inst->next);
 }
 
 static void dsync_brain_copy_callback(bool success, void *context)
 {
 	struct dsync_brain_msg_copy_context *ctx = context;
-	const struct dsync_brain_new_msg *msg;
-	struct dsync_brain_guid_instance *inst;
+	struct dsync_brain_new_msg *msg;
 
 	if (!success) {
 		/* mark the guid instance invalid and try again later */
-		msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx);
-		inst = hash_table_lookup(ctx->iter->guid_hash, msg->msg->guid);
-		inst->failed = TRUE;
-		array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1);
+		msg = array_idx_modifiable(&ctx->iter->new_msgs, ctx->msg_idx);
+		i_assert(msg->saved);
+		msg->saved = FALSE;
+
+		if (ctx->iter->next_new_msg > ctx->msg_idx)
+			ctx->iter->next_new_msg = ctx->msg_idx;
+
+		dsync_brain_sync_remove_guid_instance(ctx->iter, msg);
 	}
 
 	if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
 		dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+	i_free(ctx);
 }
 
 static int
 dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
 				 const mailbox_guid_t *src_mailbox,
 				 unsigned int msg_idx,
-				 const struct dsync_brain_new_msg *msg)
+				 struct dsync_brain_new_msg *msg)
 {
 	struct dsync_brain_msg_save_context *save_ctx;
 	struct dsync_brain_msg_copy_context *copy_ctx;
@@ -83,14 +135,15 @@
 	const struct dsync_brain_guid_instance *inst;
 	const struct dsync_brain_mailbox *inst_box;
 
+	msg->saved = TRUE;
+
 	inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
 	if (inst != NULL) {
 		/* we can save this by copying an existing message */
 		inst_box = array_idx(&dest_iter->sync->mailboxes,
 				     inst->mailbox_idx);
 
-		copy_ctx = p_new(dest_iter->sync->pool,
-				 struct dsync_brain_msg_copy_context, 1);
+		copy_ctx = i_new(struct dsync_brain_msg_copy_context, 1);
 		copy_ctx->iter = dest_iter;
 		copy_ctx->msg_idx = msg_idx;
 
@@ -104,11 +157,9 @@
 			dest_iter->sync->src_msg_iter :
 			dest_iter->sync->dest_msg_iter;
 
-		save_ctx = p_new(src_iter->sync->pool,
-				 struct dsync_brain_msg_save_context, 1);
+		save_ctx = i_new(struct dsync_brain_msg_save_context, 1);
 		save_ctx->iter = dest_iter;
-		save_ctx->msg = dsync_message_dup(src_iter->sync->pool,
-						  msg->msg);
+		save_ctx->msg = msg->msg;
 		save_ctx->mailbox_idx = dest_iter->mailbox_idx;
 
 		dest_iter->adding_msgs = TRUE;
@@ -126,12 +177,14 @@
 dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
 				 const mailbox_guid_t *mailbox_guid)
 {
-	const struct dsync_brain_new_msg *msgs;
+	struct dsync_brain_new_msg *msgs;
 	unsigned int i, msg_count;
 	bool ret = TRUE;
 
-	msgs = array_get(&iter->new_msgs, &msg_count);
+	msgs = array_get_modifiable(&iter->new_msgs, &msg_count);
 	for (i = iter->next_new_msg; i < msg_count; i++) {
+		if (msgs[i].saved)
+			continue;
 		if (msgs[i].mailbox_idx != iter->mailbox_idx) {
 			i_assert(msgs[i].mailbox_idx > iter->mailbox_idx);
 			ret = FALSE;
@@ -175,48 +228,46 @@
 }
 
 static void
-dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter,
-				 const mailbox_guid_t *mailbox_guid)
+dsync_brain_msg_sync_finish(struct dsync_brain_msg_iter *iter)
 {
-	const uint32_t *indexes;
-	const struct dsync_brain_new_msg *msgs;
-	unsigned int i, msg_idx, idx_count, msg_count;
-	struct dsync_brain_guid_instance *inst;
-	const char *guid_str;
-	void *orig_key, *orig_value;
+	struct dsync_brain_mailbox_sync *sync = iter->sync;
 
-	/* first remove GUID instances that had failed. */
-	msgs = array_get(&iter->new_msgs, &msg_count);
-	indexes = array_get(&iter->copy_retry_indexes, &idx_count);
-	for (i = 0; i < idx_count; i++) {
-		guid_str = msgs[indexes[i]].msg->guid;
-		if (hash_table_lookup_full(iter->guid_hash, guid_str,
-					   &orig_key, &orig_value))
-			inst = orig_value;
-		else
-			inst = NULL;
-		if (inst != NULL && inst->failed) {
-			inst = inst->next;
-			if (inst == NULL)
-				hash_table_remove(iter->guid_hash, guid_str);
-			else {
-				hash_table_update(iter->guid_hash, orig_key,
-						  inst);
-			}
+	iter->msgs_sent = TRUE;
+
+	/* done with all mailboxes from this iter */
+	dsync_worker_set_input_callback(iter->worker, NULL, NULL);
+
+	if (sync->src_msg_iter->msgs_sent &&
+	    sync->dest_msg_iter->msgs_sent &&
+	    sync->src_msg_iter->save_results_left == 0 &&
+	    sync->dest_msg_iter->save_results_left == 0 &&
+	    dsync_worker_output_flush(sync->dest_worker) > 0 &&
+	    dsync_worker_output_flush(sync->src_worker) > 0) {
+		sync->brain->state++;
+		dsync_brain_sync(sync->brain);
+	}
+}
+
+static bool
+dsync_brain_msg_sync_select_mailbox(struct dsync_brain_msg_iter *iter)
+{
+	const struct dsync_brain_mailbox *mailbox;
+
+	while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
+		if (array_count(&iter->new_msgs) == 0 &&
+		    array_count(&iter->uid_conflicts) == 0) {
+			/* optimization: don't even bother selecting this
+			   mailbox */
+			iter->mailbox_idx++;
+			continue;
 		}
+
+		mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
+		dsync_worker_select_mailbox(iter->worker, &mailbox->box);
+		return TRUE;
 	}
-
-	/* try saving again. there probably weren't many of them, so don't
-	   worry about filling output buffer. */
-	for (i = 0; i < idx_count; i++) {
-		msg_idx = indexes[i];
-		// FIXME: if buffer fills, we assert-crash
-		(void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
-						       msg_idx, &msgs[msg_idx]);
-	}
-
-	/* if we copied anything, we'll again have to wait for the results */
-	array_clear(&iter->copy_retry_indexes);
+	dsync_brain_msg_sync_finish(iter);
+	return FALSE;
 }
 
 static void
@@ -225,19 +276,9 @@
 	const struct dsync_brain_mailbox *mailbox;
 	const mailbox_guid_t *mailbox_guid;
 
-	while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
+	do {
 		mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
 		mailbox_guid = &mailbox->box.mailbox_guid;
-
-		if (array_count(&iter->new_msgs) == 0) {
-			/* optimization: don't even bother selecting the
-			   mailbox */
-			iter->mailbox_idx++;
-			continue;
-		}
-
-		dsync_worker_select_mailbox(iter->worker, &mailbox->box);
-
 		if (dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid)) {
 			/* continue later */
 			return;
@@ -246,11 +287,6 @@
 		/* all messages saved for this mailbox. continue with saving
 		   its conflicts and waiting for copies to finish. */
 		dsync_brain_mailbox_save_conflicts(iter);
-
-		while (iter->copy_results_left == 0 &&
-		       array_count(&iter->copy_retry_indexes) > 0)
-			dsync_brain_mailbox_retry_copies(iter, mailbox_guid);
-
 		if (iter->copy_results_left > 0) {
 			/* wait for copies to finish */
 			return;
@@ -258,21 +294,7 @@
 
 		/* done with this mailbox, try the next one */
 		iter->mailbox_idx++;
-	}
-	iter->msgs_sent = TRUE;
-
-	/* done with all mailboxes from this iter */
-	dsync_worker_set_input_callback(iter->worker, NULL, NULL);
-
-	if (iter->sync->src_msg_iter->msgs_sent &&
-	    iter->sync->dest_msg_iter->msgs_sent &&
-	    iter->sync->src_msg_iter->save_results_left == 0 &&
-	    iter->sync->dest_msg_iter->save_results_left == 0 &&
-	    dsync_worker_output_flush(iter->sync->dest_worker) > 0 &&
-	    dsync_worker_output_flush(iter->sync->src_worker) > 0) {
-		iter->sync->brain->state++;
-		dsync_brain_sync(iter->sync->brain);
-	}
+	} while (dsync_brain_msg_sync_select_mailbox(iter));
 }
 
 static void dsync_worker_new_msg_output(void *context)
@@ -318,13 +340,16 @@
 {
 	iter->mailbox_idx = 0;
 
+	/* sort input by 1) mailbox, 2) new message UID */
 	array_sort(&iter->new_msgs, dsync_brain_new_msg_cmp);
 	array_sort(&iter->uid_conflicts, dsync_brain_uid_conflict_cmp);
 
 	dsync_worker_set_input_callback(iter->worker, NULL, iter);
 	dsync_worker_set_output_callback(iter->worker,


More information about the dovecot-cvs mailing list