Python and MPI (Part 1)
Authors: Gregor von Laszewski (Laszewski@gmail.com),
Fidel Leal, Jacques Fleischer, Cooper Young
Today Python has become the predominantly programming language to coordinate scientific applications, especially machine and deep learning applications. However, previously existing parallel programming paradigms such as Message Passing Interface (MPI) have proven to be a useful asset when it comes to enhancing complex data flows while executing them on multiple computers, including supercomputers. The framework is well known in the C-language community. However, many practitioners do not have the time to learn C to utilize such advanced cyberinfrastructure. Hence, it is advantageous to access MPI from Python. We showcase how you can easily deploy and use MPI from Python via a tool called mpi4pi
.
Multiple implementations following the standard exist, including the two most popular MPICH and OpenMPI. However, other free or commercial implementations exist.
Additionally, MPI is a language-independent interface. Although support for C and Fortran is included as part of the standard, multiple libraries providing bindings for other languages are available, including those for Java, Julia, R, Ruby, and Python.
Thanks to its user-focused abstractions, standardization, portability, and scalability, and availability MPI is a popular tool in the creation of high-performance and parallel computing programs.
Installation
Next, we discuss how to install mpi4p on various systems. We will focus on installing it on a single computer using multiple cores.
Getting the CPU Count
For the examples listed in this document, knowing the number of cores on your computer is important. This can be found out through the command line or a python program.
In Python, you can do it with
import multiprocessing
multiprocessing.cpu_count()
or as a command line
$ python -c "import multiprocessing; print(multiprocessing.cpu_count())"
However, you can also use the command line tools that we have included in our documentation.
Windows 10 EDU or PRO
Note: We have not tested this on Windows home.
- We assume you have installed GitBash on your computer. The installation is easy, but be careful to watch the various options at install time. Make sure it is added to the Path variable. For details see: https://git-scm.com/downloads
- We also assume you have installed Python3.9 according to either the installation at python.org or conda. We do recommend the installation from python.org: https://www.python.org/downloads/
- You will need to install a python virtual env to avoid conflict by accident with your system installed version of Python.
- Microsoft has its own implementation of MPI which we recommend at this time. First, you need to download msmpi from
https://docs.microsoft.com/en-us/message-passing-interface/microsoft-mpi#ms-mpi-downloads - Go to the download link underneath the heading
MS-MPI Downloads
and download and install it. Select the two packages and click Next. When downloaded, click on them and complete the setups.
msmpisetup.exe msmpisdk.msi
6. Open the system control panel and click on Advanced system settings
(which can be searched for with the search box in the top-right, and then click View advanced system settings
) and then click Environment Variables...
7. Under the user variables box, click on Path
8. Click New in order to add
C:\Program Files (x86)\Microsoft SDKs\MPI
and
C:\Program Files\Microsoft MPI\Bin
to the Path. The Browse Directory...
button makes this easier, and the Variable name
can correspond to each directory, e.g., “MPI” and “MPI Bin” respectively
9. Close any open bash windows and then open a new one
10. Type the command
$ which mpiexec
to verify if it works.
11. After you verified it is available, install mpi4py with
$ pip install mpi4py
- ideally, while bash is in venv
- Next, find out how many processes you can run on your machine and remember that number. You can do this with
$ wmic CPU Get DeviceID,NumberOfCores,NumberOfLogicalProcessors
- Alternatively, you can use a python program as discussed in the section “Getting the CPU Count”
macOS
- Find out how many processes you can run on your machine and remember that number. You can do this with
$ sysctl hw.physicalcpu hw.logicalcpu
- First, install python 3 from https://www.python.org/downloads/
- Next, install homebrew and install the open-mpi version of MPI as well as mpi4py:
$ xcode-select --install
$ /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
$ brew install wget $ brew install open-mpi $ python3 -m venv ~/ENV3 $ source ~/ENV3/bin/activate $ pip install mpi4py
Ubuntu
These instructions apply to 20.04 and 21.04. Please use 20.04 in case you like to use GPUs.
- First, find out how many processes you can run on your machine and remember that number. You can do this with
$ nproc
- The installation of mpi4py on Ubuntu is relatively easy. Please follow these steps. We recommend that you create a python
venv
so you do not by accident interfere with your system python. As usual, you can activate it in your.bashrc
file while adding the source line there. Lastly, make sure you check it out and adjust the-n
parameters to the number of cores of your machine. In our example, we have chosen the number 4, you may have to change that value
$ sudo apt install python3.9 python3.9-dev
$ python3 -m venev ~/ENV3
$ source `/ENV3/bin/activate` (ENV3)
$ sudo apt-get install -y mpich-doc mpich (ENV3)
$ pip install mpi4py -U
Raspberry Pi
- Install Open MPI in your pi by entering the following command assuming a PI4, PI3B+ PI3, PI2:
$ python -m venv ~/ENV3
$ source ~/ENV3/bin/activate
$ sudo apt-get install openmpi-bin
$ mpicc --showme:version
$ pip install mpi4py
- If you have other Raspberry Pi’s you may need to update the core count according to the hardware specification.
Testing the Installation
On all systems, the installation is very easy. Just change in our example the number 4 to the number of cores in your system.
(ENV3) $ mpiexec -n 4 python -m mpi4py.bench helloworld
You will see an output similar to
Hello, World! I am process 0 of 4 on myhost.
Hello, World! I am process 1 of 4 on myhost.
Hello, World! I am process 2 of 4 on myhost.
Hello, World! I am process 3 of 4 on myhost.
where myhost
is the name of your computer.
Note: the messages can be in a different order.
Hosts, Machinefile, Rankfile
Running MPI on a Single Computer
In case you like to try out MPI and just use it on a single computer with multiple cors, you can skip this section for now and revisit it, once you scale up and use multiple computers.
Running MPI on Multiple Computers
MPI is designed for running programs on multiple computers. One of these computers serves as manager and communicates to its workers. To define on which computer is running what, we need to have a configuration file that lists a number of hosts to participate in our set of machines, the MPI cluster.
The configuration file specifying this is called a machinefile or rankfile. We will explain the differences to them in this section.
Prerequisite
Naturally, the requisite to use a cluster is that you
- have MPI and mpi4py installed on each of the computers, and
- have access via ssh on each of these computers
If you use a Raspberry PI cluster, we recommend using our cloudmesh-pi-burn program [TODOREF]. This will conveniently create you a Raspberry PI cluster with login features established. You still need to install mpi4py, however on each node.
If you use another set of resources, you will often see the recommendation to use password less ssh key between the nodes. This we only recommend if you are an expert and have placed the cluster behind a firewall. If you experiment instead with your own cluster, we recommend that you use password-protected SSH keys on your manager node and populate them with ssh-copy-id to the worker computers. To not always have to type in your password to the different machines, we recommend you use ssh-agent
, and ssh-add
.
Using Hosts
In the case of multiple computers, you can simply specify the hosts as a parameter to your MPI program that you run on your manager node
(ENV3) $ mpiexec -n 4 -host re0,red1,red2,red3 python -m mpi4py.bench helloworld
To specify how many processes you like to run on each of them, you can use the option -ppn
followed by the number.
(ENV3) $ mpiexec -n 4 -pn 2 -host re0,red1,red2,red3 python -m mpi4py.bench helloworld
As today we usually have multiple cores on a processor, you could be using that core count as the parameter.
Machinefile
To simplify the parameter passing to MPI you can use machine files instead. This allows you also to define different numbers of processes for different hosts. Thus it is more flexible. In fact, we recommend that you use a machine file in most cases as you then also have a record of how you configured your cluster.
The machine file is a simple text file that lists all the different computers participating in your cluster. As MPI was originally designed at a time when there was only one core on a computer, the simplest machine file just lists the different computers. When starting a program with the machine file as option, only one core of the computer is utilized.
The machinefile can be explicitly passed along as a parameter while placing it in the manager machine
mpirun.openmpi \
-np 2 \
-machinefile /home/pi/mpi_testing/machinefile \
python helloworld.py
An example of a simple machinefile contains the IP addresses. The username can be proceeded by the IP address.
pi@192.168.0.10:1
pi@192.168.0.11:2
pi@192.168.0.12:2
pi@192.168.0.13:2
pi@192.168.0.14:2
In many cases, your machine name may be available within your network and known to all hosts in the cluster. In that case, it is more convenient. To sue the machine names.
pi@red0:1
pi@red1:2
pi@red2:2
pi@red3:2
pi@red4:2
Please make sure to change the IP addresses or name of your hosts according to your network.
Rankfiles for Multiple Cores
In contrast to the host parameter, you can fine-tune the placement of processes to computers with a rankfile
. This may be important if your hardware has, for example specific computers for data storage or GPUs.
If you like to add multiple cores from a machine, you can also use a rankfile
mpirun -r my_rankfile --report-bindings ... Where the rankfile contains:
rank 0=pi@192.168.0.10 slot=1:0
rank 1=pi@192.168.0.10 slot=1:1
rank 2=pi@192.168.0.11 slot=1:0
rank 3=pi@192.168.0.10 slot=1:1
In this configuration, we only use 2 cores from two different PIs.
MPI Functionality
In this section, we will discuss several useful MPI communication features.
Differences to the C Implementation of MPI
Before we start with a detailed introduction, we like to make those that have experience with non Python versions of MPI aware of some differences.
Initialization
In mpi4py, the standard MPI_INIT() and MPI_FINALIZE() commonly used to initialize and terminate the MPI environment are automatically handled after importing the mpi4py module. Although not generally advised, mpi4py still provides MPI.Init() and MPI.Finalize() for users interested in manually controlling these operations. Additionally, the automatic initialization and termination can be deactivated. For more information on this topic, please check the original mpi4py documentation:
Capitalization for Pickle vs. Memory Messages
Another characteristic feature of mpi4py is the availability of uppercase and lowercase communication methods. Lowercase methods like comm.send()
use Python’s pickle
module to transmit objects in a serialized manner. In contrast, the uppercase versions of methods like comm.Send()
enable transmission of data contained in a contiguous memory buffer, as featured in the MPI standard. For additional information on the topic, the manual section Communicating Python Objects and Array Data.
MPI Functionality
Communicator
All MPI processes need to be addressable and are grouped in a communicator
. The default communicator is called world
and assigns a rank to each process within the communicator.
Thus all MPI programs we will discuss here start with
comm = MPI.COMM_WORLD
In the MPI program, the function
rank = comm.Get_rank()
returns the rank. This is useful to be able to write conditional programs that depend on the rank. Rank 0
is the rank of the manager process.
Point-to-Point Communication
Send and Recieve Python Objects
The send()
and recv()
methods provide for functionality to transmit data between two specific processes in the communicator group. It can be applied to any Python data object that can be pickled. The advantage is that the object is preserved, however it comes with the disadvantage that pickling the data takes more time than a direct memory copy.
Here is the definition for the send()
method:
comm.send(buf, dest, tag)
buf
represents the data to be transmitted, dest
and tag
are integer values that specify the rank of the destination process, and a tag to identify the message being passed, respectively. tag
is particularly useful for cases when a process sends multiple kinds of messages to another process.
On the other end is the recv()
method, with the following definition:
comm.recv(buf, source, tag, status)
In this case, buf
can specify the location for the received data to be stored. In more recent versions of MPI, ‘buf’ has been deprecated. In those cases, we can simply assign comm.recv(source, tag, status)
as the value of our buffer variable in the receiving process. Additionally, source
and tag
can specify the desired source and tag of the data to be received. They can also be set to MPI.ANY_SOURCE
and MPI.ANY_TAG
, or be left unspecified.
In the following example, an integer is transmitted from process 0 to process 1.
#!/usr/bin/env python
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Variable to receive the data
data = None# Process with rank 0 sends data to process with rank 1
if rank == 0:
comm.send(42, dest=1)# Process with rank 1 receives and stores data
if rank == 1:
data = comm.recv(source=0)# Each process in the communicator group prints its data
print(f'After send/receive, the value in process {rank} is {data}')
Executing mpiexec -n 4 python send_receive.py
yields:
After send/receive, the value in process 2 is None
After send/receive, the value in process 3 is None
After send/receive, the value in process 0 is None
After send/receive, the value in process 1 is 42
As we can see, the transmission only occurred between processes 0 and 1, and no other process was affected.
Send and Recive Python Memory Objects
The following example illustrates the use of the uppercase versions of the methods comm.Send()
and comm.Recv()
to perform a transmission of data between processes from memory to memory. In our example we will agian be sending a message between processors of rank 0 and 1 in the communicator group.
#!/usr/bin/env python
import numpy as np
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Create empty buffer to receive data
buf = np.zeros(5, dtype=int)# Process with rank 0 sends data to process with rank 1
if rank == 0:
data = np.arange(1, 6)
comm.Send([data, MPI.INT], dest=1)# Process with rank 1 receives and stores data
if rank == 1:
comm.Recv([buf, MPI.INT], source=0)# Each process in the communicator group prints the content of its buffer
print(f'After Send/Receive, the value in process {rank} is {buf}')
Executing mpiexec -n 4 python send_receive_buffer.py
yields:
After Send/Receive, the value in process 3 is [0 0 0 0 0]
After Send/Receive, the value in process 2 is [0 0 0 0 0]
After Send/Receive, the value in process 0 is [0 0 0 0 0]
After Send/Receive, the value in process 1 is [1 2 3 4 5]
Non-blocking send and Recieve
MPI can also use non-blocking communications. This allows the program to send the message without waiting for the completion of the submission. This is useful for many parallel programs so we can overlap communication and computation while both take place simultaneously. The same can be done with receive, but if a message is not available and you do need the message, you may have to probe or even use a blocked receive. To wait for a message to be sent or received, we can also use the wait method , effectively converting the non-blocking message to a blocking one.
Next, we showcase an example of the non-blocking send and receive methods comm.isend()
and comm.irecv()
. Non-blocking versions of these methods allow for the processes involved in transmission/reception of data to perform other operations in overlap with the communication. In In contrast, the blocking versions of these methods previously exemplified do not allow data buffers involved in transmission or reception of data to be accessed until any ongoing communication involving the particular processes has been finalized.
#!/usr/bin/env python
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Variable to receive the data
data = None# Process with rank 0 sends data to process with rank 1
if rank == 0:
send = comm.isend(42, dest=1)
send.wait()# Process with rank 1 receives and stores data
if rank == 1:
receive = comm.irecv(source=0)
data = receive.wait()# Each process in the communicator group prints its data
print(f'After isend/ireceive, the value in process {rank} is {data}')
Executing mpiexec -n 4 python isend_ireceive.py
yields:
After isend/ireceive, the value in process 2 is None
After isend/ireceive, the value in process 3 is None
After isend/ireceive, the value in process 0 is None
After isend/ireceive, the value in process 1 is 42
Collective Communication
Broadcast
The bcast()
method and it is memory version Bcast()
broadcast a message from a specified root process to all other processes in the communicator group.
Broadcast of a Python Object
In terms of syntax, bcast()
takes the object to be broadcast and the parameter root
, which establishes the rank number of the process broadcasting the data. If no root parameter is specified, bcast
will default to broadcasting from the process with rank 0.
Thus, the two lines are functionally equivalent.
data = comm.bcast(data, root=0)
data = comm.bcast(data)
In our following example, we broadcast a two-entry Python dictionary from a root process to the rest of the processes in the communicator group.
The following code snippet shows the creation of the dictionary in process with rank 0. Notice how the variable data
remains empty in all the other processes.
#!/usr/bin/env python
from mpi4py import MPI# Set up the MPI Communicator
comm = MPI.COMM_WORLD# Get the rank of the current process in the communicator group
rank = comm.Get_rank()if rank == 0: # Process with rank 0 gets the data to be broadcast
data = {'size': [1, 3, 8],
'name': ['disk1', 'disk2', 'disk3']}
else: # Other processes' data is empty
data = None# Print data in each process
print(f'before broadcast, data on rank {rank} is: {data}')# Data from process with rank 0 is broadcast to other processes in our
# communicator group
data = comm.bcast(data, root=0)# Print data in each process after broadcast
print(f'after broadcast, data on rank {rank} is: {data}')
After running mpiexec -n 4 python broadcast.py
we get the following:
before broadcast, data on rank 3 is: None
before broadcast, data on rank 0 is:
{'size': [1, 3, 8], 'name': ['disk1', 'disk2', 'disk3']}
before broadcast, data on rank 1 is: None
before broadcast, data on rank 2 is: None
after broadcast, data on rank 3 is:
{'size': [1, 3, 8], 'name': ['disk1', 'disk2', 'disk3']}
after broadcast, data on rank 0 is:
{'size': [1, 3, 8], 'name': ['disk1', 'disk2', 'disk3']}
after broadcast, data on rank 1 is:
{'size': [1, 3, 8], 'name': ['disk1', 'disk2', 'disk3']}
after broadcast, data on rank 2 is:
{'size': [1, 3, 8], 'name': ['disk1', 'disk2', 'disk3']}
As we can see, all other processes received the data broadcast from the root process.
Broadcast of a Memory Object
In our following example, we broadcast a NumPy array from process 0 to the rest of the processes in the communicator group using the uppercase comm.Bcast()
method.
#!/usr/bin/env python
import numpy as np
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Rank 0 gets a NumPy array containing values from 0 to 9
if rank == 0:
data = np.arange(0, 10, 1, dtype='i')# Rest of the processes get an empty buffer
else:
data = np.zeros(10, dtype='i')# Print data in each process before broadcast
print(f'before broadcasting, data for rank {rank} is: {data}')# Broadcast occurs
comm.Bcast(data, root=0)# Print data in each process after broadcast
print(f'after broadcasting, data for rank {rank} is: {data}')
Executing mpiexec -n 4 python npbcast.py
yields:
before broadcasting, data for rank 1 is: [0 0 0 0 0 0 0 0 0 0]
before broadcasting, data for rank 2 is: [0 0 0 0 0 0 0 0 0 0]
before broadcasting, data for rank 3 is: [0 0 0 0 0 0 0 0 0 0]
before broadcasting, data for rank 0 is: [0 1 2 3 4 5 6 7 8 9]
after broadcasting, data for rank 0 is: [0 1 2 3 4 5 6 7 8 9]
after broadcasting, data for rank 2 is: [0 1 2 3 4 5 6 7 8 9]
after broadcasting, data for rank 3 is: [0 1 2 3 4 5 6 7 8 9]
after broadcasting, data for rank 1 is: [0 1 2 3 4 5 6 7 8 9]
As we can see, the values in the array at the process with rank 0 have been broadcast to the rest of the processes in the communicator group.
Scatter
While bradcast send all data to all processes, scatter send chunks of data to each process.
In our next example, we will scatter
the members of a list among the processes in the communicator group. We illustrate the concept in the next figure, where we indicate the data that is scattered to the rnaked processes with Di
Scatter Python Objects
The example program executing the sactter is showcased next
#!/usr/bin/env python
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Number of processes in the communicator group
size = comm.Get_size()# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Process with rank 0 gets a list with the data to be scattered
if rank == 0:
data = [(i + 1) ** 2 for i in range(size)]
else:
data = None# Print data in each process before scattering
print(f'before scattering, data on rank {rank} is: {data}')# Scattering occurs
data = comm.scatter(data, root=0)# Print data in each process after scattering
print(f'after scattering, data on rank {rank} is: {data}')
Executing mpiexec -n 4 python scatter.py
yields:
before scattering, data on rank 2 is None
before scattering, data on rank 3 is None
before scattering, data on rank 0 is [1, 4, 9, 16]
before scattering, data on rank 1 is None
data for rank 2 is 9
data for rank 1 is 4
data for rank 3 is 16
data for rank 0 is 1
The members of the list from process 0 have been successfully scattered among the rest of the processes in the communicator group.
Scatter from Python Memory
In the following example, we scatter a NumPy array among the processes in the communicator group by using the uppercase version of the method comm.Scatter()
.
#!/usr/bin/env python
import numpy as np
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Number of processes in the communicator group
size = comm.Get_size()# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Data to be sent
sendbuf = None# Process with rank 0 populates sendbuf with a 2-D array,
# based on the number of processes in our communicator group
if rank == 0:
sendbuf = np.zeros([size, 10], dtype='i')
sendbuf.T[:, :] = range(size) # Print the content of sendbuf before scattering
print(f'sendbuf in 0: {sendbuf}')# Each process gets a buffer (initially containing just zeros)
# to store scattered data.
recvbuf = np.zeros(10, dtype='i')# Print the content of recvbuf in each process before scattering
print(f'recvbuf in {rank}: {recvbuf}')# Scattering occurs
comm.Scatter(sendbuf, recvbuf, root=0)# Print the content of sendbuf in each process after scattering
print(f'Buffer in process {rank} contains: {recvbuf}')
Executing mpiexec -n 4 python npscatter.py
yields:
recvbuf in 1: [0 0 0 0 0 0 0 0 0 0]
recvbuf in 2: [0 0 0 0 0 0 0 0 0 0]
recvbuf in 3: [0 0 0 0 0 0 0 0 0 0]
sendbuf in 0: [[0 0 0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1 1 1]
[2 2 2 2 2 2 2 2 2 2]
[3 3 3 3 3 3 3 3 3 3]]
recvbuf in 0: [0 0 0 0 0 0 0 0 0 0]
Buffer in process 2 contains: [2 2 2 2 2 2 2 2 2 2]
Buffer in process 0 contains: [0 0 0 0 0 0 0 0 0 0]
Buffer in process 3 contains: [3 3 3 3 3 3 3 3 3 3]
Buffer in process 1 contains: [1 1 1 1 1 1 1 1 1 1]
As we can see, the values in the 2-D array at process with rank 0, have been scattered among all our processes in the communicator group, based on their rank value.
Gather
The gather function is the inverse function to scatter. Data from each process is gathered in consecutive order based on the rank of the processor.
Gather Python Objects
In this example, data from each process in the communicator group is gathered in the process with rank 0.
#!/usr/bin/env python
from mpi4py import MPI# Communicator
comm = MPI.COMM_WORLD# Number of processes in the communicator group
size = comm.Get_size()# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Each process gets different data, depending on its rank number
data = (rank + 1) ** 2# Print data in each process
print(f'before gathering, data on rank {rank} is: {data}')# Gathering occurs
data = comm.gather(data, root=0)# Process 0 prints out the gathered data, rest of the processes
# print their data as well
if rank == 0:
print(f'after gathering, process 0\'s data is: {data}')
else:
print(f'after gathering, data in rank {rank} is: {data}')
Executing mpiexec -n 4 python gather.py
yields:
before gathering, data on rank 2 is 9
before gathering, data on rank 3 is 16
before gathering, data on rank 0 is 1
before gathering, data on rank 1 is 4
after gathering, data in rank 2 is None
after gathering, data in rank 1 is None
after gathering, data in rank 3 is None
after gathering, process 0's data is [1, 4, 9, 16]
The data from processes with rank 1
to size - 1
have been successfully gathered in process 0.
Gather from Python Memory
The example showcases the use of the uppercase method comm.Gather()
. NumPy arrays from the processes in the communicator group are gathered into a 2-D array in process with rank 0.
#!/usr/bin/env python
import numpy as np
from mpi4py import MPI# Communicator group
comm = MPI.COMM_WORLD# Number of processes in the communicator group
size = comm.Get_size()# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Each process gets an array with data based on its rank.
sendbuf = np.zeros(10, dtype='i') + rank# Print the data in sendbuf before gathering
print(f'Buffer in process {rank} before gathering: {sendbuf}')# Variable to store gathered data
recvbuf = None# Process with rank 0 initializes recvbuf to a 2-D array conatining
# only zeros. The size of the array is determined by the number of
# processes in the communicator group
if rank == 0:
recvbuf = np.zeros([size, 10], dtype='i') # Print recvbuf
print(f'recvbuf in process 0 before gathering: {recvbuf}')# Gathering occurs
comm.Gather(sendbuf, recvbuf, root=0)# Print recvbuf in process with rank 0 after gathering
if rank == 0:
print(f'recvbuf in process 0 after gathering: \n{recvbuf}')
Executing mpiexec -n 4 python npgather.py
yields:
Buffer in process 2 before gathering: [2 2 2 2 2 2 2 2 2 2]
Buffer in process 3 before gathering: [3 3 3 3 3 3 3 3 3 3]
Buffer in process 0 before gathering: [0 0 0 0 0 0 0 0 0 0]
Buffer in process 1 before gathering: [1 1 1 1 1 1 1 1 1 1]
recvbuf in process 0 before gathering:
[[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]
[0 0 0 0 0 0 0 0 0 0]]
recvbuf in process 0 after gathering:
[[0 0 0 0 0 0 0 0 0 0]
[1 1 1 1 1 1 1 1 1 1]
[2 2 2 2 2 2 2 2 2 2]
[3 3 3 3 3 3 3 3 3 3]]
The values contained in the buffers from the different processes in the group have been gathered in the 2-D array in process with rank 0.
Allgather Memory Objects
This method is a many-to-many communication operation, where data from all processors is gathered in a continuous memory object on each of the processors. This is functionally equivalent to
- A gather on rank 0
- A Scatter from rank 0
However, this operation naturally has a performance bottleneck while all communication goes through rank0. Instead, we can use parallel communication between all of the processes at once to improve the performance. The optimization is implicit, and the user does not need to worry about it.
We demonstrate its use in the following example. Each process in the communicator group computes and stores values in a NumPy array (row). For each process, these values correspond to the multiples of the process’ rank and the integers in the range of the communicator group’s size. After values have been computed in each process, the different arrays are gathered into a 2D array (table) and distributed to ALL the members of the communicator group (as opposed to a single member, which is the case when comm.Gather()
is used instead).
#!/usr/bin/env python
import numpy as np
from mpi4py import MPI# Communicator group
comm = MPI.COMM_WORLD# Number of processes in the communicator group
size = comm.Get_size()# Get the rank of the current process in the communicator group
rank = comm.Get_rank()# Initialize array and table
row = np.zeros(size)
table = np.zeros((size, size))# Each process computes the local values and fills its array
for i in range(size):
j = i * rank
row[i] = j# Print array in each process
print(f'Process {rank} table before Allgather: {table}\n')# Gathering occurs
comm.Allgather([row, MPI.INT], [table, MPI.INT])# Print table in each process after gathering
print(f'Process {rank} table after Allgather: {table}\n')
Executing
$ mpiexec -n 4 python allgather_buffer.py
results in the output similar to
Process 1 table before Allgather: [[0. 0.][0. 0.]]
Process 0 table before Allgather: [[0. 0.][0. 0.]]
Process 1 table after Allgather: [[0. 0.][0. 1.]]
Process 0 table after Allgather: [[0. 0.][0. 1.]]
As we see, after comm.Allgather()
is called, every process gets a copy of the full multiplication table.
We have not provided an example for the Python object version as it is essentially the same and can easily be developed as an exercise.
Process Management
Dynamic Process Management with spawn
So far, we have focussed on MPI used on a number of hosts that are automatically creating the process when mpirun is used. However, MPI also offers the ability to sawn a process in a communicator group. This can be achieved by using a spawn communicator and command.
Using
MPI.Comm_Self.Spawn
will create a child process that can communicate with the parent. In the spawn code example, the manager broadcasts an array to the worker.
In this example, we have two python programs, the first one being the manager and the second being the worker.
#!/usr/bin/env python
from mpi4py import MPI
import numpy
import sys
import time
print("Hello")
comm = MPI.COMM_SELF.Spawn(sys.executable,
args=['worker.py'],
maxprocs=5)
rank = comm.Get_rank()
print(f"b and rank: {rank}")N = numpy.array(100, 'i')
comm.Bcast([N, MPI.INT], root=MPI.ROOT)
#print(f"ROOT: {MPI.ROOT}")
print('c')
PI = numpy.array(0.0, 'd')print('d')
comm.Reduce(None, [PI, MPI.DOUBLE],
op=MPI.SUM, root=MPI.ROOT)
print(PI)comm.Disconnect()#!/usr/bin/env python
from mpi4py import MPI
import numpy
import time
import sys
comm = MPI.Comm.Get_parent()
size = comm.Get_size()
rank = comm.Get_rank()N = numpy.array(0, dtype='i')
comm.Bcast([N, MPI.INT], root=0)
print(f"N: {N} rank: {rank}")h = 1.0 / N
s = 0.0
for i in range(rank, N, size):
x = h * (i + 0.5)
s += 4.0 / (1.0 + x**2)
PI = numpy.array(s * h, dtype='d')
comm.Reduce([PI, MPI.DOUBLE], None,
op=MPI.SUM, root=0)#time.sleep(60)
comm.Disconnect()
#MPI.Finalize()
#sys.exit()
#MPI.Unpublish_name()
#MPI.Close_port()
To execute the example please go to the examples directory and run the manager program
$ cd examples/spawn
$ mpiexec -n 4 python manager.py
This will result in:
N: 100 rank: 4
N: 100 rank: 1
N: 100 rank: 3
N: 100 rank: 2
Hello
b and rank: 0
c
d
3.1416009869231245
N: 100 rank: 0
N: 100 rank: 1
N: 100 rank: 4
N: 100 rank: 3
N: 100 rank: 2
Hello
b and rank: 0
c
d
3.1416009869231245
N: 100 rank: 0
This output depends on which child process is received first. The output can vary.
WARNING:
When running this program it may not terminate. To >terminate use for now CTRL-C
.
Futures
Futures is an mpi4py module that runs processes in parallel for intercommunication between such processes. The following Python program creates a visualization of a Julia set by utilizing the Futures modules, specifically via MPIPoolExecutor.
from mpi4py.futures import MPIPoolExecutor
import matplotlib.pyplot as plt
import numpy as np
from cloudmesh.common.StopWatch import StopWatchmultiplier = int(input('Enter 1 for 640x480 pixels of Julia '
'visualization image, 2 for 1280x960, '
'and so on...'))StopWatch.start("Overall time")
x0, x1, w = -2.0, +2.0, 640*multiplier
y0, y1, h = -1.5, +1.5, 480*multiplier
dx = (x1 - x0) / w
dy = (y1 - y0) / hc = complex(0, 0.65)def julia(x, y):
z = complex(x, y)
n = 255
while abs(z) < 3 and n > 1:
z = z**2 + c
n -= 1
return ndef julia_line(k):
line = bytearray(w)
y = y1 - k * dy
for j in range(w):
x = x0 + j * dx
line[j] = julia(x, y)
return lineif __name__ == '__main__': with MPIPoolExecutor() as executor:
image = executor.map(julia_line, range(h))
image = np.array([list(l) for l in image])
plt.imsave("julia.png", image)StopWatch.stop("Overall time")
StopWatch.benchmark()
To run the program use:
mpiexec -n 1 python julia-futures.py
The number after -n
can be changed to however many cores are in the computer’s processor. For example, a dual-core processor can use -n 2
so that more worker processes work to execute the same program.
The program will output a png image of a Julia set upon successful execution.
You can modify your number of processors accordingly matching your hardware infrastructure.
For example, entering the number 3
will produce a 1920x1440 photo because 640x480 times 3 is 1920x1440. Then, the program should output a visualization of a Julia data set as a png image.
Part 2 will include a couple of more examples.
REFERENCES
For a convenient cluster burning program please look at
You may also like this story that tells you how to conveniently generate benchmarks