Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wqueue remove enter_critical_section #14623

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions arch/arm/src/sama5/sam_hsmci.c
Original file line number Diff line number Diff line change
Expand Up @@ -3213,8 +3213,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0)
{
/* NOTE: Currently, work_cancel only returns success */

lcderr("ERROR: Failed to cancel work: %d\n", ret);
}

Expand All @@ -3223,8 +3221,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

lcderr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down
4 changes: 0 additions & 4 deletions arch/arm/src/samv7/sam_hsmci.c
Original file line number Diff line number Diff line change
Expand Up @@ -3353,8 +3353,6 @@ static void sam_callback(void *arg)
ret = work_cancel(LPWORK, &priv->cbwork);
if (ret < 0)
{
/* NOTE: Currently, work_cancel only returns success */

mcerr("ERROR: Failed to cancel work: %d\n", ret);
}

Expand All @@ -3363,8 +3361,6 @@ static void sam_callback(void *arg)
priv->cbarg, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

mcerr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down
6 changes: 0 additions & 6 deletions fs/mount/fs_automount.c
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,6 @@ static void automount_timeout(wdparm_t arg)
ret = work_queue(LPWORK, &priv->work, automount_worker, priv, 0);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}
}
Expand Down Expand Up @@ -771,8 +769,6 @@ static int automount_interrupt(FAR const struct automount_lower_s *lower,
priv->lower->ddelay);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}
else
Expand Down Expand Up @@ -840,8 +836,6 @@ FAR void *automount_initialize(FAR const struct automount_lower_s *lower)
priv->lower->ddelay);
if (ret < 0)
{
/* NOTE: Currently, work_queue only returns success */

ferr("ERROR: Failed to schedule work: %d\n", ret);
}

Expand Down
18 changes: 8 additions & 10 deletions sched/wqueue/kwork_cancel.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,23 +58,20 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
* new work is typically added to the work queue from interrupt handlers.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);
if (work->worker != NULL)
{
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

if (WDOG_ISACTIVE(&work->u.timer))
{
wd_cancel(&work->u.timer);
}
else
work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}

