Nameless Engine
Loading...
Searching...
No Matches
NodeNotificationBroadcaster.hpp
1#pragma once
2
3// Standard.
4#include <mutex>
5#include <unordered_map>
6#include <vector>
7
8// Custom.
9#include "game/node/callback/NodeFunction.hpp"
10#include "io/Logger.h"
11
12namespace ne {
13 class Node;
14
17 // Only Node can create broadcasters because it has some additional protection code
18 // to avoid shooting yourself in the foot. Additionally, the broadcaster is only valid
19 // when the node that owns the broadcaster is spawned, otherwise the broadcaster does nothing.
20 friend class Node;
21
22 public:
24 virtual ~NodeNotificationBroadcasterBase() = default;
25
28
29 protected:
35 virtual void onOwnerNodeSpawning(Node* pOwnerNode) = 0;
36
42 virtual void onOwnerNodeDespawning(Node* pOwnerNode) = 0;
43 };
44
45 template <typename FunctionReturnType, typename... FunctionArgs> class NodeNotificationBroadcaster;
46
51 template <typename FunctionReturnType, typename... FunctionArgs>
52 class NodeNotificationBroadcaster<FunctionReturnType(FunctionArgs...)>
54 // Only Node can create broadcasters because it has some additional protection code
55 // to avoid shooting yourself in the foot. Additionally, the broadcaster is only valid
56 // when the node that owns the broadcaster is spawned, otherwise the broadcaster does nothing.
57 friend class Node;
58
59 public:
60 virtual ~NodeNotificationBroadcaster() override = default;
61
64
75 void broadcast(FunctionArgs&&... args) {
76 // Make sure we have a spawned owner node.
77 std::scoped_lock ownerGuard(mtxSpawnedOwnerNode.first, mtxCallbacks.first);
78 if (mtxSpawnedOwnerNode.second == nullptr) {
79 // Being in a cleared state - do nothing.
80 return;
81 }
82
83 // don't unlock mutexes yet
84
85 // Make sure we are in the top level `broadcast` call (not called from some
86 // callback that was triggered in other `broadcast` call)
87 // because we will modify callbacks array.
88 bool bIsTopLevelBroadcast = !bIsBroadcasting.test();
89 if (bIsTopLevelBroadcast) {
90 // Mark the start of broadcasting (start of working with callbacks).
91 bIsBroadcasting.test_and_set();
92
93 {
94 // Add new pending callbacks.
95 std::scoped_lock guard(mtxCallbacksToAdd.first);
96 for (auto& [iBindingId, callback] : mtxCallbacksToAdd.second) {
97 mtxCallbacks.second[iBindingId] = std::move(callback);
98 }
99 mtxCallbacksToAdd.second.clear();
100 }
101
102 {
103 // Remove callbacks marked as "to be removed".
104 std::scoped_lock guard(mtxCallbacksToRemove.first);
105 for (const auto& iBindingId : mtxCallbacksToRemove.second) {
106 // Make sure a binding with this ID exists.
107 auto it = mtxCallbacks.second.find(iBindingId);
108 if (it == mtxCallbacks.second.end()) [[unlikely]] {
109 Logger::get().error(std::format(
110 "a callback with binding ID {} was marked to be removed from a "
111 "broadcaster but broadcaster does not have a callback with this ID",
112 iBindingId));
113 continue;
114 }
115 mtxCallbacks.second.erase(it);
116 }
117 mtxCallbacksToRemove.second.clear();
118 }
119
120 // Erase no longer valid callbacks.
121 std::erase_if(mtxCallbacks.second, [](auto& item) { return !item.second.isNodeSpawned(); });
122 }
123
124 // Call registered callbacks.
125 for (auto& [iBindingId, callback] : mtxCallbacks.second) {
126 callback(std::forward<FunctionArgs>(args)...);
127
128 // Make sure our owner node is still spawned because the callback we just
129 // called could have despawned the owner node.
130 if (mtxSpawnedOwnerNode.second == nullptr) {
131 // Owner node was despawned and all callbacks were removed, exit.
132 break;
133 }
134 }
135
136 if (bIsTopLevelBroadcast) {
137 // Finished broadcasting.
138 bIsBroadcasting.clear();
139 }
140 }
141
152 size_t subscribe(const NodeFunction<FunctionReturnType(FunctionArgs...)>& callback) {
153 std::scoped_lock callbacksGuard(mtxCallbacks.first);
154
155 // Generate new binding ID.
156 const auto iNewBindingId = iAvailableBindingId.fetch_add(1);
157 if (iNewBindingId + 1 == ULLONG_MAX) [[unlikely]] {
158 Logger::get().warn(std::format(
159 "\"next available broadcaster binding ID\" is at its maximum value: {}, another "
160 "subscribed callback will cause an overflow",
161 iNewBindingId + 1));
162 }
163
164 // Check if we are inside of a `broadcast` call.
165 if (bIsBroadcasting.test()) {
166 // We are inside of a `broadcast` call (this function is probably called from some
167 // callback that we called inside of our `broadcast` call), don't modify callbacks array
168 // as we are iterating over it. Instead, add this callback as "pending to be added".
169 std::scoped_lock callbacksToAddGuard(mtxCallbacksToAdd.first);
170 mtxCallbacksToAdd.second[iNewBindingId] = callback;
171 } else {
172 // We are not inside of a `broadcast` call, it's safe to modify callbacks array.
173 mtxCallbacks.second[iNewBindingId] = callback;
174 }
175
176 return iNewBindingId;
177 }
178
187 void unsubscribe(size_t iBindingId) {
188 {
189 // First, look if this binding is still pending to be added.
190 std::scoped_lock callbacksToAddGuard(mtxCallbacksToAdd.first);
191 auto it = mtxCallbacksToAdd.second.find(iBindingId);
192 if (it != mtxCallbacksToAdd.second.end()) {
193 // Just remove it from the pending array.
194 mtxCallbacksToAdd.second.erase(it);
195 return;
196 }
197 }
198
199 // Make sure the binding exists in the main array.
200 std::scoped_lock callbacksGuard(mtxCallbacks.first);
201 auto it = mtxCallbacks.second.find(iBindingId);
202 if (it == mtxCallbacks.second.end()) [[unlikely]] {
204 std::format("callback with binding ID {} was not found in the broadcaster", iBindingId));
205 return;
206 }
207
208 // Check if we are inside of a `broadcast` call.
209 if (bIsBroadcasting.test()) {
210 // We are inside of a `broadcast` call (this function is probably called from some
211 // callback that we called inside of our `broadcast` call), don't modify callbacks array
212 // as we are iterating over it. Instead, add this binding ID as "pending to be removed".
213 std::scoped_lock callbacksToRemoveGuard(mtxCallbacksToRemove.first);
214 mtxCallbacksToRemove.second.push_back(iBindingId);
215 } else {
216 // We are not inside of a `broadcast` call, it's safe to modify callbacks array.
217 mtxCallbacks.second.erase(it);
218 }
219 }
220
232 std::scoped_lock guard(mtxCallbacks.first, mtxCallbacksToAdd.first, mtxCallbacksToRemove.first);
233
234 // About returning "estimated" number:
235 // We might check `bIsBroadcasting` and remove callbacks of despawned nodes
236 // but if we are being inside of a `broadcast` call we still would not know
237 // if the returned number is correct or not so we generally can't guarantee
238 // that the returned number is 100% correct.
239
240 const auto iCurrentPlusPending = mtxCallbacks.second.size() + mtxCallbacksToAdd.second.size();
241 const auto iPendingToBeRemoved = mtxCallbacksToRemove.second.size();
242
243 // Make extra sure everything is correct.
244 if (iCurrentPlusPending < iPendingToBeRemoved) [[unlikely]] {
245 Logger::get().error(std::format(
246 "there are more callbacks to be removed than all existing callbacks plus "
247 "pending to be added: currently registered: {}, pending to be added: {}, pending to "
248 "be removed: {}",
249 mtxCallbacks.second.size(),
250 mtxCallbacksToAdd.second.size(),
251 mtxCallbacksToRemove.second.size()));
252 return 0;
253 }
254
255 return iCurrentPlusPending - iPendingToBeRemoved;
256 }
257
258 protected:
259 NodeNotificationBroadcaster() { mtxSpawnedOwnerNode.second = nullptr; };
260
266 virtual void onOwnerNodeSpawning(Node* pOwnerNode) override {
267 std::scoped_lock ownerGuard(mtxSpawnedOwnerNode.first);
268
269 // Make sure we don't have an owner.
270 if (mtxSpawnedOwnerNode.second != nullptr) {
271 Error error(
272 "some node has notified a broadcaster about being spawned but this broadcaster already "
273 "has an owner node");
274 error.showError();
275 throw std::runtime_error(error.getFullErrorMessage());
276 }
277
278 // Save new owner.
279 mtxSpawnedOwnerNode.second = pOwnerNode;
280 }
281
287 virtual void onOwnerNodeDespawning(Node* pOwnerNode) override {
288 std::scoped_lock ownerGuard(mtxSpawnedOwnerNode.first);
289
290 // Make sure the specified owner is indeed our owner node.
291 if (mtxSpawnedOwnerNode.second != pOwnerNode) [[unlikely]] {
292 Logger::get().error("some node notified a broadcaster about it being despawned but this "
293 "broadcaster's owner is not this node");
294 return;
295 }
296
297 // Clear pointer to the owner node (to mark "cleared" state and avoid broadcasting).
298 mtxSpawnedOwnerNode.second = nullptr;
299
300 removeAllCallbacks();
301 }
302
303 private:
306 {
307 std::scoped_lock guard(mtxCallbacks.first);
308 mtxCallbacks.second.clear();
309 }
310
311 {
312 std::scoped_lock guard(mtxCallbacksToAdd.first);
313 mtxCallbacksToAdd.second.clear();
314 }
315
316 {
317 std::scoped_lock guard(mtxCallbacksToRemove.first);
318 mtxCallbacksToRemove.second.clear();
319 }
320 }
321
326 std::pair<
327 std::recursive_mutex,
328 std::unordered_map<size_t, NodeFunction<FunctionReturnType(FunctionArgs...)>>>
330
335 std::pair<
336 std::recursive_mutex,
337 std::unordered_map<size_t, NodeFunction<FunctionReturnType(FunctionArgs...)>>>
339
344 std::pair<std::recursive_mutex, std::vector<size_t>> mtxCallbacksToRemove;
345
347 std::pair<std::recursive_mutex, Node*> mtxSpawnedOwnerNode;
348
350 std::atomic<size_t> iAvailableBindingId{0};
351
353 std::atomic_flag bIsBroadcasting{};
354 };
355} // namespace ne
Definition: Error.h:27
std::string getFullErrorMessage() const
Definition: Error.cpp:84
void showError() const
Definition: Error.cpp:102
void error(std::string_view sText, const std::source_location location=std::source_location::current()) const
Definition: Logger.cpp:75
static Logger & get()
Definition: Logger.cpp:41
void warn(std::string_view sText, const std::source_location location=std::source_location::current()) const
Definition: Logger.cpp:62
Definition: NodeFunction.hpp:11
Definition: NodeNotificationBroadcaster.hpp:16
virtual void onOwnerNodeDespawning(Node *pOwnerNode)=0
virtual void onOwnerNodeSpawning(Node *pOwnerNode)=0
std::pair< std::recursive_mutex, std::vector< size_t > > mtxCallbacksToRemove
Definition: NodeNotificationBroadcaster.hpp:344
void unsubscribe(size_t iBindingId)
Definition: NodeNotificationBroadcaster.hpp:187
std::pair< std::recursive_mutex, std::unordered_map< size_t, NodeFunction< FunctionReturnType(FunctionArgs...)> > > mtxCallbacksToAdd
Definition: NodeNotificationBroadcaster.hpp:338
size_t subscribe(const NodeFunction< FunctionReturnType(FunctionArgs...)> &callback)
Definition: NodeNotificationBroadcaster.hpp:152
void removeAllCallbacks()
Definition: NodeNotificationBroadcaster.hpp:305
std::pair< std::recursive_mutex, Node * > mtxSpawnedOwnerNode
Definition: NodeNotificationBroadcaster.hpp:347
virtual void onOwnerNodeDespawning(Node *pOwnerNode) override
Definition: NodeNotificationBroadcaster.hpp:287
size_t getSubscriberCount()
Definition: NodeNotificationBroadcaster.hpp:231
void broadcast(FunctionArgs &&... args)
Definition: NodeNotificationBroadcaster.hpp:75
std::pair< std::recursive_mutex, std::unordered_map< size_t, NodeFunction< FunctionReturnType(FunctionArgs...)> > > mtxCallbacks
Definition: NodeNotificationBroadcaster.hpp:329
virtual void onOwnerNodeSpawning(Node *pOwnerNode) override
Definition: NodeNotificationBroadcaster.hpp:266
Definition: NodeNotificationBroadcaster.hpp:45
Definition: Node.h:39