FORM 4.3
mpi.c
9/* #[ License : */
10/*
11 * Copyright (C) 1984-2022 J.A.M. Vermaseren
12 * When using this file you are requested to refer to the publication
13 * J.A.M.Vermaseren "New features of FORM" math-ph/0010025
14 * This is considered a matter of courtesy as the development was paid
15 * for by FOM the Dutch physics granting agency and we would like to
16 * be able to track its scientific use to convince FOM of its value
17 * for the community.
18 *
19 * This file is part of FORM.
20 *
21 * FORM is free software: you can redistribute it and/or modify it under the
22 * terms of the GNU General Public License as published by the Free Software
23 * Foundation, either version 3 of the License, or (at your option) any later
24 * version.
25 *
26 * FORM is distributed in the hope that it will be useful, but WITHOUT ANY
27 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
28 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
29 * details.
30 *
31 * You should have received a copy of the GNU General Public License along
32 * with FORM. If not, see <http://www.gnu.org/licenses/>.
33 */
34/* #] License : */
35/*
36 #[ Includes and variables :
37*/
38
39#include <limits.h>
40#include "form3.h"
41
42#ifdef MPICH_PROFILING
43# include "mpe.h"
44#endif
45
46#ifdef MPIDEBUGGING
47#include "mpidbg.h"
48#endif
49
50/*[12oct2005 mt]:*/
51/*
52 During a cleanup on this date some code was moved to other places in
53 this file, and PF.packsize was removed in favour of PF_packsize. It is
54 difficult to comment all of this properly, so not every change is
55 marked with "[12oct2005 mt]".
56*/
57
58#define PF_PACKSIZE 1600
59
60/*
61 PF_PACKSIZE is a size in bytes. PF_packsize will be initialized as
62 PF_packsize = PF_PACKSIZE/sizeof(int)*sizeof(int); to allow for
63 possible future development we prefer to do this initialization not
64 here, but in PF_LibInit():
65*/
66
67static int PF_packsize = 0;
68static MPI_Status PF_status;
69LONG PF_maxDollarChunkSize = 0; /*:[04oct2005 mt]*/
70
71static int PF_ShortPackInit(void);
72static int PF_longPackInit(void); /*:[12oct2005 mt]*/
73
84#define MPI_ERRCODE_CHECK(err) \
85 do { \
86 int _tmp_err = (err); \
87 if ( _tmp_err != MPI_SUCCESS ) return _tmp_err != 0 ? _tmp_err : -1; \
88 } while (0)
89
90/*
91 #] Includes and variables :
92 #[ PF_RealTime :
93*/
94
101LONG PF_RealTime(int i)
102{
103 static double starttime;
104 if ( i == PF_RESET ) {
105 starttime = MPI_Wtime();
106 return((LONG)0);
107 }
108 return((LONG)( 100. * (MPI_Wtime() - starttime) ) );
109}
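
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source. It is guarded
	by a hypothetical PF_MPI_USAGE_EXAMPLES define so that it is never
	compiled into FORM. It shows the intended reset/read pattern of
	PF_RealTime(): PF_RESET restarts the clock, any other argument returns
	the elapsed wall-clock time in hundredths of a second.
*/
static void example_TimeSomeWork(void)
{
	LONG centisec;
	PF_RealTime(PF_RESET);      /* start the wall-clock timer */
	/* ... the work to be timed goes here ... */
	centisec = PF_RealTime(0);  /* elapsed time in units of 0.01 sec */
	MesPrint("[%d] elapsed: %l centiseconds",PF.me,centisec);
}
#endif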
110
111/*
112 #] PF_RealTime :
113 #[ PF_LibInit :
114*/
115
123int PF_LibInit(int *argcp, char ***argvp)
124{
125 int ret;
126 ret = MPI_Init(argcp,argvp);
127 if ( ret != MPI_SUCCESS ) return(ret);
128 ret = MPI_Comm_rank(PF_COMM,&PF.me);
129 if ( ret != MPI_SUCCESS ) return(ret);
130 ret = MPI_Comm_size(PF_COMM,&PF.numtasks);
131 if ( ret != MPI_SUCCESS ) return(ret);
132
133 /* Initialization of packed communications. */
134 PF_packsize = PF_PACKSIZE/sizeof(int)*sizeof(int);
135 if ( PF_ShortPackInit() ) return -1;
136 if ( PF_longPackInit() ) return -1;
137
138 {/*Block*/
139 int bytes, totalbytes=0;
140/*
141 One problem with maximal packing: MPI has no call to convert a byte
142 count into a number of records. So here we calculate the buffer
143 size needed for storing dollar variables:
144
145 LONG PF_maxDollarChunkSize is the size of the portion of the dollar
146 variable buffer suitable for broadcasting. This variable must be
147 visible from parallel.c.
148
149 Evaluate PF_Pack(numterms,1,PF_INT):
150*/
151 if ( ( ret = MPI_Pack_size(1,PF_INT,PF_COMM,&bytes) )!=MPI_SUCCESS )
152 return(ret);
153
154 totalbytes+=bytes;
155/*
156 Evaluate PF_Pack( newsize,1,PF_LONG):
157*/
158 if ( ( ret = MPI_Pack_size(1,PF_LONG,PF_COMM,&bytes) )!=MPI_SUCCESS )
159 return(ret);
160
161 totalbytes += bytes;
162/*
163 Now available room is PF_packsize-totalbytes
164*/
165 totalbytes = PF_packsize-totalbytes;
166/*
167 Now totalbytes is the size of chunk in bytes.
168 Evaluate this size in number of records:
169
170 Rough estimate:
171*/
172 PF_maxDollarChunkSize=totalbytes/sizeof(WORD);
173/*
174 Go up until we exceed the limit:
175*/
176 do {
177 if ( ( ret = MPI_Pack_size(
178 ++PF_maxDollarChunkSize,PF_WORD,PF_COMM,&bytes) )!=MPI_SUCCESS )
179 return(ret);
180 } while ( bytes<totalbytes );
181/*
182 Now the chunk size is too large.
183 Come back down to the exact value:
184*/
185 do {
186 if ( ( ret = MPI_Pack_size(
187 --PF_maxDollarChunkSize,PF_WORD,PF_COMM,&bytes) )!=MPI_SUCCESS )
188 return(ret);
189 } while ( bytes>totalbytes );
190/*
191 Now PF_maxDollarChunkSize is the number of PF_WORD items that fit
192 into the remaining buffer <= (PF_packsize - packed PF_INT - packed PF_LONG)
193*/
194 }/*Block*/
195 return(0);
196}
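
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). It isolates the bracketing idiom used
	above: MPI has no inverse of MPI_Pack_size(), so to translate a byte
	budget into a number of records one starts from a rough estimate, walks
	up until the packed size exceeds the budget, then walks back down until
	it fits again.
*/
static int example_MaxItemsThatFit(MPI_Datatype type, int budget, int itemsize)
{
	int ret, bytes, items = budget/itemsize;  /* rough estimate */
	do {  /* go up until we exceed the budget */
		if ( ( ret = MPI_Pack_size(++items,type,PF_COMM,&bytes) ) != MPI_SUCCESS )
			return(-1);
	} while ( bytes < budget );
	do {  /* come back down until the packed size fits */
		if ( ( ret = MPI_Pack_size(--items,type,PF_COMM,&bytes) ) != MPI_SUCCESS )
			return(-1);
		if ( items == 0 ) return(0);
	} while ( bytes > budget );
	return(items);
}
#endif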
197/*
198 #] PF_LibInit :
199 #[ PF_LibTerminate :
200*/
201
209int PF_LibTerminate(int error)
210{
211 DUMMYUSE(error);
212 return(MPI_Finalize());
213}
214
215/*
216 #] PF_LibTerminate :
217 #[ PF_Probe :
218*/
219
230int PF_Probe(int *src)
231{
232 int ret, flag;
233 if ( *src == PF_ANY_SOURCE ) { /*Blocking call*/
234 ret = MPI_Probe(*src,MPI_ANY_TAG,PF_COMM,&PF_status);
235 flag = 1;
236 }
237 else { /*Non-blocking call*/
238 ret = MPI_Iprobe(*src,MPI_ANY_TAG,PF_COMM,&flag,&PF_status);
239 }
240 *src = PF_status.MPI_SOURCE;
241 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
242 if ( !flag ) return(0);
243 return(PF_status.MPI_TAG);
244}
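
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). With *src == PF_ANY_SOURCE, PF_Probe()
	blocks and returns the tag of the next incoming message; with a definite
	source it only polls and returns 0 when nothing is pending. In both
	cases *src is overwritten with the source reported by MPI.
*/
static void example_ProbeUsage(void)
{
	int src = PF_ANY_SOURCE;
	int tag = PF_Probe(&src);          /* blocks until a message is pending */
	if ( tag > 0 )
		MesPrint("[%d] tag %d pending from %d",PF.me,tag,src);

	src = MASTER;                      /* non-blocking check of one source */
	tag = PF_Probe(&src);
	if ( tag == 0 )
		MesPrint("[%d] nothing pending from the master",PF.me);
}
#endif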
245
246/*
247 #] PF_Probe :
248 #[ PF_ISendSbuf :
249*/
250
261int PF_ISendSbuf(int to, int tag)
262{
263 PF_BUFFER *s = PF.sbuf;
264 int a = s->active;
265 int size = s->fill[a] - s->buff[a];
266 int r = 0;
267
268 static int finished;
269
270 s->fill[a] = s->buff[a];
271 if ( s->numbufs == 1 ) {
272 r = MPI_Ssend(s->buff[a],size,PF_WORD,MASTER,tag,PF_COMM);
273 if ( r != MPI_SUCCESS ) {
274 fprintf(stderr,"[%d|%d] PF_ISendSbuf: MPI_Ssend returns: %d \n",
275 PF.me,(int)AC.CModule,r);
276 fflush(stderr);
277 return(r);
278 }
279 return(0);
280 }
281
282 switch ( tag ) { /* things to do before sending */
283 case PF_TERM_MSGTAG:
284 if ( PF.sbuf->request[to] != MPI_REQUEST_NULL)
285 r = MPI_Wait(&PF.sbuf->request[to],&PF.sbuf->retstat[to]);
286 if ( r != MPI_SUCCESS ) return(r);
287 break;
288 default:
289 break;
290 }
291
292 r = MPI_Isend(s->buff[a],size,PF_WORD,to,tag,PF_COMM,&s->request[a]);
293
294 if ( r != MPI_SUCCESS ) return(r);
295
296 switch ( tag ) { /* things to do after initialising sending */
297 case PF_TERM_MSGTAG:
298 finished = 0;
299 break;
300 case PF_ENDSORT_MSGTAG:
301 if ( ++finished == PF.numtasks - 1 )
302 r = MPI_Waitall(s->numbufs,s->request,s->status);
303 if ( r != MPI_SUCCESS ) return(r);
304 break;
305 case PF_BUFFER_MSGTAG:
306 if ( ++s->active >= s->numbufs ) s->active = 0;
307 while ( s->request[s->active] != MPI_REQUEST_NULL ) {
308 r = MPI_Waitsome(s->numbufs,s->request,&size,s->index,s->retstat);
309 if ( r != MPI_SUCCESS ) return(r);
310 }
311 break;
312 case PF_ENDBUFFER_MSGTAG:
313 if ( ++s->active >= s->numbufs ) s->active = 0;
314 r = MPI_Waitall(s->numbufs,s->request,s->status);
315 if ( r != MPI_SUCCESS ) return(r);
316 break;
317 default:
318 return(-99);
319 break;
320 }
321 return(0);
322}
323
324/*
325 #] PF_ISendSbuf :
326 #[ PF_RecvWbuf :
327*/
328
337int PF_RecvWbuf(WORD *b, LONG *s, int *src)
338{
339 int i, r = 0;
340
341 r = MPI_Recv(b,(int)*s,PF_WORD,*src,PF_ANY_MSGTAG,PF_COMM,&PF_status);
342 if ( r != MPI_SUCCESS ) { if ( r > 0 ) r *= -1; return(r); }
343
344 r = MPI_Get_count(&PF_status,PF_WORD,&i);
345 if ( r != MPI_SUCCESS ) { if ( r > 0 ) r *= -1; return(r); }
346
347 *s = (LONG)i;
348 *src = PF_status.MPI_SOURCE;
349 return(PF_status.MPI_TAG);
350}
351
352/*
353 #] PF_RecvWbuf :
354 #[ PF_IRecvRbuf :
355*/
356
366int PF_IRecvRbuf(PF_BUFFER *r, int bn, int from)
367{
368 int ret;
369 r->type[bn] = PF_WORD;
370
371 if ( r->numbufs == 1 ) {
372 r->tag[bn] = MPI_ANY_TAG;
373 r->from[bn] = from;
374 }
375 else {
376 ret = MPI_Irecv(r->full[bn],(int)(r->stop[bn] - r->full[bn]),PF_WORD,from,
377 MPI_ANY_TAG,PF_COMM,&r->request[bn]);
378 if (ret != MPI_SUCCESS) { if(ret > 0) ret *= -1; return(ret); }
379 }
380 return(0);
381}
382
383/*
384 #] PF_IRecvRbuf :
385 #[ PF_WaitRbuf :
386*/
387
400int PF_WaitRbuf(PF_BUFFER *r, int bn, LONG *size)
401{
402 int ret, rsize;
403
404 if ( r->numbufs == 1 ) {
405 *size = r->stop[bn] - r->full[bn];
406 ret = MPI_Recv(r->full[bn],(int)*size,r->type[bn],r->from[bn],r->tag[bn],
407 PF_COMM,&(r->status[bn]));
408 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
409 ret = MPI_Get_count(&(r->status[bn]),r->type[bn],&rsize);
410 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
411 if ( rsize > *size ) return(-99);
412 *size = (LONG)rsize;
413 }
414 else {
415 while ( r->request[bn] != MPI_REQUEST_NULL ) {
416 ret = MPI_Waitsome(r->numbufs,r->request,&rsize,r->index,r->retstat);
417 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
418 while ( --rsize >= 0 ) r->status[r->index[rsize]] = r->retstat[rsize];
419 }
420 ret = MPI_Get_count(&(r->status[bn]),r->type[bn],&rsize);
421 if ( ret != MPI_SUCCESS ) { if ( ret > 0 ) ret *= -1; return(ret); }
422 *size = (LONG)rsize;
423 }
424 return(r->status[bn].MPI_TAG);
425}
426
427/*
428 #] PF_WaitRbuf :
429 #[ PF_Bcast :
430*/
431
440int PF_Bcast(void *buffer, int count)
441{
442 if ( MPI_Bcast(buffer,count,MPI_BYTE,MASTER,PF_COMM) != MPI_SUCCESS )
443 return(-1);
444 return(0);
445}
446
447/*
448 #] PF_Bcast :
449 #[ PF_RawSend :
450*/
451
462
463int PF_RawSend(int dest, void *buf, LONG l, int tag)
464{
465 int ret=MPI_Ssend(buf,(int)l,MPI_BYTE,dest,tag,PF_COMM);
466 if ( ret != MPI_SUCCESS ) return(-1);
467 return(0);
468}
469/*
470 #] PF_RawSend :
471 #[ PF_RawRecv :
472*/
473
484LONG PF_RawRecv(int *src,void *buf,LONG thesize,int *tag)
485{
486 MPI_Status stat;
487 int ret=MPI_Recv(buf,(int)thesize,MPI_BYTE,*src,MPI_ANY_TAG,PF_COMM,&stat);
488 if ( ret != MPI_SUCCESS ) return(-1);
489 if ( MPI_Get_count(&stat,MPI_BYTE,&ret) != MPI_SUCCESS ) return(-1);
490 *tag = stat.MPI_TAG;
491 *src = stat.MPI_SOURCE;
492 return(ret);
493}
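
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). PF_RawSend()/PF_RawRecv() move plain byte
	buffers outside the packing layer. The tag is an arbitrary value chosen
	by the caller; PF_RawRecv() returns the number of received bytes, or a
	negative value on error.
*/
static int example_RawRoundTrip(void *data, LONG nbytes, int tag)
{
	if ( PF.me == MASTER ) {
		int i;
		for ( i = 1; i < PF.numtasks; i++ )
			if ( PF_RawSend(i,data,nbytes,tag) ) return(-1);
	}
	else {
		int src = MASTER, gottag;
		LONG got = PF_RawRecv(&src,data,nbytes,&gottag);
		if ( got < 0 ) return(-1);
		MesPrint("[%d] got %l bytes with tag %d from %d",PF.me,got,gottag,src);
	}
	return(0);
}
#endif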
494
495/*
496 #] PF_RawRecv :
497 #[ PF_RawProbe :
498*/
499
508int PF_RawProbe(int *src, int *tag, int *bytesize)
509{
510 MPI_Status stat;
511 int srcval = src != NULL ? *src : PF_ANY_SOURCE;
512 int tagval = tag != NULL ? *tag : PF_ANY_MSGTAG;
513 int ret = MPI_Probe(srcval, tagval, PF_COMM, &stat);
514 if ( ret != MPI_SUCCESS ) return -1;
515 if ( src != NULL ) *src = stat.MPI_SOURCE;
516 if ( tag != NULL ) *tag = stat.MPI_TAG;
517 if ( bytesize != NULL ) {
518 ret = MPI_Get_count(&stat, MPI_BYTE, bytesize);
519 if ( ret != MPI_SUCCESS ) return -1;
520 }
521 return 0;
522}
523
524/*
525 #] PF_RawProbe :
526 #[ The pack buffer :
527 #[ Variables :
528*/
529
530/*
531 * The pack buffer with the fixed size (= PF_packsize).
532 */
533static UBYTE *PF_packbuf = NULL;
534static UBYTE *PF_packstop = NULL;
535static int PF_packpos = 0;
536
537/*
538 #] Variables :
539 #[ PF_ShortPackInit :
540*/
541
548static int PF_ShortPackInit(void)
549{
550 PF_packbuf = (UBYTE *)Malloc1(sizeof(UBYTE) * PF_packsize, "PF_ShortPackInit");
551 if ( PF_packbuf == NULL ) return -1;
552 PF_packstop = PF_packbuf + PF_packsize;
553 return 0;
554}
555
556/*
557 #] PF_ShortPackInit :
558 #[ PF_InitPackBuf :
559*/
560
566static inline int PF_InitPackBuf(void)
567{
568/*
569 This is definitely not the best place for allocating the
570 buffer! Moved to PF_LibInit():
571
572 if ( PF_packbuf == 0 ) {
573 PF_packbuf = (UBYTE *)Malloc1(sizeof(UBYTE)*PF.packsize,"PF_InitPackBuf");
574 if ( PF_packbuf == 0 ) return(-1);
575 PF_packstop = PF_packbuf + PF.packsize;
576 }
577*/
578 PF_packpos = 0;
579 return(0);
580}
581
582/*
583 #] PF_InitPackBuf :
584 #[ PF_PrintPackBuf :
585*/
586
594int PF_PrintPackBuf(char *s, int size)
595{
596#ifdef NOMESPRINTYET
597/*
598 The use of printf should be discouraged. The results are flushed to
599 the output at unpredictable moments. We should use printf only
600 during startup when MesPrint doesn't have its buffers and output
601 channels initialized.
602*/
603 int i;
604 printf("[%d] %s: ",PF.me,s);
605 for(i=0;i<size;i++) printf("%d ",PF_packbuf[i]);
606 printf("\n");
607#else
608 MesPrint("[%d] %s: %a",PF.me,s,size,(WORD *)(PF_packbuf));
609#endif
610 return(0);
611}
612
613/*
614 #] PF_PrintPackBuf :
615 #[ PF_PreparePack :
616*/
617
623
624int PF_PreparePack(void)
625{
626 return PF_InitPackBuf();
627}
628
629/*
630 #] PF_PreparePack :
631 #[ PF_Pack :
632*/
633
642int PF_Pack(const void *buffer, size_t count, MPI_Datatype type)
643{
644 int err, bytes;
645
646 if ( count > INT_MAX ) return -99;
647
648 err = MPI_Pack_size((int)count, type, PF_COMM, &bytes);
649 MPI_ERRCODE_CHECK(err);
650 if ( PF_packpos + bytes > PF_packstop - PF_packbuf ) return -99;
651
652 err = MPI_Pack((void *)buffer, (int)count, type, PF_packbuf, PF_packsize, &PF_packpos, PF_COMM);
653 MPI_ERRCODE_CHECK(err);
654
655 return 0;
656}
657
658/*
659 #] PF_Pack :
660 #[ PF_Unpack :
661*/
662
671int PF_Unpack(void *buffer, size_t count, MPI_Datatype type)
672{
673 int err;
674
675 if ( count > INT_MAX ) return -99;
676
677 err = MPI_Unpack(PF_packbuf, PF_packsize, &PF_packpos, buffer, (int)count, type, PF_COMM);
678 MPI_ERRCODE_CHECK(err);
679
680 return 0;
681}
682
683/*
684 #] PF_Unpack :
685 #[ PF_PackString :
686*/
687
706int PF_PackString(const UBYTE *str)
707{
708 int ret,buflength,bytes,length;
709/*
710 length will be packed in the beginning.
711 Decrement buffer size by the length of the field "length":
712*/
713 if ( ( ret = MPI_Pack_size(1,PF_INT,PF_COMM,&bytes) ) != MPI_SUCCESS )
714 return(ret);
715 buflength = PF_packsize - bytes;
716/*
717 Calculate the string length (INCLUDING the trailing zero!):
718*/
719 for ( length = 0; length < buflength; length++ ) {
720 if ( str[length] == '\0' ) {
721 length++; /* since the trailing zero must be accounted */
722 break;
723 }
724 }
725/*
726 The string "\0!\0" is used as an image of the NULL.
727*/
728 if ( ( str[0] == '\0' ) /* empty string */
729 && ( str[1] == '!' ) /* Special case? */
730 && ( str[2] == '\0' ) /* Yes, pass 3 initial symbols */
731 ) length += 2; /* all 3 characters will be packed */
732 length++; /* Will be decremented in the following loop */
733/*
734 The problem: the packed size of a byte need not be 1! So first assume
735 it is 1, and if that is not the case decrease the length of the string
736 until it fits into the buffer:
737*/
738 do {
739 if ( ( ret = MPI_Pack_size(--length,PF_BYTE,PF_COMM,&bytes) )
740 != MPI_SUCCESS ) return(ret);
741 } while ( bytes > buflength );
742/*
743 Note, now if str[length-1] == '\0' then the string fits into the buffer
744 (INCLUDING the trailing zero!); if not, the rest must be packed further!
745
746 Pack the length to PF_packbuf:
747*/
748 if ( ( ret = MPI_Pack(&length,1,PF_INT,PF_packbuf,PF_packsize,
749 &PF_packpos,PF_COMM) ) != MPI_SUCCESS ) return(ret);
750/*
751 Pack the string to PF_packbuf:
752*/
753 if ( ( ret = MPI_Pack((UBYTE *)str,length,PF_BYTE,PF_packbuf,PF_packsize,
754 &PF_packpos,PF_COMM) ) != MPI_SUCCESS ) return(ret);
755 return(length);
756}
757
758/*
759 #] PF_PackString :
760 #[ PF_UnpackString :
761*/
762
774int PF_UnpackString(UBYTE *str)
775{
776 int ret,length;
777/*
778 Unpack the length:
779*/
780 if( (ret = MPI_Unpack(PF_packbuf,PF_packsize,&PF_packpos,
781 &length,1,PF_INT,PF_COMM))!= MPI_SUCCESS )
782 return(ret);
783/*
784 Unpack the string:
785*/
786 if ( ( ret = MPI_Unpack(PF_packbuf,PF_packsize,&PF_packpos,
787 str,length,PF_BYTE,PF_COMM) ) != MPI_SUCCESS ) return(ret);
788/*
789 Now if str[length-1] == '\0' then the whole string
790 (INCLUDING the trailing zero!) was unpacked; if not, the rest
791 must be unpacked to str+length.
792*/
793 return(length);
794}
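
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). PF_PackString()/PF_UnpackString() return
	the number of bytes they handled; the string is complete only when the
	last handled byte is the trailing zero. A long string therefore travels
	in as many packed buffers as needed, here broadcast from the master
	(error handling is simplified).
*/
static int example_BroadcastLongString(UBYTE *str)
{
	int n;
	do {
		if ( PF.me == MASTER ) {
			if ( PF_PreparePack() ) return(-1);
			if ( ( n = PF_PackString(str) ) <= 0 ) return(-1);
		}
		if ( PF_Broadcast() ) return(-1);
		if ( PF.me != MASTER ) {
			if ( ( n = PF_UnpackString(str) ) <= 0 ) return(-1);
		}
		str += n;               /* continue with the rest, if any */
	} while ( str[-1] != '\0' );
	return(0);
}
#endif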
795
796/*
797 #] PF_UnpackString :
798 #[ PF_Send :
799*/
800
821
822int PF_Send(int to, int tag)
823{
824 int err;
825 err = MPI_Ssend(PF_packbuf, PF_packpos, MPI_PACKED, to, tag, PF_COMM);
826 MPI_ERRCODE_CHECK(err);
827 return 0;
828}
829
830/*
831 #] PF_Send :
832 #[ PF_Receive :
833*/
834
848int PF_Receive(int src, int tag, int *psrc, int *ptag)
849{
850 int err;
851 MPI_Status status;
852 PF_InitPackBuf();
853 err = MPI_Recv(PF_packbuf, PF_packsize, MPI_PACKED, src, tag, PF_COMM, &status);
854 MPI_ERRCODE_CHECK(err);
855 if ( psrc ) *psrc = status.MPI_SOURCE;
856 if ( ptag ) *ptag = status.MPI_TAG;
857 return 0;
858}
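
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). A round trip through the fixed-size pack
	buffer: the sender prepares the buffer, packs a count followed by the
	data and sends it; the receiver gets the packed buffer and unpacks it
	in the same order. The tag is an arbitrary value chosen by the caller,
	and the receiver's data buffer is assumed to be large enough.
*/
static int example_ShortPackRoundTrip(int to, int tag, WORD *data, int n)
{
	if ( PF.me == MASTER ) {
		if ( PF_PreparePack() ) return(-1);
		if ( PF_Pack(&n,1,PF_INT) ) return(-1);
		if ( PF_Pack(data,n,PF_WORD) ) return(-1);
		if ( PF_Send(to,tag) ) return(-1);
	}
	else {
		int src, gottag, m;
		if ( PF_Receive(MASTER,tag,&src,&gottag) ) return(-1);
		if ( PF_Unpack(&m,1,PF_INT) ) return(-1);
		if ( PF_Unpack(data,m,PF_WORD) ) return(-1);
	}
	return(0);
}
#endif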
859
860/*
861 #] PF_Receive :
862 #[ PF_Broadcast :
863*/
864
883int PF_Broadcast(void)
884{
885 int err;
886/*
887 * If PF_SHORTBROADCAST is defined, then the broadcasting will be performed in
888 * 2 steps: first the used size of the buffer is broadcast, then only the
889 * used part of the buffer. This should be faster over slow connections, but
890 * slower on SMP shared-memory MPI because of the extra latency.
891 */
892#ifdef PF_SHORTBROADCAST
893 int pos = PF_packpos;
894#endif
895 if ( PF.me != MASTER ) {
896 err = PF_InitPackBuf();
897 if ( err ) return err;
898 }
899#ifdef PF_SHORTBROADCAST
900 err = MPI_Bcast(&pos, 1, MPI_INT, MASTER, PF_COMM);
901 MPI_ERRCODE_CHECK(err);
902 err = MPI_Bcast(PF_packbuf, pos, MPI_PACKED, MASTER, PF_COMM);
903#else
904 err = MPI_Bcast(PF_packbuf, PF_packsize, MPI_PACKED, MASTER, PF_COMM);
905#endif
906 MPI_ERRCODE_CHECK(err);
907 return 0;
908}
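
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). The usual PF_Broadcast() pattern: the
	master packs its data, every process (master and slaves alike) calls
	PF_Broadcast(), and afterwards the slaves unpack in the same order the
	master packed.
*/
static int example_BroadcastAnInt(int *value)
{
	if ( PF.me == MASTER ) {
		if ( PF_PreparePack() ) return(-1);
		if ( PF_Pack(value,1,PF_INT) ) return(-1);
	}
	if ( PF_Broadcast() ) return(-1);
	if ( PF.me != MASTER ) {
		if ( PF_Unpack(value,1,PF_INT) ) return(-1);
	}
	return(0);
}
#endif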
909
910/*
911 #] PF_Broadcast :
912 #] The pack buffer :
913 #[ Long pack stuff :
914 #[ Explanations :
915
916 The problems here are:
917 1. We need to send/receive long dollar variables. For
918 preprocessor-defined dollar variables we use repeated
919 packing/broadcasting (see parallel.c:PF_BroadcastPreDollar())
920 since each variable must be broadcast immediately. For dollar
921 variables changed at run time, collecting and broadcasting are
922 performed at the end of the module and all modified dollar variables
923 are transferred "at once", which is why the packed and
924 transferred buffers may become very large.
925 2. The Altix MPI implementation has a strange feature of
926 MPI_Bcast(), namely that it sometimes silently fails with big
927 buffers. For better performance it would be preferable to send one
928 big buffer instead of several small ones (since latency matters
929 more than bandwidth). That is why we need two different
930 sets of routines: for long point-to-point communication we collect
931 one big re-allocatable buffer, and the corresponding routines have the
932 prefix PF_longSingle; for broadcasting we pack the data into
933 several smaller buffers, and the corresponding routines have the
934 prefix PF_longMulti.
935 Note that for portability reasons we cannot split a large packed
936 buffer into small chunks, send them and reassemble them on the other
937 side; see "Advice to users" on page 180 of MPI--The Complete Reference,
938 Volume 1, second edition.
939 OPTIMIZING:
940 We assume that for most communications a single buffer of size
941 PF_packsize is enough.
942
943 How does it work:
944 For point-to-point communication there is one big re-allocatable
945 buffer PF_longPackBuf with two integer positions: PF_longPackPos
946 and PF_longPackTop (since the buffer may be re-allocated,
947 it is better to use integers rather than pointers).
948 At each re-allocation the size of the buffer
949 PF_longPackBuf is incremented by the size of one "standard" chunk,
950 PF_packsize.
951 For broadcasting there is one linked list (PF_longMultiRoot),
952 whose cells refer either to a chunk of PF_longPackBuf or to
953 their own private buffer. This is done for better memory utilisation:
954 longSingle and longMulti are never used simultaneously.
955 When a new cell is needed for longMulti packing, we increment
956 the counter PF_longPackN and simply follow the list. If that is not
957 possible, we allocate a private buffer for the cell and link it to
958 the end of the list PF_longMultiRoot.
959 When PF_longPackBuf is re-allocated, we link the new chunks into the
960 existing PF_longMultiRoot list before the first cell that owns a
961 private buffer. The pointer PF_longMultiLastChunk points to the last
962 cell of PF_longMultiRoot that refers to a chunk of
963 PF_longPackBuf.
964 PF_longPackBuf is initialized by the function
965 PF_longSingleReset(). At the beginning of PF_longPackBuf it packs
966 the size of the last sent buffer. Upon sending, the program checks
967 whether there was at least one re-allocation (PF_longPackN > 1).
968 If so, the sender first packs and sends a small buffer
969 (PF_longPackSmallBuf) containing one integer -- the
970 _negative_ new size of the send buffer. On getting the buffer, the
971 receiver unpacks one integer and checks whether it is < 0. If so,
972 the receiver repeats the receive, but first checks whether
973 its buffer is large enough and increases it if necessary.
974 PF_longMultiRoot is initialized by the function
975 PF_longMultiReset(). At the beginning of the first chunk it packs
976 one integer -- the number 1. Upon sending, the program checks
977 how many cells were packed (PF_longPackN). If more than 1, the
978 sender packs into the next cell the integer PF_longPackN, then
979 packs PF_longPackN pairs of integers -- for each cell, how many
980 times its chunk was accessed by the packing procedure
981 (the nPacks field of the cell structure) and how many items of an
982 incomplete packing ended up at the end of the chunk
983 (the field lastLen). Then the sender sends this
984 auxiliary chunk first.
985 The receiver unpacks the integer from the obtained chunk and, if this
986 integer is more than 1, it gets more chunks, unpacking the information
987 from the first, auxiliary chunk into the corresponding nPacks
988 fields. When unpacking information from multiple chunks, the receiver
989 then knows when a chunk is exhausted and it must switch to the next
990 cell, successively decrementing the corresponding nPacks field.
991
992 XXX: There are still some flaws:
993 PF_LongSingleSend/PF_LongSingleReceive may fail, for example, for data
994 transfers from the master to many slaves. Suppose that the master sends big
995 data to slaves, which requires the receivers to enlarge their buffers. For
996 the first data transfer, the master sends the new buffer size as the first
997 message, and then sends the data as the second message, because
998 PF_LongSinglePack records the increase of the buffer size on the master. For
999 the next time, however, the master sends the data without sending the new
1000 buffer size, and then MPI_Recv fails due to the data overflow.
1001 In parallel.c, they are used for the communication from slaves to the
1002 master. In this case, this problem does not occur because the master always
1003 has enough buffer.
1004 The maximum size that PF_LongMultiBroadcast can broadcast is limited to
1005 around 320kB because the current implementation tries to pack all
1006 information of chained buffers into one buffer, whose size is PF_packsize
1007 = 1600B.
1008
1009 #] Explanations :
1010 #[ Variables :
1011*/
1012
1013typedef struct longMultiStruct {
1014 UBYTE *buffer; /* NULL if the cell refers to a chunk of PF_longPackBuf */
1015 int bufpos; /* if >=0, PF_longPackBuf+bufpos is the chunk start */
1016 int packpos; /* the current position */
1017 int nPacks; /* How many times PF_longPack operates on this cell */
1018 int lastLen; /* if > 0, the last packing didn't fit completely to this
1019 chunk, only lastLen items was packed, the rest is in
1020 the next cell. */
1021 struct longMultiStruct *next; /* next linked cell, or NULL */
1022} PF_LONGMULTI;
1023
1024static UBYTE *PF_longPackBuf = NULL;
1025static VOID *PF_longPackSmallBuf = NULL;
1026static int PF_longPackPos = 0;
1027static int PF_longPackTop = 0;
1028static PF_LONGMULTI *PF_longMultiRoot = NULL;
1029static PF_LONGMULTI *PF_longMultiTop = NULL;
1030static PF_LONGMULTI *PF_longMultiLastChunk = NULL;
1031static int PF_longPackN = 0;
1032
1033/*
1034 #] Variables :
1035 #[ Long pack private functions :
1036 #[ PF_longMultiNewCell :
1037*/
1038
1039static inline int PF_longMultiNewCell(void)
1040{
1041/*
1042 Allocate a new cell:
1043*/
1044 PF_longMultiTop->next = (PF_LONGMULTI *)
1045 Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiCell");
1046 if ( PF_longMultiTop->next == NULL ) return(-1);
1047/*
1048 Allocate a private buffer:
1049*/
1050 PF_longMultiTop->next->buffer=(UBYTE*)
1051 Malloc1(sizeof(UBYTE)*PF_packsize,"PF_longMultiChunk");
1052 if ( PF_longMultiTop->next->buffer == NULL ) return(-1);
1053/*
1054 For a cell with a private buffer, bufpos is -1:
1055*/
1056 PF_longMultiTop->next->bufpos = -1;
1057/*
1058 This is the last cell in the chain:
1059*/
1060 PF_longMultiTop->next->next = NULL;
1061/*
1062 packpos and nPacks are not initialized!
1063*/
1064 return(0);
1065}
1066
1067/*
1068 #] PF_longMultiNewCell :
1069 #[ PF_longMultiPack2NextCell :
1070*/
1071static inline int PF_longMultiPack2NextCell(void)
1072{
1073/*
1074 Is there a free cell in the chain?
1075*/
1076 if ( PF_longMultiTop->next == NULL ) {
1077/*
1078 No, allocate the new cell with a private buffer:
1079*/
1080 if ( PF_longMultiNewCell() ) return(-1);
1081 }
1082/*
1083 Move to the next cell in the chain:
1084*/
1085 PF_longMultiTop = PF_longMultiTop->next;
1086/*
1087 If bufpos >= 0, the cell buffer is a chunk of PF_longPackBuf; initialize it:
1088*/
1089 if ( PF_longMultiTop->bufpos > -1 )
1090 PF_longMultiTop->buffer = PF_longPackBuf+PF_longMultiTop->bufpos;
1091/*
1092 else -- the cell has its own private buffer.
1093 Initialize the cell fields:
1094*/
1095 PF_longMultiTop->nPacks = 0;
1096 PF_longMultiTop->lastLen = 0;
1097 PF_longMultiTop->packpos = 0;
1098 return(0);
1099}
1100
1101/*
1102 #] PF_longMultiPack2NextCell :
1103 #[ PF_longMultiNewChunkAdded :
1104*/
1105
1106static inline int PF_longMultiNewChunkAdded(int n)
1107{
1108/*
1109 Store the list tail:
1110*/
1111 PF_LONGMULTI *MemCell = PF_longMultiLastChunk->next;
1112 int pos = PF_longPackTop;
1113
1114 while ( n-- > 0 ) {
1115/*
1116 Allocate a new cell:
1117*/
1118 PF_longMultiLastChunk->next = (PF_LONGMULTI *)
1119 Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiCell");
1120 if ( PF_longMultiLastChunk->next == NULL ) return(-1);
1121/*
1122 Update the Last Chunk Pointer:
1123*/
1124 PF_longMultiLastChunk = PF_longMultiLastChunk->next;
1125/*
1126 Initialize the new cell:
1127*/
1128 PF_longMultiLastChunk->bufpos = pos;
1129 pos += PF_packsize;
1130 PF_longMultiLastChunk->buffer = NULL;
1131 PF_longMultiLastChunk->packpos = 0;
1132 PF_longMultiLastChunk->nPacks = 0;
1133 PF_longMultiLastChunk->lastLen = 0;
1134 }
1135/*
1136 Hitch the tail:
1137*/
1138 PF_longMultiLastChunk->next = MemCell;
1139 return(0);
1140}
1141
1142/*
1143 #] PF_longMultiNewChunkAdded :
1144 #[ PF_longCopyChunk :
1145*/
1146
1147static inline void PF_longCopyChunk(int *to, int *from, int n)
1148{
1149 NCOPYI(to,from,n)
1150/* for ( ; n > 0; n-- ) *to++ = *from++; */
1151}
1152
1153/*
1154 #] PF_longCopyChunk :
1155 #[ PF_longAddChunk :
1156
1157 The chunk must be increased by n*PF_packsize.
1158*/
1159
1160static int PF_longAddChunk(int n, int mustRealloc)
1161{
1162 UBYTE *newbuf;
1163 if ( ( newbuf = (UBYTE*)Malloc1(sizeof(UBYTE)*(PF_longPackTop+n*PF_packsize),
1164 "PF_longPackBuf") ) == NULL ) return(-1);
1165/*
1166 Allocate and chain a new cell for longMulti:
1167*/
1168 if ( PF_longMultiNewChunkAdded(n) ) return(-1);
1169/*
1170 Copy the content to the new buffer:
1171*/
1172 if ( mustRealloc ) {
1173 PF_longCopyChunk((int*)newbuf,(int*)PF_longPackBuf,PF_longPackTop/sizeof(int));
1174 }
1175/*
1176 Note, PF_packsize is a multiple of sizeof(int) by construction!
1177*/
1178 PF_longPackTop += (n*PF_packsize);
1179/*
1180 Free the old buffer and store the new one:
1181*/
1182 M_free(PF_longPackBuf,"PF_longPackBuf");
1183 PF_longPackBuf = newbuf;
1184/*
1185 Count number of re-allocs:
1186*/
1187 PF_longPackN += n;
1188 return(0);
1189}
1190
1191/*
1192 #] PF_longAddChunk :
1193 #[ PF_longMultiHowSplit :
1194
1195 "count" elements of "type" in an input buffer occupy "bytes" bytes.
1196 We know from the algorithm that this is too many. How do we split
1197 the buffer so that the head fits into the rest of a storage buffer?*/
1198static inline int PF_longMultiHowSplit(int count, MPI_Datatype type, int bytes)
1199{
1200 int ret, items, totalbytes;
1201
1202 if ( count < 2 ) return(0); /* Nothing to split */
1203/*
1204 A rest of a storage buffer:
1205*/
1206 totalbytes = PF_packsize - PF_longMultiTop->packpos;
1207/*
1208 Rough estimate:
1209*/
1210 items = (int)((double)totalbytes*count/bytes);
1211/*
1212 Go to the up limit:
1213*/
1214 do {
1215 if ( ( ret = MPI_Pack_size(++items,type,PF_COMM,&bytes) )
1216 !=MPI_SUCCESS ) return(ret);
1217 } while ( bytes < totalbytes );
1218/*
1219 Now the value of "items" is too large.
1220 Come back down to the exact value:
1221*/
1222 do {
1223 if ( ( ret = MPI_Pack_size(--items,type,PF_COMM,&bytes) )
1224 !=MPI_SUCCESS ) return(ret);
1225 if ( items == 0 ) /* The standard says nothing about MPI_Pack_size(0) == 0! */
1226 return(0);
1227 } while ( bytes > totalbytes );
1228 return(items);
1229}
1230/*
1231 #] PF_longMultiHowSplit :
1232 #[ PF_longPackInit :
1233*/
1234
1235static int PF_longPackInit(void)
1236{
1237 int ret;
1238 PF_longPackBuf = (UBYTE*)Malloc1(sizeof(UBYTE)*PF_packsize,"PF_longPackBuf");
1239 if ( PF_longPackBuf == NULL ) return(-1);
1240/*
1241 PF_longPackTop is not initialized yet, use it as a return value:
1242*/
1243 ret = MPI_Pack_size(1,MPI_INT,PF_COMM,&PF_longPackTop);
1244 if ( ret != MPI_SUCCESS ) return(ret);
1245
1246 PF_longPackSmallBuf =
1247 (VOID*)Malloc1(sizeof(UBYTE)*PF_longPackTop,"PF_longPackSmallBuf");
1248
1249 PF_longPackTop = PF_packsize;
1250 PF_longMultiRoot =
1251 (PF_LONGMULTI *)Malloc1(sizeof(PF_LONGMULTI),"PF_longMultiRoot");
1252 if ( PF_longMultiRoot == NULL ) return(-1);
1253 PF_longMultiRoot->bufpos = 0;
1254 PF_longMultiRoot->buffer = NULL;
1255 PF_longMultiRoot->next = NULL;
1256 PF_longMultiLastChunk = PF_longMultiRoot;
1257
1258 PF_longPackPos = 0;
1259 PF_longMultiRoot->packpos = 0;
1260 PF_longMultiTop = PF_longMultiRoot;
1261 PF_longPackN = 1;
1262 return(0);
1263}
1264
1265/*
1266 #] PF_longPackInit :
1267 #[ PF_longMultiPreparePrefix :
1268*/
1269
1270static inline int PF_longMultiPreparePrefix(void)
1271{
1272 int ret;
1273 PF_LONGMULTI *thePrefix;
1274 int i = PF_longPackN;
1275/*
1276 Here we have PF_longPackN>1!
1277 New cell (at the list end) to create the auxiliary chunk:
1278*/
1279 if ( PF_longMultiPack2NextCell() ) return(-1);
1280/*
1281 Store the pointer to the cell we are going to fill:
1282*/
1283 thePrefix = PF_longMultiTop;
1284/*
1285 Pack PF_longPackN:
1286*/
1287 ret = MPI_Pack(&(PF_longPackN),
1288 1,
1289 MPI_INT,
1290 thePrefix->buffer,
1291 PF_packsize,
1292 &(thePrefix->packpos),
1293 PF_COMM);
1294 if ( ret != MPI_SUCCESS ) return(ret);
1295/*
1296 And start from the beginning:
1297*/
1298 for ( PF_longMultiTop = PF_longMultiRoot; i > 0; i-- ) {
1299/*
1300 Pack number of Pack hits:
1301*/
1302 ret = MPI_Pack(&(PF_longMultiTop->nPacks),
1303 1,
1304 MPI_INT,
1305 thePrefix->buffer,
1306 PF_packsize,
1307 &(thePrefix->packpos),
1308 PF_COMM);
1309/*
1310 Pack the length of the last fit portion:
1311*/
1312 ret |= MPI_Pack(&(PF_longMultiTop->lastLen),
1313 1,
1314 MPI_INT,
1315 thePrefix->buffer,
1316 PF_packsize,
1317 &(thePrefix->packpos),
1318 PF_COMM);
1319/*
1320 Check the size -- not necessary, MPI_Pack did it.
1321*/
1322 if ( ret != MPI_SUCCESS ) return(ret);
1323/*
1324 Go to the next cell:
1325*/
1326 PF_longMultiTop = PF_longMultiTop->next;
1327 }
1328
1329 PF_longMultiTop = thePrefix;
1330/*
1331 PF_longMultiTop is ready!
1332*/
1333 return(0);
1334}
1335
1336/*
1337 #] PF_longMultiPreparePrefix :
1338 #[ PF_longMultiProcessPrefix :
1339*/
1340
1341static inline int PF_longMultiProcessPrefix(void)
1342{
1343 int ret,i;
1344/*
1345 We have PF_longPackN records packed in PF_longMultiRoot->buffer,
1346 pairs nPacks and lastLen. Loop through PF_longPackN cells,
1347 unpacking these integers into proper fields:
1348*/
1349 for ( PF_longMultiTop = PF_longMultiRoot, i = 0; i < PF_longPackN; i++ ) {
1350/*
1351 Go to the next cell, allocating when necessary:
1352*/
1353 if ( PF_longMultiPack2NextCell() ) return(-1);
1354/*
1355 Unpack the number of Pack hits:
1356*/
1357 ret = MPI_Unpack(PF_longMultiRoot->buffer,
1358 PF_packsize,
1359 &( PF_longMultiRoot->packpos),
1360 &(PF_longMultiTop->nPacks),
1361 1,
1362 MPI_INT,
1363 PF_COMM);
1364 if ( ret != MPI_SUCCESS ) return(ret);
1365/*
1366 Unpack the length of the last fit portion:
1367*/
1368 ret = MPI_Unpack(PF_longMultiRoot->buffer,
1369 PF_packsize,
1370 &( PF_longMultiRoot->packpos),
1371 &(PF_longMultiTop->lastLen),
1372 1,
1373 MPI_INT,
1374 PF_COMM);
1375 if ( ret != MPI_SUCCESS ) return(ret);
1376 }
1377 return(0);
1378}
1379
1380/*
1381 #] PF_longMultiProcessPrefix :
1382 #[ PF_longSingleReset :
1383*/
1384
1392static inline int PF_longSingleReset(int is_sender)
1393{
1394 int ret;
1395 PF_longPackPos=0;
1396 if ( is_sender ) {
1397 ret = MPI_Pack(&PF_longPackTop,1,MPI_INT,
1398 PF_longPackBuf,PF_longPackTop,&PF_longPackPos,PF_COMM);
1399 if ( ret != MPI_SUCCESS ) return(ret);
1400 PF_longPackN = 1;
1401 }
1402 else {
1403 PF_longPackN=0;
1404 }
1405 return(0);
1406}
1407
1408/*
1409 #] PF_longSingleReset :
1410 #[ PF_longMultiReset :
1411*/
1412
1420static inline int PF_longMultiReset(int is_sender)
1421{
1422 int ret = 0, theone = 1;
1423 PF_longMultiRoot->packpos = 0;
1424 if ( is_sender ) {
1425 ret = MPI_Pack(&theone,1,MPI_INT,
1426 PF_longPackBuf,PF_longPackTop,&(PF_longMultiRoot->packpos),PF_COMM);
1427 PF_longPackN = 1;
1428 }
1429 else {
1430 PF_longPackN = 0;
1431 }
1432 PF_longMultiRoot->nPacks = 0; /* The auxiliary field is not counted */
1433 PF_longMultiRoot->lastLen = 0;
1434 PF_longMultiTop = PF_longMultiRoot;
1435 PF_longMultiRoot->buffer = PF_longPackBuf;
1436 return ret;
1437}
1438
1439/*
1440 #] PF_longMultiReset :
1441 #] Long pack private functions :
1442 #[ PF_PrepareLongSinglePack :
1443*/
1444
1450
1451int PF_PrepareLongSinglePack(void)
1452{
1453 return PF_longSingleReset(1);
1454}
1455
1456/*
1457 #] PF_PrepareLongSinglePack :
1458 #[ PF_LongSinglePack :
1459*/
1460
1469int PF_LongSinglePack(const void *buffer, size_t count, MPI_Datatype type)
1470{
1471 int ret, bytes;
1472 /* XXX: Limited by int size. */
1473 if ( count > INT_MAX ) return -99;
1474 ret = MPI_Pack_size((int)count,type,PF_COMM,&bytes);
1475 if ( ret != MPI_SUCCESS ) return(ret);
1476
1477 while ( PF_longPackPos+bytes > PF_longPackTop ) {
1478 if ( PF_longAddChunk(1, 1) ) return(-1);
1479 }
1480/*
1481 PF_longAddChunk(1, 1) means, the chunk must
1482 be increased by 1 and re-allocated
1483*/
1484 ret = MPI_Pack((void *)buffer,(int)count,type,
1485 PF_longPackBuf,PF_longPackTop,&PF_longPackPos,PF_COMM);
1486 if ( ret != MPI_SUCCESS ) return(ret);
1487 return(0);
1488}
1489
1490/*
1491 #] PF_LongSinglePack :
1492 #[ PF_LongSingleUnpack :
1493*/
1494
1503int PF_LongSingleUnpack(void *buffer, size_t count, MPI_Datatype type)
1504{
1505 int ret;
1506 /* XXX: Limited by int size. */
1507 if ( count > INT_MAX ) return -99;
1508 ret = MPI_Unpack(PF_longPackBuf,PF_longPackTop,&PF_longPackPos,
1509 buffer,(int)count,type,PF_COMM);
1510 if ( ret != MPI_SUCCESS ) return(ret);
1511 return(0);
1512}
1513
1514/*
1515 #] PF_LongSingleUnpack :
1516 #[ PF_LongSingleSend :
1517*/
1518
1540int PF_LongSingleSend(int to, int tag)
1541{
1542 int ret, pos = 0;
1543/*
1544 Note, here we assume that this function cannot be used
1545 with to == PF_ANY_SOURCE!
1546*/
1547 if ( PF_longPackN > 1 ) {
1548 /* The buffer was enlarged; pack and send the new size first: */
1549 int tmp = -PF_longPackTop;
1550/*
1551 A negative value means that a second buffer will follow
1552*/
1553 ret = MPI_Pack(&tmp, 1,PF_INT,
1554 PF_longPackSmallBuf,PF_longPackTop,&pos,PF_COMM);
1555 if ( ret != MPI_SUCCESS ) return(ret);
1556 ret = MPI_Ssend(PF_longPackSmallBuf,pos,MPI_PACKED,to,tag,PF_COMM);
1557 if ( ret != MPI_SUCCESS ) return(ret);
1558 }
1559 ret = MPI_Ssend(PF_longPackBuf,PF_longPackPos,MPI_PACKED,to,tag,PF_COMM);
1560 if ( ret != MPI_SUCCESS ) return(ret);
1561 return(0);
1562}
1563
1564/*
1565 #] PF_LongSingleSend :
1566 #[ PF_LongSingleReceive :
1567*/
1568
1583int PF_LongSingleReceive(int src, int tag, int *psrc, int *ptag)
1584{
1585 int ret, missed, oncemore;
1586 MPI_Status status;
1587 PF_longSingleReset(0);
1588 do {
1589 ret = MPI_Recv(PF_longPackBuf,PF_longPackTop,MPI_PACKED,src,tag,
1590 PF_COMM,&status);
1591 if ( ret != MPI_SUCCESS ) return(ret);
1592/*
1593 The source and tag must be fixed here in case
1594 MPI_Recv is performed more than once:
1595*/
1596 src = status.MPI_SOURCE;
1597 tag = status.MPI_TAG;
1598 if ( psrc ) *psrc = status.MPI_SOURCE;
1599 if ( ptag ) *ptag = status.MPI_TAG;
1600/*
1601 Now we have got either the small buffer with the new PF_longPackTop,
1602 or just a regular chunk.
1603*/
1604 ret = MPI_Unpack(PF_longPackBuf,PF_longPackTop,&PF_longPackPos,
1605 &missed,1,MPI_INT,PF_COMM);
1606 if ( ret != MPI_SUCCESS ) return(ret);
1607
1608 if ( missed < 0 ) { /* The small buffer was received. */
1609 oncemore = 1; /* repeat receiving afterwards */
1610 /* Reallocate the buffer and get the data */
1611 missed = -missed;
1612/*
1613 restore the position after unpacking from the small buffer:
1614*/
1615 PF_longPackPos = 0;
1616 }
1617 else {
1618 oncemore = 0; /* That's all, no repetition */
1619 }
1620 if ( missed > PF_longPackTop ) {
1621 /*
1622 * The room must be increased. We need a re-allocation for the
1623 * case that there is no repetition.
1624 */
1625 if ( PF_longAddChunk( (missed-PF_longPackTop)/PF_packsize, !oncemore ) )
1626 return(-1);
1627 }
1628 } while ( oncemore );
1629 return(0);
1630}
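
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). The long point-to-point protocol: a slave
	packs an arbitrarily large WORD buffer (the pack buffer grows as needed)
	and sends it to the master; PF_LongSingleReceive() transparently handles
	the extra "new buffer size" message when the sender had to grow its
	buffer. The tag and the size of the caller's buffers are up to the user.
*/
static int example_LongSingleRoundTrip(WORD *data, LONG n, int tag)
{
	if ( PF.me != MASTER ) {                 /* the sending slave */
		if ( PF_PrepareLongSinglePack() ) return(-1);
		if ( PF_LongSinglePack(&n,1,PF_LONG) ) return(-1);
		if ( PF_LongSinglePack(data,(size_t)n,PF_WORD) ) return(-1);
		if ( PF_LongSingleSend(MASTER,tag) ) return(-1);
	}
	else {                                   /* the receiving master */
		int src, gottag;
		if ( PF_LongSingleReceive(PF_ANY_SOURCE,tag,&src,&gottag) ) return(-1);
		if ( PF_LongSingleUnpack(&n,1,PF_LONG) ) return(-1);
		if ( PF_LongSingleUnpack(data,(size_t)n,PF_WORD) ) return(-1);
	}
	return(0);
}
#endif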
1631
1632/*
1633 #] PF_LongSingleReceive :
1634 #[ PF_PrepareLongMultiPack :
1635*/
1636
1642
1643int PF_PrepareLongMultiPack(void)
1644{
1645 return PF_longMultiReset(1);
1646}
1647
1648/*
1649 #] PF_PrepareLongMultiPack :
1650 #[ PF_LongMultiPackImpl :
1651*/
1652
1662int PF_LongMultiPackImpl(const void*buffer, size_t count, size_t eSize, MPI_Datatype type)
1663{
1664 int ret, items;
1665
1666 /* XXX: Limited by int size. */
1667 if ( count > INT_MAX ) return -99;
1668
1669 ret = MPI_Pack_size((int)count,type,PF_COMM,&items);
1670 if ( ret != MPI_SUCCESS ) return(ret);
1671
1672 if ( PF_longMultiTop->packpos + items <= PF_packsize ) {
1673 ret = MPI_Pack((void *)buffer,(int)count,type,PF_longMultiTop->buffer,
1674 PF_packsize,&(PF_longMultiTop->packpos),PF_COMM);
1675 if ( ret != MPI_SUCCESS ) return(ret);
1676 PF_longMultiTop->nPacks++;
1677 return(0);
1678 }
1679/*
1680 The data do not fit into the rest of the buffer.
1681 There are two possibilities here: go to the next cell
1682 immediately, or first try to pack some portion. The function
1683 PF_longMultiHowSplit() returns the number of items that can be
1684 packed at the end of the current cell:
1685*/
1686 if ( ( items = PF_longMultiHowSplit((int)count,type,items) ) < 0 ) return(items);
1687
1688 if ( items > 0 ) { /* store the head */
1689 ret = MPI_Pack((void *)buffer,items,type,PF_longMultiTop->buffer,
1690 PF_packsize,&(PF_longMultiTop->packpos),PF_COMM);
1691 if ( ret != MPI_SUCCESS ) return(ret);
1692 PF_longMultiTop->nPacks++;
1693 PF_longMultiTop->lastLen = items;
1694 }
1695/*
1696 Now the rest should be packed to the new cell.
1697 Slide to the new cell:
1698*/
1699 if ( PF_longMultiPack2NextCell() ) return(-1);
1700 PF_longPackN++;
1701/*
1702 Pack the rest to the next cell:
1703*/
1704 return(PF_LongMultiPackImpl((char *)buffer+items*eSize,count-items,eSize,type));
1705}
1706
1707/*
1708 #] PF_LongMultiPackImpl :
1709 #[ PF_LongMultiUnpackImpl :
1710*/
1711
1721int PF_LongMultiUnpackImpl(void *buffer, size_t count, size_t eSize, MPI_Datatype type)
1722{
1723 int ret;
1724
1725 /* XXX: Limited by int size. */
1726 if ( count > INT_MAX ) return -99;
1727
1728 if ( PF_longPackN < 2 ) { /* Just unpack the buffer from the single cell */
1729 ret = MPI_Unpack(
1730 PF_longMultiTop->buffer,
1731 PF_packsize,
1732 &(PF_longMultiTop->packpos),
1733 buffer,
1734 count,type,PF_COMM);
1735 if ( ret != MPI_SUCCESS ) return(ret);
1736 return(0);
1737 }
1738/*
1739 More than one cell is in use.
1740*/
1741 if ( ( PF_longMultiTop->nPacks > 1 ) /* the cell is not yet exhausted */
1742 || /* The last cell contains exactly the required portion: */
1743 ( ( PF_longMultiTop->nPacks == 1 ) && ( PF_longMultiTop->lastLen == 0 ) )
1744 ) { /* Just unpack the buffer from the current cell */
1745 ret = MPI_Unpack(
1746 PF_longMultiTop->buffer,
1747 PF_packsize,
1748 &(PF_longMultiTop->packpos),
1749 buffer,
1750 count,type,PF_COMM);
1751 if ( ret != MPI_SUCCESS ) return(ret);
1752 (PF_longMultiTop->nPacks)--;
1753 return(0);
1754 }
1755 if ( ( PF_longMultiTop->nPacks == 1 ) && ( PF_longMultiTop->lastLen != 0 ) ) {
1756/*
1757 Unpack the head:
1758*/
1759 ret = MPI_Unpack(
1760 PF_longMultiTop->buffer,
1761 PF_packsize,
1762 &(PF_longMultiTop->packpos),
1763 buffer,
1764 PF_longMultiTop->lastLen,type,PF_COMM);
1765 if ( ret != MPI_SUCCESS ) return(ret);
1766/*
1767 Decrement the counter by read items:
1768*/
1769 count -= PF_longMultiTop->lastLen;
1770 if ( count <= 0 ) return(-1); /*Something is wrong! */
1771/*
1772 Shift the output buffer position:
1773*/
1774 buffer = (char *)buffer + PF_longMultiTop->lastLen * eSize;
1775 (PF_longMultiTop->nPacks)--;
1776 }
1777/*
1778 Here PF_longMultiTop->nPacks == 0
1779*/
1780 if ( ( PF_longMultiTop = PF_longMultiTop->next ) == NULL ) return(-1);
1781 return(PF_LongMultiUnpackImpl(buffer,count,eSize,type));
1782}
1783
1784/*
1785 #] PF_LongMultiUnpackImpl :
1786 #[ PF_LongMultiBroadcast :
1787*/
1788
1807int PF_LongMultiBroadcast(void)
1808{
1809 int ret, i;
1810
1811 if ( PF.me == MASTER ) {
1812/*
1813 PF_longPackN is the number of packed chunks. If it is more
1814 than 1, we have to build an auxiliary chunk and broadcast it first
1815*/
1816 if ( PF_longPackN > 1 ) {
1817 if ( PF_longMultiPreparePrefix() ) return(-1);
1818 ret = MPI_Bcast((VOID*)PF_longMultiTop->buffer,
1819 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1820 if ( ret != MPI_SUCCESS ) return(ret);
1821/*
1822 PF_longPackN was not incremented by PF_longMultiPreparePrefix()!
1823*/
1824 }
1825/*
1826 Now we start from the beginning:
1827*/
1828 PF_longMultiTop = PF_longMultiRoot;
1829/*
1830 Just broadcast all the chunks:
1831*/
1832 for ( i = 0; i < PF_longPackN; i++ ) {
1833 ret = MPI_Bcast((VOID*)PF_longMultiTop->buffer,
1834 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1835 if ( ret != MPI_SUCCESS ) return(ret);
1836 PF_longMultiTop = PF_longMultiTop->next;
1837 }
1838 return(0);
1839 }
1840/*
1841 else - the slave
1842*/
1843 PF_longMultiReset(0);
1844/*
1845 Get the first chunk; it can be either the only data chunk, or
1846 an auxiliary chunk, if the data do not fit the single chunk:
1847*/
1848 ret = MPI_Bcast((VOID*)PF_longMultiRoot->buffer,
1849 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1850 if ( ret != MPI_SUCCESS ) return(ret);
1851
1852 ret = MPI_Unpack((VOID*)PF_longMultiRoot->buffer,
1853 PF_packsize,
1854 &(PF_longMultiRoot->packpos),
1855 &PF_longPackN,1,MPI_INT,PF_COMM);
1856 if ( ret != MPI_SUCCESS ) return(ret);
1857/*
1858 Now in PF_longPackN we have the number of cells used
1859 for broadcasting. If it is >1, then we have to allocate
1860 enough cells, initialize them and receive all the chunks.
1861*/
1862 if ( PF_longPackN < 2 ) /* That's all, the single chunk is received. */
1863 return(0);
1864/*
1865 Here we have to get PF_longPackN chunks. But, first,
1866 initialize the cells with the info from the received auxiliary chunk.
1867*/
1868 if ( PF_longMultiProcessPrefix() ) return(-1);
1869/*
1870 Now we have free PF_longPackN cells, starting
1871 from PF_longMultiRoot->next, with properly initialized
1872 nPacks and lastLen fields. Get chunks:
1873*/
1874 for ( PF_longMultiTop = PF_longMultiRoot->next, i = 0; i < PF_longPackN; i++ ) {
1875 ret = MPI_Bcast((VOID*)PF_longMultiTop->buffer,
1876 PF_packsize,MPI_PACKED,MASTER,PF_COMM);
1877 if ( ret != MPI_SUCCESS ) return(ret);
1878 if ( i == 0 ) { /* The first chunk contains an extra "1". */
1879 int tmp;
1880/*
1881 Extract this 1 into tmp and forget about it.
1882*/
1883 ret = MPI_Unpack((VOID*)PF_longMultiTop->buffer,
1884 PF_packsize,
1885 &(PF_longMultiTop->packpos),
1886 &tmp,1,MPI_INT,PF_COMM);
1887 if ( ret != MPI_SUCCESS ) return(ret);
1888 }
1889 PF_longMultiTop = PF_longMultiTop->next;
1890 }
1891/*
1892 Unpacking starts from PF_longMultiTop; skip the auxiliary chunk in
1893 PF_longMultiRoot:
1894*/
1895 PF_longMultiTop = PF_longMultiRoot->next;
1896 return(0);
1897}
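
#ifdef PF_MPI_USAGE_EXAMPLES
/*
	Illustrative sketch only; not part of the original source (hypothetical
	PF_MPI_USAGE_EXAMPLES guard). The multi-chunk broadcast: the master
	packs (data that do not fit one chunk spill over into further cells),
	every process calls PF_LongMultiBroadcast(), and the slaves unpack in
	the same order. Elsewhere FORM may call the packing routines through
	wrapper macros; here the *Impl functions are called directly with an
	explicit element size.
*/
static int example_LongMultiBroadcastWords(WORD *data, int n)
{
	if ( PF.me == MASTER ) {
		if ( PF_PrepareLongMultiPack() ) return(-1);
		if ( PF_LongMultiPackImpl(&n,1,sizeof(int),PF_INT) ) return(-1);
		if ( PF_LongMultiPackImpl(data,n,sizeof(WORD),PF_WORD) ) return(-1);
	}
	if ( PF_LongMultiBroadcast() ) return(-1);
	if ( PF.me != MASTER ) {
		if ( PF_LongMultiUnpackImpl(&n,1,sizeof(int),PF_INT) ) return(-1);
		if ( PF_LongMultiUnpackImpl(data,n,sizeof(WORD),PF_WORD) ) return(-1);
	}
	return(0);
}
#endif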
1898
1899/*
1900 #] PF_LongMultiBroadcast :
1901 #] Long pack stuff :
1902*/