mpsc_queue: Add module.

This is a simple multi-producer, single-consumer queue.  The intended use case
is in the HPA, as we begin supporting hpdatas that move between hpa_shards.  We
take just a single CAS as the cost to send a message (or a batch of messages) in
the low-contention case, and lock-freedom lets us avoid some lock-ordering
issues.
This commit is contained in:
David Goldblatt 2021-05-11 14:49:55 -07:00 committed by David Goldblatt
parent 4452a4812f
commit de033f56c0
3 changed files with 439 additions and 0 deletions

View File

@ -233,6 +233,7 @@ TESTS_UNIT := \
$(srcroot)test/unit/malloc_conf_2.c \
$(srcroot)test/unit/malloc_io.c \
$(srcroot)test/unit/math.c \
$(srcroot)test/unit/mpsc_queue.c \
$(srcroot)test/unit/mq.c \
$(srcroot)test/unit/mtx.c \
$(srcroot)test/unit/nstime.c \

View File

@ -0,0 +1,134 @@
#ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
#define JEMALLOC_INTERNAL_MPSC_QUEUE_H
#include "jemalloc/internal/atomic.h"
/*
* A concurrent implementation of a multi-producer, single-consumer queue. It
* supports three concurrent operations:
* - Push
* - Push batch
* - Pop batch
*
* These operations are all lock-free.
*
* The implementation is the simple two-stack queue built on a Treiber stack.
* It's not terribly efficient, but this isn't expected to go into anywhere with
* hot code. In fact, we don't really even need queue semantics in any
* anticipated use cases; we could get away with just the stack. But this way
* lets us frame the API in terms of the existing list types, which is a nice
* convenience. We can save on cache misses by introducing our own (parallel)
* single-linked list type here, and dropping FIFO semantics, if we need this to
* get faster. Since we're currently providing queue semantics though, we use
* the prev field in the link rather than the next field for Treiber-stack
* linkage, so that we can preserve order for bash-pushed lists (recall that the
* two-stack tricks reverses orders in the lock-free first stack).
*/
#define mpsc_queue(a_type) \
struct { \
atomic_p_t tail; \
}
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type, \
a_list_type) \
/* Initialize a queue. */ \
a_attr void \
a_prefix##new(a_queue_type *queue); \
/* Insert all items in src into the queue, clearing src. */ \
a_attr void \
a_prefix##push_batch(a_queue_type *queue, a_list_type *src); \
/* Insert node into the queue. */ \
a_attr void \
a_prefix##push(a_queue_type *queue, a_type *node); \
/* \
* Pop all items in the queue into the list at dst. dst should already \
* be initialized (and may contain existing items, which then remain \
* in dst). \
*/ \
a_attr void \
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type, \
a_list_type, a_link) \
a_attr void \
a_prefix##new(a_queue_type *queue) { \
atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED); \
} \
a_attr void \
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) { \
/* \
* Reuse the ql list next field as the Treiber stack next \
* field. \
*/ \
a_type *first = ql_first(src); \
a_type *last = ql_last(src, a_link); \
void* cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
do { \
/* \
* Note that this breaks the queue ring structure; \
* it's not a ring any more! \
*/ \
first->a_link.qre_prev = cur_tail; \
/* \
* Note: the upcoming CAS doesn't need an atomic; every \
* push only needs to synchronize with the next pop, \
* which we get from the release sequence rules. \
*/ \
} while (!atomic_compare_exchange_weak_p(&queue->tail, \
&cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED)); \
ql_new(src); \
} \
a_attr void \
a_prefix##push(a_queue_type *queue, a_type *node) { \
ql_elm_new(node, a_link); \
a_list_type list; \
ql_new(&list); \
ql_head_insert(&list, node, a_link); \
a_prefix##push_batch(queue, &list); \
} \
a_attr void \
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) { \
a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
if (tail == NULL) { \
/* \
* In the common special case where there are no \
* pending elements, bail early without a costly RMW. \
*/ \
return; \
} \
tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE); \
/* \
* It's a single-consumer queue, so if cur started non-NULL, \
* it'd better stay non-NULL. \
*/ \
assert(tail != NULL); \
/* \
* We iterate through the stack and both fix up the link \
* structure (stack insertion broke the list requirement that \
* the list be circularly linked). It's just as efficient at \
* this point to make the queue a "real" queue, so do that as \
* well. \
* If this ever gets to be a hot spot, we can omit this fixup \
* and make the queue a bag (i.e. not necessarily ordered), but \
* that would mean jettisoning the existing list API as the \
* batch pushing/popping interface. \
*/ \
a_list_type reversed; \
ql_new(&reversed); \
while (tail != NULL) { \
/* \
* Pop an item off the stack, prepend it onto the list \
* (reversing the order). Recall that we use the \
* list prev field as the Treiber stack next field to \
* preserve order of batch-pushed items when reversed. \
*/ \
a_type *next = tail->a_link.qre_prev; \
ql_elm_new(tail, a_link); \
ql_head_insert(&reversed, tail, a_link); \
tail = next; \
} \
ql_concat(dst, &reversed, a_link); \
}
#endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */

