TSD: Add the ability to enter a global slow path.

This gives any thread the ability to send other threads down slow paths the next
time they fetch tsd.
This commit is contained in:
David Goldblatt 2018-04-02 19:16:41 -07:00 committed by David Goldblatt
parent feff510b9f
commit e870829e64
3 changed files with 365 additions and 36 deletions

View File

@ -59,6 +59,9 @@ typedef void (*test_callback_t)(int *);
# define MALLOC_TEST_TSD_INITIALIZER
#endif
/* Various uses of this struct need it to be a named type. */
typedef ql_elm(tsd_t) tsd_link_t;
/* O(name, type, nullable type */
#define MALLOC_TSD \
O(tcache_enabled, bool, bool) \
@ -73,6 +76,7 @@ typedef void (*test_callback_t)(int *);
O(iarena, arena_t *, arena_t *) \
O(arena, arena_t *, arena_t *) \
O(arenas_tdata, arena_tdata_t *, arena_tdata_t *)\
O(link, tsd_link_t, tsd_link_t) \
O(tcache, tcache_t, tcache_t) \
O(witness_tsd, witness_tsd_t, witness_tsdn_t) \
MALLOC_TEST_TSD
@ -91,20 +95,67 @@ typedef void (*test_callback_t)(int *);
NULL, \
NULL, \
NULL, \
{NULL}, \
TCACHE_ZERO_INITIALIZER, \
WITNESS_TSD_INITIALIZER \
MALLOC_TEST_TSD_INITIALIZER \
}
void *malloc_tsd_malloc(size_t size);
void malloc_tsd_dalloc(void *wrapper);
void malloc_tsd_cleanup_register(bool (*f)(void));
tsd_t *malloc_tsd_boot0(void);
void malloc_tsd_boot1(void);
void tsd_cleanup(void *arg);
tsd_t *tsd_fetch_slow(tsd_t *tsd, bool internal);
void tsd_state_set(tsd_t *tsd, uint8_t new_state);
void tsd_slow_update(tsd_t *tsd);
/*
* Call ..._inc when your module wants to take all threads down the slow paths,
* and ..._dec when it no longer needs to.
*/
void tsd_global_slow_inc(tsdn_t *tsdn);
void tsd_global_slow_dec(tsdn_t *tsdn);
enum {
tsd_state_nominal = 0, /* Common case --> jnz. */
tsd_state_nominal_slow = 1, /* Initialized but on slow path. */
/* the above 2 nominal states should be lower values. */
tsd_state_nominal_max = 1, /* used for comparison only. */
tsd_state_minimal_initialized = 2,
tsd_state_purgatory = 3,
tsd_state_reincarnated = 4,
tsd_state_uninitialized = 5
/* Common case --> jnz. */
tsd_state_nominal = 0,
/* Initialized but on slow path. */
tsd_state_nominal_slow = 1,
/*
* Some thread has changed global state in such a way that all nominal
* threads need to recompute their fast / slow status the next time they
* get a chance.
*
* Any thread can change another thread's status *to* recompute, but
* threads are the only ones who can change their status *from*
* recompute.
*/
tsd_state_nominal_recompute = 2,
/*
* The above nominal states should be lower values. We use
* tsd_nominal_max to separate nominal states from threads in the
* process of being born / dying.
*/
tsd_state_nominal_max = 2,
/*
* A thread might free() during its death as its only allocator action;
* in such scenarios, we need tsd, but set up in such a way that no
* cleanup is necessary.
*/
tsd_state_minimal_initialized = 3,
/* States during which we know we're in thread death. */
tsd_state_purgatory = 4,
tsd_state_reincarnated = 5,
/*
* What it says on the tin; tsd that hasn't been initialized. Note
* that even when the tsd struct lives in TLS, when need to keep track
* of stuff like whether or not our pthread destructors have been
* scheduled, so this really truly is different than the nominal state.
*/
tsd_state_uninitialized = 6
};
/*
@ -141,11 +192,6 @@ tsd_state_get(tsd_t *tsd) {
return *(uint8_t *)&tsd->state;
}
JEMALLOC_ALWAYS_INLINE void
tsd_state_set(tsd_t *tsd, uint8_t state) {
atomic_store_u8(&tsd->state, state, ATOMIC_RELAXED);
}
/*
* Wrapper around tsd_t that makes it possible to avoid implicit conversion
* between tsd_t and tsdn_t, where tsdn_t is "nullable" and has to be
@ -172,15 +218,6 @@ tsdn_tsd(tsdn_t *tsdn) {
return &tsdn->tsd;
}
void *malloc_tsd_malloc(size_t size);
void malloc_tsd_dalloc(void *wrapper);
void malloc_tsd_cleanup_register(bool (*f)(void));
tsd_t *malloc_tsd_boot0(void);
void malloc_tsd_boot1(void);
void tsd_cleanup(void *arg);
tsd_t *tsd_fetch_slow(tsd_t *tsd, bool internal);
void tsd_slow_update(tsd_t *tsd);
/*
* We put the platform-specific data declarations and inlines into their own
* header files to avoid cluttering this file. They define tsd_boot0,
@ -213,10 +250,16 @@ MALLOC_TSD
#define O(n, t, nt) \
JEMALLOC_ALWAYS_INLINE t * \
tsd_##n##p_get(tsd_t *tsd) { \
assert(tsd_state_get(tsd) == tsd_state_nominal || \
tsd_state_get(tsd) == tsd_state_nominal_slow || \
tsd_state_get(tsd) == tsd_state_reincarnated || \
tsd_state_get(tsd) == tsd_state_minimal_initialized); \
/* \
* Because the state might change asynchronously if it's \
* nominal, we need to make sure that we only read it once. \
*/ \
uint8_t state = tsd_state_get(tsd); \
assert(state == tsd_state_nominal || \
state == tsd_state_nominal_slow || \
state == tsd_state_nominal_recompute || \
state == tsd_state_reincarnated || \
state == tsd_state_minimal_initialized); \
return tsd_##n##p_get_unsafe(tsd); \
}
MALLOC_TSD
@ -260,6 +303,11 @@ MALLOC_TSD
JEMALLOC_ALWAYS_INLINE void
tsd_assert_fast(tsd_t *tsd) {
/*
* Note that our fastness assertion does *not* include global slowness
* counters; it's not in general possible to ensure that they won't
* change asynchronously from underneath us.
*/
assert(!malloc_slow && tsd_tcache_enabled_get(tsd) &&
tsd_reentrancy_level_get(tsd) == 0);
}

