mq_notify
SIGEV_THREAD
52 — TLPI
What is Thread-Based Message Notification?
POSIX message queues let a process register for asynchronous notification when a message arrives on an empty queue. Instead of blocking or polling, the process can do other work and be notified automatically.
There are two notification methods:
- SIGEV_SIGNAL — delivers a real-time signal to the process.
- SIGEV_THREAD — spawns a new thread that runs a specified function.
This tutorial covers SIGEV_THREAD — the thread-based approach, which is often cleaner because you avoid signal-handler restrictions and can use all standard library functions freely inside the notification thread.
When you call mq_notify() with SIGEV_THREAD, you tell the kernel: “When a message arrives on this queue and it was empty before, please run this function in a new thread.”
The key fields of struct sigevent you must fill:
| Field | What to Set | Purpose |
|---|---|---|
| sigev_notify | SIGEV_THREAD | Tells kernel to create a thread |
| sigev_notify_function | Pointer to your function | Thread’s start function |
| sigev_notify_attributes | NULL or pthread_attr_t* | Thread attributes (stack size, etc.) |
| sigev_value.sival_ptr | Any pointer (e.g., &mqd) | Argument passed to thread function |
Important rule: Notification is a one-shot event. Once the thread fires, the registration is removed, so you must call mq_notify() again inside the thread to re-register for the next notification. If you forget, no further notifications are delivered and later messages sit unread in the queue.
The program below opens a message queue in O_NONBLOCK mode, registers for thread notification, and drains the queue inside the thread each time a message arrives.
/* mq_notify_thread.c
* Demonstrates SIGEV_THREAD notification for POSIX message queues.
* Usage: ./mq_notify_thread <mq-name>
* The queue must already exist (use mq_open with O_CREAT beforehand).
*/
#include <errno.h>
#include <fcntl.h> /* O_RDONLY, O_NONBLOCK */
#include <mqueue.h>
#include <pthread.h>
#include <signal.h> /* sigevent, SIGEV_THREAD */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h> /* pause() */
#define BUF_SIZE 4096
/* Forward declaration */
static void notifySetup(mqd_t *mqdp);
/*
 * threadFunc - start function of the notification thread; it runs each
 * time a message arrives on a previously empty queue.
 *
 * val.sival_ptr holds the address of the mqd_t descriptor.
 *
 * The receive buffer is allocated from the queue's own mq_msgsize
 * attribute rather than a fixed-size array: mq_receive() is told it may
 * write up to mq_msgsize bytes, so a smaller buffer would be overrun by
 * a large message.
 */
static void
threadFunc(union sigval val)
{
    mqd_t *mqdp = (mqd_t *) val.sival_ptr; /* Recover the queue descriptor */
    struct mq_attr attr;
    size_t bufSize;
    char *buf;
    ssize_t numRead;

    /* Re-register BEFORE draining the queue.
     * If we drain first and a message arrives between drain and
     * re-register, we miss the notification entirely.
     */
    notifySetup(mqdp);

    /* Ask the queue for its message size limit; the buffer must hold it */
    if (mq_getattr(*mqdp, &attr) == -1) {
        perror("mq_getattr");
        pthread_exit(NULL);
    }

    bufSize = (size_t) attr.mq_msgsize;
    buf = malloc(bufSize);
    if (buf == NULL) {
        perror("malloc");
        pthread_exit(NULL);
    }

    /* Drain all messages currently in the queue */
    for (;;) {
        numRead = mq_receive(*mqdp, buf, bufSize, NULL);
        if (numRead == -1)
            break;

        /* The payload is not NUL-terminated, so print by length */
        printf("[Thread] Received %zd bytes: %.*s\n",
               numRead, (int)numRead, buf);
    }

    /* errno still belongs to the failed mq_receive() that ended the loop */
    if (errno == EAGAIN) /* Queue is now empty - done */
        printf("[Thread] Queue drained. Waiting for next message...\n");
    else
        perror("mq_receive");

    free(buf);
    pthread_exit(NULL);
}
/*
 * notifySetup - (re)registers the thread notification on the queue.
 *
 * mqdp points to the queue descriptor; the same pointer is handed to
 * threadFunc through sigev_value so the thread can reach the queue.
 * Terminates the process if mq_notify() fails.
 */
static void
notifySetup(mqd_t *mqdp)
{
    struct sigevent notification;

    notification.sigev_value.sival_ptr = mqdp;       /* Pass queue descriptor */
    notification.sigev_notify_attributes = NULL;     /* Use default thread attrs */
    notification.sigev_notify_function = threadFunc;
    notification.sigev_notify = SIGEV_THREAD;

    if (mq_notify(*mqdp, &notification) != -1)
        return;

    perror("mq_notify");
    exit(EXIT_FAILURE);
}
/*
 * main - opens the existing queue named on the command line, registers
 * the thread notification, then sleeps while notification threads do the
 * receiving.
 */
int
main(int argc, char *argv[])
{
    mqd_t mqd;
    const char *queueName;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <mq-name>\n", argv[0]);
        exit(EXIT_FAILURE);
    }
    queueName = argv[1];

    /* Non-blocking, read-only: the drain loop relies on EAGAIN to stop */
    mqd = mq_open(queueName, O_RDONLY | O_NONBLOCK);
    if (mqd == (mqd_t) -1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }

    /* mqd lives in this frame; its address is what the thread receives */
    notifySetup(&mqd);

    printf("[Main] Waiting for messages on '%s' ...\n", queueName);

    /* Main thread sleeps; notifications run threadFunc in new threads */
    pause();

    return 0;
}
Compile and run:
# Compile (link with the realtime and pthread libraries)
gcc -o mq_notify_thread mq_notify_thread.c -lrt -lpthread
# First create the queue in another terminal
./pmsg_create -c /myqueue # or use mq_open with O_CREAT
# Run the notifier
./mq_notify_thread /myqueue &
# Send a message (in another terminal)
./pmsg_send /myqueue "hello world"
# Output: [Thread] Received 11 bytes: hello world
You might wonder: why not just cast mqd to a pointer and pass it as sigev_value.sival_ptr?
The reason is that SUSv3 (the POSIX standard) gives no guarantee about the size or nature of mqd_t. It only says it is not an array type. On some systems mqd_t may be a struct, a pointer, or something larger than an integer. Casting it to a pointer would be undefined behaviour.
The safe approach is to store the descriptor in a variable and pass its address:
mqd_t mqd;
/* ... open queue ... */
sev.sigev_value.sival_ptr = &mqd; /* CORRECT: pass address */
/* NOT: sev.sigev_value.sival_ptr = (void *)(uintptr_t)mqd; */
Inside the thread function, recover the descriptor with:
mqd_t *mqdp = (mqd_t *) val.sival_ptr;
/* Use *mqdp as the descriptor */
mq_receive(*mqdp, buf, size, NULL);
After the notification fires, the registration is automatically cancelled. You must call mq_notify() again to receive future notifications.
/*
 * threadFunc - notification thread: re-register first, then drain.
 *
 * val.sival_ptr holds the address of the mqd_t descriptor.
 * BUF_SIZE must be at least the queue's mq_msgsize, otherwise
 * mq_receive() fails with EMSGSIZE.
 */
static void
threadFunc(union sigval val)
{
    mqd_t *mqdp = (mqd_t *) val.sival_ptr;
    char buf[BUF_SIZE];
    ssize_t numRead;

    /* Step 1: Re-register FIRST */
    notifySetup(mqdp);

    /* Step 2: Then drain the queue.
     * Messages are not NUL-terminated, so print exactly numRead bytes.
     */
    while ((numRead = mq_receive(*mqdp, buf, BUF_SIZE, NULL)) != -1)
        printf("Got: %.*s\n", (int) numRead, buf);

    /* EAGAIN just means the non-blocking queue is empty */
    if (errno != EAGAIN)
        perror("mq_receive");
}
This single program creates the queue, sends messages from the main thread, and demonstrates the full notify loop without needing separate processes.
/* mq_notify_self_demo.c — creates queue, sends, and receives via SIGEV_THREAD */
#include <mqueue.h>
#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#define MQ_NAME "/demo_mq"
#define MAX_MSG 10
#define MSG_SIZE 256
static mqd_t g_mqd;
static volatile int g_done = 0;
static void notifySetup(void);
/*
 * threadFunc - notification thread for the self-contained demo.
 *
 * Re-registers the notification, then drains g_mqd, printing each message
 * as a string. Sets g_done when the "QUIT" message is seen.
 * val is unused because the descriptor is reached through the global.
 */
static void
threadFunc(union sigval val)
{
    /* One extra byte so a full MSG_SIZE message can still be NUL-terminated */
    char buf[MSG_SIZE + 1];
    ssize_t n;

    (void) val;

    /* Re-register before drain */
    notifySetup();

    while (1) {
        n = mq_receive(g_mqd, buf, MSG_SIZE, NULL);
        if (n == -1) {
            if (errno == EAGAIN) break;     /* Queue empty - done */
            perror("mq_receive"); break;
        }
        buf[n] = '\0';                      /* n <= MSG_SIZE, so this is in bounds */
        printf("[Notify Thread] message: '%s'\n", buf);
        if (strcmp(buf, "QUIT") == 0) g_done = 1;
    }
}
/*
 * notifySetup - (re)registers threadFunc as the SIGEV_THREAD notification
 * for the global queue g_mqd. Terminates the process if mq_notify() fails.
 */
static void
notifySetup(void)
{
    struct sigevent request;

    request.sigev_value.sival_ptr = NULL; /* Not needed; using global */
    request.sigev_notify_attributes = NULL;
    request.sigev_notify_function = threadFunc;
    request.sigev_notify = SIGEV_THREAD;

    if (mq_notify(g_mqd, &request) != -1)
        return;

    perror("mq_notify");
    exit(EXIT_FAILURE);
}
/*
 * main - creates the demo queue, registers the notification, sends a
 * fixed list of messages one second apart, then waits for the notify
 * thread to see "QUIT" before cleaning up.
 *
 * Returns 0 on success, EXIT_FAILURE if the "QUIT" message could not be
 * sent (in which case there is nothing to wait for).
 */
int main(void)
{
    struct mq_attr attr = { .mq_flags=0, .mq_maxmsg=MAX_MSG,
                            .mq_msgsize=MSG_SIZE, .mq_curmsgs=0 };
    const char *messages[] = { "Hello", "World", "From", "POSIX MQ", "QUIT" };
    const size_t numMessages = sizeof messages / sizeof messages[0];
    int quitSent = 0;

    /* Create queue (delete first if leftover; failure here is harmless) */
    mq_unlink(MQ_NAME);
    g_mqd = mq_open(MQ_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0600, &attr);
    if (g_mqd == (mqd_t)-1) { perror("mq_open"); exit(EXIT_FAILURE); }

    notifySetup();

    /* Send some messages */
    for (size_t i = 0; i < numMessages; i++) {
        sleep(1);
        if (mq_send(g_mqd, messages[i], strlen(messages[i]), 1) == -1) {
            perror("mq_send");
            continue;
        }
        printf("[Main] Sent: '%s'\n", messages[i]);
        if (strcmp(messages[i], "QUIT") == 0)
            quitSent = 1;
    }

    /* Wait for notify thread to process QUIT - but only if QUIT was
     * actually queued, otherwise g_done would never be set and this
     * loop would spin forever.
     */
    if (quitSent) {
        while (!g_done) usleep(100000);
    } else {
        fprintf(stderr, "[Main] QUIT was not sent; not waiting for notify thread.\n");
    }

    printf("[Main] Done.\n");
    mq_close(g_mqd);
    mq_unlink(MQ_NAME);
    return quitSent ? 0 : EXIT_FAILURE;
}
gcc -o mq_notify_self_demo mq_notify_self_demo.c -lrt -lpthread
./mq_notify_self_demo
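- Why pass the address of the descriptor instead of casting it? SUSv3 makes no guarantee about the size or nature of mqd_t. It might be a struct larger than a pointer. Casting it directly to void* would be undefined behaviour. Passing its address is always safe and portable.
- Why open the queue with O_NONBLOCK? The notification thread drains the queue by calling mq_receive() until it returns EAGAIN (queue empty). Without O_NONBLOCK, the last mq_receive() call would block forever waiting for a message that may never come, hanging the thread.
- Can two processes register on the same queue? No. Only one process can be registered for notification on a queue at a time; if another process calls mq_notify() on the same queue, it will get an EBUSY error. The first process can deregister by calling mq_notify(mqd, NULL).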