304
test/unit/mpsc_queue.c Normal file
View File

@ -0,0 +1,304 @@
#include "test/jemalloc_test.h"
#include "jemalloc/internal/mpsc_queue.h"
typedef struct elem_s elem_t;
typedef ql_head(elem_t) elem_list_t;
typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
struct elem_s {
int thread;
int idx;
ql_elm(elem_t) link;
};
/* Include both proto and gen to make sure they match up. */
mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
elem_list_t);
mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
elem_list_t, link);
static void
init_elems_simple(elem_t *elems, int nelems, int thread) {
for (int i = 0; i < nelems; i++) {
elems[i].thread = thread;
elems[i].idx = i;
ql_elm_new(&elems[i], link);
}
}
static void
check_elems_simple(elem_list_t *list, int nelems, int thread) {
elem_t *elem;
int next_idx = 0;
ql_foreach(elem, list, link) {
expect_d_lt(next_idx, nelems, "Too many list items");
expect_d_eq(thread, elem->thread, "");
expect_d_eq(next_idx, elem->idx, "List out of order");
next_idx++;
}
}
TEST_BEGIN(test_simple) {
enum {NELEMS = 10};
elem_t elems[NELEMS];
elem_list_t list;
elem_mpsc_queue_t queue;
/* Pop empty queue onto empty list -> empty list */
ql_new(&list);
elem_mpsc_queue_new(&queue);
elem_mpsc_queue_pop_batch(&queue, &list);
expect_true(ql_empty(&list), "");
/* Pop empty queue onto nonempty list -> list unchanged */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS; i++) {
ql_tail_insert(&list, &elems[i], link);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
/* Pop nonempty queue onto empty list -> list takes queue contents */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[i]);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
/* Pop nonempty queue onto nonempty list -> list gains queue contents */
ql_new(&list);
elem_mpsc_queue_new(&queue);
init_elems_simple(elems, NELEMS, 0);
for (int i = 0; i < NELEMS / 2; i++) {
ql_tail_insert(&list, &elems[i], link);
}
for (int i = NELEMS / 2; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[i]);
}
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
}
TEST_END
TEST_BEGIN(test_push_single_or_batch) {
enum {
BATCH_MAX = 10,
/*
* We'll push i items one-at-a-time, then i items as a batch,
* then i items as a batch again, as i ranges from 1 to
* BATCH_MAX. So we need 3 times the sum of the numbers from 1
* to BATCH_MAX elements total.
*/
NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
};
elem_t elems[NELEMS];
init_elems_simple(elems, NELEMS, 0);
elem_list_t list;
ql_new(&list);
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
int next_idx = 0;
for (int i = 1; i < 10; i++) {
/* Push i items 1 at a time. */
for (int j = 0; j < i; j++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Push i items in batch. */
for (int j = 0; j < i; j++) {
ql_tail_insert(&list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &list);
expect_true(ql_empty(&list), "Batch push should empty source");
/*
* Push i items in batch, again. This tests two batches
* proceeding one after the other.
*/
for (int j = 0; j < i; j++) {
ql_tail_insert(&list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &list);
expect_true(ql_empty(&list), "Batch push should empty source");
}
expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
expect_true(ql_empty(&list), "");
elem_mpsc_queue_pop_batch(&queue, &list);
check_elems_simple(&list, NELEMS, 0);
}
TEST_END
TEST_BEGIN(test_multi_op) {
enum {NELEMS = 20};
elem_t elems[NELEMS];
init_elems_simple(elems, NELEMS, 0);
elem_list_t push_list;
ql_new(&push_list);
elem_list_t result_list;
ql_new(&result_list);
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
int next_idx = 0;
/* Push first quarter 1-at-a-time. */
for (int i = 0; i < NELEMS / 4; i++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Push second quarter in batch. */
for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
ql_tail_insert(&push_list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &push_list);
/* Batch pop all pushed elements. */
elem_mpsc_queue_pop_batch(&queue, &result_list);
/* Push third quarter in batch. */
for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
ql_tail_insert(&push_list, &elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(&queue, &push_list);
/* Push last quarter one-at-a-time. */
for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
elem_mpsc_queue_push(&queue, &elems[next_idx]);
next_idx++;
}
/* Pop them again. Order of existing list should be preserved. */
elem_mpsc_queue_pop_batch(&queue, &result_list);
check_elems_simple(&result_list, NELEMS, 0);
}
TEST_END
typedef struct pusher_arg_s pusher_arg_t;
struct pusher_arg_s {
elem_mpsc_queue_t *queue;
int thread;
elem_t *elems;
int nelems;
};
typedef struct popper_arg_s popper_arg_t;
struct popper_arg_s {
elem_mpsc_queue_t *queue;
int npushers;
int nelems_per_pusher;
int *pusher_counts;
};
static void *
thd_pusher(void *void_arg) {
pusher_arg_t *arg = (pusher_arg_t *)void_arg;
int next_idx = 0;
while (next_idx < arg->nelems) {
/* Push 10 items in batch. */
elem_list_t list;
ql_new(&list);
int limit = next_idx + 10;
while (next_idx < arg->nelems && next_idx < limit) {
ql_tail_insert(&list, &arg->elems[next_idx], link);
next_idx++;
}
elem_mpsc_queue_push_batch(arg->queue, &list);
/* Push 10 items one-at-a-time. */
limit = next_idx + 10;
while (next_idx < arg->nelems && next_idx < limit) {
elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
next_idx++;
}
}
return NULL;
}
static void *
thd_popper(void *void_arg) {
popper_arg_t *arg = (popper_arg_t *)void_arg;
int done_pushers = 0;
while (done_pushers < arg->npushers) {
elem_list_t list;
ql_new(&list);
elem_mpsc_queue_pop_batch(arg->queue, &list);
elem_t *elem;
ql_foreach(elem, &list, link) {
int thread = elem->thread;
int idx = elem->idx;
expect_d_eq(arg->pusher_counts[thread], idx,
"Thread's pushes reordered");
arg->pusher_counts[thread]++;
if (arg->pusher_counts[thread]
== arg->nelems_per_pusher) {
done_pushers++;
}
}
}
return NULL;
}
TEST_BEGIN(test_multiple_threads) {
enum {
NPUSHERS = 4,
NELEMS_PER_PUSHER = 1000*1000,
};
thd_t pushers[NPUSHERS];
pusher_arg_t pusher_arg[NPUSHERS];
thd_t popper;
popper_arg_t popper_arg;
elem_mpsc_queue_t queue;
elem_mpsc_queue_new(&queue);
elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
elem_t *elem_iter = elems;
for (int i = 0; i < NPUSHERS; i++) {
pusher_arg[i].queue = &queue;
pusher_arg[i].thread = i;
pusher_arg[i].elems = elem_iter;
pusher_arg[i].nelems = NELEMS_PER_PUSHER;
init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
elem_iter += NELEMS_PER_PUSHER;
}
popper_arg.queue = &queue;
popper_arg.npushers = NPUSHERS;
popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
int pusher_counts[NPUSHERS] = {0};
popper_arg.pusher_counts = pusher_counts;
thd_create(&popper, thd_popper, (void *)&popper_arg);
for (int i = 0; i < NPUSHERS; i++) {
thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
}
thd_join(popper, NULL);
for (int i = 0; i < NPUSHERS; i++) {
thd_join(pushers[i], NULL);
}
for (int i = 0; i < NPUSHERS; i++) {
expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
}
free(elems);
}
TEST_END
int
main(void) {
return test_no_reentrancy(
test_simple,
test_push_single_or_batch,
test_multi_op,
test_multiple_threads);
}