Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
wsdb_scheduler.hpp
Go to the documentation of this file.
1#pragma once
30#include "ipc_runtime/ipc_server.hpp"
31
32#include <atomic>
33#include <cstdint>
34#include <deque>
35#include <functional>
36#include <memory>
37#include <mutex>
38#include <unordered_map>
39#include <utility>
40
41namespace bb::wsdb {
42
43class WsdbScheduler : public std::enable_shared_from_this<WsdbScheduler> {
44 public:
46 : pool_(pool)
47 , server_(server)
48 {}
49
50 // A read on `fork`. Committed reads (independent snapshots) run concurrently
51 // with everything; uncommitted reads wait behind an in-flight write on the
52 // same fork. Called only on the reactor thread.
53 void submit_read(uint64_t fork, bool committed, std::function<void()> work)
54 {
55 if (run_inline_if_idle(work)) {
56 return;
57 }
58 if (committed) {
60 return;
61 }
63 Lane& lane = lanes_[fork];
64 lane.pending.push_back({ false, std::move(work) });
66 pump(fork, lane);
67 }
68
69 // A write on `fork`: exclusive on that fork. Called only on the reactor thread.
70 void submit_write(uint64_t fork, std::function<void()> work)
71 {
72 if (run_inline_if_idle(work)) {
73 return;
74 }
76 Lane& lane = lanes_[fork];
77 lane.pending.push_back({ true, std::move(work) });
79 pump(fork, lane);
80 }
81
82 private:
83 struct Op {
85 std::function<void()> work;
86 };
87 struct Lane {
89 int in_flight = 0; // ops dispatched on this fork, not yet completed
90 int in_flight_mutating = 0; // of which, writes
91 };
92
93 // Fully idle => run on the reactor thread, skipping the pool handoff AND the
94 // ordering bookkeeping. Safe because submit_* is reactor-only, so with
95 // nothing in flight this request is alone and ordering is moot.
96 bool run_inline_if_idle(std::function<void()>& work)
97 {
98 if (inflight_.load(std::memory_order_acquire) == 0 && !server_.has_pending_request()) {
99 work();
100 return true;
101 }
102 return false;
103 }
104
105 void dispatch_unordered(std::function<void()> work)
106 {
108 auto self = shared_from_this();
109 pool_.enqueue([self, work = std::move(work)]() {
110 work();
111 self->inflight_.fetch_sub(1, std::memory_order_release);
112 });
113 }
114
115 // Caller holds mtx_. Dispatch as many head ops as ordering allows: a run of
116 // reads concurrently; a write only when the fork has drained, and nothing
117 // after it until it completes.
118 void pump(uint64_t fork, Lane& lane)
119 {
120 while (!lane.pending.empty()) {
121 Op& front = lane.pending.front();
122 if (front.mutating) {
123 if (lane.in_flight != 0) {
124 break; // write needs exclusive access; wait for the fork to drain
125 }
126 dispatch(fork, lane, true);
127 break; // barrier: release nothing else on this fork until it completes
128 }
129 if (lane.in_flight_mutating != 0) {
130 break; // uncommitted read waits behind an in-flight write
131 }
132 dispatch(fork, lane, false); // reads go concurrently
133 }
134 }
135
136 // Caller holds mtx_.
137 void dispatch(uint64_t fork, Lane& lane, bool mutating)
138 {
139 Op op = std::move(lane.pending.front());
140 lane.pending.pop_front();
141 lane.in_flight++;
142 if (mutating) {
143 lane.in_flight_mutating++;
144 }
145 auto self = shared_from_this();
146 pool_.enqueue([self, fork, mutating, work = std::move(op.work)]() {
147 work();
148 self->complete(fork, mutating);
149 });
150 }
151
152 void complete(uint64_t fork, bool mutating)
153 {
155 Lane& lane = lanes_[fork]; // references stay valid: lanes are never erased
156 lane.in_flight--;
157 if (mutating) {
158 lane.in_flight_mutating--;
159 }
161 pump(fork, lane);
162 }
163
166 std::mutex mtx_;
167 std::unordered_map<uint64_t, Lane> lanes_; // per fork; references stable (never erased)
168 std::atomic<int> inflight_{ 0 }; // submitted-but-not-completed ops (queued + running)
169};
170
171} // namespace bb::wsdb
void enqueue(const std::function< void()> &task)
Abstract interface for IPC server.
WsdbScheduler(bb::ThreadPool &pool, ipc::IpcServer &server)
std::unordered_map< uint64_t, Lane > lanes_
void pump(uint64_t fork, Lane &lane)
void complete(uint64_t fork, bool mutating)
void submit_write(uint64_t fork, std::function< void()> work)
bool run_inline_if_idle(std::function< void()> &work)
void submit_read(uint64_t fork, bool committed, std::function< void()> work)
void dispatch(uint64_t fork, Lane &lane, bool mutating)
std::atomic< int > inflight_
void dispatch_unordered(std::function< void()> work)
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13
std::function< void()> work