Sending and Receiving Messages
Chapter 52 — mq_send, mq_receive, mq_timedsend, mq_timedreceive

mq_send() — Putting a Message into the Queue
#include <mqueue.h>

int mq_send(mqd_t mqd, const char *msg_ptr, size_t msg_len,
            unsigned int msg_prio);
/* Returns: 0 on success, -1 on error */

mqd: The queue descriptor (must be opened with O_WRONLY or O_RDWR).
msg_ptr: Pointer to the message data (can be any bytes, not just strings).
msg_len: Number of bytes to send. Must be ≤ mq_msgsize.
msg_prio: Priority (unsigned int). Higher number = higher priority. Valid range: 0 to MQ_PRIO_MAX-1. POSIX guarantees that MQ_PRIO_MAX is at least 32, so priorities 0 to 31 are always portable.
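
The upper bound on msg_prio can be queried at run time instead of being hard-coded. The following is a minimal sketch, assuming sysconf(_SC_MQ_PRIO_MAX) is available on the target system; the helper name prio_is_valid is made up for this illustration and is not part of the mq API.

```c
#define _POSIX_C_SOURCE 200809L
#include <limits.h>   /* _POSIX_MQ_PRIO_MAX: the POSIX minimum (32) */
#include <unistd.h>   /* sysconf, _SC_MQ_PRIO_MAX */

/* Hypothetical helper: returns 1 if prio is in the range accepted by
   mq_send(), else 0. */
int prio_is_valid(unsigned int prio)
{
    long max = sysconf(_SC_MQ_PRIO_MAX);   /* number of priority levels */

    if (max <= 0)                          /* limit unknown or unsupported */
        max = _POSIX_MQ_PRIO_MAX;          /* fall back to the guaranteed minimum */

    return (unsigned long)prio < (unsigned long)max;
}
```

Checking the priority before calling mq_send() lets a program report a bad value clearly; POSIX specifies EINVAL for a priority outside the valid range.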

Blocking behaviour: If the queue is full, mq_send() blocks until space becomes available (that is, until a consumer receives a message). With O_NONBLOCK set, it fails immediately instead: it returns -1 with errno == EAGAIN.

#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>

/* Demonstrate mq_send with error handling */
int safe_send(mqd_t mqd, const char *data, unsigned int prio)
{
    size_t len = strlen(data) + 1;  /* include null terminator */

    if (mq_send(mqd, data, len, prio) == -1) {
        if (errno == EAGAIN)
            fprintf(stderr, "Queue full (O_NONBLOCK). Message dropped.\n");
        else if (errno == EMSGSIZE)
            fprintf(stderr, "Message too large for queue.\n");
        else
            perror("mq_send");
        return -1;
    }
    printf("Sent [prio=%u]: \"%s\"\n", prio, data);
    return 0;
}

int main(void)
{
    struct mq_attr attr = { .mq_maxmsg=5, .mq_msgsize=64 };
    mqd_t mqd = mq_open("/send_demo", O_CREAT|O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    /* Send messages with different priorities */
    safe_send(mqd, "Low priority task",    1);
    safe_send(mqd, "Medium priority task", 5);
    safe_send(mqd, "High priority task",  10);
    safe_send(mqd, "Critical alert",      20);
    safe_send(mqd, "Another low task",     1);

    mq_close(mqd);
    mq_unlink("/send_demo");
    return 0;
}

mq_receive() — Getting the Highest Priority Message
#include <mqueue.h>

ssize_t mq_receive(mqd_t mqd, char *msg_ptr, size_t msg_len,
                   unsigned int *msg_prio);
/* Returns: number of bytes in message on success, -1 on error */

mqd: Queue descriptor (opened with O_RDONLY or O_RDWR).
msg_ptr: Buffer to store the received message.
msg_len: Buffer size. Must be ≥ mq_msgsize (use mq_getattr() to find this).
msg_prio: Pointer to unsigned int; filled with the message’s priority. Pass NULL if you don’t care.

Order guarantee: Always returns the message with the highest priority. Among equal-priority messages, FIFO order applies.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>

int main(void)
{
    struct mq_attr attr = { .mq_maxmsg=8, .mq_msgsize=64 };
    mqd_t mqd = mq_open("/recv_demo", O_CREAT|O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    /* Send in arbitrary order */
    mq_send(mqd, "Task C (prio=1)", 16, 1);
    mq_send(mqd, "Task B (prio=5)", 16, 5);
    mq_send(mqd, "Task A (prio=9)", 16, 9);
    mq_send(mqd, "Task D (prio=1)", 16, 1);
    mq_send(mqd, "Task E (prio=5)", 16, 5);

    printf("Receiving in priority order:\n");

    char buf[64];
    unsigned int prio;
    ssize_t n;
    int count = 0;

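    /* This descriptor is in blocking mode, so stop after the 5 messages sent
       above: one more mq_receive() would block forever on the empty queue. */
    while (count < 5 && (n = mq_receive(mqd, buf, sizeof(buf), &prio)) > 0) {
        printf("  [%d] prio=%-3u  msg=\"%s\"\n", ++count, prio, buf);
    }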

    mq_close(mqd);
    mq_unlink("/recv_demo");
    return 0;
}

Output — notice the priority ordering:

Receiving in priority order:
  [1] prio=9    msg="Task A (prio=9)"
  [2] prio=5    msg="Task B (prio=5)"
  [3] prio=5    msg="Task E (prio=5)"
  [4] prio=1    msg="Task C (prio=1)"
  [5] prio=1    msg="Task D (prio=1)"

Task A (prio=9) came out first even though it was sent third. Tasks B and E (both prio=5) came out in FIFO order relative to each other, as did tasks C and D (both prio=1).

Understanding Priority Ordering Visually

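The queue behaves as if its messages were kept sorted by priority at all times. A newly added high-priority message "jumps the queue" ahead of every lower-priority message already waiting, but stays behind earlier messages of the same priority.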

Insert Order         Queue State (head = next to be received)
send("C", prio=1)    [C,p=1]
send("B", prio=5)    [B,p=5] → [C,p=1]
send("A", prio=9)    [A,p=9] → [B,p=5] → [C,p=1]
send("D", prio=1)    [A,p=9] → [B,p=5] → [C,p=1] → [D,p=1]
send("E", prio=5)    [A,p=9] → [B,p=5] → [E,p=5] → [C,p=1] → [D,p=1]
receive()            Returns A (prio=9): always highest priority first
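
The ordering rule in the table can be modelled in a few lines of plain C. This is only a toy model of the observable behaviour, not how the kernel stores messages; the names model_queue, model_send and model_receive are invented for this sketch.

```c
#include <string.h>

#define MODEL_CAP 16

/* Toy model of the ordering rule only; a real queue lives in the kernel
   and may use a different data structure internally. */
struct model_msg   { char tag; unsigned int prio; };
struct model_queue { struct model_msg slot[MODEL_CAP]; int len; };

/* Insert behind every message whose priority is >= prio, which keeps
   equal-priority messages in FIFO order. Returns 0, or -1 if full. */
int model_send(struct model_queue *q, char tag, unsigned int prio)
{
    if (q->len == MODEL_CAP)
        return -1;

    int pos = 0;
    while (pos < q->len && q->slot[pos].prio >= prio)
        pos++;

    /* Shift the lower-priority messages one slot towards the tail. */
    memmove(&q->slot[pos + 1], &q->slot[pos],
            (size_t)(q->len - pos) * sizeof(q->slot[0]));
    q->slot[pos].tag  = tag;
    q->slot[pos].prio = prio;
    q->len++;
    return 0;
}

/* Remove and return the tag at the head, or '\0' if the model is empty. */
char model_receive(struct model_queue *q)
{
    if (q->len == 0)
        return '\0';

    char tag = q->slot[0].tag;
    q->len--;
    memmove(&q->slot[0], &q->slot[1], (size_t)q->len * sizeof(q->slot[0]));
    return tag;
}
```

Feeding the model the five sends from the table (C, B, A, D, E) and then receiving until it is empty yields A, B, E, C, D, the same order as the real queue in the earlier example.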

mq_timedsend() and mq_timedreceive() — With Timeout

These are the timed versions of send/receive. Instead of blocking forever when the queue is full (send) or empty (receive), they block only up to a specified absolute time. If the time expires, they return -1 with errno == ETIMEDOUT.

#include <mqueue.h>
#include <time.h>

int mq_timedsend(mqd_t mqd, const char *msg_ptr, size_t msg_len,
                 unsigned int msg_prio,
                 const struct timespec *abs_timeout);

ssize_t mq_timedreceive(mqd_t mqd, char *msg_ptr, size_t msg_len,
                         unsigned int *msg_prio,
                         const struct timespec *abs_timeout);

/* abs_timeout is an ABSOLUTE time (not relative duration).
   Use clock_gettime(CLOCK_REALTIME) and add your desired timeout. */

Common mistake: The timeout is absolute, not relative. You must get the current time first, then add the timeout duration to it.

#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <time.h>
#include <errno.h>

/* Helper: make an absolute timespec N seconds from now */
struct timespec make_abs_timeout(int seconds)
{
    struct timespec ts;
    clock_gettime(CLOCK_REALTIME, &ts);
    ts.tv_sec += seconds;
    return ts;
}

int main(void)
{
    struct mq_attr attr = { .mq_maxmsg=2, .mq_msgsize=64 };
    mqd_t mqd = mq_open("/timed_demo", O_CREAT|O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    /* --- Test mq_timedreceive on an empty queue --- */
    char buf[64];
    unsigned int prio;
    struct timespec timeout = make_abs_timeout(2); /* 2 seconds */

    printf("Trying to receive from empty queue (2s timeout)...\n");
    ssize_t n = mq_timedreceive(mqd, buf, sizeof(buf), &prio, &timeout);
    if (n == -1) {
        if (errno == ETIMEDOUT)
            printf("mq_timedreceive: timed out as expected!\n");
        else
            perror("mq_timedreceive");
    }

    /* --- Fill queue and test mq_timedsend on a full queue --- */
    mq_send(mqd, "msg1", 5, 1);
    mq_send(mqd, "msg2", 5, 1);
    /* Queue is now full (maxmsg=2) */

    timeout = make_abs_timeout(2);
    printf("Trying to send to full queue (2s timeout)...\n");
    if (mq_timedsend(mqd, "msg3", 5, 1, &timeout) == -1) {
        if (errno == ETIMEDOUT)
            printf("mq_timedsend: timed out as expected!\n");
        else
            perror("mq_timedsend");
    }

    mq_close(mqd);
    mq_unlink("/timed_demo");
    return 0;
}

Output:

Trying to receive from empty queue (2s timeout)...
mq_timedreceive: timed out as expected!
Trying to send to full queue (2s timeout)...
mq_timedsend: timed out as expected!
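
The make_abs_timeout() helper above works in whole seconds. For sub-second timeouts the nanosecond field has to be normalised, because tv_nsec must stay in the range 0 to 999,999,999. A possible sketch follows; the helper names timespec_add_ms and deadline_in_ms are assumptions made for this example.

```c
#define _POSIX_C_SOURCE 200809L
#include <time.h>

/* Hypothetical helper: returns base advanced by ms milliseconds (ms >= 0),
   keeping tv_nsec inside the range 0..999999999. */
struct timespec timespec_add_ms(struct timespec base, long ms)
{
    base.tv_sec  += ms / 1000;
    base.tv_nsec += (ms % 1000) * 1000000L;
    if (base.tv_nsec >= 1000000000L) {     /* carry into the seconds field */
        base.tv_sec  += 1;
        base.tv_nsec -= 1000000000L;
    }
    return base;
}

/* Absolute deadline ms milliseconds from now, suitable for passing as
   abs_timeout. Returns 0 on success, -1 if the clock cannot be read. */
int deadline_in_ms(long ms, struct timespec *out)
{
    struct timespec now;

    if (clock_gettime(CLOCK_REALTIME, &now) == -1)
        return -1;
    *out = timespec_add_ms(now, ms);
    return 0;
}
```

A caller would fill a struct timespec with deadline_in_ms(250, &ts) and pass &ts to mq_timedreceive() or mq_timedsend() for a 250 ms timeout.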

Sending Structured/Binary Data (Not Just Strings)

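Messages are raw byte arrays, not strings. You can send any data, including structs: pass the object's address cast to const char * (or char * for the receive buffer) and its size as msg_len. This is very useful in embedded systems for passing sensor readings, commands, or events. Both sides must agree on the struct layout, and pointers stored inside a struct are meaningless in another process.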

#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>

/* A structured message type */
typedef struct {
    int   sensor_id;
    float temperature;
    float humidity;
    int   alarm_flag;
} SensorData;

int main(void)
{
    struct mq_attr attr = {
        .mq_maxmsg  = 10,
        .mq_msgsize = sizeof(SensorData),
    };

    mqd_t mqd = mq_open("/sensor_q", O_CREAT|O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    /* Producer: send a sensor reading */
    SensorData reading = {
        .sensor_id   = 42,
        .temperature = 28.5f,
        .humidity    = 65.2f,
        .alarm_flag  = 0,
    };

    if (mq_send(mqd, (char *)&reading, sizeof(reading), 5) == -1) {
        perror("mq_send");
        mq_close(mqd); mq_unlink("/sensor_q"); return 1;
    }
    printf("Sent: sensor=%d temp=%.1f hum=%.1f alarm=%d\n",
           reading.sensor_id, reading.temperature,
           reading.humidity, reading.alarm_flag);

    /* Consumer: receive and cast back */
    SensorData received;
    unsigned int prio;
    ssize_t n = mq_receive(mqd, (char *)&received, sizeof(received), &prio);
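    if (n == -1) {
        perror("mq_receive");
    } else if ((size_t)n != sizeof(received)) {
        /* A message of any other length is not a SensorData */
        fprintf(stderr, "Unexpected message size: %zd bytes\n", n);
    } else {
        printf("Received: sensor=%d temp=%.1f hum=%.1f alarm=%d (prio=%u)\n",
               received.sensor_id, received.temperature,
               received.humidity, received.alarm_flag, prio);
    }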

    mq_close(mqd);
    mq_unlink("/sensor_q");
    return 0;
}
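
When the receive buffer is a generic byte array rather than the struct itself, the consumer can validate the length before interpreting the bytes. A small sketch under that assumption; decode_sensor is a hypothetical helper, and the struct is repeated so the block stands alone.

```c
#include <string.h>
#include <sys/types.h>   /* ssize_t */

typedef struct {
    int   sensor_id;
    float temperature;
    float humidity;
    int   alarm_flag;
} SensorData;            /* same layout as in the example above */

/* Hypothetical helper: copy a received message into *out only if its
   length matches the struct exactly. Returns 0 on success, -1 otherwise. */
int decode_sensor(const char *msg, ssize_t n, SensorData *out)
{
    if (n < 0 || (size_t)n != sizeof(*out))
        return -1;                   /* receive error or a different message type */

    memcpy(out, msg, sizeof(*out));  /* memcpy makes no alignment assumption about msg */
    return 0;
}
```

The return value of mq_receive() is passed straight in as n, so a failed receive (-1) and a wrong-sized message are both rejected in one place.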

Producer-Consumer Pattern with fork()

A typical IPC scenario: the parent forks a child. The child is the producer (it sends messages) and the parent is the consumer (it receives and processes them). The queue is the communication channel between them.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/wait.h>

#define QNAME    "/prod_cons"
#define MSGSIZE  64
#define MAXMSG   5
#define NUM_MSGS 4

int main(void)
{
    struct mq_attr attr = {
        .mq_maxmsg  = MAXMSG,
        .mq_msgsize = MSGSIZE,
    };

    /* Create the queue before fork() so the child inherits the open descriptor */
    mqd_t mqd = mq_open(QNAME, O_CREAT | O_RDWR, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    pid_t pid = fork();
    if (pid == -1) { perror("fork"); mq_close(mqd); mq_unlink(QNAME); return 1; }

    if (pid == 0) {
        /* ---- CHILD: Producer ---- */
        for (int i = 0; i < NUM_MSGS; i++) {
            char msg[MSGSIZE];
            unsigned int prio = (i % 3) + 1;  /* priorities 1, 2, 3 */
            snprintf(msg, sizeof(msg), "message-%d", i);
            if (mq_send(mqd, msg, strlen(msg)+1, prio) == -1)
                perror("[child]  mq_send");
            else
                printf("[child]  sent: \"%s\" prio=%u\n", msg, prio);
            usleep(50000); /* 50ms */
        }
        mq_close(mqd);
        exit(0);
    } else {
        /* ---- PARENT: Consumer ---- */
        char buf[MSGSIZE];
        unsigned int prio;
        for (int i = 0; i < NUM_MSGS; i++) {
            ssize_t n = mq_receive(mqd, buf, MSGSIZE, &prio);
            if (n > 0)
                printf("[parent] recv: \"%s\" prio=%u\n", buf, prio);
            else if (n == -1)
                perror("[parent] mq_receive");
        }
        wait(NULL);
        mq_close(mqd);
        mq_unlink(QNAME);
    }
    return 0;
}
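
The example above works because both sides know NUM_MSGS in advance. When the count is not known, one option is an end-of-stream marker. The sketch below assumes a convention that is not part of the mq API: data messages are non-empty and use priority 1 or higher, and an empty message at priority 0 means the producer is done. The names send_stop and drain_until_stop are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <mqueue.h>
#include <sys/types.h>

/* Assumed convention for this sketch: an empty message at priority 0
   marks the end of the stream. Returns 0 on success, -1 on error. */
int send_stop(mqd_t mqd)
{
    return mq_send(mqd, "", 0, 0);
}

/* Receive until the stop marker arrives. buf must hold at least
   mq_msgsize bytes. Returns the number of data messages consumed,
   or -1 if mq_receive() fails. */
int drain_until_stop(mqd_t mqd, char *buf, size_t buflen)
{
    int handled = 0;

    for (;;) {
        unsigned int prio;
        ssize_t n = mq_receive(mqd, buf, buflen, &prio);

        if (n == -1)
            return -1;
        if (n == 0 && prio == 0)
            return handled;      /* stop marker: no data is queued ahead of it */
        handled++;               /* a real consumer would process buf[0..n) here */
    }
}
```

Because the marker uses the lowest priority, every data message already in the queue is delivered before it, so the consumer does not stop early.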

Non-blocking Send/Receive Pattern

Use O_NONBLOCK when you want to poll the queue without blocking — useful in event loops or when you have other work to do if the queue is empty.

#include <stdio.h>
#include <string.h>
#include <mqueue.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <errno.h>
#include <unistd.h>

#define QNAME "/nonblock_q"

int main(void)
{
    struct mq_attr attr = { .mq_maxmsg=4, .mq_msgsize=32 };

    /* Open with O_NONBLOCK */
    mqd_t mqd = mq_open(QNAME, O_CREAT|O_RDWR|O_NONBLOCK, 0600, &attr);
    if (mqd == (mqd_t)-1) { perror("mq_open"); return 1; }

    /* Send a couple of messages */
    mq_send(mqd, "work item 1", 12, 2);
    mq_send(mqd, "work item 2", 12, 5);

    /* Poll loop: drain the queue, do other work between attempts */
    char buf[32];
    unsigned int prio;
    int iterations = 0;

    while (1) {
        ssize_t n = mq_receive(mqd, buf, sizeof(buf), &prio);
        if (n > 0) {
            printf("[iter=%d] processed: \"%s\" (prio=%u)\n",
                   iterations, buf, prio);
        } else if (errno == EAGAIN) {
            printf("[iter=%d] queue empty, doing other work...\n", iterations);
            if (iterations >= 3) break;  /* exit after a few empty polls */
        } else {
            perror("mq_receive"); break;
        }
        iterations++;
        usleep(100000); /* simulate doing other work */
    }

    mq_close(mqd);
    mq_unlink(QNAME);
    return 0;
}

Output:

[iter=0] processed: "work item 2" (prio=5)
[iter=1] processed: "work item 1" (prio=2)
[iter=2] queue empty, doing other work...
[iter=3] queue empty, doing other work...

Interview Questions — Send and Receive

Q1: In what order does mq_receive() return messages?
Always in descending priority order (highest priority first). Among messages with the same priority, FIFO (first sent, first received) order applies.

Q2: What is the minimum buffer size needed for mq_receive()?
At least mq_msgsize bytes (the maximum message size configured for that queue). You find this via mq_getattr(). Passing a smaller buffer fails with EMSGSIZE.

Q3: What is the difference between mq_receive() and mq_timedreceive()?
mq_receive() blocks indefinitely when the queue is empty. mq_timedreceive() blocks only until an absolute timeout (struct timespec). If the timeout expires before a message arrives, it returns -1 with errno == ETIMEDOUT.

Q4: Is the timeout in mq_timedsend/mq_timedreceive relative or absolute?
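Absolute, measured against CLOCK_REALTIME. You must obtain the current time via clock_gettime(CLOCK_REALTIME, &ts) and add your desired duration to it: whole seconds go into ts.tv_sec, and if you also add to ts.tv_nsec it must stay below 1,000,000,000 (carry any excess into ts.tv_sec).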

Q5: What happens when mq_send() is called on a full queue in blocking mode?
The calling thread blocks (sleeps) until a receiver removes a message with mq_receive() and frees a slot, or until a signal handler interrupts the call (errno == EINTR). In O_NONBLOCK mode, it fails immediately with -1 and errno == EAGAIN.

Q6: Can you send binary (non-string) data through a POSIX message queue?
Yes. Messages are raw byte arrays. You can send any data — structs, integers, binary buffers — by casting to const char * and passing the exact byte count via msg_len.

Q7: What is the maximum message priority value on Linux?
MQ_PRIO_MAX - 1. POSIX requires MQ_PRIO_MAX to be at least 32, but on Linux it is 32768, so valid priorities are 0 to 32767.

Q8: If you have two messages with prio=5 and two with prio=3, what order are they received?
Both prio=5 messages first (in FIFO order among themselves), then both prio=3 messages (in FIFO order among themselves).

Q9: Write the one-liner pattern to correctly allocate a receive buffer.

struct mq_attr a; mq_getattr(mqd, &a); char *buf = malloc(a.mq_msgsize);
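
The one-liner skips error handling for brevity. In production code both calls can fail, so a slightly longer version may be preferable. A sketch, with alloc_recv_buf as a hypothetical helper name:

```c
#define _POSIX_C_SOURCE 200809L
#include <mqueue.h>
#include <stdlib.h>

/* Hypothetical helper: allocate a buffer large enough for any message on
   this queue and store its size in *size_out. Returns NULL on failure.
   The caller releases the buffer with free(). */
char *alloc_recv_buf(mqd_t mqd, size_t *size_out)
{
    struct mq_attr attr;

    if (mq_getattr(mqd, &attr) == -1 || attr.mq_msgsize <= 0)
        return NULL;

    char *buf = malloc((size_t)attr.mq_msgsize);
    if (buf != NULL)
        *size_out = (size_t)attr.mq_msgsize;
    return buf;
}
```

The returned size is then passed as msg_len to mq_receive(), which guarantees the buffer is never smaller than mq_msgsize.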

Q10: What return value does mq_receive() give on success?
The number of bytes in the received message (as ssize_t), which is the actual message length (not the buffer size). On error, -1 is returned.

Next: Asynchronous Notification with mq_notify()

Learn how a process can be notified when a message arrives on an empty queue — without polling or blocking.
