dovecot-2.0: dsync: Rewrote copying and uid conflict resolution ...
dovecot at dovecot.org
dovecot at dovecot.org
Wed Jun 30 23:21:23 EEST 2010
details: http://hg.dovecot.org/dovecot-2.0/rev/372af44dca85
changeset: 11664:372af44dca85
user: Timo Sirainen <tss at iki.fi>
date: Wed Jun 30 21:19:59 2010 +0100
description:
dsync: Rewrote copying and uid conflict resolution code. Added tons of comments.
The uid conflict resolution is now done using copy+expunge, without needing
a special "change uid" feature from lib-storage.
diffstat:
src/dsync/dsync-brain-msgs-new.c | 218 ++++++++++++++++++++++---------------------
src/dsync/dsync-brain-msgs.c | 134 +++++++++++++++++++++-----
src/dsync/dsync-brain-private.h | 8 +-
src/dsync/dsync-worker-local.c | 29 ++++-
src/dsync/test-dsync-brain.c | 2 -
5 files changed, 247 insertions(+), 144 deletions(-)
diffs (truncated from 652 to 300 lines):
diff -r b09638ebb87d -r 372af44dca85 src/dsync/dsync-brain-msgs-new.c
--- a/src/dsync/dsync-brain-msgs-new.c Wed Jun 30 20:42:13 2010 +0100
+++ b/src/dsync/dsync-brain-msgs-new.c Wed Jun 30 21:19:59 2010 +0100
@@ -1,5 +1,32 @@
/* Copyright (c) 2009-2010 Dovecot authors, see the included COPYING file */
+/*
+ This code implements step 6 as explained in dsync-brain-msgs.c:
+ It saves/copies new messages and gives new UIDs for conflicting messages.
+
+ The input is both workers' msg iterators' new_msgs and uid_conflicts
+ variables. They're first sorted by mailbox and secondarily by wanted
+ destination UID. Destination UIDs of conflicts should always be higher
+ than new messages'.
+
+ Mailboxes are handled one at a time:
+
+ 1. Go through all saved messages. If we've already seen an instance of this
+ message, try to copy it. Otherwise save a new instance of it.
+ 2. Some of the copies may fail because they're already expunged by that
+ time. A list of these failed copies is saved to copy_retry_indexes.
+ 3. UID conflicts are resolved by assigning a new UID to the message.
+ To avoid delays with remote dsync, this is done via worker API.
+ Internally the local worker copies the message to its new UID and
+ once the copy succeeds, the old UID is expunged. If the copy fails, it's
+ either due to message already being expunged or something more fatal.
+ 4. Once all messages are saved/copied, see if there are any failed copies.
+ If so, goto 1, but going through only the failed messages.
+ 5. If there are more mailboxes left, go to next one and goto 1.
+
+ Step 4 may require waiting for the remote worker to send all replies.
+*/
+
#include "lib.h"
#include "array.h"
#include "istream.h"
@@ -51,31 +78,56 @@
}
if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+ i_free(ctx);
+}
+
+static void
+dsync_brain_sync_remove_guid_instance(struct dsync_brain_msg_iter *iter,
+ const struct dsync_brain_new_msg *msg)
+{
+ struct dsync_brain_guid_instance *inst;
+ void *orig_key, *orig_value;
+
+ if (!hash_table_lookup_full(iter->guid_hash, msg->msg->guid,
+ &orig_key, &orig_value)) {
+ /* another failed copy already removed it */
+ return;
+ }
+ inst = orig_value;
+
+ if (inst->next == NULL)
+ hash_table_remove(iter->guid_hash, orig_key);
+ else
+ hash_table_update(iter->guid_hash, orig_key, inst->next);
}
static void dsync_brain_copy_callback(bool success, void *context)
{
struct dsync_brain_msg_copy_context *ctx = context;
- const struct dsync_brain_new_msg *msg;
- struct dsync_brain_guid_instance *inst;
+ struct dsync_brain_new_msg *msg;
if (!success) {
/* mark the guid instance invalid and try again later */
- msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx);
- inst = hash_table_lookup(ctx->iter->guid_hash, msg->msg->guid);
- inst->failed = TRUE;
- array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1);
+ msg = array_idx_modifiable(&ctx->iter->new_msgs, ctx->msg_idx);
+ i_assert(msg->saved);
+ msg->saved = FALSE;
+
+ if (ctx->iter->next_new_msg > ctx->msg_idx)
+ ctx->iter->next_new_msg = ctx->msg_idx;
+
+ dsync_brain_sync_remove_guid_instance(ctx->iter, msg);
}
if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
dsync_brain_msg_sync_add_new_msgs(ctx->iter);
+ i_free(ctx);
}
static int
dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
const mailbox_guid_t *src_mailbox,
unsigned int msg_idx,
- const struct dsync_brain_new_msg *msg)
+ struct dsync_brain_new_msg *msg)
{
struct dsync_brain_msg_save_context *save_ctx;
struct dsync_brain_msg_copy_context *copy_ctx;
@@ -83,14 +135,15 @@
const struct dsync_brain_guid_instance *inst;
const struct dsync_brain_mailbox *inst_box;
+ msg->saved = TRUE;
+
inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
if (inst != NULL) {
/* we can save this by copying an existing message */
inst_box = array_idx(&dest_iter->sync->mailboxes,
inst->mailbox_idx);
- copy_ctx = p_new(dest_iter->sync->pool,
- struct dsync_brain_msg_copy_context, 1);
+ copy_ctx = i_new(struct dsync_brain_msg_copy_context, 1);
copy_ctx->iter = dest_iter;
copy_ctx->msg_idx = msg_idx;
@@ -104,11 +157,9 @@
dest_iter->sync->src_msg_iter :
dest_iter->sync->dest_msg_iter;
- save_ctx = p_new(src_iter->sync->pool,
- struct dsync_brain_msg_save_context, 1);
+ save_ctx = i_new(struct dsync_brain_msg_save_context, 1);
save_ctx->iter = dest_iter;
- save_ctx->msg = dsync_message_dup(src_iter->sync->pool,
- msg->msg);
+ save_ctx->msg = msg->msg;
save_ctx->mailbox_idx = dest_iter->mailbox_idx;
dest_iter->adding_msgs = TRUE;
@@ -126,12 +177,14 @@
dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
const mailbox_guid_t *mailbox_guid)
{
- const struct dsync_brain_new_msg *msgs;
+ struct dsync_brain_new_msg *msgs;
unsigned int i, msg_count;
bool ret = TRUE;
- msgs = array_get(&iter->new_msgs, &msg_count);
+ msgs = array_get_modifiable(&iter->new_msgs, &msg_count);
for (i = iter->next_new_msg; i < msg_count; i++) {
+ if (msgs[i].saved)
+ continue;
if (msgs[i].mailbox_idx != iter->mailbox_idx) {
i_assert(msgs[i].mailbox_idx > iter->mailbox_idx);
ret = FALSE;
@@ -175,48 +228,46 @@
}
static void
-dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter,
- const mailbox_guid_t *mailbox_guid)
+dsync_brain_msg_sync_finish(struct dsync_brain_msg_iter *iter)
{
- const uint32_t *indexes;
- const struct dsync_brain_new_msg *msgs;
- unsigned int i, msg_idx, idx_count, msg_count;
- struct dsync_brain_guid_instance *inst;
- const char *guid_str;
- void *orig_key, *orig_value;
+ struct dsync_brain_mailbox_sync *sync = iter->sync;
- /* first remove GUID instances that had failed. */
- msgs = array_get(&iter->new_msgs, &msg_count);
- indexes = array_get(&iter->copy_retry_indexes, &idx_count);
- for (i = 0; i < idx_count; i++) {
- guid_str = msgs[indexes[i]].msg->guid;
- if (hash_table_lookup_full(iter->guid_hash, guid_str,
- &orig_key, &orig_value))
- inst = orig_value;
- else
- inst = NULL;
- if (inst != NULL && inst->failed) {
- inst = inst->next;
- if (inst == NULL)
- hash_table_remove(iter->guid_hash, guid_str);
- else {
- hash_table_update(iter->guid_hash, orig_key,
- inst);
- }
+ iter->msgs_sent = TRUE;
+
+ /* done with all mailboxes from this iter */
+ dsync_worker_set_input_callback(iter->worker, NULL, NULL);
+
+ if (sync->src_msg_iter->msgs_sent &&
+ sync->dest_msg_iter->msgs_sent &&
+ sync->src_msg_iter->save_results_left == 0 &&
+ sync->dest_msg_iter->save_results_left == 0 &&
+ dsync_worker_output_flush(sync->dest_worker) > 0 &&
+ dsync_worker_output_flush(sync->src_worker) > 0) {
+ sync->brain->state++;
+ dsync_brain_sync(sync->brain);
+ }
+}
+
+static bool
+dsync_brain_msg_sync_select_mailbox(struct dsync_brain_msg_iter *iter)
+{
+ const struct dsync_brain_mailbox *mailbox;
+
+ while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
+ if (array_count(&iter->new_msgs) == 0 &&
+ array_count(&iter->uid_conflicts) == 0) {
+ /* optimization: don't even bother selecting this
+ mailbox */
+ iter->mailbox_idx++;
+ continue;
}
+
+ mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
+ dsync_worker_select_mailbox(iter->worker, &mailbox->box);
+ return TRUE;
}
-
- /* try saving again. there probably weren't many of them, so don't
- worry about filling output buffer. */
- for (i = 0; i < idx_count; i++) {
- msg_idx = indexes[i];
- // FIXME: if buffer fills, we assert-crash
- (void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
- msg_idx, &msgs[msg_idx]);
- }
-
- /* if we copied anything, we'll again have to wait for the results */
- array_clear(&iter->copy_retry_indexes);
+ dsync_brain_msg_sync_finish(iter);
+ return FALSE;
}
static void
@@ -225,19 +276,9 @@
const struct dsync_brain_mailbox *mailbox;
const mailbox_guid_t *mailbox_guid;
- while (iter->mailbox_idx < array_count(&iter->sync->mailboxes)) {
+ do {
mailbox = array_idx(&iter->sync->mailboxes, iter->mailbox_idx);
mailbox_guid = &mailbox->box.mailbox_guid;
-
- if (array_count(&iter->new_msgs) == 0) {
- /* optimization: don't even bother selecting the
- mailbox */
- iter->mailbox_idx++;
- continue;
- }
-
- dsync_worker_select_mailbox(iter->worker, &mailbox->box);
-
if (dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid)) {
/* continue later */
return;
@@ -246,11 +287,6 @@
/* all messages saved for this mailbox. continue with saving
its conflicts and waiting for copies to finish. */
dsync_brain_mailbox_save_conflicts(iter);
-
- while (iter->copy_results_left == 0 &&
- array_count(&iter->copy_retry_indexes) > 0)
- dsync_brain_mailbox_retry_copies(iter, mailbox_guid);
-
if (iter->copy_results_left > 0) {
/* wait for copies to finish */
return;
@@ -258,21 +294,7 @@
/* done with this mailbox, try the next one */
iter->mailbox_idx++;
- }
- iter->msgs_sent = TRUE;
-
- /* done with all mailboxes from this iter */
- dsync_worker_set_input_callback(iter->worker, NULL, NULL);
-
- if (iter->sync->src_msg_iter->msgs_sent &&
- iter->sync->dest_msg_iter->msgs_sent &&
- iter->sync->src_msg_iter->save_results_left == 0 &&
- iter->sync->dest_msg_iter->save_results_left == 0 &&
- dsync_worker_output_flush(iter->sync->dest_worker) > 0 &&
- dsync_worker_output_flush(iter->sync->src_worker) > 0) {
- iter->sync->brain->state++;
- dsync_brain_sync(iter->sync->brain);
- }
+ } while (dsync_brain_msg_sync_select_mailbox(iter));
}
static void dsync_worker_new_msg_output(void *context)
@@ -318,13 +340,16 @@
{
iter->mailbox_idx = 0;
+ /* sort input by 1) mailbox, 2) new message UID */
array_sort(&iter->new_msgs, dsync_brain_new_msg_cmp);
array_sort(&iter->uid_conflicts, dsync_brain_uid_conflict_cmp);
dsync_worker_set_input_callback(iter->worker, NULL, iter);
dsync_worker_set_output_callback(iter->worker,
More information about the dovecot-cvs
mailing list