Thread-Based MQ Notification sigev_notify_function

 

Thread-Based MQ Notification
SIGEV_THREAD | sigev_notify_function | union sigval | Chapter 52.6.2 | File 3 of 3

Series Navigation: Index File 1: API & sigevent File 2: Signal Notification File 3: Thread Notification

Overview

With SIGEV_THREAD, when a message arrives on a previously empty queue, the kernel automatically creates a new thread and calls the function you specified in sigev_notify_function. This thread handles the notification without any signal masking or sigsuspend() gymnastics.

Thread-based notification is cleaner in multithreaded programs because it integrates naturally with the threading model. No signal masks to manage, no risk of signal delivery interfering with the main thread.

How SIGEV_THREAD Works
What You Set What Kernel Does
sev.sigev_notify = SIGEV_THREAD Uses thread delivery mode
sev.sigev_notify_function = myFunc Calls myFunc(sigev_value) in the new thread
sev.sigev_value.sival_ptr = &mqd Passes this value as the single argument to the function
sev.sigev_notify_attributes = NULL Use default thread attributes (or a pthread_attr_t* for custom stack size, etc.)

Thread Function Signature
/* The function signature MUST match this — takes union sigval, returns void */
void my_notify_function(union sigval sv)
{
    /* sv.sival_ptr or sv.sival_int — whichever you set in sigev_value */
    mqd_t *mqdp = (mqd_t *)sv.sival_ptr;

    /* Do work here: re-register, drain queue, etc. */

    pthread_exit(NULL);  /* Thread exits when done */
}
Key difference from a normal thread start function: A pthread start function has signature void *func(void *arg). The notification function has signature void func(union sigval sv). The argument is already of the right type — no casting of a void* needed if you use sival_ptr.

Program Flow Diagram — Thread-Based Notification
Main Thread mq_open(... O_NONBLOCK)notifySetup()pause() or other work
notifySetup() Fills sigevent struct with SIGEV_THREAD, function pointer, sigev_value, calls mq_notify()
Kernel Message arrives on empty queue → deregisters process → spawns new thread → calls threadFunc(sigev_value)
Notification Thread notifySetup() again to re-register → drain queue with O_NONBLOCK mq_receive() loop → pthread_exit()

The notifySetup() Pattern

A common and clean design pattern is to extract the registration logic into a helper function notifySetup(). This function is called once from main() and again from inside the notification thread (for re-registration). This avoids repeating code.

static void notifySetup(mqd_t *mqdp);  /* Forward declaration */

static void notifySetup(mqd_t *mqdp)
{
    struct sigevent sev;

    sev.sigev_notify            = SIGEV_THREAD;
    sev.sigev_notify_function   = threadFunc;     /* Our handler */
    sev.sigev_notify_attributes = NULL;           /* Default thread attrs */
    sev.sigev_value.sival_ptr   = mqdp;           /* Pass mqd pointer */

    if (mq_notify(*mqdp, &sev) == -1) {
        perror("mq_notify");
        exit(EXIT_FAILURE);
    }
}

Complete Code Example — Thread-Based Notification
/* mq_notify_thread.c — POSIX MQ notification via thread (SIGEV_THREAD)
   Compile: gcc mq_notify_thread.c -lrt -lpthread -o mq_notify_thread
   Usage  : ./mq_notify_thread /myqueue
*/

#include <pthread.h>
#include <mqueue.h>
#include <fcntl.h>       /* O_NONBLOCK */
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>

/* Forward declaration — threadFunc calls notifySetup, so we declare first */
static void notifySetup(mqd_t *mqdp);

/* -----------------------------------------------------------------------
   threadFunc: called by kernel in a NEW THREAD when notification fires.
   Argument sv.sival_ptr points to the mqd_t of the queue.
   ----------------------------------------------------------------------- */
static void
threadFunc(union sigval sv)
{
    ssize_t       numRead;
    mqd_t        *mqdp;
    void         *buffer;
    struct mq_attr attr;

    mqdp = (mqd_t *)sv.sival_ptr;   /* Recover the mqd_t pointer */

    /* Get queue's max message size to allocate a receive buffer */
    if (mq_getattr(*mqdp, &attr) == -1) {
        perror("mq_getattr");
        pthread_exit(NULL);
    }

    buffer = malloc(attr.mq_msgsize);
    if (buffer == NULL) {
        perror("malloc");
        pthread_exit(NULL);
    }

    /* IMPORTANT: Re-register BEFORE draining the queue.
       Same reasoning as with signal notification — prevents missing
       a message that arrives while we're draining. */
    notifySetup(mqdp);

    /* Drain all messages from the queue using nonblocking reads */
    while ((numRead = mq_receive(*mqdp, buffer,
                                 attr.mq_msgsize, NULL)) >= 0) {
        printf("[Thread %lu] Received message: %ld bytes\n",
               (unsigned long)pthread_self(), (long)numRead);
        /* Process buffer contents here */
    }

    /* EAGAIN = queue is now empty (expected) */
    if (errno != EAGAIN) {
        perror("mq_receive unexpected");
        free(buffer);
        pthread_exit(NULL);
    }

    free(buffer);
    pthread_exit(NULL);   /* Thread terminates — kernel cleans up */
}

