Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
lmdb_store_wrapper.cpp
Go to the documentation of this file.
5#include "napi.h"
6#include <chrono>
7#include <cstdint>
8#include <memory>
9#include <optional>
10#include <ratio>
11#include <stdexcept>
12#include <utility>
13
14using namespace bb::nodejs;
15using namespace bb::nodejs::lmdb_store;
16
17const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
18const uint64_t DEFAULT_MAX_READERS = 16;
19const uint64_t DEFAULT_CURSOR_PAGE_SIZE = 10;
20
21LMDBStoreWrapper::LMDBStoreWrapper(const Napi::CallbackInfo& info)
22 : ObjectWrap(info)
23{
24 Napi::Env env = info.Env();
25
26 size_t data_dir_index = 0;
27 std::string data_dir;
28 if (info.Length() > data_dir_index && info[data_dir_index].IsString()) {
29 data_dir = info[data_dir_index].As<Napi::String>();
30 } else {
31 throw Napi::TypeError::New(env, "Directory needs to be a string");
32 }
33
34 size_t map_size_index = 1;
35 uint64_t map_size = DEFAULT_MAP_SIZE;
36 if (info.Length() > map_size_index) {
38 // Int64Value is the widest integer accessor in N-API (no Uint64Value exists)
39 int64_t val = info[map_size_index].As<Napi::Number>().Int64Value();
40 if (val <= 0) {
41 throw Napi::TypeError::New(env, "Map size must be a positive number");
42 }
43 map_size = static_cast<uint64_t>(val);
44 } else {
45 throw Napi::TypeError::New(env, "Map size must be a number or an object");
46 }
47 }
48
49 size_t max_readers_index = 2;
51 if (info.Length() > max_readers_index) {
53 max_readers = info[max_readers_index].As<Napi::Number>().Uint32Value();
54 } else if (!info[max_readers_index].IsUndefined()) {
55 throw Napi::TypeError::New(env, "The number of readers must be a number");
56 }
57 }
58
59 // `ephemeral` opens the LMDB env with `MDB_NOSYNC | MDB_NOMETASYNC`, so commits return
60 // without waiting for fsync. The on-disk file is unrecoverable after a crash but stays
61 // sparse on every byte LMDB doesn't touch; the trade-off is appropriate for tmp stores
62 // that get cleaned up on close (see `openTmpStore`).
63 size_t ephemeral_index = 3;
64 bool ephemeral = false;
65 if (info.Length() > ephemeral_index) {
67 ephemeral = info[ephemeral_index].As<Napi::Boolean>().Value();
68 } else if (!info[ephemeral_index].IsUndefined()) {
69 throw Napi::TypeError::New(env, "The ephemeral flag must be a boolean");
70 }
71 }
72
74
76
79
85
87
89
90 // The close operation requires exclusive execution, no other operations can be run concurrently with it
92
94}
95
96Napi::Value LMDBStoreWrapper::call(const Napi::CallbackInfo& info)
97{
99}
100
101Napi::Function LMDBStoreWrapper::get_class(Napi::Env env)
102{
103 return DefineClass(env,
104 "Store",
105 {
107 });
108}
109
110// Simply verify that the store is still valid and that close has not been called
112{
113 if (_store) {
114 return;
115 }
116 throw std::runtime_error(format("LMDB store unavailable, was close already called?"));
117}
118
120{
121 verify_store();
122 _store->open_database(req.db, !req.uniqueKeys.value_or(true));
123 return { true };
124}
125
127{
128 verify_store();
130 lmdblib::KeysVector keys = req.keys;
131 _store->get(keys, vals, req.db);
132 return { vals };
133}
134
136{
137 verify_store();
138 std::vector<bool> exists;
139 _store->has(req.entries, exists, req.db);
140 return { exists };
141}
142
144{
145 verify_store();
146 bool reverse = req.reverse.value_or(false);
148 bool one_page = req.onePage.value_or(false);
149 lmdblib::Key key = req.key;
150
151 auto tx = _store->create_shared_read_transaction();
152 lmdblib::LMDBCursor::SharedPtr cursor = _store->create_cursor(tx, req.db);
153 bool start_ok = cursor->set_at_key(key);
154
155 if (!start_ok) {
156 // we couldn't find exactly the requested key. Find the next biggest one.
157 start_ok = cursor->set_at_key_gte(key);
158 // if we found a key that's greater _and_ we want to go in reverse order
159 // then we're actually outside the requested bounds, we need to go back one position
160 if (start_ok && reverse) {
162 // read_prev returns `true` if there's nothing more to read
163 // turn this into a "not ok" because there's nothing in the db for this cursor to read
164 start_ok = !cursor->read_prev(1, entries);
165 } else if (!start_ok && reverse) {
166 // we couldn't find a key greater than our starting point _and_ we want to go in reverse..
167 // then we start at the end of the database (the client requested to start at a key greater than anything in
168 // the DB)
169 start_ok = cursor->set_at_end();
170 }
171
172 // in case we're iterating in ascending order and we can't find the exact key or one that's greater than it
173 // then that means theren's nothing in the DB for the cursor to read
174 }
175
176 // we couldn't find a starting position
177 if (!start_ok) {
178 return { std::nullopt, {} };
179 }
180
181 auto [done, first_page] = _advance_cursor(*cursor, reverse, page_size);
182 // cursor finished after reading a single page or client only wanted the first page
183 if (done || one_page) {
184 return { std::nullopt, first_page };
185 }
186
187 auto cursor_id = cursor->id();
188 {
190 _cursors[cursor_id] = { cursor, reverse };
191 }
192
193 return { cursor_id, first_page };
194}
195
197{
198 {
200 _cursors.erase(req.cursor);
201 }
202 return { true };
203}
204
206{
208
209 {
211 data = _cursors.at(req.cursor);
212 }
213
215 auto [done, entries] = _advance_cursor(*data.cursor, data.reverse, page_size);
216 return { entries, done };
217}
218
220{
222
223 {
225 data = _cursors.at(req.cursor);
226 }
227
228 auto [done, count] = _advance_cursor_count(*data.cursor, data.reverse, req.endKey);
229 return { count, done };
230}
231
233{
234 verify_store();
236 batches.reserve(req.batches.size());
237
238 for (const auto& data : req.batches) {
239 lmdblib::LMDBStore::PutData batch{ data.second.addEntries, data.second.removeEntries, data.first };
240 batches.push_back(batch);
241 }
242
243 auto start = std::chrono::high_resolution_clock::now();
244 _store->put(batches);
245 auto end = std::chrono::high_resolution_clock::now();
246 std::chrono::duration<uint64_t, std::nano> duration_ns = end - start;
247
248 return { duration_ns.count() };
249}
250
252{
253 verify_store();
255 auto [map_size, physical_file_size] = _store->get_stats(stats);
256 return { stats, map_size, physical_file_size };
257}
258
260{
261 // prevent this store from receiving further messages
263
264 {
265 // close all of the open read cursors
266 std::lock_guard cursors(_cursor_mutex);
267 _cursors.clear();
268 }
269
270 // and finally close the database handle
271 _store.reset(nullptr);
272
273 return { true };
274}
275
277{
278 verify_store();
279 _store->copy_store(req.dstPath, req.compact.value_or(false));
280
281 return { true };
282}
283
285 bool reverse,
286 uint64_t page_size)
287{
289 bool done = reverse ? cursor.read_prev(page_size, entries) : cursor.read_next(page_size, entries);
290 return std::make_pair(done, entries);
291}
292
294 bool reverse,
295 const lmdblib::Key& end_key)
296{
297 uint64_t count = 0;
298 bool done = reverse ? cursor.count_until_prev(end_key, count) : cursor.count_until_next(end_key, count);
299 return std::make_pair(done, count);
300}
std::shared_ptr< LMDBCursor > SharedPtr
void register_handler(uint32_t msgType, T *self, R(T::*handler)() const, bool unique=false)
Napi::Promise process_message(const Napi::CallbackInfo &info)
StartCursorResponse start_cursor(const StartCursorRequest &req)
GetResponse get(const GetRequest &req)
static Napi::Function get_class(Napi::Env env)
BoolResponse close_cursor(const CloseCursorRequest &req)
BoolResponse open_database(const OpenDatabaseRequest &req)
bb::nodejs::AsyncMessageProcessor _msg_processor
HasResponse has(const HasRequest &req)
BatchResponse batch(const BatchRequest &req)
BoolResponse copy_store(const CopyStoreRequest &req)
std::unordered_map< uint64_t, CursorData > _cursors
static std::pair< bool, uint64_t > _advance_cursor_count(const lmdblib::LMDBCursor &cursor, bool reverse, const lmdblib::Key &end_key)
AdvanceCursorResponse advance_cursor(const AdvanceCursorRequest &req)
AdvanceCursorCountResponse advance_cursor_count(const AdvanceCursorCountRequest &req)
Napi::Value call(const Napi::CallbackInfo &)
The only instance method exposed to JavaScript. Takes a msgpack Message and returns a Promise.
static std::pair< bool, lmdblib::KeyDupValuesVector > _advance_cursor(const lmdblib::LMDBCursor &cursor, bool reverse, uint64_t page_size)
std::unique_ptr< lmdblib::LMDBStore > _store
std::string format(Args... args)
Definition log.hpp:23
#define info(...)
Definition log.hpp:93
const uint64_t DEFAULT_MAP_SIZE
const uint64_t DEFAULT_MAX_READERS
const uint64_t DEFAULT_CURSOR_PAGE_SIZE
std::vector< Key > KeysVector
Definition types.hpp:13
std::vector< uint8_t > Key
Definition types.hpp:11
std::vector< uint8_t > Value
Definition types.hpp:12
std::vector< KeyValuesPair > KeyDupValuesVector
Definition types.hpp:18
std::vector< OptionalValues > OptionalValuesVector
Definition types.hpp:17
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::byte * data