[PATCH, RFC] RCU : OOM avoidance and lower latency

From: Eric Dumazet
Date: Fri Jan 06 2006 - 05:18:11 EST



In order to avoid some OOM triggered by a flood of call_rcu() calls, we increased in linux 2.6.14 maxbatch from 10 to 10000, and conditionally call set_need_resched() in call_rcu().

This solution doesnt solve all the problems and has drawbacks.

1) Using a big maxbatch has a bad impact on latency.
2) A flood of call_rcu_bh() still can OOM

I have some servers that once in a while crashes when the ip route cache is flushed. After raising /proc/sys/net/ipv4/route/secret_interval (so that *no* flush is done), I got better uptime for these servers. But in some cases I think the network stack can floods call_rcu_bh(), and a fatal OOM occurs.

I suggest in this patch :

1) To lower maxbatch to a more reasonable value (as far as the latency is concerned)

2) To be able to guard a RCU cpu queue against a maximal count (10.000 for example). If this limit is reached, free the oldest entry of this queue.

I assume that if a CPU queued 10.000 items in its RCU queue, then the oldest entry cannot still be in use by another CPU. This might sounds as a violation of RCU rules, (I'm not an RCU expert) but seems quite reasonable.


Signed-off-by: Eric Dumazet <dada1@xxxxxxxxxxxxx> --- linux-2.6.15/kernel/rcupdate.c 2006-01-03 04:21:10.000000000 +0100
+++ linux-2.6.15-edum/kernel/rcupdate.c 2006-01-06 11:10:45.000000000 +0100
@@ -71,14 +71,14 @@

/* Fake initialization required by compiler */
static DEFINE_PER_CPU(struct tasklet_struct, rcu_tasklet) = {NULL};
-static int maxbatch = 10000;
+static int maxbatch = 100;

#ifndef __HAVE_ARCH_CMPXCHG
/*
* We use an array of spinlocks for the rcurefs -- similar to ones in sparc
* 32 bit atomic_t implementations, and a hash function similar to that
* for our refcounting needs.
- * Can't help multiprocessors which donot have cmpxchg :(
+ * Can't help multiprocessors which dont have cmpxchg :(
*/

spinlock_t __rcuref_hash[RCUREF_HASH_SIZE] = {
@@ -110,9 +110,17 @@
*rdp->nxttail = head;
rdp->nxttail = &head->next;

- if (unlikely(++rdp->count > 10000))
- set_need_resched();
-
+/*
+ * OOM avoidance : If we queued too many items in this queue,
+ * free the oldest entry
+ */
+ if (unlikely(++rdp->count > 10000)) {
+ rdp->count--;
+ head = rdp->donelist;
+ rdp->donelist = head->next;
+ local_irq_restore(flags);
+ return head->func(head);
+ }
local_irq_restore(flags);
}

@@ -148,12 +156,17 @@
rdp = &__get_cpu_var(rcu_bh_data);
*rdp->nxttail = head;
rdp->nxttail = &head->next;
- rdp->count++;
/*
- * Should we directly call rcu_do_batch() here ?
- * if (unlikely(rdp->count > 10000))
- * rcu_do_batch(rdp);
+ * OOM avoidance : If we queued too many items in this queue,
+ * free the oldest entry
*/
+ if (unlikely(++rdp->count > 10000)) {
+ rdp->count--;
+ head = rdp->donelist;
+ rdp->donelist = head->next;
+ local_irq_restore(flags);
+ return head->func(head);
+ }
local_irq_restore(flags);
}

@@ -209,7 +222,7 @@
static void rcu_do_batch(struct rcu_data *rdp)
{
struct rcu_head *next, *list;
- int count = 0;
+ int count = maxbatch;

list = rdp->donelist;
while (list) {
@@ -217,7 +230,7 @@
list->func(list);
list = next;
rdp->count--;
- if (++count >= maxbatch)
+ if (--count <= 0)
break;
}
if (!rdp->donelist)