diff --git a/external/nng/include/nng/nng.h b/external/nng/include/nng/nng.h index cd75495..b3eca27 100644 --- a/external/nng/include/nng/nng.h +++ b/external/nng/include/nng/nng.h @@ -1179,6 +1179,11 @@ NNG_DECL int nng_stream_listener_set_ptr( NNG_DECL int nng_stream_listener_set_addr( nng_stream_listener *, const char *, const nng_sockaddr *); +#define NNG_CAN_LIMIT_THREADS +NNG_DECL void nng_set_ncpu_max(int); +NNG_DECL void nng_set_pool_thread_limit_min(int); +NNG_DECL void nng_set_pool_thread_limit_max(int); +NNG_DECL void nng_set_resolve_thread_max(int); #ifndef NNG_ELIDE_DEPRECATED // These are legacy APIs that have been deprecated. diff --git a/external/nng/src/core/aio.c b/external/nng/src/core/aio.c index dfab8f6..edfc101 100644 --- a/external/nng/src/core/aio.c +++ b/external/nng/src/core/aio.c @@ -796,8 +796,8 @@ nni_aio_sys_init(void) #else num_thr = NNG_EXPIRE_THREADS; #endif - if (num_thr > 256) { - num_thr = 256; + if (num_thr > nni_thr_get_pool_thread_limit_max()) { + num_thr = nni_thr_get_pool_thread_limit_max(); } nni_aio_expire_q_list = diff --git a/external/nng/src/core/platform.h b/external/nng/src/core/platform.h index f1127c5..40bd863 100644 --- a/external/nng/src/core/platform.h +++ b/external/nng/src/core/platform.h @@ -356,6 +356,9 @@ extern int nni_parse_ip(const char *, nng_sockaddr *); // nni_parse_ip_port parses an IP address with an optional port appended. extern int nni_parse_ip_port(const char *, nng_sockaddr *); +// nni_set_resolve_thread_max is used to configure the resolve thread pool +extern void nni_set_resolve_thread_max(int); + // // IPC (UNIX Domain Sockets & Named Pipes) Support. // diff --git a/external/nng/src/core/taskq.c b/external/nng/src/core/taskq.c index e06bc26..af5c035 100644 --- a/external/nng/src/core/taskq.c +++ b/external/nng/src/core/taskq.c @@ -242,6 +242,9 @@ nni_taskq_sys_init(void) #else nthrs = NNG_NUM_TASKQ_THREADS; #endif + if (nthrs > nni_thr_get_pool_thread_limit_max()) { + nthrs = nni_thr_get_pool_thread_limit_max(); + } #if NNG_MAX_TASKQ_THREADS > 0 if (nthrs > NNG_MAX_TASKQ_THREADS) { nthrs = NNG_MAX_TASKQ_THREADS; diff --git a/external/nng/src/core/thread.c b/external/nng/src/core/thread.c index 6f50476..c175414 100644 --- a/external/nng/src/core/thread.c +++ b/external/nng/src/core/thread.c @@ -172,4 +172,46 @@ void nni_thr_set_name(nni_thr *thr, const char *name) { nni_plat_thr_set_name(thr != NULL ? &thr->thr : NULL, name); -} \ No newline at end of file +} + +static int nni_ncpu_max = 256; +static int nni_pool_thread_limit_min = 2; +static int nni_pool_thread_limit_max = 64; + +void +nni_thr_set_ncpu_max(int limit) +{ + nni_ncpu_max = limit; +} + +int +nni_thr_get_ncpu_max() +{ + return nni_ncpu_max; +} + +// nni_set_ncpu_max can be used to limit how many threads nng tries to use +// for a single thread pool +void +nni_thr_set_pool_thread_limit_min(int limit) +{ + nni_pool_thread_limit_min = limit; +} + +int +nni_thr_get_pool_thread_limit_min() +{ + return nni_pool_thread_limit_min; +} + +void +nni_thr_set_pool_thread_limit_max(int limit) +{ + nni_pool_thread_limit_max = limit; +} + +int +nni_thr_get_pool_thread_limit_max() +{ + return nni_pool_thread_limit_max; +} diff --git a/external/nng/src/core/thread.h b/external/nng/src/core/thread.h index 316acdc..76b98ca 100644 --- a/external/nng/src/core/thread.h +++ b/external/nng/src/core/thread.h @@ -82,4 +82,16 @@ extern bool nni_thr_is_self(nni_thr *thr); // nni_thr_set_name is used to set a short name for the thread. extern void nni_thr_set_name(nni_thr *thr, const char *); +// nni_set_ncpu_max can be used to limit how many threads nng tries to use +extern void nni_thr_set_ncpu_max(int); +extern int nni_thr_get_ncpu_max(); + +// nni_set_ncpu_max can be used to limit how many threads nng tries to use +// for a single thread pool +extern void nni_thr_set_pool_thread_limit_min(int); +extern int nni_thr_get_pool_thread_limit_min(); + +extern void nni_thr_set_pool_thread_limit_max(int); +extern int nni_thr_get_pool_thread_limit_max(); + #endif // CORE_THREAD_H diff --git a/external/nng/src/nng.c b/external/nng/src/nng.c index 1ccc138..79fb000 100644 --- a/external/nng/src/nng.c +++ b/external/nng/src/nng.c @@ -1900,3 +1900,27 @@ nng_version(void) return (xstr(NNG_MAJOR_VERSION) "." xstr(NNG_MINOR_VERSION) "." xstr( NNG_PATCH_VERSION) NNG_RELEASE_SUFFIX); } + +void +nng_set_ncpu_max(int limit) +{ + nni_thr_set_ncpu_max(limit); +} + +void +nng_set_pool_thread_limit_min(int limit) +{ + nni_thr_set_pool_thread_limit_min(limit); +} + +void +nng_set_pool_thread_limit_max(int limit) +{ + nni_thr_set_pool_thread_limit_max(limit); +} + +void +nng_set_resolve_thread_max(int limit) +{ + nni_set_resolve_thread_max(limit); +} diff --git a/external/nng/src/platform/posix/posix_resolv_gai.c b/external/nng/src/platform/posix/posix_resolv_gai.c index 83974b3..8e1878b 100644 --- a/external/nng/src/platform/posix/posix_resolv_gai.c +++ b/external/nng/src/platform/posix/posix_resolv_gai.c @@ -440,6 +440,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa) return (parse_ip(addr, sa, true)); } +static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY; + +void +nni_set_resolve_thread_max(int limit) +{ + nng_resolv_concurrency = limit; +} + int nni_posix_resolv_sysinit(void) { diff --git a/external/nng/src/platform/posix/posix_thread.c b/external/nng/src/platform/posix/posix_thread.c index c47bcec..7d68759 100644 --- a/external/nng/src/platform/posix/posix_thread.c +++ b/external/nng/src/platform/posix/posix_thread.c @@ -444,14 +444,21 @@ nni_plat_fini(void) int nni_plat_ncpu(void) { + int system_proc_count = 1; + int proc_limit = nni_thr_get_ncpu_max(); + // POSIX specifies sysconf exists, but not the value // _SC_NPROCESSORS_ONLN. Nonetheless, everybody implements it. // If you don't we'll assume you only have a single logical CPU. #ifdef _SC_NPROCESSORS_ONLN - return (sysconf(_SC_NPROCESSORS_ONLN)); -#else - return (1); + system_proc_count = (sysconf(_SC_NPROCESSORS_ONLN)); #endif + + if (system_proc_count > proc_limit) { + return proc_limit; + } else { + return system_proc_count; + } } #endif // NNG_PLATFORM_POSIX diff --git a/external/nng/src/platform/windows/win_io.c b/external/nng/src/platform/windows/win_io.c index 489dc01..17a047c 100644 --- a/external/nng/src/platform/windows/win_io.c +++ b/external/nng/src/platform/windows/win_io.c @@ -92,11 +92,11 @@ nni_win_io_sysinit(void) int nthr = nni_plat_ncpu() * 2; // Limits on the thread count. This is fairly arbitrary. - if (nthr < 4) { - nthr = 4; + if (nthr < nni_thr_get_pool_thread_limit_min()) { + nthr = nni_thr_get_pool_thread_limit_min(); } - if (nthr > 64) { - nthr = 64; + if (nthr > nni_thr_get_pool_thread_limit_max()) { + nthr = nni_thr_get_pool_thread_limit_max(); } if ((win_io_thrs = NNI_ALLOC_STRUCTS(win_io_thrs, nthr)) == NULL) { return (NNG_ENOMEM); diff --git a/external/nng/src/platform/windows/win_resolv.c b/external/nng/src/platform/windows/win_resolv.c index 8628719..e41a999 100644 --- a/external/nng/src/platform/windows/win_resolv.c +++ b/external/nng/src/platform/windows/win_resolv.c @@ -405,6 +405,14 @@ nni_parse_ip_port(const char *addr, nni_sockaddr *sa) return (parse_ip(addr, sa, true)); } +static int nng_resolv_concurrency = NNG_RESOLV_CONCURRENCY; + +void +nni_set_resolve_thread_max(int limit) +{ + nng_resolv_concurrency = limit; +} + int nni_win_resolv_sysinit(void) { @@ -413,7 +421,7 @@ nni_win_resolv_sysinit(void) nni_aio_list_init(&resolv_aios); resolv_fini = false; - for (int i = 0; i < NNG_RESOLV_CONCURRENCY; i++) { + for (int i = 0; i < nng_resolv_concurrency; i++) { int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL); if (rv != 0) { nni_win_resolv_sysfini(); diff --git a/external/nng/src/platform/windows/win_thread.c b/external/nng/src/platform/windows/win_thread.c index dc9ed12..cdac395 100644 --- a/external/nng/src/platform/windows/win_thread.c +++ b/external/nng/src/platform/windows/win_thread.c @@ -393,9 +393,17 @@ int nni_plat_ncpu(void) { SYSTEM_INFO info; + int ncpu_max = nni_thr_get_ncpu_max(); + int n = 0; GetSystemInfo(&info); - return ((int) (info.dwNumberOfProcessors)); + n = ((int) (info.dwNumberOfProcessors)); + + if (n > ncpu_max) { + return ncpu_max; + } else { + return n; + } } int