Client-Server with Message Queues

 

Chapter 46 · File 5 of 6

Client-Server with Message Queues

Request-Response Pattern · PID as Reply Key · Full Working Program

The most common real-world use of System V message queues is a client-server request/response pattern. Clients send requests to a well-known server queue; the server processes each request and sends the response back to the client’s private queue. This file builds a complete working example step by step.

Design: The Request-Response Pattern

Client-Server Communication via Message Queues
CLIENT
1. Create own private queue
2. Build request msg
3. Put own PID in request
4. Send to server queue
5. Wait for reply on own queue
6. Process reply
→→→ Request →→→
(type=1, includes client_pid)
SERVER QUEUE
Well-known key (ftok)
←←← Reply ←←←
(type=client_pid)
SERVER
1. Create server queue (fixed key)
2. Loop: read next request
3. Process request
4. Send reply to client’s queue
using client_pid as msg type
Key trick: client puts its PID in the request, server uses that as the reply message type. Each client waits on msgrcv(type=its_own_pid) so replies go to the right client.
Why use PID as message type? When multiple clients share the same server queue, the server must route replies. Each client’s PID is unique and positive — perfect as a message type. The client calls msgrcv(type=getpid()) and only receives the reply meant for it.

Step 1 — Shared Header File

svmsg_common.h
/* svmsg_common.h — Shared by server and client */
#ifndef SVMSG_COMMON_H
#define SVMSG_COMMON_H

#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>

/* Well-known path + project ID to generate the server queue key */
#define SERVER_KEY_FILE  "/tmp/svmsg_server"
#define SERVER_PROJ_ID   'S'

/* Message type the CLIENT sends to the server */
#define REQ_MSG_TYPE     1

/* Maximum length of string data in a message */
#define MAX_DATA         256

/*
 * Request message: client → server.
 *
 * The leading long is the System V message type; everything after it is the
 * payload, so senders and receivers pass
 * sizeof(struct RequestMsg) - sizeof(long) as the message size.
 */
struct RequestMsg {
    long  mtype;              /* Message type; the client sets REQ_MSG_TYPE (=1) and the server receives on that type */
    pid_t client_pid;         /* Client's PID — server copies this into the reply's mtype */
    int   client_qid;         /* Client's queue ID — server passes it to msgsnd() for the reply */
    char  data[MAX_DATA];     /* Request text; the client NUL-terminates it and truncates to MAX_DATA - 1 chars */
};

/*
 * Response message: server → client.
 *
 * As with RequestMsg, the leading long is the message type and the rest is
 * the payload (message size = sizeof(struct ResponseMsg) - sizeof(long)).
 */
struct ResponseMsg {
    long mtype;               /* Set to the requesting client's PID; the client receives with msgtyp = getpid() */
    int  status;              /* 0 = success, -1 = error (the client treats any non-zero value as an error) */
    char data[MAX_DATA];      /* Response text, NUL-terminated by the server */
};

#endif /* SVMSG_COMMON_H */

Step 2 — The Server

svmsg_server.c
/* svmsg_server.c — Echo server using System V message queues */
#include "svmsg_common.h"
#include <signal.h>
#include <ctype.h>

static int server_qid = -1;

/*
 * Remove the server queue (if one was created) and terminate the process
 * with exit status 0. Never returns.
 *
 *   sig - signal number when invoked as a handler, 0 when called directly;
 *         the value is not used.
 *
 * NOTE(review): printf() and exit() are not async-signal-safe, so running
 * this function as a signal handler relies on the signal not interrupting
 * another stdio call.
 */
void cleanup(int sig)
{
    (void)sig;

    /* Nothing was created yet: just leave. */
    if (server_qid == -1)
        exit(0);

    printf("\n[Server] Removing queue %d\n", server_qid);
    msgctl(server_qid, IPC_RMID, NULL);
    exit(0);
}

/*
 * Uppercase the string — our "service".
 *
 * Copies `in` into `out`, converting every character with toupper(), and
 * NUL-terminates the result.
 *
 *   in    - NUL-terminated input string
 *   out   - destination buffer
 *   outsz - total size of `out` in bytes; at most outsz - 1 characters are
 *           copied (longer input is truncated). When outsz is 0 nothing is
 *           written at all.
 */
void process_request(const char *in, char *out, size_t outsz)
{
    /* outsz is unsigned: outsz - 1 would wrap to SIZE_MAX and overrun `out`. */
    if (outsz == 0)
        return;

    size_t i;
    for (i = 0; i < outsz - 1 && in[i] != '\0'; i++)
        out[i] = (char)toupper((unsigned char)in[i]);
    out[i] = '\0';
}

int main(void)
{
    signal(SIGINT,  cleanup);
    signal(SIGTERM, cleanup);

    /* Create key file and key */
    FILE *f = fopen(SERVER_KEY_FILE, "w");
    if (f) fclose(f);

    key_t key = ftok(SERVER_KEY_FILE, SERVER_PROJ_ID);
    if (key == -1) { perror("ftok"); return 1; }

    /* Create server queue (remove stale one first) */
    server_qid = msgget(key, IPC_CREAT | IPC_EXCL | 0666);
    if (server_qid == -1) {
        if (errno == EEXIST) {
            /* Remove stale queue and try again */
            int old = msgget(key, 0666);
            if (old != -1) msgctl(old, IPC_RMID, NULL);
            server_qid = msgget(key, IPC_CREAT | 0666);
        }
        if (server_qid == -1) { perror("msgget"); return 1; }
    }

    printf("[Server] Started. Queue ID = %d. Waiting for requests...\n", server_qid);
    printf("[Server] Send me a string and I'll return it UPPERCASED.\n\n");

    while (1) {
        struct RequestMsg req;

        /* Receive any incoming request (type=REQ_MSG_TYPE) */
        ssize_t n = msgrcv(server_qid, &req, sizeof(req) - sizeof(long),
                           REQ_MSG_TYPE, 0);
        if (n == -1) {
            if (errno == EINTR) continue;
            perror("msgrcv");
            break;
        }

        printf("[Server] Request from PID=%d: '%s'\n", req.client_pid, req.data);

        /* Build response */
        struct ResponseMsg resp;
        resp.mtype  = req.client_pid;  /* Reply goes ONLY to this client */
        resp.status = 0;
        process_request(req.data, resp.data, sizeof(resp.data));

        /* Send reply to client's queue */
        if (msgsnd(req.client_qid, &resp, sizeof(resp) - sizeof(long), 0) == -1) {
            perror("msgsnd reply");
            /* Client may have gone away — not fatal for server */
        } else {
            printf("[Server] Reply sent: '%s'\n\n", resp.data);
        }
    }

    cleanup(0);
    return 0;
}

Step 3 — The Client

svmsg_client.c
/* svmsg_client.c — Client for the echo server */
#include "svmsg_common.h"

/*
 * Client entry point.
 *
 * Usage: svmsg_client <message to uppercase>
 *
 * Opens the server's well-known queue, creates a private reply queue, sends
 * one RequestMsg carrying our PID, the reply queue ID and argv[1] (truncated
 * to MAX_DATA - 1 characters), then blocks until a ResponseMsg whose mtype
 * equals our PID arrives on the reply queue. The reply queue is removed on
 * every exit path after it has been created.
 *
 * Returns 0 once a reply was received (even if the server reported an
 * error in resp.status), 1 on usage or IPC failure.
 */
int main(int argc, char *argv[])
{
    if (argc != 2) {
        fprintf(stderr, "Usage: %s <message to uppercase>\n", argv[0]);
        return 1;
    }

    const pid_t my_pid = getpid();

    /* Open the server queue using the same key */
    key_t key = ftok(SERVER_KEY_FILE, SERVER_PROJ_ID);
    if (key == -1) { perror("ftok"); return 1; }

    int server_qid = msgget(key, 0666);
    if (server_qid == -1) {
        if (errno == ENOENT)
            fprintf(stderr, "Server queue not found! Start svmsg_server first.\n");
        else
            perror("msgget");
        return 1;
    }

    /* Create our own private reply queue */
    int my_qid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (my_qid == -1) { perror("msgget (client queue)"); return 1; }

    /* Build the request (zeroed so no uninitialized bytes are sent) */
    struct RequestMsg req;
    memset(&req, 0, sizeof(req));
    req.mtype      = REQ_MSG_TYPE;
    req.client_pid = my_pid;
    req.client_qid = my_qid;
    /* snprintf truncates to MAX_DATA - 1 chars and always NUL-terminates */
    snprintf(req.data, sizeof(req.data), "%s", argv[1]);

    /* Send request to server */
    if (msgsnd(server_qid, &req, sizeof(req) - sizeof(long), 0) == -1) {
        perror("msgsnd request");
        msgctl(my_qid, IPC_RMID, NULL);
        return 1;
    }
    printf("[Client PID=%d] Sent request: '%s'\n", (int)my_pid, req.data);

    /* Wait for reply — msgtyp = our PID, so only our reply arrives here */
    struct ResponseMsg resp;
    memset(&resp, 0, sizeof(resp));   /* a short reply must not expose garbage */
    ssize_t n = msgrcv(my_qid, &resp, sizeof(resp) - sizeof(long),
                       my_pid, 0);  /* mtype = our PID */
    if (n == -1) {
        perror("msgrcv reply");
        msgctl(my_qid, IPC_RMID, NULL);
        return 1;
    }

    /*
     * The reply queue is mode 0666, so any local process could have put this
     * message there: enforce termination before printing with %s.
     */
    resp.data[MAX_DATA - 1] = '\0';

    if (resp.status == 0)
        printf("[Client PID=%d] Response: '%s'\n", (int)my_pid, resp.data);
    else
        printf("[Client PID=%d] Server returned error.\n", (int)my_pid);

    /* Clean up our private queue */
    msgctl(my_qid, IPC_RMID, NULL);
    return 0;
}

Building and Running the Example

## Terminal 1: Start the server
gcc svmsg_server.c -o svmsg_server
./svmsg_server

## Terminal 2: Run clients
gcc svmsg_client.c -o svmsg_client
./svmsg_client "hello world"
./svmsg_client "embedded systems"
./svmsg_client "linux ipc is powerful"

## Terminal 3: Multiple clients at once (to show concurrent access)
./svmsg_client "first"  &
./svmsg_client "second" &
./svmsg_client "third"  &
wait

## Expected server output:
# [Server] Request from PID=12345: 'hello world'
# [Server] Reply sent: 'HELLO WORLD'

## Expected client output:
# [Client PID=12345] Sent request: 'hello world'
# [Client PID=12345] Response: 'HELLO WORLD'

## Check queues while running:
ipcs -q

Bonus Example — Logging System Using Message Queues

A lightweight logging demo in which multiple worker processes send log messages to a single logger process, using the message type (mtype) to carry the log level.

/* msgq_logger.c — Single-file log demo (fork-based) */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

/* Log levels as message types */
#define LOG_INFO    1
#define LOG_WARN    2
#define LOG_ERROR   3

/*
 * One log record as carried on the message queue. The leading long is the
 * System V message type; the remaining fields are the payload, so the
 * message size passed to msgsnd()/msgrcv() is
 * sizeof(struct LogMsg) - sizeof(long).
 */
struct LogMsg {
    long  mtype;       /* Log level: 1=INFO 2=WARN 3=ERROR (LOG_INFO / LOG_WARN / LOG_ERROR) */
    pid_t sender_pid;  /* PID of the worker that produced the record */
    char  text[200];   /* NUL-terminated message text, filled with snprintf() */
};

/*
 * Map a log-level message type to its fixed-width (5 character) label.
 * Returns a pointer to a string literal; unknown levels yield "?????".
 */
const char *level_name(long t) {
    switch (t) {
    case LOG_INFO:
        return "INFO ";
    case LOG_WARN:
        return "WARN ";
    case LOG_ERROR:
        return "ERROR";
    default:
        return "?????";
    }
}

int main(void)
{
    int qid = msgget(IPC_PRIVATE, IPC_CREAT | 0666);
    if (qid == -1) { perror("msgget"); return 1; }

    /* Fork 3 "worker" processes */
    for (int w = 1; w <= 3; w++) {
        pid_t pid = fork();
        if (pid == 0) {
            /* Worker: send some log messages */
            struct LogMsg m;
            m.sender_pid = getpid();

            m.mtype = LOG_INFO;
            snprintf(m.text, sizeof(m.text), "Worker %d started", w);
            msgsnd(qid, &m, sizeof(m) - sizeof(long), 0);

            if (w == 2) {
                m.mtype = LOG_WARN;
                snprintf(m.text, sizeof(m.text), "Worker %d: memory usage is high", w);
                msgsnd(qid, &m, sizeof(m) - sizeof(long), 0);
            }

            if (w == 3) {
                m.mtype = LOG_ERROR;
                snprintf(m.text, sizeof(m.text), "Worker %d: failed to open file", w);
                msgsnd(qid, &m, sizeof(m) - sizeof(long), 0);
            }

            m.mtype = LOG_INFO;
            snprintf(m.text, sizeof(m.text), "Worker %d done", w);
            msgsnd(qid, &m, sizeof(m) - sizeof(long), 0);

            exit(0);
        }
    }

    /* Parent = logger: wait for all children, then drain queue */
    for (int i = 0; i < 3; i++) wait(NULL);

    printf("=== LOG OUTPUT ===\n");
    struct LogMsg rcv;
    int total = 0;
    /* Read ALL messages (type=0 = FIFO order) */
    while (msgrcv(qid, &rcv, sizeof(rcv) - sizeof(long), 0, IPC_NOWAIT) != -1) {
        printf("[%s] PID=%-6d %s\n", level_name(rcv.mtype), rcv.sender_pid, rcv.text);
        total++;
    }
    printf("=== %d log entries ===\n", total);

    msgctl(qid, IPC_RMID, NULL);
    return 0;
}

Interview Questions — Client-Server Pattern

Q1. How does a server know which queue to send a reply to in a multi-client setup? The client includes its queue ID (client_qid) in the request message. The server reads this field and sends the reply to that specific queue using msgsnd().
Q2. Why is the client’s PID used as the reply message type? Each client waits on msgrcv() filtering by its own PID (type=getpid()). Since PIDs are unique and positive, no two running clients have the same type, so replies are automatically routed to the correct client.
Q3. What is a “well-known queue”? How is it established? A queue identified by a fixed key that all clients know how to compute. Typically created using ftok() with a known file path and project ID. The server creates it; clients open it without IPC_CREAT.
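Q4. Can multiple servers share the same queue? What are the risks? Yes. The kernel delivers each message to exactly one msgrcv() caller, so two servers never process the same request. However, the servers race for every message: which one receives a given request is unspecified, there is no built-in load-balancing guarantee, and consecutive requests from one client may be handled by different servers. Usually only one server should own a queue.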
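Q5. What happens if the client exits before reading the server’s reply? The reply message stays in the client’s queue until the queue is removed. System V queues are kernel-persistent: they are not destroyed when the creating process exits, even if they were created with IPC_PRIVATE. If the client calls msgctl(IPC_RMID) before exiting, the queue and any unread reply are discarded (a later msgsnd() by the server then fails). If the client dies without removing it, the queue and the reply are leaked and keep counting against the system-wide limits until someone removes the queue (e.g. with ipcrm) or the system reboots.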
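Q6. How do you ensure exactly-once semantics with message queues? System V queues hand each message to a single receiver, but they provide no acknowledgment or redelivery: if the receiver crashes after msgrcv() returns, the message is lost, and a sender that retries after a timeout may cause a request to be processed twice. For exactly-once processing you need application-level sequence numbers, idempotent operations, or acknowledgment messages. System V queues don’t provide this natively.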

Leave a Reply

Your email address will not be published. Required fields are marked *