Re: [PATCH V2] PCI/DOE: Detect on stack work items automatically

From: Dan Williams
Date: Fri Nov 18 2022 - 14:46:15 EST


Ira Weiny wrote:
> On Fri, Nov 18, 2022 at 09:20:38AM +0000, David Laight wrote:
> > From: ira.weiny@xxxxxxxxx
> > > Sent: 18 November 2022 00:05
> > >
> > > Work item initialization needs to be done with either
> > > INIT_WORK_ONSTACK() or INIT_WORK() depending on how the work item is
> > > allocated.
> > >
> > > The callers of pci_doe_submit_task() allocate struct pci_doe_task on the
> > > stack and pci_doe_submit_task() incorrectly used INIT_WORK().
> > >
> > > Jonathan suggested creating doe task allocation macros such as
> > > DECLARE_CDAT_DOE_TASK_ONSTACK().[1] The issue with this is the work
> > > function is not known to the callers and must be initialized correctly.
> > >
> > > A follow up suggestion was to have an internal 'pci_doe_work' item
> > > allocated by pci_doe_submit_task().[2] This requires an allocation which
> > > could restrict the context where tasks are used.
> > >
> > > Another idea was to have an intermediate step to initialize the task
> > > struct with a new call.[3] This added a lot of complexity.
> > >
> > > Lukas pointed out that object_is_on_stack() is available to detect this
> > > automatically.
> > >
> > > Use object_is_on_stack() to determine the correct init work function to
> > > call.
> >
> > This is all a bit strange.
> > The 'onstack' flag is needed for the diagnostic check:
> > is_on_stack = object_is_on_stack(addr);
> > if (is_on_stack == onstack)
> > return;
> > pr_warn(...);
> > WARN_ON(1);
> >
>
> :-(
>
> > So setting the flag to the location of the buffer just subverts the check.
> > It that is sane there ought to be a proper way to do it.
>
> Ok this brings me back to my previous point and suggested patch.[*] The
> fundamental bug is that the work item is allocated in different code from
> the code which uses it. Separating the work item from the task.
>
> [*] https://lore.kernel.org/linux-cxl/20221014151045.24781-1-Jonathan.Cameron@xxxxxxxxxx/T/#m63c636c5135f304480370924f4d03c00357be667
>
> Bjorn would this solution be acceptable and just use GFP_KERNEL and mark the
> required context for pci_doe_submit_task()?

It is a waste to have an allocation when one is not needed. The value of
having INIT_WORK_ONSTACK and DECLARE_COMPLETION_ONSTACK is to be clear
at the call site that an async context cares about this stack frame not
going out of scope.

However, coming full circle, we have zero async users today, and having
the completion and work struct in the task are causing a maintenance
burden. So let's just rip it out for now with something like the
following and circle back to add async support later when it becomes
necessary: (only compile tested)


diff --git a/drivers/cxl/core/pci.c b/drivers/cxl/core/pci.c
index 0dbbe8d39b07..69873cdcc911 100644
--- a/drivers/cxl/core/pci.c
+++ b/drivers/cxl/core/pci.c
@@ -488,21 +488,14 @@ static struct pci_doe_mb *find_cdat_doe(struct device *uport)
CXL_DOE_TABLE_ACCESS_TABLE_TYPE_CDATA) | \
FIELD_PREP(CXL_DOE_TABLE_ACCESS_ENTRY_HANDLE, (entry_handle)))

-static void cxl_doe_task_complete(struct pci_doe_task *task)
-{
- complete(task->private);
-}
-
struct cdat_doe_task {
u32 request_pl;
u32 response_pl[32];
- struct completion c;
struct pci_doe_task task;
};

