MPI4Py:OpenMPI如何跨进程更新字典? 同步情况:异步情况:

在我的场景中,我正在尝试从中采样。每个过程将从该环境中多次采样。

df <- structure(data.frame(id = c(1,2,NA,NA),cat = c("SS","SS","SV","SV"),val = c(220L,222L,223L,2206L)),.Names = c("id","cat","val"),class = "data.frame",row.names = c(NA,-6L)) 

sapply(df,function(x) ((sum(is.na(x))))*.01)%>%
  stack %>% rev %>% filter(values > 0) %>% setNames(nm=c("variable","missing"))%>%
paste0(colnames ->NulCols)

目标是使每个OpenmPI进程同步或异步共享从环境中采样的内容。 import numpy as np class EnvSim(object): @staticmethod def get(): return np.random.randint(0,2000) from collections import defaultdict class Dict(object): def __init__(self): self.d = defaultdict(int) def update(self,key): self.d[key] += 1 print(key) data_array = [np.empty(1,dtype=np.int) for _ in range(num_cpu)] data_array[proc_id()] = np.array([key],dtype=np.int) MPI.COMM_WORLD.Bcast(data_array[proc_id()],root=proc_id()) for data in data_array: self.d[data.tolist()[0]] += 1 是在此处使用的正确方法还是我应该使用其他方法?

这是我用来执行程序的主要声明:(当前不起作用。

Bcast
zhaopingg 回答:MPI4Py:OpenMPI如何跨进程更新字典? 同步情况:异步情况:

同步情况:

我不确定“同步和异步”是什么意思,所以在这里我只关注同步情况。

如果您想对所有等级进行抽样并发送给所有人,那么我想您想用alltoall而不是Bcast

下面是一个示例脚本,其中每个rank从间隔N中采样(rank,rank+1)个值,其中N是通信器的大小。

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

senddata = np.random.uniform(rank,rank+1,size)
recvdata = np.empty(size,dtype=float)
comm.Alltoall(senddata,recvdata)

print("process %s sending %s receiving %s " % (rank,senddata,recvdata))

除了让脚本本身启动之外,您还可以直接使用以下命令调用命令行形式吗:

$ mpirun -np 3 python test.py

,您应该会看到类似

的输出
Rank 0 sent [0.37362478 0.74304362 0.25090876] and received [0.37362478 1.81852273 2.48959575] 
Rank 1 sent [1.81852273 1.65782547 1.85142608] and received [0.74304362 1.65782547 2.23064501] 
Rank 2 sent [2.48959575 2.23064501 2.644848  ] and received [0.25090876 1.85142608 2.644848  ] 

如果需要多轮采样/通讯,则可以将其包含在for循环中。

异步情况:

如果对采样时间有一定的期望,那么您可以将零级作为主数据,并对其余每个级别执行无阻塞查询。例如:

from mpi4py import MPI
import numpy as np
from time import sleep

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

tag_denoting_ready_to_send = 1
while 1:
    if comm.rank == 0:
        if comm.Iprobe(source=MPI.ANY_SOURCE,tag=tag_denoting_ready_to_send):
            buffer_for_receiving = np.empty(1,dtype='i')
            comm.Recv([buffer_for_receiving,MPI.INT],source=MPI.ANY_SOURCE,tag=tag_denoting_ready_to_send)
            print(buffer_for_receiving[0])
    else:
        sleep(comm.rank*np.random.uniform())
        send_buffer = np.array(rank,dtype='i')
        comm.Send([send_buffer,dest=0,tag=tag_denoting_ready_to_send)

每个非零等级正在休眠,并尝试将其等级Send在缓冲区中以等级0(将其打印出来)。同样,使用

运行
$ mpirun -np 20 python test2.py 

应产生如下输出:

13
6
1
1
2
7
1
2
1
4
1
8
3
本文链接:https://www.f2er.com/3155564.html

大家都在问