de033f56c0
This is a simple multi-producer, single-consumer queue. The intended use case is in the HPA, as we begin supporting hpdatas that move between hpa_shards. We take just a single CAS as the cost to send a message (or a batch of messages) in the low-contention case, and lock-freedom lets us avoid some lock-ordering issues.
305 lines
7.7 KiB
C
305 lines
7.7 KiB
C
#include "test/jemalloc_test.h"
|
|
|
|
#include "jemalloc/internal/mpsc_queue.h"
|
|
|
|
typedef struct elem_s elem_t;
|
|
typedef ql_head(elem_t) elem_list_t;
|
|
typedef mpsc_queue(elem_t) elem_mpsc_queue_t;
|
|
struct elem_s {
|
|
int thread;
|
|
int idx;
|
|
ql_elm(elem_t) link;
|
|
};
|
|
|
|
/* Include both proto and gen to make sure they match up. */
|
|
mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
|
|
elem_list_t);
|
|
mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t,
|
|
elem_list_t, link);
|
|
|
|
static void
|
|
init_elems_simple(elem_t *elems, int nelems, int thread) {
|
|
for (int i = 0; i < nelems; i++) {
|
|
elems[i].thread = thread;
|
|
elems[i].idx = i;
|
|
ql_elm_new(&elems[i], link);
|
|
}
|
|
}
|
|
|
|
static void
|
|
check_elems_simple(elem_list_t *list, int nelems, int thread) {
|
|
elem_t *elem;
|
|
int next_idx = 0;
|
|
ql_foreach(elem, list, link) {
|
|
expect_d_lt(next_idx, nelems, "Too many list items");
|
|
expect_d_eq(thread, elem->thread, "");
|
|
expect_d_eq(next_idx, elem->idx, "List out of order");
|
|
next_idx++;
|
|
}
|
|
}
|
|
|
|
TEST_BEGIN(test_simple) {
|
|
enum {NELEMS = 10};
|
|
elem_t elems[NELEMS];
|
|
elem_list_t list;
|
|
elem_mpsc_queue_t queue;
|
|
|
|
/* Pop empty queue onto empty list -> empty list */
|
|
ql_new(&list);
|
|
elem_mpsc_queue_new(&queue);
|
|
elem_mpsc_queue_pop_batch(&queue, &list);
|
|
expect_true(ql_empty(&list), "");
|
|
|
|
/* Pop empty queue onto nonempty list -> list unchanged */
|
|
ql_new(&list);
|
|
elem_mpsc_queue_new(&queue);
|
|
init_elems_simple(elems, NELEMS, 0);
|
|
for (int i = 0; i < NELEMS; i++) {
|
|
ql_tail_insert(&list, &elems[i], link);
|
|
}
|
|
elem_mpsc_queue_pop_batch(&queue, &list);
|
|
check_elems_simple(&list, NELEMS, 0);
|
|
|
|
/* Pop nonempty queue onto empty list -> list takes queue contents */
|
|
ql_new(&list);
|
|
elem_mpsc_queue_new(&queue);
|
|
init_elems_simple(elems, NELEMS, 0);
|
|
for (int i = 0; i < NELEMS; i++) {
|
|
elem_mpsc_queue_push(&queue, &elems[i]);
|
|
}
|
|
elem_mpsc_queue_pop_batch(&queue, &list);
|
|
check_elems_simple(&list, NELEMS, 0);
|
|
|
|
/* Pop nonempty queue onto nonempty list -> list gains queue contents */
|
|
ql_new(&list);
|
|
elem_mpsc_queue_new(&queue);
|
|
init_elems_simple(elems, NELEMS, 0);
|
|
for (int i = 0; i < NELEMS / 2; i++) {
|
|
ql_tail_insert(&list, &elems[i], link);
|
|
}
|
|
for (int i = NELEMS / 2; i < NELEMS; i++) {
|
|
elem_mpsc_queue_push(&queue, &elems[i]);
|
|
}
|
|
elem_mpsc_queue_pop_batch(&queue, &list);
|
|
check_elems_simple(&list, NELEMS, 0);
|
|
|
|
}
|
|
TEST_END
|
|
|
|
TEST_BEGIN(test_push_single_or_batch) {
|
|
enum {
|
|
BATCH_MAX = 10,
|
|
/*
|
|
* We'll push i items one-at-a-time, then i items as a batch,
|
|
* then i items as a batch again, as i ranges from 1 to
|
|
* BATCH_MAX. So we need 3 times the sum of the numbers from 1
|
|
* to BATCH_MAX elements total.
|
|
*/
|
|
NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2
|
|
};
|
|
elem_t elems[NELEMS];
|
|
init_elems_simple(elems, NELEMS, 0);
|
|
elem_list_t list;
|
|
ql_new(&list);
|
|
elem_mpsc_queue_t queue;
|
|
elem_mpsc_queue_new(&queue);
|
|
int next_idx = 0;
|
|
for (int i = 1; i < 10; i++) {
|
|
/* Push i items 1 at a time. */
|
|
for (int j = 0; j < i; j++) {
|
|
elem_mpsc_queue_push(&queue, &elems[next_idx]);
|
|
next_idx++;
|
|
}
|
|
/* Push i items in batch. */
|
|
for (int j = 0; j < i; j++) {
|
|
ql_tail_insert(&list, &elems[next_idx], link);
|
|
next_idx++;
|
|
}
|
|
elem_mpsc_queue_push_batch(&queue, &list);
|
|
expect_true(ql_empty(&list), "Batch push should empty source");
|
|
/*
|
|
* Push i items in batch, again. This tests two batches
|
|
* proceeding one after the other.
|
|
*/
|
|
for (int j = 0; j < i; j++) {
|
|
ql_tail_insert(&list, &elems[next_idx], link);
|
|
next_idx++;
|
|
}
|
|
elem_mpsc_queue_push_batch(&queue, &list);
|
|
expect_true(ql_empty(&list), "Batch push should empty source");
|
|
}
|
|
expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push.");
|
|
|
|
expect_true(ql_empty(&list), "");
|
|
elem_mpsc_queue_pop_batch(&queue, &list);
|
|
check_elems_simple(&list, NELEMS, 0);
|
|
}
|
|
TEST_END
|
|
|
|
TEST_BEGIN(test_multi_op) {
|
|
enum {NELEMS = 20};
|
|
elem_t elems[NELEMS];
|
|
init_elems_simple(elems, NELEMS, 0);
|
|
elem_list_t push_list;
|
|
ql_new(&push_list);
|
|
elem_list_t result_list;
|
|
ql_new(&result_list);
|
|
elem_mpsc_queue_t queue;
|
|
elem_mpsc_queue_new(&queue);
|
|
|
|
int next_idx = 0;
|
|
/* Push first quarter 1-at-a-time. */
|
|
for (int i = 0; i < NELEMS / 4; i++) {
|
|
elem_mpsc_queue_push(&queue, &elems[next_idx]);
|
|
next_idx++;
|
|
}
|
|
/* Push second quarter in batch. */
|
|
for (int i = NELEMS / 4; i < NELEMS / 2; i++) {
|
|
ql_tail_insert(&push_list, &elems[next_idx], link);
|
|
next_idx++;
|
|
}
|
|
elem_mpsc_queue_push_batch(&queue, &push_list);
|
|
/* Batch pop all pushed elements. */
|
|
elem_mpsc_queue_pop_batch(&queue, &result_list);
|
|
/* Push third quarter in batch. */
|
|
for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) {
|
|
ql_tail_insert(&push_list, &elems[next_idx], link);
|
|
next_idx++;
|
|
}
|
|
elem_mpsc_queue_push_batch(&queue, &push_list);
|
|
/* Push last quarter one-at-a-time. */
|
|
for (int i = 3 * NELEMS / 4; i < NELEMS; i++) {
|
|
elem_mpsc_queue_push(&queue, &elems[next_idx]);
|
|
next_idx++;
|
|
}
|
|
/* Pop them again. Order of existing list should be preserved. */
|
|
elem_mpsc_queue_pop_batch(&queue, &result_list);
|
|
|
|
check_elems_simple(&result_list, NELEMS, 0);
|
|
|
|
}
|
|
TEST_END
|
|
|
|
typedef struct pusher_arg_s pusher_arg_t;
|
|
struct pusher_arg_s {
|
|
elem_mpsc_queue_t *queue;
|
|
int thread;
|
|
elem_t *elems;
|
|
int nelems;
|
|
};
|
|
|
|
typedef struct popper_arg_s popper_arg_t;
|
|
struct popper_arg_s {
|
|
elem_mpsc_queue_t *queue;
|
|
int npushers;
|
|
int nelems_per_pusher;
|
|
int *pusher_counts;
|
|
};
|
|
|
|
static void *
|
|
thd_pusher(void *void_arg) {
|
|
pusher_arg_t *arg = (pusher_arg_t *)void_arg;
|
|
int next_idx = 0;
|
|
while (next_idx < arg->nelems) {
|
|
/* Push 10 items in batch. */
|
|
elem_list_t list;
|
|
ql_new(&list);
|
|
int limit = next_idx + 10;
|
|
while (next_idx < arg->nelems && next_idx < limit) {
|
|
ql_tail_insert(&list, &arg->elems[next_idx], link);
|
|
next_idx++;
|
|
}
|
|
elem_mpsc_queue_push_batch(arg->queue, &list);
|
|
/* Push 10 items one-at-a-time. */
|
|
limit = next_idx + 10;
|
|
while (next_idx < arg->nelems && next_idx < limit) {
|
|
elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]);
|
|
next_idx++;
|
|
}
|
|
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static void *
|
|
thd_popper(void *void_arg) {
|
|
popper_arg_t *arg = (popper_arg_t *)void_arg;
|
|
int done_pushers = 0;
|
|
while (done_pushers < arg->npushers) {
|
|
elem_list_t list;
|
|
ql_new(&list);
|
|
elem_mpsc_queue_pop_batch(arg->queue, &list);
|
|
elem_t *elem;
|
|
ql_foreach(elem, &list, link) {
|
|
int thread = elem->thread;
|
|
int idx = elem->idx;
|
|
expect_d_eq(arg->pusher_counts[thread], idx,
|
|
"Thread's pushes reordered");
|
|
arg->pusher_counts[thread]++;
|
|
if (arg->pusher_counts[thread]
|
|
== arg->nelems_per_pusher) {
|
|
done_pushers++;
|
|
}
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
TEST_BEGIN(test_multiple_threads) {
|
|
enum {
|
|
NPUSHERS = 4,
|
|
NELEMS_PER_PUSHER = 1000*1000,
|
|
};
|
|
thd_t pushers[NPUSHERS];
|
|
pusher_arg_t pusher_arg[NPUSHERS];
|
|
|
|
thd_t popper;
|
|
popper_arg_t popper_arg;
|
|
|
|
elem_mpsc_queue_t queue;
|
|
elem_mpsc_queue_new(&queue);
|
|
|
|
elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t));
|
|
elem_t *elem_iter = elems;
|
|
for (int i = 0; i < NPUSHERS; i++) {
|
|
pusher_arg[i].queue = &queue;
|
|
pusher_arg[i].thread = i;
|
|
pusher_arg[i].elems = elem_iter;
|
|
pusher_arg[i].nelems = NELEMS_PER_PUSHER;
|
|
|
|
init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i);
|
|
elem_iter += NELEMS_PER_PUSHER;
|
|
}
|
|
popper_arg.queue = &queue;
|
|
popper_arg.npushers = NPUSHERS;
|
|
popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER;
|
|
int pusher_counts[NPUSHERS] = {0};
|
|
popper_arg.pusher_counts = pusher_counts;
|
|
|
|
thd_create(&popper, thd_popper, (void *)&popper_arg);
|
|
for (int i = 0; i < NPUSHERS; i++) {
|
|
thd_create(&pushers[i], thd_pusher, &pusher_arg[i]);
|
|
}
|
|
|
|
thd_join(popper, NULL);
|
|
for (int i = 0; i < NPUSHERS; i++) {
|
|
thd_join(pushers[i], NULL);
|
|
}
|
|
|
|
for (int i = 0; i < NPUSHERS; i++) {
|
|
expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], "");
|
|
}
|
|
|
|
free(elems);
|
|
}
|
|
TEST_END
|
|
|
|
int
|
|
main(void) {
|
|
return test_no_reentrancy(
|
|
test_simple,
|
|
test_push_single_or_batch,
|
|
test_multi_op,
|
|
test_multiple_threads);
|
|
}
|