Speed up writes of events with statekeys to state db
This commit is contained in:
parent
703a07fb7d
commit
9891b6bffe
@ -37,7 +37,7 @@
|
||||
|
||||
//! Should be changed when a breaking change occurs in the cache format.
|
||||
//! This will reset client's data.
|
||||
static const std::string CURRENT_CACHE_FORMAT_VERSION{"2022.11.06"};
|
||||
static const std::string CURRENT_CACHE_FORMAT_VERSION{"2023.03.12"};
|
||||
|
||||
//! Keys used for the DB
|
||||
static const std::string_view NEXT_BATCH_KEY("next_batch");
|
||||
@ -1513,6 +1513,64 @@ Cache::runMigrations()
|
||||
QCoreApplication::instance()->processEvents(QEventLoop::AllEvents, 100);
|
||||
}
|
||||
|
||||
return true;
|
||||
}},
|
||||
{"2023.03.12",
|
||||
[this]() {
|
||||
try {
|
||||
auto txn = lmdb::txn::begin(env_, nullptr);
|
||||
auto room_ids = getRoomIds(txn);
|
||||
|
||||
for (const auto &room_id : room_ids) {
|
||||
try {
|
||||
auto oldStateskeyDb =
|
||||
lmdb::dbi::open(txn,
|
||||
std::string(room_id + "/state_by_key").c_str(),
|
||||
MDB_CREATE | MDB_DUPSORT);
|
||||
lmdb::dbi_set_dupsort(
|
||||
txn, oldStateskeyDb, +[](const MDB_val *a, const MDB_val *b) {
|
||||
auto get_skey = [](const MDB_val *v) {
|
||||
return nlohmann::json::parse(
|
||||
std::string_view(static_cast<const char *>(v->mv_data),
|
||||
v->mv_size))
|
||||
.value("key", "");
|
||||
};
|
||||
|
||||
return get_skey(a).compare(get_skey(b));
|
||||
});
|
||||
auto newStateskeyDb = getStatesKeyDb(txn, room_id);
|
||||
|
||||
// convert the dupsort format
|
||||
{
|
||||
auto cursor = lmdb::cursor::open(txn, oldStateskeyDb);
|
||||
std::string_view ev_type, data;
|
||||
bool start = true;
|
||||
while (cursor.get(ev_type, data, start ? MDB_FIRST : MDB_NEXT)) {
|
||||
start = false;
|
||||
|
||||
auto j =
|
||||
nlohmann::json::parse(std::string_view(data.data(), data.size()));
|
||||
|
||||
newStateskeyDb.put(
|
||||
txn, ev_type, j.value("key", "") + '\0' + j.value("id", ""));
|
||||
}
|
||||
}
|
||||
|
||||
// delete old db
|
||||
lmdb::dbi_drop(txn, oldStateskeyDb, true);
|
||||
} catch (std::exception &e) {
|
||||
nhlog::db()->error("While migrating state events from {}, ignoring error {}",
|
||||
room_id,
|
||||
e.what());
|
||||
}
|
||||
}
|
||||
txn.commit();
|
||||
} catch (const lmdb::error &) {
|
||||
nhlog::db()->critical("Failed to convert states key database in migration!");
|
||||
return false;
|
||||
}
|
||||
|
||||
nhlog::db()->info("Successfully updated states key database format.");
|
||||
return true;
|
||||
}},
|
||||
};
|
||||
|
@ -310,9 +310,12 @@ public:
|
||||
static int compare_state_key(const MDB_val *a, const MDB_val *b)
|
||||
{
|
||||
auto get_skey = [](const MDB_val *v) {
|
||||
return nlohmann::json::parse(
|
||||
std::string_view(static_cast<const char *>(v->mv_data), v->mv_size))
|
||||
.value("key", "");
|
||||
auto temp = std::string_view(static_cast<const char *>(v->mv_data), v->mv_size);
|
||||
// allow only passing the state key, in which case no null char will be in it and we
|
||||
// return the whole string because rfind returns npos.
|
||||
// We search from the back, because state keys could include nullbytes, event ids can
|
||||
// not.
|
||||
return temp.substr(0, temp.rfind('\0'));
|
||||
};
|
||||
|
||||
return get_skey(a).compare(get_skey(b));
|
||||
@ -438,21 +441,12 @@ private:
|
||||
else if (e.state_key.empty())
|
||||
statesdb.del(txn, to_string(e.type));
|
||||
else
|
||||
stateskeydb.del(txn,
|
||||
to_string(e.type),
|
||||
nlohmann::json::object({
|
||||
{"key", e.state_key},
|
||||
{"id", e.event_id},
|
||||
})
|
||||
.dump());
|
||||
stateskeydb.del(
|
||||
txn, to_string(e.type), e.state_key + '\0' + e.event_id);
|
||||
} else if (e.state_key.empty()) {
|
||||
statesdb.put(txn, to_string(e.type), nlohmann::json(e).dump());
|
||||
} else {
|
||||
auto data = nlohmann::json::object({
|
||||
{"key", e.state_key},
|
||||
{"id", e.event_id},
|
||||
})
|
||||
.dump();
|
||||
auto data = e.state_key + '\0' + e.event_id;
|
||||
auto key = to_string(e.type);
|
||||
|
||||
// Work around https://bugs.openldap.org/show_bug.cgi?id=8447
|
||||
@ -486,8 +480,9 @@ private:
|
||||
}
|
||||
} else {
|
||||
auto db = getStatesKeyDb(txn, room_id);
|
||||
std::string d = nlohmann::json::object({{"key", state_key}}).dump();
|
||||
std::string_view data = d;
|
||||
// we can search using state key, since the compare functions defaults to the whole
|
||||
// string, when there is no nullbyte
|
||||
std::string_view data = state_key;
|
||||
std::string_view typeStrV = typeStr;
|
||||
|
||||
auto cursor = lmdb::cursor::open(txn, db);
|
||||
@ -496,9 +491,14 @@ private:
|
||||
|
||||
try {
|
||||
auto eventsDb = getEventsDb(txn, room_id);
|
||||
if (!eventsDb.get(
|
||||
txn, nlohmann::json::parse(data)["id"].get<std::string>(), value))
|
||||
auto eventid = data;
|
||||
if (auto sep = data.rfind('\0'); sep != std::string_view::npos) {
|
||||
if (!eventsDb.get(txn, eventid.substr(sep + 1), value))
|
||||
return std::nullopt;
|
||||
} else {
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
} catch (std::exception &) {
|
||||
return std::nullopt;
|
||||
}
|
||||
@ -537,10 +537,12 @@ private:
|
||||
first = false;
|
||||
|
||||
try {
|
||||
if (eventsDb.get(
|
||||
txn, nlohmann::json::parse(data)["id"].get<std::string>(), value))
|
||||
auto eventid = data;
|
||||
if (auto sep = data.rfind('\0'); sep != std::string_view::npos) {
|
||||
if (eventsDb.get(txn, eventid.substr(sep + 1), value))
|
||||
events.push_back(
|
||||
nlohmann::json::parse(value).get<mtx::events::StateEvent<T>>());
|
||||
}
|
||||
} catch (std::exception &e) {
|
||||
nhlog::db()->warn("Failed to parse state event: {}", e.what());
|
||||
}
|
||||
@ -636,7 +638,7 @@ private:
|
||||
lmdb::dbi getStatesKeyDb(lmdb::txn &txn, const std::string &room_id)
|
||||
{
|
||||
auto db = lmdb::dbi::open(
|
||||
txn, std::string(room_id + "/state_by_key").c_str(), MDB_CREATE | MDB_DUPSORT);
|
||||
txn, std::string(room_id + "/states_key").c_str(), MDB_CREATE | MDB_DUPSORT);
|
||||
lmdb::dbi_set_dupsort(txn, db, compare_state_key);
|
||||
return db;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user