00001 00002 #undef DEBUG 00003 #undef TIME_DEBUG 00004 00005 #include <stdio.h> 00006 #include <stdarg.h> 00007 00008 #include <stdio.h> 00009 #include <time.h> 00010 00011 #ifndef HDF5_FILE 00012 # error Attempt to compile WriteHDF5Parallel with HDF5 support disabled 00013 #endif 00014 00015 #include <stdlib.h> 00016 #include <string.h> 00017 00018 #include <vector> 00019 #include <set> 00020 #include <map> 00021 #include <utility> 00022 #include <iostream> 00023 #include <sstream> 00024 00025 #include "WriteHDF5Parallel.hpp" 00026 00027 #include <H5Tpublic.h> 00028 #include <H5Ppublic.h> 00029 #include <H5FDmpi.h> 00030 #include <H5FDmpio.h> 00031 00032 #include "mhdf.h" 00033 00034 #include "moab/Interface.hpp" 00035 #include "Internals.hpp" 00036 #include "MBTagConventions.hpp" 00037 #include "MBParallelConventions.h" 00038 #include "moab/ParallelComm.hpp" 00039 #include "moab/CN.hpp" 00040 #include "moab/Range.hpp" 00041 00042 #include "IODebugTrack.hpp" 00043 #include "moab/FileOptions.hpp" 00044 00045 namespace { 00046 template<bool Condition> struct STATIC_ASSERTION; 00047 template<> struct STATIC_ASSERTION<true> {}; 00048 } 00049 00050 #define PP_CAT_(a,b) a ## b 00051 #define PP_CAT(a,b) PP_CAT_(a,b) 00052 #define STATIC_ASSERT(Condition) \ 00053 enum { PP_CAT(dummy, __LINE__) = sizeof(::STATIC_ASSERTION<(bool)(Condition)>) } 00054 00055 namespace moab { 00056 00057 // Need an MPI type that we can put handles in 00058 STATIC_ASSERT(sizeof(unsigned long) >= sizeof(EntityHandle)); 00059 00060 // Need an MPI type that we can put file IDs in 00061 STATIC_ASSERT(sizeof(unsigned long) >= sizeof(id_t)); 00062 00063 // This function doesn't do anything useful. It's just a nice 00064 // place to set a break point to determine why the reader fails. 
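// (Every CHECK_MPI / CHECK_MB / CHECK_HDF macro defined below routes its
// failure through error(), so a single breakpoint here catches every early
// return.)  Minimal sketch of the calling convention used throughout this
// file; the callee names are illustrative only, not part of MOAB:
//
//   int mperr = MPI_Barrier( comm );
//   CHECK_MPI(mperr);            // report, log, and return error(MB_FAILURE)
//
//   ErrorCode rval = some_moab_call();
//   CHECK_MB(rval);              // report, log, and return error(rval)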
00065 static inline ErrorCode error( ErrorCode rval ) 00066 { return rval; } 00067 00068 const char* mpi_err_str( int errorcode ) { 00069 static char buffer[2048]; 00070 int len = sizeof(buffer); 00071 MPI_Error_string( errorcode, buffer, &len ); 00072 buffer[std::min((size_t)len,sizeof(buffer)-1)] = '\0'; 00073 return buffer; 00074 } 00075 00076 #define MPI_FAILURE_MSG(A) "MPI Failure at " __FILE__ ":%d : (Code %d) %s\n", \ 00077 __LINE__, (int)(A), mpi_err_str((A)) 00078 00079 #define CHECK_MPI(A) do { if (MPI_SUCCESS != (A)) { \ 00080 writeUtil->report_error( MPI_FAILURE_MSG((A)) ); \ 00081 dbgOut.printf(1, MPI_FAILURE_MSG((A)) ); \ 00082 return error(MB_FAILURE); } } while(false) 00083 00084 #define MB_FAILURE_MSG(A) "MOAB_Failure at " __FILE__ ":%d : %s (%d)\n", \ 00085 __LINE__, ErrorCodeStr[(A)], (int)(A) 00086 00087 #define CHECK_MB(A) do { if (MB_SUCCESS != (A)) { \ 00088 writeUtil->report_error( MB_FAILURE_MSG(A) ); \ 00089 dbgOut.printf(1, MB_FAILURE_MSG((A)) ); \ 00090 return error(A); } } while(false) 00091 00092 #define HDF_FAILURE_MSG(A) "MHDF Failure at " __FILE__ ":%d : %s\n", \ 00093 __LINE__, mhdf_message(&(A)) 00094 00095 #define CHECK_HDF(A) do { if (mhdf_isError(&(A))) { \ 00096 writeUtil->report_error( HDF_FAILURE_MSG(A) ); \ 00097 dbgOut.printf(1, HDF_FAILURE_MSG((A)) ); \ 00098 return error(MB_FAILURE); } } while(false) 00099 00100 #define CHECK_HDFN(A) do { if (mhdf_isError(&(A))) { \ 00101 file->write_util()->report_error( HDF_FAILURE_MSG(A) ); \ 00102 return error(MB_FAILURE); } } while(false) 00103 00104 00105 #ifdef VALGRIND 00106 # include <valgrind/memcheck.h> 00107 #else 00108 # ifndef VALGRIND_CHECK_MEM_IS_DEFINED 00109 # define VALGRIND_CHECK_MEM_IS_DEFINED(a,b) 00110 # endif 00111 # ifndef VALGRIND_CHECK_MEM_IS_ADDRESSABLE 00112 # define VALGRIND_CHECK_MEM_IS_ADDRESSABLE(a, b) 00113 # endif 00114 # ifndef VALGRIND_MAKE_MEM_UNDEFINED 00115 # define VALGRIND_MAKE_MEM_UNDEFINED(a, b) 00116 # endif 00117 #endif 00118 00119 template <typename T> inline 00120 void VALGRIND_MAKE_VEC_UNDEFINED( std::vector<T>& v ) { 00121 if (v.size()) {} 00122 VALGRIND_MAKE_MEM_UNDEFINED( &v[0], v.size() * sizeof(T) ); 00123 } 00124 00125 00126 #ifdef DEBUG 00127 # define START_SERIAL \ 00128 for (unsigned _x = 0; _x < myPcomm->proc_config().proc_size(); ++_x) {\ 00129 MPI_Barrier( myPcomm->proc_config().proc_comm() ); \ 00130 if (_x != myPcomm->proc_config().proc_rank()) continue 00131 # define END_SERIAL \ 00132 } \ 00133 MPI_Barrier( myPcomm->proc_config().proc_comm() ) 00134 #else 00135 # define START_SERIAL 00136 # define END_SERIAL 00137 #endif 00138 00139 #ifdef NDEBUG 00140 # undef assert 00141 # define assert 00142 #else 00143 # undef assert 00144 # define assert(A) if (!(A)) do_assert(__FILE__, __LINE__, #A) 00145 static void do_assert( const char* file, int line, const char* condstr ) 00146 { 00147 int rank; 00148 MPI_Comm_rank( MPI_COMM_WORLD, &rank ); 00149 fprintf( stderr, "[%d] Assert(%s) failed at %s:%d\n", rank, condstr, file, line ); 00150 fflush( stderr ); 00151 abort(); 00152 } 00153 #endif 00154 00155 class CpuTimer { 00156 private: 00157 double atBirth, atLast; 00158 public: 00159 CpuTimer() : atBirth(MPI_Wtime()), atLast(MPI_Wtime()) {} 00160 double since_birth() { return (atLast = MPI_Wtime()) - atBirth; }; 00161 double elapsed() { double tmp = atLast; return (atLast = MPI_Wtime()) - tmp; } 00162 }; 00163 00164 static int my_Gatherv( void* sendbuf, 00165 int sendcount, 00166 MPI_Datatype sendtype, 00167 std::vector<unsigned char>& recvbuf, 00168 
std::vector<int>& recvcounts, 00169 int root, 00170 MPI_Comm comm ) 00171 { 00172 int nproc, rank, bytes, err; 00173 MPI_Comm_size( comm, &nproc ); 00174 MPI_Comm_rank( comm, &rank ); 00175 MPI_Type_size( sendtype, &bytes ); 00176 00177 recvcounts.resize( rank == root ? nproc : 0 ); 00178 err = MPI_Gather( &sendcount, 1, MPI_INT, &recvcounts[0], 1, MPI_INT, root, comm ); 00179 if (MPI_SUCCESS != err) return err; 00180 00181 00182 std::vector<int> disp(recvcounts.size()); 00183 if (root == rank) { 00184 disp[0] = 0; 00185 for (int i = 1; i < nproc; ++i) 00186 disp[i] = disp[i-1] + recvcounts[i-1]; 00187 recvbuf.resize( bytes * (disp.back() + recvcounts.back()) ); 00188 } 00189 00190 return MPI_Gatherv( sendbuf, sendcount, sendtype, 00191 &recvbuf[0], &recvcounts[0], &disp[0], 00192 sendtype, root, comm ); 00193 } 00194 00195 static void print_type_sets( Interface* iFace, DebugOutput* str, Range& sets ) 00196 { 00197 const unsigned VB = 2; 00198 if (str->get_verbosity() < VB) 00199 return; 00200 00201 Tag gid, did, bid, sid, nid; 00202 iFace->tag_get_handle( GLOBAL_ID_TAG_NAME, 1, MB_TYPE_INTEGER, gid ); 00203 iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, did ); 00204 iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, bid ); 00205 iFace->tag_get_handle( DIRICHLET_SET_TAG_NAME, 1, MB_TYPE_INTEGER, nid ); 00206 iFace->tag_get_handle( NEUMANN_SET_TAG_NAME, 1, MB_TYPE_INTEGER, sid ); 00207 Range typesets[10]; 00208 const char* typenames[] = {"Block ", "Sideset ", "NodeSet", "Vertex", "Curve", "Surface", "Volume", "Body", "Other"}; 00209 for (Range::iterator riter = sets.begin(); riter != sets.end(); ++riter) 00210 { 00211 unsigned dim, id ; //, oldsize; 00212 if (MB_SUCCESS == iFace->tag_get_data(bid, &*riter, 1, &id)) 00213 dim = 0; 00214 else if (MB_SUCCESS == iFace->tag_get_data(sid, &*riter, 1, &id)) 00215 dim = 1; 00216 else if (MB_SUCCESS == iFace->tag_get_data(nid, &*riter, 1, &id)) 00217 dim = 2; 00218 else if (MB_SUCCESS == iFace->tag_get_data(did, &*riter, 1, &dim)) { 00219 id = 0; 00220 iFace->tag_get_data(gid, &*riter, 1, &id); 00221 dim += 3; 00222 } 00223 else { 00224 id = *riter; 00225 dim = 9; 00226 } 00227 00228 //oldsize = typesets[dim].size(); 00229 typesets[dim].insert( id ); 00230 // assert( typesets[dim].size() - oldsize == 1 ); 00231 } 00232 for (int ii = 0; ii < 9; ++ii) 00233 { 00234 char tmp[64]; 00235 sprintf(tmp,"%s (%lu) ",typenames[ii],(unsigned long)typesets[ii].size()); 00236 str->print( VB, tmp, typesets[ii] ); 00237 } 00238 str->printf(VB,"Total: %lu\n", (unsigned long)sets.size()); 00239 } 00240 00241 #define debug_barrier() debug_barrier_line(__LINE__) 00242 00243 void WriteHDF5Parallel::debug_barrier_line(int lineno) 00244 { 00245 const unsigned threshold = 2; 00246 static unsigned long count = 0; 00247 if (dbgOut.get_verbosity() >= threshold && myPcomm) { 00248 dbgOut.printf( threshold, "*********** Debug Barrier %lu (@%d)***********\n", ++count, lineno); 00249 MPI_Barrier( myPcomm->proc_config().proc_comm() ); 00250 } 00251 } 00252 00253 WriterIface* WriteHDF5Parallel::factory( Interface* iface ) 00254 { return new WriteHDF5Parallel( iface ); } 00255 WriteHDF5Parallel::WriteHDF5Parallel( Interface* iface) 00256 : WriteHDF5(iface), myPcomm(NULL), pcommAllocated(false), hslabOp(H5S_SELECT_OR) 00257 { 00258 } 00259 00260 WriteHDF5Parallel::~WriteHDF5Parallel() 00261 { 00262 if (pcommAllocated && myPcomm) 00263 delete myPcomm; 00264 } 00265 00266 // The parent WriteHDF5 class has ExportSet structs that are 00267 // populated 
with the entities to be written, grouped by type 00268 // (and for elements, connectivity length). This function: 00269 // o determines which entities are to be written by a remote processor 00270 // o removes those entities from the ExportSet structs in WriteMesh 00271 // o passes them back in a Range 00272 ErrorCode WriteHDF5Parallel::gather_interface_meshes(Range& nonowned) 00273 { 00274 ErrorCode result; 00275 00276 //START_SERIAL; 00277 dbgOut.print( 3, "Pre-interface mesh:\n"); 00278 dbgOut.print( 3, nodeSet.range ); 00279 for (std::list<ExportSet>::iterator eiter = exportList.begin(); 00280 eiter != exportList.end(); ++eiter ) 00281 dbgOut.print( 3, eiter->range ); 00282 dbgOut.print( 3, setSet.range ); 00283 00284 // Move handles of non-owned entities from lists of entities 00285 // that this processor will write to the 'nonowned' list. 00286 00287 nonowned.clear(); 00288 result = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &nonowned); 00289 if (MB_SUCCESS != result) 00290 return error(result); 00291 nodeSet.range = subtract( nodeSet.range, nonowned ); 00292 00293 for (std::list<ExportSet>::iterator eiter = exportList.begin(); 00294 eiter != exportList.end(); ++eiter ) { 00295 Range tmpset; 00296 result = myPcomm->filter_pstatus( eiter->range, PSTATUS_NOT_OWNED, PSTATUS_AND, -1, &tmpset); 00297 if (MB_SUCCESS != result) 00298 return error(result); 00299 eiter->range = subtract( eiter->range, tmpset ); 00300 nonowned.merge(tmpset); 00301 } 00302 00303 dbgOut.print( 3, "Post-interface mesh:\n"); 00304 dbgOut.print( 3, nodeSet.range ); 00305 for (std::list<ExportSet>::iterator eiter = exportList.begin(); 00306 eiter != exportList.end(); ++eiter ) 00307 dbgOut.print( 3, eiter->range ); 00308 dbgOut.print( 3, setSet.range ); 00309 00310 //END_SERIAL; 00311 00312 return MB_SUCCESS; 00313 } 00314 00315 ErrorCode WriteHDF5Parallel::parallel_create_file( const char* filename, 00316 bool overwrite, 00317 const std::vector<std::string>& qa_records, 00318 const FileOptions& opts, 00319 const Tag* user_tag_list, 00320 int user_tag_count, 00321 int dimension, 00322 double* times) 00323 { 00324 ErrorCode rval; 00325 mhdf_Status status; 00326 00327 int pcomm_no = 0; 00328 opts.get_int_option("PARALLEL_COMM", pcomm_no); 00329 00330 myPcomm = ParallelComm::get_pcomm(iFace, pcomm_no); 00331 if (0 == myPcomm) { 00332 myPcomm = new ParallelComm(iFace, MPI_COMM_WORLD); 00333 pcommAllocated = true; 00334 } 00335 00336 MPI_Info info = MPI_INFO_NULL; 00337 std::string cb_size; 00338 rval = opts.get_str_option("CB_BUFFER_SIZE", cb_size); 00339 if (MB_SUCCESS == rval) { 00340 MPI_Info_create (&info); 00341 MPI_Info_set (info, const_cast<char*>("cb_buffer_size"), const_cast<char*>(cb_size.c_str())); 00342 } 00343 00344 dbgOut.set_rank( myPcomm->proc_config().proc_rank() ); 00345 dbgOut.limit_output_to_first_N_procs( 32 ); 00346 Range nonlocal; 00347 debug_barrier(); 00348 dbgOut.tprint(1,"Gathering interface meshes\n"); 00349 rval = gather_interface_meshes( nonlocal ); 00350 if (MB_SUCCESS != rval) return error(rval); 00351 00352 /**************** get tag names for sets likely to be shared ***********/ 00353 //debug_barrier(); 00354 //dbgOut.tprint(1,"Getting shared entity sets\n"); 00355 //rval = get_sharedset_tags(); 00356 //if (MB_SUCCESS != rval) return error(rval); 00357 00358 00359 /**************** Create actual file and write meta info ***************/ 00360 00361 debug_barrier(); 00362 if (myPcomm->proc_config().proc_rank() == 0) 00363 { 00364 
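// Only the root rank touches the file at this point: it creates the HDF5
// layout and QA records serially while the other ranks skip ahead to the
// next barrier.  The overall pattern, as a sketch (table creation happens
// in between, see below):
//
//   if (rank == 0) { mhdf_createFile(...); write_qa(...); ... mhdf_closeFile(...); }
//   MPI_Barrier( comm );
//   filePtr = mhdf_openFileWithOpt( filename, 1, ..., fapl_with_mpio, &status );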
dbgOut.tprintf(1,"Creating file: %s\n",filename); 00365 00366 // create the file 00367 const char* type_names[MBMAXTYPE]; 00368 memset( type_names, 0, MBMAXTYPE * sizeof(char*) ); 00369 for (EntityType i = MBEDGE; i < MBENTITYSET; ++i) 00370 type_names[i] = CN::EntityTypeName( i ); 00371 00372 dbgOut.tprint(1,"call mhdf_createFile\n"); 00373 filePtr = mhdf_createFile( filename, overwrite, type_names, MBMAXTYPE, id_type, &status ); 00374 if (!filePtr) 00375 { 00376 writeUtil->report_error( "%s\n", mhdf_message( &status ) ); 00377 return error(MB_FAILURE); 00378 } 00379 00380 dbgOut.tprint(1,"call write_qa\n"); 00381 rval = write_qa( qa_records ); 00382 if (MB_SUCCESS != rval) return error(rval); 00383 } 00384 00385 00386 /**************** Create node coordinate table ***************/ 00387 CpuTimer timer; 00388 debug_barrier(); 00389 dbgOut.tprint(1,"creating node table\n"); 00390 topState.start("creating node table"); 00391 rval = create_node_table( dimension ); 00392 topState.end(rval); 00393 if (MB_SUCCESS != rval) return error(rval); 00394 if (times) times[CREATE_NODE_TIME] = timer.elapsed(); 00395 00396 /**************** Create element tables ***************/ 00397 00398 debug_barrier(); 00399 dbgOut.tprint(1,"negotiating element types\n"); 00400 topState.start("negotiating element types"); 00401 rval = negotiate_type_list(); 00402 topState.end(rval); 00403 if (MB_SUCCESS != rval) return error(rval); 00404 if (times) times[NEGOTIATE_TYPES_TIME] = timer.elapsed(); 00405 dbgOut.tprint(1,"creating element table\n"); 00406 topState.start("creating element tables"); 00407 rval = create_element_tables(); 00408 topState.end(rval); 00409 if (MB_SUCCESS != rval) return error(rval); 00410 if (times) times[CREATE_ELEM_TIME] = timer.elapsed(); 00411 00412 00413 /*************** Exchange file IDs *****************/ 00414 00415 debug_barrier(); 00416 dbgOut.tprint(1,"communicating file ids\n"); 00417 topState.start("communicating file ids"); 00418 rval = exchange_file_ids( nonlocal ); 00419 topState.end(rval); 00420 if (MB_SUCCESS != rval) return error(rval); 00421 if (times) times[FILEID_EXCHANGE_TIME] = timer.elapsed(); 00422 00423 00424 /**************** Create adjacency tables *********************/ 00425 00426 debug_barrier(); 00427 dbgOut.tprint(1,"creating adjacency table\n"); 00428 topState.start("creating adjacency tables"); 00429 rval = create_adjacency_tables(); 00430 topState.end(rval); 00431 if (MB_SUCCESS != rval) return error(rval); 00432 if (times) times[CREATE_ADJ_TIME] = timer.elapsed(); 00433 00434 /**************** Create meshset tables *********************/ 00435 00436 debug_barrier(); 00437 dbgOut.tprint(1,"creating meshset table\n"); 00438 topState.start("creating meshset tables"); 00439 rval = create_meshset_tables(times); 00440 topState.end(rval); 00441 if (MB_SUCCESS != rval) return error(rval); 00442 if (times) times[CREATE_SET_TIME] = timer.elapsed(); 00443 00444 /**************** Create tag data *********************/ 00445 00446 debug_barrier(); 00447 dbgOut.tprint(1,"creating tag tables\n"); 00448 topState.start("creating tag tables"); 00449 rval = gather_tags( user_tag_list, user_tag_count ); 00450 if (MB_SUCCESS != rval) return error(rval); 00451 rval = create_tag_tables(); 00452 topState.end(rval); 00453 if (MB_SUCCESS != rval) return error(rval); 00454 if (times) times[CREATE_TAG_TIME] = timer.elapsed(); 00455 00456 /************** Close serial file and reopen parallel *****************/ 00457 00458 if (0 == myPcomm->proc_config().proc_rank()) 00459 { 00460 
mhdf_closeFile( filePtr, &status ); 00461 } 00462 00463 MPI_Barrier( myPcomm->proc_config().proc_comm() ); 00464 dbgOut.tprint(1,"(re)opening file in parallel mode\n"); 00465 unsigned long junk; 00466 hid_t hdf_opt = H5Pcreate( H5P_FILE_ACCESS ); 00467 H5Pset_fapl_mpio( hdf_opt, myPcomm->proc_config().proc_comm(), info ); 00468 filePtr = mhdf_openFileWithOpt( filename, 1, &junk, id_type, hdf_opt, &status ); 00469 H5Pclose( hdf_opt ); 00470 if (!filePtr) 00471 { 00472 writeUtil->report_error( "%s\n", mhdf_message( &status ) ); 00473 return error(MB_FAILURE); 00474 } 00475 00476 if (collectiveIO) { 00477 dbgOut.print(1,"USING COLLECTIVE IO\n"); 00478 writeProp = H5Pcreate( H5P_DATASET_XFER ); 00479 H5Pset_dxpl_mpio( writeProp, H5FD_MPIO_COLLECTIVE ); 00480 } 00481 00482 /* Test if we can use H5S_APPEND when selecting hyperslabs */ 00483 if (MB_SUCCESS != opts.get_null_option("HYPERSLAB_OR") && 00484 (MB_SUCCESS == opts.get_null_option( "HYPERSLAB_APPEND" ) 00485 || HDF5_can_append_hyperslabs())) { 00486 dbgOut.print(1,"HDF5 library supports H5Sselect_hyperlsab with H5S_SELECT_APPEND\n"); 00487 hslabOp = H5S_SELECT_APPEND; 00488 } 00489 00490 dbgOut.tprint(1,"Exiting parallel_create_file\n"); 00491 return MB_SUCCESS; 00492 } 00493 00494 00495 class TagNameCompare { 00496 Interface* iFace; 00497 std::string name1, name2; 00498 public: 00499 TagNameCompare( Interface* iface ) : iFace(iface) {} 00500 bool operator() (const WriteHDF5::TagDesc& t1, 00501 const WriteHDF5::TagDesc& t2); 00502 }; 00503 bool TagNameCompare::operator() (const WriteHDF5::TagDesc& t1, 00504 const WriteHDF5::TagDesc& t2) 00505 { 00506 iFace->tag_get_name( t1.tag_id, name1 ); 00507 iFace->tag_get_name( t2.tag_id, name2 ); 00508 return name1 < name2; 00509 } 00510 00511 struct serial_tag_data { 00512 TagType storage; 00513 DataType type; 00514 int size; 00515 int name_len; 00516 int def_val_len; 00517 char name[sizeof(unsigned long)]; 00518 00519 static size_t pad( size_t len ) { 00520 if (len % sizeof(unsigned long)) 00521 return len + sizeof(unsigned long) - len % sizeof(unsigned long); 00522 else 00523 return len; 00524 } 00525 00526 static size_t def_val_bytes( int def_val_len, DataType type ) { 00527 switch (type) { 00528 case MB_TYPE_BIT: return def_val_len ? 1 : 0; break; 00529 case MB_TYPE_OPAQUE: return def_val_len; break; 00530 case MB_TYPE_INTEGER: return def_val_len*sizeof(int); break; 00531 case MB_TYPE_DOUBLE: return def_val_len*sizeof(double); break; 00532 case MB_TYPE_HANDLE: return def_val_len*sizeof(EntityHandle);break; 00533 } 00534 return 0; 00535 } 00536 00537 static size_t len( int name_len, int def_val_len, DataType type ) { 00538 return sizeof(serial_tag_data) + pad(name_len + def_val_bytes(def_val_len,type)) - sizeof(unsigned long); 00539 } 00540 size_t len() const { return len( name_len, def_val_len, type ); } 00541 void* default_value() { return def_val_len ? 
name + name_len : 0; } 00542 const void* default_value() const { return const_cast<serial_tag_data*>(this)->default_value(); } 00543 void set_default_value( const void* val ) { memcpy( default_value(), val, def_val_bytes( def_val_len, type ) ); } 00544 }; 00545 00546 ErrorCode WriteHDF5Parallel::append_serial_tag_data( 00547 std::vector<unsigned char>& buffer, 00548 const WriteHDF5::TagDesc& tag ) 00549 { 00550 ErrorCode rval; 00551 00552 std::string name; 00553 rval = iFace->tag_get_name( tag.tag_id, name ); 00554 if (MB_SUCCESS != rval) 00555 return error(rval); 00556 00557 // get name length, including space for null char 00558 size_t name_len = name.size() + 1; 00559 if (name_len == 1) return MB_SUCCESS; // skip tags with no name 00560 00561 DataType data_type; 00562 rval = iFace->tag_get_data_type( tag.tag_id, data_type ); 00563 if (MB_SUCCESS != rval) return error(rval); 00564 00565 // get default value 00566 int def_val_len; 00567 const void* def_val; 00568 if (MB_SUCCESS != iFace->tag_get_default_value( tag.tag_id, def_val, def_val_len )) { 00569 def_val_len = 0; 00570 def_val = 0; 00571 } 00572 00573 // allocate struct within buffer 00574 size_t init_size = buffer.size(); 00575 buffer.resize( init_size + serial_tag_data::len(name_len, def_val_len, data_type) ); 00576 serial_tag_data* ptr = reinterpret_cast<serial_tag_data*>(&buffer[init_size]); 00577 00578 // populate struct 00579 rval = iFace->tag_get_type( tag.tag_id, ptr->storage ); 00580 if (MB_SUCCESS != rval) return error(rval); 00581 ptr->type = data_type; 00582 rval = iFace->tag_get_length( tag.tag_id, ptr->size ); 00583 if (MB_VARIABLE_DATA_LENGTH == rval) 00584 ptr->size = MB_VARIABLE_LENGTH; 00585 else if (MB_SUCCESS != rval) 00586 return error(rval); 00587 ptr->name_len = name_len; 00588 Range range; 00589 memset( ptr->name, 0, ptr->name_len ); 00590 memcpy( ptr->name, name.data(), name.size() ); 00591 ptr->def_val_len = def_val_len; 00592 ptr->set_default_value( def_val ); 00593 00594 return MB_SUCCESS; 00595 } 00596 00597 00598 ErrorCode WriteHDF5Parallel::check_serial_tag_data( 00599 const std::vector<unsigned char>& buffer, 00600 std::vector<TagDesc*>* missing, 00601 std::vector<TagDesc*>* newlist ) 00602 { 00603 ErrorCode rval; 00604 00605 // Use 'write_sparse' field as a 'visited' mark 00606 std::list<TagDesc>::iterator tag_iter; 00607 if (missing) 00608 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter) 00609 tag_iter->write_sparse = true; 00610 00611 // Use a set as a temporary for what will ultimately go in 00612 // newlist because we need to pass back newlist in the order 00613 // of the tagList member. 00614 std::set<TagDesc*> newset; 00615 00616 // Iterate over data from, updating the local list of tags. 00617 // Be carefull to keep tagList sorted such that in the end all 00618 // procs have the same list in the same order. 
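// The walk below merges the sorted, concatenated tag descriptions in
// 'buffer' into the (also sorted) tagList: tag_iter advances while the
// local name is less than the incoming one, restarts from begin() when the
// incoming name goes backwards (the buffer has moved on to another proc's
// contribution), and a new TagDesc is inserted in place when no match is
// found.  Because every proc applies the same rule to the same broadcast
// buffer, all procs end with identical, identically ordered tag lists.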
00619 std::vector<unsigned char>::const_iterator diter = buffer.begin(); 00620 tag_iter = tagList.begin(); 00621 while (diter < buffer.end()) { 00622 // Get struct from buffer 00623 const serial_tag_data* ptr = reinterpret_cast<const serial_tag_data*>(&*diter); 00624 00625 // Find local struct for tag 00626 std::string name(ptr->name); 00627 std::string n; 00628 iFace->tag_get_name( tag_iter->tag_id, n ); // second time we've called, so shouldnt fail 00629 if (n > name) { 00630 tag_iter = tagList.begin(); // new proc, start search from beginning 00631 } 00632 iFace->tag_get_name( tag_iter->tag_id, n ); 00633 while (n < name) { 00634 ++tag_iter; 00635 if (tag_iter == tagList.end()) 00636 break; 00637 iFace->tag_get_name( tag_iter->tag_id, n ); 00638 } 00639 if (tag_iter == tagList.end() || n != name) { // new tag 00640 TagDesc newtag; 00641 00642 if (ptr->size == MB_VARIABLE_LENGTH) 00643 rval = iFace->tag_get_handle( name.c_str(), ptr->def_val_len, ptr->type, newtag.tag_id, MB_TAG_VARLEN|MB_TAG_CREAT|ptr->storage, ptr->default_value() ); 00644 else 00645 rval = iFace->tag_get_handle( name.c_str(), ptr->size, ptr->type, newtag.tag_id, MB_TAG_CREAT|ptr->storage, ptr->default_value() ); 00646 if (MB_SUCCESS != rval) 00647 return error(rval); 00648 00649 newtag.sparse_offset = 0; 00650 newtag.var_data_offset = 0; 00651 newtag.write_sparse = false; 00652 newtag.max_num_ents = 0; 00653 newtag.max_num_vals = 0; 00654 00655 tag_iter = tagList.insert( tag_iter, newtag ); 00656 if (newlist) 00657 newset.insert( &*tag_iter ); 00658 } 00659 else { // check that tag is as expected 00660 DataType type; 00661 iFace->tag_get_data_type( tag_iter->tag_id, type ); 00662 if (type != ptr->type) { 00663 writeUtil->report_error("Processes have inconsistent data type for tag \"%s\"", name.c_str() ); 00664 return error(MB_FAILURE); 00665 } 00666 int size; 00667 iFace->tag_get_length( tag_iter->tag_id, size ); 00668 if (size != ptr->size) { 00669 writeUtil->report_error("Processes have inconsistent size for tag \"%s\"", name.c_str() ); 00670 return error(MB_FAILURE); 00671 } 00672 tag_iter->write_sparse = false; 00673 } 00674 00675 // Step to next variable-length struct. 
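// len() = sizeof(serial_tag_data) + pad(name_len + default-value bytes)
//         - sizeof(unsigned long),
// where pad() rounds up to a multiple of sizeof(unsigned long) and the
// subtraction accounts for the name[sizeof(unsigned long)] member already
// counted in the struct size.  E.g. with 8-byte longs, a 5-byte name and no
// default value pads to 8 bytes, keeping the next record aligned.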
00676 diter += ptr->len(); 00677 } 00678 00679 // now pass back any local tags that weren't in the buffer 00680 if (missing) { 00681 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter) { 00682 if (tag_iter->write_sparse) { 00683 tag_iter->write_sparse = false; 00684 missing->push_back( &*tag_iter ); 00685 } 00686 } 00687 } 00688 00689 // be careful to populate newlist in the same, sorted, order as tagList 00690 if (newlist) { 00691 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter) 00692 if (newset.find(&*tag_iter) != newset.end()) 00693 newlist->push_back(&*tag_iter); 00694 } 00695 00696 return MB_SUCCESS; 00697 } 00698 00699 static void set_bit( int position, unsigned char* bytes ) 00700 { 00701 int byte = position/8; 00702 int bit = position%8; 00703 bytes[byte] |= (((unsigned char)1)<<bit); 00704 } 00705 00706 static bool get_bit( int position, const unsigned char* bytes ) 00707 { 00708 int byte = position/8; 00709 int bit = position%8; 00710 return 0 != (bytes[byte] & (((unsigned char)1)<<bit)); 00711 } 00712 00713 00714 ErrorCode WriteHDF5Parallel::create_tag_tables() 00715 { 00716 std::list<TagDesc>::iterator tag_iter; 00717 ErrorCode rval; 00718 int err; 00719 const int rank = myPcomm->proc_config().proc_rank(); 00720 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 00721 00722 subState.start( "negotiating tag list" ); 00723 00724 dbgOut.tprint(1,"communicating tag metadata\n"); 00725 00726 dbgOut.printf(2,"Exchanging tag data for %d tags.\n", (int)tagList.size() ); 00727 00728 // Sort tagList contents in alphabetical order by tag name 00729 tagList.sort( TagNameCompare( iFace ) ); 00730 00731 // Negotiate total list of tags to write 00732 00733 // Build concatenated list of all tag data 00734 std::vector<unsigned char> tag_buffer; 00735 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter) { 00736 rval = append_serial_tag_data( tag_buffer, *tag_iter ); CHECK_MB(rval); 00737 } 00738 00739 // broadcast list from root to all other procs 00740 unsigned long size = tag_buffer.size(); 00741 err = MPI_Bcast( &size, 1, MPI_UNSIGNED_LONG, 0, comm ); 00742 CHECK_MPI(err); 00743 tag_buffer.resize(size); 00744 err = MPI_Bcast( &tag_buffer[0], size, MPI_UNSIGNED_CHAR, 0, comm ); 00745 CHECK_MPI(err); 00746 00747 // update local tag list 00748 std::vector<TagDesc*> missing; 00749 rval = check_serial_tag_data( tag_buffer, &missing, 0 ); CHECK_MB(rval); 00750 00751 // check if we're done (0->done, 1->more, 2+->error) 00752 int code, lcode = (MB_SUCCESS != rval) ? rval + 2 : missing.empty() ? 0 : 1; 00753 err = MPI_Allreduce( &lcode, &code, 1, MPI_INT, MPI_MAX, comm ); 00754 CHECK_MPI(err); 00755 if (code > 1) { 00756 writeUtil->report_error("Inconsistent tag definitions between procs."); 00757 return error((ErrorCode)(code-2)); 00758 } 00759 00760 // if not done... 
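// Negotiation round, sketched: (1) each proc re-serializes the tags the
// root did not list, (2) the root gathers those definitions (my_Gatherv),
// merges them into its sorted list and re-serializes only the new ones,
// (3) the merged definitions are broadcast so every proc can create the
// tags it was missing.  A negative broadcast length carries an error code
// from the root back to all procs.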
00761 if (code) { 00762 dbgOut.print(1,"Not all procs had same tag definitions, negotiating...\n"); 00763 00764 // get tags defined on this proc but not on root proc 00765 tag_buffer.clear(); 00766 for (size_t i = 0; i < missing.size(); ++i) { 00767 rval = append_serial_tag_data( tag_buffer, *missing[i] ); CHECK_MB(rval); 00768 } 00769 00770 // gather extra tag definitions on root processor 00771 std::vector<int> junk; // don't care how many from each proc 00772 assert(rank || tag_buffer.empty()); // must be empty on root 00773 err = my_Gatherv( &tag_buffer[0], tag_buffer.size(), 00774 MPI_UNSIGNED_CHAR, tag_buffer, junk, 0, comm ); 00775 CHECK_MPI(err); 00776 00777 // process serialized tag descriptions on root, and 00778 rval = MB_SUCCESS; 00779 if (0 == rank) { 00780 // process serialized tag descriptions on root, and 00781 std::vector<TagDesc*> newlist; 00782 rval = check_serial_tag_data( tag_buffer, 0, &newlist ); 00783 tag_buffer.clear(); 00784 // re-serialize a unique list of new tag defintitions 00785 for (size_t i = 0; MB_SUCCESS == rval && i != newlist.size(); ++i) { 00786 rval = append_serial_tag_data( tag_buffer, *newlist[i] ); CHECK_MB(rval); 00787 } 00788 } 00789 00790 // broadcast any new tag definitions from root to other procs 00791 long this_size = tag_buffer.size(); 00792 if (MB_SUCCESS != rval) 00793 this_size = -rval; 00794 err = MPI_Bcast( &this_size, 1, MPI_LONG, 0, comm ); 00795 CHECK_MPI(err); 00796 if (this_size < 0) { 00797 writeUtil->report_error("Inconsistent tag definitions between procs."); 00798 return error((ErrorCode)-this_size); 00799 } 00800 tag_buffer.resize(this_size); 00801 err = MPI_Bcast( &tag_buffer[0], this_size, MPI_UNSIGNED_CHAR, 0, comm ); 00802 CHECK_MPI(err); 00803 00804 // process new tag definitions 00805 rval = check_serial_tag_data( tag_buffer, 0, 0 ); CHECK_MB(rval); 00806 } 00807 00808 subState.end(); 00809 subState.start("negotiate which element/tag combinations are dense"); 00810 00811 // Figure out for which tag/element combinations we can 00812 // write dense tag data. 00813 00814 // Construct a table of bits, 00815 // where each row of the table corresponds to a tag 00816 // and each column to an element group. 00817 00818 // Two extra, because first is nodes and last is sets. 00819 // (n+7)/8 is ceil(n/8) 00820 const int bytes_per_tag = (exportList.size() + 9)/8; 00821 std::vector<unsigned char> data(bytes_per_tag * tagList.size(), 0); 00822 std::vector<unsigned char> recv( data.size(), 0 ); 00823 unsigned char* iter = &data[0]; 00824 if (writeTagDense && !data.empty()) { 00825 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); 00826 ++tag_iter, iter += bytes_per_tag) { 00827 00828 Range tagged; 00829 rval = get_sparse_tagged_entities( *tag_iter, tagged ); CHECK_MB(rval); 00830 00831 int s; 00832 if (MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s )) 00833 continue; 00834 00835 std::string n; 00836 iFace->tag_get_name( tag_iter->tag_id, n ); // second time we've called, so shouldnt fail 00837 00838 // Check if we want to write this tag in dense format even if not 00839 // all of the entities have a tag value. The criterion of this 00840 // is that the tag be dense, have a default value, and have at 00841 // least 2/3 of the entities tagged. 
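// The per-group decision is made by check_dense_format_tag() and recorded
// in a per-proc bit table: one row of bytes_per_tag bytes per tag, where
// column 0 is the node group, columns 1..exportList.size() are the element
// groups in exportList order, and the last column is the set group, hence
// bytes_per_tag = (exportList.size() + 2 + 7)/8.  The MPI_BAND reduction
// below keeps a bit set only if every proc can write that tag densely for
// that entity group.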
00842 bool prefer_dense = false; 00843 TagType type; 00844 rval = iFace->tag_get_type( tag_iter->tag_id, type ); CHECK_MB(rval); 00845 if (MB_TAG_DENSE == type) { 00846 const void* defval = 0; 00847 rval = iFace->tag_get_default_value( tag_iter->tag_id, defval, s ); 00848 if (MB_SUCCESS == rval) 00849 prefer_dense = true; 00850 } 00851 00852 int i = 0; 00853 if (check_dense_format_tag( nodeSet, tagged, prefer_dense )) { 00854 set_bit( i, iter ); 00855 dbgOut.printf( 2, "Can write dense data for \"%s\"/Nodes\n", n.c_str()); 00856 } 00857 std::list<ExportSet>::const_iterator ex_iter = exportList.begin(); 00858 for (++i; ex_iter != exportList.end(); ++i, ++ex_iter) { 00859 if (check_dense_format_tag( *ex_iter, tagged, prefer_dense )) { 00860 set_bit( i, iter ); 00861 dbgOut.printf( 2, "Can write dense data for \"%s\"/%s\n", n.c_str(), 00862 ex_iter->name()); 00863 } 00864 } 00865 if (check_dense_format_tag( setSet, tagged, prefer_dense )) { 00866 set_bit( i, iter ); 00867 dbgOut.printf( 2, "Can write dense data for \"%s\"/Sets\n", n.c_str()); 00868 } 00869 } 00870 00871 // Do bit-wise AND of list over all processors (only write dense format 00872 // if all proccesses want dense format for this group of entities). 00873 err = MPI_Allreduce( &data[0], &recv[0], data.size(), MPI_UNSIGNED_CHAR, 00874 MPI_BAND, myPcomm->proc_config().proc_comm() ); 00875 CHECK_MPI(err); 00876 } // if (writeTagDense) 00877 00878 // Store initial counts for sparse-formatted tag data. 00879 // The total number of values to send and receive will be the number of 00880 // tags plus the number of var-len tags because we need to negotiate 00881 // offsets into two different tables for the var-len tags. 00882 std::vector<long> counts; 00883 00884 // Record dense tag/element combinations 00885 iter = &recv[0]; 00886 const unsigned char* iter2 = &data[0]; 00887 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); 00888 ++tag_iter, iter += bytes_per_tag, iter2 += bytes_per_tag) { 00889 00890 Range tagged; 00891 rval = get_sparse_tagged_entities( *tag_iter, tagged ); CHECK_MB(rval); 00892 00893 std::string n; 00894 iFace->tag_get_name( tag_iter->tag_id, n ); // second time we've called, so shouldnt fail 00895 00896 int i = 0; 00897 if (get_bit(i, iter)) { 00898 assert(get_bit(i, iter2)); 00899 tag_iter->dense_list.push_back(nodeSet); 00900 tagged -= nodeSet.range; 00901 dbgOut.printf( 2, "Will write dense data for \"%s\"/Nodes\n", n.c_str()); 00902 } 00903 std::list<ExportSet>::const_iterator ex_iter = exportList.begin(); 00904 for (++i; ex_iter != exportList.end(); ++i, ++ex_iter) { 00905 if (get_bit(i, iter)) { 00906 assert(get_bit(i, iter2)); 00907 tag_iter->dense_list.push_back(*ex_iter); 00908 dbgOut.printf( 2, "WIll write dense data for \"%s\"/%s\n", n.c_str(), 00909 ex_iter->name()); 00910 tagged -= ex_iter->range; 00911 } 00912 } 00913 if (get_bit(i, iter)) { 00914 assert(get_bit(i, iter2)); 00915 tag_iter->dense_list.push_back(setSet); 00916 dbgOut.printf( 2, "Will write dense data for \"%s\"/Sets\n", n.c_str()); 00917 tagged -= setSet.range; 00918 } 00919 00920 counts.push_back( tagged.size() ); 00921 00922 int s; 00923 if (MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s )) { 00924 unsigned long data_len; 00925 rval = get_tag_data_length( *tag_iter, tagged, data_len ); CHECK_MB(rval); 00926 counts.push_back( data_len ); 00927 } 00928 } 00929 00930 00931 subState.end(); 00932 subState.start("Negotiate offsets for sparse tag info"); 00933 00934 std::vector<long> offsets(counts.size()), 
maxima(counts.size()), totals(counts.size()); 00935 rval = create_dataset( counts.size(), &counts[0], &offsets[0], &maxima[0], &totals[0] ); 00936 CHECK_MB(rval); 00937 00938 // Copy values into local structs and if root then create tables 00939 size_t idx = 0; 00940 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++idx) { 00941 assert(idx < counts.size()); 00942 tag_iter->sparse_offset = offsets[idx]; 00943 tag_iter->max_num_ents = maxima[idx]; 00944 tag_iter->write_sparse = (0 != totals[idx]); 00945 int s; 00946 if (MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s )) { 00947 ++idx; 00948 assert(idx < counts.size()); 00949 tag_iter->var_data_offset = offsets[idx]; 00950 tag_iter->max_num_vals = maxima[idx]; 00951 } 00952 else { 00953 tag_iter->var_data_offset = 0; 00954 tag_iter->max_num_vals = 0; 00955 } 00956 } 00957 00958 subState.end(); 00959 00960 // Create tag tables on root process 00961 if (0 == myPcomm->proc_config().proc_rank()) { 00962 size_t iidx = 0; 00963 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter, ++iidx) { 00964 assert(iidx < totals.size()); 00965 unsigned long num_ents = totals[iidx]; 00966 unsigned long num_val = 0; 00967 int s; 00968 if (MB_VARIABLE_DATA_LENGTH == iFace->tag_get_length( tag_iter->tag_id, s )) { 00969 ++iidx; 00970 assert(iidx < totals.size()); 00971 num_val = totals[iidx]; 00972 } 00973 dbgOut.printf( 2, "Writing tag description for tag 0x%lx with %lu values\n", 00974 (unsigned long)tag_iter->tag_id, num_val ? num_val : num_ents ); 00975 00976 rval = create_tag( *tag_iter, num_ents, num_val ); 00977 if (MB_SUCCESS != rval) 00978 return error(rval); 00979 } 00980 } 00981 00982 if (dbgOut.get_verbosity() > 1) { 00983 dbgOut.printf(2,"Tags: %12s %8s %8s %8s %8s %8s\n", "Name", "Count", "Offset", "Var Off", "Max Ent", "Handle"); 00984 00985 for (tag_iter = tagList.begin(); tag_iter != tagList.end(); ++tag_iter) { 00986 std::string name; 00987 iFace->tag_get_name( tag_iter->tag_id, name ); 00988 size_t this_size; 00989 get_num_sparse_tagged_entities( *tag_iter, this_size ); 00990 dbgOut.printf(2,"%18s %8lu %8lu %8lu %8lu 0x%7lx\n", name.c_str(), 00991 (unsigned long)this_size, 00992 (unsigned long)tag_iter->sparse_offset, 00993 (unsigned long)tag_iter->var_data_offset, 00994 (unsigned long)tag_iter->max_num_ents, 00995 (unsigned long)tag_iter->tag_id ); 00996 } 00997 } 00998 00999 return MB_SUCCESS; 01000 } 01001 01002 struct DatasetVals { 01003 long start_id; 01004 long max_count; 01005 long total; 01006 }; 01007 STATIC_ASSERT(sizeof(DatasetVals) == 3*sizeof(long)); 01008 01009 ErrorCode WriteHDF5Parallel::create_dataset( int num_datasets, 01010 const long* num_owned, 01011 long* offsets_out, 01012 long* max_proc_entities, 01013 long* total_entities, 01014 const DataSetCreator& creator, 01015 ExportSet* groups[], 01016 id_t* first_ids_out ) 01017 { 01018 int result; 01019 ErrorCode rval; 01020 const unsigned rank = myPcomm->proc_config().proc_rank(); 01021 const unsigned nproc = myPcomm->proc_config().proc_size(); 01022 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01023 01024 // gather entity counts for each processor on root 01025 std::vector<long> counts( rank ? 
0 : nproc * num_datasets ); 01026 VALGRIND_CHECK_MEM_IS_DEFINED( &num_owned, sizeof(long) ); 01027 result = MPI_Gather( const_cast<long*>(num_owned), num_datasets, MPI_LONG, &counts[0], num_datasets, MPI_LONG, 0, comm ); 01028 CHECK_MPI(result); 01029 01030 // create node data in file 01031 DatasetVals zero_val = { 0, 0 ,0 }; 01032 std::vector<DatasetVals> cumulative(num_datasets,zero_val); 01033 if (rank == 0) 01034 { 01035 for (unsigned i = 0; i < nproc; i++) { 01036 const long* proc_data = &counts[i*num_datasets]; 01037 for (int index = 0; index < num_datasets; ++index) { 01038 cumulative[index].total += proc_data[index]; 01039 if (proc_data[index] > cumulative[index].max_count) 01040 cumulative[index].max_count = proc_data[index]; 01041 } 01042 } 01043 01044 for (int index = 0; index < num_datasets; ++index) { 01045 if (cumulative[index].total) { 01046 rval = creator( this, 01047 cumulative[index].total, 01048 groups ? groups[index] : 0, 01049 cumulative[index].start_id ); 01050 CHECK_MB(rval); 01051 } 01052 else { 01053 cumulative[index].start_id = -1; 01054 } 01055 } 01056 } 01057 01058 // send id offset to every proc 01059 result = MPI_Bcast( &cumulative[0], 3*num_datasets, MPI_LONG, 0, comm ); 01060 CHECK_MPI(result); 01061 for (int index = 0; index < num_datasets; ++index) { 01062 if (first_ids_out) 01063 first_ids_out[index] = (id_t)cumulative[index].start_id; 01064 max_proc_entities[index] = cumulative[index].max_count; 01065 total_entities[index] = cumulative[index].total; 01066 } 01067 01068 // convert array of per-process counts to per-process offsets 01069 if (rank == 0) 01070 { 01071 // initialize prev_size with data sizes for root process 01072 std::vector<long> prev_size(counts.begin(), counts.begin() + num_datasets); 01073 // root process gets offset zero 01074 std::fill( counts.begin(), counts.begin() + num_datasets, 0L ); 01075 // for each proc other than this one (root) 01076 for (unsigned i = 1; i < nproc; ++i) 01077 { 01078 // get pointer to offsets for previous process in list 01079 long* prev_data = &counts[(i-1)*num_datasets]; 01080 // get poitner to offsets for this process in list 01081 long* proc_data = &counts[i*num_datasets]; 01082 // for each data set 01083 for (int j = 0; j < num_datasets; ++j) { 01084 // get size of data in dataset from process i 01085 long mysize = proc_data[j]; 01086 // offset for process i is offset of prevous process plus 01087 // number of values previous process will write 01088 proc_data[j] = prev_data[j] + prev_size[j]; 01089 // store my size, as it is no longer available in 'counts' 01090 prev_size[j] = mysize; 01091 } 01092 } 01093 } 01094 01095 // send each proc it's offset in the table 01096 if (rank == 0) { 01097 VALGRIND_CHECK_MEM_IS_DEFINED( &counts[0], num_datasets*nproc*sizeof(long) ); 01098 } 01099 result = MPI_Scatter( &counts[0], num_datasets, MPI_LONG, offsets_out, num_datasets, MPI_LONG, 0, comm ); 01100 CHECK_MPI(result); 01101 01102 return MB_SUCCESS; 01103 } 01104 01105 ErrorCode WriteHDF5Parallel::create_node_table( int dimension ) 01106 { 01107 nodeSet.num_nodes = dimension; // put it here so NodeSetCreator can access it 01108 struct NodeSetCreator : public DataSetCreator { 01109 ErrorCode operator()( WriteHDF5* file, long count, const ExportSet* group, long& start_id ) const 01110 { 01111 mhdf_Status status; 01112 hid_t handle = mhdf_createNodeCoords( file->file_ptr(), group->num_nodes, count, &start_id, &status ); 01113 CHECK_HDFN(status); 01114 mhdf_closeData( file->file_ptr(), handle, &status ); 01115 
CHECK_HDFN(status); 01116 return MB_SUCCESS; 01117 } 01118 }; 01119 01120 const long count = nodeSet.range.size(); 01121 ExportSet* array[] = { &nodeSet }; 01122 ErrorCode rval = create_dataset( 1, 01123 &count, 01124 &nodeSet.offset, 01125 &nodeSet.max_num_ents, 01126 &nodeSet.total_num_ents, 01127 NodeSetCreator(), 01128 array, 01129 &nodeSet.first_id ); 01130 CHECK_MB(rval); 01131 return assign_ids( nodeSet.range, nodeSet.first_id + nodeSet.offset ); 01132 } 01133 01134 01135 struct elemtype { 01136 int mbtype; 01137 int numnode; 01138 01139 elemtype( int vals[2] ) : mbtype(vals[0]), numnode(vals[1]) {} 01140 elemtype( int t, int n ) : mbtype(t), numnode(n) {} 01141 01142 bool operator==( const elemtype& other ) const 01143 { 01144 return mbtype == other.mbtype && 01145 (mbtype == MBENTITYSET || 01146 numnode == other.numnode); 01147 } 01148 bool operator<( const elemtype& other ) const 01149 { 01150 if (mbtype > other.mbtype) 01151 return false; 01152 01153 return mbtype < other.mbtype || 01154 (mbtype != MBENTITYSET && 01155 numnode < other.numnode); 01156 } 01157 bool operator!=( const elemtype& other ) const 01158 { return !this->operator==(other); } 01159 }; 01160 01161 01162 ErrorCode WriteHDF5Parallel::negotiate_type_list() 01163 { 01164 int result; 01165 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01166 01167 exportList.sort(); 01168 int num_types = exportList.size(); 01169 01170 // Get list of types on this processor 01171 typedef std::vector< std::pair<int,int> > typelist; 01172 typelist my_types(num_types); 01173 VALGRIND_MAKE_VEC_UNDEFINED( my_types ); 01174 typelist::iterator viter = my_types.begin(); 01175 for (std::list<ExportSet>::iterator eiter = exportList.begin(); 01176 eiter != exportList.end(); ++eiter) 01177 { 01178 viter->first = eiter->type; 01179 viter->second = eiter->num_nodes; 01180 ++viter; 01181 } 01182 01183 dbgOut.print( 2,"Local Element Types:\n"); 01184 for (viter = my_types.begin(); viter != my_types.end(); ++viter) 01185 { 01186 int type = viter->first; 01187 int count = viter->second; 01188 dbgOut.printf(2, " %s : %d\n", CN::EntityTypeName((EntityType)type), count); 01189 } 01190 01191 // Broadcast number of types from root to all nodes 01192 int num_types0 = num_types; 01193 result = MPI_Bcast( &num_types0, 1, MPI_INT, 0, comm ); 01194 CHECK_MPI(result); 01195 // Broadcast type list from root to all nodes 01196 typelist root_types( num_types0 ); 01197 if (0 == myPcomm->proc_config().proc_rank()) 01198 root_types = my_types; 01199 result = MPI_Bcast( &root_types[0], 2*num_types0, MPI_INT, 0, comm ); 01200 CHECK_MPI(result); 01201 01202 // Build local list of any types that root did not know about 01203 typelist non_root_types; 01204 viter = root_types.begin(); 01205 for (typelist::iterator iter = my_types.begin(); iter != my_types.end(); ++iter) { 01206 if (viter == root_types.end() || *viter != *iter) 01207 non_root_types.push_back( *iter ); 01208 else 01209 ++viter; 01210 } 01211 01212 // Determine if any process had types not defined on the root 01213 int non_root_count = non_root_types.size(); 01214 int not_done; 01215 result = MPI_Allreduce( &non_root_count, ¬_done, 1, MPI_INT, MPI_LOR, comm ); 01216 CHECK_MPI(result); 01217 if (not_done) { 01218 // Get number of types each processor has that root does not 01219 std::vector<int> counts(myPcomm->proc_config().proc_size()); 01220 int two_count = 2*non_root_count; 01221 result = MPI_Gather( &two_count, 1, MPI_INT, &counts[0], 1, MPI_INT, 0, comm ); 01222 CHECK_MPI(result); 01223 01224 
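// Convert the gathered per-proc counts (in ints: two ints per element type)
// into displacements for the MPI_Gatherv below.  Worked example with three
// ranks: counts = {4, 0, 2} gives displs = {0, 4, 4} and a total receive
// size of 6 ints, i.e. 3 (type, num_nodes) pairs.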
// Get list of types from each processor 01225 std::vector<int> displs(myPcomm->proc_config().proc_size() + 1); 01226 VALGRIND_MAKE_VEC_UNDEFINED( displs ); 01227 displs[0] = 0; 01228 for (unsigned long i = 1; i <= myPcomm->proc_config().proc_size(); ++i) 01229 displs[i] = displs[i-1] + counts[i-1]; 01230 int total = displs[myPcomm->proc_config().proc_size()]; 01231 typelist alltypes(total/2); 01232 VALGRIND_MAKE_VEC_UNDEFINED( alltypes ); 01233 VALGRIND_CHECK_MEM_IS_DEFINED( &non_root_types[0], non_root_types.size()*sizeof(int) ); 01234 result = MPI_Gatherv( &non_root_types[0], 2*non_root_count, MPI_INT, 01235 &alltypes[0], &counts[0], &displs[0], MPI_INT, 0, comm ); 01236 CHECK_MPI(result); 01237 01238 // Merge type lists. 01239 // Prefer O(n) insertions with O(ln n) search time because 01240 // we expect data from a potentially large number of processes, 01241 // but with a small total number of element types. 01242 if (0 == myPcomm->proc_config().proc_rank()) { 01243 for (viter = alltypes.begin(); viter != alltypes.end(); ++viter) { 01244 typelist::iterator titer = std::lower_bound( my_types.begin(), my_types.end(), *viter ); 01245 if (titer == my_types.end() || *titer != *viter) 01246 my_types.insert( titer, *viter ); 01247 } 01248 01249 dbgOut.print(2, "Global Element Types:\n"); 01250 for (viter = my_types.begin(); viter != my_types.end(); ++viter) 01251 { 01252 dbgOut.printf(2," %s : %d\n", CN::EntityTypeName((EntityType)viter->first), viter->second); 01253 } 01254 } 01255 01256 // Send total number of types to each processor 01257 total = my_types.size(); 01258 result = MPI_Bcast( &total, 1, MPI_INT, 0, comm ); 01259 CHECK_MPI(result); 01260 01261 // Send list of types to each processor 01262 my_types.resize(total); 01263 result = MPI_Bcast( &my_types[0], 2*total, MPI_INT, 0, comm ); 01264 CHECK_MPI(result); 01265 } 01266 else { 01267 // special case: if root had types but some subset of procs did not 01268 // have those types, but there are no types that the root doesn't 01269 // know about then we still need to update processes that are missing 01270 // types. 01271 my_types.swap( root_types ); 01272 } 01273 01274 // Insert missing types into exportList, with an empty 01275 // range of entities to export. 
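// Every proc must hold the same ordered exportList, even for element types
// it owns no entities of, because the collective offset negotiation in
// create_dataset() assumes each proc contributes the same number of
// datasets in the same order.  E.g. (hypothetical) if the global list is
// {EDGE/2, TRI/3, QUAD/4} and this proc has only TRI elements, empty EDGE
// and QUAD entries (first_id = offset = 0) are inserted around the TRI one.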
01276 std::list<ExportSet>::iterator ex_iter = exportList.begin(); 01277 for (viter = my_types.begin(); viter != my_types.end(); ++viter) 01278 { 01279 while (ex_iter != exportList.end() && *ex_iter < *viter) 01280 ++ex_iter; 01281 01282 if (ex_iter == exportList.end() || !(*ex_iter == *viter)) 01283 { 01284 ExportSet insert; 01285 insert.type = (EntityType)viter->first; 01286 insert.num_nodes = viter->second; 01287 insert.first_id = 0; 01288 insert.offset = 0; 01289 insert.adj_offset = 0; 01290 ex_iter = exportList.insert( ex_iter, insert ); 01291 } 01292 } 01293 01294 return MB_SUCCESS; 01295 } 01296 01297 ErrorCode WriteHDF5Parallel::create_element_tables() 01298 { 01299 struct ElemSetCreator : public DataSetCreator { 01300 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01301 { return file->create_elem_table( *ex, size, start_id ); } 01302 }; 01303 01304 const int numtypes = exportList.size(); 01305 std::vector<ExportSet*> groups(numtypes); 01306 std::vector<long> counts(numtypes), offsets(numtypes), max_ents(numtypes), total_ents(numtypes); 01307 std::vector<id_t> start_ids(numtypes); 01308 01309 size_t idx = 0; 01310 std::list<ExportSet>::iterator ex_iter; 01311 for (ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx) { 01312 groups[idx] = &*ex_iter; 01313 counts[idx] = ex_iter->range.size(); 01314 } 01315 ErrorCode rval = create_dataset( numtypes, 01316 &counts[0], 01317 &offsets[0], 01318 &max_ents[0], 01319 &total_ents[0], 01320 ElemSetCreator(), 01321 &groups[0], 01322 &start_ids[0] ); 01323 CHECK_MB(rval); 01324 01325 for (idx = 0, ex_iter = exportList.begin(); ex_iter != exportList.end(); ++ex_iter, ++idx) { 01326 ex_iter->first_id = start_ids[idx]; 01327 ex_iter->offset = offsets[idx]; 01328 ex_iter->max_num_ents = max_ents[idx]; 01329 ex_iter->total_num_ents = total_ents[idx]; 01330 rval = assign_ids( ex_iter->range, ex_iter->first_id + ex_iter->offset ); 01331 CHECK_MB(rval); 01332 } 01333 01334 return MB_SUCCESS; 01335 } 01336 01337 ErrorCode WriteHDF5Parallel::create_adjacency_tables() 01338 { 01339 struct AdjSetCreator : public DataSetCreator { 01340 ErrorCode operator()( WriteHDF5* file, long size, const ExportSet* ex, long& start_id ) const 01341 { 01342 mhdf_Status status; 01343 hid_t handle = mhdf_createAdjacency( file->file_ptr(), 01344 ex->name(), 01345 size, 01346 &status ); 01347 CHECK_HDFN(status); 01348 mhdf_closeData( file->file_ptr(), handle, &status ); 01349 CHECK_HDFN(status); 01350 start_id = -1; 01351 return MB_SUCCESS; 01352 } 01353 }; 01354 01355 std::vector<ExportSet*> groups; 01356 #ifdef WRITE_NODE_ADJACENCIES 01357 groups.push_back( &nodeSet ); 01358 #endif 01359 for (std::list<ExportSet>::iterator ex_iter = exportList.begin(); 01360 ex_iter != exportList.end(); ++ex_iter) 01361 groups.push_back( &*ex_iter ); 01362 01363 ErrorCode rval; 01364 const int numtypes = groups.size(); 01365 std::vector<long> counts(numtypes); 01366 std::vector<long> offsets(numtypes); 01367 std::vector<long> max_ents(numtypes); 01368 std::vector<long> totals(numtypes); 01369 for (int i = 0; i < numtypes; ++i) { 01370 id_t count; 01371 rval = count_adjacencies( groups[i]->range, count ); 01372 counts[i] = count; 01373 CHECK_MB(rval); 01374 } 01375 01376 rval = create_dataset( numtypes, 01377 &counts[0], 01378 &offsets[0], 01379 &max_ents[0], 01380 &totals[0], 01381 AdjSetCreator(), 01382 &groups[0] ); 01383 CHECK_MB(rval); 01384 01385 for (int i = 0; i < numtypes; ++i) { 01386 groups[i]->max_num_adjs = 
max_ents[i]; 01387 groups[i]->adj_offset = offsets[i]; 01388 } 01389 return MB_SUCCESS; 01390 } 01391 01392 const unsigned SSVB = 3; 01393 01394 void WriteHDF5Parallel::print_set_sharing_data( const Range& range, const char* label, Tag idt ) 01395 { 01396 dbgOut.printf(SSVB,"set\tid\towner\t%-*s\tfid\tshared\n",(int)(sizeof(EntityHandle)*2),"handle"); 01397 for (Range::iterator it = range.begin(); it != range.end(); ++it) { 01398 int id; 01399 iFace->tag_get_data( idt, &*it, 1, &id ); 01400 EntityHandle handle = 0; 01401 unsigned owner = 0; 01402 id_t file_id = 0; 01403 myPcomm->get_entityset_owner( *it, owner, &handle ); 01404 if (!idMap.find( *it, file_id )) 01405 file_id = 0; 01406 dbgOut.printf(SSVB,"%s\t%d\t%u\t%lx\t%lu\t", label, id, owner, (unsigned long)handle, (unsigned long)file_id); 01407 std::vector<unsigned> procs; 01408 myPcomm->get_entityset_procs( *it, procs ); 01409 if (procs.empty()) 01410 dbgOut.print(SSVB,"<none>\n"); 01411 else { 01412 for (unsigned i = 0; i < procs.size()-1; ++i) 01413 dbgOut.printf(SSVB,"%u,", procs[i]); 01414 dbgOut.printf(SSVB,"%u\n",procs.back()); 01415 } 01416 } 01417 } 01418 01419 01420 void WriteHDF5Parallel::print_shared_sets() 01421 { 01422 const char* tag_names[][2] = { { MATERIAL_SET_TAG_NAME, "block" }, 01423 { DIRICHLET_SET_TAG_NAME, "nodeset" }, 01424 { NEUMANN_SET_TAG_NAME, "sideset" }, 01425 { 0, 0 } }; 01426 01427 for (int i = 0; tag_names[i][0]; ++i) { 01428 Tag tag; 01429 if (MB_SUCCESS != iFace->tag_get_handle( tag_names[i][0], 1, MB_TYPE_INTEGER, tag )) 01430 continue; 01431 01432 Range tagged; 01433 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &tag, 0, 1, tagged ); 01434 print_set_sharing_data( tagged, tag_names[i][1], tag ); 01435 } 01436 01437 Tag geom, id; 01438 if (MB_SUCCESS != iFace->tag_get_handle( GEOM_DIMENSION_TAG_NAME, 1, MB_TYPE_INTEGER, geom )) 01439 return; 01440 if (MB_SUCCESS != iFace->tag_get_handle( GLOBAL_ID_TAG_NAME, 1, MB_TYPE_INTEGER, id )) 01441 return; 01442 01443 const char* geom_names[] = { "vertex", "curve", "surface", "volume" }; 01444 for (int d = 0; d <= 3; ++d) { 01445 Range tagged; 01446 const void* vals[] = { &d }; 01447 iFace->get_entities_by_type_and_tag( 0, MBENTITYSET, &geom, vals, 1, tagged ); 01448 print_set_sharing_data( tagged, geom_names[d], id ); 01449 } 01450 } 01451 01452 ErrorCode WriteHDF5Parallel::communicate_shared_set_ids( const Range& owned, 01453 const Range& remote ) 01454 { 01455 ErrorCode rval; 01456 int mperr; 01457 const int TAG = 0xDEADF00; 01458 //const unsigned rank = myPcomm->proc_config().proc_rank(); 01459 const MPI_Comm comm = myPcomm->proc_config().proc_comm(); 01460 01461 dbgOut.tprint(1,"COMMUNICATING SHARED SET IDS\n"); 01462 dbgOut.print(6, "Owned, shared sets: ", owned ); 01463 01464 // Post receive buffers for all procs for which we share sets 01465 01466 std::vector<unsigned> procs; 01467 rval = myPcomm->get_entityset_owners( procs ); CHECK_MB(rval); 01468 std::vector<unsigned>::iterator it = std::find(procs.begin(), procs.end(), 01469 myPcomm->proc_config().proc_rank()); 01470 if (it != procs.end()) 01471 procs.erase(it); 01472 01473 std::vector<MPI_Request> recv_req(procs.size(), MPI_REQUEST_NULL); 01474 std::vector< std::vector<unsigned long> > recv_buf(procs.size()); 01475 01476 size_t recv_count = 0; 01477 for (size_t i = 0; i < procs.size(); ++i) { 01478 Range tmp; 01479 rval = myPcomm->get_owned_sets( procs[i], tmp ); CHECK_MB(rval); 01480 size_t count = intersect( tmp, remote ).size(); // necessary because we might not be writing all of the 
database 01481 if (count) { 01482 dbgOut.printf( 6, "Sets owned by proc %u (remote handles): ", procs[i] ); 01483 if (dbgOut.get_verbosity() >= 6) { 01484 Range remote_handles; 01485 tmp = intersect( tmp, remote ); 01486 for (Range::iterator j = tmp.begin(); j != tmp.end(); ++j) { 01487 unsigned r; 01488 EntityHandle h; 01489 myPcomm->get_entityset_owner( *j, r, &h ); 01490 assert(r == procs[i]); 01491 remote_handles.insert( h ); 01492 } 01493 dbgOut.print( 6, remote_handles ); 01494 } 01495 recv_count++; 01496 recv_buf[i].resize(2*count+1); 01497 dbgOut.printf(5,"Posting receive buffer of size %lu for proc %u (%lu of %lu owned sets)\n", 01498 (unsigned long)recv_buf[i].size(), procs[i], count, tmp.size()); 01499 mperr = MPI_Irecv( &recv_buf[i][0], recv_buf[i].size(), MPI_UNSIGNED_LONG, 01500 procs[i], TAG, comm, &recv_req[i] ); CHECK_MPI(mperr); 01501 } 01502 } 01503 01504 // Send set ids to all procs with which we share them 01505 01506 // first build per-process lists of sets for which we need to send data 01507 std::map<unsigned,Range> send_sets; 01508 std::vector<unsigned> set_procs; 01509 for (Range::reverse_iterator i = owned.rbegin(); i != owned.rend(); ++i) { 01510 set_procs.clear(); 01511 rval = myPcomm->get_entityset_procs( *i, set_procs ); CHECK_MB(rval); 01512 for (size_t j = 0; j < set_procs.size(); ++j) 01513 if (set_procs[j] != myPcomm->proc_config().proc_rank()) 01514 send_sets[set_procs[j]].insert( *i ); 01515 } 01516 assert(send_sets.find(myPcomm->proc_config().proc_rank()) == send_sets.end()); 01517 01518 // now send the data 01519 std::vector< std::vector<unsigned long> > send_buf(send_sets.size()); 01520 std::vector< MPI_Request > send_req(send_sets.size()); 01521 std::map<unsigned,Range>::iterator si = send_sets.begin(); 01522 for (size_t i = 0; si != send_sets.end(); ++si, ++i) { 01523 dbgOut.printf(6,"Sending data for shared sets to proc %u: ",si->first); 01524 dbgOut.print(6,si->second); 01525 01526 send_buf[i].reserve( 2*si->second.size()+1 ); 01527 send_buf[i].push_back( si->second.size() ); 01528 for (Range::iterator j = si->second.begin(); j != si->second.end(); ++j) { 01529 send_buf[i].push_back( *j ); 01530 send_buf[i].push_back( idMap.find( *j ) ); 01531 } 01532 dbgOut.printf(5,"Sending buffer of size %lu to proc %u (%lu of %lu owned sets)\n", 01533 (unsigned long)send_buf[i].size(), si->first, si->second.size(), owned.size()); 01534 mperr = MPI_Isend( &send_buf[i][0], send_buf[i].size(), MPI_UNSIGNED_LONG, 01535 si->first, TAG, comm, &send_req[i] ); 01536 } 01537 01538 // Process received data 01539 MPI_Status status; 01540 int idx; 01541 while (recv_count--) { 01542 mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ); 01543 CHECK_MPI(mperr); 01544 01545 assert((unsigned)status.MPI_SOURCE == procs[idx]); 01546 assert(2*recv_buf[idx].front()+1 == recv_buf[idx].size()); 01547 const size_t n = std::min<size_t>( recv_buf[idx].front(), (recv_buf[idx].size()-1)/2 ); 01548 dbgOut.printf(5,"Received buffer of size %lu from proc %d\n", 01549 (unsigned long)(2*n+1), (int)status.MPI_SOURCE ); 01550 01551 for (size_t i = 0; i < n; ++i) { 01552 EntityHandle handle = 0; 01553 rval = myPcomm->get_entityset_local_handle( procs[idx], recv_buf[idx][2*i+1], handle ); 01554 CHECK_MB(rval); 01555 assert(handle != 0); 01556 if (!idMap.insert( handle, recv_buf[idx][2*i+2], 1 ).second) 01557 error(MB_FAILURE); // conflicting IDs?????? 
01558 } 01559 01560 recv_req[idx] = MPI_REQUEST_NULL; 01561 } 01562 assert( MPI_SUCCESS == MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status ) 01563 && MPI_UNDEFINED == idx ); // check that we got them all 01564 01565 // Wait for all sends to complete before we release send 01566 // buffers (implicitly releases when we return from this function) 01567 01568 std::vector<MPI_Status> stats( send_req.size() ); 01569 mperr = MPI_Waitall( send_req.size(), &send_req[0], &stats[0] ); 01570 CHECK_MPI(mperr); 01571 01572 if (dbgOut.get_verbosity() >= SSVB) 01573 print_shared_sets(); 01574 01575 return MB_SUCCESS; 01576 } 01577 01578 //void get_global_ids( Interface* iFace, const unsigned long* ptr, 01579 // size_t len, unsigned flags, 01580 // std::vector<int>& ids ) 01581 //{ 01582 // Tag idtag; 01583 // iFace->tag_get_handle( GLOBAL_ID_TAG_NAME, 1, MB_TYPE_INTEGER, idtag ); 01584 // for (size_t i = 0; i < len; ++i) { 01585 // if (flags & MESHSET_ORDERED) { 01586 // int tmp; 01587 // iFace->tag_get_data( idtag, ptr + i, 1, &tmp ); 01588 // ids.push_back( tmp ); 01589 // continue; 01590 // } 01591 // 01592 // EntityHandle s = ptr[i]; 01593 // EntityHandle e = ptr[++i]; 01594 // for (; s <= e; ++s) { 01595 // int tmp; 01596 // iFace->tag_get_data( idtag, &s, 1, &tmp ); 01597 // ids.push_back( tmp ); 01598 // } 01599 // } 01600 //} 01601 01602 ErrorCode WriteHDF5Parallel::pack_set( Range::const_iterator it, 01603 unsigned long* buffer, 01604 size_t buffer_size ) 01605 { 01606 ErrorCode rval; 01607 const EntityHandle* ptr; 01608 int len; 01609 unsigned char flags; 01610 std::vector<id_t> tmp; 01611 size_t newlen; 01612 01613 // buffer must always contain at least flags and desired sizes 01614 assert(buffer_size >= 4); 01615 buffer_size -= 4; 01616 01617 Range::const_iterator nd = it; ++nd; 01618 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CONTENTS, &len, &flags ); 01619 CHECK_MB(rval); 01620 01621 //Tag mattag; 01622 //iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag ); 01623 //int block; 01624 //if (MB_SUCCESS != iFace->tag_get_data( mattag, &*it, 1, &block )) 01625 // block = 0; 01626 // 01627 //if (block) { 01628 // std::vector<int> ids; 01629 // get_global_ids( iFace, ptr, len, flags, ids ); 01630 //} 01631 01632 if (len && !(flags & MESHSET_ORDERED)) { 01633 tmp.clear(); 01634 bool blocked = false; 01635 assert( 0 == len % 2 ); 01636 rval = range_to_blocked_list( ptr, len/2, tmp, blocked ); 01637 if (blocked) 01638 flags |= mhdf_SET_RANGE_BIT; 01639 } 01640 else { 01641 tmp.resize( len ); 01642 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01643 tmp.resize( newlen ); 01644 } 01645 CHECK_MB(rval); 01646 01647 buffer[0] = flags; 01648 buffer[1] = tmp.size(); 01649 if (tmp.size() <= buffer_size) 01650 std::copy( tmp.begin(), tmp.end(), buffer + 4 ); 01651 01652 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::CHILDREN, &len ); 01653 CHECK_MB(rval); 01654 tmp.resize( len ); 01655 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01656 tmp.resize( newlen ); 01657 buffer[2] = tmp.size(); 01658 if (tmp.size() <= buffer_size - buffer[1]) 01659 std::copy( tmp.begin(), tmp.end(), buffer + 4 + buffer[1] ); 01660 01661 rval = writeUtil->get_entity_list_pointers( it, nd, &ptr, WriteUtilIface::PARENTS, &len ); 01662 CHECK_MB(rval); 01663 tmp.resize( len ); 01664 rval = vector_to_id_list( ptr, len, &tmp[0], newlen, true ); 01665 tmp.resize( newlen ); 01666 buffer[3] = tmp.size(); 01667 if (tmp.size() 

template<typename TYPE>
static void convert_to_ranged_ids( const TYPE* buffer,
                                   size_t len,
                                   std::vector<WriteHDF5::id_t>& result )
{
  if (!len) {
    result.clear();
    return;
  }

  result.resize( len*2 );
  Range tmp;
  for (size_t i = 0; i < len; i++)
    tmp.insert( (EntityHandle)buffer[i] );
  result.resize( tmp.psize()*2 );
  int j = 0;
  for (Range::const_pair_iterator pit = tmp.const_pair_begin();
       pit != tmp.const_pair_end(); pit++, j++)
  {
    result[2*j] = pit->first;
    result[2*j+1] = pit->second - pit->first + 1;
  }
}

static void merge_ranged_ids( const unsigned long* range_list,
                              size_t len,
                              std::vector<WriteHDF5::id_t>& result )
{
  typedef WriteHDF5::id_t id_t;
  assert(0 == len%2);
  assert(0 == result.size()%2);
  STATIC_ASSERT(sizeof(std::pair<id_t,id_t>) == 2*sizeof(id_t));

  result.insert( result.end(), range_list, range_list+len );
  size_t plen = result.size()/2;
  Range tmp;
  for (size_t i = 0; i < plen; i++)
  {
    EntityHandle starth = (EntityHandle)result[2*i];
    EntityHandle endh = (EntityHandle)result[2*i] + (id_t)result[2*i+1] - 1; // id+count-1
    tmp.insert( starth, endh );
  }
  // now convert back to std::vector<WriteHDF5::id_t>, compressed range format
  result.resize( tmp.psize()*2 );
  int j = 0;
  for (Range::const_pair_iterator pit = tmp.const_pair_begin();
       pit != tmp.const_pair_end(); pit++, j++)
  {
    result[2*j] = pit->first;
    result[2*j+1] = pit->second - pit->first + 1;
  }
}

static void merge_vector_ids( const unsigned long* list,
                              size_t len,
                              std::vector<WriteHDF5::id_t>& result )
{
  result.insert( result.end(), list, list+len );
}
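
// Illustration only (not called by the writer): the "compressed range" layout
// produced by the helpers above stores runs of consecutive ids as
// { start, count } pairs, so the handle list { 7, 8, 9, 20 } packs down to
// { 7, 3, 20, 1 }.  A minimal sketch exercising convert_to_ranged_ids():
static void example_compressed_range_format()
{
  const unsigned long handles[4] = { 7, 8, 9, 20 };
  std::vector<WriteHDF5::id_t> packed;
  convert_to_ranged_ids( handles, 4, packed );
  // packed now holds { 7, 3, 20, 1 }: two pairs, so packed.size() == 4
}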

ErrorCode WriteHDF5Parallel::unpack_set( EntityHandle set,
                                         const unsigned long* buffer,
                                         size_t buffer_size )
{
  // use local variables for readability
  assert(buffer_size >= 4);
  assert(buffer[1]+buffer[2]+buffer[3] <= buffer_size);
  const unsigned long flags = buffer[0];
  unsigned long num_content = buffer[1];
  const unsigned long num_child = buffer[2];
  const unsigned long num_parent = buffer[3];
  const unsigned long* contents = buffer + 4;
  const unsigned long* children = contents + num_content;
  const unsigned long* parents = children + num_child;

  SpecialSetData* data = find_set_data( set ); assert(!!data);
  if (!data) return MB_FAILURE;

  //Tag mattag;
  //iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag );
  //int block;
  //if (MB_SUCCESS != iFace->tag_get_data( mattag, &set, 1, &block ))
  //  block = 0;

  // If either the current data or the new data is in ranged format,
  // convert the other to ranged format as well; whenever the two
  // differ, the merged data ends up in "compressed range" format.
  std::vector<id_t> tmp;
  if ((flags & mhdf_SET_RANGE_BIT) != (data->setFlags & mhdf_SET_RANGE_BIT)) {
    if (flags & mhdf_SET_RANGE_BIT) {
      tmp = data->contentIds;
      convert_to_ranged_ids( &tmp[0], tmp.size(), data->contentIds );
      data->setFlags |= mhdf_SET_RANGE_BIT;
    }
    else {
      tmp.clear();
      convert_to_ranged_ids( contents, num_content, tmp );
      num_content = tmp.size();
      if (sizeof(id_t) < sizeof(long)) {
        size_t old_size = tmp.size();
        tmp.resize( sizeof(long) * old_size / sizeof(id_t) );
        unsigned long* array = reinterpret_cast<unsigned long*>(&tmp[0]);
        for (long i = ((long)old_size)-1; i >= 0; --i)
          array[i] = tmp[i];
        contents = array;
      }
      else if (sizeof(id_t) > sizeof(long)) {
        unsigned long* array = reinterpret_cast<unsigned long*>(&tmp[0]);
        std::copy( tmp.begin(), tmp.end(), array );
      }
      contents = reinterpret_cast<unsigned long*>(&tmp[0]);
    }
  }

  if (data->setFlags & mhdf_SET_RANGE_BIT)
    merge_ranged_ids( contents, num_content, data->contentIds );
  else
    merge_vector_ids( contents, num_content, data->contentIds );

  merge_vector_ids( children, num_child, data->childIds );
  merge_vector_ids( parents, num_parent, data->parentIds );
  return MB_SUCCESS;
}
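
// Illustration only (not part of the writer): when sizeof(id_t) < sizeof(long),
// unpack_set() above widens an id_t array into an unsigned long array in
// place.  Walking from the last element to the first is what makes the
// in-place widening safe: each wide slot is written only after every narrow
// value it overlaps has already been read.  A standalone sketch of the same
// trick, assuming sizeof(NARROW) <= sizeof(unsigned long):
template <typename NARROW>
static void example_widen_in_place( std::vector<NARROW>& vec )
{
  const size_t count = vec.size();
  vec.resize( count * sizeof(unsigned long) / sizeof(NARROW) );
  unsigned long* wide = reinterpret_cast<unsigned long*>(&vec[0]);
  for (size_t i = count; i > 0; --i)  // back to front
    wide[i-1] = vec[i-1];
}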

ErrorCode WriteHDF5Parallel::communicate_shared_set_data( const Range& owned,
                                                          const Range& remote )
{
  ErrorCode rval;
  int mperr;
  const unsigned rank = myPcomm->proc_config().proc_rank();
  const MPI_Comm comm = myPcomm->proc_config().proc_comm();

  dbgOut.tprintf(1,"COMMUNICATING SHARED SET DATA (%lu owned & %lu remote)\n",
                 (unsigned long)owned.size(), (unsigned long)remote.size() );

  // Calculate the total number of messages to be in transit (send and receive)
  size_t nummess = 0;
  std::vector<unsigned> procs;
  Range shared(owned);
  shared.merge(remote);
  for (Range::iterator i = shared.begin(); i != shared.end(); ++i) {
    procs.clear();
    rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB(rval);
    nummess += procs.size();
  }

  // Choose a receive buffer size.  We need 4*sizeof(long) minimum,
  // but that is almost useless so use 16*sizeof(long) as the minimum
  // instead.  Choose an upper limit such that we don't exceed 32 MB
  // of allocated memory (unless we absolutely must to meet the minimum.)
  // Also, don't initially choose buffers larger than INIT_BUFFER_SIZE*sizeof(long).
  const size_t MAX_BUFFER_MEM = 32*1024*1024 / sizeof(long);
  //const size_t INIT_BUFFER_SIZE = 128;
  const size_t INIT_BUFFER_SIZE = 1024;
  const size_t MIN_BUFFER_SIZE = 16;
  size_t init_buff_size = INIT_BUFFER_SIZE;
  if (init_buff_size * nummess > MAX_BUFFER_MEM)
    init_buff_size = MAX_BUFFER_MEM / nummess;
  if (init_buff_size < MIN_BUFFER_SIZE)
    init_buff_size = MIN_BUFFER_SIZE;

  dbgOut.printf(2,"Using buffer size of %lu for an expected message count of %lu\n",
                (unsigned long)init_buff_size, (unsigned long)nummess);

  // count number of recvs
  size_t numrecv = 0;
  for (Range::iterator i = owned.begin(); i != owned.end(); ++i) {
    procs.clear();
    rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB(rval);
    numrecv += procs.size();
    if (std::find(procs.begin(),procs.end(),rank) != procs.end())
      --numrecv;
  }

  // post receive buffers for all owned sets for all sharing procs
  std::vector<MPI_Request> recv_req(numrecv, MPI_REQUEST_NULL);
  std::vector<MPI_Request> lrecv_req(numrecv, MPI_REQUEST_NULL);

  std::vector< std::vector<unsigned long> > recv_buf(numrecv, std::vector<unsigned long>(init_buff_size));
  int idx = 0;
  for (Range::iterator i = owned.begin(); i != owned.end(); ++i) {
    procs.clear();
    rval = myPcomm->get_entityset_procs( *i, procs ); CHECK_MB(rval);
    for (size_t j = 0; j < procs.size(); ++j) {
      if (procs[j] == rank)
        continue;
      int tag = ID_FROM_HANDLE(*i);
      if (*i != CREATE_HANDLE(MBENTITYSET,tag)) {
#ifndef NDEBUG
        abort();
#endif
        CHECK_MB(MB_FAILURE);
      }
      dbgOut.printf(5,"Posting buffer to receive set %d from proc %u\n", tag, procs[j] );
      mperr = MPI_Irecv( &recv_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG,
                         procs[j], tag, comm, &recv_req[idx] );
      CHECK_MPI(mperr);
      ++idx;
    }
  }
  assert( (size_t)idx == numrecv );
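
  // Every message exchanged here begins with the fixed four-word header
  // written by pack_set(), and the MPI tag carries the owning process's set
  // ID, so each receive above is matched to the owned set it was posted for.
  // All receives start out init_buff_size words long; lrecv_req holds the
  // second round of receives posted for any message that turns out to be
  // larger than that.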

  // now send set data for all remote sets that I know about
  std::vector<MPI_Request> send_req(remote.size());
  std::vector< std::vector<unsigned long> > send_buf(remote.size());
  idx = 0;
  for (Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx) {
    send_buf[idx].resize( init_buff_size );
    rval = pack_set( i, &send_buf[idx][0], init_buff_size ); CHECK_MB(rval);
    EntityHandle remote_handle;
    unsigned owner;
    rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); CHECK_MB(rval);

    int tag = ID_FROM_HANDLE(remote_handle);
    assert(remote_handle == CREATE_HANDLE(MBENTITYSET,tag));
    dbgOut.printf(5,"Sending %lu values for set %d to proc %u\n",
                  send_buf[idx][1]+send_buf[idx][2]+send_buf[idx][3]+4, tag, owner );
    mperr = MPI_Isend( &send_buf[idx][0], init_buff_size, MPI_UNSIGNED_LONG,
                       owner, tag, comm, &send_req[idx] );
    CHECK_MPI(mperr);
  }

  //Tag mattag;
  //iFace->tag_get_handle( MATERIAL_SET_TAG_NAME, 1, MB_TYPE_INTEGER, mattag );

  // now initialize local data for managing contents of owned, shared sets
  assert(specialSets.empty());
  specialSets.clear();
  specialSets.reserve( owned.size() );
  for (Range::iterator i = owned.begin(); i != owned.end(); ++i) {
    //int block;
    //if (MB_SUCCESS != iFace->tag_get_data( mattag, &*i, 1, &block ))
    //  block = 0;
    //std::vector<int> ids;

    SpecialSetData data;
    data.setHandle = *i;
    rval = iFace->get_meshset_options( *i, data.setFlags );
    CHECK_MB(rval);
    specialSets.push_back( data );
    std::vector<EntityHandle> list;
    if (data.setFlags & MESHSET_ORDERED) {
      list.clear();
      rval = iFace->get_entities_by_handle( *i, list );
      CHECK_MB(rval);
      rval = vector_to_id_list( list, specialSets.back().contentIds, true );
      CHECK_MB(rval);
      //if (block) get_global_ids( iFace, &list[0], list.size(), MESHSET_ORDERED, ids );
    }
    else {
      Range range;
      rval = iFace->get_entities_by_handle( *i, range );
      CHECK_MB(rval);
      bool ranged;
      rval = range_to_blocked_list( range, specialSets.back().contentIds, ranged );
      if (ranged)
        specialSets.back().setFlags |= mhdf_SET_RANGE_BIT;
      //if (block) {
      //  std::vector<EntityHandle> tmp;
      //  for (Range::const_pair_iterator pi = range.const_pair_begin(); pi != range.const_pair_end(); ++pi) {
      //    tmp.push_back( pi->first );
      //    tmp.push_back( pi->second );
      //  }
      //  get_global_ids( iFace, &tmp[0], tmp.size(), ranged ? 0 : MESHSET_ORDERED, ids );
      //}
    }

    list.clear();
    rval = iFace->get_parent_meshsets( *i, list );
    CHECK_MB(rval);
    rval = vector_to_id_list( list, specialSets.back().parentIds, true );
    CHECK_MB(rval);
    rval = iFace->get_child_meshsets( *i, list );
    CHECK_MB(rval);
    rval = vector_to_id_list( list, specialSets.back().childIds, true );
    CHECK_MB(rval);
  }
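
  // The first wait loop below reads each arriving header: 4 + buff[1] +
  // buff[2] + buff[3] is the sender's true message length.  Messages that
  // fit within init_buff_size are unpacked immediately; larger ones are
  // re-posted into lrecv_req with a buffer of exactly that length, and the
  // corresponding senders re-send them once their initial sends complete.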

  // process received buffers, repost larger buffers where necessary
  size_t remaining = numrecv;
  numrecv = 0;
  while (remaining--) {
    std::vector<unsigned long> dead;
    MPI_Status status;
    mperr = MPI_Waitany( recv_req.size(), &recv_req[0], &idx, &status );
    CHECK_MPI(mperr);
    EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG );
    std::vector<unsigned long>& buff = recv_buf[idx];
    size_t size = buff[1]+buff[2]+buff[3]+4;
    dbgOut.printf(5,"Received %lu values for set %d from proc %d\n",
                  (unsigned long)size, status.MPI_TAG, status.MPI_SOURCE);
    if (size <= init_buff_size) {
      rval = unpack_set( handle, &buff[0], init_buff_size );
      CHECK_MB(rval);
      dead.swap(buff); // release memory
    }
    else {
      // data was too big for init_buff_size
      // repost with larger buffer
      buff.resize( size );
      dbgOut.printf(5,"Re-Posting buffer to receive set %d from proc %d with size %lu\n",
                    status.MPI_TAG, status.MPI_SOURCE, (unsigned long)size );
      mperr = MPI_Irecv( &buff[0], size, MPI_UNSIGNED_LONG, status.MPI_SOURCE,
                         status.MPI_TAG, comm, &lrecv_req[idx] );
      CHECK_MPI(mperr);
      ++numrecv;
    }
    recv_req[idx] = MPI_REQUEST_NULL;
  }

  // wait for sends to complete
  MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE );

  // re-send sets that didn't fit initial buffer size
  idx = 0;
  for (Range::iterator i = remote.begin(); i != remote.end(); ++i, ++idx) {
    std::vector<unsigned long>& buff = send_buf[idx];
    size_t size = buff[1]+buff[2]+buff[3]+4;
    if (size <= init_buff_size)
      continue;

    buff.resize( size );
    rval = pack_set( i, &buff[0], size ); CHECK_MB(rval);
    EntityHandle remote_handle;
    unsigned owner;
    rval = myPcomm->get_entityset_owner( *i, owner, &remote_handle ); CHECK_MB(rval);

    int tag = ID_FROM_HANDLE(remote_handle);
    assert(remote_handle == CREATE_HANDLE(MBENTITYSET,tag));
    dbgOut.printf(5,"Sending %lu values for set %d to proc %u\n",
                  (unsigned long)size, tag, owner );
    mperr = MPI_Isend( &buff[0], size, MPI_UNSIGNED_LONG,
                       owner, tag, comm, &send_req[idx] );
    CHECK_MPI(mperr);
  }

  // process received buffers
  remaining = numrecv;
  while (remaining--) {
    std::vector<unsigned long> dead;
    MPI_Status status;
    mperr = MPI_Waitany( lrecv_req.size(), &lrecv_req[0], &idx, &status );
    CHECK_MPI(mperr);
    EntityHandle handle = CREATE_HANDLE( MBENTITYSET, status.MPI_TAG );
    std::vector<unsigned long>& buff = recv_buf[idx];
    dbgOut.printf(5,"Received %lu values for set %d from proc %d\n",
                  4+buff[1]+buff[2]+buff[3], status.MPI_TAG, status.MPI_SOURCE);
    rval = unpack_set( handle, &buff[0], buff.size() );
    CHECK_MB(rval);
    dead.swap(buff); // release memory

    lrecv_req[idx] = MPI_REQUEST_NULL;
  }

  // wait for sends to complete
  MPI_Waitall( send_req.size(), &send_req[0], MPI_STATUSES_IGNORE );

  return MB_SUCCESS;
}
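
// A self-contained sketch (hypothetical helper, not used by this writer) of
// the grow-and-repost receive pattern that communicate_shared_set_data()
// follows above: each payload carries its own length, receives are first
// posted at a fixed size, and any message that reports a larger length is
// transmitted a second time into a buffer of the right size.
static void example_grow_and_repost( MPI_Comm comm, int peer, int tag,
                                     std::vector<unsigned long>& msg )
{
  const int INIT = 16;
  std::vector<unsigned long> buf( INIT );
  MPI_Request req;
  MPI_Status status;
  MPI_Irecv( &buf[0], INIT, MPI_UNSIGNED_LONG, peer, tag, comm, &req );
  MPI_Wait( &req, &status );
  const size_t true_size = buf[0];  // in this sketch, word 0 is the total length
  if (true_size > (size_t)INIT) {
    buf.resize( true_size );
    MPI_Irecv( &buf[0], (int)true_size, MPI_UNSIGNED_LONG, peer, tag, comm, &req );
    MPI_Wait( &req, &status );      // the sender must transmit the data again
  }
  msg.swap( buf );
}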

ErrorCode WriteHDF5Parallel::create_meshset_tables(double* times)
{
  ErrorCode rval = MB_SUCCESS;
  Range::const_iterator riter;
  const unsigned rank = myPcomm->proc_config().proc_rank();

  START_SERIAL;
  print_type_sets( iFace, &dbgOut, setSet.range );
  END_SERIAL;
  CpuTimer timer;

  // remove remote sets from setSet
  Range shared, owned, remote;
  rval = myPcomm->get_shared_sets( shared ); CHECK_MB(rval);
  shared = intersect( shared, setSet.range );
  rval = myPcomm->get_owned_sets( rank, owned ); CHECK_MB(rval);
  owned = intersect( owned, setSet.range );
  remote = subtract( shared, owned );
  setSet.range = subtract( setSet.range, remote );

  // create set meta table
  struct SetDescCreator : public DataSetCreator {
    ErrorCode operator()( WriteHDF5* writer, long size, const ExportSet*, long& start_id ) const
      { return writer->create_set_meta( size, start_id ); }
  };
  long count = setSet.range.size();
  rval = create_dataset( 1, &count,
                         &setSet.offset,
                         &setSet.max_num_ents,
                         &setSet.total_num_ents,
                         SetDescCreator(), NULL,
                         &setSet.first_id );
  CHECK_MB(rval);
  writeSets = setSet.max_num_ents > 0;

  rval = assign_ids( setSet.range, setSet.first_id + setSet.offset );
  CHECK_MB(rval);
  if (times) times[SET_OFFSET_TIME] = timer.elapsed();

  // exchange file IDs for sets between all procs
  rval = communicate_shared_set_ids( owned, remote ); CHECK_MB(rval);
  if (times) times[SHARED_SET_IDS] = timer.elapsed();

  // communicate remote set contents, children, etc.
  rval = communicate_shared_set_data( owned, remote ); CHECK_MB(rval);
  if (times) times[SHARED_SET_CONTENTS] = timer.elapsed();

  // Communicate counts for owned sets
  long data_counts[3]; // { #contents, #children, #parents }
  rval = count_set_size( setSet.range, data_counts[0], data_counts[1], data_counts[2] );
  CHECK_MB(rval);
  if (times) times[SET_OFFSET_TIME] += timer.elapsed();

  long offsets[3], max_counts[3], totals[3];
  rval = create_dataset( 3, data_counts, offsets, max_counts, totals );
  CHECK_MB(rval);

  // create the set data tables (root process only)
  if (0 == myPcomm->proc_config().proc_rank()) {
    rval = create_set_tables( totals[0], totals[1], totals[2] );
    CHECK_MB(rval);
  }

  // Store communicated global data
  setContentsOffset = offsets[0];
  setChildrenOffset = offsets[1];
  setParentsOffset = offsets[2];
  maxNumSetContents = max_counts[0];
  maxNumSetChildren = max_counts[1];
  maxNumSetParents = max_counts[2];
  writeSetContents = totals[0] > 0;
  writeSetChildren = totals[1] > 0;
  writeSetParents = totals[2] > 0;

  dbgOut.printf(2,"set contents: %ld local, %ld global, offset = %ld\n",
                data_counts[0], totals[0], offsets[0] );
  dbgOut.printf(2,"set children: %ld local, %ld global, offset = %ld\n",
                data_counts[1], totals[1], offsets[1] );
  dbgOut.printf(2,"set parents: %ld local, %ld global, offset = %ld\n",
                data_counts[2], totals[2], offsets[2] );

  return MB_SUCCESS;
}
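
// create_dataset() (implemented elsewhere in this writer) returns, for each
// local count passed in, this rank's offset into the global table, the
// largest per-rank count, and the global total.  For illustration only --
// not necessarily how create_dataset() computes them -- such values can be
// obtained with an exclusive prefix sum plus two reductions:
static void example_offset_total_max( MPI_Comm comm, long local_count,
                                      long& offset, long& max_count, long& total )
{
  int rank;
  MPI_Comm_rank( comm, &rank );
  MPI_Exscan( &local_count, &offset, 1, MPI_LONG, MPI_SUM, comm );
  if (0 == rank)
    offset = 0;  // MPI_Exscan leaves the result on rank 0 undefined
  MPI_Allreduce( &local_count, &total, 1, MPI_LONG, MPI_SUM, comm );
  MPI_Allreduce( &local_count, &max_count, 1, MPI_LONG, MPI_MAX, comm );
}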

void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative,
                                                Range& range )
{
  Range result;
  result.merge( intersect( range, nodeSet.range ) );
  result.merge( intersect( range, setSet.range ) );
  for (std::list<ExportSet>::iterator eiter = exportList.begin();
       eiter != exportList.end(); ++eiter )
  {
    result.merge( intersect( range, eiter->range ) );
  }
  //result.merge( intersect( range, myParallelSets ) );
  Range sets;
  int junk;
  sets.merge( Range::lower_bound( range.begin(), range.end(), CREATE_HANDLE(MBENTITYSET, 0, junk )), range.end() );
  remove_remote_sets( relative, sets );
  result.merge( sets );
  range.swap(result);
}

void WriteHDF5Parallel::remove_remote_sets( EntityHandle /* relative */,
                                            Range& range )
{
  Range result( intersect( range, setSet.range ) );
  // Store the non-intersecting entities separately if needed
  // Range remaining( subtract( range, result ) );
  range.swap( result );
}

void WriteHDF5Parallel::remove_remote_entities( EntityHandle relative,
                                                std::vector<EntityHandle>& vect )
{
  Range intrsct;
  for (std::vector<EntityHandle>::const_iterator iter = vect.begin();
       iter != vect.end(); ++iter)
    intrsct.insert(*iter);
  remove_remote_entities( relative, intrsct );

  unsigned int read, write;
  for (read = write = 0; read < vect.size(); ++read)
  {
    if (intrsct.find(vect[read]) != intrsct.end())
    {
      if (read != write)
        vect[write] = vect[read];
      ++write;
    }
  }
  if (write != vect.size())
    vect.resize(write);
}

void WriteHDF5Parallel::remove_remote_sets( EntityHandle relative,
                                            std::vector<EntityHandle>& vect )
{
  Range intrsct;
  for (std::vector<EntityHandle>::const_iterator iter = vect.begin();
       iter != vect.end(); ++iter)
    intrsct.insert(*iter);
  remove_remote_sets( relative, intrsct );

  unsigned int read, write;
  for (read = write = 0; read < vect.size(); ++read)
  {
    if (intrsct.find(vect[read]) != intrsct.end())
    {
      if (read != write)
        vect[write] = vect[read];
      ++write;
    }
  }
  if (write != vect.size())
    vect.resize(write);
}
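
// The vector overloads above use a two-index read/write compaction: entries
// still present in the filtered Range are copied forward over the removed
// ones and the vector is truncated at the write position.  The same idiom,
// isolated from the MOAB types for illustration:
static void example_keep_only( std::vector<unsigned long>& vect,
                               const std::set<unsigned long>& keep )
{
  size_t write = 0;
  for (size_t read = 0; read < vect.size(); ++read)
    if (keep.count( vect[read] ))
      vect[write++] = vect[read];  // compact retained entries toward the front
  vect.resize( write );
}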

ErrorCode WriteHDF5Parallel::exchange_file_ids( const Range& nonlocal )
{
  ErrorCode rval;

  // For each entity owned on the interface, write its file id to
  // a tag.  The sets of entities to be written should already contain
  // only owned entities, so by intersecting with them we not only
  // filter by entities to be written, but also restrict to entities
  // owned by this proc.

  // Get list of interface entities
  Range imesh, tmp;
  for (std::list<ExportSet>::reverse_iterator i = exportList.rbegin();
       i != exportList.rend(); ++i) {
    tmp.clear();
    rval = myPcomm->filter_pstatus( i->range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp );
    if (MB_SUCCESS != rval) return error(rval);
    imesh.merge(tmp);
  }
  tmp.clear();
  rval = myPcomm->filter_pstatus( nodeSet.range, PSTATUS_SHARED, PSTATUS_AND, -1, &tmp );
  if (MB_SUCCESS != rval) return error(rval);
  imesh.merge(tmp);

  // create tag to store file IDs
  EntityHandle default_val = 0;
  Tag file_id_tag = 0;
  rval = iFace->tag_get_handle( "__hdf5_ll_fileid",
                                1, MB_TYPE_HANDLE,
                                file_id_tag,
                                MB_TAG_DENSE|MB_TAG_CREAT,
                                &default_val );
  if (MB_SUCCESS != rval)
    return error(rval);

  // copy file IDs into tag
  std::vector<EntityHandle> file_id_vect( imesh.size() );
  Range::const_iterator i;
  std::vector<EntityHandle>::iterator j = file_id_vect.begin();
  for (i = imesh.begin(); i != imesh.end(); ++i, ++j) {
    *j = idMap.find( *i );
    if (!*j) {
      iFace->tag_delete( file_id_tag );
      return error(MB_FAILURE);
    }
  }
  rval = iFace->tag_set_data( file_id_tag, imesh, &file_id_vect[0] );
  if (MB_SUCCESS != rval) {
    iFace->tag_delete( file_id_tag );
    return error(rval);
  }

  // do communication
  rval = myPcomm->exchange_tags( file_id_tag, imesh );
  if (MB_SUCCESS != rval) {
    iFace->tag_delete( file_id_tag );
    return error(rval);
  }

  // copy file IDs from tag into idMap for remote entities
  file_id_vect.resize( nonlocal.size() );
  rval = iFace->tag_get_data( file_id_tag, nonlocal, &file_id_vect[0] );
  if (MB_SUCCESS != rval) {
    iFace->tag_delete( file_id_tag );
    return error(rval);
  }

  j = file_id_vect.begin();
  for (i = nonlocal.begin(); i != nonlocal.end(); ++i, ++j) {
    if (*j == 0) {
      int owner = -1;
      myPcomm->get_owner( *i, owner );
      const char* name = CN::EntityTypeName(TYPE_FROM_HANDLE(*i));
      int id = ID_FROM_HANDLE(*i);
      writeUtil->report_error( "Process %u did not receive valid id handle "
                               "for shared %s %d owned by process %d",
                               myPcomm->proc_config().proc_rank(),
                               name, id, owner );
      dbgOut.printf(1,"Did not receive valid remote id for "
                      "shared %s %d owned by process %d",
                      name, id, owner );
      iFace->tag_delete( file_id_tag );
      return error(MB_FAILURE);
    }
    else {
      if (!idMap.insert( *i, *j, 1 ).second) {
        iFace->tag_delete( file_id_tag );
        return error(MB_FAILURE);
      }
    }
  }

#ifndef NDEBUG
  // Check that the writer is correct with regard to which entities it owns,
  // by verifying that the file ids we thought we were sending were not
  // instead overwritten by values received from another process.
  file_id_vect.resize( imesh.size() );
  rval = iFace->tag_get_data( file_id_tag, imesh, &file_id_vect[0] );
  if (MB_SUCCESS != rval) {
    iFace->tag_delete( file_id_tag );
    return error(rval);
  }
  int invalid_count = 0;
  j = file_id_vect.begin();
  for (i = imesh.begin(); i != imesh.end(); ++i, ++j) {
    EntityHandle h = idMap.find(*i);
    if (*j != h) {
      ++invalid_count;
      dbgOut.printf(1,"Conflicting ownership for %s %ld\n",
                    CN::EntityTypeName(TYPE_FROM_HANDLE(*i)),
                    (long)ID_FROM_HANDLE(*i));
    }
  }
  if (invalid_count) {
    writeUtil->report_error("%d entities with conflicting ownership found "
                            "by process %u.  This will result in duplicate "
                            "entities written to file.\n",
                            invalid_count, myPcomm->proc_config().proc_rank() );
    iFace->tag_delete( file_id_tag );
    return error(MB_FAILURE);
  }
#endif

  return iFace->tag_delete( file_id_tag );
}


void WriteHDF5Parallel::print_times( const double* times ) const
{
  if (!myPcomm) {
    WriteHDF5::print_times( times );
  }
  else {
    double recv[NUM_TIMES];
    MPI_Reduce( (void*)times, recv, NUM_TIMES, MPI_DOUBLE, MPI_MAX, 0, myPcomm->proc_config().proc_comm() );
    if (0 == myPcomm->proc_config().proc_rank())
      WriteHDF5::print_times( recv );
  }
}


} // namespace moab