7be038ed33ad0d18880f6cbc7c43d1384fdc2699
[alexxy/gromacs.git] / src / gmxlib / thread_mpi / p2p_wait.h
1 /*
2 This source code file is part of thread_mpi.  
3 Written by Sander Pronk, Erik Lindahl, and possibly others. 
4
5 Copyright (c) 2009, Sander Pronk, Erik Lindahl.
6 All rights reserved.
7
8 Redistribution and use in source and binary forms, with or without
9 modification, are permitted provided that the following conditions are met:
10 1) Redistributions of source code must retain the above copyright
11    notice, this list of conditions and the following disclaimer.
12 2) Redistributions in binary form must reproduce the above copyright
13    notice, this list of conditions and the following disclaimer in the
14    documentation and/or other materials provided with the distribution.
15 3) Neither the name of the copyright holders nor the
16    names of its contributors may be used to endorse or promote products
17    derived from this software without specific prior written permission.
18
19 THIS SOFTWARE IS PROVIDED BY US ''AS IS'' AND ANY
20 EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
21 WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22 DISCLAIMED. IN NO EVENT SHALL WE BE LIABLE FOR ANY
23 DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
24 (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
25 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
26 ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27 (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 If you want to redistribute modifications, please consider that
31 scientific software is very special. Version control is crucial -
32 bugs must be traceable. We will be happy to consider code for
33 inclusion in the official distribution, but derived work should not
34 be called official thread_mpi. Details are found in the README & COPYING
35 files.
36 */
37
/* This file is #included from p2p.c; it is not really a header file,
   but it defines a lot of functions that probably need to be inlined. */
