2 This source code file is part of thread_mpi.
3 Written by Sander Pronk, Erik Lindahl, and possibly others.
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11 notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13 notice, this list of conditions and the following disclaimer in the
14 documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16 names of its contributors may be used to endorse or promote products
17 derived from this software without specific prior written permission.
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
38 /* this file is #included from p2p.c; it's not really a header file,
39 but it defines a lot of functions that probably need to be inlined. */
/* tMPI_Wait: wait on a single request and hand it back to the request list.
 *
 * request  pointer to the request handle; both the pointer and the handle it
 *          points to are checked for NULL before anything else is done
 * status   forwarded to tMPI_Set_status() together with the request
 *
 * Visible steps: look up the calling thread and its request list, test the
 * request with tMPI_Test_single(), call tMPI_Wait_process_incoming(), clear
 * the request's envelope pointer, fill in the status and return the request
 * to the thread's request list with tMPI_Return_req().
 *
 * NOTE(review): several lines of this function (braces, the declaration and
 * assignment of rq, the loop around the test, the return statements) are not
 * part of this listing, so the exact control flow and the return value
 * cannot be confirmed from here.
 * NOTE(review): tMPI_Test resets *request to TMPI_REQUEST_NULL after
 * returning it to the list; no such line is visible here -- confirm. */
43 int tMPI_Wait(tMPI_Request *request, tMPI_Status *status)
/* the calling thread, and the request list stored inside it */
46 struct tmpi_thread *cur=tMPI_Get_current();
47 struct req_list *rql=&(cur->rql);
51 tMPI_Profile_count_start(cur);
54 tMPI_Trace_print("tMPI_Wait(%p, %p)", request, status);
/* guard against a missing handle pointer or a null handle; the action taken
   is on a line that is not shown in this listing */
56 if (!request || !(*request))
60 /* fix the pointers */
64 /* and wait for our request */
/* the request is tested first; incoming events are processed afterwards
   (presumably only when the test did not succeed -- TODO confirm) */
67 if (tMPI_Test_single(cur, rq))
69 tMPI_Wait_process_incoming(cur);
72 rq->ev=NULL; /* we won't be using that envelope any more */
75 tMPI_Set_status(rq, status);
/* give the request object back to this thread's request list */
78 tMPI_Return_req(rql, *request);
81 tMPI_Profile_count_stop(cur, TMPIFN_Wait);
/* tMPI_Test: test a single request once, without waiting.
 *
 * request  pointer to the request handle; both the pointer and the handle it
 *          points to are checked for NULL before anything else is done
 * flag     output flag (only used in the trace call in the visible lines;
 *          presumably set to the completion state -- TODO confirm)
 * status   forwarded to tMPI_Set_status() when the test succeeds
 *
 * When tMPI_Test_single() reports success, the visible lines fill in the
 * status, return the request to the thread's request list and reset the
 * caller's handle to TMPI_REQUEST_NULL.
 *
 * NOTE(review): braces, the declaration of rq, the assignments to *flag and
 * the return statements are not part of this listing. */
86 int tMPI_Test(tMPI_Request *request, int *flag, tMPI_Status *status)
/* the calling thread, and the request list stored inside it */
89 struct tmpi_thread *cur=tMPI_Get_current();
90 struct req_list *rql=&(cur->rql);
94 tMPI_Profile_count_start(cur);
97 tMPI_Trace_print("tMPI_Test(%p, %p, %p)", request, flag, status);
/* guard against a missing handle pointer or a null handle; the action taken
   is on a line that is not shown in this listing */
99 if (!request || !(*request))
103 /* fix the pointers */
107 /* and check our request */
108 if (tMPI_Test_single(cur, rq))
115 tMPI_Set_status(rq, status);
/* the request is done: release it and invalidate the caller's handle */
117 tMPI_Return_req(rql, *request);
118 *request=TMPI_REQUEST_NULL;
122 tMPI_Profile_count_stop(cur, TMPIFN_Test);
/* Parameters of tMPI_Test_multi_req:
 *   cur                the calling thread, passed on to tMPI_Test_multi()
 *                      and tMPI_Wait_process_incoming()
 *   count              number of entries in array_of_requests (presumably
 *                      the bound of the list-building loop -- the loop
 *                      header is not part of this listing)
 *   array_of_requests  the request handles to link together and test
 *   wait, blocking     see the description below; the test loop is repeated
 *                      only while both are true
 *
 * NOTE(review): most of the list construction (the loop header, the NULL
 * check on each entry, the next/prev assignments) is not part of this
 * listing; only the surviving comments indicate what it does. */
