diff --git a/pNbody/io.py b/pNbody/io.py index 9ca9840..40f6600 100644 --- a/pNbody/io.py +++ b/pNbody/io.py @@ -1,774 +1,774 @@ # -*- coding: iso-8859-1 -*- # standard modules import os,sys,string,types import pickle # array module from numpy import * import pyfits import mpi ################################# def checkfile(name): ################################# ''' Check if a file exists. An error is generated if the file does not exist. Parameters ---------- name : the path to a filename Examples -------- >>> io.checkfile('an_existing_file') >>> >>> io.checkfile('a_non_existing_file') Traceback (most recent call last): File "", line 1, in File "/home/epfl/revaz/local/lib64/python2.6/site-packages/pNbody/io.py", line 33, in checkfile raise IOError(915,'file %s not found ! Please check the file name.'%(name)) IOError: [Errno 915] file nofile not found ! Please check the file name. ''' if name == None: raise Exception("file name set to None ! Please check the file name.") if not os.path.isfile(name): raise IOError(915,'file %s not found ! Please check the file name.'%(name)) ################################# def end_of_file(f,pio='no',MPI=None): ################################# ''' Return True if we have reached the end of the file f, False otherwise. Parameters ---------- f : file an open file pio : 'yes' or 'no' if the file is read in parallel or not MPI : MPI communicator Returns ------- status : Bool True if we reached the end of the file False if not ''' if pio=='no': # here, the master decides for all slaves if mpi.ThisTask == 0: p1 = f.tell() f.seek(0,2) p2 = f.tell() f.seek(p1) if p1 == p2: status = True else: status = False else: status = None status = mpi.mpi_bcast(status,0) return status else: # each process decides for itself p1 = f.tell() f.seek(0,2) p2 = f.tell() f.seek(p1) if p1 == p2: status = True else: status = False return status ##################################################### def write_array(file,vec): ##################################################### ''' Write an array to a file, in a very simple ascii format. Parameters ---------- file : the path to a file vec : an ndarray object Examples -------- >>> from numpy import * >>> x = array([1,2,3]) >>> io.write_array('/tmp/array.dat',x) ''' f = open(file,'w') for i in range(len(vec)): f.write("%f\n"%vec[i]) f.close() ##################################################### def read_ascii(file,columns=None,lines=None,dtype=float,skipheader=False,cchar='#'): ##################################################### ''' Read an ascii file. The function allows one to select the columns or lines to read. If the file contains a header, the header is used to label the columns. In this case, a dictionary is returned. Parameters ---------- file : the path to a file or an open file columns : list the list of the columns to read if none, all columns are read lines : list the list of the lines to read if none, all lines are read dtype : dtype the dtype of the objects to read skipheader : bool if true, do not read the header if there is one cchar : char lines beginning with cchar are skipped the first line is considered as the header Returns ------- data : Dict or ndarray A python dictionary or an ndarray object Examples -------- >>> from numpy import * >>> x = arange(10) >>> y = x*x >>> f = open('afile.txt','w') >>> f.write("# x y") >>> for i in xrange(len(x)): ... f.write('%g %g'%(x[i],y[i])) ... 
>>> f.close() >>> from pNbody import io >>> data = io.read_ascii("afile.txt") >>> data['x'] array([ 0., 1., 2., 3., 4., 5., 6., 7., 8., 9.]) >>> data['y'] array([ 0., 1., 4., 9., 16., 25., 36., 49., 64., 81.]) ''' def RemoveComments(l): if l[0]==cchar: return None else: return l def toNumList(l): return map(dtype,l) if type(file) != types.FileType: f = open(file,'r') else: f = file # read header while there is one while 1: fpos = f.tell() header = f.readline() if header[0] != cchar: f.seek(fpos) header = None break else: if skipheader: header = None else: # create dict from header header = string.strip(header[2:]) elts = string.split(header) break ''' # read header if there is one header = f.readline() if header[0] != cchar: f.seek(0) header = None else: if skipheader: header = None else: # create dict from header header = string.strip(header[2:]) elts = string.split(header) ''' # now, read the file content lines = f.readlines() # remove trailing lines = map(string.strip, lines) # remove comments #lines = map(RemoveComments, lines) # split lines = map(string.split, lines) # convert into float lines = map(toNumList, lines) # convert into array lines = array(map(array, lines)) # transpose lines = transpose(lines) if header != None: iobs = {} i = 0 for elt in elts: iobs[elt]=i i = i + 1 vals = {} for key in iobs.keys(): vals[key] = lines[iobs[key]] return vals # return if columns == None: return lines else: return lines.take(axis=0,indices=columns) ##################################################### def write_dmp(file,data): ##################################################### ''' Write a dmp (pickle) file. In other word, dump the data object. Parameters ---------- file : the path to a file data : a pickable python object Examples -------- >>> x = {'a':1,'b':2} >>> io.write_dmp('/tmp/afile.dmp',x) ''' f = open(file,'w') pickle.dump(data, f) f.close() ##################################################### def read_dmp(file): ##################################################### ''' Read a dmp (pickle) file. Parameters ---------- file : the path to a file Returns ------- data : a python object Examples -------- >>> x = {'a':1,'b':2} >>> io.write_dmp('/tmp/afile.dmp',x) >>> y = io.read_dmp('/tmp/afile.dmp') >>> y {'a': 1, 'b': 2} ''' f = open(file,'r') data = pickle.load(f) f.close() return data ##################################################### def WriteFits(data, filename, extraHeader = None) : ##################################################### ''' Write a fits file ''' # image creation fitsimg = pyfits.HDUList() # add data hdu = pyfits.PrimaryHDU() hdu.data = data fitsimg.append(hdu) # add keys keys = [] if extraHeader != None: #keys.append(('INSTRUME','st4 SBIG ccd camera','Instrument name')) #keys.append(('LOCATION',"175 OFXB St-Luc (VS)",'Location')) keys = extraHeader hdr = fitsimg[0].header for key in keys: hdr.update(key[0],key[1],comment=key[2]) fitsimg.writeto(filename) ##################################################### def ReadFits(filename) : ##################################################### ''' Read a fits file. 
''' # read image fitsimg = pyfits.open(filename) data = fitsimg[0].data return data ################################# -def readblock(f,data_type,shape=None,byteorder=sys.byteorder,skip=False): +def readblock(f,data_type,shape=None,byteorder=sys.byteorder,skip=False,htype=int32): ################################# ''' data_type = int,float32,float or data_type = array shape = tuple ''' # compute the number of bytes that should be read nbytes_to_read=None if shape!=None: shape_a = array(shape) nelts_to_read = shape_a[0] for n in shape_a[1:]: nelts_to_read = nelts_to_read*n nbytes_to_read = nelts_to_read*dtype(data_type).itemsize try: - nb1 = fromstring(f.read(4),int32) + nb1 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb1.byteswap(True) nb1 = nb1[0] nbytes=nb1 # check if nbytes_to_read: if nbytes_to_read!=nbytes: print "inconsistent block header, using nbytes=%d instead"%nbytes_to_read nbytes=nbytes_to_read except IndexError: raise "ReadBlockError" if skip: f.seek(nbytes,1) data = None shape = None print " skipping %d bytes... "%(nbytes) else: if type(data_type) == types.TupleType: data = [] for tpe in data_type: if type(tpe) == int: val = f.read(tpe) else: bytes = dtype(tpe).itemsize val = fromstring(f.read(bytes),tpe) if sys.byteorder != byteorder: val.byteswap(True) val = val[0] data.append(val) else: data = fromstring(f.read(nbytes),data_type) if sys.byteorder != byteorder: data.byteswap(True) - nb2 = fromstring(f.read(4),int32) + nb2 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb2.byteswap(True) nb2 = nb2[0] if nb1 != nb2: print "ReadBlockError","nb1=%d nb2=%d"%(nb1,nb2) raise "ReadBlockError","nb1=%d nb2=%d"%(nb1,nb2) # reshape if needed if shape != None: data.shape=shape return data ################################# -def ReadBlock(f,data_type,shape=None,byteorder=sys.byteorder,pio='no'): +def ReadBlock(f,data_type,shape=None,byteorder=sys.byteorder,pio='no',htype=int32): ################################# ''' data_type = int,float32,float or data_type = array shape = tuple pio : parallel io, 'yes' or 'no' if 'yes', each proc reads its own file if 'no', proc 0 reads and sends to the others ''' if mpi.NTask==1: - data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder) + data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder,htype=htype) return data if pio == 'yes': - data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder) + data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder,htype=htype) return data else: - data = mpi.mpi_ReadAndSendBlock(f,data_type=data_type,shape=shape,byteorder=byteorder) + data = mpi.mpi_ReadAndSendBlock(f,data_type=data_type,shape=shape,byteorder=byteorder,htype=htype) return data ################################# -def ReadArray(f,data_type,shape=None,byteorder=sys.byteorder,pio='no',nlocal=None): +def ReadArray(f,data_type,shape=None,byteorder=sys.byteorder,pio='no',nlocal=None,htype=int32): ################################# ''' data_type = int,float32,float or data_type = array shape = tuple ''' if mpi.NTask==1: - data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder) + data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder,htype=htype) return data if pio == 'yes': - data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder) + data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder,htype=htype) return data else: - data = mpi.mpi_OldReadAndSendArray(f,data_type,shape=shape,byteorder=byteorder,nlocal=nlocal) + 
data = mpi.mpi_OldReadAndSendArray(f,data_type,shape=shape,byteorder=byteorder,nlocal=nlocal,htype=htype) return data ################################# def ReadDataBlock(f,data_type,shape=None,byteorder=sys.byteorder,pio='no',npart=None,skip=False): ################################# ''' Read a block containing data. If NTask = 1 or pio = 'yes', the block is read normally. If NTask > 1 and pio = 'no', the master reads the block and sends the data to the slaves. In the second case : a) the master sends N/Ntask elements to each task. b) if the var npart is present, it sends Np/Ntask to each task, for each Np of npart. data_type = array shape = tuple ''' if mpi.NTask==1 or pio == 'yes': data = readblock(f,data_type=data_type,shape=shape,byteorder=byteorder,skip=skip) return data else: data = mpi.mpi_ReadAndSendArray(f,data_type,shape=shape,byteorder=byteorder,npart=npart,skip=skip) return data ################################# -def writeblock(f,data,byteorder=sys.byteorder): +def writeblock(f,data,byteorder=sys.byteorder,htype=int32): ################################# ''' data = array or data = ((x,float32),(y,int),(z,float32),(label,40)) shape = tuple ''' if type(data) == types.TupleType: # first, compute nbytes nbytes = 0 for dat in data: if type(dat[0])==types.StringType: nbytes = nbytes + dat[1] else: nbytes = nbytes + array([dat[0]],dat[1]).type().bytes* array([dat[0]],dat[1]).size() - nbytes = array([nbytes],int32) + nbytes = array([nbytes],htype) # write block if sys.byteorder != byteorder: nbytes.byteswap(True) f.write(nbytes.tostring()) for dat in data: if type(dat[0])==types.StringType: f.write(string.ljust(dat[0],dat[1])[:dat[1]]) else: ar = array([dat[0]],dat[1]) if sys.byteorder != byteorder: ar.byteswap(True) f.write(ar.tostring()) f.write(nbytes.tostring()) else: # write block #nbytes = array([data.type().bytes*data.size()],int) - nbytes = array([data.nbytes],int32) + nbytes = array([data.nbytes],htype) if sys.byteorder != byteorder: nbytes.byteswap(True) data.byteswap(True) f.write(nbytes.tostring()) f.write(data.tostring()) f.write(nbytes.tostring()) ################################# -def WriteBlock(f,data,byteorder=sys.byteorder): +def WriteBlock(f,data,byteorder=sys.byteorder,htype=int32): ################################# ''' data = ((x,float32),(y,int),(z,float32),(label,40)) shape = tuple ''' if f!=None: if type(data) == types.TupleType: # first, compute nbytes nbytes = 0 for dat in data: if type(dat[0])==types.StringType: nbytes = nbytes + dat[1] elif type(dat[0]) == string_: nbytes = nbytes + dat[1] else: #nbytes = nbytes + array([dat[0]],dat[1]).type().bytes* array([dat[0]],dat[1]).size() nbytes = nbytes + array([dat[0]],dat[1]).nbytes - nbytes = array([nbytes],int32) + nbytes = array([nbytes],htype) # write block if sys.byteorder != byteorder: nbytes.byteswap(True) f.write(nbytes.tostring()) for dat in data: if type(dat[0])==types.StringType: f.write(string.ljust(dat[0],dat[1])[:dat[1]]) elif type(dat[0]) == string_: f.write(string.ljust(dat[0],dat[1])[:dat[1]]) else: ar = array([dat[0]],dat[1]) if sys.byteorder != byteorder: ar.byteswap(True) f.write(ar.tostring()) f.write(nbytes.tostring()) ################################# -def WriteArray(f,data,byteorder=sys.byteorder,pio='no',npart=None): +def WriteArray(f,data,byteorder=sys.byteorder,pio='no',npart=None,htype=int32): ################################# ''' data = array shape = tuple ''' if mpi.NTask==1 or pio == 'yes': - writeblock(f,data,byteorder=byteorder) + writeblock(f,data,byteorder=byteorder,htype=htype) else: - 
mpi.mpi_GatherAndWriteArray(f,data,byteorder=byteorder,npart=npart) + mpi.mpi_GatherAndWriteArray(f,data,byteorder=byteorder,npart=npart,htype=htype) ################################# def WriteDataBlock(f,data,byteorder=sys.byteorder,pio='no',npart=None): ################################# ''' Write a block containg data. If NTask = 1 or pio = 'yes', the block is written normally. If NTask > 1 and pio = 'no', the master get the block from the slaves and write it. In the second case : a) the master get N/Ntask element from each task. b) if the var npart is present, he get Np/Ntask from each task, for each Np of npart. data = array shape = tuple ''' if mpi.NTask==1 or pio == 'yes': writeblock(f,data,byteorder=byteorder) else: mpi.mpi_GatherAndWriteArray(f,data,byteorder=byteorder,npart=npart) ############################################################### # # some special function reading gadget related files # ############################################################### ################################# def read_cooling(file): ################################# ''' Read cooling file ''' f = open(file,'r') f.readline() f.readline() lines = f.readlines() f.close() lines = map(string.strip,lines) elts = map(string.split,lines) logT = array(map(lambda x:float(x[0]),elts)) logL0 = array(map(lambda x:float(x[1]),elts)) logL1 = array(map(lambda x:float(x[2]),elts)) logL2 = array(map(lambda x:float(x[3]),elts)) logL3 = array(map(lambda x:float(x[4]),elts)) logL4 = array(map(lambda x:float(x[5]),elts)) logL5 = array(map(lambda x:float(x[6]),elts)) logL6 = array(map(lambda x:float(x[7]),elts)) return logT,logL0,logL1,logL2,logL3,logL4,logL5,logL6 ################################# def read_params(file): ################################# ''' Read params Gadget file and return the content in a dictionary ''' f = open(file) lines = f.readlines() f.close() # remove empty lines lines = filter(lambda l:l!='\n', lines) # remove trailing lines = map(string.strip, lines) # remove comments lines = filter(lambda x:x[0]!='%', lines) # split lines elts = map(string.split,lines) # make dictionary params = {} for e in elts: try : params[e[0]]=float(e[1]) except ValueError: params[e[0]]= e[1] return params diff --git a/pNbody/mpi.py b/pNbody/mpi.py index 283e397..37b2cf4 100644 --- a/pNbody/mpi.py +++ b/pNbody/mpi.py @@ -1,1165 +1,1165 @@ # -*- coding: iso-8859-1 -*- from numpy import * import types import libutil import sys class myMPI: def __init__(self): pass try: from mpi4py import MPI comm = MPI.COMM_WORLD comm_self = MPI.COMM_SELF ThisTask = comm.Get_rank() NTask = comm.Get_size() Procnm = MPI.Get_processor_name() Rank = ThisTask except: MPI = myMPI() MPI.SUM = sum ThisTask = 0 NTask = 1 Rank = ThisTask Procnm = "" def mpi_IsMaster(): return ThisTask==0 def mpi_Rank(): return ThisTask def mpi_ThisTask(): return ThisTask def mpi_NTask(): return NTask def mpi_barrier(): if NTask>1: comm.barrier() else: pass ##################################### # # Output functions # ##################################### def mpi_iprint(msg,mode=None): ''' Synchronized print, including info on node. ''' msg = "[%d] : %s"%(ThisTask,msg) pprint(msg) def mpi_pprint(msg): ''' Synchronized print. ''' if NTask>1: MPI.pprint(msg) else: print msg def mpi_rprint(msg): ''' Rooted print. ''' if NTask>1: MPI.rprint(msg) else: print msg ##################################### # # communication functions # ##################################### def mpi_send(x,dest): ''' Send x to node dest. When there is only one is defined, it does nothing. 
''' if NTask>1: return comm.send(x, dest = dest) else: pass def mpi_recv(source): ''' Return a variable sent by node \var{source}. When only one task is defined, it does nothing. ''' if NTask>1: return comm.recv(source = source) else: pass def mpi_sendrecv(x,dest,source): ''' ''' if NTask>1: return comm.sendrecv(x,dest=dest,source = source) else: pass def mpi_reduce(x,root=0,op=MPI.SUM): """ Reduce x from all nodes, only for root. When only one task is defined, the function returns x. """ if NTask>1: sx = comm.reduce(x,op=op,root=root) else: sx = x return sx def mpi_allreduce(x,op=MPI.SUM): """ Reduce x from all nodes, for all nodes. When only one task is defined, the function returns x. """ if NTask>1: sx = comm.allreduce(x,op=op) else: sx = x return sx def mpi_bcast(x,root=0): ''' Broadcast from node root the variable x. When only one task is defined, it simply returns x. ''' if NTask>1: x = comm.bcast(x,root=root) else: x = x return x def mpi_gather(x,root=0): """ Gather x from all nodes to node root. Returns a list. """ if NTask>1: x = comm.gather(x,root=root) else: x = [x] return x def mpi_allgather(x): """ Gather x from all to all. Returns a list. """ if NTask>1: x = comm.allgather(x) else: x = [x] return x def mpi_AllgatherAndConcatArray(vec): ''' AllGather array vec and concatenate it in a unique array (concatenation order is reversed). ''' if NTask>1: vec_all = array([],vec.dtype) if vec.ndim==1: vec_all.shape = (0,) else: vec_all.shape = (0,vec.shape[1]) if ThisTask==0: for Task in range(NTask-1,-1,-1): if Task != 0: v = comm.recv(source=Task) vec_all = concatenate((vec_all,v)) else: vec_all = concatenate((vec_all,vec)) else: comm.send(vec, dest = 0) # send to everybody xi = comm.bcast(vec_all) return xi else: return vec ##################################### # # array functions # ##################################### def mpi_sum(x): """ Sum elements of array x. """ if NTask>1: sx = comm.allreduce(sum(x),op=MPI.SUM) else: sx = sum(x) return sx def mpi_min(x): """ Minimum element of array x. """ if NTask>1: mx = comm.allreduce(min(x),op=MPI.MIN) else: mx = min(x) return mx def mpi_max(x): """ Maximum element of array x. """ if NTask>1: mx = comm.allreduce(max(x),op=MPI.MAX) else: mx = max(x) return mx def mpi_mean(x): """ Mean of elements of array x. """ if NTask>1: sm = comm.allreduce(sum(x),op=MPI.SUM) sn = comm.allreduce(len(x),op=MPI.SUM) mn = sm/sn else: mn = x.mean() return mn def mpi_len(x): """ Length of array x. """ if NTask>1: ln = comm.allreduce(len(x),op=MPI.SUM) else: ln = len(x) return ln def mpi_arange(n): """ Create an integer array containing elements from 0 to n spread over all nodes. """ if NTask>1: ns = array(comm.allgather(n)) c = ((NTask-1-ThisTask) > arange(len(ns)-1,-1,-1) ) i = sum(ns*c) v = arange(i,i+ns[ThisTask]) else: v = arange(n) return v def mpi_sarange(npart_all): """ Create an integer array containing elements from 0 to n, spread over all nodes. The repartition of elements and type of elements over nodes is given by the array npart_all """ npart = npart_all[ThisTask] if NTask>1: v = array([],int) o = 0 for i in arange(len(npart)): # number of particles of this type for this proc n = npart_all[ThisTask,i] n_tot = sum(npart_all[:,i]) ns = array(comm.allgather(n)) c = ((NTask-1-ThisTask) > arange(len(ns)-1,-1,-1) ) j = sum(ns*c) vj = arange(o+j,o+j+ns[ThisTask]) v = concatenate((v,vj)) o = o + n_tot else: n = sum(ravel(npart)) v = arange(n) return v def mpi_argmax(x): """ Find the argument of the maximum value in x. 
idx = (p,i) : where i = index in proc p """ if NTask>1: # 1) get all max and all argmax maxs = array(comm.allgather(max(x))) args = array(comm.allgather(argmax(x))) # 2) choose the right one p = argmax(maxs) # proc number i = args[p] # index in proc p else: p = 0 i = argmax(x) return p,i def mpi_argmin(x): """ Find the argument of the minimum value in x. idx = (p,i) : where i = index in proc p """ if NTask>1: # 1) get all mins and all argmins mins = array(comm.allgather(min(x))) args = array(comm.allgather(argmin(x))) # 2) choose the right one p = argmin(mins) # proc number i = args[p] # index in proc p else: p = 0 i = argmin(x) return p,i def mpi_getval(x,idx): """ Return the value of array x corresponding to the index idx. idx = (p,i) : where i = index in proc p equivalent to x[i] from proc p """ p = idx[0] i = idx[1] if NTask>1: # send to everybody xi = comm.bcast(x[i]) else: xi = x[i] return xi def mpi_histogram(x,bins): """ Return a histogram of vector x binned using bins. """ # histogram n = searchsorted(sort(x),bins) n = concatenate([n,[len(x)]]) hx = n[1:]-n[:-1] if NTask>1: hx = comm.allreduce(hx,op=MPI.SUM) return hx ##################################### # # exchange function # ##################################### ################################# def mpi_find_a_toTask(begTask,fromTask,ex_table,delta_n): ################################# ''' This function is used to find recursively an exchange table ''' for toTask in range(begTask,NTask): if delta_n[toTask] > 0: # find a node which accepts particles if delta_n[toTask] + delta_n[fromTask] >= 0: # can give all delta = -delta_n[fromTask] else: # can give only a few delta = +delta_n[toTask] ex_table[fromTask,toTask] = +delta ex_table[toTask,fromTask] = -delta delta_n[fromTask] = delta_n[fromTask] +delta delta_n[toTask] = delta_n[toTask] -delta if delta_n[fromTask] == 0: # everything has been distributed return ex_table else: return mpi_find_a_toTask(begTask+1,fromTask,ex_table,delta_n) ################################# def mpi_GetExchangeTable(n_i): ################################# ''' This function returns the exchange table ''' # n_i : initial repartition ntot = sum(n_i) if ntot==0: return zeros((NTask,NTask)) # final repartition n_f = zeros(NTask) for Task in range(NTask-1,-1,-1): n_f[Task] = ntot/NTask + ntot%NTask*(Task==0) # delta delta_n = n_f - n_i # find who gives what to whom ex_table = zeros((NTask,NTask)) for fromTask in range(NTask): if delta_n[fromTask] < 0: # the node gives particles ex_table = mpi_find_a_toTask(0,fromTask,ex_table,delta_n) return ex_table ################################# def mpi_ExchangeFromTable(T,procs,ids,vec,num): ################################# ''' Exchange an array according to a transfer array T T : exchange table procs : list of processors (from Tree.GetExchanges()) ids : list of ids (from Tree.GetExchanges()) vec : vector to exchange num : ids corresponding to particles ''' # now, we have to send / recv SendPart = T[NTask-1-ThisTask,:] RecvPart = T[:,ThisTask] new_procs = array([]) new_vec = array([]) if vec.ndim == 1: #new_vec.shape = (0,3) pass elif vec.ndim == 2: new_vec.shape = (0,3) # send/recv (1) for i in range(NTask): if i > ThisTask: # here, we send to i N = T[NTask-1-ThisTask,i] comm.send(N,i) if N>0: to_procs = compress((procs==i),procs,axis=0) to_ids = compress((procs==i),ids,axis=0) to_vec = libutil.compress_from_lst(vec,num,to_ids) comm.send(to_vec,i) vec = libutil.compress_from_lst(vec,num,to_ids,reject=True) num = libutil.compress_from_lst(num,num,to_ids,reject=True) elif i < 
ThisTask: N = comm.recv(i) if N > 0: new_vec = concatenate((new_vec,comm.recv(i))) # send/recv (1) for i in range(NTask): if i < ThisTask: # here, we send to i N = T[NTask-1-ThisTask,i] comm.send(N,i) if N>0: to_procs = compress((procs==i),procs,axis=0) to_ids = compress((procs==i),ids,axis=0) to_vec = libutil.compress_from_lst(vec,num,to_ids) comm.send(to_vec,i) vec = libutil.compress_from_lst(vec,num,to_ids,reject=True) num = libutil.compress_from_lst(num,num,to_ids,reject=True) elif i > ThisTask: N = comm.recv(i) if N > 0: new_vec = concatenate((new_vec,comm.recv(i))) # check c = (new_procs!=ThisTask).astype(int) if sum(c)>0: print "here we are in trouble" sys.exit() return concatenate((vec,new_vec)) # !!!!!!!!!!!!!!!!!!!!!!! this has to be changed .... ##################################### # # io functions # ##################################### ################################# -def mpi_ReadAndSendBlock(f,data_type,shape=None,byteorder=sys.byteorder,split=None): +def mpi_ReadAndSendBlock(f,data_type,shape=None,byteorder=sys.byteorder,split=None,htype=int32): ################################# ''' Read and brodcast a binary block. data_type = int,float32,float or data_type = array shape = tuple ''' ##################### # read first header ##################### if ThisTask==0: try: - nb1 = fromstring(f.read(4),int32) + nb1 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb1.byteswap(True) nb1 = nb1[0] except IndexError: raise "ReadBlockError" ##################### # read a tuple ##################### if type(data_type) == types.TupleType: if ThisTask==0: data = [] for tpe in data_type: if type(tpe) == int: val = f.read(tpe) else: bytes = dtype(tpe).itemsize val = fromstring(f.read(bytes),tpe) if sys.byteorder != byteorder: val.byteswap(True) val = val[0] data.append(val) # send the data if NTask > 1: if ThisTask==0: for Task in range(1,NTask): comm.send(data, dest = Task) else: data = comm.recv(source = 0) ##################### # read an array ##################### else: if split: if ThisTask==0: bytes_per_elt = data_type.bytes if shape != None: ndim = shape[1] else: ndim = 1 nelt = nb1/bytes_per_elt/ndim nleft = nelt nread = nelt/NTask for Task in range(NTask-1,-1,-1): if nleft < 2*nread and nleft > nread: nread = nleft nleft = nleft-nread data = f.read(nread*bytes_per_elt*ndim) shape = (nread,ndim) if Task==ThisTask: # this should be last data = fromstring(data,data_type) if sys.byteorder != byteorder: data.byteswap(True) data.shape=shape else: comm.send(data, dest = Task) comm.send(shape, dest = Task) else : data = comm.recv(source = 0) shape = comm.recv(source = 0) data = fromstring(data,data_type) if sys.byteorder != byteorder: data.byteswap(True) data.shape=shape else: if ThisTask==0: data = fromstring(f.read(nb1),data_type) if sys.byteorder != byteorder: data.byteswap(True) # send the data if NTask > 1: if ThisTask==0: for Task in range(1,NTask): comm.send(data, dest = Task) else: data = comm.recv(source = 0) ##################### # read second header ##################### if ThisTask==0: - nb2 = fromstring(f.read(4),int32) + nb2 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb2.byteswap(True) nb2 = nb2[0] if nb1 != nb2: raise "ReadBlockError" # reshape if needed if split: # already done before pass else: if shape != None: data.shape=shape return data ################################# -def mpi_ReadAndSendArray(f,data_type,shape=None,byteorder=sys.byteorder,npart=None,skip=False): +def 
mpi_ReadAndSendArray(f,data_type,shape=None,byteorder=sys.byteorder,npart=None,skip=False,htype=int32): ################################# ''' Read and Brodcast a binary block assuming it contains an array. ''' oneD = False if len(shape)==1: shape = (shape[0],1) oneD = True if shape != None: n_dim = shape[1] data = array([],data_type) data.shape = (0,shape[1]) n_elts = shape[0] else: raise "mpi_ReadAndSendArray : var shape must be defined here." #n_dim = 1 #data = array([],data_type) #data.shape = (0,) nbytes_per_elt = dtype(data_type).itemsize * n_dim n_left = nbytes_per_elt * n_elts # check npart if npart != None: ntype = len(npart) else: ntype = 1 npart = array([n_elts]) # this may be wrong in 64b, as nb1 is not equal to the number of bytes in block #if sum(npart,0) != n_elts: # raise "We are in trouble here : sum(npart)=%d n_elts=%d"%(sum(npart,0),n_elts) ##################### # read first header ##################### if ThisTask==0: try: - nb1 = fromstring(f.read(4),int32) + nb1 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb1.byteswap(True) nb1 = nb1[0] # this may be wrong in 64b, as nb1 is not equal to the number of bytes in block #if nb1 != n_left: # raise "We are in trouble here : nb1=%d n_left=%d"%(nb1,n_left) except IndexError: raise "ReadBlockError" ##################### # read data ##################### if ThisTask == 0: if skip: nbytes = sum(npart)*nbytes_per_elt print " skipping %d bytes... "%(nbytes) f.seek(nbytes,1) data = None else: for i in range(ntype): n_write = libutil.get_n_per_task(npart[i],NTask) for Task in range(NTask-1,-1,-1): sdata = f.read(nbytes_per_elt*n_write[Task]) # send block for a slave if Task != 0: comm.send(sdata, dest = Task) # keep block for the master else: sdata = fromstring(sdata,data_type) if shape!=None: sdata.shape = (n_write[Task],shape[1]) data = concatenate((data,sdata)) else: if skip: data=None else: for i in range(ntype): sdata = comm.recv(source = 0) sdata = fromstring(sdata,data_type) if shape!=None: ne = len(sdata)/shape[1] sdata.shape = (ne,shape[1]) data = concatenate((data,sdata)) if oneD and data!=None: data.shape = (data.shape[0],) ##################### # read second header ##################### if ThisTask==0: - nb2 = fromstring(f.read(4),int32) + nb2 = fromstring(f.read(4),htype) if sys.byteorder != byteorder: nb2.byteswap(True) nb2 = nb2[0] if nb1 != nb2: raise "ReadBlockError" return data ################################# -def mpi_GatherAndWriteArray(f,data,byteorder=sys.byteorder,npart=None): +def mpi_GatherAndWriteArray(f,data,byteorder=sys.byteorder,npart=None,htype=int32): ################################# ''' Gather and array and write it in a binary block. 
data = array shape = tuple ''' # check npart if npart != None: pass else: npart = array([len(data)]) data_nbytes = mpi_allreduce(data.nbytes) if ThisTask==0: - nbytes = array([data_nbytes],int32) + nbytes = array([data_nbytes],htype) if sys.byteorder != byteorder: nbytes.byteswap(True) # write header 1 f.write(nbytes.tostring()) # loop over particles type for i in range(len(npart)): n_write = libutil.get_n_per_task(npart[i],NTask) if ThisTask==0: for Task in range(NTask-1,-1,-1): if Task != 0: sdata = comm.recv(source = Task) else: i1 = sum(npart[:i]) i2 = i1+npart[i] sdata = data[i1:i2] if sys.byteorder != byteorder: sdata.byteswap(True) f.write(sdata.tostring()) else: i1 = sum(npart[:i]) i2 = i1+npart[i] comm.send(data[i1:i2], dest = 0) if ThisTask==0: # write header 2 f.write(nbytes.tostring()) ################################# def mpi_OldReadAndSendArray(f,data_type,shape=None,skip=None,byteorder=sys.byteorder,nlocal=None): ################################# ''' Read and Brodcast a binary block assuming it contains an array. The array is splitted acroding to the variable nlocal. data_type = array type shape = tuple nlocal : array NTask x Npart array NTask ''' ##################### # read first header ##################### if ThisTask==0: try: nb1 = fromstring(f.read(4),int32) if sys.byteorder != byteorder: nb1.byteswap(True) nb1 = nb1[0] except IndexError: raise "ReadBlockError" ##################### # read an array ##################### if nlocal.ndim != 2: raise "ReadAndSendArray error","nlocal must be of rank 2" else: ntype = nlocal.shape[1] if shape != None: ndim = shape[1] data = array([],data_type) data.shape = (0,shape[1]) else: ndim = 1 data = array([],data_type) data.shape = (0,) nbytes_per_elt = data_type.bytes * ndim if ThisTask == 0: for i in range(ntype): for Task in range(NTask-1,-1,-1): sdata = f.read(nbytes_per_elt*nlocal[Task,i]) # send block for a slave if Task != 0: comm.send(sdata, dest = Task) # keep block for the master else: sdata = fromstring(sdata,data_type) if shape!=None: sdata.shape = (nlocal[ThisTask,i],shape[1]) data = concatenate((data,sdata)) else: for i in range(ntype): sdata = comm.recv(source = 0) sdata = fromstring(sdata,data_type) if shape!=None: sdata.shape = (nlocal[ThisTask,i],shape[1]) data = concatenate((data,sdata)) ##################### # read second header ##################### if ThisTask==0: nb2 = fromstring(f.read(4),int32) if sys.byteorder != byteorder: nb2.byteswap(True) nb2 = nb2[0] if nb1 != nb2: raise "ReadBlockError" return data ################################# def mpi_OldGatherAndWriteArray(f,data,byteorder=sys.byteorder,nlocal=None): ################################# ''' Gather and array and write it in a binary block. 
data = array shape = tuple ''' if nlocal.ndim != 2: raise "ReadAndSendArray error","nlocal must be of rank 2" else: ntype = nlocal.shape[1] data_size = mpi_allreduce(data.size()) if ThisTask==0: nbytes = array([data.type().bytes*data_size],int32) if sys.byteorder != byteorder: nbytes.byteswap(True) # write header 1 f.write(nbytes.tostring()) # loop over particles type i1 = 0 for i in range(ntype): if ThisTask==0: for Task in range(NTask-1,-1,-1): if Task != 0: sdata = comm.recv(source = Task) else: i2 = i1 + nlocal[Task,i] sdata = data[i1:i2] i1 = i2 if sys.byteorder != byteorder: sdata.byteswap(True) f.write(sdata.tostring()) else: i2 = i1 + nlocal[ThisTask,i] comm.send(data[i1:i2], dest = 0) i1 = i2 if ThisTask==0: # write header 2 f.write(nbytes.tostring()) diff --git a/src/pygsl/pygsl.so b/src/pygsl/pygsl.so index e5b3da1..ead6831 100755 Binary files a/src/pygsl/pygsl.so and b/src/pygsl/pygsl.so differ
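
Usage note for the new htype argument introduced in this patch (illustrative, not part of the patch itself): htype selects the integer type used for the Fortran-style record markers that frame each binary block, and the default int32 reproduces the previous hard-coded behaviour. The sketch below is a minimal, hypothetical round-trip through the patched low-level helpers; the file path and the choice of uint32 (e.g. for block sizes that would not fit in a signed 32-bit integer) are assumptions, not taken from the patch.

# minimal sketch, assuming the patched pNbody.io is importable (Python 2 era numpy)
from numpy import arange, float32, uint32
from pNbody import io

x = arange(12, dtype=float32)

# write one record as [marker][payload][marker]; the marker dtype is htype
f = open('/tmp/block.dat', 'wb')   # hypothetical file path, not from the patch
io.writeblock(f, x, htype=uint32)
f.close()

# read the record back, interpreting the markers with the same dtype
f = open('/tmp/block.dat', 'rb')
y = io.readblock(f, float32, shape=(12,), htype=uint32)
f.close()

Note that readblock still reads exactly four bytes per marker (f.read(4)), so only 4-byte header types such as int32 or uint32 will round-trip as written; a wider htype would also require changing the number of bytes read for each marker.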