OpenMPI and Precomputed Values in Shared Memory

Think OpenMPI. Let’s say you have a 12-node cluster at your disposal. Every node has 16GB of RAM and two 4-core processors, which under normal circumstances lets you run 8 processes per node. What happens if you want to precompute a huge constant lookup table (e.g. 3GB in size) to speed up your sophisticated algorithms in every process? If you do it the naive way (i.e. allocate the lookup table in every process), you will soon realize that you can’t start the usual maximum of 8 processes per node, because together they would eat up 24GB (!!) of RAM on each node, leaving you at least 8GB short. You could either reduce the number of processes per node to 5… or you could use the solution presented below.

If the lookup is the same for every process (let’s say it’s a trivial lookup telling whether an integer is a perfect square, i.e. whether its square root is also an integer: lut[0]=1, lut[1]=1, lut[2]=0, lut[3]=0, lut[4]=1, 0, 0, 0, 0, lut[9]=1, and so on), it’s pretty obvious that it can be computed once per node and shared between the processes running on that node. Standard POSIX mechanisms (shared memory and semaphores) can be used to achieve this, as illustrated in the code below.
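
As a side note, here is a minimal sketch of what filling such a perfect-square table could look like (fill_square_lut is a hypothetical helper, not part of the program further down):

#include <stddef.h>
#include <stdint.h>

// Hypothetical helper: mark lut[i] = 1 if i is a perfect square, 0 otherwise.
// In the shared-memory setup below, lut would point into the mmap()ed region.
static void fill_square_lut(uint8_t *lut, size_t n)
{
	for (size_t i = 0; i < n; i++)
		lut[i] = 0;
	for (size_t r = 0; r * r < n; r++)
		lut[r * r] = 1; // 0, 1, 4, 9, 16, ...
}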

By using this solution you cut the required 24GB per node back to just 3GB and can run your 8 processes per node again. Optionally, you can also significantly increase the size of your lookup table, which could bring further speed improvements.

I hope it helps somebody 😉 It helped me quite recently 😉

// Copyright (C) 2010, Stanislaw Adaszewski
// BSD Licensed

#include <sys/mman.h>
#include <semaphore.h>
#include <fcntl.h>
#include <stdio.h>
#include <unistd.h>
#include <mpi.h>
#include <error.h>
#include <errno.h>
#include <stdlib.h>

// P() reports file/line and errno via error() and aborts if the wrapped expression is non-zero.
#define P(x) { if (x) { error(0, errno, "%s:%d : %s ", __FILE__, __LINE__, #x); abort(); } }

int main(int argc, char *argv[])
{
	size_t shm_size = 1024*1024*128; // 128MB
	const char *name = "/mpi"; // POSIX IPC object name (portable names start with a slash)

	MPI_Init(&argc, &argv);

	int commsize;
	int taskrank;
	MPI_Comm_size(MPI_COMM_WORLD, &commsize);
	MPI_Comm_rank(MPI_COMM_WORLD, &taskrank);

	// Every task races to create the shared memory object; exactly one per
	// node succeeds (O_EXCL) and becomes the master that fills it in.
	int shmfd = shm_open(name, O_CREAT | O_EXCL | O_RDWR, 0600);
	sem_t *sem;
	void *shmem;

	// Named semaphore, initially 0, used to signal that the data is ready.
	P((sem = sem_open(name, O_CREAT, 0600, 0)) == SEM_FAILED);
	if (shmfd == -1)
	{
		// The object already exists, so this task is a slave on its node:
		// open the existing shared memory object instead.
		P((shmfd = shm_open(name, O_RDWR, 0600)) == -1);

		printf("Slave task %d waiting on semaphore...n", taskrank);
		P(sem_wait(sem));
		P(sem_post(sem));

		P((shmem = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0)) == MAP_FAILED);

		printf("Slave task %d done.n", taskrank);
	} else
	{
		// This task created the object, so it acts as the per-node master.
		printf("Master task %d preparing shared memory...\n", taskrank);
		P(ftruncate(shmfd, shm_size));
		P((shmem = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0)) == MAP_FAILED);
		*((int*) shmem) = 0xdeadbeef; // marker value that the slaves read back below
		sleep(2); // Prepare lookup tables, precomputed coefficients, whatever is necessary

		// Wake up the slaves, then unlink the names; the underlying objects
		// persist until every task that opened them has closed them.
		P(sem_post(sem));
		P(sem_unlink(name));
		P(shm_unlink(name));

		printf("Master task %d done.n", taskrank);
	}

	// All tasks can use shared memory now
	printf("Task %d read from shared memory: 0x%08xn", taskrank, *((int*) shmem));

	P(munmap(shmem, shm_size));
	P(close(shmfd));
	P(sem_close(sem));

	printf("Task %d finalizing...n", taskrank);

	MPI_Finalize();

	return 0;
}
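
To try it out, compile with your MPI compiler wrapper and run it through mpirun, for example (shm_lut.c is just a placeholder file name, and depending on your platform you may or may not need -lrt and -lpthread for shm_open/sem_open):

mpicc shm_lut.c -o shm_lut -lrt -lpthread
mpirun -np 8 ./shm_lut

On a multi-node run, an Open MPI option such as -npernode controls how many of those processes are placed on each node.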