work->worker = NULL;
ret = OK;
}
else if (!up_interrupt_context() && !sched_idletask() && sync)
Expand All @@ -86,14 +83,15 @@ static int work_qcancel(FAR struct kwork_wqueue_s *wqueue, bool sync,
if (wqueue->worker[wndx].work == work &&
wqueue->worker[wndx].pid != nxsched_gettid())
{
wqueue->worker[wndx].wait_count--;
spin_unlock_irqrestore(&g_wqueue_lock, flags);
nxsem_wait_uninterruptible(&wqueue->worker[wndx].wait);
ret = 1;
break;
return 1;
acassis marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
return ret;
}

Expand Down
45 changes: 25 additions & 20 deletions sched/wqueue/kwork_notifier.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ static dq_queue_t g_notifier_free;

static dq_queue_t g_notifier_pending;

static spinlock_t g_work_notifier_lock;

/****************************************************************************
* Private Functions
****************************************************************************/
Expand Down Expand Up @@ -166,17 +168,21 @@ static void work_notifier_worker(FAR void *arg)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Remove the notification from the pending list */

dq_rem(&notifier->entry, &g_notifier_pending);
notifier = work_notifier_find(notifier->key);
if (notifier != NULL)
{
dq_rem(&notifier->entry, &g_notifier_pending);

/* Put the notification to the free list */
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -213,14 +219,14 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Try to get the entry from the free list */

notifier = (FAR struct work_notifier_entry_s *)
dq_remfirst(&g_notifier_free);

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);

if (notifier == NULL)
{
Expand All @@ -245,7 +251,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Generate a unique key for this notification */

Expand All @@ -262,7 +268,7 @@ int work_notifier_setup(FAR struct work_notifier_s *info)
dq_addlast(&notifier->entry, &g_notifier_pending);
ret = notifier->key;

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

return ret;
Expand Down Expand Up @@ -293,7 +299,7 @@ void work_notifier_teardown(int key)

/* Disable interrupts very briefly. */

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);

/* Find the entry matching this key in the g_notifier_pending list. We
* assume that there is only one.
Expand All @@ -304,19 +310,18 @@ void work_notifier_teardown(int key)
{
/* Cancel the work, this may be waiting */

if (work_cancel_sync(notifier->info.qid, &notifier->work) != 1)
{
/* Remove the notification from the pending list */
work_cancel(notifier->info.qid, &notifier->work);

dq_rem(&notifier->entry, &g_notifier_pending);
/* Remove the notification from the pending list */

/* Put the notification to the free list */
dq_rem(&notifier->entry, &g_notifier_pending);

dq_addlast(&notifier->entry, &g_notifier_free);
}
/* Put the notification to the free list */

dq_addlast(&notifier->entry, &g_notifier_free);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

/****************************************************************************
Expand Down Expand Up @@ -352,7 +357,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
* the notifications have been sent.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_work_notifier_lock);
sched_lock();

/* Process the notification at the head of the pending list until the
Expand Down Expand Up @@ -397,7 +402,7 @@ void work_notifier_signal(enum work_evtype_e evtype,
}

sched_unlock();
leave_critical_section(flags);
spin_unlock_irqrestore(&g_work_notifier_lock, flags);
}

#endif /* CONFIG_WQUEUE_NOTIFIER */
36 changes: 24 additions & 12 deletions sched/wqueue/kwork_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,10 @@
#define queue_work(wqueue, work) \
do \
{ \
int sem_count; \
dq_addlast((FAR dq_entry_t *)(work), &(wqueue)->q); \
nxsem_get_value(&(wqueue)->sem, &sem_count); \
if (sem_count < 0) /* There are threads waiting for sem. */ \
if ((wqueue)->wait_count < 0) /* There are threads waiting for sem. */ \
{ \
(wqueue)->wait_count++; \
nxsem_post(&(wqueue)->sem); \
} \
} \
Expand All @@ -68,24 +67,28 @@
static void work_timer_expiry(wdparm_t arg)
{
FAR struct work_s *work = (FAR struct work_s *)arg;
irqstate_t flags = enter_critical_section();
irqstate_t flags = spin_lock_irqsave(&g_wqueue_lock);

queue_work(work->wq, work);
leave_critical_section(flags);
/* We have being canceled */

if (work->worker != NULL)
{
queue_work(work->wq, work);
}

spin_unlock_irqrestore(&g_wqueue_lock, flags);
}

static bool work_is_canceling(FAR struct kworker_s *kworkers, int nthreads,
FAR struct work_s *work)
{
int semcount;
int wndx;

for (wndx = 0; wndx < nthreads; wndx++)
{
if (kworkers[wndx].work == work)
{
nxsem_get_value(&kworkers[wndx].wait, &semcount);
if (semcount < 0)
if (kworkers[wndx].wait_count < 0)
{
return true;
}
Expand Down Expand Up @@ -145,13 +148,22 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
* task logic or from interrupt handling logic.
*/

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Remove the entry from the timer and work queue. */

if (work->worker != NULL)
{
work_cancel_wq(wqueue, work);
/* Remove the entry from the work queue and make sure that it is
* marked as available (i.e., the worker field is nullified).
*/

work->worker = NULL;
wd_cancel(&work->u.timer);
if (dq_inqueue((FAR dq_entry_t *)work, &wqueue->q))
{
dq_rem((FAR dq_entry_t *)work, &wqueue->q);
}
}

if (work_is_canceling(wqueue->worker, wqueue->nthreads, work))
Expand All @@ -177,7 +189,7 @@ int work_queue_wq(FAR struct kwork_wqueue_s *wqueue,
}

out:
leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
return ret;
}

Expand Down
20 changes: 13 additions & 7 deletions sched/wqueue/kwork_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ struct lp_wqueue_s g_lpwork =

#endif /* CONFIG_SCHED_LPWORK */

spinlock_t g_wqueue_lock = SP_UNLOCKED;

/****************************************************************************
* Private Functions
****************************************************************************/
Expand Down Expand Up @@ -138,7 +140,6 @@ static int work_thread(int argc, FAR char *argv[])
worker_t worker;
irqstate_t flags;
FAR void *arg;
int semcount;

/* Get the handle from argv */

Expand All @@ -147,7 +148,7 @@ static int work_thread(int argc, FAR char *argv[])
kworker = (FAR struct kworker_s *)
((uintptr_t)strtoul(argv[2], NULL, 16));

flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Loop forever */

Expand Down Expand Up @@ -189,19 +190,19 @@ static int work_thread(int argc, FAR char *argv[])
* performed... we don't have any idea how long this will take!
*/

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);
CALL_WORKER(worker, arg);
flags = enter_critical_section();
flags = spin_lock_irqsave(&g_wqueue_lock);

/* Mark the thread un-busy */

kworker->work = NULL;

/* Check if someone is waiting, if so, wakeup it */

nxsem_get_value(&kworker->wait, &semcount);
while (semcount++ < 0)
while (kworker->wait_count < 0)
{
kworker->wait_count++;
nxsem_post(&kworker->wait);
}
}
Expand All @@ -211,10 +212,13 @@ static int work_thread(int argc, FAR char *argv[])
* posted.
*/

wqueue->wait_count--;
spin_unlock_irqrestore(&g_wqueue_lock, flags);
nxsem_wait_uninterruptible(&wqueue->sem);
flags = spin_lock_irqsave(&g_wqueue_lock);
}

leave_critical_section(flags);
spin_unlock_irqrestore(&g_wqueue_lock, flags);

nxsem_post(&wqueue->exsem);
return OK;
Expand Down Expand Up @@ -276,6 +280,7 @@ static int work_thread_create(FAR const char *name, int priority,
}

wqueue->worker[wndx].pid = pid;
wqueue->worker[wndx].wait_count = 0;
}

sched_unlock();
Expand Down Expand Up @@ -334,6 +339,7 @@ FAR struct kwork_wqueue_s *work_queue_create(FAR const char *name,
nxsem_init(&wqueue->sem, 0, 0);
nxsem_init(&wqueue->exsem, 0, 0);
wqueue->nthreads = nthreads;
wqueue->wait_count = 0;

/* Create the work queue thread pool */

Expand Down
Loading
Loading