multibuffered streaming tape copy
Don Speck
speck at cit-vlsi
Tue Aug 6 00:06:45 AEST 1985
: Shell archive of 4.2bsd streaming tape copy program.
: Extract with sh, not csh.
echo "x stream.c"
sed 's/^X//' >stream.c <<FGD135
X/* 4.2bsd multi-buffered screaming tape copy
X with streamer as input, output, or both.
X Does it all in user code via NBUFS (default 2) concurrent slave processes,
X synchronized with flock().
X For streamer-to-streamer copies, NBUFS should be large (~10)
X*/
X
X#define NBUFS 2 /* Number of multi-buffers (hence processes) */
X
X#include <errno.h>
X#include <signal.h>
X#include <sys/file.h>
X#include <sys/wait.h>
X
X/* Lock-file descriptor pairs.  lockpipe() fills a pair in as
X * { open() descriptor, creat() descriptor } on the same file.
X * "first" is the pair main() creates before forking; "prev" and "next"
X * are the pairs each slave uses in copier().  The rd pairs take turns
X * for reads, the wr pairs for writes.
X */
Xint firstrd[2], prevrd[2], nextrd[2]; /* File descriptors */
Xint firstwr[2], prevwr[2], nextwr[2];
X/* fork() result per slave; main() zeroes a slot once that slave is reaped */
Xint slavepid[NBUFS];
X/* Bytes per read()/write(); main() may replace it with atou(argv[1]) */
Xint bufsiz = 10240;
X/* buf is the bufsiz-byte copy buffer that main() obtains from sbrk() */
Xchar *buf, *sbrk();
X
Xunsigned atou(s) register char *s; { /* Parse digit string to unsigned int */
X register unsigned u = 0;
X while (*s >= '0' && *s <= '9')
X u = u*10 + (*s++ - '0');
X if (*s == 'b') u *= 512, s++;
X if (*s == 'k') u *= 1024, s++;
X return(*s == '\0' ? u : 0);
X}
X
Xabort() { /* Signal catchers */
X killall();
X _exit(EINTR);
X}
X
Xdone() {
X _exit(0);
X}
X
X/* Master process: parse the optional buffer size, allocate the buffer,
X * create the lock files, fork NBUFS slaves that each run copier(), then
X * wait for them.
X *
X * Exit status: EINVAL on bad usage, ENOMEM if sbrk() fails, EAGAIN if a
X * lock file cannot be made or fork() fails, EIO if a reaped slave has a
X * nonzero wait status, otherwise 0.
X */
Xmain(argc,argv) int argc; char *argv[]; {
X register int i, pid;
X static int wstat, children = NBUFS;
X
X /* At most one argument; atou() returning 0 means it was unusable */
X if (argc > 2 || argc == 2 && (bufsiz=atou(argv[1])) == 0) {
X static char usage[] = "Usage: stream [bufsiz][b|k]\n";
X write(2, usage, sizeof(usage)-1);
X _exit(EINVAL);
X }
X buf = sbrk(bufsiz);
X if (buf == (char *) -1) {
X perror("sbrk");
X _exit(ENOMEM);
X }
X /* Install abort() for each signal, but put SIG_IGN back if the signal
X  * was already being ignored when we started */
X if (signal(SIGINT, abort) == SIG_IGN)
X signal(SIGINT, SIG_IGN);
X if (signal(SIGTERM, abort) == SIG_IGN)
X signal(SIGTERM, SIG_IGN);
X
X /* The "first" lock files: prev pair of slave 0, next pair of the last slave */
X lockpipe(firstrd);
X lockpipe(firstwr);
X for (i=0; i<NBUFS; ++i) {
X if (i == 0) {
X /* Slave 0 gets the first pair with its two descriptors swapped */
X prevrd[0] = firstrd[1]; prevrd[1] = firstrd[0];
X prevwr[0] = firstwr[1]; prevwr[1] = firstwr[0];
X } else {
X /* Later slaves inherit the preceding slave's next pair as their prev pair */
X prevrd[0] = nextrd[0]; prevrd[1] = nextrd[1];
X prevwr[0] = nextwr[0]; prevwr[1] = nextwr[1];
X }
X /* Take the locks before forking, so the slave's first flock() in
X  * copier() (on the other descriptor of the pair) has to wait */
X flock(prevrd[1], LOCK_EX);
X flock(prevwr[1], LOCK_EX);
X /* Default next pair is the first pair; every slave but the last
X  * has it replaced with fresh lock files just below */
X nextrd[0] = firstrd[0]; nextrd[1] = firstrd[1];
X nextwr[0] = firstwr[0]; nextwr[1] = firstwr[1];
X if ((i < NBUFS-1 && (lockpipe(nextrd)<0 || lockpipe(nextwr)<0))
X || (slavepid[i]=fork()) < 0) {
X perror("stream: too many slaves (recompile smaller)");
X killall();
X _exit(EAGAIN);
X }
X if (slavepid[i] == 0) { /* Slave starts up here */
X signal(SIGINT,SIG_IGN);
X signal(SIGTERM,done); /* exit cleanly */
X copier();
X _exit(0);
X }
X /* Master no longer needs this prev pair; for i == 0 it is the
X  * first pair, which is closed after the loop instead */
X if (i > 0) {
X close(prevrd[0]); close(prevrd[1]);
X close(prevwr[0]); close(prevwr[1]);
X }
X }
X /* Drop the locks taken for slave 0 (its prev[1] is first[0]) so that
X  * its initial flock() calls can succeed and copying can begin */
X flock(firstrd[0], LOCK_UN);
X flock(firstwr[0], LOCK_UN);
X close(firstrd[0]); close(firstrd[1]);
X close(firstwr[0]); close(firstwr[1]);
X
X /* Reap slaves.  As soon as any one of them exits, SIGTERM the rest;
X  * a nonzero wait status makes the master exit with EIO */
X while (children > 0 && (pid=wait(&wstat)) > 0)
X for (i=0; i<NBUFS; i++)
X if (pid == slavepid[i]) {
X children--;
X slavepid[i] = 0;
X killall();
X if (wstat != 0) _exit(EIO);
X }
X _exit(0);
X}
X
Xkillall() {
X register int i;
X for (i=0; i<NBUFS; i++)
X if (slavepid[i] > 0) kill(slavepid[i], SIGTERM);
X}
X
Xlockpipe(fd) int fd[2]; { /* prefer pipe(), but flock() barfs on them */
X char tmpname[20];
X strcpy(tmpname, "/tmp/lockpipeXXXXXX");
X mktemp(tmpname);
X if ((fd[1]=creat(tmpname,0400)) < 0)
X return(fd[1]);
X fd[0] = open(tmpname, 0);
X unlink(tmpname);
X return(fd[0] < 0 ? fd[0] : 0);
X}
X
X/* Synchronization - each process has a lockfile, and shares file
X * descriptors to the following process's lockfile. When our write
X * completes, we release our lock on the following process's lock-
X * file, allowing the following process to lock it and proceed. We
X * get the lock back for the next cycle by swapping descriptors.
X * Similarly for reads.
X */
X
X#include <stdio.h>
X/* Slave body: repeatedly read up to bufsiz bytes from descriptor 0 into
X * buf and write them to descriptor 1, using flock() on the prev/next
X * lock-file descriptors to take turns with the other slaves.
X * Returns when read() reports end of input; calls _exit(1) after
X * reporting a read error or a write that did not transfer nread bytes.
X */
Xcopier() {
X register int nread, toggle = 0;
X
X flock(prevrd[toggle], LOCK_EX); /* Wait for our first turn to read */
X while ((nread=read(0, buf, bufsiz)) > 0) {
X flock(nextrd[toggle^1], LOCK_UN); /* Jolt awake next reader */
X flock(prevwr[toggle], LOCK_EX); /* Wait for previous write */
X if (write(1, buf, nread) != nread) {
X perror("stdout");
X _exit(1);
X }
X /* Switch to the other descriptor of each pair for the next cycle */
X toggle ^= 1;
X flock(nextwr[toggle], LOCK_UN); /* Jolt awake next writer */
X flock(prevrd[toggle], LOCK_EX); /* Now wait for the read */
X }
X /* Read returned 0 or an error: wait for the previous write before
X  * leaving -- presumably so earlier data is out before this slave exits */
X flock(prevwr[toggle], LOCK_EX);
X if (nread < 0) {
X perror("stdin");
X _exit(1);
X }
X}
FGD135
More information about the Comp.sources.unix
mailing list