Re: [PATCH v2 4/5] trace: Make removal of ring buffer pages atomic

From: Steven Rostedt
Date: Mon Aug 22 2011 - 23:27:22 EST


On Tue, 2011-08-16 at 14:46 -0700, Vaibhav Nagarnaik wrote:
> This patch adds the capability to remove pages from a ring buffer
> without destroying any existing data in it.
>
> This is done by removing the pages after the tail page. This makes sure
> that first all the empty pages in the ring buffer are removed. If the
> head page is one in the list of pages to be removed, then the page after
> the removed ones is made the head page. This removes the oldest data
> from the ring buffer and keeps the latest data around to be read.
>
> To do this in a non-racy manner, tracing is stopped for a very short
> time while the pages to be removed are identified and unlinked from the
> ring buffer. The pages are freed after the tracing is restarted to
> minimize the time needed to stop tracing.
>
> The context in which the pages from the per-cpu ring buffer are removed
> runs on the respective CPU. This minimizes the events not traced to only
> NMI trace contexts.

Could you do the same with this patch, as this one fails to build as
well? You should probably check patch 5 while you're at it.

-- Steve

>
> Signed-off-by: Vaibhav Nagarnaik <vnagarnaik@xxxxxxxxxx>
> ---
> Changelog v2-v1:
> * The earlier patch removed pages after the tail page by using cmpxchg()
> operations, which were identified as racy by Steven Rostedt. Now, the
> logic is changed to stop tracing till all the pages are identified and
> unlinked, to remove the race with the writer.
>
> kernel/trace/ring_buffer.c | 207 +++++++++++++++++++++++++++++++++-----------
> kernel/trace/trace.c | 20 +----
> 2 files changed, 156 insertions(+), 71 deletions(-)
>
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index a627680..1c86065 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -23,6 +23,8 @@
> #include <asm/local.h>
> #include "trace.h"
>
> +static void update_pages_handler(struct work_struct *work);
> +
> /*
> * The ring buffer header is special. We must manually up keep it.
> */
> @@ -502,6 +504,8 @@ struct ring_buffer_per_cpu {
> /* ring buffer pages to update, > 0 to add, < 0 to remove */
> int nr_pages_to_update;
> struct list_head new_pages; /* new pages to add */
> + struct work_struct update_pages_work;
> + struct completion update_completion;
> };
>
> struct ring_buffer {
> @@ -1080,6 +1084,8 @@ rb_allocate_cpu_buffer(struct ring_buffer *buffer, int nr_pages, int cpu)
> spin_lock_init(&cpu_buffer->reader_lock);
> lockdep_set_class(&cpu_buffer->reader_lock, buffer->reader_lock_key);
> cpu_buffer->lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
> + INIT_WORK(&cpu_buffer->update_pages_work, update_pages_handler);
> + init_completion(&cpu_buffer->update_completion);
>
> bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
> GFP_KERNEL, cpu_to_node(cpu));
> @@ -1267,32 +1273,107 @@ void ring_buffer_set_clock(struct ring_buffer *buffer,
>
> static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer);
>
> +static inline unsigned long rb_page_entries(struct buffer_page *bpage)
> +{
> + return local_read(&bpage->entries) & RB_WRITE_MASK;
> +}
> +
> +static inline unsigned long rb_page_write(struct buffer_page *bpage)
> +{
> + return local_read(&bpage->write) & RB_WRITE_MASK;
> +}
> +
> static void
> -rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned nr_pages)
> +rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned int nr_pages)
> {
> - struct buffer_page *bpage;
> - struct list_head *p;
> - unsigned i;
> + unsigned int nr_removed;
> + int page_entries;
> + struct list_head *tail_page, *to_remove, *next_page;
> + unsigned long head_bit;
> + struct buffer_page *last_page, *first_page;
> + struct buffer_page *to_remove_page, *tmp_iter_page;
>
> + head_bit = 0;
> spin_lock_irq(&cpu_buffer->reader_lock);
> - rb_head_page_deactivate(cpu_buffer);
> -
> - for (i = 0; i < nr_pages; i++) {
> - if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
> - goto out;
> - p = cpu_buffer->pages->next;
> - bpage = list_entry(p, struct buffer_page, list);
> - list_del_init(&bpage->list);
> - free_buffer_page(bpage);
> + atomic_inc(&cpu_buffer->record_disabled);
> + /*
> + * We don't race with the readers since we have acquired the reader
> + * lock. We also don't race with writers after disabling recording.
> + * This makes it easy to figure out the first and the last page to be
> + * removed from the list. We remove all the pages in between including
> + * the first and last pages. This is done in a busy loop so that we
> + * lose the least number of traces.
> + * The pages are freed after we restart recording and unlock readers.
> + */
> + tail_page = &cpu_buffer->tail_page->list;
> + /*
> + * tail page might be on reader page, we remove the next page
> + * from the ring buffer
> + */
> + if (cpu_buffer->tail_page == cpu_buffer->reader_page)
> + tail_page = rb_list_head(tail_page->next);
> + to_remove = tail_page;
> +
> + /* start of pages to remove */
> + first_page = list_entry(rb_list_head(to_remove->next),
> + struct buffer_page, list);
> + for (nr_removed = 0; nr_removed < nr_pages; nr_removed++) {
> + to_remove = rb_list_head(to_remove)->next;
> + head_bit |= (unsigned long)to_remove & RB_PAGE_HEAD;
> }
> - if (RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages)))
> - goto out;
> -
> - rb_reset_cpu(cpu_buffer);
> - rb_check_pages(cpu_buffer);
>
> -out:
> + next_page = rb_list_head(to_remove)->next;
> + /* now we remove all pages between tail_page and next_page */
> + tail_page->next = (struct list_head *)((unsigned long)next_page |
> + head_bit);
> + next_page = rb_list_head(next_page);
> + next_page->prev = tail_page;
> + /* make sure pages points to a valid page in the ring buffer */
> + cpu_buffer->pages = next_page;
> + /* update head page */
> + if (head_bit)
> + cpu_buffer->head_page = list_entry(next_page,
> + struct buffer_page, list);
> + /*
> + * change read pointer to make sure any read iterators reset
> + * themselves
> + */
> + cpu_buffer->read = 0;
> + /* pages are removed, resume tracing and then free the pages */
> + atomic_dec(&cpu_buffer->record_disabled);
> spin_unlock_irq(&cpu_buffer->reader_lock);
> +
> + RB_WARN_ON(cpu_buffer, list_empty(cpu_buffer->pages));
> +
> + /* last buffer page to remove */
> + last_page = list_entry(rb_list_head(to_remove), struct buffer_page,
> + list);
> + tmp_iter_page = first_page;
> + do {
> + to_remove_page = tmp_iter_page;
> + rb_inc_page(cpu_buffer, &tmp_iter_page);
> + /* update the counters */
> + page_entries = rb_page_entries(to_remove_page);
> + if (page_entries) {
> + /*
> + * If something was added to this page, it was full
> + * since it is not the tail page. So we deduct the
> + * bytes consumed in ring buffer from here.
> + * No need to update overruns, since this page is
> + * deleted from ring buffer and its entries are
> + * already accounted for.
> + */
> + local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
> + }
> + /*
> + * We have already removed references to this list item, just
> + * free up the buffer_page and its page
> + */
> + nr_removed--;
> + free_buffer_page(to_remove_page);
> + } while (to_remove_page != last_page);
> +
> + RB_WARN_ON(cpu_buffer, nr_removed);
> }
>
> static void
> @@ -1303,6 +1384,12 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
> struct list_head *p;
> unsigned i;
>
> + /* stop the writers while inserting pages */
> + atomic_inc(&cpu_buffer->record_disabled);
> +
> + /* Make sure all writers are done with this buffer. */
> + synchronize_sched();
> +
> spin_lock_irq(&cpu_buffer->reader_lock);
> rb_head_page_deactivate(cpu_buffer);
>
> @@ -1319,17 +1406,21 @@ rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer,
>
> out:
> spin_unlock_irq(&cpu_buffer->reader_lock);
> + atomic_dec(&cpu_buffer->record_disabled);
> }
>
> -static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
> +static void update_pages_handler(struct work_struct *work)
> {
> + struct ring_buffer_per_cpu *cpu_buffer = container_of(work,
> + struct ring_buffer_per_cpu, update_pages_work);
> +
> if (cpu_buffer->nr_pages_to_update > 0)
> rb_insert_pages(cpu_buffer, &cpu_buffer->new_pages,
> cpu_buffer->nr_pages_to_update);
> else
> rb_remove_pages(cpu_buffer, -cpu_buffer->nr_pages_to_update);
> - /* reset this value */
> - cpu_buffer->nr_pages_to_update = 0;
> +
> + complete(&cpu_buffer->update_completion);
> }
>
> /**
> @@ -1339,7 +1430,7 @@ static void update_pages_handler(struct ring_buffer_per_cpu *cpu_buffer)
> *
> * Minimum size is 2 * BUF_PAGE_SIZE.
> *
> - * Returns -1 on failure.
> + * Returns 0 on success and < 0 on failure.
> */
> int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
> int cpu_id)
> @@ -1361,21 +1452,28 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
> if (size < BUF_PAGE_SIZE * 2)
> size = BUF_PAGE_SIZE * 2;
>
> - atomic_inc(&buffer->record_disabled);
> -
> - /* Make sure all writers are done with this buffer. */
> - synchronize_sched();
> + nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
>
> + /*
> + * Don't succeed if recording is disabled globally, as a reader might
> + * be manipulating the ring buffer and is expecting a sane state while
> + * this is true.
> + */
> + if (atomic_read(&buffer->record_disabled))
> + return -EBUSY;
> + /* prevent another thread from changing buffer sizes */
> mutex_lock(&buffer->mutex);
> - get_online_cpus();
> -
> - nr_pages = DIV_ROUND_UP(size, BUF_PAGE_SIZE);
>
> if (cpu_id == RING_BUFFER_ALL_CPUS) {
> /* calculate the pages to update */
> for_each_buffer_cpu(buffer, cpu) {
> cpu_buffer = buffer->buffers[cpu];
>
> + if (atomic_read(&cpu_buffer->record_disabled)) {
> + err = -EBUSY;
> + goto out_err;
> + }
> +
> cpu_buffer->nr_pages_to_update = nr_pages -
> cpu_buffer->nr_pages;
>
> @@ -1396,16 +1494,31 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
> goto no_mem;
> }
>
> + /* fire off all the required work handlers */
> + for_each_buffer_cpu(buffer, cpu) {
> + cpu_buffer = buffer->buffers[cpu];
> + if (!cpu_buffer->nr_pages_to_update)
> + continue;
> + schedule_work_on(cpu, &cpu_buffer->update_pages_work);
> + }
> +
> /* wait for all the updates to complete */
> for_each_buffer_cpu(buffer, cpu) {
> cpu_buffer = buffer->buffers[cpu];
> - if (cpu_buffer->nr_pages_to_update) {
> - update_pages_handler(cpu_buffer);
> - cpu_buffer->nr_pages = nr_pages;
> - }
> + if (!cpu_buffer->nr_pages_to_update)
> + continue;
> + wait_for_completion(&cpu_buffer->update_completion);
> + cpu_buffer->nr_pages = nr_pages;
> + /* reset this value */
> + cpu_buffer->nr_pages_to_update = 0;
> }
> } else {
> cpu_buffer = buffer->buffers[cpu_id];
> + if (atomic_read(&cpu_buffer->record_disabled)) {
> + err = -EBUSY;
> + goto out_err;
> + }
> +
> if (nr_pages == cpu_buffer->nr_pages)
> goto out;
>
> @@ -1418,36 +1531,36 @@ int ring_buffer_resize(struct ring_buffer *buffer, unsigned long size,
> &cpu_buffer->new_pages, cpu_id))
> goto no_mem;
>
> - update_pages_handler(cpu_buffer);
> + schedule_work_on(cpu_id, &cpu_buffer->update_pages_work);
> + wait_for_completion(&cpu_buffer->update_completion);
>
> cpu_buffer->nr_pages = nr_pages;
> + /* reset this value */
> + cpu_buffer->nr_pages_to_update = 0;
> }
>
> out:
> - put_online_cpus();
> mutex_unlock(&buffer->mutex);
> -
> - atomic_dec(&buffer->record_disabled);
> -
> return size;
>
> no_mem:
> for_each_buffer_cpu(buffer, cpu) {
> struct buffer_page *bpage, *tmp;
> +
> cpu_buffer = buffer->buffers[cpu];
> /* reset this number regardless */
> cpu_buffer->nr_pages_to_update = 0;
> +
> if (list_empty(&cpu_buffer->new_pages))
> continue;
> +
> list_for_each_entry_safe(bpage, tmp, &cpu_buffer->new_pages,
> list) {
> list_del_init(&bpage->list);
> free_buffer_page(bpage);
> }
> }
> - put_online_cpus();
> mutex_unlock(&buffer->mutex);
> - atomic_dec(&buffer->record_disabled);
> return -ENOMEM;
> }
> EXPORT_SYMBOL_GPL(ring_buffer_resize);
> @@ -1487,21 +1600,11 @@ rb_iter_head_event(struct ring_buffer_iter *iter)
> return __rb_page_index(iter->head_page, iter->head);
> }
>
> -static inline unsigned long rb_page_write(struct buffer_page *bpage)
> -{
> - return local_read(&bpage->write) & RB_WRITE_MASK;
> -}
> -
> static inline unsigned rb_page_commit(struct buffer_page *bpage)
> {
> return local_read(&bpage->page->commit);
> }
>
> -static inline unsigned long rb_page_entries(struct buffer_page *bpage)
> -{
> - return local_read(&bpage->entries) & RB_WRITE_MASK;
> -}
> -
> /* Size is determined by what has been committed */
> static inline unsigned rb_page_size(struct buffer_page *bpage)
> {
> diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c
> index 305832a..908cecc 100644
> --- a/kernel/trace/trace.c
> +++ b/kernel/trace/trace.c
> @@ -2934,20 +2934,10 @@ static int __tracing_resize_ring_buffer(unsigned long size, int cpu)
>
> static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
> {
> - int cpu, ret = size;
> + int ret = size;
>
> mutex_lock(&trace_types_lock);
>
> - tracing_stop();
> -
> - /* disable all cpu buffers */
> - for_each_tracing_cpu(cpu) {
> - if (global_trace.data[cpu])
> - atomic_inc(&global_trace.data[cpu]->disabled);
> - if (max_tr.data[cpu])
> - atomic_inc(&max_tr.data[cpu]->disabled);
> - }
> -
> if (cpu_id != RING_BUFFER_ALL_CPUS) {
> /* make sure, this cpu is enabled in the mask */
> if (!cpumask_test_cpu(cpu_id, tracing_buffer_mask)) {
> @@ -2961,14 +2951,6 @@ static ssize_t tracing_resize_ring_buffer(unsigned long size, int cpu_id)
> ret = -ENOMEM;
>
> out:
> - for_each_tracing_cpu(cpu) {
> - if (global_trace.data[cpu])
> - atomic_dec(&global_trace.data[cpu]->disabled);
> - if (max_tr.data[cpu])
> - atomic_dec(&max_tr.data[cpu]->disabled);
> - }
> -
> - tracing_start();
> mutex_unlock(&trace_types_lock);
>
> return ret;


--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/