The Distributed Data Broker: A decentralized mechanism for periodic exchange of fields between multiple ensembles of parallel computations


Keith Sklower
Computer Science Department
University of California, Berkeley

Howard Robinson
Computer Science Department
University of California, Berkeley

Carlos R. Mechoso
Department of Atmospheric Sciences
University of California, Los Angeles

Leroy A. Drummond
Lawrence Berkeley Laboratory

Joseph Spahr
Department of Atmospheric Sciences
University of California, Los Angeles

John D. Farrara
Department of Atmospheric Sciences
University of California, Los Angeles

Edmund Mesrobian
Computer Science Department
University of California, Los Angeles

 

1. Introduction


The Distributed Data Broker is a general-purpose tool for coupling multiple, possibly heterogeneous, parallel models. It is a library used by all participating elements, one of which serves as a distinguished process during a startup phase preceding the main computation. This "registration broker" process correlates offers to produce quantities with requests to consume them, forwards the list of intersections to each of the producers, and informs each consumer of how many pieces to expect. After the initial phase, the registration broker may participate as a regular member of the computation.


Having each producer send directly to each consumer conserves bandwidth, reduces memory requirements, and minimizes the delay that would otherwise occur if a centralized element had to reassemble each field and retransmit it. The advantage is greatest in network topologies where the aggregate bandwidth of the system exceeds the bandwidth available to any individual processor, as in the IBM SP2 and CRAY T3E.

The Data Broker is implemented using the C++ language, but provides a modest set of subroutines that simulation codes written in Fortran can call. It operates by means of a relatively simple protocol with a small number of message types, currently transported using PVM. A secondary design goal was to keep the protocol simple enough to allow other programs to interoperate with systems employing the Data Broker by using an independent implementation of the protocol.


The Data Broker was created as part of an effort to develop a comprehensive Earth System Model, which would incorporate General Circulation and Chemistry models of the Atmosphere and Oceans, an active archiver that stores products in tertiary storage and registers metadata in an object-relational database, and online visualization tools.


2. Design goals


Because the Data Broker occupies a central position in the Earth System Model (ESM), and could therefore slow down the operation of the entire ESM, efficiency has been a paramount concern in its design and implementation.


The ability to have multiple consumers for a given product, each taking delivery at a different frequency, was also a requirement from the outset: for example, the atmospheric chemistry model would want three-dimensional wind data at a rate at which it would be impractical to archive that data in its entirety.
Another capability planned from the outset was support for ephemeral consumers, such as the visualization tool, which might inspect different quantities at different times as the computation unfolds, according to the wishes of the operator and based on what is discovered during the course of the computation.


We are motivated by the ESM scientists' need for a tool that requires minimal modifications to existing codes, is easy to learn and easy to remember, permits the transmission of arbitrary numbers of fields among arbitrary numbers of models, and can be dynamically configured at run time.


3. Protocol Description


There are three different roles performed by processes in the Data Broker. First, there is the registration broker, which collects and retransmits information about which quantities are being offered and requested. Second, there is a distinguished class of process called the model head, whose members have special responsibilities during the registration process. All other processes are referred to as computational elements. The process acting as the registration broker may itself also be a model head and may also do actual computations.


There are two phases of registration, after which the computation proceeds. In the first phase, the registration broker collects from each model head a MODELINFO_MSG, which specifies a coordinate system, a list of quantities to be computed and offered using that coordinate system, and the number of computational elements participating in that model. Each model head also files a META_REQUEST_MSG on behalf of each of its computational elements. When all of the model heads have been heard from, the registration broker sends an INVENTORY_MSG in reply to each META_REQUEST_MSG. The INVENTORY_MSG contains an initial SYSTEMINFO_MSG detailing the experiment name, the starting and ending times, the protocol version, and the number of models, followed by the list of MODELINFO_MSGs the broker has received.
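The first-phase bookkeeping can be pictured as a counter in the registration broker. The C++ sketch below is only an illustration, not the Data Broker's code: the struct and field names (ModelInfo, SystemInfo, Inventory, PhaseOneBroker) are hypothetical, messages are modelled as in-memory values rather than PVM buffers, and it assumes the broker knows the number of models in advance.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical in-memory forms of the phase-one messages; field names are illustrative.
struct ModelInfo {                       // MODELINFO_MSG
    std::string model;
    std::vector<std::string> variables;  // quantities offered on this model's grid
    int numElements;                     // computational elements in the model
};

struct SystemInfo {                      // SYSTEMINFO_MSG
    std::string experiment;
    double startTime;
    double endTime;
    int protocolVersion;
    int numModels;                       // assumed known to the broker in advance
};

struct Inventory {                       // INVENTORY_MSG: SYSTEMINFO plus every MODELINFO
    SystemInfo system;
    std::vector<ModelInfo> models;
};

// Phase-one state held by the registration broker (hypothetical).
class PhaseOneBroker {
public:
    explicit PhaseOneBroker(const SystemInfo& sys) : sys_(sys) {}

    void onModelInfo(const ModelInfo& m) {
        assert(static_cast<int>(models_.size()) < sys_.numModels);
        models_.push_back(m);
    }
    // META_REQUEST_MSGs are queued until every model head has reported.
    void onMetaRequest(int tid) { pending_.push_back(tid); }

    bool complete() const { return static_cast<int>(models_.size()) == sys_.numModels; }

    // One (tid, INVENTORY_MSG) pair per queued request; empty until complete().
    std::vector<std::pair<int, Inventory>> replies() const {
        std::vector<std::pair<int, Inventory>> out;
        if (!complete()) return out;
        for (int tid : pending_) out.push_back(std::make_pair(tid, Inventory{sys_, models_}));
        return out;
    }

private:
    SystemInfo sys_;
    std::vector<ModelInfo> models_;
    std::vector<int> pending_;
};
```

Queuing the requests rather than answering them immediately reflects the rule that no INVENTORY_MSG can be sent until every model head has been heard from.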


Receipt of the SYSTEMINFO_MSG moves each process into the second phase of registration. Each element that produces quantities it wishes to make available sends the registration broker a VAR_SUPPLY_MSG, and each element that wishes to make use of various quantities sends a VAR_REQUEST_MSG. Of course, any process may simultaneously produce some quantities and consume others. The VAR_SUPPLY_MSG contains a list enumerating, for each offered quantity, the subfield that the process will produce and the frequency at which it will be produced. The VAR_REQUEST_MSG lists which subfields of which quantities the process wishes to consume, and at what frequency. Each process then sends a BARRIER_REQUEST_MSG to the registration broker, marking the end of its requests and allowing synchronization at the end of the second phase.


Once the registration broker has received all offers and requests, it calculates the intersections, forwards them to the relevant suppliers via VAR_REFER_MSGs, and notifies each requestor, via a VAR_REPLY_MSG, of how many tiles will satisfy each of its requests. The registration broker then sends a BARRIER_REPLY_MSG to every participating element, terminating the second phase of registration, and the computation proceeds.
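The intersection step can be sketched as box overlap on integer index bounds. In this hypothetical C++ illustration (Tile, Offer, Request, Referral and match are our names, not the library's), each offer and request carries inclusive bounds in three dimensions, and a two-dimensional Variable is assumed to use a degenerate third range. The referrals found for one request correspond to what the VAR_REFER_MSGs would carry, and their count to the tile count in the VAR_REPLY_MSG. Frequency matching is omitted.

```cpp
#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Inclusive integer index bounds [lo[d], hi[d]] in each of three dimensions.
struct Tile {
    int lo[3];
    int hi[3];
};

struct Offer {      // one entry of a VAR_SUPPLY_MSG (hypothetical layout)
    int producerTid;
    std::string variable;
    Tile tile;
};

struct Request {    // one entry of a VAR_REQUEST_MSG (hypothetical layout)
    int consumerTid;
    std::string variable;
    Tile tile;
};

struct Referral {   // what a VAR_REFER_MSG would tell a producer
    int producerTid;
    int consumerTid;
    Tile piece;
};

// Overlap of two non-empty tiles; returns false if they do not intersect.
bool intersect(const Tile& a, const Tile& b, Tile& out) {
    for (int d = 0; d < 3; ++d) {
        assert(a.lo[d] <= a.hi[d] && b.lo[d] <= b.hi[d]);
        out.lo[d] = std::max(a.lo[d], b.lo[d]);
        out.hi[d] = std::min(a.hi[d], b.hi[d]);
        if (out.lo[d] > out.hi[d]) return false;
    }
    return true;
}

// All producer pieces that together satisfy one request.
std::vector<Referral> match(const Request& req, const std::vector<Offer>& offers) {
    std::vector<Referral> refs;
    for (const Offer& off : offers) {
        Tile piece;
        if (off.variable == req.variable && intersect(off.tile, req.tile, piece))
            refs.push_back({off.producerTid, req.consumerTid, piece});
    }
    return refs;
}
```

A producer whose offer does not overlap a given request simply receives no referral for it, so it never sends data that nobody asked for.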

During the calculation, at the end of every time step, each element producing a quantity examines its list of requests to identify which of its consumers want the quantity at that time step, and transmits to each of them the relevant subfield using a DATA_MSG, which contains not only the data but also a description of the variable, the subfield, and the time step.
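The per-time-step decision a producer makes can be sketched as follows. This is a hypothetical C++ illustration, not the library's code: Subscriber and DataMsgHeader are invented names, the subscriber list is assumed to have been filled from VAR_REFER_MSGs, and time is assumed to be measured from the common starting point, so a consumer with interval k is due whenever the time step is a multiple of k. Only the descriptive header of each DATA_MSG is built; packing the data itself is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <string>
#include <utility>
#include <vector>

// Hypothetical subscriber entry, assumed to be built from VAR_REFER_MSGs during registration.
struct Subscriber {
    int consumerTid;
    double interval;   // consumer's interval, a multiple of the production interval
    int lo[3];         // inclusive bounds of the subfield owed to this consumer
    int hi[3];
};

// Descriptive part of a DATA_MSG: which variable, which subfield, which time step.
struct DataMsgHeader {
    std::string variable;
    int lo[3];
    int hi[3];
    double timestep;
};

// True when t is (within a small tolerance) a multiple of interval.
bool due(double t, double interval) {
    assert(interval > 0.0);
    double k = std::round(t / interval);
    return std::fabs(t - k * interval) <= 1e-9 * std::max(1.0, std::fabs(t));
}

// (consumer TID, header) for every DATA_MSG to send at time t; possibly none.
std::vector<std::pair<int, DataMsgHeader>> outgoing(const std::string& var, double t,
                                                     const std::vector<Subscriber>& subs) {
    std::vector<std::pair<int, DataMsgHeader>> out;
    for (const Subscriber& s : subs) {
        if (!due(t, s.interval)) continue;
        DataMsgHeader h{var, {s.lo[0], s.lo[1], s.lo[2]}, {s.hi[0], s.hi[1], s.hi[2]}, t};
        out.push_back(std::make_pair(s.consumerTid, h));
    }
    return out;
}
```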


4. Overview of the Fortran Interface


Appendix A will give precise definitions and argument lists for each routine mentioned in this section. We'll gloss over some of the detail here to provide context and sequencing information.

The user provides for the spawning of groups of PVM-aware processes. Each group (model) computes some quantities (variables), represented by two- or three-dimensional arrays, that it makes available to other groups on a periodic basis. We'll call the state of a given variable at a particular time step a field. It is assumed that all the variables computed by a given model are interpreted according to a common set of rectilinear coordinates (i.e. three one-dimensional arrays of monotonically increasing double precision numbers, which need not be uniformly spaced). Furthermore, it is assumed that the units for the coordinates and time steps transmitted by this interface are the same for all models; e.g. all radians or all degrees, and time measured in hours, seconds, or milliseconds from a common starting point.
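The coordinate assumptions above amount to a simple validity check on each axis. A minimal C++ sketch follows, assuming (our reading) that "monotonically increasing" means strictly increasing; Axis, Grid and isValidGrid are hypothetical names, and units are not checked because they cannot be inferred from the numbers alone.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// One axis of a model's rectilinear Grid: double precision values, not necessarily uniform.
using Axis = std::vector<double>;

// Strictly increasing check; uneven spacing is allowed.
bool isMonotonicIncreasing(const Axis& a) {
    for (std::size_t i = 1; i < a.size(); ++i)
        if (!(a[i] > a[i - 1])) return false;
    return true;
}

// Hypothetical Grid: three axes shared by every Variable of one Model.
struct Grid {
    Axis x, y, z;   // a two-dimensional model can use a single-point z axis
};

bool isValidGrid(const Grid& g) {
    return !g.x.empty() && !g.y.empty() && !g.z.empty() &&
           isMonotonicIncreasing(g.x) && isMonotonicIncreasing(g.y) &&
           isMonotonicIncreasing(g.z);
}
```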


During a registration period, the processes exchange information about what quantities will be made available, and which processes are willing to offer which subsets of which variables at what frequency. One process, which we call the registration broker, must be selected to synchronize the registration process as a whole. This process calls the routine MCLIamTheBroker, providing a character string to identify the system as a whole, a starting time, and an ending time.


Each model needs a designated member (the model head) that gives the registration broker some information concerning that group of processes as a whole. It is permitted, but not required, for the registration broker to be one of the model heads. Within the interface, each model head calls the subroutine MCLStartMetaRegistration, supplying a character string to identify the model, the coordinates for the model, and a list of the TIDs of each participating element in that model.


In all of our implementations so far, the registration broker has spawned the rest of the models (or, on the T3E, all have been executed as part of the same binary), so the PVM call that determines the parent TID has sufficed to provide the TID of the registration broker. Nothing in the protocol requires this, and we have made some effort to accommodate ephemeral consumers, so only a very simple call would need to be added to tell the DDB library the TID of the registration broker. However, it is beyond the scope of this interface to determine how such late joiners would discover the TID of an unassociated process.


The model head then calls the routine MCLMetaRegister for each variable, supplying a character string for the name of the variable, the number of dimensions, the frequency at which it will be offered, the Fortran type of the data, and some other metadata retained for reasons of historical inertia. Each model head then calls MCLEndMetaRegistration.


Any process that will be supplying or consuming data calls MCLStartRegistration. If the process is not a model head, it calls this subroutine directly; the call blocks until the registration broker has identified itself and supplied the inventory of available variables. The process then calls MCLRegisterProduce for each variable it is willing to supply, giving the name of the variable and bounds describing the range, in each direction, of the subset of the variable the process will produce. Some of these parameters duplicate those provided to MCLMetaRegister and must be the same. Deadlock will ensue unless each element makes available (via MCLPut; see below) its subset of the variable at the frequency supplied in MCLMetaRegister. MCLRegisterProduce returns a small integer, a kind of "coat check", which is later supplied as an argument to MCLPut. The process calls MCLRegisterConsume for each variable it wishes to import, obtaining a small integer to be used as an argument to MCLGet (see below). Here, the interval at which the process wishes to consume the variable may be a multiple of the interval at which it is produced.
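The "coat check" idea can be sketched as a table whose index is the handle. In this hypothetical C++ sketch, CoatCheck, Registration and the method names are ours and merely mirror the roles of MCLRegisterProduce and MCLRegisterConsume; the real Fortran argument lists are those given in Appendix A. We also assume that a consumption interval that is not a positive integer multiple of the production interval should be rejected at registration time.

```cpp
#include <cassert>
#include <cmath>
#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

struct Registration {
    std::string variable;
    double interval;
};

// Hypothetical handle table: the returned small integer is just an index.
class CoatCheck {
public:
    // Plays the role of MCLRegisterProduce: returns a small integer "coat check".
    int registerProduce(const std::string& var, double interval) {
        assert(interval > 0.0);
        produced_.push_back({var, interval});
        return static_cast<int>(produced_.size()) - 1;
    }
    // Plays the role of MCLRegisterConsume; producerInterval comes from the inventory.
    int registerConsume(const std::string& var, double interval, double producerInterval) {
        assert(producerInterval > 0.0);
        double ratio = interval / producerInterval;
        if (ratio < 1.0 - 1e-9 || std::fabs(ratio - std::round(ratio)) > 1e-9)
            throw std::invalid_argument("consumption interval must be a multiple of the production interval");
        consumed_.push_back({var, interval});
        return static_cast<int>(consumed_.size()) - 1;
    }
    // Handle lookup is a bounds-checked array access, no name search needed.
    const Registration& produced(int h) const { return produced_.at(static_cast<std::size_t>(h)); }
    const Registration& consumed(int h) const { return consumed_.at(static_cast<std::size_t>(h)); }

private:
    std::vector<Registration> produced_;
    std::vector<Registration> consumed_;
};
```

Returning an index rather than echoing the variable name is what makes the later MCLPut and MCLGet calls cheap to resolve.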


If a process wishes to employ a variable in a coordinate system other than the one in which it is produced, it uses two routines, LILRegisterCoordinates and LILRequestInterpolatedData. LILRegisterCoordinates takes a name and three axes, and returns a small integer. LILRequestInterpolatedData takes that result, integer bounds describing the desired subset, and a vector describing in which directions wrapping might occur. (For example, one coordinate system may be offset from the other by 90 degrees, or pi/2 radians, so coordinate comparisons along that axis must be done modulo some value. One could argue that this parameter belongs to LILRegisterCoordinates.) LILRequestInterpolatedData returns a small integer which is used as an argument to MCLGet, i.e. the same routine used in the non-interpolated case.
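To illustrate what wrapping means for interpolation, here is a one-dimensional linear interpolation on a non-uniformly spaced axis with optional periodicity. This is a hypothetical C++ sketch, not the Data Broker's interpolation code: interpolate1d is our name, a three-dimensional interpolation would apply the same idea along each axis, and clamping outside a non-periodic axis is our assumption.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstddef>
#include <vector>

// Linearly interpolate f, defined on the strictly increasing axis xs, at coordinate x.
// If period > 0 the axis wraps: x is reduced modulo period, and the gap between
// xs.back() and xs.front() + period is bridged by one extra segment.
double interpolate1d(const std::vector<double>& xs, const std::vector<double>& f,
                     double x, double period) {
    assert(xs.size() == f.size() && xs.size() >= 2);
    if (period > 0.0) {
        assert(period > xs.back() - xs.front());
        // Reduce x into [xs.front(), xs.front() + period).
        x = xs.front() + std::fmod(x - xs.front(), period);
        if (x < xs.front()) x += period;
        // Wrap-around segment from the last point to the first point shifted by one period.
        if (x > xs.back()) {
            double w = (x - xs.back()) / (xs.front() + period - xs.back());
            return (1.0 - w) * f.back() + w * f.front();
        }
    } else {
        // Non-periodic axis: clamp rather than extrapolate.
        if (x <= xs.front()) return f.front();
        if (x >= xs.back()) return f.back();
    }
    // Segment i with xs[i] <= x <= xs[i + 1]; spacing need not be uniform.
    std::size_t idx = static_cast<std::size_t>(
        std::upper_bound(xs.begin(), xs.end(), x) - xs.begin());
    std::size_t i = (idx == 0) ? 0 : idx - 1;
    if (i > xs.size() - 2) i = xs.size() - 2;
    double w = (x - xs[i]) / (xs[i + 1] - xs[i]);
    return (1.0 - w) * f[i] + w * f[i + 1];
}
```

Without the periodic reduction, a request at 315 or -45 degrees on this axis would fall outside the source range even though it lies between two valid grid points modulo 360.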


The second phase of registration is concluded by calling MCLEndRegistration, which blocks the process awaiting a list of consumers for its products.  It is intended that the process can safely commence computation with no additional external synchronization once MCLEndRegistration returns.
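The calling sequence described in this section can be summarized as a small state machine. The C++ sketch below is hypothetical: SequenceChecker and its methods only mirror the order of the MCL routines (MCLStartMetaRegistration, MCLMetaRegister, MCLEndMetaRegistration, MCLStartRegistration, MCLRegisterProduce or MCLRegisterConsume, MCLEndRegistration, then MCLPut or MCLGet) and take none of their arguments. The registration broker's own MCLIamTheBroker call is not modelled.

```cpp
#include <cassert>
#include <stdexcept>

// Phases a process passes through, following the sequence described in Section 4.
enum class Phase { Idle, MetaRegistering, MetaDone, Registering, Computing };

// Hypothetical checker that rejects calls made out of order.
class SequenceChecker {
public:
    explicit SequenceChecker(bool isModelHead) : head_(isModelHead) {}

    void startMetaRegistration() { require(head_ && phase_ == Phase::Idle); phase_ = Phase::MetaRegistering; }
    void metaRegister()          { require(phase_ == Phase::MetaRegistering); }
    void endMetaRegistration()   { require(phase_ == Phase::MetaRegistering); phase_ = Phase::MetaDone; }

    void startRegistration() {
        // Model heads finish meta-registration first; other processes start from Idle.
        require(head_ ? phase_ == Phase::MetaDone : phase_ == Phase::Idle);
        phase_ = Phase::Registering;
    }
    void registerProduceOrConsume() { require(phase_ == Phase::Registering); }
    void endRegistration()          { require(phase_ == Phase::Registering); phase_ = Phase::Computing; }
    void putOrGet()                 { require(phase_ == Phase::Computing); }

    Phase phase() const { return phase_; }

private:
    void require(bool ok) const {
        if (!ok) throw std::logic_error("Data Broker call out of sequence");
    }
    bool head_;
    Phase phase_ = Phase::Idle;
};
```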


During the actual computation, the process calls MCLPut to make quantities available for transmission to consumers. The process supplies the "coat check" returned from MCLRegisterProduce, above, as well as a pointer to a buffer containing the data and a double precision value representing the time step. Making the quantity available does not necessarily result in any transmission; that depends on which processes have registered an interest in that particular time step. Similarly, processes call MCLGet to import fields.
 
5. Performance Results


Figure 1.  Memory requirements for centralized and distributed coupling.

Figure 2. Memory requirements with a doubled-resolution OGCM and an increased number of nodes.


Figures 1 and 2 compare the memory requirements of the two coupling implementations. In Figure 1, the centralized data brokerage requires almost twice as much memory as the distributed data brokerage, because it must collect the entire grid from one model on a single node. In the distributed case, each processor has enough information to produce the data needed by consumer processes, and communication is carried out in a distributed manner. Figure 2 presents a more demanding scenario, in which centralized coupling cannot be realized because it would require 45 Mw of memory on a single computational node. In this case the distributed approach requires less than a third of the memory required by the centralized approach.


Figure 3. Simplified Timing model of centralized vs. distributed coupling.


Figure 3 compares the execution times of the two coupling approaches. When the AGCM sends 4 fields to the OGCM, the distributed approach takes one third of the time required by the centralized approach. In the reverse direction, the OGCM sends a single field to the AGCM, and the time required is also greatly reduced with the distributed approach.


6. Implementation Description


We'll describe the conceptual framework in which the code for the Data Broker is organized. The entire distributed computation, made up of individual computational processes deemed Clients, is thought of as a System. The System comprises several subcollections, called Models, which compute scientific quantities called Variables.


All the Variables associated with a given Model are interpreted according to a common coordinate system, or Grid, made up of three axes or Dimensions. The Data Broker currently supports both two- and three-dimensional Variables. The array of specific values for a Variable at a given time step is a Field.


Models and Variables are identified by a human-readable, but potentially lengthy, ASCII string. Since a given Client can belong to more than one Model, any real-life computation requiring that a given scientific quantity be made available in more than one coordinate system could do so by having two separately named but evidently related Variables in two separately named Models, whose names might include the grid, e.g. "Ocean2x6" and "Ocean1x3".


Alternatively, the Data Broker provides a linear interpolation facility so that Clients can request Variables and then convert them internally, using methods associated with an object called a Mesh, which comprises a specification of the target subset (Tile) of the user's Grid and a reference to the Grid in which the Variable is produced.


Offers by a Client to produce subsets of Variables according to a given Tile have the same structure as requests to consume Variables, and are likewise called VarRequests in the code. We expect that most computational elements will be producing some Variables while consuming others. For housekeeping purposes, the registration broker needs to retain the entire list of offers and requests, and individual processes need to maintain subscriber lists for the quantities they produce. This data structure is called the ClientServer.


The Data Broker library is largely event driven, being activated by the explicit user requests described in the Fortran interface above, by the receipt of messages, or when certain key counters reach their expected values. One such transition occurs when the registration broker has received all of the MODELINFO_MSGs and proceeds to issue SYSTEM_INVENTORY_MSGs to each participating registrant. A similar transition occurs in the registration broker, closing the second phase of registration, upon receipt of the final BARRIER_REQUEST_MSG. A counter-induced event also occurs in every consuming element: the Data Broker "knows" that each of its requested fields is complete when it has counted the expected number of contributing tiles. (The number of tiles is conveyed in the VAR_REPLY_MSG.)
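The counter-induced completion event in a consuming element might look like the following. This is a hypothetical C++ sketch: FieldAssembler and its methods are our names, fields are keyed by variable name and time step (assuming producers report identical time-step values), and copying each tile into the consumer's buffer is omitted.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>

// Hypothetical per-consumer counter: a field is complete once the number of tiles
// received for (variable, time step) reaches the count announced in the VAR_REPLY_MSG.
class FieldAssembler {
public:
    void onVarReply(const std::string& var, int expectedTiles) {
        assert(expectedTiles > 0);
        expected_[var] = expectedTiles;
    }

    // Called for each arriving DATA_MSG; returns true exactly when this tile completes the field.
    bool onDataMsg(const std::string& var, double t) {
        auto e = expected_.find(var);
        assert(e != expected_.end());
        const auto key = std::make_pair(var, t);
        int count = ++received_[key];
        if (count == e->second) {
            received_.erase(key);   // ready for the next field of this variable
            return true;
        }
        return false;
    }

private:
    std::map<std::string, int> expected_;
    std::map<std::pair<std::string, double>, int> received_;
};
```

Keying on the time step as well as the variable lets tiles for consecutive fields interleave without being miscounted.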


We have not yet heavily optimized the Data Broker, relying on simple methods, but choosing them carefully. Incoming data messages are identified by full string comparison of the variable name, searching linearly through linked lists.


We have, however, attempted to minimize the number of copies the data itself must undergo, because in modern tightly coupled systems a memory-to-memory copy within a processor can take time comparable to transmitting the data between processors. Excess copying could otherwise offset the significant benefits achieved by having producers send directly to consumers (in a distributed way), rather than requiring double transmission times and synchronization among all processes as in the centralized scheme.


We have future plans (described below) for avoiding even these searches by using a simple indexing scheme.


7. Future Work


A high priority for us is to allow the use of MPI in the Data Broker system, because on some of the large multiprocessor systems, vendor-provided implementations of PVM do not perform as well as MPI and seem more prone to bugs.


As noted above, performance and user-friendliness have been our utmost concerns. We believe that further performance gains can be made. We are less concerned with the performance of the registration phases, but are considering some streamlining of the data transfer phase.


Protocols such as PVM and MPI allow one to obtain a tag associated with a message, which can be examined and manipulated independently of the rest of the message's data. We propose using a range of tags to identify the individual tiles associated with the various requested quantities. This would bypass the current linear lookups based on quantity name and reserve the entire data portion of the message for the numerical quantities, which might even allow the data to be deposited directly in the appropriate place without calling special unpacking routines.
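The proposed tag scheme can be sketched as follows. Everything here is hypothetical: TagIndex, TileSlot and the choice of a contiguous tag range starting at a base value are our illustration of the proposal, not existing Data Broker code. On receipt, the tag would be read from the message envelope (for example, the MPI_TAG field of an MPI_Status) and turned into an array index, replacing the name comparison.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// One expected incoming tile: where its numbers belong in the consumer's buffer.
struct TileSlot {
    std::string variable;   // kept only for diagnostics, never compared on receipt
    double* dest;           // destination of this tile's values
    std::size_t count;      // number of values in the tile
};

// Hypothetical index from message tag to tile slot.
class TagIndex {
public:
    explicit TagIndex(int tagBase) : base_(tagBase) {}

    // During registration: one call per expected tile; returns the tag producers should use.
    // All slots are assigned before any lookup, so returned pointers stay valid afterwards.
    int assign(const TileSlot& slot) {
        slots_.push_back(slot);
        return base_ + static_cast<int>(slots_.size()) - 1;
    }

    // During computation: constant-time lookup; nullptr if the tag is outside our range.
    TileSlot* lookup(int tag) {
        int i = tag - base_;
        if (i < 0 || i >= static_cast<int>(slots_.size())) return nullptr;
        return &slots_[static_cast<std::size_t>(i)];
    }

private:
    int base_;
    std::vector<TileSlot> slots_;
};
```

Because each slot records where its tile belongs, the payload could in principle be received straight into dest, which is the direct deposition anticipated in the paragraph above.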


During the development of the Data Broker, when the primary interface was the direct inclusion of PVM calls in the simulation codes, inconsistencies between the Data Broker and the participating codes were a source of run-time errors, especially when older versions of the Broker were inadvertently run.


This suggested that a formal description of the protocol could be written down, and that the calls for marshalling and unmarshalling data into messages could be mechanically generated, as has been done for Remote Procedure Call mechanisms and for processing the presentation layer in the OSI protocols.


Parenthetically, message passing mechanisms and RPC seem essentially isomorphic: given an RPC mechanism, one can get a (possibly inefficient) message passing mechanism merely by ignoring the values returned from RPC calls, and conversely, with some effort, one could build an RPC mechanism on top of pairs of passed messages, one for the request and one for the return.

Going through the exercise of writing down the specification for our protocol (included here as Appendix B) has been useful as a tool for discussion and guidance, although we have never actually rewritten the Data Broker to make use of it.


We estimate that 30% of the code of the existing Data Broker could be mechanically generated from formal specifications, and doing so would lessen the burden of future maintenance. However, the anticipated work of debugging such a substantial rewrite of the system, with no apparent gain toward meeting performance milestones, has persuaded us to postpone this nonetheless desirable effort.


Acknowledgements


This project has been supported by the NASA High Performance Computing and Communication for Earth and Space Sciences (HPCC-ESS) project under CAN 21425/041. The tests were performed at the Department of Energy's National Energy Research Scientific Computing Center (NERSC).