a fast, parametric, pipelining cat(1) quasi-clone
Piercarlo Grandi
pcg at aber-cs.UUCP
Fri Mar 17 04:36:52 AEST 1989
This program copies its standard input to its standard output. It does
this in a pipelined fashion, overlapping reading with writing. Other
similar programs are ddd (posted in comp.sources.unix) and strm
(standard with Microport).
It uses a ring of pipes connecting a ring of processes; the number of
processes (2 to 16, default 8) can be chosen at command startup time.
Each process waits for a can-read token from its input pipe, then fills
a buffer, whose size (64 to 65535 bytes, default 10240) can also be
chosen at command startup, after which it passes the can-read token down
its output pipe. It then waits for a can-write token from the input
pipe, writes the buffer, and passes the can-write token along.
The net effect is an asynchronous copy through a ring of buffers whose
number and size are both configurable. It helps a lot with backups, and
makes it possible to keep a streaming tape drive streaming.
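The token-passing scheme described above can be sketched in modern POSIX C. This is a minimal illustration under stated assumptions, not the code in team.c: the names ring_copy, worker, read_full, write_full, get_tok and put_tok are hypothetical, tokens are single bytes rather than team's token-plus-status records, and cleanup after a failed pipe() or fork() is omitted. Every worker inherits the same input and output descriptors, so the READ token serializes reads and the WRITE token serializes writes, while one worker's write can overlap the next worker's read.

```c
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Hypothetical one-byte tokens; team.c sends a token plus a status. */
enum { TOK_READ = 'R', TOK_WRITE = 'W', TOK_STOP = 'S' };

/* Keep reading until n bytes, EOF, or error, as team's FdRead does. */
static ssize_t read_full(int fd, char *buf, size_t n)
{
    size_t total = 0;
    while (total < n) {
        ssize_t r = read(fd, buf + total, n - total);
        if (r < 0) return -1;
        if (r == 0) break;
        total += (size_t) r;
    }
    return (ssize_t) total;
}

static int write_full(int fd, const char *buf, size_t n)
{
    while (n > 0) {
        ssize_t w = write(fd, buf, n);
        if (w <= 0) return -1;
        buf += w;
        n -= (size_t) w;
    }
    return 0;
}

static int get_tok(int fd)            /* EOF or error on the link acts as STOP */
{
    char t;
    return read(fd, &t, 1) == 1 ? t : TOK_STOP;
}

static void put_tok(int fd, char t)   /* errors ignored: peer may have exited */
{
    (void) write(fd, &t, 1);
}

/* One member of the ring: tokens arrive on 'up' and are relayed on 'down'. */
static void worker(int in, int out, int up, int down, size_t bufsize)
{
    char *buf = malloc(bufsize);
    ssize_t got = 0;
    int tok;

    while (buf != NULL && (tok = get_tok(up)) != TOK_STOP) {
        if (tok == TOK_READ) {
            got = read_full(in, buf, bufsize);  /* offset shared by all workers */
            if (got <= 0) break;                /* EOF or error: shut down */
            put_tok(down, TOK_READ);            /* next worker may now read */
        } else {                                /* TOK_WRITE */
            if (write_full(out, buf, (size_t) got) < 0) break;
            put_tok(down, TOK_WRITE);           /* next worker may now write */
        }
    }
    put_tok(down, TOK_STOP);                    /* STOP follows any pending WRITE */
    _exit(buf == NULL ? 1 : 0);
}

/* Copy in_fd to out_fd through a ring of nprocs (2..16) processes. */
int ring_copy(int in_fd, int out_fd, int nprocs, size_t bufsize)
{
    int links[16][2];                           /* links[i]: worker i's upstream */
    int i, j, status, ok = 1;

    if (nprocs < 2 || nprocs > 16) return -1;
    for (i = 0; i < nprocs; i++)
        if (pipe(links[i]) < 0) return -1;
    for (i = 0; i < nprocs; i++) {
        pid_t pid = fork();
        if (pid < 0) return -1;
        if (pid == 0) {
            int up = links[i][0], down = links[(i + 1) % nprocs][1];
            signal(SIGPIPE, SIG_IGN);
            for (j = 0; j < nprocs; j++) {      /* keep only our two ends */
                if (links[j][0] != up) close(links[j][0]);
                if (links[j][1] != down) close(links[j][1]);
            }
            worker(in_fd, out_fd, up, down, bufsize);
        }
    }
    put_tok(links[0][1], TOK_READ);             /* worker 0 reads first... */
    put_tok(links[0][1], TOK_WRITE);            /* ...and may write first */
    for (i = 0; i < nprocs; i++) {
        close(links[i][0]);
        close(links[i][1]);
    }
    for (i = 0; i < nprocs; i++)
        if (wait(&status) < 0 || !WIFEXITED(status) || WEXITSTATUS(status) != 0)
            ok = 0;
    return ok ? 0 : -1;
}
```

For example, ring_copy(0, 1, 8, 10240) would correspond roughly to running team with its defaults. SIGPIPE is ignored in the workers because the worker that hits end of file exits first, and its upstream neighbour may still send it a final WRITE or STOP token; since tokens travel in order, every pending WRITE reaches its worker before the STOP does.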
One of its greatest virtues is that it should run unchanged on virtually any
UNIX version I can think of, as it uses only pipes and other V7 features.
On the other hand, this has two drawbacks: higher CPU consumption, and
specificity to UNIX (it relies on all processes forked from the same parent
sharing the file pointer of their inherited standard input and output).
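The reliance on a shared file pointer can be seen in a few lines of C. This is a hypothetical demonstration (read_after_child is not part of team): a forked child reads up to n bytes (at most 64 in this sketch) from an inherited descriptor, then the parent reads n more. Because fork() duplicates the descriptor but both copies refer to the same open file description, the parent's read starts where the child's stopped.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical demonstration: a forked child reads up to n bytes from fd,
 * then the parent reads n more.  Both processes share one open file
 * description, so the parent continues at the offset the child left.
 * Returns the parent's byte count, or -1 on failure.
 */
ssize_t read_after_child(int fd, char *buf, size_t n)
{
    int status;
    pid_t pid = fork();

    if (pid < 0)
        return -1;
    if (pid == 0) {                     /* child: consume the first bytes */
        char scratch[64];
        size_t want = n < sizeof scratch ? n : sizeof scratch;
        _exit(read(fd, scratch, want) < 0 ? 1 : 0);
    }
    if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status)
        || WEXITSTATUS(status) != 0)
        return -1;
    return read(fd, buf, n);            /* parent: continues at shared offset */
}
```

Had the child instead opened the file by name with its own open() call, it would get a separate offset and the parent would read the same first bytes again; team's guys avoid this by inheriting descriptors 0 and 1 from a common parent.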
It is reliable; I have not found any bugs for a while.
Finally, a bit of history: this program has already been posted to BIX,
and (if I remember correctly) it has not been changed since then.
Even more finally :-), a disclaimer: the University College of Wales,
my employer, has in no way supported or helped the development of this
program. It has been developed entirely on my own time with my own
resources; it is an adjunct to my own personal research and has nothing
to do with the research of the University College of Wales.
I thank the University College of Wales for allowing me to
post this program using their computer resources.
-------------------------- cut here -----------------------
#! /bin/sh
# This is a shell archive, meaning:
# 1. Remove everything above the #! /bin/sh line.
# 2. Save the resulting text in a file.
# 3. Execute the file with /bin/sh (not csh) to create the files:
# team.1
# team.c
# This archive created: Thu Mar 16 15:29:56 1989
export PATH; PATH=/bin:$PATH
if test -f 'team.1'
then
echo shar: will not over-write existing file "'team.1'"
else
cat << \SHAR_EOF > 'team.1'
.TH TEAM 1 (pg)
.ad b
.SH NAME
team \- parallel "pipe", allows asynchronous I/O
.SH SYNOPSIS
.B team
[blocksize[\fBb\fP|\fBk\fP] [processes]]
.SH DESCRIPTION
.I Team
just copies its standard input to its standard output. It does so,
however, by forking a team of independent
.I processes
(default 8; from 2 to 16 are allowed), arranged in a ring, with reads
overlapped with writes.
.LP
Each process waits for the end of the read phase of the
previous process, then reads
.I blocksize
bytes (or that many 512-byte blocks if suffixed with
.IR b ,
or kilobytes if suffixed with
.IR k ;
the default is 10240 bytes, and the size must lie between 64 and 65535 bytes)
from its standard input, activates the read phase of the next process,
waits for the end of the write phase of the previous process, then
writes to its standard output, and activates the write phase of the
next process.
.LP
.I Team
consumes system time to synchronize and switch among
its processes; to avoid being slowed down by other load, it is
best run on a quiescent system.
.LP
This program is most useful for output to a device, especially
a streaming tape. It may be used to advantage for
disc-to-disc and disc-to-tape copies.
.SH EXAMPLES
find dir -print | cpio -oBc | team 20k 8 >/dev/rmt0
.br
team 20k 8 </dev/rmt0 | cpio -iBcdmu
.SH ADVICE
You are advised to experiment with different combinations of block size and
number of processes; each program used with
.I team
works best with certain parameters, and performance depends even more
strongly on the output device, so experiment with the parameters for
that as well (it seems that the blocking factor of the process that feeds
.I team
ought to be smaller than the one given to
.IR team ,
and possibly smaller than the limit on the size of a pipe in your
version of the system).
.I Team
ought to be adaptive, adjusting both parameters dynamically in order to
reach a state where there is no pause between the stages of the ring; this
is too difficult to achieve under UNIX.
.LP
Notice also that this program reads and writes blocks that are all
of the prescribed size, except the last, even when reading from
pipes: if a read from its input supplies fewer bytes than the
block size, it reads again until its buffer is full
or the input ends.
.LP
A final note: it is usually advantageous to give
.I team
a block size that is a multiple of the block size produced by the
program before it in the pipeline. Notice that in many cases, such
as with tape archival programs, a tape written through
.I team
will not be directly readable by the archiver, but will have to be
reblocked back to the block size the archiver expects, either by way
of
.I dd
or by reapplying
.IR team ,
which is of course much faster.
.SH BUGS
.I Team
emits, in case of errors, a number of messages that may be comprehensible
only to the author. Please note them and report them to the author.
.SH SEE ALSO
.IR volcopy (8)
.br
.IR cpio (1)
.br
.IR tar (1)
.br
.IR dump (8)
.SH AUTHOR
Piercarlo Grandi, Milano.
SHAR_EOF
fi # end of overwriting check
if test -f 'team.c'
then
echo shar: will not over-write existing file "'team.c'"
else
cat << \SHAR_EOF > 'team.c'
/*
$Header$
*/
static char Notice[] =
"Copyright (C) 1987,1989 Piercarlo Grandi. All rights reserved.";
/*
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 1, or (at your option)
any later version.
This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You may have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation,
Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
#undef DEBUG
/*
Unix programs normally do synchronous reads and writes, that is, you
read and then you write; no overlap is possible.
This is especially catastrophic for device to device copies, where it
is important to minimize elapsed time by overlapping activity on
one device with activity on the other.
To obtain this, a multiprocess structure is necessary under
Unix. This program is functionally equivalent to a pipe, in that it
copies its input (fd 0) to its output (fd 1) link.
This program is executed as a Team of N processes, called Guys, all
of which share the same input and output links; the first reads a
chunk of input, awakens the second, and writes the chunk to its output;
the second does the same, and the last awakens the first.
Since this process is essentially cyclic, we use a ring of pipes to
synchronize the Guys. Each guy has an input pipe from the upstream
guy and an output pipe to the downstream guy. Whenever a guy
receives a READ command from upstream, it first reads a block
and then passes the READ command downstream; it then waits for a
WRITE command from upstream, writes the block, and after
that passes the WRITE command downstream.
Two other commands are used. STOP is sent downstream by the guy
that detects the end of file on the input, after which that guy
exits; every guy that receives STOP passes it on and exits too.
ABORT is sent downstream by a guy that runs into an error (or that
receives ABORT from upstream), and has much the same effect.
*/
#include <errno.h>
extern int errno;
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/file.h>
#ifdef DEBUG
# define Mesg(list) mesg list
#else
# define Mesg(list)
#endif
/*VARARGS1*/
mesg(a,b,c,d,e,f,g,h,i)
char *a;
{
# if (defined(LOCK_EX))
int fd = open("/dev/tty",1);
flock(fd,LOCK_EX);
# endif
fprintf(stderr,"%u: ",getpid());
fprintf(stderr,a,b,c,d,e,f,g,h,i);
# if (defined(LOCK_EX))
flock(fd,LOCK_UN);
close(fd);
# endif
}
#ifndef PCG
# define PCG 0
#endif
#if (PCG)
# include "Extend.h"
# if (defined(vax) && defined(unix))
# include "Here42BSD.h"
# endif
# include "Sizes.h"
# include "Types.h"
#else
# define were if
# define fast register
# define public /* extern */
# define private static
# define void int
# define boolean int
# define true 1
# define false 0
# define NIL ((pointer) 0)
# define scalar int
typedef char *pointer;
# if (defined(SMALL_M))
typedef unsigned address;
# else
typedef long address;
# endif
# define when break; case
# define otherwise break; default
#endif
/*
The regular Unix read and write calls are not guaranteed to process
all the bytes requested. These procedures guarantee that if the
request is for N bytes, all of them are read or written unless there
is an error or eof.
*/
#define FdCLOSED 0
#define FdOPEN 1
#define FdEOF 2
#define FdERROR 3
struct Fd
{
int FFd;
short FStatus;
};
struct Fd FdIn,FdOut;
public boolean FdOpen(fd,ffd)
fast struct Fd *fd;
int ffd;
{
fd->FStatus = (ffd >= 0) ? FdOPEN : FdCLOSED;
fd->FFd = ffd;
Mesg(("FdOpen fd %d\n",ffd));
return ffd >= 0;
}
public boolean FdClose(fd)
fast struct Fd *fd;
{
int ffd;
ffd = fd->FFd;
Mesg(("FdClose fd %d\n",fd->FFd));
fd->FStatus = FdCLOSED;
fd->FFd = -1;
return close(ffd) >= 0;
}
public boolean FdCopy(to,from)
fast struct Fd *to,*from;
{
to->FStatus = from->FStatus;
to->FFd = dup(from->FFd);
Mesg(("FdCopy of %d is %d\n",from->FFd,to->FFd));
return to->FFd >= 0;
}
public void FdSet(to,from)
fast struct Fd *to,*from;
{
if (from->FFd < 0)
fprintf(stderr,"team: set an invalid fd\n");
to->FStatus = from->FStatus;
to->FFd = from->FFd;
}
public address FdRead(fd,buffer,goal)
fast struct Fd *fd;
pointer buffer;
fast address goal;
{
fast int nread;
fast address total;
switch (fd->FStatus)
{
when FdEOF: return 0;
when FdERROR: return -1;
when FdCLOSED: return -1;
when FdOPEN:
for
(
total = 0;
total < goal && (nread =
read(fd->FFd,buffer+total, (unsigned) (goal-total))) > 0;
total += nread
);
if (nread == 0) fd->FStatus = FdEOF;
if (nread < 0) fd->FStatus = FdERROR;
Mesg(("FdRead %d reads %d last %d\n",fd->FFd,total,nread));
return (total == 0) ? nread : total;
}
/*NOTREACHED*/
}
public address FdWrite(fd,buffer,goal)
fast struct Fd *fd;
pointer buffer;
fast address goal;
{
fast int nwritten;
fast address total;
switch (fd->FStatus)
{
when FdEOF: return 0;
when FdERROR: return -1;
when FdCLOSED: return -1;
when FdOPEN:
for
(
total = 0;
total < goal && (nwritten =
write(fd->FFd,buffer+total,(unsigned) (goal-total))) > 0;
total += nwritten
);
Mesg(("FdWrite %d writes %d last %d\n",fd->FFd,total,nwritten));
if (nwritten == 0) fd->FStatus = FdEOF;
if (nwritten < 0) fd->FStatus = FdERROR;
return (total == 0) ? nwritten : total;
}
/*NOTREACHED*/
}
/*
A Token is a scalar value representing a command.
*/
typedef short scalar Token;
#define TokenREAD 0
#define TokenWRITE 1
#define TokenSTOP 2
#define TokenABORT -1
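/*
A stream is a pipe that carries StreamMsg records, each a token plus
the sender's Fd status, from one guy to the next.
*/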
public boolean StreamPipe(downstream,upstream)
fast struct Fd *downstream;
fast struct Fd *upstream;
{
int links[2];
if (pipe(links) < 0)
{
perror("team: opening links");
return false;
}
Mesg(("StreamPipe fd downstream %d upstream %d\n",links[1],links[0]));
return FdOpen(downstream,links[1]) && FdOpen(upstream,links[0]);
}
struct StreamMsg
{
Token SmToken;
short SmStatus;
};
public boolean StreamSend(fd,token,status)
fast struct Fd *fd;
Token token;
short status;
{
fast int n;
struct StreamMsg message;
message.SmToken = token; message.SmStatus = status;
n = FdWrite(fd,(pointer) &message,(address) sizeof message);
Mesg(("StreamSend fd %u n %d token %d\n",fd->FFd,n,token));
return n == sizeof message;
}
public boolean StreamReceive(fd,tokenp,statusp)
fast struct Fd *fd;
Token *tokenp;
short *statusp;
{
fast int n;
struct StreamMsg message;
n = FdRead(fd,(pointer) &message,(address) sizeof message);
*tokenp = message.SmToken; *statusp = message.SmStatus;
Mesg(("StreamReceive fd %u n %d token %d\n",fd->FFd,n,*tokenp));
return n == sizeof message;
}
/*
A guy is an instance of the input to output copier. It is attached
to a relay station, with an upstream link, from which commands
arrive, and a downstream link, to which they are relayed once they
have been executed.
*/
struct Guy
{
int GPid;
struct Fd GUpStream;
struct Fd GDownStream;
};
public boolean GuyOpen(guy,pid,upstream,downstream)
fast struct Guy *guy;
int pid;
struct Fd *upstream,*downstream;
{
Mesg(("GuyOpen pid %u upstream %u downstream %u\n",
pid,upstream->FFd,downstream->FFd));
guy->GPid = pid;
FdSet(&guy->GUpStream,upstream);
FdSet(&guy->GDownStream,downstream);
return true;
}
#define GuySEND(guy,token,status) \
StreamSend(&guy->GDownStream,token,status)
#define GuyRECEIVE(guy,tokenp,statusp) \
StreamReceive(&guy->GUpStream,tokenp,statusp)
public boolean GuyStart(guy,bufsize)
fast struct Guy *guy;
address bufsize;
{
fast char *buffer;
Token token;
short status;
boolean received;
static int nread,nwritten;
extern char *malloc();
Mesg(("GuyStart guy %#o bufsize %u\n",guy,bufsize));
buffer = (pointer) malloc((unsigned) bufsize);
if (buffer == NIL)
{
fprintf(stderr,"team: guy %d cannot allocate %lu bytes\n",
guy->GPid,(unsigned long) bufsize);
return false;
}
while ((received = GuyRECEIVE(guy,&token,&status)) && token != TokenSTOP)
switch (token)
{
when TokenREAD:
FdIn.FStatus = status;
Mesg(("GuyStart reading %d chars\n",bufsize));
nread = FdRead(&FdIn,(pointer) buffer,bufsize);
Mesg(("GuyStart reads %d chars\n",nread));
if (nread == 0) GuyStop(guy,NIL);
if (nread < 0) GuyStop(guy,"error on guy read");
if (!GuySEND(guy,TokenREAD,FdIn.FStatus))
GuyStop(guy,"guy cannot send READ");
when TokenWRITE:
FdOut.FStatus = status;
Mesg(("GuyStart writing %d chars\n",nread));
nwritten = FdWrite(&FdOut,(pointer) buffer,(address) nread);
Mesg(("GuyStart writes %d chars\n",nwritten));
if (nwritten == 0) GuyStop(guy,"eof on guy write");
if (nwritten < 0) GuyStop(guy,"error on guy write");
if (!GuySEND(guy,TokenWRITE,FdOut.FStatus))
GuyStop(guy,"guy cannot send WRITE");
when TokenABORT:
GuyStop(guy,"guy was aborted");
otherwise:
GuyStop(guy,"impossible token on ring");
}
free((char *) buffer);
GuyStop(guy,(received) ? NIL : "error on upstream receive");
/*NOTREACHED*/
/*return true;*/
}
public boolean GuyStop(guy,errormsg)
fast struct Guy *guy;
char *errormsg;
{
Mesg(("GuyStop guy %#o\n",guy));
if (errormsg != NIL)
{
fprintf(stderr,"team: guy pid %u: %s\n",guy->GPid,errormsg);
(void) GuySEND(guy,TokenABORT,FdERROR);
exit(1);
/*NOTREACHED*/
}
if (!GuySEND(guy,TokenSTOP,FdEOF))
{
exit(1);
/*NOTREACHED*/
}
exit(0);
/*NOTREACHED*/
}
public boolean GuyClose(guy)
fast struct Guy *guy;
{
return FdClose(&guy->GUpStream) && FdClose(&guy->GDownStream);
}
/*
A team is made up of a ring of guys; each guy copies a block from its
input to its output, and is driven by tokens sent to it by the
previous guy on a pipe.
*/
struct Team
{
struct Guy *TGuys;
short unsigned TSize;
short unsigned TActive;
};
public boolean TeamOpen(team,nominalsize)
struct Team *team;
short unsigned nominalsize;
{
extern char *calloc();
Mesg(("TeamOpen nominalsize %u\n",nominalsize));
team->TSize = 0;
team->TActive = 0;
team->TGuys = (struct Guy *) calloc(sizeof (struct Guy),nominalsize);
team->TSize = nominalsize;
were (team->TGuys == (struct Guy *) NIL)
return false;
return true;
}
public boolean TeamStart(team,bufsize)
fast struct Team *team;
address bufsize;
{
/*
When generating each guy, we pass it an upstream link that is
the downstream of the previous guy, and create a new downstream
link that will be the next upstream.
At each turn we obviously close the old downstream once it has
been passed to the forked guy.
A special case are the first and last guys; the upstream of the
first guy shall be the downstream of the last. This goes against
the grain of our main logic, where the upstream is expected to
already exist and the downstream must be created.
This means that the last and first guys are created in a special
way. When creating the first guy we shall create its
upstream link as well as its downstream, and we shall save the
write end of that upstream link in a special variable,
last_downstream. This we shall use as the downstream of the last guy.
We shall also keep it open in the team manager (parent process)
because we shall use it to do the initial send of the read and
write tokens that will circulate in the relay ring, activating
the guys.
Of course, because of this each guy will inherit this link as
well as its upstream and downstream, but shall graciously close
it.
*/
struct Fd last_downstream;
struct Fd this_upstream;
struct Fd this_downstream;
struct Fd next_upstream;
Mesg(("TeamStart team %#o size %u bufsize %u\n",
team,team->TSize,bufsize));
(void) FdOpen(&FdIn,0); (void) FdOpen(&FdOut,1);
for (team->TActive = 0; team->TActive < team->TSize; team->TActive++)
{
fast struct Guy *guy;
fast int pid;
guy = team->TGuys+team->TActive;
if (team->TActive == 0)
{
if (!StreamPipe(&last_downstream,&this_upstream))
{
perror("cannot open first link");
return false;
}
if (!StreamPipe(&this_downstream,&next_upstream))
{
perror("cannot open link");
return false;
}
}
else if (team->TActive < (team->TSize-1))
{
if (!StreamPipe(&this_downstream,&next_upstream))
{
perror("cannot open link");
return false;
}
}
else /*if (team->TActive == team->TSize-1)*/
{
FdSet(&this_downstream,&last_downstream);
if (!FdCopy(&last_downstream,&this_downstream))
perror("team: cannot copy last downstream");
}
Mesg(("TeamStart going to fork for guy %#o\n",guy));
pid = fork();
if (pid > 0)
{
Mesg(("TeamStart forked guy %#o as pid %u\n",guy,pid));
guy->GPid = pid;
if (!FdClose(&this_upstream))
perror("cannot close this upstream link");
if (!FdClose(&this_downstream))
perror("cannot close this downstream link");
FdSet(&this_upstream,&next_upstream);
}
else if (pid == 0)
{
pid = getpid();
if (!FdClose(&last_downstream))
perror("cannot close inherited first link");
if (!GuyOpen(guy,pid,&this_upstream,&this_downstream))
GuyStop(guy,"cannot open guy");
if (!GuyStart(guy,bufsize))
GuyStop(guy,"cannot start guy");
if (!GuyClose(guy))
perror("cannot close guy");
/*NOTREACHED*/
}
else if (pid < 0)
{
perror("team: forking a guy");
return false;
}
}
if (!StreamSend(&last_downstream,TokenREAD,FdOPEN))
{
perror("cannot send first READ token");
return false;
}
if (!StreamSend(&last_downstream,TokenWRITE,FdOPEN))
{
perror("cannot send first WRITE token");
return false;
}
if (!FdClose(&last_downstream))
perror("cannot close first link");
return true;
}
public boolean TeamWait(team)
fast struct Team *team;
{
while (team->TActive != 0)
{
int guypid;
int status;
guypid = wait(&status);
if (guypid >= 0)
{
fast short unsigned guyno;
for (guyno = 0; guyno < team->TSize; guyno++)
if (guypid == team->TGuys[guyno].GPid)
{
team->TGuys[guyno].GPid = -1;
break;
}
}
else
{
fprintf(stderr,"team: no guys, believed %u left\n",
team->TActive);
return true;
}
--team->TActive;
if (status != 0 && team->TActive != 0)
return false;
}
return true;
}
public boolean TeamStop(team)
fast struct Team *team;
{
fast short unsigned guyno;
Mesg(("TeamStop team %#o\n",team));
for (guyno = 0; guyno < team->TSize; guyno++)
{
fast struct Guy *guy;
guy = team->TGuys+guyno;
if (guy->GPid >= 0)
{
/*kill(guy->GPid,SIGKILL);*/
--team->TActive;
}
}
return team->TActive == 0;
}
public boolean TeamClose(team)
fast struct Team *team;
{
team->TSize = 0;
free(team->TGuys);
return true;
}
public void usage()
{
fputs("\
syntax: team [size[bk] [#ofprocesses]]\n\
copies standard input to output\n\
using a ring of (default 8) pipelined processes\n\
that write blocks of size bytes (default 10240),\n\
or of size 512-byte blocks (b) or kilobytes (k)\n",stderr);
exit(1);
}
public void main(argc,argv)
int argc;
char *(argv[]);
{
struct Team team;
short unsigned teamsize;
address bufsize;
if (argc <= 1)
bufsize = 10*1024;
else
{
fast char *cipher;
/* accumulate the decimal digits, starting from zero */
for (
bufsize = 0, cipher = argv[1];
*cipher >= '0' && *cipher <= '9';
cipher++
)
bufsize = bufsize*10 + *cipher-'0';
if (*cipher == 'b') bufsize *= 512;
if (*cipher == 'k') bufsize *= 1024;
if (bufsize < 64L || bufsize > (64L*1024L-1))
{
fprintf(stderr,"team: invalid block size %ld\n",
(long) bufsize);
usage();
}
}
if (argc <= 2)
teamsize = 8;
else
{
teamsize = atoi(argv[2]);
if (teamsize < 2 || teamsize > 16)
{
fprintf(stderr,"team: invalid # of processes %d\n",
teamsize);
usage();
}
}
if (argc > 3) usage();
if (!TeamOpen(&team,teamsize))
{
fprintf(stderr,"team: cannot setup the team with %u guys\n",
teamsize);
exit(1);
}
if (!TeamStart(&team,bufsize))
{
fprintf(stderr,"team: cannot start the team\n");
exit(1);
}
if (!TeamWait(&team))
{
fprintf(stderr,"team: stop remaining %u guys\n",team.TActive);
if (!TeamStop(&team))
{
fprintf(stderr,"team: cannot stop the team\n");
exit(1);
}
}
if (!TeamClose(&team))
{
fprintf(stderr,"team: cannot close the team\n");
exit(1);
}
exit(0);
}
SHAR_EOF
fi # end of overwriting check
# End of shell archive
exit 0
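The block-size argument described in the manual page (decimal digits, optionally suffixed with b or k) and parsed in main() can also be sketched as a stand-alone function. parse_blocksize is a hypothetical name; it follows the 64 to 65535 byte limits enforced in team.c but, unlike main(), it also rejects characters after the suffix, a deliberate difference in this sketch.

```c
#include <ctype.h>

/*
 * Hypothetical parser for team's block-size argument: decimal digits,
 * optionally followed by 'b' (512-byte blocks) or 'k' (kilobytes).
 * Returns the size in bytes, or -1 if it is malformed or outside the
 * 64..65535 range that team.c accepts.
 */
long parse_blocksize(const char *arg)
{
    long size = 0;                       /* accumulate from zero */

    if (!isdigit((unsigned char) *arg))
        return -1;
    for (; isdigit((unsigned char) *arg); arg++) {
        size = size * 10 + (*arg - '0');
        if (size > 65535L)               /* a suffix cannot bring it back down */
            return -1;
    }
    if (*arg == 'b') {
        size *= 512;
        arg++;
    } else if (*arg == 'k') {
        size *= 1024;
        arg++;
    }
    if (*arg != '\0' || size < 64 || size > 65535L)
        return -1;
    return size;
}
```

For instance, the argument 20k used in the manual's examples yields 20480 bytes, while 64k is rejected because 65536 exceeds the limit.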
--
Piercarlo "Peter" Grandi | ARPA: pcg%cs.aber.ac.uk at nss.cs.ucl.ac.uk
Dept of CS, UCW Aberystwyth | UUCP: ...!mcvax!ukc!aber-cs!pcg
Penglais, Aberystwyth SY23 3BZ, UK | INET: pcg at cs.aber.ac.uk