Chapter 6. Message Queues

You use a message queue to pass blocks of data between processes or threads without having to share any memory between the processes. One process or thread puts a message into the queue. The message is held in the queue until another process or thread asks for the message.

IRIX supports two implementations of message queues: a POSIX implementation as specified by IEEE standard 1003.1b-1993, and an SVR4 implementation compatible with System V Release 4. Both implementations can be used to coordinate POSIX threads or IRIX processes. This chapter discusses message queues under these headings: Overview of Message Queues, POSIX Message Queues, and System V Message Queues.

Overview of Message Queues

A message queue is a software object maintained by the IRIX kernel, logically apart from the address space of any process. When you create a message queue, the queue has a public identifier. (The identifier is a file pathname for POSIX, or an integer for SVR4.) A process uses the identifier to open the queue. When the queue is open, the process can send messages to the queue or receive messages from the queue.

A message queue has an access mode similar to a file access mode, specifying read and write access for its owner, its owner's group, or all users. A process with an effective user ID giving only read access can only receive messages from the queue. A process with an effective user ID lacking access cannot open the queue.

When a process requests a message from a queue and no message is available, the process can be notified immediately with an error code, or it can be suspended until a message is sent.

A message queue has a limit on the amount of data that can be queued. (POSIX limits the number of messages; SVR4 limits the total size of queued messages.) When a process sends a message that would exceed the queue's limit, the process can be notified immediately with an error code, or it can be suspended until there is room in the queue.

Implementation Differences

The abstract operations that a message queue supports are summarized in Table 6-1 with the names of the POSIX and SVR4 functions that implement them.

Table 6-1. Abstract Operations on a Message Queue

Operation                                                            POSIX Function       SVR4 Function
-------------------------------------------------------------------  -------------------  -------------------
Gain access to a queue, creating it if it does not exist.            mq_open(3)           msgget(2)
Query attributes of a queue and number of pending messages.          mq_getattr(3)        msgctl(2)
Change attributes of a queue.                                        mq_setattr(3)        msgctl(2)
Give up access to a queue.                                           mq_close(3)          n.a.
Remove a queue from the system.                                      mq_unlink(3), rm(1)  msgctl(2), ipcrm(1)
Send a message to a queue.                                           mq_send(3)           msgsnd(2)
Receive a message from a queue.                                      mq_receive(3)        msgrcv(2)
Request asynchronous notification of a message arriving at a queue.  mq_notify(3)         n.a.

Both implementations can be used to communicate between POSIX threads and between IRIX processes in any combination. Besides obvious features of syntax, the principal differences between the two implementations are as follows:

  • POSIX functions are implemented as library functions in the libc library and operate primarily in the user process address space. SVR4 functions are implemented in the kernel, and every operation requires a context switch. This generally results in lower overhead for the POSIX functions.

  • The identity of a POSIX or an SVR4 queue is retained over a reboot. The contents of a POSIX queue might or might not survive a reboot, but you should not depend on either type of queue to retain its state after the last program closes it.

  • POSIX allows you to set a limit on the number of messages and the size of one message. SVR4 allows you to set a limit on the aggregate size of queued messages, but not on their number or their individual sizes.

  • With a POSIX queue, the choice of whether or not operations should block on a full or empty queue is an attribute of the queue descriptor. With SVR4, you specify blocking or nonblocking operation on each send or receive operation.

  • POSIX supports asynchronous notification of a message arrival. SVR4 does not.

  • SVR4 allows a receiver to request a message from a particular priority class, in effect creating sub-queues within a queue. POSIX supports a priority class on each message, but it always returns the first message of the highest priority class.

Uses of Message Queues

You can use message queues in a variety of ways. For example, you can use a message queue to implement the “producer-consumer” model of cooperating processes or threads. The “producer” sends its output to the queue; the “consumer” receives the data from the queue. When one process gets ahead of the other, it is automatically suspended on the queue until the other process catches up.

Another design model, common in real-time programming, is to use message queues to dispatch units of work to waiting processes or threads. A process or thread dedicated to one type of work waits on a message queue. Whenever another process or thread needs a unit of work of that type, it sends the unit to that queue as a message.

Another use of a message queue is to regulate the use of a scarce resource, such as the buffers in a pool of buffers. Each resource unit is represented by a message. In order to obtain a unit, you receive one message from the queue. To release a unit for other processes to use, you send the unit message back to the queue.

The latter scheme can be used to compensate for a performance problem. The speed of communication through a queue is limited by the fact that every message is copied twice: when a message is sent, it is copied from the sender's buffer to some reserved memory space; when the message is received, it is copied into the buffer supplied by the receiving process or thread. When messages are small (or few in number), copying is not a serious problem.

When messages are large, copying can be avoided as follows. Allocate a pool of message buffers. Set up a queue of small messages, each message representing a “ticket” to use a particular buffer. In order to obtain a buffer, a process receives a message from this queue. The process fills the buffer, then it sends the buffer without copying, by sending only the “ticket” on another message queue. The process that receives the “ticket” uses the data in the buffer without needing to copy it, and releases the buffer by sending the “ticket” to the original queue.
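The ticket scheme described above can be sketched as follows. This is a minimal, single-process illustration using POSIX queues, not code from the example programs in this chapter. The names pool, open_ticket_queue(), put_ticket(), get_ticket(), and ticket_demo() are hypothetical, and the static array only stands in for a buffer pool that cooperating processes would really place in memory they share. The queue names are supplied by the caller, since the permitted form of a queue name varies between implementations.

```c
#define _POSIX_C_SOURCE 200809L  /* assumption: exposes mq_*() on some systems */
#include <fcntl.h>      /* O_CREAT, O_EXCL, O_RDWR */
#include <mqueue.h>     /* mq_open() and related functions */
#include <string.h>     /* memset(), strcpy(), strcmp() */
#include <sys/types.h>  /* ssize_t, mode_t */

#define NBUF    4       /* number of buffers in the pool (arbitrary) */
#define BUFSIZE 4096    /* size of each buffer (arbitrary) */

static char pool[NBUF][BUFSIZE];  /* stands in for a shared buffer pool */

/* Create a fresh queue whose messages are single int "tickets". */
static mqd_t open_ticket_queue(const char *name)
{
   struct mq_attr attr;
   memset(&attr, 0, sizeof attr);
   attr.mq_maxmsg  = NBUF;
   attr.mq_msgsize = sizeof(int);
   (void) mq_unlink(name);          /* ignore "no such queue" */
   return mq_open(name, O_CREAT | O_EXCL | O_RDWR, (mode_t) 0600, &attr);
}

static int put_ticket(mqd_t q, int ticket)
{
   return mq_send(q, (const char *) &ticket, sizeof ticket, 0);
}

/* Returns the ticket (a buffer index), or -1 on error. */
static int get_ticket(mqd_t q)
{
   int ticket;
   ssize_t n = mq_receive(q, (char *) &ticket, sizeof ticket, NULL);
   return (n == (ssize_t) sizeof ticket) ? ticket : -1;
}

/* Returns 1 if one buffer made the full round trip, else 0. */
int ticket_demo(const char *free_name, const char *full_name)
{
   mqd_t freeq = open_ticket_queue(free_name);
   mqd_t fullq = open_ticket_queue(full_name);
   struct mq_attr attr;
   int i, t, ok = 0;

   if (freeq != (mqd_t) -1 && fullq != (mqd_t) -1)
   {
      for (i = 0; i < NBUF; ++i)            /* every buffer starts out free */
         (void) put_ticket(freeq, i);
      t = get_ticket(freeq);                /* producer: claim a buffer */
      if (t >= 0)
      {
         strcpy(pool[t], "large payload");  /* fill the buffer in place */
         (void) put_ticket(fullq, t);       /* send only the ticket */
         t = get_ticket(fullq);             /* consumer: which buffer is ready? */
         if (t >= 0)
         {
            ok = (0 == strcmp(pool[t], "large payload")); /* use data in place */
            (void) put_ticket(freeq, t);    /* release the buffer */
         }
      }
      /* all tickets should be back on the free queue */
      ok = ok && 0 == mq_getattr(freeq, &attr) && attr.mq_curmsgs == NBUF;
   }
   if (freeq != (mqd_t) -1) (void) mq_close(freeq);
   if (fullq != (mqd_t) -1) (void) mq_close(fullq);
   (void) mq_unlink(free_name);
   (void) mq_unlink(full_name);
   return ok;
}
```

Only the 4-byte ticket is copied through each queue; the payload itself is written once and read in place.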

POSIX Message Queues

The POSIX real-time extensions (detailed in IEEE standard 1003.1b) include support for message queues. These functions are discussed in the following topics and demonstrated in example programs.

Managing Message Queues

The POSIX functions for creating, controlling, closing, and removing message queues are summarized in Table 6-2.

Table 6-2. POSIX Functions for Managing Message Queues

Function Name  Purpose and Operation
-------------  ------------------------------------------------------------------
mq_open(3)     Create a queue if it does not exist, and gain access to it.
mq_getattr(3)  Get information about an open message queue.
mq_setattr(3)  Change the blocking/nonblocking attribute of an open message queue.
mq_close(3)    Give up access to a queue.
mq_unlink(3)   Remove a message queue from the system when the last process that has it open closes it.


Creating a Message Queue

The mq_open() function has two purposes. It is used to gain access to a queue that exists, and it can create a queue that does not exist. To create a new queue, call mq_open() with four arguments as follows (using the names given in the reference page):

mq_name

The pathname that the queue will have.

oflag

A set of flags that includes O_CREAT and may include O_EXCL.

mode

The access permissions the queue will have.

mq_attr

Either NULL or the address of an mq_attr structure specifying the queue attributes of maximum message size and maximum messages.

The name of a queue has the same form as a disk filename, and in fact a queue is implemented as a file. This implementation is permitted, but not required, by the POSIX standard. Other implementations might not use it.

Once created, a queue is a persistent object that survives until removed. If you want the program to create a queue, use it, and then remove it during termination, you can call mq_unlink() to remove the queue.

The file can retain some queued messages when the queue is not open, so that some queued data can persist beyond the termination of the programs that use the queue. The queued data cannot be trusted after a reboot, because the data might not have been written to disk before the system came down. You should not depend on the state of the message queue after a reboot.
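The creation call described in this section might look like the following sketch. The helper names create_queue() and destroy_queue() are hypothetical, and the permission value 0660 is only an example. The queue name is passed in by the caller because, as noted above, the form of a queue name depends on the implementation.

```c
#define _POSIX_C_SOURCE 200809L  /* assumption: exposes mq_*() on some systems */
#include <fcntl.h>      /* O_CREAT, O_EXCL, O_RDWR */
#include <mqueue.h>     /* mq_open(), mq_close(), mq_unlink() */
#include <string.h>     /* memset() */
#include <sys/types.h>  /* mode_t */

/* Hypothetical helper: create a new queue, failing if one already
   exists under this name. Returns the descriptor or (mqd_t)-1. */
mqd_t create_queue(const char *name, long maxmsg, long msgsize)
{
   struct mq_attr attr;
   memset(&attr, 0, sizeof attr);
   attr.mq_maxmsg  = maxmsg;    /* maximum number of queued messages */
   attr.mq_msgsize = msgsize;   /* maximum size of one message */
   return mq_open(name, O_CREAT | O_EXCL | O_RDWR, (mode_t) 0660, &attr);
}

/* Hypothetical helper for a program that owns its queue:
   close the descriptor and remove the queue during termination. */
int destroy_queue(mqd_t q, const char *name)
{
   int rc = mq_close(q);
   if (mq_unlink(name) == -1)
      rc = -1;
   return rc;
}
```

Passing NULL instead of &attr would request the implementation's default limits.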

Opening an Existing Queue

It is more common to open an existing queue. When the program expects the queue to exist, it omits the O_CREAT flag bit. An error is returned if the queue does not exist, or if the queue exists but the effective user ID or group ID of the program does not allow access to it.

The program can specify the O_RDONLY, O_WRONLY, or O_RDWR flag to show its intended use of the queue. Access is controlled by the access permissions of the queue, just as for a file.

Specifying Blocking or Nonblocking Access

An important flag when opening a queue is the O_NONBLOCK flag. When the program specifies O_NONBLOCK, it wants an immediate return with an error code (EAGAIN) when it sends a message to a full queue or requests a message from an empty queue. When the program omits O_NONBLOCK, it specifies that it is willing to be suspended in these situations.

The O_NONBLOCK flag applies to all operations using the queue descriptor returned by mq_open(). (The same queue, opened under a different descriptor, can have different blocking behavior.) The blocking behavior can be changed by applying mq_setattr() to the queue descriptor. If the program normally wants to allow suspension, but in a particular situation wants to avoid suspension, it can apply mq_setattr() to change the blocking state, and then set it back again.
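Switching the blocking state temporarily and then restoring it could be sketched as shown here. The function receive_nowait() is a hypothetical helper, not part of the POSIX interface; it assumes that mq_setattr() examines only the mq_flags field of the new attributes and returns the previous attributes through its third argument.

```c
#define _POSIX_C_SOURCE 200809L  /* assumption: exposes mq_*() on some systems */
#include <errno.h>      /* errno */
#include <fcntl.h>      /* O_NONBLOCK */
#include <mqueue.h>     /* mq_setattr(), mq_receive() */
#include <string.h>     /* memset() */
#include <sys/types.h>  /* ssize_t */

/* Hypothetical helper: attempt one receive without suspending,
   whatever the descriptor's current blocking state, then put the
   previous state back. Returns the message length, or -1. */
ssize_t receive_nowait(mqd_t q, char *buf, size_t len, unsigned int *prio)
{
   struct mq_attr want, saved;
   ssize_t n;
   int err;

   memset(&want, 0, sizeof want);
   want.mq_flags = O_NONBLOCK;           /* only mq_flags is examined */
   if (mq_setattr(q, &want, &saved) == -1)
      return -1;
   n = mq_receive(q, buf, len, prio);
   err = errno;                          /* keep errno from mq_receive() */
   (void) mq_setattr(q, &saved, NULL);   /* restore previous blocking state */
   errno = err;
   return n;
}
```

Because the flag belongs to the descriptor, any other thread using the same descriptor also sees nonblocking behavior during the window between the two mq_setattr() calls.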

Using Message Queues

The POSIX functions for using an open queue are summarized in Table 6-3.

Table 6-3. POSIX Functions for Using Message Queues

Function Name  Purpose and Operation
-------------  ----------------------------------------------------------
mq_send(3)     Send a message to a queue.
mq_receive(3)  Receive a message from a queue.
mq_notify(3)   Request asynchronous notification of a message on a queue.


Sending a Message

To send a message to a queue, call mq_send() specifying the queue, the address and length of the message data, and an integer specifying the priority class of the message. Messages on the queue are retained in arrival sequence within priority classes.

The message is copied out of the caller's buffer, so the buffer can be reused immediately after a successful send. The mq_send() function blocks if the queue is full, unless the O_NONBLOCK attribute is in effect for the queue.

Receiving a Message

To receive a message, call mq_receive() specifying the queue, the address and size of a buffer, and the address of an integer to receive the message's priority. The size of the buffer must be at least as large as the maximum size allowed by that queue. You can learn this size using mq_getattr() (see Example 6-4 for an example of this).

The mq_receive() function blocks if the queue is empty, unless O_NONBLOCK is in effect for the queue. The message returned is always the oldest message in the highest priority class.
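The two rules above (size the receive buffer from the queue attributes, and expect the highest priority class first) can be illustrated with a short sketch. The helper names receive_alloc() and first_priority_received() are hypothetical, and the limits of 4 messages of 64 bytes are arbitrary.

```c
#define _POSIX_C_SOURCE 200809L  /* assumption: exposes mq_*() on some systems */
#include <fcntl.h>      /* O_CREAT, O_EXCL, O_RDWR */
#include <mqueue.h>     /* mq_getattr(), mq_send(), mq_receive() */
#include <stdlib.h>     /* malloc(), free() */
#include <string.h>     /* memset(), strcmp() */
#include <sys/types.h>  /* ssize_t, mode_t */

/* Hypothetical helper: receive one message into a newly allocated
   buffer of mq_msgsize bytes. The caller frees *out.
   Returns the message length, or -1. */
ssize_t receive_alloc(mqd_t q, char **out, unsigned int *prio)
{
   struct mq_attr attr;
   char *buf;
   ssize_t n;

   if (mq_getattr(q, &attr) == -1)
      return -1;
   buf = malloc((size_t) attr.mq_msgsize);
   if (buf == NULL)
      return -1;
   n = mq_receive(q, buf, (size_t) attr.mq_msgsize, prio);
   if (n == -1)
   {
      free(buf);
      return -1;
   }
   *out = buf;
   return n;
}

/* Sends a priority-7 message and then a priority-19 message, and
   returns the priority of the first message received, or -1. */
int first_priority_received(const char *name)
{
   struct mq_attr attr;
   unsigned int prio = 0;
   char *msg = NULL;
   int result = -1;
   mqd_t q;

   memset(&attr, 0, sizeof attr);
   attr.mq_maxmsg  = 4;
   attr.mq_msgsize = 64;
   (void) mq_unlink(name);               /* ignore "no such queue" */
   q = mq_open(name, O_CREAT | O_EXCL | O_RDWR, (mode_t) 0600, &attr);
   if (q == (mqd_t) -1)
      return -1;
   if (mq_send(q, "routine", 8, 7) == 0 &&
       mq_send(q, "urgent", 7, 19) == 0 &&
       receive_alloc(q, &msg, &prio) == 7 &&
       strcmp(msg, "urgent") == 0)
      result = (int) prio;
   free(msg);
   (void) mq_close(q);
   (void) mq_unlink(name);
   return result;
}
```

Although "routine" was sent first, the first receive returns "urgent", because it belongs to the higher priority class.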

Using Asynchronous Notification

Some applications are designed so that each process or thread does nothing but process messages. In a design of this kind, it makes sense for a process or thread to suspend itself when no messages are available on its queue.

Other applications are designed so that one process or thread performs multiple tasks besides handling messages, or handles messages from multiple queues. In this kind of program, a process cannot suspend itself on a single message queue. Instead, it needs to do other work and only request a message when a message is available. One way to do this is to set the O_NONBLOCK flag, and to periodically poll for a message by calling mq_receive() and testing its return code. However, this is inefficient.

The POSIX message facility offers the ability to receive an asynchronous notification in the event that a message is posted to an empty queue and no process or thread is suspended waiting for that message. You do this by calling mq_notify() passing a queue and a sigevent_t structure. (The sigevent_t is declared in sys/signal.h, which is included by mqueue.h.)

The sigevent_t structure allows you to specify either a signal or a callback function. However, only the signal notification (SIGEV_SIGNAL) request is supported by the POSIX message queue implementation.
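A signal-based notification request might be set up as in the following sketch. The names on_message(), request_signal(), and notify_demo() are hypothetical, and the use of SIGUSR1 is only an example. The sketch assumes that a registration made with mq_notify() is consumed when the notification is delivered, so a program that wants further notifications calls mq_notify() again.

```c
#define _POSIX_C_SOURCE 200809L  /* assumption: exposes mq_*() on some systems */
#include <fcntl.h>      /* O_CREAT, O_EXCL, O_RDWR */
#include <mqueue.h>     /* mq_notify() */
#include <signal.h>     /* sigaction(), struct sigevent, SIGEV_SIGNAL */
#include <string.h>     /* memset() */
#include <sys/types.h>  /* mode_t */
#include <time.h>       /* nanosleep() */

static volatile sig_atomic_t msg_arrived = 0;

static void on_message(int signo)
{
   (void) signo;
   msg_arrived = 1;     /* only set a flag; receive the message later */
}

/* Hypothetical helper: ask for signal signo when a message is
   posted to the empty queue q. Returns 0 or -1. */
int request_signal(mqd_t q, int signo)
{
   struct sigaction sa;
   struct sigevent ev;

   memset(&sa, 0, sizeof sa);
   sa.sa_handler = on_message;
   sigemptyset(&sa.sa_mask);
   if (sigaction(signo, &sa, NULL) == -1)
      return -1;
   memset(&ev, 0, sizeof ev);
   ev.sigev_notify = SIGEV_SIGNAL;
   ev.sigev_signo  = signo;
   return mq_notify(q, &ev);
}

/* Returns 1 if the signal was seen after a message was posted. */
int notify_demo(const char *name)
{
   struct mq_attr attr;
   struct timespec pause = { 0, 10000000 };   /* 10 ms */
   mqd_t q;
   int tries;

   memset(&attr, 0, sizeof attr);
   attr.mq_maxmsg  = 2;
   attr.mq_msgsize = 16;
   (void) mq_unlink(name);                    /* ignore "no such queue" */
   q = mq_open(name, O_CREAT | O_EXCL | O_RDWR, (mode_t) 0600, &attr);
   if (q == (mqd_t) -1)
      return 0;
   msg_arrived = 0;
   if (request_signal(q, SIGUSR1) == 0 && mq_send(q, "ping", 5, 0) == 0)
      for (tries = 0; !msg_arrived && tries < 100; ++tries)
         (void) nanosleep(&pause, NULL);      /* stands in for other work */
   (void) mq_close(q);
   (void) mq_unlink(name);
   return msg_arrived ? 1 : 0;
}
```

The handler does no queue operations itself; the main flow of the program tests the flag and calls mq_receive() when it is convenient.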

Example Programs

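The following programs demonstrate the use of POSIX message queues:

  • Example 6-1 demonstrates the use of mq_getattr() to display the attributes of a queue.

  • Example 6-2 demonstrates the use of mq_open() to create or access a queue.

  • Example 6-3 demonstrates the use of mq_send() to put messages onto a queue.

  • Example 6-4 demonstrates the use of mq_receive() to take messages from a queue.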

The four example programs have a consistent design and use consistent command-line arguments. Each accepts optional arguments that allow you to exercise most features of each function, including most error return codes. The following is a simple example of use. First, a queue is created:

$ mq_open -p 0664 -b 128 -m 32 -c -x /var/tmp/Q32x128
flags: 0x0  maxmsg: 32  msgsize: 128  curmsgs: 0

An attempt is made to send a message that is larger than the queue maximum size:

$ mq_send -b 129 /var/tmp/Q32x128
mq_send(): Inappropriate message buffer length

A message of appropriate size is sent. Its presence on the queue is verified using mq_getattr():

$ mq_send -b 128 -p 7 /var/tmp/Q32x128
$ mq_attr /var/tmp/Q32x128
flags: 0x0  maxmsg: 32  msgsize: 128  curmsgs: 1

An attempt is made to send a message with an illegal priority (32 is the highest allowed):

$ mq_send -p 99 /var/tmp/Q32x128
mq_send(): Invalid argument

A message is sent with a valid priority:

$ mq_send -p 19 /var/tmp/Q32x128
$ mq_attr /var/tmp/Q32x128
flags: 0x0  maxmsg: 32  msgsize: 128  curmsgs: 2

The two messages are received. The one with higher priority is received first:

$ mq_receive -c 2 /var/tmp/Q32x128
1: priority 19  len 64 text 00001 Fri Jun 14 09:19:12 1996
2: priority 7  len 128 text 00001 Fri Jun 14 09:17:15 1996

Another message is requested. Since the O_NONBLOCK flag is used, the absence of any message is reported as an error code, rather than suspending the process:

$ mq_receive -n /var/tmp/Q32x128
mq_receive(): Resource temporarily unavailable

Example of mq_getattr()

The program mq_attr in Example 6-1 uses mq_getattr() to get and display the queue attributes. Only one command-line argument is accepted:

path

The file pathname of the queue must be given following all options.


Example 6-1. Program to Demonstrate mq_getattr() and mq_setattr()

/*
|| Program to test mq_getattr(3), displaying queue information.
||    mq_attr <path>
||    <path>   pathname of the queue, which must exist
*/
#include <mqueue.h>     /* message queue stuff */
#include <errno.h>      /* errno and perror */
#include <fcntl.h>      /* O_RDONLY */
#include <stdio.h>
int main(int argc, char **argv)
{
   mqd_t mqd;           /* queue descriptor */
   struct mq_attr obuf; /* output attr struct for getattr */
   if (argc < 2)
   {
      printf("A pathname of a message queue is required\n");
      return -1;
   }
   mqd = mq_open(argv[1],O_RDONLY);
   if (-1 != mqd)
   {
      if ( ! mq_getattr(mqd,&obuf) )
      {
         printf("flags: 0x%lx  maxmsg: %ld  msgsize: %ld  curmsgs: %ld\n",
            (unsigned long) obuf.mq_flags, (long) obuf.mq_maxmsg,
            (long) obuf.mq_msgsize, (long) obuf.mq_curmsgs);
      }
      else
         perror("mq_getattr()");
   }
   else
      perror("mq_open()");
}


Example of mq_open()

The program mq_open in Example 6-2 allows you to create a message queue from the command line. The following command-line arguments are supported:

path

The file pathname of the queue must be given, following all options.

-p perms

Access permissions to set, for example, -p 0664.

-b bytes

The maximum message size this queue allows, for example, -b 256.

-m msgs

The maximum number of messages that can be pending on this queue, for example, -m 64.

-c

Use the O_CREAT flag to create the queue if it doesn't exist.

-x

Use the O_EXCL flag to require that the queue not exist.


Example 6-2. Program to Demonstrate mq_open()

/*
|| Program to test mq_open(3).
||    mq_open  [-p <perms>] [-b <bytes>] [-m <msgs>] [-c] [-x] <path>
||    -p <perms>  access mode to use when creating, default 0600
||    -b <bytes>  maximum message size to set, default MQ_DEF_MSGSIZE
||    -m <msgs>   maximum messages on the queue, default MQ_DEF_MAXMSG
||    -c          use O_CREAT to create the queue if it does not exist
||    -x          use O_EXCL to require that the queue not exist
||    <path>      the pathname of the queue, required
|| Numeric arguments can be given in any form supported by strtoul(3).
*/
#include <mqueue.h>     /* message queue stuff */
#define MQ_DEF_MSGSIZE 1024
#define MQ_DEF_MAXMSG 16
#include <unistd.h>     /* for getopt() */
#include <errno.h>      /* errno and perror */
#include <fcntl.h>      /* O_flags */
#include <stdlib.h>     /* strtoul(3) */
#include <stdio.h>
int main(int argc, char **argv)
{
   int perms = 0600;          /* permissions */
   int oflags = O_RDWR;       /* open flags; -c, -x add O_CREAT, O_EXCL */
   mqd_t mqd;                 /* returned msg queue descriptor */
   int c;
   char *path;                /* ->first non-option argument */
   struct mq_attr buf;        /* buffer for stat info */
   buf.mq_msgsize = MQ_DEF_MSGSIZE;
   buf.mq_maxmsg = MQ_DEF_MAXMSG;
   while ( -1 != (c = getopt(argc,argv,"p:b:m:cx")) )
   {
      switch (c)
      {
      case 'p': /* permissions */
         perms = (int) strtoul(optarg, NULL, 0);
         break;
      case 'b': /* message size */
         buf.mq_msgsize = (int) strtoul(optarg, NULL, 0);
         break;
      case 'm': /* max messages */
         buf.mq_maxmsg = (int) strtoul(optarg, NULL, 0);
         break;
      case 'c': /* use O_CREAT */
         oflags |= O_CREAT;
         break;
      case 'x': /* use O_EXCL */
         oflags |= O_EXCL;
         break;
      default: /* unknown or missing argument */
         return -1;      
      } /* switch */
   } /* while */
   if (optind < argc)
      path = argv[optind]; /* first non-option argument */
   else
      { printf("Queue pathname required\n"); return -1; }
   mqd = mq_open(path,oflags,perms,&buf);
   if (-1 != mqd)
   {
      if ( ! mq_getattr(mqd,&buf) )
      {
         printf("flags: 0x%lx  maxmsg: %ld  msgsize: %ld  curmsgs: %ld\n",
            (unsigned long) buf.mq_flags, (long) buf.mq_maxmsg,
            (long) buf.mq_msgsize, (long) buf.mq_curmsgs);
      }
      else
         perror("mq_getattr()");
   }
   else
      perror("mq_open()");
}


Example of mq_send()

The mq_send program in Example 6-3 allows you to send from 1 to 99,999 messages to a queue from the command line. The following command-line arguments are accepted:

path

The file pathname of the queue must be given following all options.

-b bytes

Size of each message, for example -b 0x200.

-c count

Number of messages to send. The default is 1.

-p priority

Numeric priority of message to send. Numbers from 0 to 32 are allowed by mq_send().

-n

Use the O_NONBLOCK flag with mq_open().

The count argument is limited to 99,999 so that the message text will not exceed 32 bytes, the (arbitrary) minimum message size the program defines.

Example 6-3. Program to Demonstrate mq_send()

/*
|| Program to test mq_send(3)
||    mq_send [-p <priority>] [-b <bytes>] [-c <count>] [-n] <path>
||       -p <priority>  priority code to use, default 0
||       -b <bytes>     size of the message, default 64, min 32
||       -c <count>     number of messages to send, default 1, max 99999
||       -n             use O_NONBLOCK flag in open
||       <path>         path to queue, required
|| The program sends <count> messages of <bytes> each at <priority>.
|| Each message is an ASCII string containing the time and date and
|| a serial number 1..<count>. The minimum message is 32 bytes.
*/
#include <mqueue.h>        /* message queue stuff */
#include <unistd.h>        /* for getopt() */
#include <errno.h>         /* errno and perror */
#include <time.h>          /* time(2) and ctime_r(3) */
#include <fcntl.h>         /* O_WRONLY */
#include <stdlib.h>        /* calloc(3) */
#include <stdio.h>
int main(int argc, char **argv)
{
   char *path;             /* -> first non-option argument */
   int oflags = O_WRONLY;  /* open flags, O_NONBLOCK may be added */
   mqd_t mqd;              /* queue descriptor from mq_open */
   unsigned int msg_prio = 0; /* message priority to use */
   size_t msglen = 64;     /* message size */
   int count = 1;          /* number of messages to send */
   char *msgptr;           /* -> allocated message space */
   int c;
   while ( -1 != (c = getopt(argc,argv,"p:b:c:n")) )
   {
      switch (c)
      {
      case 'p': /* priority */
         msg_prio = strtoul(optarg, NULL, 0);
         break;
      case 'b': /* bytes */
         msglen = strtoul(optarg, NULL, 0);
         if (msglen<32) msglen = 32;
         break;
      case 'c': /* count */
         count = strtoul(optarg, NULL, 0);
         if (count > 99999) count = 99999;
         break;
      case 'n': /* use nonblock */
         oflags |= O_NONBLOCK;
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   if (optind < argc)
      path = argv[optind]; /* first non-option argument */
   else
      { printf("Queue pathname required\n"); return -1; }
   msgptr = calloc(1,msglen);
   mqd = mq_open(path,oflags);
   if (-1 != mqd)
   {
      char stime[26];
      const time_t tm = time(NULL); /* current time value */
      (void)ctime_r (&tm,stime);    /* formatted time string */
      stime[24] = '\0' ;            /* drop annoying \n */
      for( c=1; c<=count; ++c)
      {
         sprintf(msgptr,"%05d %s",c,stime);
         if ( mq_send(mqd,msgptr,msglen,msg_prio) )
         {
            perror("mq_send()");
            break;
         }
      }
   }
   else
      perror("mq_open(O_WRONLY)");
}


Example of mq_receive()

The mq_receive program in Example 6-4 allows you to receive and display messages from a queue. These command-line arguments are accepted:

path

The file pathname of the queue must be given following all options.

-c count

Number of messages to receive. The default is 1.

-q

Tells program not to display a line for each message received.

-n

Use the O_NONBLOCK flag with mq_open().

You can use the -q option to keep the program from displaying messages. Do this when receiving a large number of messages, for example, to test performance.

Example 6-4. Program to Demonstrate mq_receive()

/*
|| Program to test mq_receive(3)
||    mq_receive [-c <count>] [-n] [-q] <path>
||       -c <count>     number of messages to request, default 1
||       -n             use O_NONBLOCK flag on open
||       -q             quiet, do not display messages
||       <path>         path to message queue, required
|| The program calls mq_receive <count> times or until an error occurs.
*/
#include <mqueue.h>        /* message queue stuff */
#include <unistd.h>        /* for getopt() */
#include <errno.h>         /* errno and perror */
#include <fcntl.h>         /* O_RDONLY */
#include <stdlib.h>        /* calloc(3) */
#include <ctype.h>         /* isascii(3) */
#include <stdio.h>
int main(int argc, char **argv)
{
   char *path;             /* -> first non-option argument */
   int oflags = O_RDONLY;  /* open flags, O_NONBLOCK may be added */
   int quiet = 0;          /* -q option */
   int count = 1;          /* number of messages to request */
   mqd_t mqd;              /* queue descriptor from mq_open */
   char *msgptr;           /* -> allocated message space */
   unsigned int msg_prio;  /* received message priority */
   int c, ret;
   struct mq_attr obuf;    /* output of mq_getattr(): mq_msgsize */
   while ( -1 != (c = getopt(argc,argv,"c:nq")) )
   {
      switch (c)
      {
      case 'c': /* count */
         count = strtoul(optarg, NULL, 0);
         break;
      case 'q': /* quiet */
         quiet = 1;
         break;
      case 'n': /* nonblock */
         oflags |= O_NONBLOCK;
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   if (optind < argc)
      path = argv[optind]; /* first non-option argument */
   else
      { printf("Queue pathname required\n"); return -1; }
   mqd = mq_open(path,oflags);
   if (-1 != mqd)
   {
      if (! (mq_getattr(mqd,&obuf)) ) /* get max message size */
      {
         msgptr = calloc(1,obuf.mq_msgsize);
         for( c=1; c<=count; ++c)
         {
            ret = mq_receive(mqd,msgptr,obuf.mq_msgsize,&msg_prio);
            if (ret >= 0) /* got a message */
            {
               if (!quiet)
               {
                  if ( isascii(*msgptr) )
                     printf("%d: priority %u  len %d text %-32.32s\n",
                              c, msg_prio,       ret,      msgptr);
                  else
                     printf("%d: priority %u  len %d (nonascii)\n",
                              c, msg_prio,       ret);
               }
            }
            else /* an error on receive, stop */
            {
               perror("mq_receive()");
               break;
            }
         } /* for c <= count */
      } /* if getattr */
      else
      {
         perror("mq_getattr()");
         return -1;
      }     
   } /* if open */
   else
      perror("mq_open(O_RDONLY)");
}  


System V Message Queues

IRIX contains an implementation of message queues compatible with UNIX System V Release 4 (SVR4). These message queue functions are demonstrated in example programs in this section.

Managing SVR4 Message Queues

The functions used to create and control SVR4 message queues are summarized in Table 6-4.

Table 6-4. SVR4 Functions for Managing Message Queues

Function Name  Purpose and Operation
-------------  ---------------------------------------------------------------------
msgget(2)      Create a message queue if it does not exist, and gain access to it.
msgctl(2)      Query the status of a queue, change its owner ID or access permissions, or remove it from the system.

Unlike a POSIX message queue, whose name is also a filename, the external name of an SVR4 message queue is an integer held in an IPC name table (see “SVR4 IPC Name Space”). You specify this key when creating the message queue, and again whenever you access it for use.

Creating a Message Queue

The msgget() function has two purposes. It is used to gain access to a queue that exists, and it can create a queue that does not exist. To create a new queue, call msgget() with the following arguments:

key

An integer key that is not defined at this time.

msgflag

A set of flags that includes IPC_CREAT and may include IPC_EXCL. This value also contains the access permission bits.

For example, a call to create a queue might be written as follows:

ret = msgget(PROJ_KEY,IPC_CREAT+IPC_EXCL+0660);

This example relies on a constant PROJ_KEY to supply the key. Another option is to use the ftok() library function (see the ftok(3C) reference page).
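Deriving the key with ftok() rather than a fixed constant could look like the sketch below. The helper open_project_queue() is hypothetical, and the permission value 0660 simply mirrors the call shown above. The sketch assumes that all cooperating programs pass the same existing pathname and the same project identifier, so that each one derives the same key.

```c
#define _XOPEN_SOURCE 700  /* assumption: exposes ftok()/msgget() on some systems */
#include <sys/types.h>  /* key_t */
#include <sys/ipc.h>    /* ftok(), IPC_CREAT */
#include <sys/msg.h>    /* msgget() */

/* Hypothetical helper: derive a key from an existing pathname and a
   project identifier, then open the queue with that key, creating it
   when create is nonzero. Returns the queue ID, or -1. */
int open_project_queue(const char *path, int proj_id, int create)
{
   key_t key = ftok(path, proj_id);
   if (key == (key_t) -1)
      return -1;                     /* path missing or inaccessible */
   return msgget(key, (create ? IPC_CREAT : 0) | 0660);
}
```

For example, one program might call open_project_queue("/", 'Q', 1) to create the queue, and its partners might call open_project_queue("/", 'Q', 0) to find it.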

Accessing an Existing Queue

When the program expects the queue to exist, it calls msgget() passing the expected key value and omitting the IPC_CREAT flag. If the queue does not exist, or if the effective user and group ID of the process are not allowed access to the queue, an error is returned. The program receives read-only or read-write access depending on the access permissions of the queue, just as with a file.

Modifying a Message Queue

You can use msgctl() to modify four attributes of a queue after creating or accessing it:

  • the user ID and group ID that owns the queue

  • the access permissions

  • the limit on the total size of all queued messages

The size limit on a new queue is set to the system limit (32,768 bytes as of IRIX 6.2). This determines how many messages can be waiting, unreceived, on the queue. That in turn determines how far the message-sending process can get ahead of the message-reading process. You can lower the limit to limit the sending process or thread more closely to the speed of the receiving process or thread.
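Lowering the size limit might be coded as in this sketch. The helper names set_queue_limit() and get_queue_limit() are hypothetical. The sketch reads the current status first so that the owner and permission fields passed back with IPC_SET are unchanged; it also assumes the caller only lowers the limit, since raising it above the system limit typically requires privilege.

```c
#define _XOPEN_SOURCE 700  /* assumption: exposes msgctl() on some systems */
#include <sys/types.h>
#include <sys/ipc.h>    /* IPC_STAT, IPC_SET */
#include <sys/msg.h>    /* msgctl(), struct msqid_ds, msglen_t */

/* Hypothetical helper: change only the limit on the total size of
   queued messages. Returns 0, or -1 on error. */
int set_queue_limit(int msqid, unsigned long max_bytes)
{
   struct msqid_ds ds;
   if (msgctl(msqid, IPC_STAT, &ds) == -1)   /* fetch current settings */
      return -1;
   ds.msg_qbytes = (msglen_t) max_bytes;     /* change one field */
   return msgctl(msqid, IPC_SET, &ds);       /* write them back */
}

/* Hypothetical helper: return the current limit in bytes, or -1. */
long get_queue_limit(int msqid)
{
   struct msqid_ds ds;
   if (msgctl(msqid, IPC_STAT, &ds) == -1)
      return -1;
   return (long) ds.msg_qbytes;
}
```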

Removing a Message Queue

You can remove a message queue using the ipcrm command (see the ipcrm(1) reference page), or by calling msgctl() and passing the IPC_RMID command code. In many cases, a message queue is meant for use within the scope of one program only, and you do not want the queue to persist after the termination of that program. Call msgctl() to remove the queue as part of termination.

Using SVR4 Message Queues

The SVR4 functions for using message queues are summarized in Table 6-5.

Table 6-5. SVR4 Functions for Using Message Queues

Function Name  Purpose and Operation
-------------  -------------------------------
msgsnd(2)      Send a message to a queue.
msgrcv(2)      Receive a message from a queue.


Sending a Message

To send a message to a queue, call msgsnd() and specify the queue, the address and length of the message data, and a flag number that can contain IPC_NOWAIT. The message buffer contains an integer specifying the “type” of the message. Messages on the queue are retained in arrival sequence within types.

The message is copied out of the caller's buffer, so the buffer can be reused immediately after a successful send. If the queue is full, the msgsnd() function blocks unless the IPC_NOWAIT flag is passed.

Receiving a Message

To receive a message, call msgrcv() and specify the queue, the address and size of a buffer, a number for the desired message type, and a flag value. If the queue is empty, the msgrcv() function blocks unless the IPC_NOWAIT flag is passed. If the message buffer is not as large as the message, an error is returned unless the MSG_NOERROR flag is passed, in which case the message is truncated to fit the buffer.

The type value can be 0, to specify “any type,” or it can be a specific (positive) type number to select the first message of that type. Finally, it can be a negative value to specify “the first message of the lowest type that is less than or equal to the absolute value.”
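The three forms of the type argument can be tried with a sketch like the following. The structure text_msg and the helpers send_text() and receive_text() are hypothetical; the 64-byte text limit is arbitrary. Both helpers pass IPC_NOWAIT so that they return an error instead of suspending.

```c
#define _XOPEN_SOURCE 700  /* assumption: exposes msgsnd()/msgrcv() on some systems */
#include <errno.h>      /* errno, EINVAL */
#include <string.h>     /* strlen(), memcpy() */
#include <sys/types.h>  /* ssize_t */
#include <sys/ipc.h>    /* IPC_NOWAIT */
#include <sys/msg.h>    /* msgsnd(), msgrcv() */

#define TEXT_MAX 64     /* arbitrary limit on message text */

struct text_msg         /* hypothetical message layout */
{
   long mtype;             /* message type, must be greater than 0 */
   char mtext[TEXT_MAX];   /* message data */
};

/* Send a null-terminated string as a message of the given type. */
int send_text(int msqid, long type, const char *text)
{
   struct text_msg m;
   size_t len = strlen(text) + 1;
   if (type <= 0 || len > sizeof m.mtext)
   {
      errno = EINVAL;
      return -1;
   }
   m.mtype = type;
   memcpy(m.mtext, text, len);
   return msgsnd(msqid, &m, len, IPC_NOWAIT);  /* length excludes mtype */
}

/* Receive one message selected by want (0, positive, or negative).
   out must have room for TEXT_MAX bytes.
   Returns the type of the message received, or -1. */
long receive_text(int msqid, long want, char *out)
{
   struct text_msg m;
   ssize_t n = msgrcv(msqid, &m, sizeof m.mtext, want, IPC_NOWAIT);
   if (n == -1)
      return -1;
   memcpy(out, m.mtext, (size_t) n);
   return m.mtype;
}
```

With messages of types 17, 18, and 5 queued in that order, a request for type 18 skips over the type 17 message, a request for -17 then returns the type 5 message, and a request for 0 returns whatever is first on the queue.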

Example Programs

The following programs demonstrate the use of SVR4 message queues:

  • Example 6-5 demonstrates the use of msgget() to create or access a queue.

  • Example 6-6 demonstrates the use of msgctl() to query or modify a queue.

  • Example 6-7 demonstrates the use of msgsnd() to put messages onto a queue.

  • Example 6-8 demonstrates the use of msgrcv() to take messages from a queue.

The four example programs have a consistent design and use consistent command-line argument letters. Each accepts optional arguments that allow you to exercise all the features of one function, including most error return codes. The following is a simple example of use. First, ipcs is used to show no queues exist.

$ ipcs -q
IPC status from /dev/kmem as of Wed Jun 12 10:36:38 1996
T     ID     KEY        MODE       OWNER    GROUP
Message Queues:

Then a queue is created with key 9 and ipcs used to verify the operation.

$ msgget -k 9 -c
msqid = 0x0032. owner = 1110.20, perms = 100600, max bytes = 32768
0 msgs = 0 bytes on queue
$ ipcs -q
IPC status from /dev/kmem as of Thu Jun 20 09:32:25 1996
T     ID     KEY        MODE       OWNER    GROUP
Message Queues:
q     50 0x00000009 --rw-------  cortesi     user

The use of the IPC_EXCL flag is tested:

$ msgget -k 9 -c -x
msgget(): File exists

A message is sent to the queue, addressing the queue by its ID.

$ msgsnd -i 50 -t 17
$ msgctl -i 50
owner = 1110.20, perms = 100600, max bytes = 32768
1 msgs = 64 bytes on queue

The maximum queue size is changed, this time addressing the queue by its key.

$ msgctl -k 9 -b 1024
owner = 1110.20, perms = 100600, max bytes = 1024
1 msgs = 64 bytes on queue

A second message is sent:

$ msgsnd -i 50 -t 18
$ msgctl -i 50
owner = 1110.20, perms = 100600, max bytes = 1024
2 msgs = 128 bytes on queue

The first and second messages are received:

$ msgrcv -k 9
1: type 17  len 64 text 00001 Thu Jun 20 09:32:55 1996  
$ msgrcv -i 50
1: type 18  len 64 text 00001 Thu Jun 20 09:33:18 1996 

Another message receipt is attempted, first with IPC_NOWAIT:

$ msgrcv -i 50 -n
msgrcv(): No message of desired type

Another receipt is attempted, this time without IPC_NOWAIT, so msgrcv is suspended waiting for a message. While it is suspended, the message queue is removed.

$ msgrcv -k 9 &
12477
$ ipcrm -q 50
$ msgrcv(): Identifier removed

Example of msgget

The program msgget in Example 6-5 allows you to create a message queue from the command line. The following command-line arguments are supported:

-k key

Numeric identifier of a message queue, for example -k 99.

-p perms

Access permissions to set, for example -p 0664.

-x

Use the IPC_EXCL flag with msgget().

-c

Use the IPC_CREAT flag with msgget().

If the -k argument is omitted, the program uses a private key and thus creates a message queue that can be used from this program only. (This is not useful, since the program does nothing with the queue before it terminates.)
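
A private queue becomes useful when the creating program keeps running and passes the queue ID to its own children. The following sketch, which is not part of the example programs, creates a private queue, lets a child process send one message, receives it in the parent, and removes the queue. The function name private_roundtrip() and the demo_msg structure are invented for this illustration.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>
#include <unistd.h>

struct demo_msg { long type; char text[64]; };

/* Hypothetical helper: send text from a child to its parent through
 * a private queue. out must have room for 64 bytes. Returns the
 * number of text bytes received, or -1 on error. */
ssize_t private_roundtrip(const char *text, char *out)
{
    struct demo_msg m;
    ssize_t got = -1;
    pid_t pid;
    int msqid;

    assert(text != NULL && out != NULL);
    msqid = msgget(IPC_PRIVATE, 0600);  /* no key for others to look up */
    if (msqid == -1)
        return -1;

    pid = fork();
    if (pid == 0)                       /* child: send one message, exit */
    {
        memset(&m, 0, sizeof m);
        m.type = 1;
        strncpy(m.text, text, sizeof m.text - 1);
        _exit(msgsnd(msqid, &m, sizeof m.text, 0) == -1 ? 1 : 0);
    }
    if (pid != -1)
    {
        waitpid(pid, NULL, 0);          /* child has sent, or failed */
        got = msgrcv(msqid, &m, sizeof m.text, 1, IPC_NOWAIT);
        if (got != -1)
            memcpy(out, m.text, sizeof m.text);
    }
    msgctl(msqid, IPC_RMID, NULL);      /* the queue outlives its creator */
    return got;
}
```

The explicit IPC_RMID matters: a message queue is not removed when the process that created it terminates, so a private queue that is never removed remains allocated until it is deleted, for example with ipcrm.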

Example 6-5. Program to Demonstrate msgget()

/*
|| Program to test msgget(2).
||    msgget [-k <key>] [-p <perms>] [-x] [-c]
||       -k <key>    the key to use, default == 0 == IPC_PRIVATE
||       -p <perms>  permissions to use, default 600
||       -x          use IPC_EXCL
||       -c          use IPC_CREAT
*/
#include <sys/msg.h>    /* msg queue stuff, ipc.h, types.h */
#include <unistd.h>     /* for getopt() */
#include <errno.h>      /* errno and perror */
#include <stdlib.h>     /* for strtoul() */
#include <stdio.h>
int main(int argc, char **argv)
{
   key_t key = IPC_PRIVATE;   /* key */
   int perms = 0600;          /* permissions */
   int msgflg = 0;            /* flags: CREAT + EXCL */
   int msqid;                 /* returned msg queue id */
   struct msqid_ds buf;       /* buffer for stat info */
   int c;
   while ( -1 != (c = getopt(argc,argv,"k:p:xc")) )
   {
      switch (c)
      {
      case 'k': /* key */
         key = (key_t) strtoul(optarg, NULL, 0);
         break;
      case 'p': /* permissions */
         perms = (int) strtoul(optarg, NULL, 0);
         break;
      case 'c':
         msgflg |= IPC_CREAT;
         break;
      case 'x':
         msgflg |= IPC_EXCL;
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   msqid = msgget (key, msgflg|perms);
   if (-1 != msqid)
   {
      printf("msqid = 0x%04x. ",msqid);
      if (-1 != msgctl(msqid,IPC_STAT,&buf))
      {
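         /* cast to types that match the format specifiers */
         printf("owner = %ld.%ld, perms = %04lo, max bytes = %lu\n",
                  (long)buf.msg_perm.uid,
                  (long)buf.msg_perm.gid,
                  (unsigned long)buf.msg_perm.mode,
                  (unsigned long)buf.msg_qbytes);
         printf("%lu msgs = %lu bytes on queue\n",
                  (unsigned long)buf.msg_qnum,
                  (unsigned long)buf.msg_cbytes);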
      }
      else
         perror("\nmsgctl()");
   }
   else
   {
      perror("msgget()");
      return -1;
   }
   return 0;
}


Example of msgctl

The program msgctl in Example 6-6 allows you to display the state of a queue, or to change the permissions, owner ID, group ID, or maximum size of a queue. The following command-line arguments are supported:

-k key

Numeric identifier of a message queue, for example, -k 99.

-i id

Message queue ID, alternative to specifying the key; for example, -i 80.

-p perms

Access permissions to set, for example, -p 0664.

-b bytes

Maximum size of the message queue, for example, -b 0x1000.

-u uid

Numeric user ID to set as owner.

-g gid

Numeric group ID to set as the owning group.


Example 6-6. Program to Demonstrate msgctl()

/*
|| Program to test msgctl(2).
||    msgctl {-k <key> | -i <id>} [-b <bytes>] [-p <perms>] [-u <uid>] [-g <gid>]
||       -k <key>    the key to use, or..
||       -i <id>     ..the mq id
||       -b <bytes>  new max number of bytes to set in msg_qbytes
||       -p <perms>  new permissions to assign in msg_perm.mode
||       -u <uid>    new user id (numeric) for msg_perm.uid
||       -g <gid>    new group id (numeric) for msg_perm.gid
*/
#include <sys/msg.h>    /* msg queue stuff, ipc.h, types.h */
#include <unistd.h>     /* for getopt() */
#include <errno.h>      /* errno and perror */
#include <stdlib.h>     /* for strtoul() */
#include <stdio.h>
int main(int argc, char **argv)
{
   key_t key;           /* key for msgget.. */
   int msqid = -1;      /* ..specified or received msg queue id */
   long perms = -1L;    /* -1L is not valid for any of these */
   long bytes = -1L;
   long uid = -1L;
   long gid = -1L;
   struct msqid_ds buf;
   int c;
   while ( -1 != (c = getopt(argc,argv,"k:i:b:p:u:g:")) )
   {
      switch (c)
      {
      case 'k': /* key */
         key = (key_t) strtoul(optarg, NULL, 0);
         break;
      case 'i': /* id */
         msqid = (int) strtoul(optarg, NULL, 0);
         break;
      case 'p': /* permissions */
         perms = strtoul(optarg, NULL, 0);
         break;
      case 'b': /* bytes */
         bytes = strtoul(optarg, NULL, 0);
         break;
      case 'u': /* uid */
         uid = strtoul(optarg, NULL, 0);
         break;
      case 'g': /* gid */
         gid = strtoul(optarg, NULL, 0);
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   if (-1 == msqid) /* no id given, try key */
      msqid = msgget (key, 0);
   if (-1 != msqid)
   {
      if (-1 != msgctl(msqid,IPC_STAT,&buf))
      {
         if ((perms!=-1L)||(bytes!=-1L)||(uid!=-1L)||(gid!=-1L))
         {
            /* put new values in buf fields as requested */
            if (perms != -1L) buf.msg_perm.mode = (mode_t)perms;
            if (uid   != -1L) buf.msg_perm.uid  = (uid_t)uid;
            if (gid   != -1L) buf.msg_perm.gid  = (gid_t)gid;
            if (bytes != -1L) buf.msg_qbytes = (ulong_t)bytes;
            if (-1 == msgctl(msqid,IPC_SET,&buf))
               perror("\nmsgctl(IPC_SET)");
         }
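         /* cast to types that match the format specifiers */
         printf("owner = %ld.%ld, perms = %04lo, max bytes = %lu\n",
               (long)buf.msg_perm.uid,
               (long)buf.msg_perm.gid,
               (unsigned long)buf.msg_perm.mode,
               (unsigned long)buf.msg_qbytes);
         printf("%lu msgs = %lu bytes on queue\n",
               (unsigned long)buf.msg_qnum,
               (unsigned long)buf.msg_cbytes);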
      }
      else
         perror("\nmsgctl(IPC_STAT)");
   }
   else
   {
      perror("msgget()");
      return -1;
   }
   return 0;
}


Example of msgsnd

The msgsnd program in Example 6-7 allows you to send one or more messages of specified length and type to a message queue. The following command-line arguments are supported:

-k key

Numeric identifier of a message queue, for example, -k 99.

-i id

Message queue ID, alternative to specifying the key; for example, -i 80.

-c count

Number of messages to send. The default is 1.

-t type

Numeric type of message to send. Types less than 1 are rejected by msgsnd().

-b bytes

Size of each message, for example, -b 0x200. The default is 64; a value less than 32 is raised to 32.

-n

Use the IPC_NOWAIT flag with msgsnd().

The program sends as many messages as you specify, each with the specified type and size. The first 32 bytes of each message hold a printable string containing a sequence number and the date and time. The message is padded out to the specified size with binary zeros.

Example 6-7. Program to Demonstrate msgsnd()

/*
|| Program to test msgsnd(2)
||    msgsnd {-k <key> | -i <id>} [-t <type>] [-b <bytes>] [-c <count>] [-n]
||       -k <key>    the key to use, or..
||       -i <id>     ..the mq id
||       -t <type>   the type of each message, default = 1
||       -b <bytes>  the size of each message, default = 64, min 32
||       -c <count>  the number of messages to send, default = 1, max 99999
||       -n          use IPC_NOWAIT flag
|| The program sends <count> messages of <type>, <bytes> each on the queue.
|| Each message is an ASCII string containing the time and date, and
|| a serial number 1..<count>, minimum message is 32 bytes.
*/
#include <sys/msg.h>    /* msg queue stuff, ipc.h, types.h */
#include <unistd.h>     /* for getopt() */
#include <errno.h>      /* errno and perror */
#include <time.h>       /* time(2) and ctime_r(3) */
#include <stdlib.h>     /* for strtoul() and calloc() */
#include <stdio.h>
int main(int argc, char **argv)
{
   key_t key;           /* key for msgget.. */
   int msqid = -1;      /* ..specified or received msg queue id */
   int msgflg = 0;      /* flag, 0 or IPC_NOWAIT */
   long type = 1;       /* message type -- 0 is not valid to msgsnd() */
   size_t bytes = 64;   /* message text size */
   int count = 1;       /* number to send */
   int c;
   struct msgspace { long type; char text[32]; } *msg;
   while ( -1 != (c = getopt(argc,argv,"k:i:t:b:c:n")) )
   {
      switch (c)
      {
      case 'k': /* key */
         key = (key_t) strtoul(optarg, NULL, 0);
         break;
      case 'i': /* id */
         msqid = (int) strtoul(optarg, NULL, 0);
         break;
      case 't': /* type */
         type = strtoul(optarg, NULL, 0);
         break;
      case 'b': /* bytes */
         bytes = strtoul(optarg, NULL, 0);
         if (bytes<32) bytes = 32;
         break;
      case 'c': /* count */
         count = strtoul(optarg, NULL, 0);
         if (count > 99999) count = 99999;
         break;
      case 'n': /* nowait */
         msgflg |= IPC_NOWAIT;
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   msg = (struct msgspace *)calloc(1,sizeof(long)+bytes);
   if (NULL == msg) /* no memory for the message buffer */
   {
      perror("calloc()");
      return -1;
   }
   if (-1 == msqid) /* no id given, try key */
      msqid = msgget (key, 0);
   if (-1 != msqid)
   {
      const time_t tm = time(NULL);
      char stime[26];
      (void)ctime_r (&tm,stime); /* format timestamp for msg */
      stime[24] = '\0';          /* drop annoying \n */
      for( c=1; c<=count; ++c)
      {
         msg->type = type;
         sprintf(msg->text,"%05d %s",c,stime);
         if (-1 == msgsnd(msqid,msg,bytes,msgflg))
         {
            perror("msgsnd()");
            break;
         }
      }
   }
   else
   {
      perror("msgget()");
      return -1;
   }
   return 0;
}


Example of msgrcv

The program msgrcv in Example 6-8 allows you to receive messages from a specified queue. The following command-line arguments are supported:

-k key

Numeric identifier of a message queue, for example -k 99.

-i id

Message queue ID, alternative to specifying the key; for example, -i 80.

-c count

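Number of messages to attempt to receive. The default is 1.

-t type

Numeric type of message to receive. The default, 0, accepts a message of any type. A negative value selects the message with the lowest type that is not greater than its absolute value.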

-b bytes

Maximum size of a message, for example, -b 0x200.

-n

Use the IPC_NOWAIT flag with msgrcv().

-e

Use the MSG_NOERROR flag with msgrcv(), to truncate messages longer than bytes.

-q

Be quiet, do not display the received message. Use for performance testing.

As each message is received, it is displayed unless -q is specified. A sequence number, the message type, and the message length are always displayed; the first 32 bytes of the text are displayed if the text begins with an ASCII character.
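
The effect of the -e argument can be sketched as follows. When a queued message is longer than the size passed to msgrcv(), the call fails with E2BIG and the message stays on the queue, unless MSG_NOERROR is set, in which case the message is truncated to the requested size and the excess is discarded. The helper name receive_limited() and the demo_msg structure are assumptions made for this illustration.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct demo_msg { long type; char text[64]; };

/* Hypothetical helper: receive at most max text bytes of the next
 * message, without blocking. When allow_truncation is nonzero,
 * MSG_NOERROR is set so that a longer message is cut to max bytes;
 * otherwise a longer message causes -1 with errno == E2BIG.
 * Returns the number of text bytes received, or -1 on error. */
ssize_t receive_limited(int msqid, struct demo_msg *m, size_t max,
                        int allow_truncation)
{
    int flags = IPC_NOWAIT | (allow_truncation ? MSG_NOERROR : 0);
    assert(m != NULL && max <= sizeof m->text);
    return msgrcv(msqid, m, max, 0, flags);
}
```

With a 64-byte message on the queue, receive_limited(id, &m, 16, 0) fails with E2BIG, and a following receive_limited(id, &m, 16, 1) returns 16 and leaves the queue empty.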

Example 6-8. Program to Demonstrate msgrcv()

/*
|| Program to test msgrcv(2)
||    msgrcv {-k <key> | -i <id>} [-t <type>] [-b <bytes>] [-c <count>]
||                                           [-n] [-e] [-q]
||       -k <key>    the key to use, or..
||       -i <id>     ..the mq id
||       -t <type>   the type of message, default = 0 (any msg)
||       -b <bytes>  the max size to receive, default = 64
||       -c <count>  the number of messages to receive, default = 1
||       -n          use IPC_NOWAIT flag
||       -e          use MSG_NOERROR flag (truncate long msg)
||       -q          quiet, do not display received message
|| The program calls msgrcv <count> times or until an error occurs,
|| each time requesting a message of type <type> and max size <bytes>.
*/
#include <sys/msg.h>    /* msg queue stuff, ipc.h, types.h */
#include <unistd.h>     /* for getopt() */
#include <errno.h>      /* errno and perror */
#include <ctype.h>      /* isascii() */
#include <stdlib.h>     /* for strtol(), strtoul() and calloc() */
#include <stdio.h>
int main(int argc, char **argv)
{
   key_t key;           /* key for msgget.. */
   int msqid = -1;      /* ..specified or received msg queue id */
   int msgflg = 0;      /* flag, 0, IPC_NOWAIT, MSG_NOERROR */
   long type = 0;       /* message type */
   size_t bytes = 64;   /* message size limit */
   int count = 1;       /* number to receive */
   int quiet = 0;       /* quiet flag */
   int c;
   struct msgspace { long type; char text[32]; } *msg;
   while ( -1 != (c = getopt(argc,argv,"k:i:t:b:c:enq")) )
   {
      switch (c)
      {
      case 'k': /* key */
         key = (key_t) strtoul(optarg, NULL, 0);
         break;
      case 'i': /* id */
         msqid = (int) strtoul(optarg, NULL, 0);
         break;
      case 't': /* type -- can be negative */
         type = strtol(optarg, NULL, 0);
         break;
      case 'b': /* bytes -- no minimum */
         bytes = strtoul(optarg, NULL, 0);
         break;
      case 'c': /* count - no maximum */
         count = strtoul(optarg, NULL, 0);
         break;
      case 'n': /* nowait */
         msgflg |= IPC_NOWAIT;
         break;
      case 'e': /* noerror -- allow truncation of msgs */
         msgflg |= MSG_NOERROR;
         break;
      case 'q': /* quiet */
         quiet = 1;
         break;
      default: /* unknown or missing argument */
         return -1;      
      }
   }
   if (-1 == msqid) /* no id given, try key */
      msqid = msgget (key, 0);
   msg = (struct msgspace *)calloc(1,sizeof(long)+bytes);
   if (NULL == msg) /* no memory for the message buffer */
   {
      perror("calloc()");
      return -1;
   }
   if (-1 != msqid)
   {
      for( c=1; c<=count; ++c)
      {
         int ret = msgrcv(msqid,msg,bytes,type,msgflg);
         if (ret >= 0) /* got a message */
         {
            if (!quiet)
            {
               if (isascii(msg->text[0]))
                  printf("%d: type %ld  len %d text %-32.32s\n",
                           c,   msg->type,  ret,      msg->text);
               else
                  printf("%d: type %ld len %d (nonascii)\n",
                           c,   msg->type,  ret);
            }
         }
         else /* an error, end loop */
         {
            perror("msgrcv()");
            break;
         }
      } /* for c<=count */
   } /* good msgget */
   else
   {
      perror("msgget()");
      return -1;
   }
   return 0;
}