Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
chonk_batch_verifier.cpp
Go to the documentation of this file.
1#ifndef __wasm__
10
11namespace bb {
12
14 uint32_t num_cores,
15 uint32_t batch_size,
16 ResultCallback on_result)
17{
18 {
19 std::lock_guard lock(mutex_);
20 if (running_ || stopping_) {
21 throw_or_abort("ChonkBatchVerifier: already started");
22 }
23 vks_ = std::move(vks);
24 num_cores_ = std::max(1u, num_cores);
25 batch_size_ = std::max(1u, batch_size);
26 on_result_ = std::move(on_result);
27 queue_.clear();
28 in_flight_ids_.clear();
29 shutdown_ = false;
30 running_ = true;
31 }
32
33 coordinator_thread_ = std::thread([this]() { coordinator_loop(); });
34 info("ChonkBatchVerifier started with ", num_cores_, " cores, batch_size=", batch_size_);
35}
36
// Accepts one request: rejects it outright (throw_or_abort) when the verifier cannot take work,
// otherwise validates it and either queues it for the coordinator or reports a failed result.
// NOTE(review): the signature line is absent from this listing; the member index further down
// shows it as `void enqueue(VerifyRequest request)`.
38{
39 VerifyResult failure;
40 bool has_failure = false;
41 {
42 std::lock_guard lock(mutex_);
// Hard errors: these go through throw_or_abort and never reach the result callback.
43 if (!running_ || shutdown_) {
44 throw_or_abort("ChonkBatchVerifier: enqueue called while verifier is not running");
45 }
46 if (in_flight_ids_.contains(request.request_id)) {
47 throw_or_abort("ChonkBatchVerifier: duplicate request_id: " + std::to_string(request.request_id));
48 }
49 if (queue_.size() >= MAX_QUEUE_SIZE) {
50 throw_or_abort("ChonkBatchVerifier: queue is full");
51 }
52
// Stamp the arrival time and reserve the id before validating. dispatch() erases the id again
// once a result for it has been delivered, including the validation failures built below.
53 request.enqueue_time = std::chrono::steady_clock::now();
54 in_flight_ids_.insert(request.request_id);
55
// Soft errors: the request is answered with a failed VerifyResult instead of being queued.
56 if (request.vk_index >= vks_.size()) {
57 failure = VerifyResult::failed(request.request_id, "invalid vk_index: " + std::to_string(request.vk_index));
58 has_failure = true;
59 } else if (vks_[request.vk_index] == nullptr || vks_[request.vk_index]->vk == nullptr) {
60 failure = VerifyResult::failed(request.request_id, "missing verification key");
61 has_failure = true;
62 } else {
// NOTE(review): the second operand of this sum is on a line missing from this listing.
63 const size_t expected_proof_size = static_cast<size_t>(vks_[request.vk_index]->vk->num_public_inputs) +
65 if (request.proof.size() != expected_proof_size) {
66 failure = VerifyResult::failed(request.request_id,
67 "proof has wrong size: expected " + std::to_string(expected_proof_size) +
68 ", got " + std::to_string(request.proof.size()));
69 has_failure = true;
70 } else {
71 queue_.push_back(std::move(request));
72 }
73 }
74 }
// dispatch() acquires mutex_ itself, so the failure is delivered only after the guard above
// has been released. A successfully queued request wakes the coordinator instead.
75 if (has_failure) {
76 dispatch(std::move(failure));
77 } else {
78 cv_.notify_one();
79 }
80}
81
83{
84 std::thread coordinator_thread;
85 {
86 std::unique_lock lock(mutex_);
87 if (stopping_) {
88 stopped_cv_.wait(lock, [this] { return !stopping_; });
89 return;
90 }
91 if (!running_ && !coordinator_thread_.joinable()) {
92 return;
93 }
94 stopping_ = true;
95 shutdown_ = true;
96 if (coordinator_thread_.joinable()) {
97 coordinator_thread = std::move(coordinator_thread_);
98 }
99 }
100 cv_.notify_one();
101 if (coordinator_thread.joinable()) {
102 coordinator_thread.join();
103 }
104 {
105 std::lock_guard lock(mutex_);
106 running_ = false;
107 queue_.clear();
108 in_flight_ids_.clear();
109 stopping_ = false;
110 }
111 stopped_cv_.notify_all();
112 info("ChonkBatchVerifier stopped");
113}
114
// Calls stop() if the verifier still looks active.
// NOTE(review): the signature line for this body is absent from this listing and it is not in the
// member index; given what it does it is presumably the destructor — confirm against the header.
116{
117 bool should_stop = false;
118 {
119 std::lock_guard lock(mutex_);
// Running, still holding a joinable thread, or mid-shutdown: in all three cases stop() must run.
120 should_stop = running_ || coordinator_thread_.joinable() || stopping_;
121 }
// stop() acquires mutex_ itself, so it is called only after the guard above has been released.
122 if (should_stop) {
123 stop();
124 }
125}
126
128{
129 const uint64_t request_id = result.request_id;
130 try {
131 // Result delivery owns the request id until the callback completes.
132 if (on_result_) {
133 on_result_(std::move(result));
134 }
135 } catch (const std::exception& e) {
136 info("ChonkBatchVerifier: result callback threw: ", e.what());
137 } catch (...) {
138 info("ChonkBatchVerifier: result callback threw unknown exception");
139 }
140 std::lock_guard lock(mutex_);
141 in_flight_ids_.erase(request_id);
142}
143
// Body of the coordinator thread: repeatedly pulls up to batch_size_ requests from the queue,
// reduces them in parallel, batch-checks the survivors, and falls back to bisect() when the
// batch check fails. Returns once shutdown_ is set and the queue has been drained.
// NOTE(review): the signature line is absent from this listing.
145{
146 while (true) {
147 // ── Collect a batch ──────────────────────────────────────────────
// NOTE(review): `batch` is declared on a line missing from this listing (presumably a
// std::vector<VerifyRequest> local to this iteration) — confirm.
149 {
150 std::unique_lock lock(mutex_);
151
152 // Wait until we have work or are told to shut down.
153 // No timeout needed: while we're processing a batch, new proofs
154 // accumulate in the queue. When idle, process whatever arrives immediately.
155 cv_.wait(lock, [this] { return shutdown_ || !queue_.empty(); });
156
157 // Take up to batch_size_ items (may be a partial batch)
158 size_t take = std::min(queue_.size(), static_cast<size_t>(batch_size_));
159 if (take > 0) {
160 auto end = queue_.begin() + static_cast<ptrdiff_t>(take);
161 batch.assign(std::make_move_iterator(queue_.begin()), std::make_move_iterator(end));
162 queue_.erase(queue_.begin(), end);
163 }
164
// The loop exits only when nothing was taken AND shutdown_ is set, so requests that were
// already queued when stop() was called are still processed.
165 if (batch.empty()) {
166 if (shutdown_) {
167 break;
168 }
169 continue;
170 }
171
172 // Invalid VK indices and malformed proof sizes are rejected at enqueue time.
173 }
174
// NOTE(review): the locked section above already breaks or continues on an empty batch, so this
// second check looks redundant if `batch` is a per-iteration local — confirm before removing.
175 if (batch.empty()) {
176 continue;
177 }
178
179 // ── Phase 1: parallel reduce (all cores, work-stealing) ──────────
180 auto reduce_start = std::chrono::steady_clock::now();
181 auto reduce_results = parallel_reduce(batch);
182
183 // Separate passed from failed (emit failures immediately)
184 std::vector<size_t> passed_indices;
185 passed_indices.reserve(reduce_results.size());
186 for (size_t i = 0; i < reduce_results.size(); ++i) {
187 auto& rr = reduce_results[i];
188 if (!rr.all_checks_passed) {
189 auto result = VerifyResult::failed(rr.request_id, rr.error_message);
// Queue time is measured up to the start of the reduce phase; verify time is this proof's own reduce time.
190 result.time_in_queue_ms = ms_between(rr.enqueue_time, reduce_start);
191 result.time_in_verify_ms = rr.reduce_ms;
192 dispatch(std::move(result));
193 } else {
194 passed_indices.push_back(i);
195 }
196 }
197
198 if (passed_indices.empty()) {
199 continue;
200 }
201
202 // ── Phase 2: batch IPA verification ────────────────────────────
203 auto ipa_start = std::chrono::steady_clock::now();
204 bool ok = batch_check(reduce_results, passed_indices);
205 double ipa_ms = ms_since(ipa_start);
// Wall-clock time of the whole reduce phase, including the failure dispatches above; log only.
206 double reduce_ms = ms_between(reduce_start, ipa_start);
207
208 info("ChonkBatchVerifier: batch of ",
209 passed_indices.size(),
210 ": reduce=",
211 reduce_ms,
212 "ms, batch_check=",
213 ipa_ms,
214 "ms, result=",
215 ok ? "OK" : "BISECTING");
216
// A passing batch is reported with depth 0; a failing one is split by bisect() until the
// offending proofs have been isolated.
217 if (ok) {
218 emit_ok(reduce_results, passed_indices, reduce_start, ipa_ms, 0);
219 } else {
220 bisect(reduce_results, passed_indices, 0, reduce_start);
221 }
222 }
223}
224
// Runs the reduce step for every request in `batch` on up to num_cores_ threads and returns one
// ReduceResult per request, stored at the same index as the request in `batch`.
// NOTE(review): the first line of the signature is absent from this listing; the member index
// shows `std::vector<ReduceResult> parallel_reduce(const std::vector<VerifyRequest>& batch)`.
226 const std::vector<VerifyRequest>& batch)
227{
228 const size_t num_proofs = batch.size();
229 std::vector<ReduceResult> results(num_proofs);
// Shared cursor: each worker claims the next unprocessed index, so every slot of `results`
// is written by exactly one thread.
230 std::atomic<size_t> work_index{ 0 };
231
// Never start more threads than there are proofs; an empty batch starts none.
232 uint32_t num_workers = std::min(num_cores_, static_cast<uint32_t>(num_proofs));
233 std::vector<std::thread> workers;
234 workers.reserve(num_workers);
235
236 for (uint32_t w = 0; w < num_workers; ++w) {
// Capturing by reference is safe here because every worker is joined before this function returns.
237 workers.emplace_back([&]() {
238 // Each worker thread is single-threaded for reduce_to_ipa_claim
// NOTE(review): one statement of the lambda is on a line missing from this listing.
240 while (true) {
241 size_t idx = work_index.fetch_add(1, std::memory_order_relaxed);
242 if (idx >= num_proofs) {
243 break;
244 }
245 auto& req = batch[idx];
246 auto t0 = std::chrono::steady_clock::now();
247
// Any exception from the reduction is turned into a failed ReduceResult so that it cannot
// escape the worker thread.
248 try {
249 ChonkNativeVerifier verifier(vks_[req.vk_index]);
250 auto reduced = verifier.reduce_to_ipa_claim(req.proof);
251
252 results[idx] = ReduceResult{
253 .request_id = req.request_id,
254 .ipa_claim = std::move(reduced.ipa_claim),
255 .ipa_proof = std::move(reduced.ipa_proof),
256 .all_checks_passed = reduced.all_checks_passed,
257 .error_message = reduced.all_checks_passed ? "" : "reduction failed",
258 .enqueue_time = req.enqueue_time,
259 .reduce_ms = ms_since(t0),
260 };
261 } catch (const std::exception& e) {
262 results[idx] = ReduceResult{
263 .request_id = req.request_id,
264 .ipa_claim = {},
265 .ipa_proof = {},
266 .all_checks_passed = false,
267 .error_message = std::string("reduce_to_ipa_claim threw: ") + e.what(),
268 .enqueue_time = req.enqueue_time,
269 .reduce_ms = ms_since(t0),
270 };
271 } catch (...) {
272 results[idx] = ReduceResult{
273 .request_id = req.request_id,
274 .ipa_claim = {},
275 .ipa_proof = {},
276 .all_checks_passed = false,
277 .error_message = "reduce_to_ipa_claim threw unknown exception",
278 .enqueue_time = req.enqueue_time,
279 .reduce_ms = ms_since(t0),
280 };
281 }
282 }
283 });
284 }
// Wait for all workers; after this point `results` is fully populated.
285 for (auto& t : workers) {
286 t.join();
287 }
288
289 return results;
290}
291
// Runs one batched IPA verification over the reduce results selected by `indices`.
// Returns true when `indices` is empty or when IPA<curve::Grumpkin>::batch_reduce_verify returns
// true; returns false when it returns false or when a std::exception is thrown while building
// or checking the batch.
292bool ChonkBatchVerifier::batch_check(const std::vector<ReduceResult>& results, const std::vector<size_t>& indices)
293{
// An empty selection passes trivially.
294 if (indices.empty()) {
295 return true;
296 }
297
// NOTE(review): one statement before the try block is on a line missing from this listing.
299
300 try {
301 // Collect IPA claims and transcripts for batch verification
// NOTE(review): the declarations of `claims` and `transcripts` are on lines missing from this listing.
304 claims.reserve(indices.size());
305 transcripts.reserve(indices.size());
306 for (size_t idx : indices) {
307 claims.push_back(results[idx].ipa_claim);
// Each proof gets its own transcript built from its stored IPA proof.
308 transcripts.push_back(std::make_shared<NativeTranscript>(results[idx].ipa_proof));
309 }
310
// NOTE(review): `ipa_vk` is defined on a line missing from this listing.
312 return IPA<curve::Grumpkin>::batch_reduce_verify(ipa_vk, claims, transcripts);
313 } catch (const std::exception& e) {
// An exception is reported as a failed batch, so callers go on to bisect the set.
// NOTE(review): unlike parallel_reduce() and dispatch() there is no catch (...) here; an exception
// not derived from std::exception would propagate to the caller — consider adding one.
314 info("ChonkBatchVerifier: batch_check exception: ", e.what());
315 return false;
316 }
317}
318
320 std::vector<size_t> indices,
321 uint32_t depth,
322 std::chrono::steady_clock::time_point reduce_start)
323{
324 // Base case: single proof identified as the failure
325 if (indices.size() == 1) {
326 auto& rr = results[indices[0]];
327 auto result = VerifyResult::failed(rr.request_id, "batch check failed (bisected to individual)");
328 result.time_in_queue_ms = ms_between(rr.enqueue_time, std::chrono::steady_clock::now());
329 result.time_in_verify_ms = rr.reduce_ms;
330 result.batch_failure_count = depth + 1;
331 dispatch(std::move(result));
332 return;
333 }
334
335 info("ChonkBatchVerifier: bisecting ", indices.size(), " proofs at depth ", depth);
336
337 size_t mid = indices.size() / 2;
338 std::vector<size_t> left(indices.begin(), indices.begin() + static_cast<ptrdiff_t>(mid));
339 std::vector<size_t> right(indices.begin() + static_cast<ptrdiff_t>(mid), indices.end());
340
341 // Check left half; if it passes, all failures must be in the right half (skip redundant check)
342 auto t0 = std::chrono::steady_clock::now();
343 bool left_ok = batch_check(results, left);
344 double left_ms = ms_since(t0);
345
346 if (left_ok) {
347 emit_ok(results, left, reduce_start, left_ms, depth + 1);
348 // All failures are in the right half — recurse directly without re-checking
349 bisect(results, std::move(right), depth + 1, reduce_start);
350 } else {
351 // Left failed — need to check right independently
352 bisect(results, std::move(left), depth + 1, reduce_start);
353
354 auto t1 = std::chrono::steady_clock::now();
355 bool right_ok = batch_check(results, right);
356 double right_ms = ms_since(t1);
357
358 if (right_ok) {
359 emit_ok(results, right, reduce_start, right_ms, depth + 1);
360 } else {
361 bisect(results, std::move(right), depth + 1, reduce_start);
362 }
363 }
364}
365
// Reports every proof selected by `indices` as verified (status VerifyStatus::OK).
// NOTE(review): the first line of the signature is absent from this listing; the member index
// shows `void emit_ok(const std::vector<ReduceResult>& results, const std::vector<size_t>& indices,
// std::chrono::steady_clock::time_point reduce_start, double ipa_ms, uint32_t depth)`.
367 const std::vector<size_t>& indices,
368 std::chrono::steady_clock::time_point reduce_start,
369 double ipa_ms,
370 uint32_t depth)
371{
372 for (size_t idx : indices) {
373 auto& rr = results[idx];
// NOTE(review): the opening of the call that receives this initializer is on a line missing from
// this listing (presumably dispatch(VerifyResult{ ... }) — confirm).
375 .request_id = rr.request_id,
376 .status = static_cast<uint8_t>(VerifyStatus::OK),
377 .error_message = "",
// Queue time ends when the reduce phase began.
378 .time_in_queue_ms = ms_between(rr.enqueue_time, reduce_start),
// Each proof is charged its own reduce time plus the full duration of the batch check it passed in.
379 .time_in_verify_ms = rr.reduce_ms + ipa_ms,
// `depth` is passed through unchanged as the batch_failure_count.
380 .batch_failure_count = depth,
381 });
382 }
383}
384
385} // namespace bb
386#endif
std::vector< ReduceResult > parallel_reduce(const std::vector< VerifyRequest > &batch)
void bisect(std::vector< ReduceResult > &results, std::vector< size_t > indices, uint32_t depth, std::chrono::steady_clock::time_point reduce_start)
static double ms_between(std::chrono::steady_clock::time_point from, std::chrono::steady_clock::time_point to)
std::function< void(VerifyResult)> ResultCallback
void emit_ok(const std::vector< ReduceResult > &results, const std::vector< size_t > &indices, std::chrono::steady_clock::time_point reduce_start, double ipa_ms, uint32_t depth)
void dispatch(VerifyResult result)
bool batch_check(const std::vector< ReduceResult > &results, const std::vector< size_t > &indices)
std::condition_variable stopped_cv_
static double ms_since(std::chrono::steady_clock::time_point t)
static constexpr size_t MAX_QUEUE_SIZE
std::vector< std::shared_ptr< MegaZKFlavor::VKAndHash > > vks_
std::deque< VerifyRequest > queue_
std::unordered_set< uint64_t > in_flight_ids_
void enqueue(VerifyRequest request)
Enqueue a proof for verification.
std::condition_variable cv_
void stop()
Stop the processor, flushing remaining proofs.
void start(std::vector< std::shared_ptr< MegaZKFlavor::VKAndHash > > vks, uint32_t num_cores, uint32_t batch_size, ResultCallback on_result)
Start the coordinator thread.
Verifier for Chonk IVC proofs (both native and recursive).
IPAReductionResult reduce_to_ipa_claim(const Proof &proof)
Run Chonk verification up to but not including IPA, returning the IPA claim for deferred verification...
static constexpr size_t ECCVM_FIXED_SIZE
IPA (inner product argument) commitment scheme class.
Definition ipa.hpp:86
Representation of the Grumpkin Verifier Commitment Key inside a bn254 circuit.
#define info(...)
Definition log.hpp:93
Entry point for Barretenberg command-line interface.
Definition api.hpp:5
void set_parallel_for_concurrency(size_t num_cores)
Definition thread.cpp:23
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::string to_string(bb::avm2::ValueTag tag)
Per-proof result from the reduce phase.
static constexpr size_t PROOF_LENGTH_WITHOUT_PUB_INPUTS
size_t size() const
A request to verify a single Chonk proof.
std::chrono::steady_clock::time_point enqueue_time
Result of verifying a single proof within a batch.
static VerifyResult failed(uint64_t id, std::string msg)
void throw_or_abort(std::string const &err)