Chapter 8. Using Asynchronous I/O

When you use asynchronous I/O, the work of buffering data and reading or writing a device is carried out in a parallel process or thread, while the process or thread that requested the I/O can continue doing other work. In a multiprocessor system, I/O can be fully overlapped with processing.

About Synchronous and Asynchronous I/O

Conventional I/O in UNIX is synchronous; that is, the process or thread that requests the I/O is blocked until the I/O has completed. The effects are different for input and for output.

About Synchronous Input

The normal sequence of operations for input is as follows:

  1. A process invokes the system function read(), either directly or indirectly—for example, by accessing a new page of a memory-mapped file, or by calling a library function that calls read().

  2. The kernel, operating under the identity of the calling process, enters the read entry point of a device driver.

  3. The device driver initiates an input operation and blocks the calling process, for example by waiting on a semaphore in the kernel address space.

  4. The kernel schedules another process to use the CPU.

  5. Later, the device completes the input operation and causes a hardware interrupt.

  6. The kernel interrupt handler enters the device driver interrupt entry point.

  7. The device driver, finding that the data has been received, unblocks the sleeping process, for example by posting a semaphore.

  8. The kernel notes that the blocked process can now run.

  9. Then or perhaps later, depending on scheduling priorities, the kernel schedules the process to run on some CPU.

  10. The unblocked process exits the read() system call and returns to user code, the read being complete.

During steps 4-8, the process that requested input is blocked. The duration of the delay is unpredictable. For example, the delay can be negligible if the data is already in a buffer in memory. It can be as long as one rotation time of a disk, if the disk is positioned on the correct cylinder. It can be longer still, if the disk has to seek, or if the disk controller or bus adapter is busy with other transfers.

About Synchronous Output

For disk files, a process that calls write() is normally delayed only as long as it takes to copy the output data to a buffer in the kernel address space. The kernel asks the device driver to schedule the device write. The actual disk output is asynchronous. As a result, a process that requests output is usually blocked for only a short time. However, a number of disk write requests could be pending, so the true state of a file on disk is unknown until the file is closed.

In order to make sure that all data has been written to disk successfully, a program calls fsync() for a conventional file or msync() for a memory-mapped file (see the fsync(2) and msync(2) reference pages). The process that calls these functions is blocked until all buffered data has been written. (An alternative for disk output is to use direct output, discussed under “Using Direct I/O”.)
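
The write-then-fsync() pattern described above can be sketched as follows. This is a minimal illustration, not code from the example program; the helper name save_durably() and the file mode are assumptions made for the sketch.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stddef.h>
#include <unistd.h>

/* Hypothetical helper: write len bytes to path and return only after
   fsync() reports that the buffered data has been written out.
   Returns 0 on success, -1 on any error. */
int save_durably(const char *path, const void *data, size_t len)
{
    const char *p = data;
    int fd, rc;

    assert(path != NULL && data != NULL);
    fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1)
        return -1;
    while (len > 0) {
        /* write() normally returns once the data is copied to a kernel buffer */
        ssize_t n = write(fd, p, len);
        if (n == -1) {
            close(fd);
            return -1;
        }
        p += n;
        len -= (size_t)n;
    }
    rc = fsync(fd);            /* the caller is blocked here until the data is written */
    if (close(fd) == -1)
        rc = -1;
    return rc;
}
```

The caller pays the full device latency inside fsync(), which is the delay that asynchronous I/O is meant to move out of the requesting process.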

Devices other than disks may block the calling process until the output is complete. It is the device driver logic that determines whether a call to write() blocks the caller, and for how long.

About Asynchronous I/O

Some processes should never be blocked for the unpredictable times that I/O can require. One obvious solution can be summarized as “call read() or write() from a different process, and run that process in a different CPU.” This is the essence of asynchronous I/O. You could implement an asynchronous I/O scheme of your own design, and you may wish to do so in order to integrate the I/O closely with your own design of processes and data structures. However, a standard solution is available.

IRIX supports asynchronous I/O library calls conforming to POSIX document 1003.1b-1993. You use relatively simple calls to initiate input or output. The library package handles the details of

  • Creating asynchronous processes or threads to perform the I/O.

  • Allocating a shared memory arena and the locks, semaphores, and other structures used to coordinate the I/O processes or threads.

  • Queueing multiple input or output requests to each of multiple file descriptors.

  • Reporting results back to your program, either on request, through signals, or through callback functions.

Asynchronous I/O Functions

Once you have opened the files and initialized asynchronous I/O, you perform asynchronous I/O by calling some of these functions:

aio_read(3)

Initiates asynchronous input from a file or device.

aio_write(3)

Initiates asynchronous output to a file or device.

lio_listio(3)

Initiates a list of operations to one or more files or devices.

aio_error(3)

Returns the status of an asynchronous operation.

aio_fsync(3)

Queues a request to write all scheduled output for a file to the device.

aio_cancel(3)

Cancels pending, scheduled operations.

Each of these functions is described in detail in a reference page.

Asynchronous I/O Control Block

Each asynchronous I/O request is represented by an instance of struct aiocb, a data structure that your program must allocate. The important fields are as follows.

  • The file descriptor that is the target of the operation.

    File descriptors are returned by open() (see the open(2) reference page). A file descriptor used for asynchronous I/O can represent any file or device—not only a disk file.

  • The address and size of a buffer to supply or receive the data.

  • The file position for the operation as it would be passed to lseek() (see the lseek(2) reference page).

    The use of this value is discussed under “Multiple Operations to One File”.

  • A sigevent structure, whose contents indicate what, if anything, should be done to notify your program of the completion of the I/O.

    The use of the sigevent is discussed under “Checking for Completion”.


Note: The IRIX 5.2 implementation also accepted a request priority value. Request priorities are no longer supported. The request-priority field of aiocb exists for compatibility and for possible future use, but must currently contain zero.
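
As a rough illustration of the fields listed above, the following sketch fills in an aiocb for one request. The helper name prepare_aiocb() is hypothetical; the field names are the standard POSIX members of struct aiocb. Zeroing the whole structure first is one simple way to keep the request-priority field at zero.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <signal.h>
#include <string.h>
#include <sys/types.h>

/* Hypothetical helper (not a library call): describe one request. */
void prepare_aiocb(struct aiocb *cb, int fd, void *buf, size_t len, off_t pos)
{
    assert(cb != NULL);
    memset(cb, 0, sizeof *cb);      /* unused fields, including aio_reqprio, stay zero */
    cb->aio_fildes = fd;            /* target file descriptor */
    cb->aio_buf = buf;              /* buffer to supply or receive the data */
    cb->aio_nbytes = len;           /* size of that buffer */
    cb->aio_offset = pos;           /* file position, as for lseek() */
    cb->aio_sigevent.sigev_notify = SIGEV_NONE;   /* no completion notification */
}
```

The structure must stay allocated, and unmodified, until the operation it describes has completed.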


Initializing Asynchronous I/O

You can initialize asynchronous I/O in either of two ways. One way is simple; the other gives you control over the initialization.

Implicit Initialization

You can initialize asynchronous I/O simply by starting an operation with aio_read(), lio_listio(), or aio_write(). The first such call causes default initialization. This is the only form of initialization described by the POSIX standard. However, you may need to control at least the timing of initialization.

Initializing with aio_sgi_init()

You can control initialization of asynchronous I/O by calling aio_sgi_init() (refer to the aio_sgi_init(3) reference page and to the declarations in /usr/include/aio.h). The argument to this call can be a null pointer, indicating you want default values, or you can pass an aioinit_t structure. The principal fields of this structure specify

  • the number of asynchronous processes or threads to execute I/O (aio_threads)

    The asynchronous I/O library creates asynchronous processes or threads to perform the I/O. It uses sproc() in normal programs, or pthread_create() in a pthread program.

    In either case, the default number of asynchronous threads is 5 and the minimum is 2. Specify 1 more than the number of I/O operations that could reasonably be executed in parallel on the available hardware. For example, if you will be doing asynchronous I/O to one disk file and one tape drive, there can be at most two concurrent I/O operations, so there is no need for more than 3 (1 more than 2) asynchronous processes.

  • the number of locks that the asynchronous I/O processes should preallocate (aio_locks)

    The default used by aio_sgi_init() is 3 locks; the minimum is 1. Specify the maximum number of simultaneous lio_listio(LIO_NOWAIT), aio_fsync(), and aio_suspend() calls that your program could execute concurrently. If in doubt, specify the number of subprocesses your program contains.

  • the number of processes or threads that will be sharing the use of asynchronous I/O (aio_numusers)

    The default is 5; the minimum is 2. Specify 1 more than the number of different processes or pthreads that will be requesting asynchronous I/O.

Other fields of the aioinit_t structure such as aio_num and aio_usedba are not used at this time and must be zero. Zero-valued fields are taken as a request for the default for that field. Example 8-1 shows a subroutine to initialize asynchronous I/O, given counts of devices and calling processes.

Example 8-1. Initializing Asynchronous I/O

int initAIO(int numDevs, int numSprocs, int maxOps)
{
    aioinit_t A = {0}; /* ensure zero'd fields */
    if (numDevs) /* we do know how many devices */
        A.aio_threads = 1+numDevs;
    if (numSprocs) /* we do know how many sprocs */
        A.aio_locks = A.aio_numusers = 1+numSprocs;
    (void)maxOps; /* aio_num is currently unused and must remain zero */
    aio_sgi_init(&A);
    return 0;
}


When to Initialize

The time at which initialization occurs is important. If you initialize in a process that has been assigned to run on an isolated CPU, the asynchronous I/O processes will also run on that CPU. You probably want the I/O processes to run under normal dispatching on unrestricted CPUs. In that case, the proper sequence of initialization is:

  • Open all file descriptors and verify that files and devices are ready.

  • Initialize asynchronous I/O. The lightweight processes created by aio_sgi_init() inherit the attributes of the calling process, including its current priority and access to open file descriptors.

  • Isolate any CPUs that are to be dedicated.

  • Create child processes and assign them to their CPUs.

The asynchronous I/O processes created by aio_sgi_init() continue to be scheduled according to their priority in whatever CPUs remain available.

Scheduling Asynchronous I/O

You schedule an input or output operation by calling aio_read() or aio_write(), passing an aiocb structure to describe the operation (see the aio_read(3) and aio_write(3) reference pages). The operation is queued to that file descriptor. It will be executed when one of the asynchronous I/O processes or threads is available. The return code from the library call says nothing about the I/O operation itself; it merely indicates whether or not the aiocb could be queued.


Note: It is important to use a given aiocb for only one operation at a time, and to not modify an aiocb until its operation is complete.

You can schedule a list of operations using lio_listio() (see the lio_listio(3) reference page). The advantage of this function is that you can request a single notification (either a signal or a callback) when all of the operations in the list are complete. Alternatively, you can be notified of the completion of each one as it happens.
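
The following sketch shows one way to submit a list with lio_listio(), using only standard POSIX calls. The function name read_two_regions() and the choice of two adjacent regions are assumptions for illustration. It uses LIO_WAIT, so the call itself returns only when the whole list is done; with LIO_NOWAIT you would pass a sigevent as the last argument to request the single end-of-list notification instead.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Read the first 2*half bytes of path as two list entries.
   Returns the total bytes read, or -1 if any request failed. */
ssize_t read_two_regions(const char *path, char *buf, size_t half)
{
    struct aiocb cb[2];
    struct aiocb *list[2];
    ssize_t total = 0;
    int fd, i, failed = 0;

    assert(path != NULL && buf != NULL);
    fd = open(path, O_RDONLY);
    if (fd == -1)
        return -1;
    memset(cb, 0, sizeof cb);
    for (i = 0; i < 2; ++i) {
        cb[i].aio_fildes = fd;
        cb[i].aio_buf = buf + i * half;
        cb[i].aio_nbytes = half;
        cb[i].aio_offset = (off_t)(i * half);
        cb[i].aio_lio_opcode = LIO_READ;     /* per-entry operation code */
        cb[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        list[i] = &cb[i];
    }
    /* LIO_WAIT: block until every request in the list has completed */
    if (lio_listio(LIO_WAIT, list, 2, NULL) == -1)
        failed = 1;
    for (i = 0; i < 2; ++i) {
        if (aio_error(&cb[i]) != 0) {        /* check each entry individually */
            failed = 1;
            continue;
        }
        total += aio_return(&cb[i]);
    }
    close(fd);
    return failed ? -1 : total;
}
```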

When an asynchronous I/O thread is free, it takes a queued aiocb and performs the equivalent of lseek() (if a file position is specified), then the equivalent of read() or write(). The asynchronous process may be blocked for some time, depending on the file or device and on the options that were specified when it was opened. When the operation is complete, the asynchronous process notifies the initiating process using the method requested in the aiocb.

You can cancel a single queued operation, or all pending operations for a given file descriptor, using aio_cancel() (see the aio_cancel(3) reference page). An operation that is already being executed may not be cancelable.
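
A cancellation request can race with the operation itself, so the caller still has to check how the request ended. The sketch below, which uses only standard POSIX calls, queues a read and immediately tries to cancel it. The names cancel_and_reap() and start_and_cancel() are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Ask for cancellation, then wait until the aiocb is no longer in use.
   Returns 1 if the request was canceled, 0 if it ran to completion,
   -1 if aio_cancel() itself failed. */
static int cancel_and_reap(int fd, struct aiocb *cb)
{
    const struct aiocb *one[1];
    int rc, err;

    assert(cb != NULL);
    rc = aio_cancel(fd, cb);
    one[0] = cb;
    while ((err = aio_error(cb)) == EINPROGRESS)
        aio_suspend(one, 1, NULL);   /* not cancelable: let it finish */
    (void)aio_return(cb);            /* collect the result exactly once */
    if (rc == -1)
        return -1;
    return err == ECANCELED ? 1 : 0;
}

/* Queue a read of len bytes from path and try to cancel it at once. */
int start_and_cancel(const char *path, char *buf, size_t len)
{
    struct aiocb cb;
    int fd, outcome;

    fd = open(path, O_RDONLY);
    if (fd == -1)
        return -1;
    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = len;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;
    if (aio_read(&cb) == -1) {
        close(fd);
        return -1;
    }
    outcome = cancel_and_reap(fd, &cb);
    close(fd);
    return outcome;
}
```

Either outcome (0 or 1) is normal; which one you see depends on whether an I/O thread had already picked up the request.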

Assuring Data Integrity

With synchronous output, you call fsync() to ensure that all buffered data has been written. However, you cannot use fsync() this way with asynchronous I/O, because you cannot be sure when the queued write() calls will execute.

The aio_fsync() function queues the equivalent of an fsync() call for asynchronous execution (see the aio_fsync(3) reference page). This function takes an aiocb. The file descriptor in it specifies which file is to be synchronized. The fsync() operation is done following all other asynchronous operations that are pending when aio_fsync() is called. The synchronize operation can take considerable time, depending on how much output data has been buffered. Its completion is reported in the same ways as completion of a read or write (see the next topic).
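
The sketch below queues a write and then an aio_fsync() behind it, using the standard POSIX form of the call, which takes an operation code (O_SYNC here) and an aiocb. The names finish() and write_then_sync() are hypothetical helpers written for this illustration.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Wait for one request, then collect its result: 0 on success, -1 on failure. */
static int finish(struct aiocb *cb)
{
    const struct aiocb *one[1];
    int err;

    one[0] = cb;
    while ((err = aio_error(cb)) == EINPROGRESS)
        aio_suspend(one, 1, NULL);
    return (aio_return(cb) == -1 || err != 0) ? -1 : 0;
}

/* Queue a write, queue a sync behind it, and wait for both. */
int write_then_sync(const char *path, const void *data, size_t len)
{
    struct aiocb wr, sy;
    int fd, rc_wr, rc_sy = -1;

    assert(path != NULL && data != NULL);
    fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
    if (fd == -1)
        return -1;

    memset(&wr, 0, sizeof wr);
    wr.aio_fildes = fd;
    wr.aio_buf = (void *)data;
    wr.aio_nbytes = len;
    wr.aio_sigevent.sigev_notify = SIGEV_NONE;
    if (aio_write(&wr) == -1) {
        close(fd);
        return -1;
    }

    memset(&sy, 0, sizeof sy);
    sy.aio_fildes = fd;              /* names the file to be synchronized */
    sy.aio_sigevent.sigev_notify = SIGEV_NONE;
    if (aio_fsync(O_SYNC, &sy) == 0) /* runs after the operations already queued */
        rc_sy = finish(&sy);
    rc_wr = finish(&wr);
    close(fd);
    return (rc_wr == 0 && rc_sy == 0) ? 0 : -1;
}
```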

Checking the Progress of Asynchronous Requests

You can test the progress and completion of an asynchronous operation by polling; or your program can be informed of the completion of an operation in a variety of ways. In the aiocb, the program can specify one of three things to be done when the operation is complete:

  • Nothing; take no action.

  • Send a signal of a specified number.

  • Invoke a callback function directly from the asynchronous process.

In addition, the aio_suspend() function blocks its caller until one of a list of pending operations is complete.

Polling for Status

You can check the progress of any asynchronous operation (including aio_fsync()) by calling aio_error(), passing the aiocb for that operation.

While the operation is incomplete, aio_error() returns EINPROGRESS. When the operation is complete, you can check the final return code from read(), write(), or fsync() using aio_return() (see the aio_error(3) and aio_return(3) reference pages).

To see an example of polling for status, see function inWait0() under “Asynchronous I/O Example”. This function is used when the aiocb is initialized with SIGEV_NONE, meaning that no notification is to be returned at the completion of the operation. The function waits for an asynchronous operation to complete using a loop in the general form shown in Example 8-2.

Example 8-2. Polling for Asynchronous Completion

int waitForEndOfAsyncOp(struct aiocb *pab)
{
    int ret; /* error status reported by aio_error() */
    while (EINPROGRESS == (ret = aio_error(pab)))
        sginap(0); /* give up the CPU, then test again */
    return ret;
}

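The function result is the final error status of the read, write, or sync operation that was started: 0 if it succeeded, otherwise an errno value. Call aio_return() afterward to retrieve the return value of the operation, such as the byte count.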
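
For comparison, here is a sketch of the same polling loop written with standard POSIX calls only, followed by the aio_return() call that collects the result. The function name read_file_polling() is hypothetical, and sched_yield() stands in for sginap(0) as an assumption about a roughly equivalent way to give up the CPU between tests.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Read len bytes at offset pos of path, polling for completion.
   Returns the byte count, or -1 with errno set to the I/O error. */
ssize_t read_file_polling(const char *path, void *buf, size_t len, off_t pos)
{
    struct aiocb cb;
    ssize_t n;
    int fd, err;

    assert(path != NULL && buf != NULL);
    fd = open(path, O_RDONLY);
    if (fd == -1)
        return -1;
    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = len;
    cb.aio_offset = pos;
    cb.aio_sigevent.sigev_notify = SIGEV_NONE;   /* no notification: we poll */
    if (aio_read(&cb) == -1) {                   /* could not even be queued */
        close(fd);
        return -1;
    }
    while ((err = aio_error(&cb)) == EINPROGRESS)
        sched_yield();                           /* let other work run, then retest */
    n = aio_return(&cb);                         /* what read() itself returned */
    close(fd);
    if (err != 0) {
        errno = err;
        return -1;
    }
    return n;
}
```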

Checking for Completion

You have a wide variety of design options other than polling. Your program can:

  • Use aio_suspend() to wait until one of a list of operations completes.

  • Set up an empty signal handler function and use sigsuspend() or sigwait() to wait until a signal arrives (see the sigsuspend(2) and sigwait(3) reference pages).

  • Use either a signal handler function or a callback function to report completion—for example, the function can post a semaphore.

Most of these methods are demonstrated in the example program under “Asynchronous I/O Example”.
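
As a small, separate illustration of the first option, the sketch below queues two reads and uses aio_suspend() to handle whichever finishes first. It uses only standard POSIX calls; the names wait_for_any() and read_halves() are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

/* Block until some non-NULL entry is no longer in progress; return its index. */
static int wait_for_any(const struct aiocb *const cbs[], int n)
{
    int i;
    for (;;) {
        for (i = 0; i < n; ++i)
            if (cbs[i] != NULL && aio_error(cbs[i]) != EINPROGRESS)
                return i;
        if (aio_suspend(cbs, n, NULL) == -1 && errno != EINTR)
            return -1;
    }
}

/* Read 2*half bytes of path as two requests, collecting them as they finish. */
ssize_t read_halves(const char *path, char *buf, size_t half)
{
    struct aiocb cb[2];
    const struct aiocb *pending[2] = { NULL, NULL };
    ssize_t total = 0;
    int fd, i, left = 0, failed = 0;

    assert(path != NULL && buf != NULL);
    fd = open(path, O_RDONLY);
    if (fd == -1)
        return -1;
    memset(cb, 0, sizeof cb);
    for (i = 0; i < 2; ++i) {
        cb[i].aio_fildes = fd;
        cb[i].aio_buf = buf + i * half;
        cb[i].aio_nbytes = half;
        cb[i].aio_offset = (off_t)(i * half);
        cb[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        if (aio_read(&cb[i]) == 0) {
            pending[i] = &cb[i];
            ++left;
        } else
            failed = 1;
    }
    while (left > 0) {
        ssize_t n;
        i = wait_for_any(pending, 2);
        if (i < 0) {                 /* unexpected aio_suspend() failure */
            failed = 1;
            break;
        }
        n = aio_return(&cb[i]);
        if (n < 0)
            failed = 1;
        else
            total += n;
        pending[i] = NULL;           /* aio_suspend() ignores NULL entries */
        --left;
    }
    close(fd);
    return failed ? -1 : total;
}
```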

Establishing a Completion Signal

You request a signal from an asynchronous operation by setting these values in the aiocb (refer to /usr/include/aio.h and /usr/include/sys/signal.h):

aio_sigevent.sigev_notify

Set to SIGEV_SIGNAL.

aio_sigevent.sigev_signo

The number of the signal. This should be one of the POSIX real-time signal numbers (see “Signal Numbers”).

aio_sigevent.sigev_value

A value to be passed to the signal handler. This can be used to inform the signal handler of which I/O operation has completed; for example, it could be the address of the aiocb.

When you set up a signal handler for asynchronous completion, do so using sigaction() and specify the SA_SIGINFO flag (see the sigaction(2) reference page). This has two benefits: any new completion signal that arrives while the first is being handled is queued; and the aio_sigevent.sigev_value word is passed to the handler in a siginfo structure.
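
The settings above can be combined as in the following sketch, which uses only standard POSIX calls. The names on_complete() and read_with_signal() are hypothetical. The signal is kept blocked except inside sigsuspend(), so that the completion cannot slip in between the test of the flag and the wait.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

static volatile sig_atomic_t done_flag;   /* set by the handler */
static void *volatile done_payload;       /* sigev_value seen by the handler */

static void on_complete(int signo, siginfo_t *info, void *context)
{
    (void)signo;
    (void)context;
    done_payload = info->si_value.sival_ptr;   /* identifies the aiocb */
    done_flag = 1;
}

/* Read len bytes from the start of path; completion is reported by signo. */
ssize_t read_with_signal(const char *path, void *buf, size_t len, int signo)
{
    struct aiocb cb;
    struct sigaction sa;
    sigset_t block, old, waitmask;
    ssize_t n = -1;
    int fd;

    assert(path != NULL && buf != NULL);
    memset(&sa, 0, sizeof sa);
    sa.sa_sigaction = on_complete;
    sa.sa_flags = SA_SIGINFO;              /* queue signals, deliver sigev_value */
    sigemptyset(&sa.sa_mask);
    if (sigaction(signo, &sa, NULL) == -1)
        return -1;
    fd = open(path, O_RDONLY);
    if (fd == -1)
        return -1;

    sigemptyset(&block);
    sigaddset(&block, signo);
    sigprocmask(SIG_BLOCK, &block, &old);  /* deliver only inside sigsuspend() */
    waitmask = old;
    sigdelset(&waitmask, signo);
    done_flag = 0;

    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = len;
    cb.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
    cb.aio_sigevent.sigev_signo = signo;
    cb.aio_sigevent.sigev_value.sival_ptr = &cb;
    if (aio_read(&cb) == 0) {
        while (!done_flag)
            sigsuspend(&waitmask);
        n = aio_return(&cb);
        if (done_payload != (void *)&cb)
            n = -1;                        /* unexpected payload */
    }
    sigprocmask(SIG_SETMASK, &old, NULL);
    close(fd);
    return n;
}
```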

Establishing a Callback Function

You request a callback at the end of an asynchronous operation by setting the following values in the aiocb:

aio_sigevent.sigev_notify

Set to SIGEV_CALLBACK.

aio_sigevent.sigev_func

The address of the callback function. Its prototype must be

void functionName(union sigval);

aio_sigevent.sigev_value

A word to be passed to the callback function. This can be used to inform the function of which I/O operation has completed; for example, it could be the address of the aiocb.

The callback function is invoked from the asynchronous I/O thread when the read(), write() or fsync() operation finishes. This notification method has the lowest overhead and shortest latency, but it requires careful design to avoid race conditions in the use of shared variables.

The asynchronous I/O threads share the address space of the processes or threads that initialize asynchronous I/O. They may execute in a different CPU. Since the callback function could be entered at any time, it must coordinate its use of shared data structures. This is a good place to use a lock (see “Locks”). Locks have very low overhead in cases such as this, where there is likely to be little contention for the use of the lock.


Tip: You can call aio_read() or aio_write() from within a callback function or within a signal handler. This lets you start another operation with the least delay.
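
SIGEV_CALLBACK is specific to IRIX. On systems that offer only the standard POSIX notification types, a comparable effect can be sketched with SIGEV_THREAD, which runs the notification function in a separate thread rather than in the asynchronous I/O thread itself. The sketch below is that swapped-in technique, not the IRIX mechanism; the names on_done() and read_with_callback() are hypothetical. As suggested earlier in this chapter, the function reports completion by posting a semaphore (a POSIX sem_t here, assumed in place of an IRIX arena semaphore).

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <semaphore.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

static sem_t done;                   /* posted by the notification function */

/* Same prototype as an IRIX callback: one union sigval argument. */
static void on_done(union sigval v)
{
    (void)v;                         /* v.sival_ptr carries the aiocb address */
    sem_post(&done);
}

/* Read len bytes from the start of path, waiting on the semaphore. */
ssize_t read_with_callback(const char *path, void *buf, size_t len)
{
    struct aiocb cb;
    ssize_t n = -1;
    int fd;

    assert(path != NULL && buf != NULL);
    if (sem_init(&done, 0, 0) == -1)
        return -1;
    fd = open(path, O_RDONLY);
    if (fd == -1) {
        sem_destroy(&done);
        return -1;
    }
    memset(&cb, 0, sizeof cb);
    cb.aio_fildes = fd;
    cb.aio_buf = buf;
    cb.aio_nbytes = len;
    cb.aio_sigevent.sigev_notify = SIGEV_THREAD;        /* POSIX, not SIGEV_CALLBACK */
    cb.aio_sigevent.sigev_notify_function = on_done;
    cb.aio_sigevent.sigev_notify_attributes = NULL;
    cb.aio_sigevent.sigev_value.sival_ptr = &cb;
    if (aio_read(&cb) == 0) {
        while (sem_wait(&done) == -1 && errno == EINTR)
            ;                        /* retry if interrupted by a signal */
        n = aio_return(&cb);
    }
    close(fd);
    sem_destroy(&done);
    return n;
}
```

Because the notification function runs concurrently with the rest of the program, the same care with shared data applies as for an IRIX callback.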

The code in Example 8-3 demonstrates a hypothetical set of subroutines to schedule asynchronous reads and writes using a single aiocb. The principal functions and global variables it uses are:

pendingIO

An array of records, each holding one request for an I/O operation.

dontTouchThatStuff

A lock used to gain exclusive use of pendingIO.

scheduleRead()

A function that accepts a request to read some amount of data, from a specified file descriptor, at a specified file offset. It places the request in pendingIO and then, if no asynchronous operation is under way, initiates it.

yeahWeFinishedOne()

The callback function that is entered when an asynchronous operation completes. If any more operations are pending, it initiates one.

initiatePending()

A function that initiates one selected pending operation. It prepares the aiocb structure, including the specification of yeahWeFinishedOne() as the callback function. The lock dontTouchThatStuff must be held before this function is called.



Note: The code in Example 8-3 is not intended to be realistic and is not recommended as a model. In order to demonstrate the use of callback functions and the aiocb, it essentially duplicates work that could be done by the lio_listio() feature of asynchronous I/O.


Example 8-3. Set of Functions to Schedule Asynchronous I/O

#define _ABI_SOURCE
#include <signal.h>
#include <aio.h>
#include <ulocks.h>
#define MAX_PENDING 10
#define STATUS_EMPTY 0
#define STATUS_ACTIVE 1
#define STATUS_PENDING 2
static struct onePendingIO {
    int status;
    int theFile;
    void *theData;
    off_t theSize;
    off_t theSeek;
    int readNotWrite;
    } pendingIO[MAX_PENDING];
static unsigned numPending;     /* requests queued but not yet started */
static int ioActive;            /* nonzero while theAiocb is in use */
static struct aiocb theAiocb;
static ulock_t dontTouchThatStuff;
static unsigned scanner;
static void initiatePending(int P);
static void
yeahWeFinishedOne(union sigval S)
{
    ussetlock(dontTouchThatStuff);
    pendingIO[S.sival_int].status = STATUS_EMPTY;
    ioActive = 0;               /* the single aiocb is free again */
    if (numPending)
    {
        while (pendingIO[scanner].status != STATUS_PENDING)
        {
            if (++scanner >= MAX_PENDING)
                scanner = 0;
        }
        initiatePending(scanner);
    }
    usunsetlock(dontTouchThatStuff);
}
static void
initiatePending(int P) /* lock must be held on entry */
{
    theAiocb.aio_fildes = pendingIO[P].theFile;
    theAiocb.aio_buf = pendingIO[P].theData;
    theAiocb.aio_nbytes = pendingIO[P].theSize;
    theAiocb.aio_offset = pendingIO[P].theSeek;
    theAiocb.aio_sigevent.sigev_notify = SIGEV_CALLBACK;
    theAiocb.aio_sigevent.sigev_func = yeahWeFinishedOne;
    theAiocb.aio_sigevent.sigev_value.sival_int = P;
    if (pendingIO[P].readNotWrite)
        aio_read(&theAiocb);
    else
        aio_write(&theAiocb);
    pendingIO[P].status = STATUS_ACTIVE;
    ioActive = 1;               /* do not reuse theAiocb until the callback */
    --numPending;
}
/*public*/ int 
scheduleRead( int FD, void *pdata, off_t len, off_t pos )
{
    int j;
    ussetlock(dontTouchThatStuff);
    for(j=0; j < MAX_PENDING && pendingIO[j].status != STATUS_EMPTY; ++j)
        ;
    if (j >= MAX_PENDING)
    { /* no free slot: reject the request */
        usunsetlock(dontTouchThatStuff);
        return -1;
    }
    pendingIO[j].theFile = FD;
    pendingIO[j].theData = pdata;
    pendingIO[j].theSize = len;
    pendingIO[j].theSeek = pos;
    pendingIO[j].readNotWrite = 1;
    pendingIO[j].status = STATUS_PENDING;
    ++numPending;
    if (!ioActive) /* nothing under way, so start this one now */
        initiatePending(j);
    usunsetlock(dontTouchThatStuff);
    return 0;
}


Holding Callbacks Temporarily

You can temporarily prevent callback functions from being entered using the aio_hold() function. This function is not defined in the POSIX standard; it is added by the MIPS ABI standard. Use it as follows:

  • Call aio_hold(AIO_HOLD_CALLBACK) to prevent any callback function from being invoked.

  • Call aio_hold(AIO_RELEASE_CALLBACK) to allow callback functions to be invoked. Any that were held are now called.

  • Call aio_hold(AIO_ISHELD_CALLBACK) to test the current state. It returns 1 if callbacks are currently being held; otherwise it returns 0.

Multiple Operations to One File

When you queue multiple operations to a single file descriptor, the asynchronous I/O package does not always guarantee the order of their execution. There are three ways you can ensure the sequence of operations.

You can open any output file descriptor passing the flag O_APPEND (see the open(2) reference page). Asynchronous write requests to a file opened with O_APPEND are executed in the sequence of the calls to aio_write() or the sequence they are listed for lio_listio(). You can use this feature to ensure that a sequence of records is appended to a file in sequence.

For files that support lseek(), you can specify any order of operations by specifying the file offset in the aiocb. The asynchronous process executes an absolute seek to that offset as part of the operation. Even if the operations are not performed in the sequence they were requested, the data is transferred in sequence. You can use this feature to ensure that multiple requests for sequential disk input are stored in sequential locations.

For non-disk input operations, the only way you can be certain that operations are done in sequence is to schedule them one at a time, waiting for each one to complete.
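
The O_APPEND approach can be sketched as follows, using only standard POSIX calls. The function name append_records() and the MAX_RECS limit are assumptions for this illustration. All writes are queued before any of them is waited for, and the file descriptor's O_APPEND flag is relied on to keep the records in the order of the aio_write() calls.

```c
#define _POSIX_C_SOURCE 200809L
#include <aio.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <string.h>
#include <unistd.h>

#define MAX_RECS 8   /* arbitrary limit for this sketch */

/* Append n strings to path with queued writes. Returns 0 on success. */
int append_records(const char *path, const char *recs[], int n)
{
    struct aiocb cb[MAX_RECS];
    const struct aiocb *one[1];
    int fd, i, queued = 0, ok = 1;

    assert(path != NULL && recs != NULL && n > 0 && n <= MAX_RECS);
    fd = open(path, O_WRONLY | O_CREAT | O_APPEND, 0644);
    if (fd == -1)
        return -1;
    memset(cb, 0, sizeof cb);
    for (i = 0; i < n; ++i) {            /* queue everything without waiting */
        cb[i].aio_fildes = fd;
        cb[i].aio_buf = (void *)recs[i];
        cb[i].aio_nbytes = strlen(recs[i]);
        cb[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        if (aio_write(&cb[i]) == -1) {
            ok = 0;
            break;
        }
        ++queued;
    }
    for (i = 0; i < queued; ++i) {       /* now collect each result */
        one[0] = &cb[i];
        while (aio_error(&cb[i]) == EINPROGRESS)
            aio_suspend(one, 1, NULL);
        if (aio_return(&cb[i]) != (ssize_t)strlen(recs[i]))
            ok = 0;
    }
    if (close(fd) == -1)
        ok = 0;
    return ok ? 0 : -1;
}
```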

Asynchronous I/O Example

The following source displays a highly artificial program whose purpose is to exercise most options of asynchronous I/O. The program syntax is:

aiocat [ -o outfile ] [-a {0|1|2|3} ] infilename... 

The output of the program is the concatenation of the one or more files named by infilename..., written to the file outfile. The default outfile is $TMPDIR/aiocat.out. In effect, the program is an overcomplicated version of the standard cat command.

When you compile it with the variable DO_SPROCS defined as 1, the program creates one process for each infilename. Each of these processes uses asynchronous I/O requests to read its corresponding input file, and to write that data to the correct offset in outfile.

After all the files have been read and written, the program reports the CPU time charged for each file, and the effective data transfer rate in bytes per microsecond.

The -a parameter specifies which of four methods is used to wait for I/O completion:

-a 0

Poll for completion with aio_error().

-a 1

Wait for completion with aio_suspend().

-a 2

Wait on a semaphore posted from a signal handler.

-a 3

Wait on a semaphore posted from a callback routine.

Execution of aiocat can resemble the following (from an Origin2000 with 8 CPUs):

> ls -l incat?
-rwxr-xr-x    1 cortesi  nuucp     234964 Jun  4 10:17 incat1
-rwxr-xr-x    1 cortesi  nuucp     234964 Jun  4 10:17 incat2
-rwxr-xr-x    1 cortesi  nuucp     234964 Jun  4 10:18 incat3
-rwxr-xr-x    1 cortesi  nuucp     234964 Jun  4 10:19 incat4
> aiocat -o outcat -a 0 incat?
    procid   time     fsize     filename
 0: 920      440000   234964    incat1
 1: 939      480000   234964    incat2
 2: 940      510000   234964    incat3
 3: 936      530000   234964    incat4
total time 1960000 usec, total bytes 939856, 0.479518 bytes/usec
> aiocat -o outcat -a 1 incat?
    procid   time     fsize     filename
 0: 942      350000   234964    incat1
 1: 944      370000   234964    incat2
 2: 949      370000   234964    incat3
 3: 946      370000   234964    incat4
total time 1460000 usec, total bytes 939856, 0.643737 bytes/usec
> aiocat -o outcat -a 2 incat?
    procid   time     fsize     filename
 0: 962      90000    234964    incat1
 1: 955      80000    234964    incat2
 2: 967      90000    234964    incat3
 3: 960      90000    234964    incat4
total time 350000 usec, total bytes 939856, 2.6853 bytes/usec
> aiocat -o outcat -a 3 incat?
    procid   time     fsize     filename
 0: 909      50000    234964    incat1
 1: 969      50000    234964    incat2
 2: 966      60000    234964    incat3
 3: 972      60000    234964    incat4
total time 220000 usec, total bytes 939856, 4.27207 bytes/usec
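
The rate printed on each "total" line is consistent with total bytes divided by total time in microseconds. The following sketch reproduces that arithmetic; the function name bytes_per_usec() is hypothetical and is not taken from the aiocat source.

```c
#include <assert.h>

/* Effective transfer rate as reported in the listing: bytes per microsecond. */
double bytes_per_usec(long total_bytes, long total_usec)
{
    assert(total_usec > 0);
    return (double)total_bytes / (double)total_usec;
}
```

For the first run, four files of 234964 bytes give 939856 bytes, and 939856 / 1960000 is approximately 0.479518, which matches the figure shown for -a 0.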

 

Example 8-4. Source Code of aiocat

/* ============================================================================
||  aiocat.c : This highly artificial example demonstrates asynchronous I/O. 
||
|| The command syntax is:
||  aiocat [ -o outfile ] [-a {0|1|2|3} ] infilename...
||
|| The output file is given by -o, with $TMPDIR/aiocat.out by default.
|| The aio method of waiting for completion is given by -a as follows:
||  -a 0 poll for completion with aio_error() (default)
||  -a 1 wait for completion with aio_suspend()
||  -a 2 wait on a semaphore posted from a signal handler
||  -a 3 wait on a semaphore posted from a callback routine
||
|| Up to MAX_INFILES input files may be specified. Each input file is
|| read in BLOCKSIZE units. The output file contains the data from
|| the input files in the order they were specified. Thus the
|| output should be the same as "cat infilename... >outfile".
||
|| When DO_SPROCS is compiled true, all I/O is done asynchronously
|| and concurrently using one sproc'd process per file.  Thus in a
|| multiprocessor concurrent input can be done.
============================================================================ */
#define _SGI_MP_SOURCE  /* see the "Caveats" section of sproc(2) */
#include <sys/time.h>   /* for clock() */
#include <errno.h>      /* for perror() */
#include <stdio.h>      /* for printf() */
#include <stdlib.h>     /* for getenv(), malloc(3c) */
#include <ulocks.h>     /* usinit() & friends */
#include <bstring.h>    /* for bzero() */
#include <sys/resource.h> /* for prctl, get/setrlimit() */
#include <sys/prctl.h>  /* for prctl() */
#include <sys/types.h>  /* required by lseek(), prctl(), sproc() */
#include <unistd.h>     /* ditto */
#include <string.h>     /* for strcpy(), strcat(), strcmp() */
#include <fcntl.h>      /* for open() and O_RDONLY */
#include <signal.h>     /* for signals - gets sys/signal and sys/siginfo */
#include <aio.h>        /* async I/O */
#define BLOCKSIZE 2048  /* input units -- play with this number */
#define MAX_INFILES 10  /* max sprocs: anything from 4 to 20 or so */
#define DO_SPROCS 1     /* set 0 to do all I/O in a single process */
#define QUITIFNULL(PTR,MSG) if (NULL==PTR) {perror(MSG);return(errno);}
#define QUITIFMONE(INT,MSG) if (-1==INT) {perror(MSG);return(errno);}
/*****************************************************************************
|| The following structure contains the info needed by one child proc.
|| The main program builds an array of MAX_INFILES of these.
|| The reason for storing the actual filename here (not a pointer) is
|| to force the struct to >128 bytes.  Then, when the procs run in 
|| different CPUs on a CHALLENGE, the info structs will be in different
|| cache lines, and a store by one proc will not invalidate a cache line
|| for its neighbor proc.
*/
typedef struct child
{
        /* read-only to child */
    char fname[100];        /* input filename from argv[n] */
    int         fd;         /* FD for this file */
    void*       buffer;     /* buffer for this file */
    int         procid;     /* process ID of child process */
    off_t       fsize;      /* size of this input file */
        /* read-write to child */
    usema_t*    sema;       /* semaphore used by methods 2 & 3 */
    off_t       outbase;    /* starting offset in output file */
    off_t       inbase;     /* current offset in input file */
    clock_t     etime;      /* sum of utime/stime to read file */
    aiocb_t     acb;        /* aiocb used for reading and writing */
} child_t;
/******************************************************************************
|| Globals, accessible to all processes
*/
char*       ofName = NULL;  /* output file name string */
int         outFD;          /* output file descriptor */
usptr_t*    arena;          /* arena where everything is built */
barrier_t*  convene;        /* barrier used to sync up */
int         nprocs = 1;     /* 1 + number of child procs */
child_t*    array;          /* array of child_t structs in arena */
int         errors = 0;     /* always incremented on an error */
/******************************************************************************
|| forward declaration of the child process functions
*/
void inProc0(void *arg, size_t stk);    /* polls with aio_error() */
void inProc1(void *arg, size_t stk);    /* uses aio_suspend() */
void inProc2(void *arg, size_t stk);    /* uses a signal and semaphore */
void inProc3(void *arg, size_t stk);    /* uses a callback and semaphore */
/******************************************************************************
|| The main()
*/
int main(int argc, char **argv)
{
    char*       tmpdir;         /* ->name string of temp dir */
    int         nfiles;         /* how many input files on cmd line */
    int         argno;          /* loop counter */
    child_t*    pc;             /* ->child_t of current file */
    void (*method)(void *,size_t) = inProc0; /* ->chosen input method */
    char        arenaPath[128]; /* build area for arena pathname */
    char        outPath[128];   /* build area for output pathname */    
    /*
    || Ensure the name of a temporary directory.
    */
    tmpdir = getenv("TMPDIR");
    if (!tmpdir) tmpdir = "/var/tmp";
    /*
    || Build a name for the arena file.
    */
    strcpy(arenaPath,tmpdir);
    strcat(arenaPath,"/aiocat.wrk");
    /*
    || Create the arena. First, call usconfig() to establish the
    || minimum size (twice the buffer size per file, to allow for misc usage)
    || and the (maximum) number of processes that may later use
    || this arena.  For this program that is MAX_INFILES+10, allowing
    || for our sprocs plus those done by aio_sgi_init().
    || These values apply to any arenas made subsequently, until changed.
    */
    {
        ptrdiff_t ret;
        ret = usconfig(CONF_INITSIZE,2*BLOCKSIZE*MAX_INFILES);
        QUITIFMONE(ret,"usconfig size")
        ret = usconfig(CONF_INITUSERS,MAX_INFILES+10);
        QUITIFMONE(ret,"usconfig users")
        arena = usinit(arenaPath);
        QUITIFNULL(arena,"usinit")
    }
    /*
    || Allocate the barrier.
    */
    convene = new_barrier(arena);
    QUITIFNULL(convene,"new_barrier")
    /*
    || Allocate the array of child info structs and zero it.
    */
    array = (child_t*)usmalloc(MAX_INFILES*sizeof(child_t),arena);
    QUITIFNULL(array,"usmalloc")
    bzero((void *)array,MAX_INFILES*sizeof(child_t));
    /*
    || Loop over the arguments, setting up child structs and
    || counting input files.  Quit if a file won't open or seek,
    || or if we can't get a buffer or semaphore.
    */
    for (nfiles=0, argno=1; argno < argc; ++argno )
    {
        if (0 == strcmp(argv[argno],"-o"))
        { /* is the -o argument */
            ++argno;
            if (argno < argc)
                ofName = argv[argno];
            else
            {
                fprintf(stderr,"-o must have a filename after\n");
                return -1;
            }
        }
        else if (0 == strcmp(argv[argno],"-a"))
        { /* is the -a argument */
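            char c;
            if (++argno >= argc)
            { /* -a was the last argument */
                fprintf(stderr,"-a must have a method number after\n");
                return -1;
            }
            c = argv[argno][0];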
            switch(c)
            {
            case '0' : method = inProc0; break;
            case '1' : method = inProc1; break;
            case '2' : method = inProc2; break;
            case '3' : method = inProc3; break;
            default:
                {
                    fprintf(stderr,"unknown method -a %c\n",c);
                    return -1;
                }
            }
        }
        else if ('-' == argv[argno][0])
        { /* is unknown -option */
            fprintf(stderr,"aiocat [-o outfile] [-a 0|1|2|3] infiles...\n");
            return -1;
        }
        else    
        { /* neither -o nor -a, assume input file */
            if (nfiles < MAX_INFILES)
            {
                /*
                || save the filename
                */
                pc = &array[nfiles];
                strcpy(pc->fname,argv[argno]);
                /*
                || allocate a buffer and a semaphore.  Not all
                || child procs use the semaphore but so what?
                */
                pc->buffer = usmalloc(BLOCKSIZE,arena);
                QUITIFNULL(pc->buffer,"usmalloc(buffer)")
                pc->sema = usnewsema(arena,0);
                QUITIFNULL(pc->sema,"usnewsema")
                /*
                || open the file
                */
                pc->fd = open(pc->fname,O_RDONLY);
                QUITIFMONE(pc->fd,"open")
                /*
                || get the size of the file. This leaves the file
                || positioned at-end, but there is no need to reposition 
                || because all aio_read calls have an implied lseek.
                || NOTE: there is no check for zero-length file; that
                || is a valid (and interesting) test case.
                */
                pc->fsize = lseek(pc->fd,0,SEEK_END);
                QUITIFMONE(pc->fsize,"lseek")
                /*
                || set the starting base address of this input file
                || in the output file.  The first file starts at 0.
                || Each one after starts at prior base + prior size.
                */
                if (nfiles) /* not first */
                    pc->outbase =
                        array[nfiles-1].fsize + array[nfiles-1].outbase;
                ++nfiles;
            }
            else
            {
                fprintf(stderr,"Too many files, %s ignored\n",argv[argno]);
            }
        }
    } /* end for(argc) */
    /*
    || If there was no -o argument, construct an output file name.
    */
    if (!ofName)
    {
        strcpy(outPath,tmpdir);
        strcat(outPath,"/aiocat.out");
        ofName = outPath;
    }
    /*
    || Open, creating or truncating, the output file.
    || Do not use O_APPEND, which would constrain aio to doing
    || operations in sequence.
    */
    outFD = open(ofName, O_WRONLY|O_CREAT|O_TRUNC, 0666);
    QUITIFMONE(outFD,"open(output)")
    /*
    || If there were no input files, just quit, leaving empty output
    */
    if (!nfiles)
    {
        close(outFD);
        unlink(arenaPath); /* same cleanup as the normal exit path */
        return 0;
    }
    /*
    || Note the number of processes-to-be, for use in initializing
    || aio and for use by each child in a barrier() call.
    */
    nprocs = 1+nfiles;
    /*
    || Initialize async I/O using aio_sgi_init(), in order to specify
    || a number of locks at least equal to the number of child procs
    || and in order to specify extra sproc users.
    */
    {
        aioinit_t ainit = {0}; /* all fields initially zero */
        /*
        || Go with the default 5 for the number of aio-created procs,
        || as we have no way of knowing the number of unique devices.
        */
#define AIO_PROCS 5
        ainit.aio_threads = AIO_PROCS;
        /*
        || Set the number of locks aio needs to the number of procs
        || we will start, minimum 3.
        */
        ainit.aio_locks = (nprocs > 2)?nprocs:3;
        /*
        || Warn aio of the number of user procs that will be
        || using its arena.
        */
        ainit.aio_numusers = nprocs;
        aio_sgi_init(&ainit);
    }
    /*
    || Process each input file, either in a child process or in
    || a subroutine call, as selected at compile time by DO_SPROCS.
    */
    for (argno = 0; argno < nfiles; ++argno)
    {
        pc = &array[argno];
#if DO_SPROCS
#define CHILD_STACK (64*1024)
        /*
        || For each input file, start a child process as an instance
        || of the selected method (-a argument).
        || If an error occurs, quit. That will send a SIGHUP to any
        || already-started child, which will kill it, too.
        */
        pc->procid = sprocsp(method     /* function to start */
                            ,PR_SALL    /* share all, keep FDs sync'd */
                            ,(void *)pc /* argument to child func */
                            ,NULL       /* absolute stack seg */
                            ,CHILD_STACK);  /* max stack seg growth */
        QUITIFMONE(pc->procid,"sprocsp")
#else
        /*
        || For each input file, call the selected (-a) method as a
        || subroutine to copy its file.
        */
        fprintf(stderr,"file %s...",pc->fname);
        method((void*)pc,0);
        if (errors) break;
        fprintf(stderr,"done\n");
#endif
    }
#if DO_SPROCS
    /*
    || Wait for all the kiddies to get themselves initialized.
    || When all have started and reached barrier(), all continue.
    || If any errors occurred in initialization, quit.
    */
    barrier(convene,nprocs);
    /*
    || Child processes are executing now. Reunite the family round the
    || old hearth one last time, when their processing is complete.
    || Each child ensures that all its output is complete before it
    || invokes barrier().
    */
    barrier(convene,nprocs);
#endif
    /*
    || Close the output file and print some statistics.
    */
    close(outFD);
    {
        clock_t timesum;
        long    bytesum;
        double  bperus;
        printf("    procid   time     fsize     filename\n");
        for(argno = 0, timesum = bytesum = 0 ; argno < nfiles ; ++argno)
        {
            pc = &array[argno];
            timesum += pc->etime;
            bytesum += pc->fsize;
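            /* cast to long so the arguments match the %ld conversions */
            printf("%2d: %-8ld %-8ld %-8ld  %s\n"
                    ,argno,(long)pc->procid,(long)pc->etime
                    ,(long)pc->fsize,pc->fname);
        }
        /* avoid 0/0 when only empty files were copied */
        bperus = timesum ? ((double)bytesum)/((double)timesum) : 0.0;
        printf("total time %ld usec, total bytes %ld, %g bytes/usec\n"
                     ,(long)timesum      , bytesum , bperus);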
    }
    /*
    || Unlink the arena file, so it won't exist when this program runs
    || again. If it did exist, it would be used as the initial state of
    || the arena, which might or might not have any effect.
    */
    unlink(arenaPath);
    return 0;
}
/******************************************************************************
|| inProc0() polls for completion with aio_error(), calling sginap()
|| between polls. Under the Frame Scheduler, it would use frs_yield()
|| instead of sginap().
|| The general pattern of this function is repeated in the other three;
|| only the wait method varies from function to function.
*/
int inWait0(child_t *pch)
{
    int ret;
    aiocb_t* pab = &pch->acb;
    while (EINPROGRESS == (ret = aio_error(pab)))
    {
        sginap(0);
    }
    return ret;
}
void inProc0(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- no signals or callbacks needed.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_NONE;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (ret = aio_read(pab))
            break;  /* unable to schedule a read */
        ret = inWait0(pch);
        if (ret)
            break;  /* nonzero read completion status */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (ret = aio_write(pab))
            break;
        ret = inWait0(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait0(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
    return;
} /* end inProc0 */
/******************************************************************************
|| inProc1 uses aio_suspend() to await the completion of each operation.
|| Otherwise it is the same as inProc0, above.
*/
 
int inWait1(child_t *pch)
{
    int ret;
    aiocb_t* susplist[1]; /* list of 1 aiocb for aio_suspend() */
    susplist[0] = &pch->acb;
    /*
    || Note: aio.h declares the 1st argument of aio_suspend() as "const."
    || The C compiler requires the actual-parameter to match in type,
    || so the list we pass must either be declared "const aiocb_t*" or
    || must be cast to that -- else cc gives a warning.  The cast
    || in the following statement is only to avoid this warning.
    */
    ret = aio_suspend( (const aiocb_t **) susplist,1,NULL);
    return ret;
}
void inProc1(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- no signals or callbacks needed.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_NONE;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (ret = aio_read(pab))
            break;
        ret = inWait1(pch);
        /*
        || If the aio_suspend() return is nonzero, it means that the wait
        || did not end for i/o completion but because of a signal. Since we
        || expect no signals here, we take that as an error.
        */
        if (!ret) /* op is complete */
            ret = aio_error(pab);  /* read() status, should be 0 */
        if (ret)
            break;  /* signal, or nonzero read completion */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (ret = aio_write(pab))
            break;
        ret = inWait1(pch);
        if (!ret) /* op is complete */
            ret = aio_error(pab);  /* should be 0 */
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        if (!(ret = aio_fsync(O_SYNC,pab)))
        {
            ret = inWait1(pch);
            if (!ret) /* op is complete */
                ret = aio_error(pab);  /* fsync status, should be 0 */
        }
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc1 */
/******************************************************************************
|| inProc2 requests a signal upon completion of an I/O. After starting
|| an operation, it P's a semaphore which is V'd from the signal handler.
*/
#define AIO_SIGNUM (SIGRTMIN+1) /* arbitrary choice of signal number */
void sigHandler2(const int signo, const struct siginfo *sif )
{
    /*
    || In this minimal signal handler we pick up the address of the
    || child_t info structure -- which was put in aio_sigevent.sigev_value
    || field during initialization -- and use it to find the semaphore.
    */
    child_t *pch = sif->si_value.sival_ptr ;
    usvsema(pch->sema);
    return; /* stop here with dbx to print the above address */
}
int inWait2(child_t *pch)
{
    /*
    || Wait for any signal handler to post the semaphore.  The signal
    || handler could have been entered before this function is called,
    || or it could be entered afterward.
    */
    uspsema(pch->sema);
    /*
    || Since this process executes only one aio operation at a time,
    || we can return the status of that operation.  In a more complicated
    || design, if a signal could arrive from more than one pending
    || operation, this function could not return status.
    */
    return aio_error(&pch->acb);
}
void inProc2(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- request a signal in aio_sigevent. The address of
    || the child_t struct is passed as the siginfo value, for use
    || in the signal handler.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
    pab->aio_sigevent.sigev_signo = AIO_SIGNUM;
    pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch;
    pab->aio_buf = pch->buffer; /* always the same */
    /*
    || Initialize -- set up a signal handler for AIO_SIGNUM.
    */
    {
        struct sigaction sa = {SA_SIGINFO,sigHandler2};
        ret = sigaction(AIO_SIGNUM,&sa,NULL);
        if (ret) ++errors; /* parent will shut down ASAP */
    }   
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#else
    if (ret) return;
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (!(ret = aio_read(pab)))
            ret = inWait2(pch);
        if (ret)
            break;  /* could not start read, or it ended badly */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (!(ret = aio_write(pab)))
            ret = inWait2(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait2(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc2 */
 
/******************************************************************************
|| inProc3 uses a callback and a semaphore. It waits with a P operation.
|| The callback function executes a V operation.  This may come before or
|| after the P operation.
*/
void callBack3(union sigval usv)
{
    /*
    || The callback function receives the pointer to the child_t struct,
    || as prepared in aio_sigevent.sigev_value.sival_ptr.  Use this to 
    || post the semaphore in the child_t struct.
    */
    child_t *pch = usv.sival_ptr;
    usvsema(pch->sema);
    return;
}
int inWait3(child_t *pch)
{
    /*
    || Suspend, if necessary, by waiting on the semaphore.  The callback
    || function might be entered before we reach this point, or after.
    */
    uspsema(pch->sema);
    /*
    || Return the status of the aio operation associated with the sema.
    */
    return aio_error(&pch->acb);    
}
void inProc3(void *arg, size_t stk)
{
    child_t *pch = arg;         /* starting arg is ->child_t for my file */
    aiocb_t *pab = &pch->acb;   /* base address of the aiocb_t in child_t */
    int ret;                    /* as long as this is 0, all is ok */
    int bytes;                  /* #bytes read on each input */
    /*
    || Initialize -- request a callback in aio_sigevent. The address of
    || the child_t struct is stored as the sigval value that is passed
    || to the callback function.
    */
    pab->aio_sigevent.sigev_notify = SIGEV_CALLBACK;
    pab->aio_sigevent.sigev_func = callBack3;
    pab->aio_sigevent.sigev_value.sival_ptr = (void *)pch;
    pab->aio_buf = pch->buffer; /* always the same */
#if DO_SPROCS
    /*
    || Wait for the starting gun...
    */
    barrier(convene,nprocs);
#endif
    pch->etime = clock();
    do /* read and write, read and write... */
    {
        /*
        || Set up the aiocb for a read, queue it, and wait for it.
        */
        pab->aio_fildes = pch->fd;
        pab->aio_offset = pch->inbase;
        pab->aio_nbytes = BLOCKSIZE;
        if (!(ret = aio_read(pab)))
            ret = inWait3(pch);
        if (ret)
            break;  /* read error */
        /*
        || get the result of the read() call, the count of bytes read.
        || Since aio_error returned 0, the count is nonnegative.
        || It could be 0, or less than BLOCKSIZE, indicating EOF.
        */
        bytes = aio_return(pab); /* actual read result */
        if (!bytes)
            break;  /* no need to write a last block of 0 */
        pch->inbase += bytes;   /* where to read next time */
        /*
        || Set up the aiocb for a write, queue it, and wait for it.
        */
        pab->aio_fildes = outFD;
        pab->aio_nbytes = bytes;
        pab->aio_offset = pch->outbase;
        if (!(ret = aio_write(pab)))
            ret = inWait3(pch);
        if (ret)
            break;
        pch->outbase += bytes;  /* where to write next time */
    } while ((!ret) && (bytes == BLOCKSIZE));
    /*
    || The loop is complete.  If no errors so far, use aio_fsync()
    || to ensure that output is complete.  This requires waiting
    || yet again.
    */
    if (!ret)
    {
        if (!(ret = aio_fsync(O_SYNC,pab)))
            ret = inWait3(pch);
    }
    /*
    || Flag any errors for the parent proc. If none, count elapsed time.
    */
    if (ret) ++errors;
    else pch->etime = (clock() - pch->etime);
#if DO_SPROCS
    /*
    || Rendezvous with the rest of the family, then quit.
    */
    barrier(convene,nprocs);
#endif
} /* end inProc3 */
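
The output layout that main() computes for the child_t array can be shown in isolation. The following is a minimal sketch and is not part of aiocat: the function name compute_outbases() and its parameters are hypothetical, introduced only for illustration. It reproduces the rule stated in the listing's comments: the first file starts at offset 0, and each later file starts at the prior base plus the prior size. Because every child is given a disjoint range of the output file, each one can pass an explicit aio_offset to aio_write(), and the output file need not be opened with O_APPEND.

```c
#include <stddef.h>
#include <sys/types.h>

/*
 * Hypothetical helper (not part of aiocat): given the sizes of n input
 * files, store in outbase[i] the byte offset at which file i begins in
 * the concatenated output.  Returns the total size of the output.
 */
off_t compute_outbases(const off_t *fsize, off_t *outbase, size_t n)
{
    off_t next = 0;             /* the first file starts at offset 0 */
    size_t i;

    for (i = 0; i < n; ++i) {
        outbase[i] = next;      /* prior base + prior size */
        next += fsize[i];       /* a zero-length file adds nothing */
    }
    return next;
}
```

For input sizes of 100, 0, and 50 bytes, the function stores the bases 0, 100, and 100 and returns a total of 150. A zero-length file shares its base with the file that follows it.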