apply patch to nng to be able to control the number of threads used

Patch author: bionicbeagle
Patch source: https://gist.github.com/bionicbeagle/4891eba6279ead5db5f501a60ff2b194
Discussion: https://github.com/nanomsg/nng/issues/1572#issuecomment-1332740743
This commit is contained in:
oxmox 2023-02-03 21:33:39 +01:00
parent 89a300edce
commit e87ace77f6
12 changed files with 132 additions and 12 deletions

View file

@ -1179,6 +1179,11 @@ NNG_DECL int nng_stream_listener_set_ptr(
NNG_DECL int nng_stream_listener_set_addr( NNG_DECL int nng_stream_listener_set_addr(
nng_stream_listener *, const char *, const nng_sockaddr *); nng_stream_listener *, const char *, const nng_sockaddr *);
#define NNG_CAN_LIMIT_THREADS
NNG_DECL void nng_set_ncpu_max(int);
NNG_DECL void nng_set_pool_thread_limit_min(int);
NNG_DECL void nng_set_pool_thread_limit_max(int);
NNG_DECL void nng_set_resolve_thread_max(int);
#ifndef NNG_ELIDE_DEPRECATED #ifndef NNG_ELIDE_DEPRECATED
// These are legacy APIs that have been deprecated. // These are legacy APIs that have been deprecated.

View file

@ -796,8 +796,8 @@ nni_aio_sys_init(void)
#else #else
num_thr = NNG_EXPIRE_THREADS; num_thr = NNG_EXPIRE_THREADS;
#endif #endif
if (num_thr > 256) { if (num_thr > nni_thr_get_pool_thread_limit_max()) {
num_thr = 256; num_thr = nni_thr_get_pool_thread_limit_max();
} }
nni_aio_expire_q_list = nni_aio_expire_q_list =

View file

@ -356,6 +356,9 @@ extern int nni_parse_ip(const char *, nng_sockaddr *);
// nni_parse_ip_port parses an IP address with an optional port appended. // nni_parse_ip_port parses an IP address with an optional port appended.
extern int nni_parse_ip_port(const char *, nng_sockaddr *); extern int nni_parse_ip_port(const char *, nng_sockaddr *);
// nni_set_resolve_thread_max is used to configure the resolve thread pool
extern void nni_set_resolve_thread_max(int);
// //
// IPC (UNIX Domain Sockets & Named Pipes) Support. // IPC (UNIX Domain Sockets & Named Pipes) Support.
// //

View file

@ -242,6 +242,9 @@ nni_taskq_sys_init(void)
#else #else
nthrs = NNG_NUM_TASKQ_THREADS; nthrs = NNG_NUM_TASKQ_THREADS;
#endif #endif
if (nthrs > nni_thr_get_pool_thread_limit_max()) {
nthrs = nni_thr_get_pool_thread_limit_max();
}
#if NNG_MAX_TASKQ_THREADS > 0 #if NNG_MAX_TASKQ_THREADS > 0
if (nthrs > NNG_MAX_TASKQ_THREADS) { if (nthrs > NNG_MAX_TASKQ_THREADS) {
nthrs = NNG_MAX_TASKQ_THREADS; nthrs = NNG_MAX_TASKQ_THREADS;

View file

@ -172,4 +172,46 @@ void
nni_thr_set_name(nni_thr *thr, const char *name) nni_thr_set_name(nni_thr *thr, const char *name)
{ {
nni_plat_thr_set_name(thr != NULL ? &thr->thr : NULL, name); nni_plat_thr_set_name(thr != NULL ? &thr->thr : NULL, name);
} }
static int nni_ncpu_max = 256;
static int nni_pool_thread_limit_min = 2;
static int nni_pool_thread_limit_max = 64;
void
nni_thr_set_ncpu_max(int limit)
{
nni_ncpu_max = limit;
}
int
nni_thr_get_ncpu_max()
{
return nni_ncpu_max;
}
// nni_set_ncpu_max can be used to limit how many threads nng tries to use
// for a single thread pool
void
nni_thr_set_pool_thread_limit_min(int limit)
{
nni_pool_thread_limit_min = limit;
}
int
nni_thr_get_pool_thread_limit_min()
{
return nni_pool_thread_limit_min;
}
void
nni_thr_set_pool_thread_limit_max(int limit)
{
nni_pool_thread_limit_max = limit;
}
int
nni_thr_get_pool_thread_limit_max()
{
return nni_pool_thread_limit_max;
}

View file

@ -82,4 +82,16 @@ extern bool nni_thr_is_self(nni_thr *thr);
// nni_thr_set_name is used to set a short name for the thread. // nni_thr_set_name is used to set a short name for the thread.
extern void nni_thr_set_name(nni_thr *thr, const char *); extern void nni_thr_set_name(nni_thr *thr, const char *);
// nni_set_ncpu_max can be used to limit how many threads nng tries to use
extern void nni_thr_set_ncpu_max(int);
extern int nni_thr_get_ncpu_max();
// nni_set_ncpu_max can be used to limit how many threads nng tries to use
// for a single thread pool
extern void nni_thr_set_pool_thread_limit_min(int);
extern int nni_thr_get_pool_thread_limit_min();
extern void nni_thr_set_pool_thread_limit_max(int);
extern int nni_thr_get_pool_thread_limit_max();
#endif // CORE_THREAD_H #endif // CORE_THREAD_H

View file

@ -1900,3 +1900,27 @@ nng_version(void)
return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr( return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr(
NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX); NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX);
} }
void
nng_set_ncpu_max(int limit)
{
nni_thr_set_ncpu_max(limit);
}
void
nng_set_pool_thread_limit_min(int limit)
{
nni_thr_set_pool_thread_limit_min(limit);
}
void
nng_set_pool_thread_limit_max(int limit)
{
nni_thr_set_pool_thread_limit_max(limit);
}
void
nng_set_resolve_thread_max(int limit)
{
nni_set_resolve_thread_max(limit);
}

View file

@ -440,6 +440,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa)
return (parse_ip(addr, sa, true)); return (parse_ip(addr, sa, true));
} }
static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY;
void
nni_set_resolve_thread_max(int limit)
{
nng_resolv_concurrency = limit;
}
int int
nni_posix_resolv_sysinit(void) nni_posix_resolv_sysinit(void)
{ {

View file

@ -444,14 +444,21 @@ nni_plat_fini(void)
int int
nni_plat_ncpu(void) nni_plat_ncpu(void)
{ {
int system_proc_count = 1;
int proc_limit = nni_thr_get_ncpu_max();
// POSIX specifies sysconf exists, but not the value // POSIX specifies sysconf exists, but not the value
// _SC_NPROCESSORS_ONLN. Nonetheless, everybody implements it. // _SC_NPROCESSORS_ONLN. Nonetheless, everybody implements it.
// If you don't we'll assume you only have a single logical CPU. // If you don't we'll assume you only have a single logical CPU.
#ifdef _SC_NPROCESSORS_ONLN #ifdef _SC_NPROCESSORS_ONLN
return (sysconf(_SC_NPROCESSORS_ONLN)); system_proc_count = (sysconf(_SC_NPROCESSORS_ONLN));
#else
return (1);
#endif #endif
if (system_proc_count > proc_limit) {
return proc_limit;
} else {
return system_proc_count;
}
} }
#endif // NNG_PLATFORM_POSIX #endif // NNG_PLATFORM_POSIX

View file

@ -92,11 +92,11 @@ nni_win_io_sysinit(void)
int nthr = nni_plat_ncpu() * 2; int nthr = nni_plat_ncpu() * 2;
// Limits on the thread count. This is fairly arbitrary. // Limits on the thread count. This is fairly arbitrary.
if (nthr < 4) { if (nthr < nni_thr_get_pool_thread_limit_min()) {
nthr = 4; nthr = nni_thr_get_pool_thread_limit_min();
} }
if (nthr > 64) { if (nthr > nni_thr_get_pool_thread_limit_max()) {
nthr = 64; nthr = nni_thr_get_pool_thread_limit_max();
} }
if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) {
return (NNG_ENOMEM); return (NNG_ENOMEM);

View file

@ -405,6 +405,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa)
return (parse_ip(addr, sa, true)); return (parse_ip(addr, sa, true));
} }
static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY;
void
nni_set_resolve_thread_max(int limit)
{
nng_resolv_concurrency = limit;
}
int int
nni_win_resolv_sysinit(void) nni_win_resolv_sysinit(void)
{ {
@ -413,7 +421,7 @@ nni_win_resolv_sysinit(void)
nni_aio_list_init(&resolv_aios); nni_aio_list_init(&resolv_aios);
resolv_fini = false; resolv_fini = false;
for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { for (int i = 0; i < nng_resolv_concurrency; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) { if (rv != 0) {
nni_win_resolv_sysfini(); nni_win_resolv_sysfini();

View file

@ -393,9 +393,17 @@ int
nni_plat_ncpu(void) nni_plat_ncpu(void)
{ {
SYSTEM_INFO info; SYSTEM_INFO info;
int ncpu_max = nni_thr_get_ncpu_max();
int n = 0;
GetSystemInfo(&info); GetSystemInfo(&info);
return ((int) (info.dwNumberOfProcessors)); n = ((int) (info.dwNumberOfProcessors));
if (n > ncpu_max) {
return ncpu_max;
} else {
return n;
}
} }
int int