The Distributed Data Broker: A decentralized mechanism for periodic exchange
of fields between multiple ensembles of parallel computations
Keith Sklower
Computer Science Department
University of California, Berkeley
Howard Robinson
Computer Science Department
University of California, Berkeley
Carlos R. Mechoso
Department of Atmospheric Sciences
University of California, Los Angeles
Leroy A. Drummond
Lawrence Berkeley Laboratory
Joseph Spahr
Department of Atmospheric Sciences
University of California, Los Angeles
John D. Farrara
Department of Atmospheric Sciences
University of California, Los Angeles
Edmund Mesrobian
Computer Science Department
University of California, Los Angeles
1. Introduction
The Distributed Data Broker is a general-purpose tool for coupling multiple,
possibly heterogeneous, parallel models. It is a library used by
all participating elements, one of which serves as a distinguished process
during a startup phase preceding the main computation. This "registration
broker" process matches offers to produce quantities against requests
to consume them, forwards the list of intersections to each of the
producers, and informs each consumer of how many pieces to expect.
After the initial phase, the registration broker may participate as
a regular member of the computation.
Having each producer send directly to each consumer conserves bandwidth,
reduces memory requirements, and minimizes the delay that would otherwise
occur if a centralized element were to reassemble each of the fields
and retransmit them. The advantage is greatest in network topologies
where the system as a whole has more aggregate bandwidth than is
available to any individual processor, as in the IBM SP2 and CRAY T3E.
The Data Broker is implemented using the C++ language, but provides
a modest set of subroutines that simulation codes written in Fortran
can call. It operates by means of a relatively simple protocol with
a small number of message types, currently transported using PVM. A
secondary design goal was to keep the protocol simple enough to allow
other programs to interoperate with systems employing the Data Broker
by using an independent implementation of the protocol.
The Data Broker was created as part of an effort to develop
a comprehensive Earth System Model, which would incorporate General
Circulation and Chemistry models of the Atmosphere and Oceans, an active
archiver that stores products in tertiary storage and registers metadata
in an object-relational database, and online visualization tools.
2. Design goals
Due to the central position of the Data Broker in the Earth System Model
(ESM), with consequent potential for slowing down the operation of the
entire ESM, efficiency has been a paramount concern in the design and
implementation of the Data Broker.
The ability to have multiple consumers for a given product, each taking
delivery at a different frequency, was also a requirement from the outset:
for example, the atmospheric chemistry model would want three-dimensional
wind data at a rate at which archiving it in its entirety would be impractical.
Another capability planned from the outset was support for ephemeral consumers,
such as the visualization tool, that might want to inspect different
quantities at different times as the computation unfolds, according
to the wishes of an operator reacting to what is discovered during the
course of the computation.
We are motivated by the need of ESM scientists for a tool that requires
minimal modifications to existing codes, is easy to learn and easy to
remember, permits the transmission of arbitrary numbers of fields
among arbitrary numbers of models, and can be configured dynamically
at run time.
3. Protocol Description
There are three different roles performed by processes in the Data Broker.
First, there is the registration broker, which collects and redistributes
information about which quantities are being offered and requested. Second,
there is a distinguished class of process called the model head, whose members
have special responsibilities during registration. All other
processes are referred to as computational elements. The process acting
as the registration broker may itself also be a model head and
may also do actual computation.
Registration has two phases, after which the computation
proceeds. In the first phase, the registration broker collects from
each model head a MODELINFO_MSG, which specifies a coordinate system,
a list of quantities to be computed and offered using that coordinate
system, and the number of computational elements participating in that
model. Each model head also files a META_REQUEST_MSG on behalf of each
of its computational elements. When all of the model heads have been
heard from, the registration broker sends an INVENTORY_MSG in reply
to each META_REQUEST_MSG. The INVENTORY_MSG contains an initial
SYSTEMINFO_MSG detailing the experiment name, starting and ending times,
the protocol version, and the number of models, followed by the list of
MODELINFO_MSGs that the broker has received.
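On the registration broker, the first phase amounts to buffering messages until a count is reached. The C++ sketch below is only illustrative: the type and member names (ModelInfo, SystemInfo, Inventory, PhaseOne) are hypothetical rather than the Data Broker's own, PVM packing and sending are omitted, and we assume the broker knows in advance how many models to expect.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical contents of a MODELINFO_MSG.
struct ModelInfo {
    std::string modelName;
    std::vector<double> axes[3];          // rectilinear coordinates of the model grid
    std::vector<std::string> quantities;  // variables offered on that grid
    int numElements = 0;                  // computational elements in the model
};

// Hypothetical contents of a SYSTEMINFO_MSG.
struct SystemInfo {
    std::string experiment;
    double startTime = 0.0, endTime = 0.0;
    int protocolVersion = 1;
    int numModels = 0;                    // assumed known to the broker in advance
};

// An INVENTORY_MSG: the SYSTEMINFO_MSG followed by every MODELINFO_MSG.
struct Inventory {
    SystemInfo system;
    std::vector<ModelInfo> models;
};

// Broker-side state for phase one (hypothetical).
class PhaseOne {
public:
    explicit PhaseOne(SystemInfo sys) : sys_(std::move(sys)) {}

    void onModelInfo(ModelInfo m) { models_.push_back(std::move(m)); }
    void onMetaRequest(int tid) { pending_.push_back(tid); }  // reply is deferred

    // True once every model head has been heard from.
    bool ready() const {
        return static_cast<int>(models_.size()) == sys_.numModels;
    }

    // The inventory to send in reply to each deferred META_REQUEST_MSG.
    Inventory inventory() const {
        assert(ready());
        return Inventory{sys_, models_};
    }
    const std::vector<int>& pendingRequests() const { return pending_; }

private:
    SystemInfo sys_;
    std::vector<ModelInfo> models_;
    std::vector<int> pending_;
};
```

Deferring each META_REQUEST_MSG until the count is complete is what lets every process receive the same, complete inventory.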
Receipt of the SYSTEMINFO_MSG moves each process into the second phase
of registration. Each element that produces quantities it wishes to
make available sends the registration broker a VAR_SUPPLY_MSG, and each
element that wishes to make use of various quantities sends a VAR_REQUEST_MSG.
Of course, any process may simultaneously produce some quantities and
consume others. The VAR_SUPPLY_MSG enumerates, for each quantity offered,
the subfield that the process will produce and the frequency at which it
will be produced. The VAR_REQUEST_MSG lists which subfields of which
quantities the process wishes to consume, and at which frequency. Each
process then sends a BARRIER_REQUEST_MSG to the registration
broker, marking the end of its requests and allowing synchronization
at the end of the second phase.
Once the registration broker has received all offers and requests, it
calculates the intersections, forwards them to the relevant suppliers
via VAR_REFER_MSGs, and notifies each requester, via a VAR_REPLY_MSG,
of the number of tiles that will satisfy each request.
The registration broker then sends BARRIER_REPLY_MSGs to all
the participating elements, terminating the second phase of registration,
and the computation proceeds.
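The intersection step can be sketched as follows, assuming each offer or request is an inclusive box of integer indices plus a PVM task id. The paper says offers and requests share a structure called VarRequest, but the field layout here is our guess, and Box, Referral, intersect, and matchRequest are hypothetical names; the sketch also ignores frequencies and the actual message traffic.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Hypothetical layout shared by offers and requests: variable name, PVM task
// id, inclusive index bounds, and an interval in time steps. A 2-D variable
// uses lo[2] == hi[2] == 0.
struct VarRequest {
    std::string var;
    int tid;
    int lo[3], hi[3];
    int interval;
};

struct Box { int lo[3], hi[3]; };

// Intersect two inclusive index boxes; returns false if they are disjoint.
bool intersect(const VarRequest& a, const VarRequest& b, Box& out) {
    for (int d = 0; d < 3; ++d) {
        out.lo[d] = std::max(a.lo[d], b.lo[d]);
        out.hi[d] = std::min(a.hi[d], b.hi[d]);
        if (out.lo[d] > out.hi[d]) return false;
    }
    return true;
}

// One entry the broker would forward to a producer in a VAR_REFER_MSG.
struct Referral { int producerTid, consumerTid; Box tile; int interval; };

// Match one request against all offers. Each overlapping offer yields one
// referral; the number of referrals is the tile count the consumer would be
// told to expect in its VAR_REPLY_MSG.
std::vector<Referral> matchRequest(const std::vector<VarRequest>& offers,
                                   const VarRequest& req) {
    std::vector<Referral> refs;
    for (const VarRequest& off : offers) {
        Box b;
        if (off.var == req.var && intersect(off, req, b))
            refs.push_back(Referral{off.tid, req.tid, b, req.interval});
    }
    return refs;
}
```

For example, a request for index columns 40 through 60 of a field divided between two producers at column 50 yields two referrals, so the consumer would be told to expect two tiles.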
During the calculation, at the end of every time step, each element
producing a quantity examines its list of requests to identify which
of its consumers want the quantity, and transmits the appropriate subfield
to each of them using a DATA_MSG, which contains not only the data but also
a description of the variable, the subfield, and the time step.
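The per-step check a producer makes can be sketched as below, assuming integer step counts and that a consumer registered with interval n wants every n-th step. The Subscriber and DataHeader types and the dueMessages function are hypothetical, and packing the values and sending them with PVM are omitted.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical subscriber record a producer keeps after a VAR_REFER_MSG.
struct Subscriber {
    int consumerTid;
    int lo[3], hi[3];  // inclusive bounds of the subfield this consumer wants
    int interval;      // deliver every `interval` steps
};

// Descriptive part of a DATA_MSG: which variable, which subfield, which step.
struct DataHeader {
    std::string var;
    int lo[3], hi[3];
    long step;
};

// Return (consumer TID, header) pairs for every subscriber due at `step`.
std::vector<std::pair<int, DataHeader>> dueMessages(
        const std::string& var, const std::vector<Subscriber>& subs, long step) {
    std::vector<std::pair<int, DataHeader>> out;
    for (const Subscriber& s : subs) {
        assert(s.interval > 0);
        if (step % s.interval != 0) continue;  // this consumer skips this step
        DataHeader h{var, {}, {}, step};
        for (int d = 0; d < 3; ++d) { h.lo[d] = s.lo[d]; h.hi[d] = s.hi[d]; }
        out.emplace_back(s.consumerTid, h);
    }
    return out;
}
```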
4. Overview of the Fortran Interface
Appendix A will give precise definitions and argument lists for each
routine mentioned in this section. We'll gloss over some of the
detail here to provide context and sequencing information.
The user is responsible for spawning groups of PVM-aware processes.
Each group (model) will be computing some quantities (variables), represented
by two- or three-dimensional arrays, that it will make available to other
groups on a periodic basis. We'll call the state of a given variable
at a particular time step a field. It is assumed that all the variables
computed by a given model are interpreted according to a common set
of rectilinear coordinates (i.e. three one-dimensional arrays of monotonically
increasing double precision numbers, which need not be uniformly spaced).
Furthermore, it is assumed that the units
for the coordinates and time steps transmitted through this interface
are the same for all models; e.g. all radians or all degrees, and time measured
in hours, seconds, or milliseconds from a common starting point.
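The grid assumption can be checked mechanically. This sketch, with a hypothetical Grid type and isValidRectilinear function, accepts any strictly increasing axes regardless of spacing; reading "monotonically increasing" as strict is our assumption, and no such check can confirm that all models share the same units.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// A rectilinear grid: three one-dimensional coordinate axes (hypothetical type).
struct Grid {
    std::vector<double> axis[3];
};

// True if every axis is non-empty and strictly increasing; spacing may vary.
// A 2-D grid can be represented with a single-point third axis.
bool isValidRectilinear(const Grid& g) {
    for (const std::vector<double>& a : g.axis) {
        if (a.empty()) return false;
        for (std::size_t i = 1; i < a.size(); ++i)
            if (!(a[i] > a[i - 1])) return false;  // also rejects NaN
    }
    return true;
}
```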
During a registration period, the processes exchange information about
what quantities will be made available, and which processes are willing
to offer which subsets of which variables at what frequency. One
process needs to be selected to synchronize the registration process
as a whole, which we'll call the registration broker. This process
calls the routine MCLIamTheBroker, providing a character string to identify
the system as a whole, a starting time and an ending time.
Each model needs to have a designated member (the model head) that gives
the registration broker some information concerning that group of
processes as a whole. It is permitted, but not required,
for the registration broker to be one of the model heads. Within
the interface, each model head calls the subroutine MCLStartMetaRegistration,
supplying a character string to identify the model, the coordinates
for the model, and a list of the TIDs of each participating element
in that model.
In all of our implementations so far, the registration broker has spawned
the rest of the models (or all have been executed as part of the same
binary, in the case of the T3E), so that the PVM call to determine the
parent TID has sufficed to provide the TID of the registration broker.
Nothing in the protocol requires this, and we have made
some effort to accommodate ephemeral consumers; for them, a very
simple call would need to be added to inform the DDB library of the TID
of the registration broker. However, it is beyond the scope of this interface
to determine how such late joiners would discover the TID of an unassociated
process.
The model head then calls the routine MCLMetaRegister for each variable,
supplying a character string for the name of the variable, the number
of dimensions, the frequency at which it will be offered, the Fortran
type of the data, and some other metadata retained for reasons of historical
inertia. Each model head then calls MCLEndMetaRegistration.
Any process that will be supplying or consuming data calls MCLStartRegistration.
If the process is not a model head, it calls the subroutine directly,
which blocks the process until the registration broker identifies
itself and supplies the inventory of available variables. The process
then calls MCLRegisterProduce for each variable it is willing to supply,
giving it the name of the variable and bounds describing the range,
in each direction, of the subset of the variable the process will produce.
Some of these parameters duplicate those given to MCLMetaRegister, and
must agree with them. Deadlock will ensue unless each element makes
available (via MCLPut; see below) its subset of the variable at the
frequency supplied to MCLMetaRegister. MCLRegisterProduce returns a
small integer, a kind of "coat check", which will be supplied
as an argument to MCLPut. The process calls MCLRegisterConsume
for each variable it wishes to import, obtaining a small integer to
be used as an argument to MCLGet; see below. Here, the interval
at which the process wishes to consume the variable may be a multiple
of the interval at which it is produced.
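A "coat check" can be as simple as an index into a per-process table, which lets the put and get routines locate a registration without searching by name. The sketch below is hypothetical throughout (CoatCheck, Registration, and validConsumeInterval are our names, and zero-based tickets are our choice); it also encodes the rule that a consumption interval must be a multiple of the production interval.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

// What one registration records (hypothetical, heavily simplified).
struct Registration {
    std::string var;
    int interval;  // time steps between fields
};

// Hypothetical per-process "coat check" table: each registration returns its
// index, so later put/get calls find it by indexing instead of name search.
class CoatCheck {
public:
    int add(const std::string& var, int interval) {
        table_.push_back(Registration{var, interval});
        return static_cast<int>(table_.size()) - 1;
    }
    // Throws std::out_of_range for a ticket that was never issued.
    const Registration& at(int ticket) const {
        if (ticket < 0) throw std::out_of_range("bad ticket");
        return table_.at(static_cast<std::size_t>(ticket));
    }

private:
    std::vector<Registration> table_;
};

// A consumer may take a field less often than it is produced, but only at an
// integer multiple of the production interval.
bool validConsumeInterval(int produceInterval, int consumeInterval) {
    return produceInterval > 0 && consumeInterval > 0 &&
           consumeInterval % produceInterval == 0;
}
```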
If a process wishes to employ a variable in a coordinate system other
than that in which it is produced, it uses two routines, LILRegisterCoordinates
and LILRequestInterpolatedData. LILRegisterCoordinates takes a
name and three axes, and returns a small integer. LILRequestInterpolatedData
takes that result, integer bounds describing the desired subset,
and a vector indicating the directions in which wrapping might occur
(e.g. one coordinate system may be offset from the other by 180 degrees,
or pi radians, so that coordinate comparisons along that axis
must be done modulo some value; one could argue that this parameter
belongs to LILRegisterCoordinates). LILRequestInterpolatedData
returns a small integer which is used as an argument to MCLGet, i.e.
the same routine used in the non-interpolated case.
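Along a wrapping axis such as longitude, comparisons become meaningful once coordinates are mapped into a common window one period wide. A minimal sketch follows; wrapInto is a hypothetical helper, not part of the LIL interface, and the caller is assumed to supply the period (for example 360 when coordinates are in degrees).

```cpp
#include <cassert>
#include <cmath>

// Map x into the half-open window [origin, origin + period). Applying this to
// source coordinates before comparing them with a target axis that starts at
// `origin` makes the comparison modulo the period.
double wrapInto(double x, double origin, double period) {
    assert(period > 0.0);
    double r = std::fmod(x - origin, period);  // lies in (-period, period)
    if (r < 0.0) r += period;
    if (r >= period) r = 0.0;  // guard against rounding up to exactly period
    return origin + r;
}
```

For instance, a source longitude of 190 compared against a target axis starting at -180 is treated as -170.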
The second phase of registration is concluded by calling MCLEndRegistration,
which blocks the process while it awaits a list of consumers for its products.
The intent is that the process can safely commence computation with
no additional external synchronization once MCLEndRegistration returns.
During the actual computation the process calls MCLPut to make quantities
available for transmission to consumers.
The process supplies the "coat check" returned from MCLRegisterProduce,
above, as well as a pointer to a buffer containing the data and a
double precision value representing the time step. Making the
quantity available does not necessarily result in any transmission; that
depends on which processes have registered an interest in that particular time step.
Similarly, processes call MCLGet to import fields.
5. Performance Results

Figure 1. Memory requirements for centralized and distributed coupling.

Figure 2. Memory requirements with a doubled-resolution OGCM and an
increased number of nodes.
Figures 1 and 2 compare the memory requirements of the two coupling
implementations. In Figure 1, the centralized data
brokerage requires almost twice as much memory as the distributed data
brokerage, because it must collect the entire grid from one model
on a single node. In the distributed case, each processor has enough
information to produce the data needed by consumer processes, and communication
proceeds in a distributed manner. Figure 2 presents a more drastic scenario,
in which centralized coupling cannot be realized at all because it
would require 45 Mw of memory on a single computational node. In this
case the distributed approach requires less than a third of the memory
that the centralized approach would need.

Figure 3. Simplified timing model of centralized vs. distributed coupling.
Figure 3 compares the execution time of the two coupling approaches.
When the AGCM sends 4 fields to the OGCM, the distributed approach takes
one third of the time required by the centralized approach. In the
reverse direction, where the OGCM sends a single field to the AGCM,
the time required is also greatly reduced by the distributed approach.
6. Implementation Description
We now describe the conceptual framework around which the code for the Data
Broker is organized. The entire distributed computation, made up of
individual computational processes deemed Clients, is thought of as a
System. The System comprises several subcollections, called Models,
which compute scientific quantities called Variables.
All the Variables associated with a given Model are interpreted according
to a common coordinate system, or Grid, made up of three axes or Dimensions.
The Data Broker currently supports both two- and three-dimensional Variables.
The array of specific values for a Variable at a given time step is
a Field.
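To make the vocabulary concrete, here is an illustrative C++ skeleton of how these terms nest. These are not the Data Broker's actual class definitions, which the paper does not give; the members and the fieldSize helper are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Illustrative only; not the Data Broker's real classes.
struct Dimension { std::vector<double> coords; };  // one monotone axis
struct Grid { Dimension dim[3]; };                  // a Model's coordinate system
struct Variable { std::string name; int ndims; };  // 2 or 3 dimensions
struct Field {                                      // a Variable at one time step
    std::string varName;
    double time;
    std::vector<double> values;
};
struct Model {
    std::string name;
    Grid grid;                    // shared by all of the Model's Variables
    std::vector<Variable> vars;
    std::vector<int> clientTids;  // Clients (processes) belonging to the Model
};
struct System {
    std::string experiment;
    std::vector<Model> models;
};

// Number of values in one Field of an ndims-dimensional Variable on grid g.
std::size_t fieldSize(const Grid& g, int ndims) {
    assert(ndims == 2 || ndims == 3);
    std::size_t n = 1;
    for (int d = 0; d < ndims; ++d) n *= g.dim[d].coords.size();
    return n;
}
```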
Models and Variables are identified by human-readable, but potentially
lengthy, ASCII strings. Since a given Client can belong to more than
one Model, a real-life computation requiring that a given scientific
quantity be made available in more than one coordinate system could
do so by having two separately named but evidently related Variables
in two separately named Models, whose names might include the grid,
e.g. "Ocean2x6" and "Ocean1x3".
Alternatively, the Data Broker provides a linear interpolation facility
so that Clients can request Variables and then convert them internally,
using methods associated with an object called a Mesh, which comprises
a specification of the target subset (Tile) of the user's Grid and a
reference to the Grid in which the Variable is produced.
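One natural design for such an object is to compute the interpolation weights once per target coordinate, so that converting each arriving Field costs only a weighted sum. The one-axis sketch below assumes linear interpolation within the bracketing source interval and clamping outside the source range; AxisWeights, makeWeights, and interpolateAxis are hypothetical names, and a 2-D or 3-D Mesh would combine one such set of weights per axis.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Precomputed weights for one axis of a hypothetical Mesh.
struct AxisWeights {
    std::vector<std::size_t> left;  // source index i of the bracketing interval
    std::vector<double> w;          // weight of src[i + 1]; src[i] gets 1 - w
};

// src: strictly increasing source axis with at least two points.
// dst: target coordinates. Targets outside the source range are clamped to
// the nearest end value (an assumption of this sketch).
AxisWeights makeWeights(const std::vector<double>& src,
                        const std::vector<double>& dst) {
    assert(src.size() >= 2);
    AxisWeights a;
    for (double x : dst) {
        auto it = std::upper_bound(src.begin(), src.end(), x);
        std::size_t i = (it == src.begin())
                            ? 0
                            : static_cast<std::size_t>(it - src.begin()) - 1;
        i = std::min<std::size_t>(i, src.size() - 2);  // keep i + 1 in range
        double t = (x - src[i]) / (src[i + 1] - src[i]);
        a.left.push_back(i);
        a.w.push_back(std::max(0.0, std::min(1.0, t)));
    }
    return a;
}

// Apply the weights to one Field sampled on the source axis.
std::vector<double> interpolateAxis(const AxisWeights& a,
                                    const std::vector<double>& f) {
    std::vector<double> out;
    out.reserve(a.left.size());
    for (std::size_t k = 0; k < a.left.size(); ++k) {
        std::size_t i = a.left[k];
        out.push_back((1.0 - a.w[k]) * f[i] + a.w[k] * f[i + 1]);
    }
    return out;
}
```

The binary search (std::upper_bound) handles the non-uniform spacing allowed by the grid assumptions, and its cost is paid only when the weights are built.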
Offers by a Client to produce subsets of Variables according to a given
Tile have the same structure as requests to consume Variables, and both
are called VarRequests in the code. We expect that most computational
elements will be producing some Variables while consuming others. For
housekeeping purposes, the registration broker will need to retain the
entire list of offers and requests, and individual processes will need
to maintain subscriber lists for the quantities that they produce. This
data structure is called the ClientServer.
The Data Broker library is largely event driven, being activated either
by the explicit user requests described in the Fortran interface above,
by the receipt of messages, or when certain key counters reach their
expected values. One such transition occurs when the registration broker has
received all of the MODELINFO_MSGs and proceeds to issue SYSTEM_INVENTORY_MSGs
to each participating registrant. A similar transition occurs in the registration
broker when receipt of the final BARRIER_REQUEST_MSG closes the second phase
of registration. A counter-induced event also occurs in every consuming
element, as the Data Broker "knows" that each of its requested
fields is complete when it has counted the expected number of contributing tiles.
(The number of tiles is conveyed in the VAR_REPLY_MSG.)
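The consumer-side counter can be sketched as an assembly buffer that knows how many tiles to expect. FieldAssembly and its members are hypothetical; the sketch handles a single 2-D field, assumes tile values arrive with the first index varying fastest (as in Fortran), and copies each tile straight into its final position.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical consumer-side buffer for one requested 2-D field.
class FieldAssembly {
public:
    FieldAssembly(int nx, int ny, int expectedTiles)
        : nx_(nx), data_(static_cast<std::size_t>(nx) * ny, 0.0),
          expected_(expectedTiles) {}

    // Copy one tile covering inclusive index bounds [i0,i1] x [j0,j1] into
    // place, first index fastest. Returns true when this tile completes the
    // field, i.e. when the tile count from the VAR_REPLY_MSG is reached.
    bool deposit(int i0, int i1, int j0, int j1, const std::vector<double>& tile) {
        assert(received_ < expected_);
        std::size_t k = 0;
        for (int j = j0; j <= j1; ++j)
            for (int i = i0; i <= i1; ++i)
                data_[static_cast<std::size_t>(j) * nx_ + i] = tile[k++];
        return ++received_ == expected_;
    }

    const std::vector<double>& data() const { return data_; }

private:
    int nx_;
    std::vector<double> data_;
    int expected_;
    int received_ = 0;
};
```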
We have not yet heavily optimized the Data Broker, relying on simple
methods, but choosing them carefully. Incoming data messages are
identified by full string comparisons of the variable name, searching
linearly through linked lists.
We have, however, attempted to minimize the number of copies that the
data itself must undergo, because in modern tightly coupled systems a
memory-to-memory copy within a processor can take time comparable to
transmission of the data between processors. Excess copying would erode
the significant benefits achieved by having producers send directly to
consumers (in a distributed way), rather than requiring double transmission
times and synchronization among all processes as in the centralized scheme.
We have future plans (described below) for avoiding even the name searches
by using a simple indexing scheme.
7. Future Work
A high priority for us is to allow the use of MPI in the Data Broker
system, because on some of the large multiprocessor systems, vendor-provided
implementations of PVM do not perform as well as MPI and seem more prone
to bugs.
As noted above, performance and user-friendliness have been our foremost
concerns, and we believe that there are further performance gains to be made.
We are less concerned with the performance of the registration phases,
but are considering some streamlining of the data transfer phase.
Protocols such as PVM and MPI allow one to obtain a tag associated with
a message, which can be examined and manipulated independently
of the rest of the associated data. We propose using a range of tags
to identify the individual tiles associated with various requested quantities,
thus bypassing the current linear lookups based on quantity name and
reserving the entire data portion of the message for the numerical
quantities. This might even allow the data to be deposited directly
in the appropriate place without calling special unpacking routines.
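Under this proposal, identifying an incoming tile becomes arithmetic on the tag rather than a string search. The layout below is hypothetical: the base value 1000, the limit of 256 tiles per request, and the assumption that a producer learns the consumer's request number (for instance in the VAR_REFER_MSG) are all illustrative choices. Any real layout must also fit the transport's tag range; MPI, for example, guarantees that tags up to at least 32767 (the MPI_TAG_UB attribute) are valid.

```cpp
#include <cassert>

constexpr int kTagBase = 1000;            // assumed above all control-message tags
constexpr int kMaxTilesPerRequest = 256;  // assumed upper bound on tiles per request

// Tag for tile `tile` of the consumer-side request numbered `request`.
constexpr int encodeTag(int request, int tile) {
    return kTagBase + request * kMaxTilesPerRequest + tile;
}

struct TagInfo { int request; int tile; };

// Recover the request and tile from a received message's tag, with no
// comparison of variable names.
constexpr TagInfo decodeTag(int tag) {
    return TagInfo{(tag - kTagBase) / kMaxTilesPerRequest,
                   (tag - kTagBase) % kMaxTilesPerRequest};
}
```

Because the decoded request and tile identify a preassigned destination, a receiver could in principle place the payload directly, which is the direct-deposition idea above.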
During the development of the Data Broker, when the primary interface
was the direct inclusion of PVM calls into the simulation codes, one source
of run-time errors was inconsistency between the Data Broker and the
participating codes, especially when older versions of the Broker were
inadvertently run.
This suggested that a formal description of the protocol could be written
down, and that the calls for marshalling and unmarshalling data into
messages could be generated mechanically, as has been done for Remote
Procedure Call mechanisms and for processing the presentation layer
in the OSI protocols.
Parenthetically, it seems that message passing mechanisms and RPC are
essentially isomorphic, since given an RPC, one can get a (possibly
inefficient) message passing mechanism merely by ignoring the returned
values from RPC calls, and conversely, with some effort, one could build
an RPC on top of pairs of passed messages - one for the request and
one for the return.
Going through the exercise of writing down the specification for our
protocol (included here as Appendix B) has been useful as a tool for
discussion and guidance, although we have never actually rewritten the
Data Broker to make use of it.
We estimate that 30% of the code of the existing Data Broker could be
generated mechanically from formal specifications, and doing so
would lessen the burden of future maintenance. However, the
anticipated amount of work in debugging such a substantial rewrite of
the system, for no apparent gain in meeting performance milestones, has
persuaded us to postpone this otherwise desirable effort.
Acknowledgements
This project has been supported by the NASA High Performance Computing
and Communication for Earth and Space Sciences (HPCC-ESS) project under
CAN 21425/041. The tests were performed at the Department of Energy's
National Energy Research Scientific Computing Center (NERSC).