/* -----------------------------------------------------------------------
   notifySetup: register (or re-register) for thread notification.
   Called from main() and from threadFunc().
   ----------------------------------------------------------------------- */
static void
notifySetup(mqd_t *mqdp)
{
    struct sigevent sev;

    sev.sigev_notify            = SIGEV_THREAD;
    sev.sigev_notify_function   = threadFunc;
    sev.sigev_notify_attributes = NULL;          /* Default thread attrs */
    sev.sigev_value.sival_ptr   = mqdp;          /* Passed to threadFunc */

    if (mq_notify(*mqdp, &sev) == -1) {
        perror("mq_notify");
        exit(EXIT_FAILURE);
    }
}

/* -----------------------------------------------------------------------
   main: open queue, register for notification, then sleep.
   The main thread doesn't need to poll or wait on signals.
   ----------------------------------------------------------------------- */
int
main(int argc, char *argv[])
{
    mqd_t mqd;

    if (argc != 2) {
        fprintf(stderr, "Usage: %s mq-name\n", argv[0]);
        return 1;
    }

    /* Open queue in nonblocking mode (used by threadFunc for draining) */
    mqd = mq_open(argv[1], O_RDONLY | O_NONBLOCK);
    if (mqd == (mqd_t)-1) {
        perror("mq_open");
        return 1;
    }

    /* Register for thread notification */
    notifySetup(&mqd);

    printf("Registered for thread notification on %s\n", argv[1]);
    printf("Main thread sleeping — notifications handled by spawned threads\n");

    /* Main thread can do other work or just wait.
       pause() is fine here because we're not waiting for a signal —
       the notification thread handles everything independently. */
    for (;;)
        pause();

    mq_close(mqd);
    return 0;
}

Code Example 2 — Custom Thread Attributes

You can control the notification thread’s stack size, scheduling policy, etc. using a pthread_attr_t:

#include <pthread.h>
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>

void my_notify_func(union sigval sv) {
    printf("Notification thread started\n");
    pthread_exit(NULL);
}

void register_with_custom_attrs(mqd_t mqd) {
    struct sigevent  sev;
    pthread_attr_t   tattr;

    /* Initialize thread attributes */
    pthread_attr_init(&tattr);

    /* Set a custom stack size of 64KB for the notification thread */
    pthread_attr_setstacksize(&tattr, 64 * 1024);

    /* Set detach state so thread resources are freed automatically */
    pthread_attr_setdetachstate(&tattr, PTHREAD_CREATE_DETACHED);

    /* Point sigev_notify_attributes to our custom attrs */
    sev.sigev_notify            = SIGEV_THREAD;
    sev.sigev_notify_function   = my_notify_func;
    sev.sigev_notify_attributes = &tattr;  /* Custom attrs pointer */
    sev.sigev_value.sival_int   = 0;

    if (mq_notify(mqd, &sev) == -1)
        perror("mq_notify");

    /* Note: tattr can be destroyed after mq_notify() returns.
       The kernel copies what it needs. */
    pthread_attr_destroy(&tattr);
}

Code Example 3 — Passing a Context Struct via sival_ptr

In real applications, you often need more than just the queue descriptor. Use a struct:

#include <pthread.h>
#include <mqueue.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>

/* Context struct — holds everything the thread needs */
typedef struct {
    mqd_t   mqd;
    int     queue_id;
    char    name[64];
} MQContext;

void mq_thread_handler(union sigval sv)
{
    MQContext *ctx = (MQContext *)sv.sival_ptr;
    char buf[256];
    ssize_t n;

    printf("[Queue %d: %s] Notification received\n",
           ctx->queue_id, ctx->name);

    /* Re-register — pass context again for future notifications */
    struct sigevent sev;
    sev.sigev_notify            = SIGEV_THREAD;
    sev.sigev_notify_function   = mq_thread_handler;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr   = ctx;   /* Same context */
    mq_notify(ctx->mqd, &sev);

    /* Drain messages */
    while ((n = mq_receive(ctx->mqd, buf, sizeof(buf), NULL)) >= 0)
        printf("  Message (%ld bytes): %.*s\n", (long)n, (int)n, buf);

    pthread_exit(NULL);
}

