de033f56c0
This is a simple multi-producer, single-consumer queue. The intended use case is in the HPA, as we begin supporting hpdatas that move between hpa_shards. We take just a single CAS as the cost to send a message (or a batch of messages) in the low-contention case, and lock-freedom lets us avoid some lock-ordering issues.
135 lines
4.9 KiB
C
135 lines
4.9 KiB
C
#ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
|
|
#define JEMALLOC_INTERNAL_MPSC_QUEUE_H
|
|
|
|
#include "jemalloc/internal/atomic.h"
|
|
|
|
/*
|
|
* A concurrent implementation of a multi-producer, single-consumer queue. It
|
|
* supports three concurrent operations:
|
|
* - Push
|
|
* - Push batch
|
|
* - Pop batch
|
|
*
|
|
* These operations are all lock-free.
|
|
*
|
|
* The implementation is the simple two-stack queue built on a Treiber stack.
|
|
* It's not terribly efficient, but this isn't expected to go into anywhere with
|
|
* hot code. In fact, we don't really even need queue semantics in any
|
|
* anticipated use cases; we could get away with just the stack. But this way
|
|
* lets us frame the API in terms of the existing list types, which is a nice
|
|
* convenience. We can save on cache misses by introducing our own (parallel)
|
|
* single-linked list type here, and dropping FIFO semantics, if we need this to
|
|
* get faster. Since we're currently providing queue semantics though, we use
|
|
* the prev field in the link rather than the next field for Treiber-stack
|
|
* linkage, so that we can preserve order for bash-pushed lists (recall that the
|
|
* two-stack tricks reverses orders in the lock-free first stack).
|
|
*/
|
|
|
|
#define mpsc_queue(a_type) \
|
|
struct { \
|
|
atomic_p_t tail; \
|
|
}
|
|
|
|
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type, \
|
|
a_list_type) \
|
|
/* Initialize a queue. */ \
|
|
a_attr void \
|
|
a_prefix##new(a_queue_type *queue); \
|
|
/* Insert all items in src into the queue, clearing src. */ \
|
|
a_attr void \
|
|
a_prefix##push_batch(a_queue_type *queue, a_list_type *src); \
|
|
/* Insert node into the queue. */ \
|
|
a_attr void \
|
|
a_prefix##push(a_queue_type *queue, a_type *node); \
|
|
/* \
|
|
* Pop all items in the queue into the list at dst. dst should already \
|
|
* be initialized (and may contain existing items, which then remain \
|
|
* in dst). \
|
|
*/ \
|
|
a_attr void \
|
|
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);
|
|
|
|
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type, \
|
|
a_list_type, a_link) \
|
|
a_attr void \
|
|
a_prefix##new(a_queue_type *queue) { \
|
|
atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED); \
|
|
} \
|
|
a_attr void \
|
|
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) { \
|
|
/* \
|
|
* Reuse the ql list next field as the Treiber stack next \
|
|
* field. \
|
|
*/ \
|
|
a_type *first = ql_first(src); \
|
|
a_type *last = ql_last(src, a_link); \
|
|
void* cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
|
|
do { \
|
|
/* \
|
|
* Note that this breaks the queue ring structure; \
|
|
* it's not a ring any more! \
|
|
*/ \
|
|
first->a_link.qre_prev = cur_tail; \
|
|
/* \
|
|
* Note: the upcoming CAS doesn't need an atomic; every \
|
|
* push only needs to synchronize with the next pop, \
|
|
* which we get from the release sequence rules. \
|
|
*/ \
|
|
} while (!atomic_compare_exchange_weak_p(&queue->tail, \
|
|
&cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED)); \
|
|
ql_new(src); \
|
|
} \
|
|
a_attr void \
|
|
a_prefix##push(a_queue_type *queue, a_type *node) { \
|
|
ql_elm_new(node, a_link); \
|
|
a_list_type list; \
|
|
ql_new(&list); \
|
|
ql_head_insert(&list, node, a_link); \
|
|
a_prefix##push_batch(queue, &list); \
|
|
} \
|
|
a_attr void \
|
|
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) { \
|
|
a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED); \
|
|
if (tail == NULL) { \
|
|
/* \
|
|
* In the common special case where there are no \
|
|
* pending elements, bail early without a costly RMW. \
|
|
*/ \
|
|
return; \
|
|
} \
|
|
tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE); \
|
|
/* \
|
|
* It's a single-consumer queue, so if cur started non-NULL, \
|
|
* it'd better stay non-NULL. \
|
|
*/ \
|
|
assert(tail != NULL); \
|
|
/* \
|
|
* We iterate through the stack and both fix up the link \
|
|
* structure (stack insertion broke the list requirement that \
|
|
* the list be circularly linked). It's just as efficient at \
|
|
* this point to make the queue a "real" queue, so do that as \
|
|
* well. \
|
|
* If this ever gets to be a hot spot, we can omit this fixup \
|
|
* and make the queue a bag (i.e. not necessarily ordered), but \
|
|
* that would mean jettisoning the existing list API as the \
|
|
* batch pushing/popping interface. \
|
|
*/ \
|
|
a_list_type reversed; \
|
|
ql_new(&reversed); \
|
|
while (tail != NULL) { \
|
|
/* \
|
|
* Pop an item off the stack, prepend it onto the list \
|
|
* (reversing the order). Recall that we use the \
|
|
* list prev field as the Treiber stack next field to \
|
|
* preserve order of batch-pushed items when reversed. \
|
|
*/ \
|
|
a_type *next = tail->a_link.qre_prev; \
|
|
ql_elm_new(tail, a_link); \
|
|
ql_head_insert(&reversed, tail, a_link); \
|
|
tail = next; \
|
|
} \
|
|
ql_concat(dst, &reversed, a_link); \
|
|
}
|
|
|
|
#endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */
|