P is implemented by constructing the commands that run on the
controlling machine and on the parallel processors, and then
executing them using remote shell execution (rsh).

This implementation does not allow recursive parallel function
calls. In other words, the input file or stream is broken up into
sections that are processed on the parallel processors, and those
processors cannot themselves run another parallel program. This
restriction keeps the implementation simple.
Parsing
Parse the arguments to find pfiles, sfiles, shell commands, and
functions. Parsing is tricky because P expands the arguments itself
instead of letting the shell handle it. That is why all the arguments
are quoted (surrounded by single-quote marks): the characters “<”,
“>”, and “|” mean something to the Unix shell, and file expansion
has to be handled carefully.
Shell commands “<>|”
Shell commands are recognized by being members of a special list:
">", "<", ">>", "|", etc.

After a “|”, the next command, once macroexpanded, must be a call
to “p”. This is because a parallel pipe must go into a parallel
function, and the p command must compile all of these together.
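
The two rules above can be illustrated with a minimal sketch. It assumes the arguments have already been split into tokens and macroexpanded; the set below holds only the operators named in this section, and the function names are hypothetical.

```python
# Only the operators named in the text; the real list may contain more.
SHELL_OPERATORS = {">", "<", ">>", "|"}

def classify(tokens):
    """Tag each token as a shell operator ('op') or an ordinary word ('word')."""
    return [("op" if tok in SHELL_OPERATORS else "word", tok) for tok in tokens]

def check_pipes(tokens):
    """Raise ValueError unless every "|" is immediately followed by a call to p."""
    for i, tok in enumerate(tokens):
        if tok == "|":
            following = tokens[i + 1] if i + 1 < len(tokens) else None
            if following != "p":
                raise ValueError(f'"|" must be followed by p, found {following!r}')
```

For example, check_pipes accepts the token list for "p cut < foo | p grep > bar" and rejects "p cut < foo | grep > bar".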
File expansion
File expansion of serial files is the same as normal shell
expansion, but expansion has to be handled carefully when the file
does not exist yet. The awkward case is an output file. For example,
the parallel version of 'cut < foo > bar' would be
'p cut < foo > /net/arc*/bar', but since bar does not exist yet, the
normal shell would expand /net/arc*/bar to nothing.

If there is a / in the filename, separate the directory from the
filename (the piece after the last /). The directory part can be
expanded even if the full pathname does not exist. Then use “ls -l”
on the directory part in the shell environment (to get the right
macro expansion, if any). For a pfile this yields a list of
directories; for an sfile it yields a single directory. Finally, put
the filename back on each of the results.
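
The directory-first expansion can be sketched as follows. This is an illustration under assumptions, not the P implementation: it uses Python's glob module in place of running ls in the shell environment, so shell-level macro expansion is not covered, and the name expand_path is hypothetical.

```python
import glob
import os

def expand_path(pattern):
    """Expand wildcards in the directory part only, then re-attach the file name."""
    directory, filename = os.path.split(pattern)
    if not directory:
        return [pattern]  # no "/" in the name, so there is nothing to expand
    # The directories exist even when the output file itself does not yet.
    dirs = sorted(d for d in glob.glob(directory) if os.path.isdir(d))
    return [os.path.join(d, filename) for d in dirs]
```

With directories /net/arc1 and /net/arc2 present, expand_path("/net/arc*/bar") would return one path per directory even though bar has not been created.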

Count the number of files in each pfile argument to determine
"count". All pfile arguments must have the same count; otherwise it
is an error. If there is no pfile argument, use the number of
entries in the environment variable P.
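
A minimal sketch of the count rule follows. It assumes each pfile argument has already been expanded into a list of paths, and that P is a whitespace-separated list of entries; the format of P is not specified in this document, so that part is a guess.

```python
def determine_count(pfile_args, p_env):
    """Return the number of pieces, given expanded pfile arguments and the value of P."""
    counts = {len(files) for files in pfile_args}
    if len(counts) > 1:
        raise ValueError(f"pfile arguments have different counts: {sorted(counts)}")
    if counts:
        return counts.pop()
    # No pfile argument: fall back to the number of entries in P (format assumed).
    return len(p_env.split())
```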
Functions or Commands
There are two kinds of functions: the combining functions and the
main function (the one that is run on each machine).

When the call to p contains a “|” component, the next function will
be a “p” or a “pxxx” function. A “pxxx” function must be expanded
into “p xxx”. How do we do the macro expansion? Do we have to have a
redundant table in P? Can we just call the shell and use what it
returns?
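
One of the options raised above, a table kept inside P, might look like the sketch below. The four entries are copied from the macroexpansion example in the section on pipes; the table itself and the name expand_macro are hypothetical, and this does not settle whether calling the shell would be the better approach.

```python
import shlex

# Hypothetical table; entries taken from the macroexpansion example in this document.
P_MACROS = {
    "pcut": "p cut",
    "pgrep": "p grep",
    "psort": "p -c 'sort -m' sort",
    "puniq": "p -cs uniq uniq",
}

def expand_macro(word):
    """Return the argument list for a pxxx word, or [word] if it is not in the table."""
    if word in P_MACROS:
        return shlex.split(P_MACROS[word])
    return [word]
```

Using shlex.split keeps a quoted combiner such as 'sort -m' together as a single argument.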

Recognizing the arguments to the combining function (the “-c”
argument to p) is sloppy at the moment. How do we keep them from
interfering with the other parts of the p call? We could quote them,
but that is ugly since there could already be a quote around the
whole thing.
Creating the commands to be run
Let’s start with a simple case and work up: first, commands with no
pipes.

p cut '< sfile > pfile' means (in some pseudo notation):

    for i < count
        split-file -count count -i i sfile | rsh machine_of_pfile[i] (cut > pfile[i]) &
    wait

machine_of_pfile[i] can be figured out by running 'df -k pfile[i]'.
If it turns out to be the same machine as the controlling machine
(the one running the shell script), it might be more efficient to
replace the “rsh machine” with a plain “|” pipe. There are cases
where one would run a p routine that executes on the controlling
machine (see section Using P on Symmetric Multi Processors).

This routine executes the split-file function on the controlling
machine, pipes each part of the file to a remote machine, runs cut
there, and stores the result on that remote machine. Each of these
commands is run in the background, and the wait command waits until
they are all done before going on to the next command. This
implementation does not trap any errors.
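
As an illustration of how one such command line might be generated, including the local shortcut mentioned above, here is a hedged sketch. It assumes the host for each piece is already known (deriving it from df -k output is not shown), and the function name and parameters are hypothetical.

```python
import shlex

def piece_command(i, count, sfile, command, pfile, host, local_host):
    """Build the background command line that processes piece i of sfile."""
    split = f"split-file -count {count} -i {i} {shlex.quote(sfile)}"
    work = f"{command} > {shlex.quote(pfile)}"
    if host == local_host:
        # Same machine as the controller: a plain pipe replaces rsh.
        return f"{split} | ({work}) &"
    # Quote the remote command so the redirection happens on the remote machine.
    return f"{split} | rsh {host} {shlex.quote(work)} &"
```

The controlling script would emit one such line per piece and then a final wait.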

Split-file is a function that takes a section of a file and pipes it
to stdout. To figure out what to grab, it stats the file for its
length, seeks i/count of the way through the file, goes to the end
of that line, and starts there. That way it does not chop any line
in half. It stops at the point (i+1)/count of the way through the
file, again rounded up to the end of the line.
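
The boundary rule can be made concrete with a short sketch. It returns the bytes of piece i instead of writing them to stdout, and the names boundary and split_file are illustrative. Because the start of piece i+1 and the end of piece i are computed by the same rule, the pieces tile the file with no gaps or overlaps.

```python
import os

def boundary(f, k, count, size):
    """Byte offset where piece k starts: k/count of the way in, moved to the next line start."""
    if k <= 0:
        return 0
    if k >= count:
        return size
    f.seek(size * k // count)
    f.readline()  # skip the remainder of the line we landed in
    return f.tell()

def split_file(path, i, count):
    """Return piece i (0-based) of count line-aligned pieces of the file at path."""
    size = os.path.getsize(path)
    with open(path, "rb") as f:
        start = boundary(f, i, count, size)
        end = boundary(f, i + 1, count, size)
        f.seek(start)
        return f.read(end - start)
```

A piece can be empty when a single line is longer than size/count bytes.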

p grep "foo" '< sfile1 > sfile2'

    count = length($P)
    for i < count, for p in P
        mkfifo /tmp/named_pipe_i
        split-file -count count -i i sfile1 | rsh machine_of_p[i] (grep "foo") > /tmp/named_pipe_i &
    buffered_cat /tmp/named_pipe_* > sfile2

Since no pfile is specified, we do not know explicitly which
machines to use or how many pieces to split the file into, so we use
the machines associated with the files in the environment variable
P. The number of entries in P determines count.

We also need to bring the stream back together into a serial file.
This is done by directing the output of each piece to a named pipe
called named_pipe_i on the controlling machine. These named pipes
are then concatenated with a function called buffered_cat.

Buffered_cat is a special function that has the same semantics as
cat. The reason not to use cat is that it reads its inputs one at a
time. When the inputs are named pipes, an array machine would stop
processing until cat got around to reading its pipe. Buffered_cat
differs in that it buffers each stream into a file as it comes in,
so that the input streams never wait. We believe this is important
for performance because each remote machine can then keep computing.
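
The buffering idea can be sketched as follows, assuming one thread per input stream and a temporary file per stream. This is one possible reading of the description above, not the actual buffered_cat; a real version could pass the first stream straight through instead of spooling it.

```python
import shutil
import tempfile
import threading

def buffered_cat(streams, out):
    """Write the streams to out in order, while draining all of them concurrently."""
    spools = [tempfile.TemporaryFile() for _ in streams]
    threads = [
        threading.Thread(target=shutil.copyfileobj, args=(src, spool))
        for src, spool in zip(streams, spools)
    ]
    for t in threads:
        t.start()  # every input is read immediately, so no writer blocks
    for t, spool in zip(threads, spools):
        t.join()  # wait until this stream is fully buffered
        spool.seek(0)
        shutil.copyfileobj(spool, out)
        spool.close()
```

The streams and out are expected to be binary file objects, such as named pipes opened with open(path, "rb").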

p -c 'sort -m' 'sort < pfile > sfile2'

    for i < count
        rsh machine_of_pfile[i] (sort < pfile[i]) > named_pipe_i &
    sort -m named_pipe_* > sfile2

This runs a sort on each processor and then merges the sorted
results back together.
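
The sort-then-merge pattern can be shown in miniature. Here sorted() stands in for the sort run on each processor and heapq.merge stands in for sort -m; the function name is hypothetical, and Python's string ordering is not necessarily the same as the locale ordering that sort uses.

```python
import heapq

def parallel_sort(chunks):
    """Sort each chunk independently, then merge the sorted runs into one list."""
    sorted_runs = [sorted(chunk) for chunk in chunks]  # one sort per processor
    return list(heapq.merge(*sorted_runs))  # the combining step, like sort -m
```

The merge only works because every run is already sorted, which is why the combiner can be much cheaper than a full sort.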

p -c 'sort -m' 'sort < pfile1 > pfile2'

    for i < count
        rsh machine_of_pfile1[i] (sort < pfile1[i]) > named_pipe_i &
    sort -m named_pipe_* > /tmp/stmp
    for i < count
        split-file -count count -i i /tmp/stmp > pfile2[i] &
    wait

The second split-file is needed because the result goes back into a
pfile. It would be more efficient to start splitting right away
rather than writing the result into the serial temp file. In this
case the splitter could know in advance that the total size is the
same as the original.
E.g.

    for i < count
        rsh machine_of_pfile1[i] (sort < pfile1[i]) > named_pipe_i &
    sort -m named_pipe_* | split-stream -count count -size (size_of pfile1) pfile2

There may be a reason to add a flag for this optimization.

p -cs uniq uniq '< sfile1 > sfile2'

    for i < count, for p in P
        split-file -count count -i i sfile1 | rsh machine_of_p[i] (uniq) > named_pipe_i &
    buffered_cat named_pipe_* | uniq > sfile2

There could be a faster combining function than uniq, but this will
work. Buffered_cat is used here because uniq expects an input
stream, not a list of files.
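
A small sketch shows why a final uniq is needed as the combiner: a run of duplicate lines can straddle the boundary between two pieces, so each piece keeps one copy. The uniq function here models uniq without options (collapsing adjacent duplicates), and the names are illustrative.

```python
from itertools import groupby

def uniq(lines):
    """Collapse runs of adjacent identical lines, like uniq without options."""
    return [key for key, _ in groupby(lines)]

def parallel_uniq(chunks):
    """Run uniq on each chunk, concatenate the results in order, then uniq once more."""
    per_chunk = [uniq(chunk) for chunk in chunks]
    joined = [line for part in per_chunk for line in part]  # the buffered_cat step
    return uniq(joined)
```

For the chunks ["a", "a", "b"] and ["b", "c", "c"], the per-chunk results still contain "b" twice, and the final pass removes the extra copy.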
Using pipe “|” from one P command to the next
The “|” command is tricky because the next function gets a parallel
stream as input but does not see an explicit “<” in its arguments.
To handle this case, the p function, which is acting like a compiler
at this point, constructs the commands to be run on each processor.
e.g.

pcut '< sfile1 | pgrep | psort | puniq > sfile5'

macroexpands into:

p cut < sfile1 | p grep | p -c 'sort -m' sort | p -cs uniq uniq > sfile5

    for i < count, for p in P
        mkfifo /tmp/named_pipe_i
        split-file -count count -i i sfile1 | rsh machine_of_p[i] (cut | grep | sort) > /tmp/named_pipe_i &
    sort -m /tmp/named_pipe_* > /tmp/stmp
    for i < count
        split-file -count count -i i /tmp/stmp | rsh machine_of_p[i] (uniq) > /tmp/named_pipe_i &
    buffered_cat /tmp/named_pipe_* | uniq > sfile5

In this example the piped commands run together on each processor
until the streams need to be combined through a combiner function.
The combiner is then executed, and its output is split up again and
passed on through the following functions until it needs to be
combined again. This is how p can “compile” a sequence of piped
functions into an efficient set of calls to the processors.
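
The grouping step can be sketched as follows, assuming each stage has already been parsed into a (command, combiner) pair where the combiner is None for stages that need no combining. The representation and the name compile_stages are hypothetical.

```python
def compile_stages(stages):
    """Group stages into phases; each phase ends at the first stage that has a combiner."""
    phases, current = [], []
    for command, combiner in stages:
        current.append(command)
        if combiner is not None:
            # Everything collected so far runs as one remote pipeline.
            phases.append((" | ".join(current), combiner))
            current = []
    if current:
        phases.append((" | ".join(current), None))
    return phases
```

For the example above, the stages cut, grep, sort (combiner sort -m), and uniq (combiner uniq) compile into two phases: "cut | grep | sort" combined with sort -m, then uniq combined with uniq.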
Implementation notes:
It might be necessary to use the -n option to rsh when nothing is
piped into it.