171
src/tsd.c
View File

@ -51,14 +51,159 @@ bool tsd_booted = false;
/******************************************************************************/
/* A list of all the tsds in the nominal state. */
typedef ql_head(tsd_t) tsd_list_t;
static tsd_list_t tsd_nominal_tsds = ql_head_initializer(tsd_nominal_tsds);
static malloc_mutex_t tsd_nominal_tsds_lock;
/* How many slow-path-enabling features are turned on. */
static atomic_u32_t tsd_global_slow_count = ATOMIC_INIT(0);
static bool
tsd_in_nominal_list(tsd_t *tsd) {
tsd_t *tsd_list;
bool found = false;
/*
* We don't know that tsd is nominal; it might not be safe to get data
* out of it here.
*/
malloc_mutex_lock(TSDN_NULL, &tsd_nominal_tsds_lock);
ql_foreach(tsd_list, &tsd_nominal_tsds, TSD_MANGLE(link)) {
if (tsd == tsd_list) {
found = true;
break;
}
}
malloc_mutex_unlock(TSDN_NULL, &tsd_nominal_tsds_lock);
return found;
}
static void
tsd_add_nominal(tsd_t *tsd) {
assert(!tsd_in_nominal_list(tsd));
assert(tsd_state_get(tsd) <= tsd_state_nominal_max);
ql_elm_new(tsd, TSD_MANGLE(link));
malloc_mutex_lock(tsd_tsdn(tsd), &tsd_nominal_tsds_lock);
ql_tail_insert(&tsd_nominal_tsds, tsd, TSD_MANGLE(link));
malloc_mutex_unlock(tsd_tsdn(tsd), &tsd_nominal_tsds_lock);
}
static void
tsd_remove_nominal(tsd_t *tsd) {
assert(tsd_in_nominal_list(tsd));
assert(tsd_state_get(tsd) <= tsd_state_nominal_max);
malloc_mutex_lock(tsd_tsdn(tsd), &tsd_nominal_tsds_lock);
ql_remove(&tsd_nominal_tsds, tsd, TSD_MANGLE(link));
malloc_mutex_unlock(tsd_tsdn(tsd), &tsd_nominal_tsds_lock);
}
static void
tsd_force_recompute(tsdn_t *tsdn) {
/*
* The stores to tsd->state here need to synchronize with the exchange
* in tsd_slow_update.
*/
atomic_fence(ATOMIC_RELEASE);
malloc_mutex_lock(tsdn, &tsd_nominal_tsds_lock);
tsd_t *remote_tsd;
ql_foreach(remote_tsd, &tsd_nominal_tsds, TSD_MANGLE(link)) {
assert(atomic_load_u8(&remote_tsd->state, ATOMIC_RELAXED)
<= tsd_state_nominal_max);
atomic_store_u8(&remote_tsd->state, tsd_state_nominal_recompute,
ATOMIC_RELAXED);
}
malloc_mutex_unlock(tsdn, &tsd_nominal_tsds_lock);
}
void
tsd_global_slow_inc(tsdn_t *tsdn) {
atomic_fetch_add_u32(&tsd_global_slow_count, 1, ATOMIC_RELAXED);
/*
* We unconditionally force a recompute, even if the global slow count
* was already positive. If we didn't, then it would be possible for us
* to return to the user, have the user synchronize externally with some
* other thread, and then have that other thread not have picked up the
* update yet (since the original incrementing thread might still be
* making its way through the tsd list).
*/
tsd_force_recompute(tsdn);
}
void tsd_global_slow_dec(tsdn_t *tsdn) {
atomic_fetch_sub_u32(&tsd_global_slow_count, 1, ATOMIC_RELAXED);
/* See the note in ..._inc(). */
tsd_force_recompute(tsdn);
}
static bool
tsd_local_slow(tsd_t *tsd) {
return !tsd_tcache_enabled_get(tsd)
|| tsd_reentrancy_level_get(tsd) > 0;
}
static bool
tsd_global_slow() {
return atomic_load_u32(&tsd_global_slow_count, ATOMIC_RELAXED) > 0;
}
/******************************************************************************/
static uint8_t
tsd_state_compute(tsd_t *tsd) {
if (!tsd_nominal(tsd)) {
return tsd_state_get(tsd);
}
/* We're in *a* nominal state; but which one? */
if (malloc_slow || tsd_local_slow(tsd) || tsd_global_slow()) {
return tsd_state_nominal_slow;
} else {
return tsd_state_nominal;
}
}
void
tsd_slow_update(tsd_t *tsd) {
if (tsd_nominal(tsd)) {
if (malloc_slow || !tsd_tcache_enabled_get(tsd) ||
tsd_reentrancy_level_get(tsd) > 0) {
tsd_state_set(tsd, tsd_state_nominal_slow);
uint8_t old_state;
do {
uint8_t new_state = tsd_state_compute(tsd);
old_state = atomic_exchange_u8(&tsd->state, new_state,
ATOMIC_ACQUIRE);
} while (old_state == tsd_state_nominal_recompute);
}
void
tsd_state_set(tsd_t *tsd, uint8_t new_state) {
/* Only the tsd module can change the state *to* recompute. */
assert(new_state != tsd_state_nominal_recompute);
uint8_t old_state = atomic_load_u8(&tsd->state, ATOMIC_RELAXED);
if (old_state > tsd_state_nominal_max) {
/*
* Not currently in the nominal list, but it might need to be
* inserted there.
*/
assert(!tsd_in_nominal_list(tsd));
atomic_store_u8(&tsd->state, new_state, ATOMIC_RELAXED);
if (new_state <= tsd_state_nominal_max) {
tsd_add_nominal(tsd);
}
} else {
/*
* We're currently nominal. If the new state is non-nominal,
* great; we take ourselves off the list and just enter the new
* state.
*/
assert(tsd_in_nominal_list(tsd));
if (new_state > tsd_state_nominal_max) {
tsd_remove_nominal(tsd);
atomic_store_u8(&tsd->state, new_state, ATOMIC_RELAXED);
} else {
tsd_state_set(tsd, tsd_state_nominal);
/*
* This is the tricky case. We're transitioning from
* one nominal state to another. The caller can't know
* about any races that are occuring at the same time,
* so we always have to recompute no matter what.
*/
tsd_slow_update(tsd);
}
}
}
@ -118,10 +263,14 @@ tsd_fetch_slow(tsd_t *tsd, bool minimal) {
assert(!tsd_fast(tsd));
if (tsd_state_get(tsd) == tsd_state_nominal_slow) {
/* On slow path but no work needed. */
assert(malloc_slow || !tsd_tcache_enabled_get(tsd) ||
tsd_reentrancy_level_get(tsd) > 0 ||
*tsd_arenas_tdata_bypassp_get(tsd));
/*
* On slow path but no work needed. Note that we can't
* necessarily *assert* that we're slow, because we might be
* slow because of an asynchronous modification to global state,
* which might be asynchronously modified *back*.
*/
} else if (tsd_state_get(tsd) == tsd_state_nominal_recompute) {
tsd_slow_update(tsd);
} else if (tsd_state_get(tsd) == tsd_state_uninitialized) {
if (!minimal) {
tsd_state_set(tsd, tsd_state_nominal);
@ -260,6 +409,10 @@ malloc_tsd_boot0(void) {
tsd_t *tsd;
ncleanups = 0;
if (malloc_mutex_init(&tsd_nominal_tsds_lock, "tsd_nominal_tsds_lock",
WITNESS_RANK_OMIT, malloc_mutex_rank_exclusive)) {
return NULL;
}
if (tsd_boot0()) {
return NULL;
}

View File

@ -1,5 +1,10 @@
#include "test/jemalloc_test.h"
/*
* If we're e.g. in debug mode, we *never* enter the fast path, and so shouldn't
* be asserting that we're on one.
*/
static bool originally_fast;
static int data_cleanup_count;
void
@ -124,6 +129,128 @@ TEST_BEGIN(test_tsd_reincarnation) {
}
TEST_END
typedef struct {
atomic_u32_t phase;
atomic_b_t error;
} global_slow_data_t;
static void *
thd_start_global_slow(void *arg) {
/* PHASE 0 */
global_slow_data_t *data = (global_slow_data_t *)arg;
free(mallocx(1, 0));
tsd_t *tsd = tsd_fetch();
/*
* No global slowness has happened yet; there was an error if we were
* originally fast but aren't now.
*/
atomic_store_b(&data->error, originally_fast && !tsd_fast(tsd),
ATOMIC_SEQ_CST);
atomic_store_u32(&data->phase, 1, ATOMIC_SEQ_CST);
/* PHASE 2 */
while (atomic_load_u32(&data->phase, ATOMIC_SEQ_CST) != 2) {
}
free(mallocx(1, 0));
atomic_store_b(&data->error, tsd_fast(tsd), ATOMIC_SEQ_CST);
atomic_store_u32(&data->phase, 3, ATOMIC_SEQ_CST);
/* PHASE 4 */
while (atomic_load_u32(&data->phase, ATOMIC_SEQ_CST) != 4) {
}
free(mallocx(1, 0));
atomic_store_b(&data->error, tsd_fast(tsd), ATOMIC_SEQ_CST);
atomic_store_u32(&data->phase, 5, ATOMIC_SEQ_CST);
/* PHASE 6 */
while (atomic_load_u32(&data->phase, ATOMIC_SEQ_CST) != 6) {
}
free(mallocx(1, 0));
/* Only one decrement so far. */
atomic_store_b(&data->error, tsd_fast(tsd), ATOMIC_SEQ_CST);
atomic_store_u32(&data->phase, 7, ATOMIC_SEQ_CST);
/* PHASE 8 */
while (atomic_load_u32(&data->phase, ATOMIC_SEQ_CST) != 8) {
}
free(mallocx(1, 0));
/*
* Both decrements happened; we should be fast again (if we ever
* were)
*/
atomic_store_b(&data->error, originally_fast && !tsd_fast(tsd),
ATOMIC_SEQ_CST);
atomic_store_u32(&data->phase, 9, ATOMIC_SEQ_CST);
return NULL;
}
TEST_BEGIN(test_tsd_global_slow) {
global_slow_data_t data = {ATOMIC_INIT(0), ATOMIC_INIT(false)};
/*
* Note that the "mallocx" here (vs. malloc) is important, since the
* compiler is allowed to optimize away free(malloc(1)) but not
* free(mallocx(1)).
*/
free(mallocx(1, 0));
tsd_t *tsd = tsd_fetch();
originally_fast = tsd_fast(tsd);
thd_t thd;
thd_create(&thd, thd_start_global_slow, (void *)&data.phase);
/* PHASE 1 */
while (atomic_load_u32(&data.phase, ATOMIC_SEQ_CST) != 1) {
/*
* We don't have a portable condvar/semaphore mechanism.
* Spin-wait.
*/
}
assert_false(atomic_load_b(&data.error, ATOMIC_SEQ_CST), "");
tsd_global_slow_inc(tsd_tsdn(tsd));
free(mallocx(1, 0));
assert_false(tsd_fast(tsd), "");
atomic_store_u32(&data.phase, 2, ATOMIC_SEQ_CST);
/* PHASE 3 */
while (atomic_load_u32(&data.phase, ATOMIC_SEQ_CST) != 3) {
}
assert_false(atomic_load_b(&data.error, ATOMIC_SEQ_CST), "");
/* Increase again, so that we can test multiple fast/slow changes. */
tsd_global_slow_inc(tsd_tsdn(tsd));
atomic_store_u32(&data.phase, 4, ATOMIC_SEQ_CST);
free(mallocx(1, 0));
assert_false(tsd_fast(tsd), "");
/* PHASE 5 */
while (atomic_load_u32(&data.phase, ATOMIC_SEQ_CST) != 5) {
}
assert_false(atomic_load_b(&data.error, ATOMIC_SEQ_CST), "");
tsd_global_slow_dec(tsd_tsdn(tsd));
atomic_store_u32(&data.phase, 6, ATOMIC_SEQ_CST);
/* We only decreased once; things should still be slow. */
free(mallocx(1, 0));
assert_false(tsd_fast(tsd), "");
/* PHASE 7 */
while (atomic_load_u32(&data.phase, ATOMIC_SEQ_CST) != 7) {
}
assert_false(atomic_load_b(&data.error, ATOMIC_SEQ_CST), "");
tsd_global_slow_dec(tsd_tsdn(tsd));
atomic_store_u32(&data.phase, 8, ATOMIC_SEQ_CST);
/* We incremented and then decremented twice; we should be fast now. */
free(mallocx(1, 0));
assert_true(!originally_fast || tsd_fast(tsd), "");
/* PHASE 9 */
while (atomic_load_u32(&data.phase, ATOMIC_SEQ_CST) != 9) {
}
assert_false(atomic_load_b(&data.error, ATOMIC_SEQ_CST), "");
thd_join(thd, NULL);
}
TEST_END
int
main(void) {
/* Ensure tsd bootstrapped. */
@ -135,5 +262,6 @@ main(void) {
return test_no_reentrancy(
test_tsd_main_thread,
test_tsd_sub_thread,
test_tsd_reincarnation);
test_tsd_reincarnation,
test_tsd_global_slow);
}