142 /* test multiple requests by first making a linked list of them, and then
143 checking for their completion. Used in tMPI_{Test|Wait}{all|any|some}
145 wait = whether to wait for incoming events
146 blocking = whether to block until all reqs are completed */
147 static void tMPI_Test_multi_req(struct tmpi_thread *cur,
148 int count, tMPI_Request *array_of_requests,
149 bool wait, bool blocking)
/* head and tail of the list that is being built */
152 struct tmpi_req_ *first=NULL, *last=NULL;
154 /* construct the list of requests */
157 struct tmpi_req_ *curr=array_of_requests[i];
162 /* fix the pointers */
165 /* we connect to itself */
172 /* we connect to the last */
180 /* and wait for our request */
/* the whole list is tested starting from its head */
183 if (tMPI_Test_multi(cur, first, NULL))
186 tMPI_Wait_process_incoming(cur);
/* another pass is made only when the caller asked for both blocking and
   waiting */
187 } while(blocking && wait);
/* tMPI_Waitall: wait for all requests in an array and release them.
 *
 * count              number of entries, passed on to tMPI_Test_multi_req()
 * array_of_requests  the request handles; NULL entries are skipped, all
 *                    others are returned to the request list and reset to
 *                    TMPI_REQUEST_NULL
 * array_of_statuses  may be NULL; otherwise entry i receives the status of
 *                    request i
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * a processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the declaration of i, the loop header and the
 * return statement are not part of this listing. */
