diff options
| author | Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> | 2024-09-16 19:52:43 +0300 |
|---|---|---|
| committer | Dirk-Jan C. Binnema <djcb@djcbsoftware.nl> | 2024-10-08 11:23:04 +0300 |
| commit | d2343c6d62e7fe0638a6e60a55f0e72f01c0367d (patch) | |
| tree | 61e7367740f623e9b240a1ac7eefcad0dd6809ab /lib | |
| parent | 8176663002fac592fed3c8406fa918b1ebca7028 (diff) | |
mu-server: try avoiding xapian multi-threaded access
Try to avoid multi-threaded operations with Xapian.
This remove the thread workers during indexing, and avoids the indexing
background thread. So, mu4e has to wait once again during indexing.
We can improve upon that, but first we need to know if it avoids the
problem of issue #2756.
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/mu-indexer.cc | 53 | ||||
| -rw-r--r-- | lib/mu-server.cc | 37 |
2 files changed, 65 insertions, 25 deletions
diff --git a/lib/mu-indexer.cc b/lib/mu-indexer.cc index 65603c2..12ff06a 100644 --- a/lib/mu-indexer.cc +++ b/lib/mu-indexer.cc @@ -105,6 +105,7 @@ struct Indexer::Private { bool handler(const std::string& fullpath, struct stat* statbuf, Scanner::HandleType htype); void maybe_start_worker(); + void item_worker(); void scan_worker(); @@ -135,6 +136,8 @@ struct Indexer::Private { Type type; }; + void handle_item(WorkItem&& item); + AsyncQueue<WorkItem> todos_; Progress progress_{}; @@ -193,7 +196,11 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, return true; } case Scanner::HandleType::LeaveDir: { +#ifdef XAPIAN_SINGLE_THREADED + handle_item({fullpath, WorkItem::Type::Dir}); +#else todos_.push({fullpath, WorkItem::Type::Dir}); +#endif /*XAPIAN_SINGLE_THREADED*/ return true; } @@ -210,9 +217,13 @@ Indexer::Private::handler(const std::string& fullpath, struct stat* statbuf, if (statbuf->st_ctime <= dirstamp_ && store_.contains_message(fullpath)) return false; +#ifdef XAPIAN_SINGLE_THREADED + handle_item({fullpath, WorkItem::Type::File}); +#else // push the remaining messages to our "todo" queue for // (re)parsing and adding/updating to the database. todos_.push({fullpath, WorkItem::Type::File}); +#endif return true; } default: @@ -260,6 +271,30 @@ Indexer::Private::add_message(const std::string& path) return true; } + +void +Indexer::Private::handle_item(WorkItem&& item) +{ + try { + switch (item.type) { + case WorkItem::Type::File: { + if (G_LIKELY(add_message(item.full_path))) + ++progress_.updated; + } break; + case WorkItem::Type::Dir: + store_.set_dirstamp(item.full_path, ::time(NULL)); + break; + default: + g_warn_if_reached(); + break; + } + } catch (const Mu::Error& er) { + mu_warning("error adding message @ {}: {}", item.full_path, er.what()); + } +} + + + void Indexer::Private::item_worker() { @@ -270,22 +305,8 @@ Indexer::Private::item_worker() while (state_ == IndexState::Scanning) { if (!todos_.pop(item, 250ms)) continue; - try { - switch (item.type) { - case WorkItem::Type::File: { - if (G_LIKELY(add_message(item.full_path))) - ++progress_.updated; - } break; - case WorkItem::Type::Dir: - store_.set_dirstamp(item.full_path, ::time(NULL)); - break; - default: - g_warn_if_reached(); - break; - } - } catch (const Mu::Error& er) { - mu_warning("error adding message @ {}: {}", item.full_path, er.what()); - } + + handle_item(std::move(item)); maybe_start_worker(); std::this_thread::yield(); diff --git a/lib/mu-server.cc b/lib/mu-server.cc index 9d21cab..17c3acd 100644 --- a/lib/mu-server.cc +++ b/lib/mu-server.cc @@ -149,6 +149,7 @@ struct Server::Private { Store& store() { return store_; } const Store& store() const { return store_; } Indexer& indexer() { return store().indexer(); } + void do_index(const Indexer::Config& conf); //CommandMap& command_map() const { return command_map_; } // @@ -762,6 +763,20 @@ get_stats(const Indexer::Progress& stats, const std::string& state) } void +Server::Private::do_index(const Indexer::Config& conf) +{ + StopWatch sw{"indexing"}; + indexer().start(conf); + while (indexer().is_running()) { + std::this_thread::sleep_for(std::chrono::milliseconds(2000)); + output_sexp(get_stats(indexer().progress(), "running"), + Server::OutputFlags::Flush); + } + output_sexp(get_stats(indexer().progress(), "complete"), + Server::OutputFlags::Flush); +} + +void Server::Private::index_handler(const Command& cmd) { Mu::Indexer::Config conf{}; @@ -770,22 +785,23 @@ Server::Private::index_handler(const Command& cmd) // ignore .noupdate with an empty store. conf.ignore_noupdate = store().empty(); +#ifdef XAPIAN_SINGLE_THREADED + // nothing to do + if (indexer().is_running()) { + throw Error{Error::Code::Xapian, "indexer is already running"}; + } + do_index(conf); +#else indexer().stop(); if (index_thread_.joinable()) index_thread_.join(); // start a background track. index_thread_ = std::thread([this, conf = std::move(conf)] { - StopWatch sw{"indexing"}; - indexer().start(conf); - while (indexer().is_running()) { - std::this_thread::sleep_for(std::chrono::milliseconds(2000)); - output_sexp(get_stats(indexer().progress(), "running"), - Server::OutputFlags::Flush); - } - output_sexp(get_stats(indexer().progress(), "complete"), - Server::OutputFlags::Flush); + do_index(conf); }); +#endif /*XAPIAN_SINGLE_THREADED */ + } void @@ -959,6 +975,9 @@ Server::Private::ping_handler(const Command& cmd) ":personal-addresses", std::move(addrs), ":database-path", store().path(), ":root-maildir", store().root_maildir(), +#ifdef XAPIAN_SINGLE_THREADED + ":xapian-single-threaded", Sexp::t_sym, +#endif /*XAPIAN_SINGLE_THREADED*/ ":doccount", storecount))); } |
