OpenVPN 3 Core Library
Loading...
Searching...
No Matches
runcontext.hpp
Go to the documentation of this file.
1// OpenVPN -- An application to securely tunnel IP networks
2// over a single port, with support for SSL/TLS-based
3// session authentication and key exchange,
4// packet encryption, packet authentication, and
5// packet compression.
6//
7// Copyright (C) 2012- OpenVPN Inc.
8//
9// SPDX-License-Identifier: MPL-2.0 OR AGPL-3.0-only WITH openvpn3-openssl-exception
10//
11
12// Manage a pool of threads for a multi-threaded server.
13//
14// To stress test this code, in client after serv->start() add:
15// if (unit == 3 || unit == 5)
16// throw Exception("HIT IT");
17// And after "case PThreadBarrier::ERROR_SIGNAL:"
18// if (unit & 1)
19// break;
20
21#ifndef OPENVPN_COMMON_RUNCONTEXT_H
22#define OPENVPN_COMMON_RUNCONTEXT_H
23
24#include <iostream>
25#include <string>
26#include <vector>
27#include <thread>
28#include <mutex>
29#include <memory>
30#include <type_traits> // for std::is_nothrow_move_constructible
31#include <utility>
32
43#include <openvpn/time/time.hpp>
47
48#ifdef ASIO_HAS_LOCAL_SOCKETS
50#endif
51
52namespace openvpn {
53
55{
56 RunContextLogEntry(const time_t timestamp_arg, const std::string &text_arg)
57 : timestamp(timestamp_arg),
58 text(text_arg)
59 {
60 }
61
62 time_t timestamp;
63 std::string text;
64};
65
66template <typename RC_TYPE>
67struct ServerThreadType : public virtual RC_TYPE
68{
71
72 virtual void thread_safe_stop() = 0;
73
74 virtual void log_notify(const RunContextLogEntry &le)
75 {
76 }
77};
78
81
82struct RunContextBase : public LogBase
83{
84 virtual void cancel() = 0;
85 virtual std::vector<RunContextLogEntry> add_log_observer(const unsigned int unit) = 0;
86 virtual void disable_log_history() = 0;
87 virtual Stop *async_stop() = 0;
88};
89
90template <typename ServerThread, typename Stats>
92{
93 public:
95
97 {
98 public:
100 : ctx(ctx_arg)
101 {
102 ctx.add_thread();
103 }
104
106 {
108 }
109
110 private:
112 };
113
123
124 openvpn_io::io_context *io_context_ptr()
125 {
126 return &io_context;
127 }
128
133
135 {
136 log_reopen = std::move(lr);
137 }
138
139 void set_thread(const unsigned int unit, std::thread *thread)
140 {
141 while (threadlist.size() <= unit)
142 threadlist.push_back(nullptr);
143 if (threadlist[unit])
144 throw Exception("RunContext::set_thread: overwrite");
145 threadlist[unit] = thread;
146 }
147
148 // called from worker thread
149 void set_server(const unsigned int unit, ServerThread *serv)
150 {
151 std::lock_guard<std::recursive_mutex> lock(mutex);
152 if (halt)
153 throw Exception("RunContext::set_server: halting");
154 while (servlist.size() <= unit)
155 servlist.push_back(nullptr);
156 if (servlist[unit])
157 throw Exception("RunContext::set_server: overwrite");
158 servlist[unit] = serv;
159 }
160
161 // called from worker thread
162 void clear_server(const unsigned int unit)
163 {
164 std::lock_guard<std::recursive_mutex> lock(mutex);
165 if (unit < servlist.size())
166 servlist[unit] = nullptr;
167
168 // remove log observer entry, if present
169 auto lu = std::find(log_observers.begin(), log_observers.end(), unit);
170 if (lu != log_observers.end())
171 log_observers.erase(lu);
172 }
173
174 std::vector<typename ServerThread::Ptr> get_servers()
175 {
176 std::lock_guard<std::recursive_mutex> lock(mutex);
177 std::vector<typename ServerThread::Ptr> ret;
178 if (halt)
179 return ret;
180 ret.reserve(servlist.size());
181 for (auto sp : servlist)
182 ret.emplace_back(sp);
183 return ret;
184 }
185
187 {
188 std::lock_guard<std::recursive_mutex> lock(mutex);
189 if (!log_history)
190 log_history.reset(new std::vector<RunContextLogEntry>());
191 }
192
193 void disable_log_history() override
194 {
195 std::lock_guard<std::recursive_mutex> lock(mutex);
196 log_history.reset();
197 }
198
199 std::vector<RunContextLogEntry> add_log_observer(const unsigned int unit) override
200 {
201 std::lock_guard<std::recursive_mutex> lock(mutex);
202 auto lu = std::find(log_observers.begin(), log_observers.end(), unit);
203 if (lu == log_observers.end())
204 log_observers.push_back(unit);
205 if (log_history)
206 return *log_history;
207 else
208 return std::vector<RunContextLogEntry>();
209 }
210
211#ifdef ASIO_HAS_LOCAL_SOCKETS
212 void set_exit_socket(ScopedFD &fd)
213 {
214 exit_sock.reset(new openvpn_io::posix::stream_descriptor(io_context, fd.release()));
215 exit_sock->async_read_some(openvpn_io::null_buffers(),
216 [self = Ptr(this)](const openvpn_io::error_code &error, const size_t bytes_recvd)
217 {
218 if (!error)
219 self->cancel();
220 });
221 }
222#endif
223
224 void set_prefix(const std::string &pre)
225 {
226 prefix = pre + ": ";
227 }
228
229 void run()
230 {
231 if (!halt)
232 io_context.run();
233 }
234
235 void join()
236 {
237 for (size_t i = 0; i < threadlist.size(); ++i)
238 {
239 std::thread *t = threadlist[i];
240 if (t)
241 {
242 t->join();
243 delete t;
244 threadlist[i] = nullptr;
245 }
246 }
247 }
248
249 template <typename SVC>
250 void process_exception(const std::string &thread_name,
251 const unsigned int unit,
252 const bool io_context_run_called,
253 openvpn_io::io_context &io_context,
254 SVC &svc,
255 PThreadBarrier &event_loop_bar,
256 const std::exception &e)
257 {
258 event_loop_bar.error();
259 if (svc)
260 {
261 clear_server(unit);
262 svc->stop(); // on exception, stop service,
263 }
264 if (io_context_run_called)
265 io_context.poll(); // execute completion handlers,
266 OPENVPN_LOG(thread_name << " thread exception: " << e.what());
267 }
268
269 void log(const std::string &str) override
270 {
271 time_t now;
272 const std::string ts = date_time_store_time_t(now);
273 {
274 std::lock_guard<std::recursive_mutex> lock(mutex);
275 std::cout << ts << ' ' << str << std::flush;
276
277 if (!log_observers.empty() || log_history)
278 {
279 const RunContextLogEntry le(now, str);
280 for (auto &si : log_observers)
281 {
282 ServerThread *st = servlist[si];
283 if (st)
284 st->log_notify(le);
285 }
286 if (log_history)
287 log_history->emplace_back(now, str);
288 }
289 }
290 }
291
292 // called from main or worker thread
293 void cancel() override
294 {
295 if (halt)
296 return;
297 openvpn_io::post(io_context, [self = Ptr(this)]()
298 {
299 std::lock_guard<std::recursive_mutex> lock(self->mutex);
300 if (self->halt)
301 return;
302 self->halt = true;
303
304 // async stop
305 if (self->async_stop_)
306 self->async_stop_->stop();
307
308 self->exit_timer.cancel();
309#ifdef ASIO_HAS_LOCAL_SOCKETS
310 self->exit_sock.reset();
311#endif
312 if (self->signals)
313 self->signals->cancel();
314
315 // stop threads
316 {
317 unsigned int stopped = 0;
318 for (size_t i = 0; i < self->servlist.size(); ++i)
319 {
320 ServerThread* serv = self->servlist[i];
321 if (serv)
322 {
323 serv->thread_safe_stop();
324 ++stopped;
325 }
326 self->servlist[i] = nullptr;
327 }
328 OPENVPN_LOG(self->prefix << "Stopping " << stopped << '/' << self->servlist.size() << " thread(s)");
329 } });
330 }
331
333 {
334 return log_wrap;
335 }
336
337 void set_stats_obj(const typename Stats::Ptr &stats_arg)
338 {
339 stats = stats_arg;
340 }
341
342 Stop *async_stop() override
343 {
344 return async_stop_;
345 }
346
347 private:
348 // called from main or worker thread
350 {
351 std::lock_guard<std::recursive_mutex> lock(mutex);
352 ++thread_count;
353 }
354
355 // called from main or worker thread
357 {
358 bool last = false;
359 {
360 std::lock_guard<std::recursive_mutex> lock(mutex);
361 last = (--thread_count <= 0);
362 }
363 if (last)
364 cancel();
365 }
366
367 protected:
368 virtual void signal(const openvpn_io::error_code &error, int signum)
369 {
370 if (!error && !halt)
371 {
372 OPENVPN_LOG("ASIO SIGNAL: " << signal_name(signum));
373 switch (signum)
374 {
375 case SIGINT:
376 case SIGTERM:
377 cancel();
378 break;
379#if !defined(OPENVPN_PLATFORM_WIN)
380 case SIGUSR2:
381 if (stats)
382 OPENVPN_LOG(stats->dump());
383 signal_rearm();
384 break;
385 case SIGHUP:
386 if (log_reopen)
388 signal_rearm();
389 break;
390#endif
391 default:
392 signal_rearm();
393 break;
394 }
395 }
396 }
397
398 private:
400 {
401 signals->register_signals_all([self = Ptr(this)](const openvpn_io::error_code &error, int signal_number)
402 { self->signal(error, signal_number); });
403 }
404
405 // debugging feature -- exit in n seconds
407 {
408 const std::string exit_in = Environ::find_static("EXIT_IN");
409 if (exit_in.empty())
410 return;
411 const unsigned int n_sec = parse_number_throw<unsigned int>(exit_in, "error parsing EXIT_IN");
412 exit_timer.expires_after(Time::Duration::seconds(n_sec));
413 exit_timer.async_wait([self = Ptr(this)](const openvpn_io::error_code &error)
414 {
415 if (error || self->halt)
416 return;
417 OPENVPN_LOG("DEBUG EXIT");
418 self->cancel(); });
419 }
420
421 // these vars only used by main thread
422 openvpn_io::io_context io_context{1};
423 typename Stats::Ptr stats;
426 std::string prefix;
427 std::vector<std::thread *> threadlist;
428#ifdef ASIO_HAS_LOCAL_SOCKETS
429 std::unique_ptr<openvpn_io::posix::stream_descriptor> exit_sock;
430#endif
431
432 // main lock
433 std::recursive_mutex mutex;
434
435 // servlist and related vars protected by mutex
436 std::vector<ServerThread *> servlist;
438
439 // stop
440 Stop *async_stop_ = nullptr;
441
442 // log observers
443 std::vector<unsigned int> log_observers; // unit numbers of log observers
444 std::unique_ptr<std::vector<RunContextLogEntry>> log_history;
445
446 // logging
448 Log::Context::Wrapper log_wrap; // must be constructed after log_context
450
451 protected:
452 volatile bool halt = false;
453};
454
455} // namespace openvpn
456
457#endif
void thread_safe_stop() override
void register_signals_all(SignalHandler stop_handler)
std::size_t expires_after(const Time::Duration &d)
Definition asiotimer.hpp:69
static std::string find_static(const std::string &name)
Definition environ.hpp:31
virtual void reopen()=0
The smart pointer class.
Definition rc.hpp:119
void reset() noexcept
Points this RCPtr<T> to nullptr safely.
Definition rc.hpp:290
implements a weak pointer for reference counted objects.
Definition rc.hpp:451
ThreadContext(RunContext &ctx_arg)
void set_server(const unsigned int unit, ServerThread *serv)
std::unique_ptr< std::vector< RunContextLogEntry > > log_history
openvpn_io::io_context io_context
void set_async_stop(Stop *async_stop)
Log::Context::Wrapper log_wrap
std::vector< RunContextLogEntry > add_log_observer(const unsigned int unit) override
void set_thread(const unsigned int unit, std::thread *thread)
RCPtr< RunContext > Ptr
Stop * async_stop() override
ASIOSignals::Ptr signals
void clear_server(const unsigned int unit)
std::vector< ServerThread * > servlist
void log(const std::string &str) override
std::vector< typename ServerThread::Ptr > get_servers()
void cancel() override
void set_log_reopen(LogSetup::Ptr lr)
void set_stats_obj(const typename Stats::Ptr &stats_arg)
Log::Context log_context
const Log::Context::Wrapper & log_wrapper()
std::recursive_mutex mutex
std::vector< std::thread * > threadlist
virtual void signal(const openvpn_io::error_code &error, int signum)
LogSetup::Ptr log_reopen
void process_exception(const std::string &thread_name, const unsigned int unit, const bool io_context_run_called, openvpn_io::io_context &io_context, SVC &svc, PThreadBarrier &event_loop_bar, const std::exception &e)
std::vector< unsigned int > log_observers
openvpn_io::io_context * io_context_ptr()
volatile bool halt
void set_prefix(const std::string &pre)
void disable_log_history() override
#define OPENVPN_LOG(args)
Support deferred server-side state creation when client connects.
Definition ovpncli.cpp:95
ServerThreadType< RC< thread_safe_refcount > > ServerThreadBase
std::string date_time_store_time_t(time_t &save)
Definition timestr.hpp:150
ServerThreadType< RCWeak< thread_safe_refcount > > ServerThreadWeakBase
std::string signal_name(const int signum)
The logging interface, simple, logs a string.
Argument to construct a Context in a different thread.
Scoped RAII for the global_log pointer.
virtual void cancel()=0
virtual std::vector< RunContextLogEntry > add_log_observer(const unsigned int unit)=0
virtual void disable_log_history()=0
virtual Stop * async_stop()=0
RunContextLogEntry(const time_t timestamp_arg, const std::string &text_arg)
virtual void log_notify(const RunContextLogEntry &le)
RCWeakPtr< ServerThreadType > WPtr
virtual void thread_safe_stop()=0
RCPtr< ServerThreadType > Ptr
std::string ret
std::vector< std::complex< double > > svc