From 464cb60490efda800625b16fedd5bcd238e1526e Mon Sep 17 00:00:00 2001
From: Qi Wang <interwq@gwu.edu>
Date: Thu, 8 Jun 2017 22:46:31 -0700
Subject: [PATCH] Move background thread creation to background_thread_0.

To avoid complications, avoid invoking pthread_create "internally";
instead, rely on thread 0 to launch new threads, and to terminate
threads when asked.
---
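The core of the change: pthread_create is now invoked in exactly one place at
enable time (for thread 0), and afterwards only by thread 0 itself, which is
woken whenever another thread flags a new worker as wanted. Below is a minimal
standalone sketch of that hand-off pattern; all names (thread0, request_worker,
etc.) are invented for illustration and this is not jemalloc code. jemalloc
additionally drops its internal locks around pthread_create for reentrancy
reasons (see pre_reentrancy/post_reentrancy in the hunks below), which the
sketch omits.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <unistd.h>

    #define NWORKERS 4

    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    static pthread_t threads[NWORKERS];
    static bool started[NWORKERS];  /* Request flags; any thread may set. */
    static bool created[NWORKERS];  /* Only "thread 0" creates and records. */
    static bool shutting_down = false;

    static void *
    worker(void *arg) {
        printf("worker %u ran\n", (unsigned)(uintptr_t)arg);
        return NULL;
    }

    /* Cheap request path: flag the slot and poke thread 0; never calls
     * pthread_create itself. */
    static void
    request_worker(unsigned i) {
        pthread_mutex_lock(&mtx);
        started[i] = true;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mtx);
    }

    /* Thread 0: the only thread that launches (and later joins) others. */
    static void *
    thread0(void *arg) {
        (void)arg;
        pthread_mutex_lock(&mtx);
        while (!shutting_down) {
            for (unsigned i = 1; i < NWORKERS; i++) {
                if (started[i] && !created[i]) {
                    pthread_create(&threads[i], NULL, worker,
                        (void *)(uintptr_t)i);
                    created[i] = true;
                }
            }
            /* Holding mtx from the scan into the wait prevents lost
             * wakeups: requests can only be flagged under mtx. */
            pthread_cond_wait(&cond, &mtx);
        }
        pthread_mutex_unlock(&mtx);
        for (unsigned i = 1; i < NWORKERS; i++) {
            if (created[i]) {
                pthread_join(threads[i], NULL);
            }
        }
        return NULL;
    }

    int
    main(void) {
        pthread_t t0;
        pthread_create(&t0, NULL, thread0, NULL);
        request_worker(1);
        request_worker(3);
        sleep(1);
        pthread_mutex_lock(&mtx);
        shutting_down = true;
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mtx);
        pthread_join(t0, NULL);
        return 0;
    }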
 .../internal/background_thread_externs.h |   2 -
 .../internal/background_thread_structs.h |   2 +
 src/background_thread.c                  | 505 +++++++++++-------
 src/ctl.c                                |  12 +-
 4 files changed, 313 insertions(+), 208 deletions(-)

diff --git a/include/jemalloc/internal/background_thread_externs.h b/include/jemalloc/internal/background_thread_externs.h
index aef1c90b..7c883697 100644
--- a/include/jemalloc/internal/background_thread_externs.h
+++ b/include/jemalloc/internal/background_thread_externs.h
@@ -10,8 +10,6 @@ extern background_thread_info_t *background_thread_info;
 bool background_thread_create(tsd_t *tsd, unsigned arena_ind);
 bool background_threads_enable(tsd_t *tsd);
 bool background_threads_disable(tsd_t *tsd);
-bool background_threads_disable_single(tsd_t *tsd,
-    background_thread_info_t *info);
 void background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new);
 void background_thread_prefork0(tsdn_t *tsdn);
diff --git a/include/jemalloc/internal/background_thread_structs.h b/include/jemalloc/internal/background_thread_structs.h
index edf90fe2..f6ad4adb 100644
--- a/include/jemalloc/internal/background_thread_structs.h
+++ b/include/jemalloc/internal/background_thread_structs.h
@@ -18,6 +18,8 @@ struct background_thread_info_s {
     malloc_mutex_t mtx;
     /* Whether the thread has been created. */
     bool started;
+    /* Pause execution (for arena reset / destroy). */
+    bool pause;
     /* When true, it means no wakeup scheduled. */
     atomic_b_t indefinite_sleep;
     /* Next scheduled wakeup time (absolute time in ns). */
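The new pause flag replaces the old stop-and-recreate dance during arena reset
and destroy (see the src/ctl.c hunks at the end of this patch). A paused worker
parks itself by briefly queueing on the global background_thread lock, which
the pausing thread holds for the duration of the critical operation. A
standalone sketch of that handshake, with invented names (global_mtx and
info_mtx stand in for background_thread_lock and a worker's info->mtx):

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    static pthread_mutex_t global_mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t info_mtx = PTHREAD_MUTEX_INITIALIZER;
    static bool pause_flag = false;
    static bool done = false;

    static void *
    worker(void *arg) {
        (void)arg;
        for (;;) {
            pthread_mutex_lock(&info_mtx);
            while (pause_flag) {
                /* Park: drop our lock, then queue on the global lock,
                 * which the pausing thread holds until it finishes. */
                pthread_mutex_unlock(&info_mtx);
                pthread_mutex_lock(&global_mtx);
                pthread_mutex_unlock(&global_mtx);
                pthread_mutex_lock(&info_mtx);
            }
            bool stop = done;
            pthread_mutex_unlock(&info_mtx);
            if (stop) {
                return NULL;
            }
            usleep(1000);  /* One round of "background work". */
        }
    }

    static void
    pause_then_resume(void) {
        pthread_mutex_lock(&global_mtx);
        pthread_mutex_lock(&info_mtx);
        pause_flag = true;  /* Worker parks at its next check. */
        pthread_mutex_unlock(&info_mtx);

        puts("critical section (arena reset stand-in)");

        pthread_mutex_lock(&info_mtx);
        pause_flag = false;
        pthread_mutex_unlock(&info_mtx);
        pthread_mutex_unlock(&global_mtx);  /* Parked worker resumes. */
    }

    int
    main(void) {
        pthread_t w;
        pthread_create(&w, NULL, worker, NULL);
        usleep(10000);
        pause_then_resume();
        pthread_mutex_lock(&info_mtx);
        done = true;
        pthread_mutex_unlock(&info_mtx);
        pthread_join(w, NULL);
        return 0;
    }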
diff --git a/src/background_thread.c b/src/background_thread.c
index 51d23cb9..f2bc474a 100644
--- a/src/background_thread.c
+++ b/src/background_thread.c
@@ -50,8 +50,6 @@ pthread_create_wrapper(pthread_t *__restrict thread, const pthread_attr_t *attr,
 bool background_thread_create(tsd_t *tsd, unsigned arena_ind) NOT_REACHED
 bool background_threads_enable(tsd_t *tsd) NOT_REACHED
 bool background_threads_disable(tsd_t *tsd) NOT_REACHED
-bool background_threads_disable_single(tsd_t *tsd,
-    background_thread_info_t *info) NOT_REACHED
 void background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new) NOT_REACHED
 void background_thread_prefork0(tsdn_t *tsdn) NOT_REACHED
@@ -67,8 +65,9 @@ void background_thread_ctl_init(tsdn_t *tsdn) NOT_REACHED
 static bool background_thread_enabled_at_fork;
 
 static void
-background_thread_info_reinit(tsdn_t *tsdn, background_thread_info_t *info) {
+background_thread_info_init(tsdn_t *tsdn, background_thread_info_t *info) {
     background_thread_wakeup_time_set(tsdn, info, 0);
+    info->pause = false;
     info->npages_to_purge_new = 0;
     if (config_stats) {
         info->tot_n_runs = 0;
@@ -210,211 +209,107 @@ arena_decay_compute_purge_interval(tsdn_t *tsdn, arena_t *arena) {
     return i1 < i2 ? i1 : i2;
 }
 
-static inline uint64_t
-background_work_once(tsdn_t *tsdn, unsigned ind) {
-    arena_t *arena;
-    unsigned i, narenas;
-    uint64_t min_interval;
+static void
+background_thread_sleep(tsdn_t *tsdn, background_thread_info_t *info,
+    uint64_t interval) {
+    if (config_stats) {
+        info->tot_n_runs++;
+    }
+    info->npages_to_purge_new = 0;
 
-    min_interval = BACKGROUND_THREAD_INDEFINITE_SLEEP;
-    narenas = narenas_total_get();
-    for (i = ind; i < narenas; i += ncpus) {
-        arena = arena_get(tsdn, i, false);
+    struct timeval tv;
+    /* Specific clock required by timedwait. */
+    gettimeofday(&tv, NULL);
+    nstime_t before_sleep;
+    nstime_init2(&before_sleep, tv.tv_sec, tv.tv_usec * 1000);
+
+    int ret;
+    if (interval == BACKGROUND_THREAD_INDEFINITE_SLEEP) {
+        assert(background_thread_indefinite_sleep(info));
+        ret = pthread_cond_wait(&info->cond, &info->mtx.lock);
+        assert(ret == 0);
+    } else {
+        assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS &&
+            interval <= BACKGROUND_THREAD_INDEFINITE_SLEEP);
+        /* We need malloc clock (can be different from tv). */
+        nstime_t next_wakeup;
+        nstime_init(&next_wakeup, 0);
+        nstime_update(&next_wakeup);
+        nstime_iadd(&next_wakeup, interval);
+        assert(nstime_ns(&next_wakeup) <
+            BACKGROUND_THREAD_INDEFINITE_SLEEP);
+        background_thread_wakeup_time_set(tsdn, info,
+            nstime_ns(&next_wakeup));
+
+        nstime_t ts_wakeup;
+        nstime_copy(&ts_wakeup, &before_sleep);
+        nstime_iadd(&ts_wakeup, interval);
+        struct timespec ts;
+        ts.tv_sec = (size_t)nstime_sec(&ts_wakeup);
+        ts.tv_nsec = (size_t)nstime_nsec(&ts_wakeup);
+
+        assert(!background_thread_indefinite_sleep(info));
+        ret = pthread_cond_timedwait(&info->cond, &info->mtx.lock, &ts);
+        assert(ret == ETIMEDOUT || ret == 0);
+        background_thread_wakeup_time_set(tsdn, info,
+            BACKGROUND_THREAD_INDEFINITE_SLEEP);
+    }
+    if (config_stats) {
+        gettimeofday(&tv, NULL);
+        nstime_t after_sleep;
+        nstime_init2(&after_sleep, tv.tv_sec, tv.tv_usec * 1000);
+        if (nstime_compare(&after_sleep, &before_sleep) > 0) {
+            nstime_subtract(&after_sleep, &before_sleep);
+            nstime_add(&info->tot_sleep_time, &after_sleep);
+        }
+    }
+    while (info->pause) {
+        malloc_mutex_unlock(tsdn, &info->mtx);
+        /* Wait on global lock to update status. */
+        malloc_mutex_lock(tsdn, &background_thread_lock);
+        malloc_mutex_unlock(tsdn, &background_thread_lock);
+        malloc_mutex_lock(tsdn, &info->mtx);
+    }
+}
+
+static inline void
+background_work_sleep_once(tsdn_t *tsdn, background_thread_info_t *info,
+    unsigned ind) {
+    uint64_t min_interval = BACKGROUND_THREAD_INDEFINITE_SLEEP;
+    unsigned narenas = narenas_total_get();
+
+    for (unsigned i = ind; i < narenas; i += ncpus) {
+        arena_t *arena = arena_get(tsdn, i, false);
         if (!arena) {
             continue;
         }
         arena_decay(tsdn, arena, true, false);
+        if (min_interval == BACKGROUND_THREAD_MIN_INTERVAL_NS) {
+            /* Min interval will be used. */
+            continue;
+        }
         uint64_t interval = arena_decay_compute_purge_interval(tsdn,
             arena);
-        if (interval == BACKGROUND_THREAD_MIN_INTERVAL_NS) {
-            return interval;
-        }
-
-        assert(interval > BACKGROUND_THREAD_MIN_INTERVAL_NS);
+        assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS);
        if (min_interval > interval) {
             min_interval = interval;
         }
     }
-
-    return min_interval;
+    background_thread_sleep(tsdn, info, min_interval);
 }
 
-static void
-background_work(tsdn_t *tsdn, unsigned ind) {
-    int ret;
-    background_thread_info_t *info = &background_thread_info[ind];
-
-    malloc_mutex_lock(tsdn, &info->mtx);
-    background_thread_wakeup_time_set(tsdn, info,
-        BACKGROUND_THREAD_INDEFINITE_SLEEP);
-    while (info->started) {
-        uint64_t interval = background_work_once(tsdn, ind);
-        if (config_stats) {
-            info->tot_n_runs++;
-        }
-        info->npages_to_purge_new = 0;
-
-        struct timeval tv;
-        /* Specific clock required by timedwait. */
-        gettimeofday(&tv, NULL);
-        nstime_t before_sleep;
-        nstime_init2(&before_sleep, tv.tv_sec, tv.tv_usec * 1000);
-
-        if (interval == BACKGROUND_THREAD_INDEFINITE_SLEEP) {
-            assert(background_thread_indefinite_sleep(info));
-            ret = pthread_cond_wait(&info->cond, &info->mtx.lock);
-            assert(ret == 0);
-        } else {
-            assert(interval >= BACKGROUND_THREAD_MIN_INTERVAL_NS &&
-                interval <= BACKGROUND_THREAD_INDEFINITE_SLEEP);
-            /* We need malloc clock (can be different from tv). */
-            nstime_t next_wakeup;
-            nstime_init(&next_wakeup, 0);
-            nstime_update(&next_wakeup);
-            nstime_iadd(&next_wakeup, interval);
-            assert(nstime_ns(&next_wakeup) <
-                BACKGROUND_THREAD_INDEFINITE_SLEEP);
-            background_thread_wakeup_time_set(tsdn, info,
-                nstime_ns(&next_wakeup));
-
-            nstime_t ts_wakeup;
-            nstime_copy(&ts_wakeup, &before_sleep);
-            nstime_iadd(&ts_wakeup, interval);
-            struct timespec ts;
-            ts.tv_sec = (size_t)nstime_sec(&ts_wakeup);
-            ts.tv_nsec = (size_t)nstime_nsec(&ts_wakeup);
-
-            assert(!background_thread_indefinite_sleep(info));
-            ret = pthread_cond_timedwait(&info->cond,
-                &info->mtx.lock, &ts);
-            assert(ret == ETIMEDOUT || ret == 0);
-            background_thread_wakeup_time_set(tsdn, info,
-                BACKGROUND_THREAD_INDEFINITE_SLEEP);
-        }
-
-        if (config_stats) {
-            gettimeofday(&tv, NULL);
-            nstime_t after_sleep;
-            nstime_init2(&after_sleep, tv.tv_sec, tv.tv_usec * 1000);
-            if (nstime_compare(&after_sleep, &before_sleep) > 0) {
-                nstime_subtract(&after_sleep, &before_sleep);
-                nstime_add(&info->tot_sleep_time, &after_sleep);
-            }
-        }
-    }
-    background_thread_wakeup_time_set(tsdn, info, 0);
-    malloc_mutex_unlock(tsdn, &info->mtx);
-}
-
-static void *
-background_thread_entry(void *ind_arg) {
-    unsigned thread_ind = (unsigned)(uintptr_t)ind_arg;
-    assert(thread_ind < narenas_total_get() && thread_ind < ncpus);
-
-    if (opt_percpu_arena != percpu_arena_disabled) {
-        set_current_thread_affinity((int)thread_ind);
-    }
-    /*
-     * Start periodic background work. We use internal tsd which avoids
-     * side effects, for example triggering new arena creation (which in
-     * turn triggers another background thread creation).
-     */
-    background_work(tsd_tsdn(tsd_internal_fetch()), thread_ind);
-    assert(pthread_equal(pthread_self(),
-        background_thread_info[thread_ind].thread));
-
-    return NULL;
-}
-
-/* Create a new background thread if needed. */
-bool
-background_thread_create(tsd_t *tsd, unsigned arena_ind) {
-    assert(have_background_thread);
-    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
-
-    /* We create at most NCPUs threads. */
-    size_t thread_ind = arena_ind % ncpus;
-    background_thread_info_t *info = &background_thread_info[thread_ind];
-
-    bool need_new_thread;
-    malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
-    need_new_thread = background_thread_enabled() && !info->started;
-    if (need_new_thread) {
-        info->started = true;
-        background_thread_info_reinit(tsd_tsdn(tsd), info);
-        n_background_threads++;
-    }
-    malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
-    if (!need_new_thread) {
-        return false;
-    }
-
-    pre_reentrancy(tsd);
-    malloc_mutex_unlock(tsd_tsdn(tsd), &background_thread_lock);
-    /*
-     * To avoid complications (besides reentrancy), create internal
-     * background threads with the underlying pthread_create, and drop
-     * background_thread_lock (pthread_create may take internal locks).
-     */
-    int err = pthread_create_wrapper(&info->thread, NULL,
-        background_thread_entry, (void *)thread_ind);
-    malloc_mutex_lock(tsd_tsdn(tsd), &background_thread_lock);
-    post_reentrancy(tsd);
-
-    if (err != 0) {
-        malloc_printf("<jemalloc>: arena %u background thread creation "
-            "failed (%d).\n", arena_ind, err);
-        malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
-        info->started = false;
-        n_background_threads--;
-        malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
-
-        return true;
-    }
-    assert(info->started);
-
-    return false;
-}
-
-bool
-background_threads_enable(tsd_t *tsd) {
-    assert(n_background_threads == 0);
-    assert(background_thread_enabled());
-    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
-
-    VARIABLE_ARRAY(bool, created, ncpus);
-    unsigned i, ncreated;
-    for (i = 0; i < ncpus; i++) {
-        created[i] = false;
-    }
-    ncreated = 0;
-
-    unsigned n = narenas_total_get();
-    for (i = 0; i < n; i++) {
-        if (created[i % ncpus] ||
-            arena_get(tsd_tsdn(tsd), i, false) == NULL) {
-            continue;
-        }
-        if (background_thread_create(tsd, i)) {
-            return true;
-        }
-        created[i % ncpus] = true;
-        if (++ncreated == ncpus) {
-            break;
-        }
-    }
-
-    return false;
-}
-
-bool
+static bool
 background_threads_disable_single(tsd_t *tsd, background_thread_info_t *info) {
-    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
-    pre_reentrancy(tsd);
+    if (info == &background_thread_info[0]) {
+        malloc_mutex_assert_owner(tsd_tsdn(tsd),
+            &background_thread_lock);
+    } else {
+        malloc_mutex_assert_not_owner(tsd_tsdn(tsd),
+            &background_thread_lock);
+    }
 
-    bool has_thread;
+    pre_reentrancy(tsd);
     malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+    bool has_thread;
     if (info->started) {
         has_thread = true;
         info->started = false;
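A note on background_thread_sleep above: it deliberately mixes two clocks. The
deadline handed to pthread_cond_timedwait must be an absolute CLOCK_REALTIME
value (the clock gettimeofday reads, absent a condattr override), while the
recorded wakeup time uses the allocator's internal nstime clock. A standalone
sketch of just the timedwait half, with invented names:

    #include <errno.h>
    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>
    #include <sys/time.h>
    #include <time.h>

    static pthread_mutex_t mtx = PTHREAD_MUTEX_INITIALIZER;
    static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

    /* Sleep up to interval_ns, unless signaled earlier. */
    static int
    sleep_interval(uint64_t interval_ns) {
        /* timedwait deadlines are absolute CLOCK_REALTIME values by
         * default, i.e. the same clock gettimeofday() reads. */
        struct timeval tv;
        gettimeofday(&tv, NULL);
        uint64_t deadline_ns = (uint64_t)tv.tv_sec * 1000000000 +
            (uint64_t)tv.tv_usec * 1000 + interval_ns;

        struct timespec ts;
        ts.tv_sec = (time_t)(deadline_ns / 1000000000);
        ts.tv_nsec = (long)(deadline_ns % 1000000000);

        pthread_mutex_lock(&mtx);
        int ret = pthread_cond_timedwait(&cond, &mtx, &ts);
        pthread_mutex_unlock(&mtx);
        return ret; /* 0 if signaled; ETIMEDOUT at the deadline. */
    }

    int
    main(void) {
        int ret = sleep_interval(100 * 1000 * 1000); /* 100 ms */
        printf("%s\n", ret == ETIMEDOUT ? "timed out" : "signaled");
        return 0;
    }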
@@ -440,14 +335,221 @@ background_threads_disable_single(tsd_t *tsd, background_thread_info_t *info) {
     return false;
 }
 
+static void *background_thread_entry(void *ind_arg);
+
+static void
+check_background_thread_creation(tsd_t *tsd, unsigned *n_created,
+    bool *created_threads) {
+    if (likely(*n_created == n_background_threads)) {
+        return;
+    }
+
+    for (unsigned i = 1; i < ncpus; i++) {
+        if (created_threads[i]) {
+            continue;
+        }
+        background_thread_info_t *info = &background_thread_info[i];
+        malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+        if (info->started) {
+            pre_reentrancy(tsd);
+            int err = pthread_create_wrapper(&info->thread, NULL,
+                background_thread_entry, (void *)(uintptr_t)i);
+            post_reentrancy(tsd);
+
+            if (err == 0) {
+                (*n_created)++;
+                created_threads[i] = true;
+            } else {
+                malloc_printf("<jemalloc>: background thread "
+                    "creation failed (%d)\n", err);
+                if (opt_abort) {
+                    abort();
+                }
+            }
+        }
+        malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+    }
+}
+
+static void
+background_thread0_work(tsd_t *tsd) {
+    /* Thread0 is also responsible for launching / terminating threads. */
+    VARIABLE_ARRAY(bool, created_threads, ncpus);
+    unsigned i;
+    for (i = 1; i < ncpus; i++) {
+        created_threads[i] = false;
+    }
+    /* Start working, and create more threads when asked. */
+    unsigned n_created = 1;
+    while (background_thread_info[0].started) {
+        check_background_thread_creation(tsd, &n_created,
+            (bool *)&created_threads);
+        background_work_sleep_once(tsd_tsdn(tsd),
+            &background_thread_info[0], 0);
+    }
+
+    /*
+     * Shut down other threads at exit. Note that the ctl thread is
+     * holding the global background_thread mutex (and is waiting for
+     * us).
+     */
+    assert(!background_thread_enabled());
+    for (i = 1; i < ncpus; i++) {
+        background_thread_info_t *info = &background_thread_info[i];
+        if (created_threads[i]) {
+            background_threads_disable_single(tsd, info);
+        } else {
+            malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+            /* Clear in case the thread wasn't created. */
+            info->started = false;
+            malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+        }
+    }
+    background_thread_info[0].started = false;
+    assert(n_background_threads == 1);
+}
+
+static void
+background_work(tsd_t *tsd, unsigned ind) {
+    background_thread_info_t *info = &background_thread_info[ind];
+
+    malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+    background_thread_wakeup_time_set(tsd_tsdn(tsd), info,
+        BACKGROUND_THREAD_INDEFINITE_SLEEP);
+    if (ind == 0) {
+        background_thread0_work(tsd);
+    } else {
+        while (info->started) {
+            background_work_sleep_once(tsd_tsdn(tsd), info, ind);
+        }
+    }
+    background_thread_wakeup_time_set(tsd_tsdn(tsd), info, 0);
+    malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+}
+
+static void *
+background_thread_entry(void *ind_arg) {
+    unsigned thread_ind = (unsigned)(uintptr_t)ind_arg;
+    assert(thread_ind < ncpus);
+
+    if (opt_percpu_arena != percpu_arena_disabled) {
+        set_current_thread_affinity((int)thread_ind);
+    }
+    /*
+     * Start periodic background work. We use internal tsd which avoids
+     * side effects, for example triggering new arena creation (which in
+     * turn triggers another background thread creation).
+     */
+    background_work(tsd_internal_fetch(), thread_ind);
+    assert(pthread_equal(pthread_self(),
+        background_thread_info[thread_ind].thread));
+
+    return NULL;
+}
+
+static void
+background_thread_init(tsd_t *tsd, background_thread_info_t *info) {
+    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
+    info->started = true;
+    background_thread_info_init(tsd_tsdn(tsd), info);
+    n_background_threads++;
+}
+
+/* Create a new background thread if needed. */
+bool
+background_thread_create(tsd_t *tsd, unsigned arena_ind) {
+    assert(have_background_thread);
+    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
+
+    /* We create at most NCPUs threads. */
+    size_t thread_ind = arena_ind % ncpus;
+    background_thread_info_t *info = &background_thread_info[thread_ind];
+
+    bool need_new_thread;
+    malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+    need_new_thread = background_thread_enabled() && !info->started;
+    if (need_new_thread) {
+        background_thread_init(tsd, info);
+    }
+    malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+    if (!need_new_thread) {
+        return false;
+    }
+    if (arena_ind != 0) {
+        /* Threads are created asynchronously by Thread 0. */
+        background_thread_info_t *t0 = &background_thread_info[0];
+        malloc_mutex_lock(tsd_tsdn(tsd), &t0->mtx);
+        assert(t0->started);
+        pthread_cond_signal(&t0->cond);
+        malloc_mutex_unlock(tsd_tsdn(tsd), &t0->mtx);
+
+        return false;
+    }
+
+    pre_reentrancy(tsd);
+    /*
+     * To avoid complications (besides reentrancy), create internal
+     * background threads with the underlying pthread_create.
+     */
+    int err = pthread_create_wrapper(&info->thread, NULL,
+        background_thread_entry, (void *)thread_ind);
+    post_reentrancy(tsd);
+
+    if (err != 0) {
+        malloc_printf("<jemalloc>: arena 0 background thread creation "
+            "failed (%d)\n", err);
+        malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+        info->started = false;
+        n_background_threads--;
+        malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+
+        return true;
+    }
+
+    return false;
+}
+
+bool
+background_threads_enable(tsd_t *tsd) {
+    assert(n_background_threads == 0);
+    assert(background_thread_enabled());
+    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
+
+    VARIABLE_ARRAY(bool, marked, ncpus);
+    unsigned i, nmarked;
+    for (i = 0; i < ncpus; i++) {
+        marked[i] = false;
+    }
+    nmarked = 0;
+    /* Mark the threads we need to create for thread 0. */
+    unsigned n = narenas_total_get();
+    for (i = 1; i < n; i++) {
+        if (marked[i % ncpus] ||
+            arena_get(tsd_tsdn(tsd), i, false) == NULL) {
+            continue;
+        }
+        background_thread_info_t *info =
+            &background_thread_info[i % ncpus];
+        malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+        assert(!info->started);
+        background_thread_init(tsd, info);
+        malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
+        marked[i % ncpus] = true;
+        if (++nmarked == ncpus) {
+            break;
+        }
+    }
+
+    return background_thread_create(tsd, 0);
+}
+
 bool
 background_threads_disable(tsd_t *tsd) {
     assert(!background_thread_enabled());
-    for (unsigned i = 0; i < ncpus; i++) {
-        background_thread_info_t *info = &background_thread_info[i];
-        if (background_threads_disable_single(tsd, info)) {
-            return true;
-        }
+    malloc_mutex_assert_owner(tsd_tsdn(tsd), &background_thread_lock);
+
+    /* Thread 0 will be responsible for terminating other threads. */
+    if (background_threads_disable_single(tsd,
+        &background_thread_info[0])) {
+        return true;
     }
     assert(n_background_threads == 0);
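background_thread0_work above owns both directions of the lifecycle: it creates
workers on request (check_background_thread_creation) and stops the rest once
disable flips started off. The sketch below shows just the stop half in
standalone form, assuming a signal-then-join handshake (jemalloc's actual
disable path also updates statistics and handles the never-created case); the
names are invented and main plays the role of thread 0:

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>

    #define NWORKERS 3

    typedef struct {
        pthread_t thread;
        pthread_mutex_t mtx;
        pthread_cond_t cond;
        bool started;
    } worker_t;

    static worker_t workers[NWORKERS];

    static void *
    worker(void *arg) {
        worker_t *w = arg;
        pthread_mutex_lock(&w->mtx);
        while (w->started) {
            /* Background work would happen between sleeps here. */
            pthread_cond_wait(&w->cond, &w->mtx);
        }
        pthread_mutex_unlock(&w->mtx);
        return NULL;
    }

    static void
    stop_worker(worker_t *w) {
        pthread_mutex_lock(&w->mtx);
        w->started = false;
        pthread_cond_signal(&w->cond); /* Wake it so it can exit. */
        pthread_mutex_unlock(&w->mtx);
        pthread_join(w->thread, NULL);
    }

    int
    main(void) {
        for (int i = 0; i < NWORKERS; i++) {
            pthread_mutex_init(&workers[i].mtx, NULL);
            pthread_cond_init(&workers[i].cond, NULL);
            workers[i].started = true;
            pthread_create(&workers[i].thread, NULL, worker,
                &workers[i]);
        }
        /* "Thread 0" runs this loop at exit; main stands in for it. */
        for (int i = 0; i < NWORKERS; i++) {
            stop_worker(&workers[i]);
        }
        puts("all workers joined");
        return 0;
    }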
@@ -460,7 +562,6 @@ background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     arena_decay_t *decay, size_t npages_new) {
     background_thread_info_t *info = arena_background_thread_info_get(
         arena);
-
     if (malloc_mutex_trylock(tsdn, &info->mtx)) {
         /*
          * Background thread may hold the mutex for a long period of
@@ -474,7 +575,6 @@ background_thread_interval_check(tsdn_t *tsdn, arena_t *arena,
     if (!info->started) {
         goto label_done;
     }
-
     assert(background_thread_enabled());
     if (malloc_mutex_trylock(tsdn, &decay->mtx)) {
         goto label_done;
     }
@@ -646,7 +746,7 @@ bool
 background_thread_boot0(void) {
     if (!have_background_thread && opt_background_thread) {
         malloc_printf("<jemalloc>: option background_thread currently "
-            "supports pthread only. \n");
+            "supports pthread only\n");
         return true;
     }
@@ -686,9 +786,10 @@ background_thread_boot1(tsdn_t *tsdn) {
     for (unsigned i = 0; i < ncpus; i++) {
         background_thread_info_t *info = &background_thread_info[i];
+        /* Thread mutex is rank_inclusive because of thread0. */
         if (malloc_mutex_init(&info->mtx, "background_thread",
             WITNESS_RANK_BACKGROUND_THREAD,
-            malloc_mutex_rank_exclusive)) {
+            malloc_mutex_address_ordered)) {
             return true;
         }
         if (pthread_cond_init(&info->cond, NULL)) {
             return true;
         }
         malloc_mutex_lock(tsdn, &info->mtx);
         info->started = false;
-        background_thread_info_reinit(tsdn, info);
+        background_thread_info_init(tsdn, info);
         malloc_mutex_unlock(tsdn, &info->mtx);
     }
 #endif
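The mutex rank change above exists because thread 0 now acquires other threads'
info->mtx (in check_background_thread_creation and at shutdown) while holding
its own, i.e., two mutexes of the same witness rank at once, which an
exclusive rank forbids. Address ordering is the usual discipline that makes
same-rank acquisition deadlock-free: every thread takes the lower-addressed
lock first. A standalone sketch of the idea (not jemalloc's witness code):

    #include <pthread.h>
    #include <stdio.h>

    static void
    lock_pair(pthread_mutex_t *a, pthread_mutex_t *b) {
        /* Lock the lower-addressed mutex first so any two threads
         * locking the same pair agree on the order. */
        if (a > b) {
            pthread_mutex_t *tmp = a;
            a = b;
            b = tmp;
        }
        pthread_mutex_lock(a);
        pthread_mutex_lock(b);
    }

    static void
    unlock_pair(pthread_mutex_t *a, pthread_mutex_t *b) {
        pthread_mutex_unlock(a);
        pthread_mutex_unlock(b);
    }

    int
    main(void) {
        pthread_mutex_t m1 = PTHREAD_MUTEX_INITIALIZER;
        pthread_mutex_t m2 = PTHREAD_MUTEX_INITIALIZER;
        lock_pair(&m1, &m2);
        puts("both held, in address order");
        unlock_pair(&m1, &m2);
        return 0;
    }

In jemalloc's case the info array itself supplies the order: thread 0 holds
info[0].mtx and then takes info[i].mtx for i > 0, which is always ascending.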
diff --git a/src/ctl.c b/src/ctl.c
index 134dbac9..242b36d4 100644
--- a/src/ctl.c
+++ b/src/ctl.c
@@ -1948,8 +1948,10 @@ arena_reset_prepare_background_thread(tsd_t *tsd, unsigned arena_ind) {
             unsigned ind = arena_ind % ncpus;
             background_thread_info_t *info =
                 &background_thread_info[ind];
-            assert(info->started);
-            background_threads_disable_single(tsd, info);
+            assert(info->started && !info->pause);
+            malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+            info->pause = true;
+            malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
         }
     }
 }
@@ -1961,8 +1963,10 @@ arena_reset_finish_background_thread(tsd_t *tsd, unsigned arena_ind) {
             unsigned ind = arena_ind % ncpus;
             background_thread_info_t *info =
                 &background_thread_info[ind];
-            assert(!info->started);
-            background_thread_create(tsd, ind);
+            assert(info->started && info->pause);
+            malloc_mutex_lock(tsd_tsdn(tsd), &info->mtx);
+            info->pause = false;
+            malloc_mutex_unlock(tsd_tsdn(tsd), &info->mtx);
         }
         malloc_mutex_unlock(tsd_tsdn(tsd), &background_thread_lock);
 }
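End-to-end, the paths touched by this patch are reached through the public
mallctl interface: writing "background_thread" lands in
background_threads_enable/disable, and arena reset/destroy exercises the new
pause flag. A minimal usage sketch of the public API (link with -ljemalloc;
error handling trimmed):

    #include <stdbool.h>
    #include <stdio.h>
    #include <jemalloc/jemalloc.h>

    int
    main(void) {
        bool enable = true;
        /* Ends up in background_threads_enable(): thread 0 is created
         * here; it launches the remaining threads asynchronously. */
        if (mallctl("background_thread", NULL, NULL, &enable,
            sizeof(enable))) {
            fprintf(stderr, "mallctl(\"background_thread\") failed\n");
            return 1;
        }

        bool cur;
        size_t sz = sizeof(cur);
        mallctl("background_thread", &cur, &sz, NULL, 0);
        printf("background_thread: %s\n", cur ? "true" : "false");

        enable = false;
        /* Disable: thread 0 terminates the other threads, then exits. */
        mallctl("background_thread", NULL, NULL, &enable,
            sizeof(enable));
        return 0;
    }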