dovecot-2.0: dsync: Lots of updates and fixes.
dovecot at dovecot.org
dovecot at dovecot.org
Thu Aug 6 03:30:54 EEST 2009
details: http://hg.dovecot.org/dovecot-2.0/rev/d9a96da46d4a
changeset: 9736:d9a96da46d4a
user: Timo Sirainen <tss at iki.fi>
date: Wed Aug 05 20:30:43 2009 -0400
description:
dsync: Lots of updates and fixes.
diffstat:
25 files changed, 1449 insertions(+), 466 deletions(-)
src/dsync/dsync-brain-msgs-new.c | 299 +++++++++++-------
src/dsync/dsync-brain-msgs.c | 47 +-
src/dsync/dsync-brain-private.h | 14
src/dsync/dsync-brain.c | 237 ++++++++++----
src/dsync/dsync-brain.h | 10
src/dsync/dsync-data.c | 28 -
src/dsync/dsync-data.h | 20 +
src/dsync/dsync-proxy-client.c | 167 ++++++++--
src/dsync/dsync-proxy-server-cmd.c | 134 +++++---
src/dsync/dsync-proxy-server.c | 4
src/dsync/dsync-proxy-server.h | 2
src/dsync/dsync-proxy.c | 92 +++--
src/dsync/dsync-proxy.h | 3
src/dsync/dsync-worker-local.c | 497 +++++++++++++++++++++++++++----
src/dsync/dsync-worker-private.h | 10
src/dsync/dsync-worker.c | 41 ++
src/dsync/dsync-worker.h | 21 +
src/dsync/dsync.c | 40 +-
src/dsync/test-dsync-brain-msgs.c | 20 -
src/dsync/test-dsync-brain.c | 78 +++-
src/dsync/test-dsync-common.c | 11
src/dsync/test-dsync-proxy-server-cmd.c | 74 ++++
src/dsync/test-dsync-proxy.c | 10
src/dsync/test-dsync-worker.c | 54 +++
src/dsync/test-dsync-worker.h | 2
diffs (truncated from 3395 to 300 lines):
diff -r d354dc450c63 -r d9a96da46d4a src/dsync/dsync-brain-msgs-new.c
--- a/src/dsync/dsync-brain-msgs-new.c Wed Aug 05 20:30:06 2009 -0400
+++ b/src/dsync/dsync-brain-msgs-new.c Wed Aug 05 20:30:43 2009 -0400
@@ -2,6 +2,7 @@
#include "lib.h"
#include "array.h"
+#include "istream.h"
#include "hash.h"
#include "dsync-worker.h"
#include "dsync-brain-private.h"
@@ -13,42 +14,36 @@ struct dsync_brain_msg_copy_context {
struct dsync_brain_msg_save_context {
struct dsync_brain_msg_iter *iter;
-
- mailbox_guid_t mailbox;
const struct dsync_message *msg;
};
static void
-dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync);
-
-static bool
-dsync_brain_msg_sync_is_save_done(struct dsync_brain_mailbox_sync *sync)
-{
- return sync->src_msg_iter->copy_results_left == 0 &&
- sync->dest_msg_iter->copy_results_left == 0 &&
- sync->src_msg_iter->save_results_left == 0 &&
- sync->dest_msg_iter->save_results_left == 0;
-}
+dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter);
static void msg_get_callback(enum dsync_msg_get_result result,
- struct dsync_msg_static_data *data,
+ const struct dsync_msg_static_data *data,
void *context)
{
struct dsync_brain_msg_save_context *ctx = context;
+ struct istream *input;
switch (result) {
case DSYNC_MSG_GET_RESULT_SUCCESS:
- dsync_worker_select_mailbox(ctx->iter->worker, &ctx->mailbox);
+ input = data->input;
dsync_worker_msg_save(ctx->iter->worker, ctx->msg, data);
+ i_stream_unref(&input);
break;
case DSYNC_MSG_GET_RESULT_EXPUNGED:
/* mail got expunged during sync. just skip this. */
break;
case DSYNC_MSG_GET_RESULT_FAILED:
+ i_error("msg-get failed: uid=%u guid=%s",
+ ctx->msg->uid, ctx->msg->guid);
dsync_brain_fail(ctx->iter->sync->brain);
break;
}
- ctx->iter->save_results_left--;
+ if (--ctx->iter->save_results_left == 0 && !ctx->iter->adding_msgs)
+ dsync_brain_msg_sync_add_new_msgs(ctx->iter);
}
static void dsync_brain_copy_callback(bool success, void *context)
@@ -57,7 +52,6 @@ static void dsync_brain_copy_callback(bo
const struct dsync_brain_new_msg *msg;
struct dsync_brain_guid_instance *inst;
- ctx->iter->copy_results_left--;
if (!success) {
/* mark the guid instance invalid and try again later */
msg = array_idx(&ctx->iter->new_msgs, ctx->msg_idx);
@@ -66,17 +60,15 @@ static void dsync_brain_copy_callback(bo
array_append(&ctx->iter->copy_retry_indexes, &ctx->msg_idx, 1);
}
- if (dsync_brain_msg_sync_is_save_done(ctx->iter->sync)) {
- ctx->iter->sync->brain->state++;
- dsync_brain_sync(ctx->iter->sync->brain);
- }
+ if (--ctx->iter->copy_results_left == 0 && !ctx->iter->adding_msgs)
+ dsync_brain_msg_sync_add_new_msgs(ctx->iter);
}
static int
dsync_brain_msg_sync_add_new_msg(struct dsync_brain_msg_iter *dest_iter,
const mailbox_guid_t *src_mailbox,
unsigned int msg_idx,
- const struct dsync_message *msg)
+ const struct dsync_brain_new_msg *msg)
{
struct dsync_brain_msg_save_context *save_ctx;
struct dsync_brain_msg_copy_context *copy_ctx;
@@ -84,10 +76,9 @@ dsync_brain_msg_sync_add_new_msg(struct
const struct dsync_brain_guid_instance *inst;
const struct dsync_brain_mailbox *inst_box;
- inst = hash_table_lookup(dest_iter->guid_hash, msg->guid);
+ inst = hash_table_lookup(dest_iter->guid_hash, msg->msg->guid);
if (inst != NULL) {
/* we can save this by copying an existing message */
- dsync_worker_select_mailbox(dest_iter->worker, src_mailbox);
inst_box = array_idx(&dest_iter->sync->mailboxes,
inst->mailbox_idx);
@@ -96,9 +87,10 @@ dsync_brain_msg_sync_add_new_msg(struct
copy_ctx->iter = dest_iter;
copy_ctx->msg_idx = msg_idx;
- dsync_worker_msg_copy(dest_iter->worker, &inst_box->box.guid,
- inst->uid, msg, dsync_brain_copy_callback,
- copy_ctx);
+ dsync_worker_msg_copy(dest_iter->worker,
+ &inst_box->box.mailbox_guid,
+ inst->uid, msg->msg,
+ dsync_brain_copy_callback, copy_ctx);
dest_iter->copy_results_left++;
} else {
src_iter = dest_iter == dest_iter->sync->dest_msg_iter ?
@@ -108,79 +100,78 @@ dsync_brain_msg_sync_add_new_msg(struct
save_ctx = p_new(src_iter->sync->pool,
struct dsync_brain_msg_save_context, 1);
save_ctx->iter = dest_iter;
- save_ctx->mailbox = *src_mailbox;
- save_ctx->msg = dsync_message_dup(src_iter->sync->pool, msg);
-
- dsync_worker_select_mailbox(src_iter->worker, src_mailbox);
- dsync_worker_msg_get(src_iter->worker, msg->uid,
- msg_get_callback, save_ctx);
+ save_ctx->msg = dsync_message_dup(src_iter->sync->pool,
+ msg->msg);
+
+ dest_iter->adding_msgs = TRUE;
dest_iter->save_results_left++;
+ dsync_worker_msg_get(src_iter->worker, src_mailbox,
+ msg->orig_uid, msg_get_callback, save_ctx);
+ dest_iter->adding_msgs = FALSE;
+ if (dsync_worker_output_flush(src_iter->worker) < 0)
+ return -1;
}
return dsync_worker_is_output_full(dest_iter->worker) ? 0 : 1;
}
-static void
-dsync_brain_msg_iter_add_new_msgs(struct dsync_brain_msg_iter *dest_iter)
-{
- const struct dsync_brain_mailbox *mailboxes, *mailbox;
+static bool
+dsync_brain_mailbox_add_new_msgs(struct dsync_brain_msg_iter *iter,
+ const mailbox_guid_t *mailbox_guid)
+{
const struct dsync_brain_new_msg *msgs;
- unsigned int i, mailbox_count, msg_count;
-
- mailboxes = array_get(&dest_iter->sync->mailboxes, &mailbox_count);
- msgs = array_get(&dest_iter->new_msgs, &msg_count);
- for (i = dest_iter->next_new_msg; i < msg_count; i++) {
- mailbox = &mailboxes[msgs[i].mailbox_idx];
- if (dsync_brain_msg_sync_add_new_msg(dest_iter,
- &mailbox->box.guid, i,
- msgs[i].msg) <= 0) {
+ unsigned int i, msg_count;
+ bool ret = TRUE;
+
+ msgs = array_get(&iter->new_msgs, &msg_count);
+ for (i = iter->next_new_msg; i < msg_count; i++) {
+ if (msgs[i].mailbox_idx != iter->mailbox_idx) {
+ ret = FALSE;
+ break;
+ }
+ if (dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
+ i, &msgs[i]) <= 0) {
/* failed / continue later */
- dest_iter->next_new_msg = i + 1;
+ i++;
break;
}
}
- dest_iter->msgs_sent = TRUE;
-}
-
-static void
-dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
-{
- dsync_brain_msg_iter_add_new_msgs(iter);
-
- if (iter->sync->dest_msg_iter->msgs_sent &&
- iter->sync->src_msg_iter->msgs_sent &&
- dsync_brain_msg_sync_is_save_done(iter->sync))
- dsync_brain_msg_sync_retry_copies(iter->sync);
-}
-
-static void dsync_worker_new_msg_output(void *context)
-{
- struct dsync_brain_msg_iter *iter = context;
-
- dsync_brain_msg_sync_add_new_msgs(iter);
-}
-
-static void
-dsync_brain_msg_iter_sync_new_msgs(struct dsync_brain_msg_iter *iter)
-{
- dsync_worker_set_input_callback(iter->worker, NULL, iter);
- dsync_worker_set_output_callback(iter->worker,
- dsync_worker_new_msg_output, iter);
- dsync_brain_msg_sync_add_new_msgs(iter);
-}
-
-void dsync_brain_msg_sync_new_msgs(struct dsync_brain_mailbox_sync *sync)
-{
- dsync_brain_msg_iter_sync_new_msgs(sync->src_msg_iter);
- dsync_brain_msg_iter_sync_new_msgs(sync->dest_msg_iter);
-}
-
-static void
-dsync_brain_msg_iter_sync_retry_copies(struct dsync_brain_msg_iter *iter)
+ iter->next_new_msg = i;
+ if (i == msg_count)
+ ret = FALSE;
+
+ /* flush copy commands */
+ if (dsync_worker_output_flush(iter->worker) > 0 && ret) {
+ /* we have more space again, continue */
+ return dsync_brain_mailbox_add_new_msgs(iter, mailbox_guid);
+ } else {
+ return ret;
+ }
+}
+
+static void
+dsync_brain_mailbox_save_conflicts(struct dsync_brain_msg_iter *iter)
+{
+ const struct dsync_brain_uid_conflict *conflicts;
+ unsigned int i, count;
+
+ conflicts = array_get(&iter->uid_conflicts, &count);
+ for (i = iter->next_conflict; i < count; i++) {
+ if (conflicts[i].mailbox_idx != iter->mailbox_idx)
+ break;
+
+ dsync_worker_msg_update_uid(iter->worker, conflicts[i].old_uid,
+ conflicts[i].new_uid);
+ }
+ iter->next_conflict = i;
+}
+
+static void
+dsync_brain_mailbox_retry_copies(struct dsync_brain_msg_iter *iter,
+ const mailbox_guid_t *mailbox_guid)
{
const uint32_t *indexes;
- const struct dsync_brain_mailbox *mailboxes, *mailbox;
const struct dsync_brain_new_msg *msgs;
- unsigned int i, msg_idx, idx_count, msg_count, mailbox_count;
+ unsigned int i, msg_idx, idx_count, msg_count;
struct dsync_brain_guid_instance *inst;
const char *guid_str;
void *orig_key, *orig_value;
@@ -208,37 +199,120 @@ dsync_brain_msg_iter_sync_retry_copies(s
/* try saving again. there probably weren't many of them, so don't
worry about filling output buffer. */
- mailboxes = array_get(&iter->sync->mailboxes, &mailbox_count);
for (i = 0; i < idx_count; i++) {
msg_idx = indexes[i];
- mailbox = &mailboxes[msgs[msg_idx].mailbox_idx];
- (void)dsync_brain_msg_sync_add_new_msg(iter, &mailbox->box.guid,
- msg_idx,
- msgs[msg_idx].msg);
+ (void)dsync_brain_msg_sync_add_new_msg(iter, mailbox_guid,
+ msg_idx, &msgs[msg_idx]);
}
/* if we copied anything, we'll again have to wait for the results */
array_clear(&iter->copy_retry_indexes);
- dsync_worker_set_output_callback(iter->worker, NULL, NULL);
-}
-
-static void
-dsync_brain_msg_sync_retry_copies(struct dsync_brain_mailbox_sync *sync)
-{
- dsync_brain_msg_iter_sync_retry_copies(sync->dest_msg_iter);
- dsync_brain_msg_iter_sync_retry_copies(sync->src_msg_iter);
-
- if (dsync_brain_msg_sync_is_save_done(sync)) {
- dsync_worker_set_input_callback(sync->src_worker, NULL, NULL);
- dsync_worker_set_input_callback(sync->dest_worker, NULL, NULL);
- sync->brain->state++;
- dsync_brain_sync(sync->brain);
- } else {
- /* temporarily move back the state. once copies have returned
- success/failures, we'll get back to this function and see
- if we need to retry again */
- sync->brain->state--;
- }
+}
+
+static void
+dsync_brain_msg_sync_add_new_msgs(struct dsync_brain_msg_iter *iter)
+{
+ const struct dsync_brain_mailbox *mailbox;
More information about the dovecot-cvs mailing list