Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
wsdb_ipc_server.cpp
Go to the documentation of this file.
7#include "barretenberg/wsdb/generated/wsdb_ipc_server.hpp"
11#include "ipc_runtime/ipc_server.hpp"
12#include "ipc_runtime/serve_helper.hpp"
13#include "ipc_runtime/signal_handlers.hpp"
14
15#include <algorithm>
16#include <cstdint>
17#include <functional>
18#include <iostream>
19#include <memory>
20#include <sstream>
21#include <string>
22#include <unordered_map>
23#include <vector>
24
25namespace bb::wsdb {
26
27using namespace bb::world_state;
28using namespace bb::crypto::merkle_tree;
29
30// ---------------------------------------------------------------------------
31// Simple JSON-like parsing for config maps
32// Parses "{0:1024,1:2048,...}" into unordered_map<uint32_t, uint64_t>
33// ---------------------------------------------------------------------------
34
35static std::unordered_map<world_state::MerkleTreeId, uint64_t> parse_tree_uint64_map(const std::string& json)
36{
38 if (json.empty()) {
39 return result;
40 }
41 std::string cleaned;
42 for (char c : json) {
43 if (c != '{' && c != '}' && c != ' ') {
44 cleaned += c;
45 }
46 }
47 std::istringstream ss(cleaned);
48 std::string pair;
49 while (std::getline(ss, pair, ',')) {
50 auto colon_pos = pair.find(':');
51 if (colon_pos != std::string::npos) {
52 auto key = static_cast<world_state::MerkleTreeId>(std::stoi(pair.substr(0, colon_pos)));
53 auto value = static_cast<uint64_t>(std::stoull(pair.substr(colon_pos + 1)));
54 result[key] = value;
55 }
56 }
57 return result;
58}
59
60static std::unordered_map<world_state::MerkleTreeId, uint32_t> parse_tree_uint32_map(const std::string& json)
61{
63 if (json.empty()) {
64 return result;
65 }
66 auto u64_map = parse_tree_uint64_map(json);
67 for (const auto& [k, v] : u64_map) {
68 result[k] = static_cast<uint32_t>(v);
69 }
70 return result;
71}
72
73static std::unordered_map<world_state::MerkleTreeId, index_t> parse_tree_index_map(const std::string& json)
74{
76 if (json.empty()) {
77 return result;
78 }
79 auto u64_map = parse_tree_uint64_map(json);
80 for (const auto& [k, v] : u64_map) {
81 result[k] = static_cast<index_t>(v);
82 }
83 return result;
84}
85
86// ---------------------------------------------------------------------------
87// Parse prefilled public data from JSON: [["slot_hex","value_hex"],...]
88// Each hex string is a 64-char (32-byte) hex-encoded field element.
89// ---------------------------------------------------------------------------
90
91static fr hex_to_fr(const std::string& hex)
92{
93 std::string cleaned = hex;
94 if (cleaned.size() >= 2 && cleaned[0] == '0' && (cleaned[1] == 'x' || cleaned[1] == 'X')) {
95 cleaned = cleaned.substr(2);
96 }
97 return fr(cleaned);
98}
99
100static std::vector<PublicDataLeafValue> parse_prefilled_public_data(const std::string& json)
101{
103 if (json.empty() || json == "[]") {
104 return result;
105 }
106
107 // Simple state-machine parser for [["hex","hex"],["hex","hex"],...]
108 std::vector<std::string> hex_values;
109 std::string current;
110 bool in_string = false;
111
112 for (char c : json) {
113 if (c == '"') {
114 in_string = !in_string;
115 } else if (in_string) {
116 current += c;
117 } else if ((c == ',' || c == ']') && !current.empty()) {
118 hex_values.push_back(std::move(current));
119 current.clear();
120 }
121 }
122
123 // hex_values should have pairs: slot, value, slot, value, ...
124 if (hex_values.size() % 2 != 0) {
125 std::cerr << "Warning: odd number of hex values in prefilled public data, ignoring last" << '\n';
126 }
127 for (size_t i = 0; i + 1 < hex_values.size(); i += 2) {
128 result.emplace_back(hex_to_fr(hex_values[i]), hex_to_fr(hex_values[i + 1]));
129 }
130 return result;
131}
132
133// ---------------------------------------------------------------------------
134// IPC server execution
135// ---------------------------------------------------------------------------
136
137int execute_wsdb_server(const std::string& input_path,
138 const std::string& data_dir,
139 const std::string& tree_heights_json,
140 const std::string& tree_prefill_json,
141 const std::string& map_sizes_json,
142 uint32_t threads,
143 uint32_t initial_header_generator_point,
144 const std::string& prefilled_public_data_json,
145 uint64_t genesis_timestamp,
146 size_t request_ring_size,
147 size_t response_ring_size)
148{
149 const uint64_t DEFAULT_MAP_SIZE = 1024UL * 1024;
150
151 // Parse config
152 auto tree_height = parse_tree_uint32_map(tree_heights_json);
153 auto tree_prefill = parse_tree_index_map(tree_prefill_json);
154
161 };
162 if (!map_sizes_json.empty()) {
163 auto parsed = parse_tree_uint64_map(map_sizes_json);
164 for (const auto& [k, v] : parsed) {
165 map_size[k] = v;
166 }
167 }
168
169 // Parse prefilled public data: JSON array of ["slot_hex","value_hex"] pairs
170 std::vector<PublicDataLeafValue> prefilled_public_data;
171 if (!prefilled_public_data_json.empty()) {
172 prefilled_public_data = parse_prefilled_public_data(prefilled_public_data_json);
173 std::cerr << "Parsed " << prefilled_public_data.size() << " prefilled public data entries" << '\n';
174 }
175
176 // Create WorldState
177 std::cerr << "Creating WorldState at " << data_dir << " with " << threads << " threads" << '\n';
178 auto ws = std::make_unique<WorldState>(threads,
179 data_dir,
180 map_size,
181 tree_height,
182 tree_prefill,
183 prefilled_public_data,
184 initial_header_generator_point,
185 genesis_timestamp);
186
187 WsdbRequest request{ .world_state = *ws };
188
189 // Pick UDS vs MPSC-SHM by path suffix; install the runtime's default
190 // lifecycle signal handlers (SIGTERM/SIGINT → request_shutdown, SIGBUS/SIGSEGV
191 // → close+exit, plus parent-death monitoring via prctl/kqueue).
192 ipc::ServerOptions opts;
193 // TS backend (client 0) + the AVM simulator pool (one connection per
194 // aztec-vm-sim process). Sized to cover a default-size pool with headroom so
195 // SHM isn't capped to a single AVM client. (UDS, the default transport, is
196 // unaffected — it admits connections via the listen backlog.)
197 opts.max_shm_clients = 8;
198 opts.shm_request_ring_size = request_ring_size;
199 opts.shm_response_ring_size = response_ring_size;
200 auto server = ipc::make_server(input_path, opts);
201 if (!server) {
202 std::cerr << "Error: --input path must end with .sock or .shm: " << input_path << '\n';
203 return 1;
204 }
205 std::cerr << "aztec-wsdb listening on " << input_path << '\n';
206 ipc::install_default_signal_handlers(*server);
207
208 if (!server->listen()) {
209 std::cerr << "Error: Could not start IPC server" << '\n';
210 return 1;
211 }
212
213 std::cerr << "aztec-wsdb IPC server ready" << '\n';
214
215 // Dispatch pool: services requests concurrently so parallel reads aren't
216 // serialized through the single reactor thread. It is deliberately DISTINCT
217 // from WorldState's own intra-op pool (the `threads` arg above): mutating
218 // handlers enqueue subtasks onto that pool and wait() on them, so dispatching
219 // those handlers onto the SAME pool could deadlock (bb::ThreadPool is
220 // blocking and non-work-stealing).
221 //
222 // Sized from the caller-provided `threads` budget (the same value used for
223 // the WorldState pool), NOT std::thread::hardware_concurrency() — the latter
224 // ignores cgroup CPU limits and reports the host core count (e.g. 192 in a
225 // 2-CPU CI container), which would spawn a huge pool per wsdb process and
226 // exhaust the per-UID thread limit (pthread_create EAGAIN).
227 uint32_t dispatch_threads = std::max<uint32_t>(2, threads);
228 bb::ThreadPool dispatch_pool(dispatch_threads);
229
230 // Server-side ordering: the database, not the client, guarantees per-fork
231 // read/write consistency. Each handler hands its work to this scheduler via
232 // schedule_read / schedule_write (which run reads concurrently and serialize
233 // writes per fork — see WsdbScheduler), so the context carries it.
234 auto scheduler = std::make_shared<WsdbScheduler>(dispatch_pool, *server);
235 request.scheduler = scheduler.get();
236
237 // Async dispatch: the reactor reads each request and hands it to the
238 // generated handler with a respond callback; the handler decodes, schedules
239 // its work, and responds when done (possibly from a pool thread).
240 auto handler = make_wsdb_handler(request);
241 server->run_reactor([&handler](int /*client_id*/, std::span<const uint8_t> raw, ipc::IpcServer::Respond respond) {
242 handler(raw, std::move(respond));
243 });
244
245 server->close();
246 return 0;
247}
248
249} // namespace bb::wsdb
const uint64_t DEFAULT_MAP_SIZE
@ L1_TO_L2_MESSAGE_TREE
Definition types.hpp:23
int execute_wsdb_server(const std::string &input_path, const std::string &data_dir, const std::string &tree_heights_json, const std::string &tree_prefill_json, const std::string &map_sizes_json, uint32_t threads, uint32_t initial_header_generator_point, const std::string &prefilled_public_data_json, uint64_t genesis_timestamp, size_t request_ring_size, size_t response_ring_size)
Start the aztec-wsdb IPC server.
field< Bn254FrParams > fr
Definition fr.hpp:155
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
pair(A, B) -> pair< unwrap_ref_decay_t< A >, unwrap_ref_decay_t< B > >
world_state::WorldState & world_state
Non-template handler declarations for the wsdb service.
Service-level context passed to every wsdb handler.
Server-side per-fork ordering for the concurrent wsdb.