Re: [PATCH 0/9] Scalability requirements for sysv ipc - v3

From: Nadia Derbey
Date: Wed May 07 2008 - 07:42:11 EST


Nadia.Derbey@xxxxxxxx wrote:
After scalability problems were detected when using SysV IPCs, I
proposed using an RCU-based implementation of the IDR API instead (see
threads http://lkml.org/lkml/2008/4/11/212 and
http://lkml.org/lkml/2008/4/29/295).

This led many people to ask that the IDR API itself be converted and made
RCU-safe (because most of the code was duplicated, and thus unmaintainable
and unreviewable).

So here is a first attempt.

The important change with respect to the IDR API itself concerns idr removals:
IDR layers are freed after an RCU grace period, instead of being moved to the
free list.

The important change with respect to IPCs is that idr_find() can now be called
locklessly inside an RCU read-side critical section.

Here are the results I got for the pmsg test sent by Manfred:

   2.6.25-rc3-mm1  2.6.25-rc3-mm1+  2.6.25-mm1  Patched 2.6.25-mm1
1         1168441          1064021      876000              947488
2         1094264           921059     1549592             1730685
3         2082520          1738165     1694370             2324880
4         2079929          1695521      404553             2400408
5         2898758           406566      391283             3246580
6         2921417           261275      263249             3752148
7         3308761           126056      191742             4243142
8         3329456           100129      141722             4275780

1st column: stock 2.6.25-rc3-mm1
2nd column: 2.6.25-rc3-mm1 + ipc patches (store ipcs into idrs)
3rd column: stock 2.6.25-mm1
4th column: 2.6.25-mm1 + this patch series.



Attached are the corresponding chart and the pmsg code (originally sent by Manfred).

Regards,
Nadia

Attachment: results.pdf
Description: application/force-download

/*
* pmsg.cpp, parallel sysv msg pingpong
*
* Copyright (C) 1999, 2001, 2005, 2008 by Manfred Spraul.
* All rights reserved except the rights granted by the GPL.
*
* Redistribution of this file is permitted under the terms of the GNU
* General Public License (GPL) version 2 or later.
* $Header$
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
#include <errno.h>
#include <unistd.h>
#include <getopt.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <new>

//////////////////////////////////////////////////////////////////////////////

/*
 * Benchmark phase: written by main() and polled by every worker thread.
 * WAITING while the threads are being set up, RUNNING during the timed
 * run, STOPPED once the run is over.
 * NOTE(review): volatile only forces the polling loops to re-read the value;
 * it is not a portable inter-thread synchronization mechanism.
 */
enum pmsg_state {
	WAITING,
	RUNNING,
	STOPPED
};
static volatile enum pmsg_state g_state = WAITING;

/* Round-trip count per thread, indexed by thread id. */
unsigned long long *g_results;
/* SysV message queue id per sender/receiver pair, indexed by pair number. */
int *g_svmsg_ids;
/* pthread handle per thread, indexed by thread id. */
pthread_t *g_threads;

/* Work description handed to each worker_thread(). */
struct taskinfo {
	int svmsg_id;	/* queue shared by one sender/receiver pair */
	int threadid;	/* this thread's slot in g_threads / g_results */
	int cpuid;	/* CPU the thread binds itself to */
	int sender;	/* 1 for the pair's sender, 0 for its receiver */
};

/* Payload size, in bytes, of every message exchanged. */
#define DATASIZE 8

void* worker_thread(void *arg)
{
struct taskinfo *ti = (struct taskinfo*)arg;
unsigned long long rounds;
int ret;
struct {
long mtype;
char buffer[DATASIZE];
} mbuf;

{
cpu_set_t cpus;
CPU_ZERO(&cpus);
CPU_SET(ti->cpuid, &cpus);

ret = pthread_setaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_setaffinity_np failed for thread %d with errno %d.\n",
ti->threadid, errno);
}

ret = pthread_getaffinity_np(g_threads[ti->threadid], sizeof(cpus), &cpus);
if (ret < 0) {
printf("pthread_getaffinity_np() failed for thread %d with errno %d.\n",
ti->threadid, errno);
fflush(stdout);
} else {
printf("thread %d: sysvmsg %8d type %d bound to %04lxh\n",ti->threadid,
ti->svmsg_id, ti->sender, cpus.__bits[0]);
}
fflush(stdout);
}

rounds = 0;
while(g_state == WAITING) {
#ifdef __i386__
__asm__ __volatile__("pause": : :"memory");
#endif
}