40
41
42
43 int tMPI_Wait(tMPI_Request *request, tMPI_Status *status)
44 {
45     int ret=TMPI_SUCCESS;
46     struct tmpi_thread *cur=tMPI_Get_current();
47     struct req_list *rql=&(cur->rql);
48     struct tmpi_req_ *rq;
49
50 #ifdef TMPI_PROFILE
51     tMPI_Profile_count_start(cur);
52 #endif
53 #ifdef TMPI_TRACE
54     tMPI_Trace_print("tMPI_Wait(%p, %p)", request, status);
55 #endif
56     if (!request || !(*request))
57         return TMPI_SUCCESS;
58
59     rq=*request;
60     /* fix the pointers */
61     rq->next=rq;
62     rq->prev=rq;
63
64     /* and wait for our request */
65     do
66     {
67         if (tMPI_Test_single(cur, rq))
68             break;
69         tMPI_Wait_process_incoming(cur);
70     } while(TRUE);
71
72     rq->ev=NULL; /* we won't be using that envelope any more */
73     ret=rq->error;
74
75     tMPI_Set_status(rq, status);
76
77     /* deallocate */
78     tMPI_Return_req(rql, *request);
79
80 #ifdef TMPI_PROFILE
81     tMPI_Profile_count_stop(cur, TMPIFN_Wait);
82 #endif
83     return ret;
84 }
85
86 int tMPI_Test(tMPI_Request *request, int *flag, tMPI_Status *status)
87 {
88     int ret=TMPI_SUCCESS;
89     struct tmpi_thread *cur=tMPI_Get_current();
90     struct req_list *rql=&(cur->rql);
91     struct tmpi_req_ *rq;
92
93 #ifdef TMPI_PROFILE
94     tMPI_Profile_count_start(cur);
95 #endif
96 #ifdef TMPI_TRACE
97     tMPI_Trace_print("tMPI_Test(%p, %p, %p)", request, flag, status);
98 #endif
99     if (!request || !(*request))
100         return TMPI_SUCCESS;
101
102     rq=*request;
103     /* fix the pointers */
104     rq->next=rq;
105     rq->prev=rq;
106
107     /* and check our request */
108     if (tMPI_Test_single(cur, rq))
109         *flag=TRUE;
110
111     ret=rq->error;
112
113     if (rq->finished)
114     {
115         tMPI_Set_status(rq, status);
116         /* deallocate */
117         tMPI_Return_req(rql, *request);
118         *request=TMPI_REQUEST_NULL;
119     }
120
121 #ifdef TMPI_PROFILE
122     tMPI_Profile_count_stop(cur, TMPIFN_Test);
123 #endif
124     return ret;
125 }
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142 /* test multiple requests by first making a linked list of them, and then
143    checking for their completion. Used in tMPI_{Test|Wait}{all|any|some}
144
145    wait = whether to wait for incoming events 
146    blocking = whether to block until all reqs are completed */
147 static void tMPI_Test_multi_req(struct tmpi_thread *cur, 
148                                 int count, tMPI_Request *array_of_requests,
149                                 bool wait, bool blocking)
150 {
151     int i;
152     struct tmpi_req_ *first=NULL, *last=NULL;
153
154     /* construct the list of requests */
155     for(i=0;i<count;i++)
156     {
157         struct tmpi_req_ *curr=array_of_requests[i];
158         if (curr)
159         {
160             if (!first)
161                 first=curr;
162             /* fix the pointers */
163             if (!last)
164             {
165                 /* we connect to itself */
166                 last=curr;
167                 last->next=NULL;
168                 last->prev=NULL;
169             }
170             else
171             {
172                 /* we connect to the last */
173                 curr->next=NULL;
174                 curr->prev=last; 
175                 last->next=curr;
176                 last=curr;
177             }
178         }
179     }
180     /* and wait for our request */
181     do
182     {
183         if (tMPI_Test_multi(cur, first, NULL))
184             break;
185         if (wait)
186             tMPI_Wait_process_incoming(cur);
187     } while(blocking && wait);
188 }
189
190
191
192 int tMPI_Waitall(int count, tMPI_Request *array_of_requests,
193                  tMPI_Status *array_of_statuses)
194 {
195     int i;
196     int ret=TMPI_SUCCESS;
197     struct tmpi_thread *cur=tMPI_Get_current();
198     struct req_list *rql=&(cur->rql);
199 #ifdef TMPI_PROFILE
200     tMPI_Profile_count_start(cur);
201 #endif
202 #ifdef TMPI_TRACE
203     tMPI_Trace_print("tMPI_Waitall(%d, %p, %p)", count, array_of_requests, 
204                        array_of_statuses);
205 #endif
206     tMPI_Test_multi_req(cur, count, array_of_requests, TRUE, TRUE);
207
208     /* deallocate the now finished requests */
209     for(i=0;i<count;i++)
210     {
211         if (array_of_requests[i])
212         {
213             if (array_of_statuses)
214             {
215                 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
216             }
217             if (array_of_requests[i]->error != TMPI_SUCCESS)
218             {
219                 ret=TMPI_ERR_IN_STATUS; 
220             }
221             tMPI_Return_req(rql, array_of_requests[i]);
222             array_of_requests[i]=TMPI_REQUEST_NULL;
223         }
224     }
225
226 #ifdef TMPI_PROFILE
227     tMPI_Profile_count_stop(cur, TMPIFN_Waitall);
228 #endif
229     return ret;
230 }
231
232 int tMPI_Testall(int count, tMPI_Request *array_of_requests,
233                  int *flag, tMPI_Status *array_of_statuses)
234 {
235     int i;
236     int ret=TMPI_SUCCESS;
237     struct tmpi_thread *cur=tMPI_Get_current();
238     struct req_list *rql=&(cur->rql);
239 #ifdef TMPI_PROFILE
240     tMPI_Profile_count_start(cur);
241 #endif
242 #ifdef TMPI_TRACE
243     tMPI_Trace_print("tMPI_Testall(%d, %p, %p, %p)", count, array_of_requests, 
244                        flag, array_of_statuses);
245 #endif
246     tMPI_Test_multi_req(cur, count, array_of_requests, FALSE, TRUE);
247
248     if (flag)
249         *flag=1;
250     /* deallocate the possibly finished requests */
251     for(i=0;i<count;i++)
252     {
253         if (array_of_requests[i] && array_of_requests[i]->finished)
254
255         {
256             if (array_of_statuses)
257             {
258                 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
259             }
260             if (array_of_requests[i]->error != TMPI_SUCCESS)
261             {
262                 ret=TMPI_ERR_IN_STATUS; 
263             }
264             tMPI_Return_req(rql, array_of_requests[i]);
265             array_of_requests[i]=TMPI_REQUEST_NULL;
266         }
267         else
268         {
269             if (flag)
270                 *flag=0;
271         } 
272
273     }
274
275 #ifdef TMPI_PROFILE
276     tMPI_Profile_count_stop(cur, TMPIFN_Testall);
277 #endif
278     return ret;
279 }
280
281
282 int tMPI_Waitany(int count, tMPI_Request *array_of_requests, int *index,
283                  tMPI_Status *status)
284 {
285     int i;
286     int ret=TMPI_SUCCESS;
287     struct tmpi_thread *cur=tMPI_Get_current();
288     struct req_list *rql=&(cur->rql);
289 #ifdef TMPI_PROFILE
290     tMPI_Profile_count_start(cur);
291 #endif
292 #ifdef TMPI_TRACE
293     tMPI_Trace_print("tMPI_Waitany(%d, %p, %p, %p)", count, array_of_requests, 
294                        index, status);
295 #endif
296
297     tMPI_Test_multi_req(cur, count, array_of_requests, TRUE, FALSE);
298
299     /* deallocate the possibly finished requests */
300     for(i=0;i<count;i++)
301     {
302         if (array_of_requests[i] && array_of_requests[i]->finished)
303         {
304             tMPI_Set_status(array_of_requests[i], status);
305             if (index)
306                 *index=i;
307             if (array_of_requests[i]->error != TMPI_SUCCESS)
308             {
309                 ret=TMPI_ERR_IN_STATUS; 
310             }
311             tMPI_Return_req(rql, array_of_requests[i]);
312             array_of_requests[i]=TMPI_REQUEST_NULL;
313             /* and we only need one */
314             break;
315         }
316     }
317
318 #ifdef TMPI_PROFILE
319     tMPI_Profile_count_stop(cur, TMPIFN_Waitany);
320 #endif
321     return ret;
322 }
323
324
325
326 int tMPI_Testany(int count, tMPI_Request *array_of_requests, int *index,
327                  int *flag, tMPI_Status *status)
328 {
329     int i;
330     int ret=TMPI_SUCCESS;
331     struct tmpi_thread *cur=tMPI_Get_current();
332     struct req_list *rql=&(cur->rql);
333 #ifdef TMPI_PROFILE
334     tMPI_Profile_count_start(cur);
335 #endif
336 #ifdef TMPI_TRACE
337     tMPI_Trace_print("tMPI_Testany(%d, %p, %p %p, %p)", count, 
338                      array_of_requests, flag, index, status);
339 #endif
340
341     tMPI_Test_multi_req(cur, count, array_of_requests, FALSE, FALSE);
342
343     if (flag)
344         *flag=0;
345     if (index)
346         *index=TMPI_UNDEFINED;
347     /* deallocate the possibly finished requests */
348     for(i=0;i<count;i++)
349     {
350         if (array_of_requests[i] && array_of_requests[i]->finished)
351         {
352             tMPI_Set_status(array_of_requests[i], status);
353             if (index)
354                 *index=i;
355             if (flag)
356                 *flag=1;
357             if (array_of_requests[i]->error != TMPI_SUCCESS)
358             {
359                 ret=TMPI_ERR_IN_STATUS; 
360             }
361             tMPI_Return_req(rql, array_of_requests[i]);
362             array_of_requests[i]=TMPI_REQUEST_NULL;
363             /* and we only need one */
364             break;
365         }
366     }
367
368 #ifdef TMPI_PROFILE
369     tMPI_Profile_count_stop(cur, TMPIFN_Testany);
370 #endif
371     return ret;
372 }
373
374
375
376 int tMPI_Waitsome(int incount, tMPI_Request *array_of_requests,
377                   int *outcount, int *array_of_indices,
378                   tMPI_Status *array_of_statuses)
379 {
380     int i;
381     int ret=TMPI_SUCCESS;
382     struct tmpi_thread *cur=tMPI_Get_current();
383     struct req_list *rql=&(cur->rql);
384 #ifdef TMPI_PROFILE
385     tMPI_Profile_count_start(cur);
386 #endif
387 #ifdef TMPI_TRACE
388     tMPI_Trace_print("tMPI_Waitsome(%d, %p, %p, %p, %p)", incount, 
389                      array_of_requests, outcount, array_of_indices, 
390                      array_of_statuses);
391 #endif
392     tMPI_Test_multi_req(cur, incount, array_of_requests, TRUE, FALSE);
393
394     (*outcount)=0;
395     /* deallocate the possibly finished requests */
396     for(i=0;i<incount;i++)
397     {
398         if (array_of_requests[i] && array_of_requests[i]->finished)
399         {
400             array_of_indices[*outcount]++;
401             (*outcount)++;
402             if (array_of_statuses)
403             {
404                 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
405             }
406             if (array_of_requests[i]->error != TMPI_SUCCESS)
407             {
408                 ret=TMPI_ERR_IN_STATUS; 
409             }
410             tMPI_Return_req(rql, array_of_requests[i]);
411             array_of_requests[i]=TMPI_REQUEST_NULL;
412         }
413     }
414
415 #ifdef TMPI_PROFILE
416     tMPI_Profile_count_stop(cur, TMPIFN_Waitsome);
417 #endif
418     return ret;
419 }
420
421 int tMPI_Testsome(int incount, tMPI_Request *array_of_requests,
422                   int *outcount, int *array_of_indices,
423                   tMPI_Status *array_of_statuses)
424 {
425     int i;
426     int ret=TMPI_SUCCESS;
427     struct tmpi_thread *cur=tMPI_Get_current();
428     struct req_list *rql=&(cur->rql);
429 #ifdef TMPI_PROFILE
430     tMPI_Profile_count_start(cur);
431 #endif
432 #ifdef TMPI_TRACE
433     tMPI_Trace_print("tMPI_Testsome(%d, %p, %p, %p, %p)", incount, 
434                      array_of_requests, outcount, array_of_indices, 
435                      array_of_statuses);
436 #endif
437     tMPI_Test_multi_req(cur, incount, array_of_requests, FALSE, TRUE);
438
439     (*outcount)=0;
440     /* deallocate the possibly finished requests */
441     for(i=0;i<incount;i++)
442     {
443         if (array_of_requests[i] && array_of_requests[i]->finished)
444         {
445             array_of_indices[*outcount]++;
446             (*outcount)++;
447             if (array_of_statuses)
448             {
449                 tMPI_Set_status(array_of_requests[i], &(array_of_statuses[i]));
450             }
451             if (array_of_requests[i]->error != TMPI_SUCCESS)
452             {
453                 ret=TMPI_ERR_IN_STATUS; 
454             }
455             tMPI_Return_req(rql, array_of_requests[i]);
456             array_of_requests[i]=TMPI_REQUEST_NULL;
457         }
458     }
459
460
461 #ifdef TMPI_PROFILE
462     tMPI_Profile_count_stop(cur, TMPIFN_Testsome);
463 #endif
464     return ret;
465 }
466
467