#define DECLARE_CDAT_DOE_TASK(req, cdt) \
struct cdat_doe_task cdt = { \
- .c = COMPLETION_INITIALIZER_ONSTACK(cdt.c), \
.request_pl = req, \
.task = { \
.prot.vid = PCI_DVSEC_VENDOR_ID_CXL, \
@@ -511,8 +504,6 @@ struct cdat_doe_task cdt = { \
.request_pl_sz = sizeof(cdt.request_pl), \
.response_pl = cdt.response_pl, \
.response_pl_sz = sizeof(cdt.response_pl), \
- .complete = cxl_doe_task_complete, \
- .private = &cdt.c, \
} \
}

@@ -523,12 +514,12 @@ static int cxl_cdat_get_length(struct device *dev,
DECLARE_CDAT_DOE_TASK(CDAT_DOE_REQ(0), t);
int rc;

- rc = pci_doe_submit_task(cdat_doe, &t.task);
+ rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
if (rc < 0) {
dev_err(dev, "DOE submit failed: %d", rc);
return rc;
}
- wait_for_completion(&t.c);
+
if (t.task.rv < sizeof(u32))
return -EIO;

@@ -552,12 +543,11 @@ static int cxl_cdat_read_table(struct device *dev,
u32 *entry;
int rc;

- rc = pci_doe_submit_task(cdat_doe, &t.task);
+ rc = pci_doe_submit_task_wait(cdat_doe, &t.task);
if (rc < 0) {
dev_err(dev, "DOE submit failed: %d", rc);
return rc;
}
- wait_for_completion(&t.c);
/* 1 DW header + 1 DW data min */
if (t.task.rv < (2 * sizeof(u32)))
return -EIO;
diff --git a/drivers/pci/doe.c b/drivers/pci/doe.c
index e402f05068a5..115a8ff14afc 100644
--- a/drivers/pci/doe.c
+++ b/drivers/pci/doe.c
@@ -18,7 +18,6 @@
#include <linux/mutex.h>
#include <linux/pci.h>
#include <linux/pci-doe.h>
-#include <linux/workqueue.h>

#define PCI_DOE_PROTOCOL_DISCOVERY 0

@@ -40,7 +39,6 @@
* @cap_offset: Capability offset
* @prots: Array of protocols supported (encoded as long values)
* @wq: Wait queue for work item
- * @work_queue: Queue of pci_doe_work items
* @flags: Bit array of PCI_DOE_FLAG_* flags
*/
struct pci_doe_mb {
@@ -49,7 +47,6 @@ struct pci_doe_mb {
struct xarray prots;

wait_queue_head_t wq;
- struct workqueue_struct *work_queue;
unsigned long flags;
};

@@ -211,7 +208,6 @@ static int pci_doe_recv_resp(struct pci_doe_mb *doe_mb, struct pci_doe_task *tas
static void signal_task_complete(struct pci_doe_task *task, int rv)
{
task->rv = rv;
- task->complete(task);
}

static void signal_task_abort(struct pci_doe_task *task, int rv)
@@ -231,10 +227,9 @@ static void signal_task_abort(struct pci_doe_task *task, int rv)
signal_task_complete(task, rv);
}

-static void doe_statemachine_work(struct work_struct *work)
+
+static void exec_task(struct pci_doe_task *task)
{
- struct pci_doe_task *task = container_of(work, struct pci_doe_task,
- work);
struct pci_doe_mb *doe_mb = task->doe_mb;
struct pci_dev *pdev = doe_mb->pdev;
int offset = doe_mb->cap_offset;
@@ -295,18 +290,12 @@ static void doe_statemachine_work(struct work_struct *work)
signal_task_complete(task, rc);
}

-static void pci_doe_task_complete(struct pci_doe_task *task)
-{
- complete(task->private);
-}
-
static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
u8 *protocol)
{
u32 request_pl = FIELD_PREP(PCI_DOE_DATA_OBJECT_DISC_REQ_3_INDEX,
*index);
u32 response_pl;
- DECLARE_COMPLETION_ONSTACK(c);
struct pci_doe_task task = {
.prot.vid = PCI_VENDOR_ID_PCI_SIG,
.prot.type = PCI_DOE_PROTOCOL_DISCOVERY,
@@ -314,17 +303,13 @@ static int pci_doe_discovery(struct pci_doe_mb *doe_mb, u8 *index, u16 *vid,
.request_pl_sz = sizeof(request_pl),
.response_pl = &response_pl,
.response_pl_sz = sizeof(response_pl),
- .complete = pci_doe_task_complete,
- .private = &c,
};
int rc;

- rc = pci_doe_submit_task(doe_mb, &task);
+ rc = pci_doe_submit_task_wait(doe_mb, &task);
if (rc < 0)
return rc;

- wait_for_completion(&c);
-
if (task.rv != sizeof(response_pl))
return -EIO;

@@ -376,13 +361,6 @@ static void pci_doe_xa_destroy(void *mb)
xa_destroy(&doe_mb->prots);
}

-static void pci_doe_destroy_workqueue(void *mb)
-{
- struct pci_doe_mb *doe_mb = mb;
-
- destroy_workqueue(doe_mb->work_queue);
-}
-
static void pci_doe_flush_mb(void *mb)
{
struct pci_doe_mb *doe_mb = mb;
@@ -393,9 +371,6 @@ static void pci_doe_flush_mb(void *mb)
/* Cancel an in progress work item, if necessary */
set_bit(PCI_DOE_FLAG_CANCEL, &doe_mb->flags);
wake_up(&doe_mb->wq);
-
- /* Flush all work items */
- flush_workqueue(doe_mb->work_queue);
}

/**
@@ -429,19 +404,6 @@ struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset)
if (rc)
return ERR_PTR(rc);

- doe_mb->work_queue = alloc_ordered_workqueue("%s %s DOE [%x]", 0,
- dev_driver_string(&pdev->dev),
- pci_name(pdev),
- doe_mb->cap_offset);
- if (!doe_mb->work_queue) {
- pci_err(pdev, "[%x] failed to allocate work queue\n",
- doe_mb->cap_offset);
- return ERR_PTR(-ENOMEM);
- }
- rc = devm_add_action_or_reset(dev, pci_doe_destroy_workqueue, doe_mb);
- if (rc)
- return ERR_PTR(rc);
-
/* Reset the mailbox by issuing an abort */
rc = pci_doe_abort(doe_mb);
if (rc) {
@@ -496,23 +458,20 @@ bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type)
EXPORT_SYMBOL_GPL(pci_doe_supports_prot);

/**
- * pci_doe_submit_task() - Submit a task to be processed by the state machine
+ * pci_doe_submit_task_wait() - Submit and execute a task
*
* @doe_mb: DOE mailbox capability to submit to
- * @task: task to be queued
+ * @task: task to be run
*
- * Submit a DOE task (request/response) to the DOE mailbox to be processed.
- * Returns upon queueing the task object. If the queue is full this function
- * will sleep until there is room in the queue.
- *
- * task->complete will be called when the state machine is done processing this
- * task.
+ * Submit and run DOE task (request/response) to the DOE mailbox to be
+ * processed.
*
* Excess data will be discarded.
*
- * RETURNS: 0 when task has been successfully queued, -ERRNO on error
+ * RETURNS: 0 when task was executed, the @task->rv holds the status
+ * result of the executed opertion, -ERRNO on failure to submit.
*/
-int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
+int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
{
if (!pci_doe_supports_prot(doe_mb, task->prot.vid, task->prot.type))
return -EINVAL;
@@ -529,8 +488,8 @@ int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task)
return -EIO;

task->doe_mb = doe_mb;
- INIT_WORK(&task->work, doe_statemachine_work);
- queue_work(doe_mb->work_queue, &task->work);
+ exec_task(task);
+
return 0;
}
-EXPORT_SYMBOL_GPL(pci_doe_submit_task);
+EXPORT_SYMBOL_GPL(pci_doe_submit_task_wait);
diff --git a/include/linux/pci-doe.h b/include/linux/pci-doe.h
index ed9b4df792b8..c94122a66221 100644
--- a/include/linux/pci-doe.h
+++ b/include/linux/pci-doe.h
@@ -30,8 +30,6 @@ struct pci_doe_mb;
* @response_pl_sz: Size of the response payload (bytes)
* @rv: Return value. Length of received response or error (bytes)
* @complete: Called when task is complete
- * @private: Private data for the consumer
- * @work: Used internally by the mailbox
* @doe_mb: Used internally by the mailbox
*
* The payload sizes and rv are specified in bytes with the following
@@ -50,11 +48,6 @@ struct pci_doe_task {
u32 *response_pl;
size_t response_pl_sz;
int rv;
- void (*complete)(struct pci_doe_task *task);
- void *private;
-
- /* No need for the user to initialize these fields */
- struct work_struct work;
struct pci_doe_mb *doe_mb;
};

@@ -72,6 +65,5 @@ struct pci_doe_task {

struct pci_doe_mb *pcim_doe_create_mb(struct pci_dev *pdev, u16 cap_offset);
bool pci_doe_supports_prot(struct pci_doe_mb *doe_mb, u16 vid, u8 type);
-int pci_doe_submit_task(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
-
+int pci_doe_submit_task_wait(struct pci_doe_mb *doe_mb, struct pci_doe_task *task);
#endif


> > OTOH using an on-stack structure for INIT_WORK seems rather strange.
> > Since the kernel thread must sleep waiting for the 'work' to complete
> > why not just perform the required code there.
>
> It is not strange if some task submitters want to wait while others do not. It
> was suggested that all submit task operations be async and the callers who
> wanted to be synchronous would wait like this.
>
> As Dan said there is a difference between submit_bio() and submit_bio_wait().
>
> We have simply left the wait part up to the users who all wait right now.

Yeah, my bad for jumping ahead to worry about async when it is not yet
needed.