Line data Source code
1 : /*
2 : ___________________________________________
3 : | _ ___ _ |
4 : | | | |__ \ | | |
5 : | | |__ ) |__ _ __ _ ___ _ __ | |_ |
6 : | | '_ \ / // _` |/ _` |/ _ \ '_ \| __| | HTTP/2 AGENT FOR MOCK TESTING
7 : | | | | |/ /| (_| | (_| | __/ | | | |_ | Version 0.0.z
8 : | |_| |_|____\__,_|\__, |\___|_| |_|\__| | https://github.com/testillano/h2agent
9 : | __/ | |
10 : | |___/ |
11 : |___________________________________________|
12 :
13 : Licensed under the MIT License <http://opensource.org/licenses/MIT>.
14 : SPDX-License-Identifier: MIT
15 : Copyright (c) 2021 Eduardo Ramos
16 :
17 : Permission is hereby granted, free of charge, to any person obtaining a copy
18 : of this software and associated documentation files (the "Software"), to deal
19 : in the Software without restriction, including without limitation the rights
20 : to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
21 : copies of the Software, and to permit persons to whom the Software is
22 : furnished to do so, subject to the following conditions:
23 :
24 : The above copyright notice and this permission notice shall be included in all
25 : copies or substantial portions of the Software.
26 :
27 : THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
28 : IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
29 : FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
30 : AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
31 : LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
32 : OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
33 : SOFTWARE.
34 : */
35 :
36 : #include <SseManager.hpp>
37 :
38 : #include <ert/tracing/Logger.hpp>
39 :
40 : namespace h2agent
41 : {
42 : namespace model
43 : {
44 :
45 6 : uint64_t SseManager::addConnection(std::unordered_set<std::string> keys, write_cb_t writeCb) {
46 6 : uint64_t id = next_id_.fetch_add(1);
47 6 : std::lock_guard<std::mutex> lock(mutex_);
48 12 : connections_[id] = {std::move(keys), std::move(writeCb)};
49 6 : LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("SSE connection added (id=%lu, keys=%zu, total=%zu)", id, connections_[id].keys.size(), connections_.size()), ERT_FILE_LOCATION));
50 6 : return id;
51 12 : }
52 :
53 6 : void SseManager::removeConnection(uint64_t id) {
54 6 : std::lock_guard<std::mutex> lock(mutex_);
55 6 : connections_.erase(id);
56 6 : LOGDEBUG(ert::tracing::Logger::debug(ert::tracing::Logger::asString("SSE connection removed (id=%lu, remaining=%zu)", id, connections_.size()), ERT_FILE_LOCATION));
57 6 : }
58 :
59 6 : void SseManager::notify(const std::string &key, const nlohmann::json &value) {
60 : // Format SSE event
61 6 : nlohmann::json data;
62 6 : data["key"] = key;
63 6 : data["value"] = value;
64 6 : std::string event = "event: vault-set\ndata: " + data.dump() + "\n\n";
65 :
66 6 : std::lock_guard<std::mutex> lock(mutex_);
67 11 : for (auto &[id, conn] : connections_) {
68 5 : if (conn.keys.empty() || conn.keys.count(key)) {
69 : try {
70 4 : conn.writeCb(event);
71 0 : } catch (...) {
72 0 : LOGWARNING(ert::tracing::Logger::warning(ert::tracing::Logger::asString("SSE write failed for connection %lu", id), ERT_FILE_LOCATION));
73 0 : }
74 : }
75 : }
76 6 : }
77 :
78 4 : size_t SseManager::activeConnections() const {
79 4 : std::lock_guard<std::mutex> lock(mutex_);
80 8 : return connections_.size();
81 4 : }
82 :
83 : }
84 : }
|