int main(void)
{
    MQContext ctx;
    struct mq_attr attr = { .mq_maxmsg = 10, .mq_msgsize = 256 };
    struct sigevent sev;

    /* Open or create queue */
    ctx.mqd = mq_open("/demo", O_CREAT | O_RDONLY | O_NONBLOCK,
                      0644, &attr);
    if (ctx.mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    ctx.queue_id = 1;
    __builtin_strncpy(ctx.name, "demo-queue", sizeof(ctx.name));

    /* Register with context struct as the sigval */
    sev.sigev_notify            = SIGEV_THREAD;
    sev.sigev_notify_function   = mq_thread_handler;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr   = &ctx;

    if (mq_notify(ctx.mqd, &sev) == -1) { perror("mq_notify"); return 1; }

    printf("Registered. Waiting for messages...\n");
    for (;;) pause();

    mq_close(ctx.mqd);
    mq_unlink("/demo");
    return 0;
}

Signal vs Thread Notification — Comparison
Aspect SIGEV_SIGNAL SIGEV_THREAD
Delivery mechanism OS sends a signal to the process OS spawns a new thread and calls a function
Waiting mechanism sigsuspend() required in main loop pause() is safe — thread runs independently
Signal masking Must carefully manage signal masks No signal mask management needed
Handler restrictions Handler must be async-signal-safe; actual work done outside handler Thread function can call any function (malloc, printf, mq_receive)
Multithreaded suitability Complex — signals interact with all threads Cleaner — fits naturally into threaded designs
Data passing Via sigev_value in siginfo_t (realtime signals only) Via sigev_value directly as function argument
Thread overhead No new thread created per notification New thread spawned per notification (small overhead)

Common Mistakes with SIGEV_THREAD
Mistake 1 — Draining before re-registering:

Same issue as with signals — always call notifySetup() (or mq_notify()) before starting the mq_receive() drain loop inside the thread function.

Mistake 2 — Using a stack-allocated sigevent in notifySetup:

If you store pthread_attr_t in a local variable and pass its address in sigev_notify_attributes, ensure the attribute struct is alive when mq_notify() is called. It can be destroyed immediately after mq_notify() returns — the kernel copies what it needs.

Mistake 3 — Forgetting O_NONBLOCK on the queue:

Without O_NONBLOCK, the drain loop’s final mq_receive() call (when the queue is empty) will block the notification thread indefinitely. Always open with O_NONBLOCK and check for EAGAIN.

Interview Questions & Answers
Q1. What does SIGEV_THREAD do when a message arrives on the queue?
The kernel automatically creates a new POSIX thread and calls the function specified in sigev_notify_function with the sigev_value as its argument. This thread runs concurrently with the main thread and handles the message processing.
Q2. What is the signature of the notification thread function?
void function_name(union sigval sv). It takes a single union sigval argument (which carries either an integer sv.sival_int or pointer sv.sival_ptr) and returns void. It should call pthread_exit(NULL) when done.
Q3. Why is SIGEV_THREAD preferred over SIGEV_SIGNAL in multithreaded programs?
In a multithreaded program, signals are delivered to a specific thread and signal masks must be managed across all threads — this is complex and error-prone. With SIGEV_THREAD, the kernel spawns a new thread for each notification. No signal masking is needed, the thread can call any function (not limited to async-signal-safe functions), and the design is cleaner.
Q4. How do you pass a queue descriptor to the notification thread function?
Store the queue descriptor (mqd_t) in a variable that persists (not stack-local to a function that might return), take its address, and assign it to sev.sigev_value.sival_ptr. Inside the thread function, cast sv.sival_ptr to mqd_t * and dereference it.
Q5. Why does the notification thread call notifySetup() before draining the queue?
After the notification fires, the registration is removed (auto-deregistration). If we drain first and a new message arrives between the drain completing and the re-registration, the queue is already nonempty when we register — no further notification will fire. By re-registering first, we guarantee the next incoming message will trigger another notification.
Q6. What happens when the notification thread calls pthread_exit()?
The thread terminates and its resources are reclaimed by the system (if the thread was created in detached state or joined). The main process continues running and the notification registration (set by notifySetup() called earlier in the thread) is now active for the next message arrival.
Q7. Can you pass custom thread attributes (stack size, scheduling) to the notification thread?
Yes. Set sev.sigev_notify_attributes to a pointer to a pthread_attr_t structure that has been initialized and configured with the desired attributes. If NULL, default thread attributes are used. The attribute struct can be destroyed after mq_notify() returns.
Q8. Why is pause() safe to use in main() with SIGEV_THREAD but not with SIGEV_SIGNAL?
With SIGEV_SIGNAL, the main thread must catch a signal and act on it — so the exact timing of signal delivery relative to pause() matters (race condition). With SIGEV_THREAD, the main thread has nothing to “catch” — the kernel directly spawns a thread. The main thread just needs to stay alive, so pause() (or any other wait) is fine.
Q9. What link flags are needed to compile programs using mq_notify with SIGEV_THREAD?
Both -lrt (for the POSIX message queue library) and -lpthread (for the POSIX threads library) are required: gcc program.c -lrt -lpthread -o program
Q10. Compare SIGEV_NONE, SIGEV_SIGNAL, and SIGEV_THREAD in one sentence each.
SIGEV_NONE: Registers the process but performs no actual notification when a message arrives.
SIGEV_SIGNAL: Sends a specified signal to the process, requiring careful signal mask management and sigsuspend() in the loop.
SIGEV_THREAD: Spawns a new thread and calls the registered function with the provided sigval argument, ideal for multithreaded programs.

Leave a Reply

Your email address will not be published. Required fields are marked *