diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/mu-indexer.cc | 176 | ||||
| -rw-r--r-- | lib/mu-labels-cache.hh | 5 | ||||
| -rw-r--r-- | lib/mu-store.cc | 65 | ||||
| -rw-r--r-- | lib/mu-store.hh | 19 |
4 files changed, 240 insertions, 25 deletions
diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index be956ad..6f94e95 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -29,6 +29,8 @@ #include <unordered_map> #include <unordered_set> #include <chrono> +#include <string_view> +#include <ranges> using namespace std::chrono_literals; #include "mu-store.hh" @@ -45,7 +47,8 @@ struct IndexState { enum State { Idle, Scanning, Finishing, - Cleaning + Cleaning, + Aborting, }; static const char* name(State s) { switch (s) { @@ -57,6 +60,8 @@ struct IndexState { return "finishing"; case Cleaning: return "cleaning"; + case Aborting: + return "aborting"; default: return "<error>"; } @@ -109,7 +114,8 @@ struct Indexer::Private { bool add_message(const std::string& path); - bool cleanup(); + void cleanup_from_scratch(); + void cleanup_incremental(); bool start(const Indexer::Config& conf, bool block); bool stop(); @@ -137,10 +143,16 @@ struct Indexer::Private { Progress progress_{}; IndexState state_{}; std::mutex lock_, w_lock_; + Option<time_t> started_; std::atomic<time_t> completed_{}; bool was_empty_{}; uint64_t last_index_{}; + + // pathnames we've seen traversing the maildir hierarchy. entries + // with a trailing slash are maildir directories we've skipped as + // up-to-date. 
+ std::vector<std::string> seen_maildir_paths_; }; bool @@ -168,6 +180,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, htype == Scanner::HandleType::EnterNewCur) { mu_debug("skip {} (seems up-to-date: {:%FT%T} >= {:%FT%T})", fullpath, mu_time(dirstamp_), mu_time(statbuf->st_ctime)); + seen_maildir_paths_.emplace_back(fullpath + "/"); return false; } @@ -184,6 +197,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0; if (noupdate) { mu_debug("skip {} (has .noupdate)", fullpath); + seen_maildir_paths_.emplace_back(fullpath + "/"); return false; } } @@ -198,6 +212,8 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, case Scanner::HandleType::File: { ++progress_.checked; + seen_maildir_paths_.push_back(fullpath); + if (conf_.lazy_check && static_cast<uint64_t>(statbuf->st_ctime) < last_index_) { // in lazy mode, ignore the file if it has not changed // since the last indexing op. @@ -261,7 +277,7 @@ Indexer::Private::handle_item(WorkItem&& item) ++progress_.updated; } break; case WorkItem::Type::Dir: - store_.set_dirstamp(item.full_path, ::time(NULL)); + store_.set_dirstamp(item.full_path, started_.value()); break; default: g_warn_if_reached(); @@ -272,12 +288,11 @@ Indexer::Private::handle_item(WorkItem&& item) } } -bool -Indexer::Private::cleanup() +void +Indexer::Private::cleanup_from_scratch() { - mu_debug("starting cleanup"); + mu_debug("starting cleanup without using scan results"); - size_t n{}; std::vector<Store::Id> orphans; // store messages without files. 
using DirFiles = std::unordered_set<std::string>; @@ -309,7 +324,6 @@ Indexer::Private::cleanup() }; store_.for_each_message_path([&](Store::Id id, const std::string& path) { - ++n; if (!is_file_present(path)) { mu_debug("cannot read {} (id={}); queuing for removal from store", path, id); @@ -326,14 +340,100 @@ Indexer::Private::cleanup() store_.remove_messages(orphans); progress_.removed += orphans.size(); } +} + +void +Indexer::Private::cleanup_incremental() +{ + mu_debug("starting cleanup after scan"); + + // Sort the seen paths into the same order Xapian will give its terms to us + std::vector<std::string> fs_terms; + fs_terms.reserve(seen_maildir_paths_.size()); + for (std::string_view fullpath : seen_maildir_paths_) + fs_terms.emplace_back(field_from_id(Field::Id::Path).xapian_term(fullpath)); + std::ranges::sort(fs_terms); + + // Discard duplicates from fs_terms in case two paths collided to one term, e.g. over + // case. That shouldn't happen, but be correct-ish if it does. + auto [new_end, old_end] = std::ranges::unique(fs_terms); + if (new_end != old_end) { + mu_warning("collisions under term normalization: using regular cleanup"); + cleanup_from_scratch(); + return; + } - return true; + fs_terms.erase(new_end, old_end); + seen_maildir_paths_.clear(); + + // Walk through all the path terms. If the DB has a path term that we didn't see in our + // filesystem walk above, add it the orphans list for removal from the DB. If we were in + // lazy scan mode, we may have skipped some directories entirely: these are represented by + // entries in fs_terms with a trailing slash. When we see one, we deem all DB entries + // that have the skip entry as a prefix as present. + + size_t fs_terms_pos = 0; + auto current_fs_term = [&]() -> std::string_view { + if (fs_terms_pos < fs_terms.size()) + return fs_terms[fs_terms_pos]; + // N.B. '~' compares greater than the start of any field shortcut, so use it as an + // after-the-end sentinel. 
+ return "~"; + }; + + std::vector<std::string> orphan_terms; + + auto handle_db_term = [&](std::string_view db_term) { + for (;;) { + std::string_view fs_term = current_fs_term(); + bool is_wildcard = fs_term.ends_with('/'); + if (is_wildcard && db_term.starts_with(fs_term)) { + return true; + } + int cmp = db_term.compare(fs_term); + if (cmp < 0) { + mu_debug("orphan in db={} but not fs={}", db_term, fs_term); + orphan_terms.emplace_back(db_term); + return true; + } + + if (cmp == 0) { + ++fs_terms_pos; + return true; + } + + ++fs_terms_pos; + // FS has an entry not in the DB. If not a directory, we should have + // indexed it. + if (!is_wildcard) + mu_warning("unexpectedly unindexed message: {}", fs_term); + } + }; + + store_.for_each_term(Field::Id::Path, handle_db_term); + + if (orphan_terms.empty()) + mu_debug("nothing to clean up"); + else { + size_t nr_removed = store_.remove_messages_by_term( + orphan_terms, [&](size_t nr_cleaned_up_so_far) { + progress_.removed = nr_cleaned_up_so_far; + }); + mu_debug("removing {} stale message(s) from store", nr_removed); + progress_.removed = nr_removed; + if (nr_removed != orphan_terms.size()) + mu_warning("should have removed {} messages but actually removed {}", + orphan_terms.size(), nr_removed); + } } void Indexer::Private::scan_worker() { progress_.reset(); + seen_maildir_paths_.clear(); + started_ = time(NULL); + if (conf_.scan) { mu_debug("starting scanner"); if (!scanner_.start()) { // blocks. @@ -344,21 +444,61 @@ Indexer::Private::scan_worker() mu_debug("scanner finished"); } - // and let the worker finish their work. 
- state_.change_to(IndexState::Finishing); + enum class CleanupKind { + None, + FromScratch, + Incremental, + }; + + CleanupKind cleanup_kind = CleanupKind::Incremental; + bool aborted = state_ == IndexState::Aborting; + + if (cleanup_kind >= CleanupKind::None && !conf_.cleanup) { + mu_debug("cleanup: not running as requested"); + cleanup_kind = CleanupKind::None; + } + + if (cleanup_kind > CleanupKind::None && aborted) { + mu_debug("cleanup: disabling because indexer aborted"); + cleanup_kind = CleanupKind::None; + } + + if (cleanup_kind >= CleanupKind::Incremental && + g_getenv("MU_NO_INCREMENTAL_CLEANUP")) { + mu_debug("cleanup: not using incremental: MU_NO_INCREMENTAL_CLEANUP in environ"); + cleanup_kind = CleanupKind::FromScratch; + } + + if (cleanup_kind >= CleanupKind::Incremental && !conf_.scan) { + mu_debug("cleanup: not using incremental: scan not done"); + cleanup_kind = CleanupKind::FromScratch; + } + + state_.change_to(IndexState::Cleaning); + + switch (cleanup_kind) { + case CleanupKind::None: + break; + case CleanupKind::FromScratch: + cleanup_from_scratch(); + break; + case CleanupKind::Incremental: + cleanup_incremental(); + break; + } - if (conf_.cleanup) { - mu_debug("starting cleanup"); - state_.change_to(IndexState::Cleaning); - cleanup(); - mu_debug("cleanup finished"); + aborted = state_ == IndexState::Aborting; + if (!aborted) { + // Store started time, not ending time, so that next time we run we know to scan + // anything that appeared during our scan. + store_.config().set<Mu::Config::Id::LastIndex>(started_.value()); } completed_ = ::time({}); // attempt to commit to disk. 
store_.xapian_db().request_commit(true); - store_.config().set<Mu::Config::Id::LastIndex>(completed_); state_.change_to(IndexState::Idle); + started_ = Nothing; } bool @@ -399,6 +539,8 @@ Indexer::Private::start(const Indexer::Config& conf, bool block) bool Indexer::Private::stop() { + if (state_ != IndexState::Idle) + state_.change_to(IndexState::Aborting); scanner_.stop(); if (scanner_worker_.joinable()) diff --git a/lib/mu-labels-cache.hh b/lib/mu-labels-cache.hh index 9f95d09..e465a55 100644 --- a/lib/mu-labels-cache.hh +++ b/lib/mu-labels-cache.hh @@ -131,6 +131,11 @@ public: */ Result<void> restore(const Store& store); + /** + * @return whether the labels map is empty + */ + bool empty() const { return label_map_.empty(); } + private: /** * Deserialize the cache into a Map diff --git a/lib/mu-store.cc b/lib/mu-store.cc index c67b551..de061ac 100644 --- a/lib/mu-store.cc +++ b/lib/mu-store.cc @@ -22,6 +22,7 @@ #include <chrono> #include <mutex> +#include <algorithm> #include <array> #include <cstdlib> #include <stdexcept> @@ -130,6 +131,9 @@ struct Store::Private { Option<const std::string&> target_mdir, Option<Flags> new_flags, MoveOptions opts); + + bool remove_message_by_id_unlocked(Store::Id id); + XapianDb xapian_db_; Config config_; ContactsCache contacts_cache_; @@ -430,17 +434,62 @@ Store::remove_messages(const std::vector<Store::Id>& ids) xapian_db().request_transaction(); - for (auto&& id : ids) { - if (const auto msg = priv_->find_message_unlocked(id); !msg) { - mu_warning("cannot find document {} for deletion", id); - } else { - for (auto&& label: msg->labels()) - priv_->labels_cache_.decrease(label); - xapian_db().delete_document(id); - } + for (auto&& id : ids) + priv_->remove_message_by_id_unlocked(id); + + xapian_db().request_commit(true/*force*/); +} + +bool +Store::Private::remove_message_by_id_unlocked(Store::Id id) +{ + if (auto msg = labels_cache_.empty() ? 
Nothing : find_message_unlocked(id); msg) + for (auto&& label: msg->labels()) + labels_cache_.decrease(label); + + if (!xapian_db_.delete_document(id)) { + mu_warning("cannot find document {} for deletion", id); + return false; + } + + return true; +} + +size_t +Store::remove_messages_by_term(std::span<const std::string> terms, + std::function<void (size_t)> progress_fn) +{ + std::unique_lock lock{priv_->lock_}; + size_t nr_removed = 0; + std::vector<Xapian::Query> qvec; + std::vector<Store::Id> ids_to_remove; + + xapian_db().request_transaction(); + + while (!terms.empty()) { + auto chunk = terms.subspan(0, std::min<size_t>(terms.size(), 1024)); + terms = terms.subspan(chunk.size()); + qvec.clear(); + qvec.reserve(chunk.size()); + std::ranges::copy(chunk, std::back_inserter(qvec)); + auto enq = xapian_db().enquire(); + enq.set_weighting_scheme(Xapian::BoolWeight()); // No score + enq.set_docid_order(Xapian::Enquire::ASCENDING); + enq.set_query(Xapian::Query{Xapian::Query::OP_OR, qvec.begin(), qvec.end()}); + auto mset = enq.get_mset(0, xapian_db().size()); + ids_to_remove.insert(ids_to_remove.end(), mset.begin(), mset.end()); + } + + // Sort the IDs to remove to make Xapian tree traversal easier + std::ranges::sort(ids_to_remove); + for (Id id : ids_to_remove) { + nr_removed += priv_->remove_message_by_id_unlocked(id); + if (nr_removed % 500 == 0) + progress_fn(nr_removed); } xapian_db().request_commit(true/*force*/); + return nr_removed; } Option<Message> diff --git a/lib/mu-store.hh b/lib/mu-store.hh index 76f3a36..da0ec56 100644 --- a/lib/mu-store.hh +++ b/lib/mu-store.hh @@ -22,9 +22,11 @@ #include <string> #include <vector> +#include <span> #include <mutex> #include <ctime> #include <memory> +#include <functional> #include "mu-contacts-cache.hh" #include "mu-labels-cache.hh" @@ -261,6 +263,23 @@ public: void remove_message(Id id) { remove_messages({id}); } /** + * Remove a number of messages from the store. 
It will _not_ remove the + * message from the file system. + * + * It's more efficient to use this function than to translate the terms to docids and then + * call remove_messages() with the ids: this way, we can fuse the ID lookup and the + * deletion, skip post-translation existence steps, and do far fewer Xapian + * B-tree traversals. + * + * @param terms Xapian terms identifying the messages to remove + * @param progress_fn called occasionally to report progress; + * receives the cumulative number of messages removed so far + * @return number of messages removed overall + */ + size_t remove_messages_by_term(std::span<const std::string> terms, + std::function<void (size_t)> progress_fn); + + /** * Find message in the store. * * @param id doc id for the message to find |
