summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDirk-Jan C. Binnema <djcb@djcbsoftware.nl>2024-05-27 23:02:42 +0300
committerDirk-Jan C. Binnema <djcb@djcbsoftware.nl>2024-06-03 21:01:17 +0300
commit697d6b6b4fe922af17a7535f25497798d925b68f (patch)
treef2336454d079c33fee389f164ec1e274e6e19596
parentf2f01595a51380ae38aafb4cd11a0d3c17a33a10 (diff)
server: pass sexp-commmands through store worker
To ensure all Xapian rw commands happen in the same thread.
-rw-r--r--lib/mu-server.cc70
-rw-r--r--mu/mu-cmd-server.cc5
2 files changed, 60 insertions, 15 deletions
diff --git a/lib/mu-server.cc b/lib/mu-server.cc
index 62c9ca0..edc6398 100644
--- a/lib/mu-server.cc
+++ b/lib/mu-server.cc
@@ -1,5 +1,5 @@
/*
-** Copyright (C) 2020-2023 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
+** Copyright (C) 2020-2024 Dirk-Jan C. Binnema <djcb@djcbsoftware.nl>
**
** This program is free software; you can redistribute it and/or modify it
** under the terms of the GNU General Public License as published by the
@@ -29,9 +29,12 @@
#include <atomic>
#include <thread>
#include <mutex>
+#include <condition_variable>
#include <variant>
#include <functional>
+
+
#include <cstring>
#include <glib.h>
#include <glib/gprintf.h>
@@ -116,7 +119,7 @@ struct OutputStream {
}
private:
- std::string fname_;
+ std::string fname_;
using OutType = std::variant<std::ofstream, std::ostringstream>;
OutType out_;
};
@@ -126,11 +129,21 @@ private:
/// @brief object to manage the server-context for all commands.
struct Server::Private {
Private(Store& store, const Server::Options& opts, Output output)
- : store_{store}, options_{opts}, output_{output},
+ : store_{store},
+ store_worker_{store.store_worker()},
+ options_{opts}, output_{output},
command_handler_{make_command_map()},
keep_going_{true},
- tmp_dir_{unwrap(make_temp_dir())}
- {}
+ tmp_dir_{unwrap(make_temp_dir())} {
+
+ // tell the store-worker that we (this class) can handle
+ // sexp strings.
+ store_worker_.install_sexp_handler(
+ [this](const std::string& sexp) {
+ this->invoke(sexp);
+ });
+
+ }
~Private() {
indexer().stop();
@@ -148,13 +161,10 @@ struct Server::Private {
// acccessors
Store& store() { return store_; }
const Store& store() const { return store_; }
+ StoreWorker& store_worker() { return store_worker_; }
Indexer& indexer() { return store().indexer(); }
//CommandMap& command_map() const { return command_map_; }
- //
- // invoke
- //
- bool invoke(const std::string& expr) noexcept;
//
// output
@@ -186,7 +196,19 @@ struct Server::Private {
void remove_handler(const Command& cmd);
void view_handler(const Command& cmd);
+ bool keep_going() const { return keep_going_; }
+ void set_keep_going(bool going) { keep_going_ = going; }
+
+ // make main thread wait until done with the command.
+ std::mutex done_lock_;
+ std::condition_variable done_cond_;
+
private:
+ //
+ // invoke
+ //
+ bool invoke(const std::string& expr) noexcept;
+
void move_docid(Store::Id docid, Option<std::string> flagstr,
bool new_name, bool no_view);
@@ -209,6 +231,7 @@ private:
std::ofstream make_temp_file_stream(std::string& fname) const;
Store& store_;
+ StoreWorker& store_worker_;
Server::Options options_;
Server::Output output_;
const CommandHandler command_handler_;
@@ -480,6 +503,10 @@ Server::Private::invoke(const std::string& expr) noexcept
keep_going_ = false;
}
+ // tell main thread we're done with the command.
+ std::lock_guard l{done_lock_};
+ done_cond_.notify_one();
+
return keep_going_;
}
@@ -690,9 +717,6 @@ Server::Private::find_handler(const Command& cmd)
StopWatch sw{mu_format("{} (indexing: {})", __func__,
indexer().is_running() ? "yes" : "no")};
- // we need to _lock_ the store while querying (which likely consists of
- // multiple actual queries) + grabbing the results.
- std::lock_guard l{store_.lock()};
auto qres{store_.run_query(q, sort_field_id, qflags, maxnum)};
if (!qres)
throw Error(Error::Code::Query, "failed to run query: {}", qres.error().what());
@@ -1081,7 +1105,27 @@ Server::~Server() = default;
bool
Server::invoke(const std::string& expr) noexcept
{
- return priv_->invoke(expr);
+ /* a _little_ hacky; handle _quit_ directly to properly
+ * shut down the server */
+ if (expr == "(quit)") {
+ mu_debug("quitting");
+ priv_->set_keep_going(false);
+ return false;
+ }
+
+ /*
+ * feed the command to the queue; it'll get executed in the
+ * store-worker thread; however, sync on its completion
+ * so we get its keep_going() result
+ *
+ * as an added bonus, this ensures mu server shell doesn't require an
+ * extra user RET to get back the prompt
+ */
+ std::unique_lock done_lock{priv_->done_lock_};
+ priv_->store_worker().push(StoreWorker::SexpCommand{expr});
+ priv_->done_cond_.wait(done_lock);
+
+ return priv_->keep_going();
}
/* LCOV_EXCL_STOP */
diff --git a/mu/mu-cmd-server.cc b/mu/mu-cmd-server.cc
index 3a69456..50c7c01 100644
--- a/mu/mu-cmd-server.cc
+++ b/mu/mu-cmd-server.cc
@@ -79,11 +79,12 @@ cookie(size_t n)
::printf(COOKIE_PRE "%x" COOKIE_POST, num);
}
-
-
static void
output_stdout(const std::string& str, Server::OutputFlags flags)
{
+ // Note: with the StoreWorker, we _always_ need to flush
+ flags |= Server::OutputFlags::Flush;
+
cookie(str.size() + 1);
if (G_UNLIKELY(::puts(str.c_str()) < 0)) {
mu_critical("failed to write output '{}'", str);