[patch 11/12] pollfs: asynchronous workqueue

From: davi
Date: Sun Apr 01 2007 - 12:04:29 EST


Asynchronously run work items.

If the worker thread blocks while executing a work function, a new
worker thread is created (if an idle one is not already available) to
handle the remaining workqueue items.

Various errors and resource limitations are not yet handled.

Signed-off-by: Davi E. M. Arnaut <davi@xxxxxxxxxxxxx>
---
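
(Not part of the patch: a minimal usage sketch for reviewers. The
workqueue, the work function, and the sleep below are hypothetical;
only queue_async_work() itself comes from this series.)

	#include <linux/workqueue.h>
	#include <linux/delay.h>

	/* Hypothetical work function that may sleep. */
	static void my_work_fn(struct work_struct *work)
	{
		msleep(100);	/* blocks; a replacement worker takes over */
	}

	static DECLARE_WORK(my_work, my_work_fn);
	static struct workqueue_struct *my_wq;

	/* ... during initialization ... */
	my_wq = create_workqueue("my_wq");

	/* Sets WORK_STRUCT_ASYNC, then queues the item as usual. */
	queue_async_work(my_wq, &my_work);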

Index: linux-2.6/include/linux/workqueue.h
===================================================================
--- linux-2.6.orig/include/linux/workqueue.h
+++ linux-2.6/include/linux/workqueue.h
@@ -25,7 +25,8 @@ struct work_struct {
atomic_long_t data;
#define WORK_STRUCT_PENDING 0 /* T if work item pending execution */
#define WORK_STRUCT_NOAUTOREL 1 /* F if work item automatically released on exec */
-#define WORK_STRUCT_FLAG_MASK (3UL)
+#define WORK_STRUCT_ASYNC 2 /* T if work item can be executed asynchronously */
+#define WORK_STRUCT_FLAG_MASK (7UL)
#define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)
struct list_head entry;
work_func_t func;
@@ -171,6 +172,7 @@ extern int FASTCALL(queue_work(struct wo
extern int FASTCALL(queue_delayed_work(struct workqueue_struct *wq, struct delayed_work *work, unsigned long delay));
extern int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
struct delayed_work *work, unsigned long delay);
+extern int FASTCALL(queue_async_work(struct workqueue_struct *wq, struct work_struct *work));
extern void FASTCALL(flush_workqueue(struct workqueue_struct *wq));

extern int FASTCALL(schedule_work(struct work_struct *work));
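
A note on the mask change above: work_struct::data packs the
cpu_workqueue_struct pointer and the flag bits into a single word, so
claiming bit 2 for WORK_STRUCT_ASYNC means widening the flag mask from
3UL to 7UL. The resulting layout, as a sketch:

	/*
	 * work->data layout after this patch:
	 *
	 *   bit 0:     WORK_STRUCT_PENDING
	 *   bit 1:     WORK_STRUCT_NOAUTOREL
	 *   bit 2:     WORK_STRUCT_ASYNC (new)
	 *   bits 3..:  cpu_workqueue_struct pointer
	 *              (masked by WORK_STRUCT_WQ_DATA_MASK)
	 */
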
Index: linux-2.6/kernel/workqueue.c
===================================================================
--- linux-2.6.orig/kernel/workqueue.c
+++ linux-2.6/kernel/workqueue.c
@@ -14,6 +14,7 @@
* Theodore Ts'o <tytso@xxxxxxx>
*
* Made to use alloc_percpu by Christoph Lameter <clameter@xxxxxxx>.
+ * Asynchronous workqueue by Davi E. M. Arnaut <davi.arnaut@xxxxxxxxx>
*/

#include <linux/module.h>
@@ -60,6 +61,8 @@ struct cpu_workqueue_struct {
int run_depth; /* Detect run_workqueue() recursion depth */

int freezeable; /* Freeze the thread during suspend */
+
+ struct list_head threadlist;
} ____cacheline_aligned;

/*
@@ -297,9 +300,27 @@ int queue_delayed_work_on(int cpu, struc
}
EXPORT_SYMBOL_GPL(queue_delayed_work_on);

+/**
+ * queue_async_work - queue an asynchronous work on a workqueue
+ * @wq: workqueue to use
+ * @work: work to queue
+ *
+ * Returns 0 if @work was already on a queue, non-zero otherwise.
+ *
+ * We queue the work to the CPU on which it was submitted, but there is
+ * no guarantee that it will be processed by that CPU.
+ */
+int fastcall queue_async_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+ set_bit(WORK_STRUCT_ASYNC, work_data_bits(work));
+
+ return queue_work(wq, work);
+}
+EXPORT_SYMBOL_GPL(queue_async_work);
+
static void run_workqueue(struct cpu_workqueue_struct *cwq)
{
- unsigned long flags;
+ unsigned long flags, async;

/*
* Keep taking off work from the queue until
@@ -324,8 +345,18 @@ static void run_workqueue(struct cpu_wor
BUG_ON(get_wq_data(work) != cwq);
if (!test_bit(WORK_STRUCT_NOAUTOREL, work_data_bits(work)))
work_release(work);
+
+ async = test_bit(WORK_STRUCT_ASYNC, work_data_bits(work));
+ if (unlikely(async))
+ current->cwq = cwq;
+
f(work);

+ if (current->cwq)
+ current->cwq = NULL;
+ else if (async)
+ async++;
+
if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
"%s/0x%08x/%d\n",
@@ -340,6 +371,17 @@ static void run_workqueue(struct cpu_wor
spin_lock_irqsave(&cwq->lock, flags);
cwq->remove_sequence++;
wake_up(&cwq->work_done);
+
+ if (async > 1) {
+ if (cwq->thread) {
+ list_add_tail(&current->cwq_entry, &cwq->threadlist);
+ spin_unlock_irqrestore(&cwq->lock, flags);
+ schedule();
+ spin_lock_irqsave(&cwq->lock, flags);
+ }
+ else
+ cwq->thread = current;
+ }
}
cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags);
@@ -467,6 +509,7 @@ static struct task_struct *create_workqu
cwq->remove_sequence = 0;
cwq->freezeable = freezeable;
INIT_LIST_HEAD(&cwq->worklist);
+ INIT_LIST_HEAD(&cwq->threadlist);
init_waitqueue_head(&cwq->more_work);
init_waitqueue_head(&cwq->work_done);

@@ -534,15 +577,19 @@ static void cleanup_workqueue_thread(str
{
struct cpu_workqueue_struct *cwq;
unsigned long flags;
- struct task_struct *p;
+ struct task_struct *p, *tmp;
+ LIST_HEAD(threadlist);

cwq = per_cpu_ptr(wq->cpu_wq, cpu);
spin_lock_irqsave(&cwq->lock, flags);
p = cwq->thread;
cwq->thread = NULL;
+ list_splice_init(&cwq->threadlist, &threadlist);
spin_unlock_irqrestore(&cwq->lock, flags);
if (p)
kthread_stop(p);
+ list_for_each_entry_safe(p, tmp, &threadlist, cwq_entry)
+ kthread_stop(p);
}

/**
@@ -811,6 +858,68 @@ static int __devinit workqueue_cpu_callb
return NOTIFY_OK;
}

+static void create_cpu_worker(struct cpu_workqueue_struct *cwq)
+{
+ unsigned long flags;
+ struct task_struct *p;
+ struct workqueue_struct *wq = cwq->wq;
+ int cpu = first_cpu(current->cpus_allowed);
+
+ mutex_lock(&workqueue_mutex);
+ if (is_single_threaded(wq))
+ p = kthread_create(worker_thread, cwq, "%s", wq->name);
+ else
+ p = kthread_create(worker_thread, cwq, "%s/%d", wq->name, cpu);
+
+ if (IS_ERR(p))
+ /* oh well, there isn't much we can do anyway. */
+ goto unlock;
+
+ kthread_bind(p, cpu);
+
+ spin_lock_irqsave(&cwq->lock, flags);
+ if (!cwq->thread)
+ wake_up_process(p);
+ else
+ list_add_tail(&p->cwq_entry, &cwq->threadlist);
+ spin_unlock_irqrestore(&cwq->lock, flags);
+
+unlock:
+ mutex_unlock(&workqueue_mutex);
+}
+
+static inline void wake_up_cpu_worker(struct cpu_workqueue_struct *cwq)
+{
+ struct task_struct *worker = list_entry(cwq->threadlist.next,
+ struct task_struct, cwq_entry);
+
+ list_del_init(cwq->threadlist.next);
+
+ cwq->thread = worker;
+
+ wake_up_process(worker);
+}
+
+void schedule_workqueue(struct task_struct *task)
+{
+ struct cpu_workqueue_struct *cwq = task->cwq;
+ unsigned long flags;
+
+ task->cwq = NULL;
+
+ spin_lock_irqsave(&cwq->lock, flags);
+ if (cwq->thread == task) {
+ if (!list_empty(&cwq->threadlist))
+ wake_up_cpu_worker(cwq);
+ else
+ task = cwq->thread = NULL;
+ }
+ spin_unlock_irqrestore(&cwq->lock, flags);
+
+ if (!task)
+ create_cpu_worker(cwq);
+}
+
void init_workqueues(void)
{
singlethread_cpu = first_cpu(cpu_possible_map);
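
To summarize the hand-off implemented above (a sketch of the code
paths, not additional code):

	/*
	 * 1. run_workqueue() sees WORK_STRUCT_ASYNC on an item, points
	 *    current->cwq at the cpu_workqueue and calls f(work).
	 * 2. f(work) sleeps voluntarily; schedule() notices current->cwq
	 *    and calls schedule_workqueue().
	 * 3. schedule_workqueue() clears task->cwq and hands the
	 *    cwq->thread role to a parked worker from cwq->threadlist,
	 *    or spawns a replacement via create_cpu_worker().
	 * 4. When f(work) returns, current->cwq is NULL, so the old
	 *    worker parks itself on cwq->threadlist (if a replacement is
	 *    running) or reclaims cwq->thread.
	 */
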
Index: linux-2.6/include/linux/sched.h
===================================================================
--- linux-2.6.orig/include/linux/sched.h
+++ linux-2.6/include/linux/sched.h
@@ -843,6 +843,9 @@ struct task_struct {

struct mm_struct *mm, *active_mm;

+ /* (asynchronous) cpu workqueue */
+ void *cwq;
+ struct list_head cwq_entry;
/* task state */
struct linux_binfmt *binfmt;
long exit_state;
@@ -1409,6 +1412,7 @@ extern int disallow_signal(int);
extern int do_execve(char *, char __user * __user *, char __user * __user *, struct pt_regs *);
extern long do_fork(unsigned long, unsigned long, struct pt_regs *, unsigned long, int __user *, int __user *);
struct task_struct *fork_idle(int);
+extern void schedule_workqueue(struct task_struct *);

extern void set_task_comm(struct task_struct *tsk, char *from);
extern void get_task_comm(char *to, struct task_struct *tsk);
Index: linux-2.6/kernel/sched.c
===================================================================
--- linux-2.6.orig/kernel/sched.c
+++ linux-2.6/kernel/sched.c
@@ -3305,6 +3305,12 @@ asmlinkage void __sched schedule(void)
}
profile_hit(SCHED_PROFILING, __builtin_return_address(0));

+ /* asynchronous queue worker */
+ if (unlikely(current->cwq))
+ /* only if it's a voluntary sleep */
+ if (!(preempt_count() & PREEMPT_ACTIVE) && current->state != TASK_RUNNING)
+ schedule_workqueue(current);
+
need_resched:
preempt_disable();
prev = current;
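
The check above fires only on voluntary sleeps: an involuntary
preemption (PREEMPT_ACTIVE) or a schedule() with the task still
TASK_RUNNING leaves current->cwq alone. A hypothetical work function
that would take this path (the waitqueue and condition are made-up
names):

	static void slow_work_fn(struct work_struct *work)
	{
		/*
		 * wait_event() sets TASK_UNINTERRUPTIBLE and calls
		 * schedule(), so the hook above hands the workqueue off
		 * to a replacement worker while this thread sleeps.
		 */
		wait_event(my_waitqueue, my_condition);
	}
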
Index: linux-2.6/include/linux/init_task.h
===================================================================
--- linux-2.6.orig/include/linux/init_task.h
+++ linux-2.6/include/linux/init_task.h
@@ -112,6 +112,7 @@ extern struct group_info init_groups;
.tasks = LIST_HEAD_INIT(tsk.tasks), \
.ptrace_children= LIST_HEAD_INIT(tsk.ptrace_children), \
.ptrace_list = LIST_HEAD_INIT(tsk.ptrace_list), \
+ .cwq_entry = LIST_HEAD_INIT(tsk.cwq_entry), \
.real_parent = &tsk, \
.parent = &tsk, \
.children = LIST_HEAD_INIT(tsk.children), \
Index: linux-2.6/kernel/fork.c
===================================================================
--- linux-2.6.orig/kernel/fork.c
+++ linux-2.6/kernel/fork.c
@@ -1173,6 +1173,7 @@ static struct task_struct *copy_process(
INIT_LIST_HEAD(&p->thread_group);
INIT_LIST_HEAD(&p->ptrace_children);
INIT_LIST_HEAD(&p->ptrace_list);
+ INIT_LIST_HEAD(&p->cwq_entry);

/* Perform scheduler related setup. Assign this task to a CPU. */
sched_fork(p, clone_flags);

--