POSIX Message Queues Asynchronous Notification via Threads

 

POSIX Message Queues
Part 6: Asynchronous Notification via Threads (mq_notify + SIGEV_THREAD)
📬 Topic: mq_notify
🧵 Method: SIGEV_THREAD
📘 Chapter: 52 (TLPI)

What is Thread-Based Message Notification?

POSIX message queues let a process register for asynchronous notification when a message arrives on an empty queue. Instead of blocking or polling, the process can do other work and be notified automatically.

There are two notification methods:

  • SIGEV_SIGNAL: delivers a signal (typically a real-time signal) to the process.
  • SIGEV_THREAD: runs a specified function as if it were the start function of a new thread.

This tutorial covers SIGEV_THREAD, the thread-based approach, which is often cleaner because the notification function is not bound by signal-handler restrictions and can call ordinary library functions rather than only async-signal-safe ones.

Key Terms
mq_notify(), sigevent, SIGEV_THREAD, sigev_notify_function, sigev_value, sival_ptr, O_NONBLOCK, mq_receive(), pthread_attr_t, re-registration

1. How SIGEV_THREAD Works

When you call mq_notify() with SIGEV_THREAD, you tell the kernel: “When a message arrives on this queue and it was empty before, please run this function in a new thread.”

The key fields of struct sigevent you must fill:

struct sigevent: Fields Used for SIGEV_THREAD

Field                     What to Set                 Purpose
sigev_notify              SIGEV_THREAD                Tells kernel to create a thread
sigev_notify_function     Pointer to your function    Thread’s start function
sigev_notify_attributes   NULL or pthread_attr_t*     Thread attributes (stack size, etc.)
sigev_value.sival_ptr     Any pointer (e.g., &mqd)    Argument passed to thread function

Important rule: Notification is a one-shot event. Once the notification fires, the registration is removed, so you must call mq_notify() again inside the thread to re-register for the next one. If you forget, later messages still arrive on the queue, but no further notification threads are started to read them.

2. Full Working Code Example

The program below opens a message queue in O_NONBLOCK mode, registers for thread notification, and drains the queue inside the thread each time a message arrives.

/* mq_notify_thread.c
 * Demonstrates SIGEV_THREAD notification for POSIX message queues.
 * Usage: ./mq_notify_thread <mq-name>
 * The queue must already exist (use mq_open with O_CREAT beforehand).
 */

#include <mqueue.h>
#include <fcntl.h>           /* O_RDONLY, O_NONBLOCK */
#include <signal.h>          /* sigevent, SIGEV_THREAD */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>          /* pause() */
#include <errno.h>

#define BUF_SIZE 4096

/* Forward declaration */
static void notifySetup(mqd_t *mqdp);

/*
 * threadFunc: runs in a new thread whenever a message arrives on a
 * previously empty queue.
 *
 * val.sival_ptr holds the address of the mqd_t descriptor.
 */
static void
threadFunc(union sigval val)
{
    mqd_t *mqdp;
    ssize_t numRead;
    char *buf;
    struct mq_attr attr;

    mqdp = (mqd_t *) val.sival_ptr;   /* Recover the queue descriptor */

    /* Re-register BEFORE draining the queue.
     * If we drain first and a message arrives between drain and
     * re-register, we miss the notification entirely.
     */
    notifySetup(mqdp);

    /* Get the message size limit: mq_receive() fails with EMSGSIZE if
     * the buffer is smaller than mq_msgsize, so size the buffer from it. */
    if (mq_getattr(*mqdp, &attr) == -1) {
        perror("mq_getattr");
        pthread_exit(NULL);
    }

    buf = malloc(attr.mq_msgsize);
    if (buf == NULL) {
        perror("malloc");
        pthread_exit(NULL);
    }

    /* Drain all messages currently in the queue */
    while (1) {
        numRead = mq_receive(*mqdp, buf, attr.mq_msgsize, NULL);
        if (numRead == -1) {
            if (errno == EAGAIN)   /* Queue is now empty: done */
                break;
            perror("mq_receive");
            free(buf);
            pthread_exit(NULL);
        }
        printf("[Thread] Received %zd bytes: %.*s\n",
               numRead, (int)numRead, buf);
    }

    free(buf);
    printf("[Thread] Queue drained. Waiting for next message...\n");
    pthread_exit(NULL);
}

/*
 * notifySetup: (re)registers the thread notification on the queue.
 */
static void
notifySetup(mqd_t *mqdp)
{
    struct sigevent sev;

    memset(&sev, 0, sizeof(sev));              /* Clear fields not set below */
    sev.sigev_notify                 = SIGEV_THREAD;
    sev.sigev_notify_function        = threadFunc;
    sev.sigev_notify_attributes      = NULL;   /* Use default thread attrs */
    sev.sigev_value.sival_ptr        = mqdp;   /* Pass queue descriptor */

    if (mq_notify(*mqdp, &sev) == -1) {
        perror("mq_notify");
        exit(EXIT_FAILURE);
    }
}

int
main(int argc, char *argv[])
{
    mqd_t mqd;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s <mq-name>\n", argv[0]);
        exit(EXIT_FAILURE);
    }

    /* Open existing queue in non-blocking read-only mode */
    mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
    if (mqd == (mqd_t) -1) {
        perror("mq_open");
        exit(EXIT_FAILURE);
    }

    /* Register for notification */
    notifySetup(&mqd);

    printf("[Main] Waiting for messages on '%s' ...\n", argv[1]);
    pause();   /* Main thread sleeps; notifications wake threadFunc */

    return 0;
}

Compile and run:

# Compile (link with the realtime and pthread libraries)
gcc -o mq_notify_thread mq_notify_thread.c -lrt -lpthread

# First create the queue in another terminal
./pmsg_create -c /myqueue          # or use mq_open with O_CREAT

# Run the notifier
./mq_notify_thread /myqueue &

# Send a message (in another terminal)
./pmsg_send /myqueue "hello world"
# Output: [Thread] Received 11 bytes: hello world

3. Why Pass &mqd via sival_ptr (Not the Descriptor Value)?

You might wonder: why not just cast mqd to a pointer and pass it as sigev_value.sival_ptr?

The reason is that SUSv3 (the POSIX standard) gives no guarantee about the size or nature of mqd_t. It only says it is not an array type. On some systems mqd_t may be a struct, a pointer, or something larger than an integer. Casting it to a pointer is therefore not portable: depending on the implementation, the cast may lose information or fail to compile.

The safe approach is to store the descriptor in a variable and pass its address:

mqd_t mqd;
/* ... open queue ... */

sev.sigev_value.sival_ptr = &mqd;   /* CORRECT: pass address */
/* NOT: sev.sigev_value.sival_ptr = (void *)(uintptr_t)mqd; */

Inside the thread function, recover the descriptor with:

mqd_t *mqdp = (mqd_t *) val.sival_ptr;
/* Use *mqdp as the descriptor */
mq_receive(*mqdp, buf, size, NULL);

4. The Re-registration Pattern: Critical Detail

After the notification fires, the registration is automatically cancelled. You must call mq_notify() again to receive future notifications.

Correct Notification Loop

  1. Main: mq_notify() register
  2. Kernel: message arrives → spawn thread
  3. threadFunc: call mq_notify() AGAIN
  4. threadFunc: drain queue (mq_receive loop)

⚠️ Always re-register before draining the queue, not after. A message arriving in the gap between drain and re-register would be silently missed.

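static void
threadFunc(union sigval val)
{
    mqd_t *mqdp = (mqd_t *) val.sival_ptr;
    char buf[BUF_SIZE];      /* BUF_SIZE must be >= the queue's mq_msgsize */
    ssize_t numRead;

    /* Step 1: Re-register FIRST */
    notifySetup(mqdp);

    /* Step 2: Then drain the queue.
     * Messages are not NUL-terminated, so print exactly numRead bytes. */
    while ((numRead = mq_receive(*mqdp, buf, BUF_SIZE, NULL)) != -1)
        printf("Got: %.*s\n", (int) numRead, buf);

    if (errno != EAGAIN)
        perror("mq_receive");
}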

5. Complete Self-Contained Demo (Creates + Notifies)

This single program creates the queue, starts a sender thread, and demonstrates the full notify loop without needing separate processes.

/* mq_notify_self_demo.c: creates queue, sends, and receives via SIGEV_THREAD */

#include <mqueue.h>
#include <fcntl.h>           /* O_CREAT, O_RDWR, O_NONBLOCK */
#include <signal.h>
#include <pthread.h>
#include <stdatomic.h>       /* atomic_int */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

#define MQ_NAME  "/demo_mq"
#define MAX_MSG  10
#define MSG_SIZE 256

static mqd_t g_mqd;
static atomic_int g_done = 0;   /* Set by the notify thread, polled by main */

static void notifySetup(void);

static void
threadFunc(union sigval val)
{
    char buf[MSG_SIZE + 1];   /* +1 so a full-size message can be NUL-terminated */
    ssize_t n;

    /* Re-register before drain */
    notifySetup();

    while (1) {
        n = mq_receive(g_mqd, buf, MSG_SIZE, NULL);
        if (n == -1) {
            if (errno == EAGAIN) break;
            perror("mq_receive"); break;
        }
        buf[n] = '\0';
        printf("[Notify Thread] message: '%s'\n", buf);
        if (strcmp(buf, "QUIT") == 0) g_done = 1;
    }
}

static void
notifySetup(void)
{
    struct sigevent sev;
    sev.sigev_notify           = SIGEV_THREAD;
    sev.sigev_notify_function  = threadFunc;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr  = NULL;   /* Not needed; using global */

    if (mq_notify(g_mqd, &sev) == -1) {
        perror("mq_notify"); exit(EXIT_FAILURE);
    }
}

int main(void)
{
    struct mq_attr attr = { .mq_flags=0, .mq_maxmsg=MAX_MSG,
                            .mq_msgsize=MSG_SIZE, .mq_curmsgs=0 };

    /* Create queue (delete first if leftover) */
    mq_unlink(MQ_NAME);
    g_mqd = mq_open(MQ_NAME, O_CREAT | O_RDWR | O_NONBLOCK, 0600, &attr);
    if (g_mqd == (mqd_t)-1) { perror("mq_open"); exit(EXIT_FAILURE); }

    notifySetup();

    /* Send some messages */
    const char *messages[] = { "Hello", "World", "From", "POSIX MQ", "QUIT" };
    for (int i = 0; i < 5; i++) {
        sleep(1);
        if (mq_send(g_mqd, messages[i], strlen(messages[i]), 1) == -1)
            perror("mq_send");
        else
            printf("[Main] Sent: '%s'\n", messages[i]);
    }

    /* Wait for notify thread to process QUIT */
    while (!g_done) usleep(100000);
    printf("[Main] Done.\n");

    mq_close(g_mqd);
    mq_unlink(MQ_NAME);
    return 0;
}

Compile and run:

gcc -o mq_notify_self_demo mq_notify_self_demo.c -lrt -lpthread
./mq_notify_self_demo
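
The demo waits for completion by polling a flag every 100 ms. A condition variable is one alternative that lets main sleep until the notification thread reports that it has seen "QUIT". This is a swapped-in technique, not what the demo above does, and the names done_mtx, done_cond, done_flag, mark_done and wait_until_done are hypothetical.

```c
#include <pthread.h>
#include <stdbool.h>

static pthread_mutex_t done_mtx  = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  done_cond = PTHREAD_COND_INITIALIZER;
static bool            done_flag = false;   /* protected by done_mtx */

/* Would be called from the notification thread when "QUIT" is seen. */
static void
mark_done(void)
{
    pthread_mutex_lock(&done_mtx);
    done_flag = true;
    pthread_cond_signal(&done_cond);
    pthread_mutex_unlock(&done_mtx);
}

/* Would be called from main: blocks until mark_done() has run. */
static void
wait_until_done(void)
{
    pthread_mutex_lock(&done_mtx);
    while (!done_flag)                 /* loop guards against spurious wakeups */
        pthread_cond_wait(&done_cond, &done_mtx);
    pthread_mutex_unlock(&done_mtx);
}
```

In the demo, mark_done() would take the place of the assignment to g_done, and wait_until_done() would take the place of the usleep() polling loop.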

Interview Questions & Answers
Q1. What is the difference between SIGEV_SIGNAL and SIGEV_THREAD in mq_notify()?
SIGEV_SIGNAL delivers a signal (typically a real-time signal) to the process when a message arrives. Signal handlers have strict restrictions (only async-signal-safe functions allowed). SIGEV_THREAD runs your function in a brand-new thread, where ordinary library calls are allowed. SIGEV_THREAD is generally easier to program but has the overhead of thread creation.
Q2. Why must you call mq_notify() again inside the notification thread?
Notification registration is one-shot. Once the kernel fires the notification, it automatically cancels the registration. If you don’t re-register inside the thread, no future messages will trigger a notification, even though the queue continues to receive messages.
Q3. Why should you re-register BEFORE draining the queue, not after?
If you drain first and a new message arrives between the end of draining and the re-registration call, you will miss the notification for that message. By re-registering first, any message arriving during the drain will trigger a new notification, so nothing is lost.
Q4. Why pass the address of mqd_t via sival_ptr instead of the value?
The POSIX standard does not guarantee the size or nature of mqd_t. It might be a struct larger than a pointer. Casting it directly to void* is therefore not portable and may lose information or fail to compile. Passing its address is always safe and portable, provided the variable outlives the notification thread.
Q5. Why must the queue be opened with O_NONBLOCK when using mq_notify()?
Inside the notification thread, you drain the queue using a loop that calls mq_receive() until it fails with EAGAIN (queue empty). Without O_NONBLOCK, the last mq_receive() call would block until another message arrives, which may never happen, so the thread could hang indefinitely.
Q6. Can you have multiple processes registered for notification on the same queue simultaneously?
No. Only one process at a time can be registered for notification on a given message queue. If a second process calls mq_notify() on the same queue, it will get an EBUSY error. The first process can deregister by calling mq_notify(mqd, NULL).
Q7. What happens if the queue already contains messages when a process registers for notification?
Notification is triggered only when a message arrives on a previously empty queue. If the queue is not empty at registration time, no notification fires for the messages already present; one fires only after the queue has been emptied and a new message then arrives. This is why you should drain the queue completely inside the thread.
