summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/mu-indexer.cc176
-rw-r--r--lib/mu-labels-cache.hh5
-rw-r--r--lib/mu-store.cc65
-rw-r--r--lib/mu-store.hh19
4 files changed, 240 insertions, 25 deletions
diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc
index be956ad..6f94e95 100644
--- a/lib/mu-indexer.cc
+++ b/lib/mu-indexer.cc
@@ -29,6 +29,8 @@
#include <unordered_map>
#include <unordered_set>
#include <chrono>
+#include <string_view>
+#include <ranges>
using namespace std::chrono_literals;
#include "mu-store.hh"
@@ -45,7 +47,8 @@ struct IndexState {
enum State { Idle,
Scanning,
Finishing,
- Cleaning
+ Cleaning,
+ Aborting,
};
static const char* name(State s) {
switch (s) {
@@ -57,6 +60,8 @@ struct IndexState {
return "finishing";
case Cleaning:
return "cleaning";
+ case Aborting:
+ return "aborting";
default:
return "<error>";
}
@@ -109,7 +114,8 @@ struct Indexer::Private {
bool add_message(const std::string& path);
- bool cleanup();
+ void cleanup_from_scratch();
+ void cleanup_incremental();
bool start(const Indexer::Config& conf, bool block);
bool stop();
@@ -137,10 +143,16 @@ struct Indexer::Private {
Progress progress_{};
IndexState state_{};
std::mutex lock_, w_lock_;
+ Option<time_t> started_;
std::atomic<time_t> completed_{};
bool was_empty_{};
uint64_t last_index_{};
+
+ // pathnames we've seen traversing the maildir hierarchy. entries
+ // with a trailing slash are maildir directories we've skipped as
+ // up-to-date.
+ std::vector<std::string> seen_maildir_paths_;
};
bool
@@ -168,6 +180,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
htype == Scanner::HandleType::EnterNewCur) {
mu_debug("skip {} (seems up-to-date: {:%FT%T} >= {:%FT%T})",
fullpath, mu_time(dirstamp_), mu_time(statbuf->st_ctime));
+ seen_maildir_paths_.emplace_back(fullpath + "/");
return false;
}
@@ -184,6 +197,7 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
auto noupdate = ::access((fullpath + "/.noupdate").c_str(), F_OK) == 0;
if (noupdate) {
mu_debug("skip {} (has .noupdate)", fullpath);
+ seen_maildir_paths_.emplace_back(fullpath + "/");
return false;
}
}
@@ -198,6 +212,8 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf,
case Scanner::HandleType::File: {
++progress_.checked;
+ seen_maildir_paths_.push_back(fullpath);
+
if (conf_.lazy_check && static_cast<uint64_t>(statbuf->st_ctime) < last_index_) {
// in lazy mode, ignore the file if it has not changed
// since the last indexing op.
@@ -261,7 +277,7 @@ Indexer::Private::handle_item(WorkItem&& item)
++progress_.updated;
} break;
case WorkItem::Type::Dir:
- store_.set_dirstamp(item.full_path, ::time(NULL));
+ store_.set_dirstamp(item.full_path, started_.value());
break;
default:
g_warn_if_reached();
@@ -272,12 +288,11 @@ Indexer::Private::handle_item(WorkItem&& item)
}
}
-bool
-Indexer::Private::cleanup()
+void
+Indexer::Private::cleanup_from_scratch()
{
- mu_debug("starting cleanup");
+ mu_debug("starting cleanup without using scan results");
- size_t n{};
std::vector<Store::Id> orphans; // store messages without files.
using DirFiles = std::unordered_set<std::string>;
@@ -309,7 +324,6 @@ Indexer::Private::cleanup()
};
store_.for_each_message_path([&](Store::Id id, const std::string& path) {
- ++n;
if (!is_file_present(path)) {
mu_debug("cannot read {} (id={}); queuing for removal from store",
path, id);
@@ -326,14 +340,100 @@ Indexer::Private::cleanup()
store_.remove_messages(orphans);
progress_.removed += orphans.size();
}
+}
+
+void
+Indexer::Private::cleanup_incremental()
+{
+ mu_debug("starting cleanup after scan");
+
+ // Sort the seen paths into the same order Xapian will give its terms to us
+ std::vector<std::string> fs_terms;
+ fs_terms.reserve(seen_maildir_paths_.size());
+ for (std::string_view fullpath : seen_maildir_paths_)
+ fs_terms.emplace_back(field_from_id(Field::Id::Path).xapian_term(fullpath));
+ std::ranges::sort(fs_terms);
+
+ // Discard duplicates from fs_terms in case two paths collided to one term, e.g. over
+ // case. That shouldn't happen, but be correct-ish if it does.
+ auto [new_end, old_end] = std::ranges::unique(fs_terms);
+ if (new_end != old_end) {
+ mu_warning("collisions under term normalization: using regular cleanup");
+ cleanup_from_scratch();
+ return;
+ }
- return true;
+ fs_terms.erase(new_end, old_end);
+ seen_maildir_paths_.clear();
+
+ // Walk through all the path terms. If the DB has a path term that we didn't see in our
+ // filesystem walk above, add it the orphans list for removal from the DB. If we were in
+ // lazy scan mode, we may have skipped some directories entirely: these are represented by
+ // entries in fs_terms with a trailing slash. When we see one, we deem all DB entries
+ // that have the skip entry as a prefix as present.
+
+ size_t fs_terms_pos = 0;
+ auto current_fs_term = [&]() -> std::string_view {
+ if (fs_terms_pos < fs_terms.size())
+ return fs_terms[fs_terms_pos];
+ // N.B. '~' compares greater than the start of any field shortcut, so use it as an
+ // after-the-end sentinel.
+ return "~";
+ };
+
+ std::vector<std::string> orphan_terms;
+
+ auto handle_db_term = [&](std::string_view db_term) {
+ for (;;) {
+ std::string_view fs_term = current_fs_term();
+ bool is_wildcard = fs_term.ends_with('/');
+ if (is_wildcard && db_term.starts_with(fs_term)) {
+ return true;
+ }
+ int cmp = db_term.compare(fs_term);
+ if (cmp < 0) {
+ mu_debug("orphan in db={} but not fs={}", db_term, fs_term);
+ orphan_terms.emplace_back(db_term);
+ return true;
+ }
+
+ if (cmp == 0) {
+ ++fs_terms_pos;
+ return true;
+ }
+
+ ++fs_terms_pos;
+ // FS has an entry not in the DB. If not a directory, we should have
+ // indexed it.
+ if (!is_wildcard)
+ mu_warning("unexpectedly unindexed message: {}", fs_term);
+ }
+ };
+
+ store_.for_each_term(Field::Id::Path, handle_db_term);
+
+ if (orphan_terms.empty())
+ mu_debug("nothing to clean up");
+ else {
+ size_t nr_removed = store_.remove_messages_by_term(
+ orphan_terms, [&](size_t nr_cleaned_up_so_far) {
+ progress_.removed = nr_cleaned_up_so_far;
+ });
+ mu_debug("removing {} stale message(s) from store", nr_removed);
+ progress_.removed = nr_removed;
+ if (nr_removed != orphan_terms.size())
+ mu_warning("should have removed {} messages but actually removed {}",
+ orphan_terms.size(), nr_removed);
+ }
}
void
Indexer::Private::scan_worker()
{
progress_.reset();
+ seen_maildir_paths_.clear();
+ started_ = time(NULL);
+
if (conf_.scan) {
mu_debug("starting scanner");
if (!scanner_.start()) { // blocks.
@@ -344,21 +444,61 @@ Indexer::Private::scan_worker()
mu_debug("scanner finished");
}
- // and let the worker finish their work.
- state_.change_to(IndexState::Finishing);
+ enum class CleanupKind {
+ None,
+ FromScratch,
+ Incremental,
+ };
+
+ CleanupKind cleanup_kind = CleanupKind::Incremental;
+ bool aborted = state_ == IndexState::Aborting;
+
+ if (cleanup_kind >= CleanupKind::None && !conf_.cleanup) {
+ mu_debug("cleanup: not running as requested");
+ cleanup_kind = CleanupKind::None;
+ }
+
+ if (cleanup_kind > CleanupKind::None && aborted) {
+ mu_debug("cleanup: disabling because indexer aborted");
+ cleanup_kind = CleanupKind::None;
+ }
+
+ if (cleanup_kind >= CleanupKind::Incremental &&
+ g_getenv("MU_NO_INCREMENTAL_CLEANUP")) {
+ mu_debug("cleanup: not using incremental: MU_NO_INCREMENTAL_CLEANUP in environ");
+ cleanup_kind = CleanupKind::FromScratch;
+ }
+
+ if (cleanup_kind >= CleanupKind::Incremental && !conf_.scan) {
+ mu_debug("cleanup: not using incremental: scan not done");
+ cleanup_kind = CleanupKind::FromScratch;
+ }
+
+ state_.change_to(IndexState::Cleaning);
+
+ switch (cleanup_kind) {
+ case CleanupKind::None:
+ break;
+ case CleanupKind::FromScratch:
+ cleanup_from_scratch();
+ break;
+ case CleanupKind::Incremental:
+ cleanup_incremental();
+ break;
+ }
- if (conf_.cleanup) {
- mu_debug("starting cleanup");
- state_.change_to(IndexState::Cleaning);
- cleanup();
- mu_debug("cleanup finished");
+ aborted = state_ == IndexState::Aborting;
+ if (!aborted) {
+ // Store started time, not ending time, so that next time we run we know to scan
+ // anything that appeared during our scan.
+ store_.config().set<Mu::Config::Id::LastIndex>(started_.value());
}
completed_ = ::time({});
// attempt to commit to disk.
store_.xapian_db().request_commit(true);
- store_.config().set<Mu::Config::Id::LastIndex>(completed_);
state_.change_to(IndexState::Idle);
+ started_ = Nothing;
}
bool
@@ -399,6 +539,8 @@ Indexer::Private::start(const Indexer::Config& conf, bool block)
bool
Indexer::Private::stop()
{
+ if (state_ != IndexState::Idle)
+ state_.change_to(IndexState::Aborting);
scanner_.stop();
if (scanner_worker_.joinable())
diff --git a/lib/mu-labels-cache.hh b/lib/mu-labels-cache.hh
index 9f95d09..e465a55 100644
--- a/lib/mu-labels-cache.hh
+++ b/lib/mu-labels-cache.hh
@@ -131,6 +131,11 @@ public:
*/
Result<void> restore(const Store& store);
+ /**
+ * @return whether the labels map is non-empty
+ */
+ bool empty() const { return label_map_.empty(); }
+
private:
/**
* Deserialize the cache into a Map
diff --git a/lib/mu-store.cc b/lib/mu-store.cc
index c67b551..de061ac 100644
--- a/lib/mu-store.cc
+++ b/lib/mu-store.cc
@@ -22,6 +22,7 @@
#include <chrono>
#include <mutex>
+#include <algorithm>
#include <array>
#include <cstdlib>
#include <stdexcept>
@@ -130,6 +131,9 @@ struct Store::Private {
Option<const std::string&> target_mdir,
Option<Flags> new_flags,
MoveOptions opts);
+
+ bool remove_message_by_id_unlocked(Store::Id id);
+
XapianDb xapian_db_;
Config config_;
ContactsCache contacts_cache_;
@@ -430,17 +434,62 @@ Store::remove_messages(const std::vector<Store::Id>& ids)
xapian_db().request_transaction();
- for (auto&& id : ids) {
- if (const auto msg = priv_->find_message_unlocked(id); !msg) {
- mu_warning("cannot find document {} for deletion", id);
- } else {
- for (auto&& label: msg->labels())
- priv_->labels_cache_.decrease(label);
- xapian_db().delete_document(id);
- }
+ for (auto&& id : ids)
+ priv_->remove_message_by_id_unlocked(id);
+
+ xapian_db().request_commit(true/*force*/);
+}
+
+bool
+Store::Private::remove_message_by_id_unlocked(Store::Id id)
+{
+ if (auto msg = labels_cache_.empty() ? Nothing : find_message_unlocked(id); msg)
+ for (auto&& label: msg->labels())
+ labels_cache_.decrease(label);
+
+ if (!xapian_db_.delete_document(id)) {
+ mu_warning("cannot find document {} for deletion", id);
+ return false;
+ }
+
+ return true;
+}
+
+size_t
+Store::remove_messages_by_term(std::span<const std::string> terms,
+ std::function<void (size_t)> progress_fn)
+{
+ std::unique_lock lock{priv_->lock_};
+ size_t nr_removed = 0;
+ std::vector<Xapian::Query> qvec;
+ std::vector<Store::Id> ids_to_remove;
+
+ xapian_db().request_transaction();
+
+ while (!terms.empty()) {
+ auto chunk = terms.subspan(0, std::min<size_t>(terms.size(), 1024));
+ terms = terms.subspan(chunk.size());
+ qvec.clear();
+ qvec.reserve(chunk.size());
+ std::ranges::copy(chunk, std::back_inserter(qvec));
+ auto enq = xapian_db().enquire();
+ enq.set_weighting_scheme(Xapian::BoolWeight()); // No score
+ enq.set_docid_order(Xapian::Enquire::ASCENDING);
+ enq.set_query(Xapian::Query{Xapian::Query::OP_OR, qvec.begin(), qvec.end()});
+ auto mset = enq.get_mset(0, xapian_db().size());
+ ids_to_remove.insert(ids_to_remove.end(), mset.begin(), mset.end());
+ }
+
+ // Sort the IDs to remove to make Xapian tree traversal easier
+ std::ranges::sort(ids_to_remove);
+ for (Id id : ids_to_remove) {
+ nr_removed += priv_->remove_message_by_id_unlocked(id);
+ if (nr_removed % 500 == 0)
+ progress_fn(nr_removed);
}
xapian_db().request_commit(true/*force*/);
+ return nr_removed;
}
Option<Message>
diff --git a/lib/mu-store.hh b/lib/mu-store.hh
index 76f3a36..da0ec56 100644
--- a/lib/mu-store.hh
+++ b/lib/mu-store.hh
@@ -22,9 +22,11 @@
#include <string>
#include <vector>
+#include <span>
#include <mutex>
#include <ctime>
#include <memory>
+#include <functional>
#include "mu-contacts-cache.hh"
#include "mu-labels-cache.hh"
@@ -261,6 +263,23 @@ public:
void remove_message(Id id) { remove_messages({id}); }
/**
+ * Remove a number if messages from the store. It will _not_ remove the
+ * message from the file system.
+ *
+ * It's more efficient to use this function than to translate the terms to docids and then
+ * call remove_messages() with the ids: this way, we can fuse the ID lookup and the
+ * deletion, skip post-translation existence steps, and do far fewer Xapian
+ * B-tree traversals.
+ *
+ * @param ids vector with terms for the message
+ * @param progress_fn called occasionally to update number of removed messages;
+ * called occasionally with cumulative number of messages removed so far
+ * @return number of messages removed overall
+ */
+ size_t remove_messages_by_term(std::span<const std::string> terms,
+ std::function<void (size_t)> progress_fn);
+
+ /**
* Find message in the store.
*
* @param id doc id for the message to find