if (ti->sender) {
mbuf.mtype = ti->sender+1;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
printf("Initial send failed, errno %d.\n", errno);
exit(1);
}
}
while(g_state == RUNNING) {
int target = 1+!ti->sender;

ret = msgrcv(ti->svmsg_id, &mbuf, DATASIZE, target, 0);
if (ret != DATASIZE) {
if (errno == EIDRM)
break;
printf("Error on msgrcv, got %d, errno %d.\n", ret, errno);
exit(1);
}
mbuf.mtype = ti->sender+1;
ret = msgsnd(ti->svmsg_id, &mbuf, DATASIZE, 0);
if (ret != 0) {
if (errno == EIDRM)
break;
printf("send failed, errno %d.\n", errno);
exit(1);
}
rounds++;
}
/* store result */
g_results[ti->threadid] = rounds;

pthread_exit(0);
return NULL;
}

/*
 * init_threads - create one message queue and its sender/receiver threads.
 *
 * @cpu:  pair index. It is the queue's slot in g_svmsg_ids, the sender's
 *        thread/result slot, and the CPU both threads bind to.
 * @cpus: total number of pairs. The receiver uses slot cpu + cpus.
 *
 * The two taskinfo structures are handed to the threads and never freed.
 * The process exits on any allocation, msgget() or pthread_create()
 * failure.
 */
void init_threads(int cpu, int cpus)
{
	int ret;
	struct taskinfo *ti1, *ti2;

	/* Plain new throws std::bad_alloc instead of returning NULL. Use the
	 * nothrow form so the check below actually detects failure. */
	ti1 = new (std::nothrow) taskinfo;
	ti2 = new (std::nothrow) taskinfo;
	if (!ti1 || !ti2) {
		printf("Could not allocate task info\n");
		exit(1);
	}

	/* IPC_PRIVATE queues are still reachable by id from other processes,
	 * so restrict access to the owner instead of making them world-writable. */
	g_svmsg_ids[cpu] = msgget(IPC_PRIVATE, 0600 | IPC_CREAT);
	if (g_svmsg_ids[cpu] == -1) {
		printf(" message queue create failed, errno %d.\n", errno);
		exit(1);
	}

	g_results[cpu] = 0;
	g_results[cpu + cpus] = 0;

	/* Sender: thread slot cpu. */
	ti1->svmsg_id = g_svmsg_ids[cpu];
	ti1->threadid = cpu;
	ti1->cpuid = cpu;
	ti1->sender = 1;

	/* Receiver: thread slot cpu + cpus, same queue and CPU. */
	ti2->svmsg_id = g_svmsg_ids[cpu];
	ti2->threadid = cpu + cpus;
	ti2->cpuid = cpu;
	ti2->sender = 0;

	/* pthread_create() returns an error number rather than setting errno. */
	ret = pthread_create(&g_threads[ti1->threadid], NULL, worker_thread, ti1);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
	ret = pthread_create(&g_threads[ti2->threadid], NULL, worker_thread, ti2);
	if (ret) {
		printf(" pthread_create failed with error code %d\n", ret);
		exit(1);
	}
}

//////////////////////////////////////////////////////////////////////////////

int main(int argc, char **argv)
{
int queues, timeout;
unsigned long long totals;
int i;

printf("pmsg [nr queues] [timeout]\n");
if (argc != 3) {
printf(" Invalid parameters.\n");
return 0;
}
queues = atoi(argv[1]);
timeout = atoi(argv[2]);
printf("Using %d queues/cpus (%d threads) for %d seconds.\n",
queues, 2*queues, timeout);

g_results = new unsigned long long[2*queues];
g_svmsg_ids = new int[queues];
g_threads = new pthread_t[2*queues];
for (i=0;i<queues;i++) {
init_threads(i, queues);
}

sleep(1);
g_state = RUNNING;
sleep(timeout);
g_state = STOPPED;
sleep(1);
for (i=0;i<queues;i++) {
int res;
res = msgctl(g_svmsg_ids[i],IPC_RMID,NULL);
if (res < 0) {
printf("msgctl(IPC_RMID) failed for %d, errno%d.\n",
g_svmsg_ids[i], errno);
}
}
for (i=0;i<2*queues;i++)
pthread_join(g_threads[i], NULL);

printf("Result matrix:\n");
totals = 0;
for (i=0;i<queues;i++) {
printf(" Thread %3d: %8lld %3d: %8lld\n",
i, g_results[i], i+queues, g_results[i+queues]);
totals += g_results[i] + g_results[i+queues];
}
printf("Total: %lld\n", totals);
}