1. Intro 2. Open/Close 3. Attributes 4. Send/Receive 5. Notify 6. Linux Specifics → 7. Interview Q&A →
Consider a server process that manages many queues and also does other work. If it calls mq_receive(), it blocks — it can’t do anything else. If it polls every queue in a loop, it wastes CPU. The solution is asynchronous notification.
mq_notify() tells the kernel: “When a message arrives on this empty queue, please notify me.” The process can then continue doing other work and handle the message only when it is notified. This is similar to SIGIO for file I/O.
|
Without mq_notify (Blocking)
Process calls mq_receive() |
With mq_notify (Async)
Process calls mq_notify() |
#include <mqueue.h>
#include <signal.h>
int mq_notify(mqd_t mqd, const struct sigevent *notification);
/* Returns: 0 on success, -1 on error */
mqd: The queue to watch.
notification: A struct sigevent describing HOW to notify. Pass NULL to cancel registration.
The struct sigevent has a field sigev_notify which controls the notification method:
| sigev_notify value | Notification method |
|---|---|
SIGEV_SIGNAL |
Send a signal (specified by sigev_signo) to the process |
SIGEV_THREAD |
Create a new thread and call the function sigev_notify_function |
SIGEV_NONE |
No notification (used to check which process is registered) |
One-shot rule: Notification fires only once per registration. After the notification fires, registration is automatically cancelled. You must call mq_notify() again inside your handler if you want to receive future notifications. Also, notification only fires when a message arrives on a previously empty queue.
The simplest method. When a message arrives on the empty queue, the kernel sends a specified signal (e.g., SIGUSR1 or SIGRTMIN) to the process. A signal handler reads the message.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <errno.h>
#define QNAME "/notify_sig"
#define MSGSIZE 64
#define NOTIFY_SIG SIGUSR1
static mqd_t s_mqd;
/* Signal handler: called when message arrives on empty queue */
static void message_arrived(int sig)
{
char buf[MSGSIZE];
unsigned int prio;
/* Drain all messages (there might be more than one) */
ssize_t n;
while ((n = mq_receive(s_mqd, buf, MSGSIZE, &prio)) > 0) {
printf("[signal handler] Got: \"%s\" (prio=%u)\n", buf, prio);
}
/* EAGAIN means queue is now empty — that's expected in O_NONBLOCK mode */
/* IMPORTANT: Re-register for next notification */
struct sigevent sev;
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = NOTIFY_SIG;
if (mq_notify(s_mqd, &sev) == -1)
perror("mq_notify re-register");
else
printf("[signal handler] Re-registered for next notification\n");
}
int main(void)
{
/* Open queue in O_NONBLOCK so signal handler's mq_receive doesn't block */
struct mq_attr attr = { .mq_maxmsg=5, .mq_msgsize=MSGSIZE };
s_mqd = mq_open(QNAME, O_CREAT|O_RDWR|O_NONBLOCK, 0600, &attr);
if (s_mqd == (mqd_t)-1) { perror("mq_open"); return 1; }
/* Install signal handler */
struct sigaction sa;
sa.sa_handler = message_arrived;
sigemptyset(&sa.sa_mask);
sa.sa_flags = 0;
if (sigaction(NOTIFY_SIG, &sa, NULL) == -1) {
perror("sigaction"); mq_close(s_mqd); mq_unlink(QNAME); return 1;
}
/* Register for notification */
struct sigevent sev;
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = NOTIFY_SIG;
if (mq_notify(s_mqd, &sev) == -1) {
perror("mq_notify"); mq_close(s_mqd); mq_unlink(QNAME); return 1;
}
printf("[main] Registered for SIGUSR1 notification. Waiting...\n");
/* Simulate another process sending messages */
printf("[main] Sending messages...\n");
mq_send(s_mqd, "alert: cpu high", 16, 9);
mq_send(s_mqd, "info: disk check", 17, 3);
/* Main thread can do other work while waiting */
for (int i = 0; i < 5; i++) {
printf("[main] doing other work... iteration %d\n", i);
sleep(1);
}
mq_close(s_mqd);
mq_unlink(QNAME);
return 0;
}
Signal handlers have many restrictions (async-signal-safe functions only). A cleaner approach is to block the signal and use sigwaitinfo() in the main thread. This avoids reentrancy issues.
#include <stdio.h>
#include <string.h>
#include <signal.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#define QNAME "/notify_wait"
#define MSGSIZE 64
#define NOTIFY_SIG SIGRTMIN /* real-time signal — carries extra info */
int main(void)
{
struct mq_attr attr = { .mq_maxmsg=4, .mq_msgsize=MSGSIZE };
mqd_t mqd = mq_open(QNAME, O_CREAT|O_RDWR|O_NONBLOCK, 0600, &attr);
if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }
/* Block SIGRTMIN so it doesn't invoke a handler — we'll use sigwaitinfo */
sigset_t block_mask;
sigemptyset(&block_mask);
sigaddset(&block_mask, NOTIFY_SIG);
if (sigprocmask(SIG_BLOCK, &block_mask, NULL) == -1) {
perror("sigprocmask"); goto cleanup;
}
/* Register mq_notify with SIGEV_SIGNAL */
struct sigevent sev;
memset(&sev, 0, sizeof(sev));
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = NOTIFY_SIG;
if (mq_notify(mqd, &sev) == -1) {
perror("mq_notify"); goto cleanup;
}
printf("[main] Waiting for notification via sigwaitinfo()...\n");
/* Main loop: wait for signal, then read messages */
for (int loops = 0; loops < 3; loops++) {
/* In a real app you'd send from another process/thread.
Here we send from main to demonstrate. */
if (loops == 0) {
mq_send(mqd, "first batch msg1", 17, 5);
mq_send(mqd, "first batch msg2", 17, 3);
}
siginfo_t si;
int sig = sigwaitinfo(&block_mask, &si);
if (sig == -1) { perror("sigwaitinfo"); break; }
printf("[main] Signal %d received. si_code=%d si_value=%d\n",
sig, si.si_code, si.si_value.sival_int);
/* Drain all messages from the queue */
char buf[MSGSIZE];
unsigned int prio;
ssize_t n;
while ((n = mq_receive(mqd, buf, MSGSIZE, &prio)) > 0)
printf("[main] msg: \"%s\" prio=%u\n", buf, prio);
/* Re-register for the next notification */
if (mq_notify(mqd, &sev) == -1) {
perror("mq_notify re-register"); break;
}
if (loops == 1) {
mq_send(mqd, "second batch", 13, 7);
}
}
cleanup:
mq_close(mqd);
mq_unlink(QNAME);
return 0;
}
Instead of a signal, the kernel creates a new thread and calls a specified function when a message arrives. This is often cleaner because the callback function runs in a normal thread context with no async-signal restrictions.
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <mqueue.h>
#include <signal.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <pthread.h>
#define QNAME "/notify_thread"
#define MSGSIZE 64
static mqd_t s_mqd;
/* This function runs in a NEW THREAD spawned by the kernel */
static void thread_notification_func(union sigval sv)
{
/* sv.sival_ptr can carry user data — we store the mqd_t there */
mqd_t mqd = *((mqd_t *)sv.sival_ptr);
printf("[notify thread] thread_id=%lu — message arrived!\n",
(unsigned long)pthread_self());
char buf[MSGSIZE];
unsigned int prio;
ssize_t n;
/* Drain the queue */
while ((n = mq_receive(mqd, buf, MSGSIZE, &prio)) > 0)
printf("[notify thread] received: \"%s\" (prio=%u)\n", buf, prio);
/* Re-register for next notification.
NOTE: buffer must stay alive — use static or heap, not stack! */
struct sigevent sev;
memset(&sev, 0, sizeof(sev));
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = thread_notification_func;
sev.sigev_value.sival_ptr = sv.sival_ptr; /* pass mqd back */
if (mq_notify(mqd, &sev) == -1)
perror("mq_notify re-register");
}
int main(void)
{
struct mq_attr attr = { .mq_maxmsg=5, .mq_msgsize=MSGSIZE };
/* O_NONBLOCK important so receive in callback doesn't block */
s_mqd = mq_open(QNAME, O_CREAT|O_RDWR|O_NONBLOCK, 0600, &attr);
if (s_mqd == (mqd_t)-1) { perror("mq_open"); return 1; }
/* Register SIGEV_THREAD notification */
struct sigevent sev;
memset(&sev, 0, sizeof(sev));
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = thread_notification_func;
sev.sigev_value.sival_ptr = &s_mqd; /* pass mqd to callback */
if (mq_notify(s_mqd, &sev) == -1) {
perror("mq_notify"); mq_close(s_mqd); mq_unlink(QNAME); return 1;
}
printf("[main] thread=%lu — registered SIGEV_THREAD notification\n",
(unsigned long)pthread_self());
/* Send messages (simulate another process doing this) */
sleep(1);
printf("[main] sending messages...\n");
mq_send(s_mqd, "task-high", 10, 9);
mq_send(s_mqd, "task-normal", 12, 5);
mq_send(s_mqd, "task-low", 9, 1);
/* Give notification thread time to run */
sleep(2);
printf("[main] sending second batch...\n");
mq_send(s_mqd, "batch2-msg", 11, 3);
sleep(2);
mq_close(s_mqd);
mq_unlink(QNAME);
return 0;
}
/* Compile: gcc prog.c -o prog -lrt -lpthread */
Buffer lifetime warning: When using SIGEV_THREAD, the struct sigevent itself does not need to stay alive after mq_notify() returns. However, the data you pass via sigev_value.sival_ptr must remain valid when the callback runs. Use global/static/heap-allocated data — never a local variable that goes out of scope.
Pass NULL as the notification argument to cancel a pending registration. This only works if the calling process owns the current registration.
/* Cancel notification */
if (mq_notify(mqd, NULL) == -1)
perror("mq_notify cancel");
else
printf("Notification deregistered.\n");
Also: calling mq_close() on a descriptor automatically cancels any notification registered via that descriptor.
Ownership rule: Only one process at a time can be registered for notification on a given queue. If process A is registered and process B tries to call mq_notify() on the same queue, process B gets EBUSY. Process A’s registration is not affected.
/* Process B's attempt to register when process A is already registered */
if (mq_notify(mqd, &sev) == -1) {
if (errno == EBUSY)
printf("Another process already registered for this queue.\n");
else
perror("mq_notify");
}
Notification fires only when a message arrives on a queue that was previously empty. If the queue already has messages when mq_notify() is called, no notification fires until the queue is first emptied and then a new message arrives.
/*
* Scenario 1 — Queue empty, then message arrives:
* mq_notify(mqd, &sev) -- register
* ... queue is empty ...
* mq_send(mqd, ...) -- FIRES notification (was empty)
*
* Scenario 2 — Queue already has messages:
* mq_send(mqd, "msg1", ...) -- put a message in
* mq_notify(mqd, &sev) -- register
* mq_send(mqd, "msg2", ...) -- does NOT fire (queue wasn't empty)
* mq_receive(mqd, ...) -- drain msg1
* mq_receive(mqd, ...) -- drain msg2 (queue now empty)
* mq_send(mqd, "msg3", ...) -- FIRES notification (queue was empty)
*
* Scenario 3 — Notification fires once then must be re-registered:
* mq_notify() -- register
* mq_send() -- fires (deregisters automatically)
* mq_send() -- does NOT fire (not registered anymore)
* mq_notify() -- re-register
* mq_send() -- fires again
*/
Q1: What does mq_notify() do?
It registers a process to receive an asynchronous notification when a message is placed on a previously empty message queue. The notification can be a signal or a new thread invocation.
Q2: What are the two notification methods available via struct sigevent?
SIGEV_SIGNAL: delivers a signal to the process.
SIGEV_THREAD: creates a new thread and calls a specified function.
Q3: Why must mq_notify() be called again inside the notification handler?
Because notification is one-shot — it fires exactly once and then the registration is automatically cancelled. To receive future notifications you must re-register by calling mq_notify() again.
Q4: Under what condition does the notification fire?
Only when a message arrives on a queue that was previously empty. If the queue already had messages, no notification fires for subsequent sends — only the first send after the queue becomes empty triggers it.
Q5: What error does mq_notify() return if another process is already registered?
-1 with errno == EBUSY. Only one process at a time can hold the notification registration for a given queue.
Q6: Why should the queue be opened in O_NONBLOCK mode when using mq_notify()?
Because the signal handler or notification thread must drain the queue with mq_receive(). If the queue is in blocking mode and becomes empty during draining, the next mq_receive() would block indefinitely inside the handler/thread. O_NONBLOCK causes it to return EAGAIN instead, signalling that the queue is empty.
Q7: How do you cancel a notification registration?
Call mq_notify(mqd, NULL). Also, closing the descriptor via mq_close() automatically cancels the registration.
Q8: What is the advantage of sigwaitinfo() over a signal handler for mq_notify()?
A signal handler has restrictions (only async-signal-safe functions allowed). With sigwaitinfo(), the signal is blocked and waited on synchronously in the main thread, allowing any function to be called safely when the notification arrives.
Q9: In SIGEV_THREAD, what lifetime must the data passed via sival_ptr have?
It must remain valid until after the notification thread has finished using it. Local (stack) variables in the registering function are unsafe — use global, static, or heap-allocated data.
Q10: Can the same process register for both a signal handler AND mq_notify on the same queue?
It only makes sense to use one. mq_notify() registers the process for notification and is independent of whether a signal handler is installed. Both can coexist but each mq_notify() call replaces the previous registration for that queue-process pair.
Explore /dev/mqueue filesystem, /proc tuning, IPC namespaces, and implementation internals.