192 int tMPI_Waitall(int count, tMPI_Request *array_of_requests,
193 tMPI_Status *array_of_statuses)
196 int ret=TMPI_SUCCESS;
197 struct tmpi_thread *cur=tMPI_Get_current();
198 struct req_list *rql=&(cur->rql);
200 tMPI_Profile_count_start(cur);
203 tMPI_Trace_print("tMPI_Waitall(%d, %p, %p)", count, array_of_requests,
/* wait=TRUE, blocking=TRUE: keep testing and processing incoming events */
206 tMPI_Test_multi_req(cur, count, array_of_requests, TRUE, TRUE);
208 /* deallocate the now finished requests */
211 if (array_of_requests[i])
/* the status array is optional */
213 if (array_of_statuses)
215 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
/* a failed request is reported through the return value */
217 if (array_of_requests[i]->error != TMPI_SUCCESS)
219 ret=TMPI_ERR_IN_STATUS;
221 tMPI_Return_req(rql, array_of_requests[i]);
222 array_of_requests[i]=TMPI_REQUEST_NULL;
227 tMPI_Profile_count_stop(cur, TMPIFN_Waitall);
/* tMPI_Testall: test all requests in an array once and release the ones
 * that have finished.
 *
 * count              number of entries, passed on to tMPI_Test_multi_req()
 * array_of_requests  the request handles; entries that are non-NULL and
 *                    marked finished are returned to the request list and
 *                    reset to TMPI_REQUEST_NULL
 * flag               output flag (only used in the trace call in the visible
 *                    lines; presumably reports whether everything finished
 *                    -- TODO confirm)
 * array_of_statuses  may be NULL; otherwise entry i receives the status of
 *                    finished request i
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * a processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the declaration of i, the loop header, the handling
 * of *flag and the return statement are not part of this listing. */
232 int tMPI_Testall(int count, tMPI_Request *array_of_requests,
233 int *flag, tMPI_Status *array_of_statuses)
236 int ret=TMPI_SUCCESS;
237 struct tmpi_thread *cur=tMPI_Get_current();
238 struct req_list *rql=&(cur->rql);
240 tMPI_Profile_count_start(cur);
243 tMPI_Trace_print("tMPI_Testall(%d, %p, %p, %p)", count, array_of_requests,
244 flag, array_of_statuses);
/* wait=FALSE: the loop condition in tMPI_Test_multi_req() then allows only
   a single pass */
246 tMPI_Test_multi_req(cur, count, array_of_requests, FALSE, TRUE);
250 /* deallocate the possibly finished requests */
/* unfinished requests are left untouched */
253 if (array_of_requests[i] && array_of_requests[i]->finished)
/* the status array is optional */
256 if (array_of_statuses)
258 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
/* a failed request is reported through the return value */
260 if (array_of_requests[i]->error != TMPI_SUCCESS)
262 ret=TMPI_ERR_IN_STATUS;
264 tMPI_Return_req(rql, array_of_requests[i]);
265 array_of_requests[i]=TMPI_REQUEST_NULL;
276 tMPI_Profile_count_stop(cur, TMPIFN_Testall);
/* tMPI_Waitany: wait on an array of requests and release a finished one.
 *
 * count              number of entries, passed on to tMPI_Test_multi_req()
 * array_of_requests  the request handles; a finished, non-NULL entry is
 *                    returned to the request list and reset to
 *                    TMPI_REQUEST_NULL
 * index              output index (the line that assigns it is not part of
 *                    this listing; presumably the position of the released
 *                    request -- TODO confirm)
 * status             receives the status of the released request (the
 *                    parameter declaration itself is on a line that is not
 *                    shown here)
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * the processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the rest of the parameter list, the loop header and
 * the return statement are not part of this listing.
 * NOTE(review): tMPI_Test_multi_req() is called with blocking=FALSE, and
 * its loop condition is (blocking && wait), so only one test pass is made;
 * confirm that a request is guaranteed to have finished afterwards. */
282 int tMPI_Waitany(int count, tMPI_Request *array_of_requests, int *index,
286 int ret=TMPI_SUCCESS;
287 struct tmpi_thread *cur=tMPI_Get_current();
288 struct req_list *rql=&(cur->rql);
290 tMPI_Profile_count_start(cur);
293 tMPI_Trace_print("tMPI_Waitany(%d, %p, %p, %p)", count, array_of_requests,
/* wait=TRUE, blocking=FALSE */
297 tMPI_Test_multi_req(cur, count, array_of_requests, TRUE, FALSE);
299 /* deallocate the possibly finished requests */
/* unfinished requests are left untouched */
302 if (array_of_requests[i] && array_of_requests[i]->finished)
304 tMPI_Set_status(array_of_requests[i], status);
/* a failed request is reported through the return value */
307 if (array_of_requests[i]->error != TMPI_SUCCESS)
309 ret=TMPI_ERR_IN_STATUS;
311 tMPI_Return_req(rql, array_of_requests[i]);
312 array_of_requests[i]=TMPI_REQUEST_NULL;
313 /* and we only need one */
319 tMPI_Profile_count_stop(cur, TMPIFN_Waitany);
/* tMPI_Testany: test an array of requests once and release a finished one.
 *
 * count              number of entries, passed on to tMPI_Test_multi_req()
 * array_of_requests  the request handles; a finished, non-NULL entry is
 *                    returned to the request list and reset to
 *                    TMPI_REQUEST_NULL
 * index              set to TMPI_UNDEFINED before the requests are
 *                    inspected (the line that stores the position of a
 *                    finished request is not part of this listing)
 * flag               output flag (only used in the trace call in the visible
 *                    lines; presumably reports whether a request finished
 *                    -- TODO confirm)
 * status             receives the status of the released request
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * the processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the declaration of i, the loop header, the handling
 * of *flag and the return statement are not part of this listing. */
326 int tMPI_Testany(int count, tMPI_Request *array_of_requests, int *index,
327 int *flag, tMPI_Status *status)
330 int ret=TMPI_SUCCESS;
331 struct tmpi_thread *cur=tMPI_Get_current();
332 struct req_list *rql=&(cur->rql);
334 tMPI_Profile_count_start(cur);
/* trace the arguments in the same order as the parameter list, with every
   field separated by a comma */
337 tMPI_Trace_print("tMPI_Testany(%d, %p, %p, %p, %p)", count,
338 array_of_requests, index, flag, status);
/* wait=FALSE: the loop condition in tMPI_Test_multi_req() then allows only
   a single pass */
341 tMPI_Test_multi_req(cur, count, array_of_requests, FALSE, FALSE);
/* default answer in case no request turns out to be finished */
346 *index=TMPI_UNDEFINED;
347 /* deallocate the possibly finished requests */
/* unfinished requests are left untouched */
350 if (array_of_requests[i] && array_of_requests[i]->finished)
352 tMPI_Set_status(array_of_requests[i], status);
/* a failed request is reported through the return value */
357 if (array_of_requests[i]->error != TMPI_SUCCESS)
359 ret=TMPI_ERR_IN_STATUS;
361 tMPI_Return_req(rql, array_of_requests[i]);
362 array_of_requests[i]=TMPI_REQUEST_NULL;
363 /* and we only need one */
369 tMPI_Profile_count_stop(cur, TMPIFN_Testany);
/* tMPI_Waitsome: wait on an array of requests and release every request
 * that has finished.
 *
 * incount            number of entries in array_of_requests
 * array_of_requests  the request handles; entries that are non-NULL and
 *                    marked finished are returned to the request list and
 *                    reset to TMPI_REQUEST_NULL
 * outcount           used as the write position into array_of_indices
 *                    (presumably zeroed before the loop and advanced once
 *                    per finished request on lines that are not part of
 *                    this listing -- TODO confirm)
 * array_of_indices   receives, at position *outcount, the index i of each
 *                    finished request
 * array_of_statuses  may be NULL; otherwise entry i receives the status of
 *                    finished request i
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * a processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the declaration of i and the return statement are
 * not part of this listing.
 * NOTE(review): statuses are stored at position i, not at the completion
 * slot *outcount; MPI_Waitsome places them in the first outcount entries --
 * confirm which layout callers expect. */
376 int tMPI_Waitsome(int incount, tMPI_Request *array_of_requests,
377 int *outcount, int *array_of_indices,
378 tMPI_Status *array_of_statuses)
381 int ret=TMPI_SUCCESS;
382 struct tmpi_thread *cur=tMPI_Get_current();
383 struct req_list *rql=&(cur->rql);
385 tMPI_Profile_count_start(cur);
388 tMPI_Trace_print("tMPI_Waitsome(%d, %p, %p, %p, %p)", incount,
389 array_of_requests, outcount, array_of_indices,
/* wait=TRUE, blocking=FALSE */
392 tMPI_Test_multi_req(cur, incount, array_of_requests, TRUE, FALSE);
395 /* deallocate the possibly finished requests */
396 for(i=0;i<incount;i++)
/* unfinished requests are left untouched */
398 if (array_of_requests[i] && array_of_requests[i]->finished)
/* record which request finished; incrementing the slot instead would only
   modify whatever value the caller's array happened to contain */
400 array_of_indices[*outcount]=i;
/* the status array is optional */
402 if (array_of_statuses)
404 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
/* a failed request is reported through the return value */
406 if (array_of_requests[i]->error != TMPI_SUCCESS)
408 ret=TMPI_ERR_IN_STATUS;
410 tMPI_Return_req(rql, array_of_requests[i]);
411 array_of_requests[i]=TMPI_REQUEST_NULL;
416 tMPI_Profile_count_stop(cur, TMPIFN_Waitsome);
/* tMPI_Testsome: test an array of requests once and release every request
 * that has finished.
 *
 * incount            number of entries in array_of_requests
 * array_of_requests  the request handles; entries that are non-NULL and
 *                    marked finished are returned to the request list and
 *                    reset to TMPI_REQUEST_NULL
 * outcount           used as the write position into array_of_indices
 *                    (presumably zeroed before the loop and advanced once
 *                    per finished request on lines that are not part of
 *                    this listing -- TODO confirm)
 * array_of_indices   receives, at position *outcount, the index i of each
 *                    finished request
 * array_of_statuses  may be NULL; otherwise entry i receives the status of
 *                    finished request i
 *
 * The result starts out as TMPI_SUCCESS and becomes TMPI_ERR_IN_STATUS when
 * a processed request carries an error other than TMPI_SUCCESS.
 *
 * NOTE(review): braces, the declaration of i and the return statement are
 * not part of this listing.
 * NOTE(review): statuses are stored at position i, not at the completion
 * slot *outcount; MPI_Testsome places them in the first outcount entries --
 * confirm which layout callers expect. */
421 int tMPI_Testsome(int incount, tMPI_Request *array_of_requests,
422 int *outcount, int *array_of_indices,
423 tMPI_Status *array_of_statuses)
426 int ret=TMPI_SUCCESS;
427 struct tmpi_thread *cur=tMPI_Get_current();
428 struct req_list *rql=&(cur->rql);
430 tMPI_Profile_count_start(cur);
433 tMPI_Trace_print("tMPI_Testsome(%d, %p, %p, %p, %p)", incount,
434 array_of_requests, outcount, array_of_indices,
/* wait=FALSE: the loop condition in tMPI_Test_multi_req() then allows only
   a single pass */
437 tMPI_Test_multi_req(cur, incount, array_of_requests, FALSE, TRUE);
440 /* deallocate the possibly finished requests */
441 for(i=0;i<incount;i++)
/* unfinished requests are left untouched */
443 if (array_of_requests[i] && array_of_requests[i]->finished)
/* record which request finished; incrementing the slot instead would only
   modify whatever value the caller's array happened to contain */
445 array_of_indices[*outcount]=i;
/* the status array is optional */
447 if (array_of_statuses)
449 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
/* a failed request is reported through the return value */
451 if (array_of_requests[i]->error != TMPI_SUCCESS)
453 ret=TMPI_ERR_IN_STATUS;
455 tMPI_Return_req(rql, array_of_requests[i]);
456 array_of_requests[i]=TMPI_REQUEST_NULL;
462 tMPI_Profile_count_stop(cur, TMPIFN_Testsome);