Skip to content

Commit

Permalink
Merge commit (#24)
Browse files Browse the repository at this point in the history
* Update RetryCommand.php (#23)

- Fix rod RetryCommand not taking all argument

* Fix wrong check for stopped propagation inside daemonShouldRun method. (#22)

* Fix wrong check for stopped propagation inside daemonShouldRun method.

* Made some test for daemonShouldRun
Made paused property a public isPaused method.

* Fix missing return in cache

* Unit test the return bug.

* Added timeout feature and other minor refactor.
  • Loading branch information
nicholasnet authored Nov 22, 2017
1 parent 8a5165f commit 4578dce
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 18 deletions.
4 changes: 2 additions & 2 deletions src/Command/RetryCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ protected function execute(InputInterface $input, OutputInterface $output)

$ids = $input->getArgument('id');

$failedJobs = ($ids === 'all') ? $this->failed->findAll() : $this->failed->findByIds($ids);
$failedJobs = (in_array('all', $ids)) ? $this->failed->findAll() : $this->failed->findByIds($ids);

// If entity is found.
if (!empty($failedJobs)) {
Expand Down Expand Up @@ -118,4 +118,4 @@ protected function resetAttempts($payload)

return $payload;
}
}
}
10 changes: 10 additions & 0 deletions src/Job/AbstractJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,16 @@ public function timeout()
return ArrayHelper::get($this->payload(), 'timeout');
}

/**
* Get the timestamp indicating when the job should timeout.
*
* @return int|null
*/
public function timeoutAt()
{
return !empty($this->payload()['timeoutAt']) ? $this->payload()['timeoutAt'] : null;
}

/**
* @inheritdoc
*/
Expand Down
7 changes: 7 additions & 0 deletions src/Job/JobsInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public function maxTries();
*/
public function timeout();

/**
* Get the timestamp indicating when the job should timeout.
*
* @return int|null
*/
public function timeoutAt();

/**
* Get the name of the queued job class.
*
Expand Down
47 changes: 47 additions & 0 deletions src/Type/AbstractQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ protected function createStringPayload($job, $data)
$payload['timeout'] = null;
}

$payload['timeoutAt'] = $this->getJobExpiration($job);

return $payload;
}

Expand Down Expand Up @@ -233,4 +235,49 @@ public function setContainer(ContainerInterface $container)

return $this;
}

/**
* Get the expiration timestamp for an object-based queue handler.
*
* @param mixed $job
*
* @return mixed
*/
public function getJobExpiration($job)
{
if (isset($job->timeoutAt)) {

$timeoutAt = $job->timeoutAt;

} elseif (method_exists($job, 'getTimeoutAt')) {

$timeoutAt = $job->getTimeoutAt();

} else {

$timeoutAt = null;
}

if (isset($job->retryUntil)) {

$retryUntil = $job->retryUntil;

} elseif (method_exists($job, 'getRetryUntil')) {

$retryUntil = $job->getRetryUntil();

} else {

$retryUntil = null;
}

if (empty($retryUntil) && empty($timeoutAt)) {

return;
}

$expiration = (!empty($timeoutAt)) ? $timeoutAt : $retryUntil;

return $expiration instanceof \DateTimeInterface ? $expiration->getTimestamp() : $expiration;
}
}
2 changes: 1 addition & 1 deletion src/Util/CacheAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function __construct($cache)
*/
public function get($key, $default = null)
{
$this->cache->get($key, $default);
return $this->cache->get($key, $default);
}

/**
Expand Down
38 changes: 30 additions & 8 deletions src/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,9 @@ protected function registerTimeoutHandler($job, WorkerOptions $options)
$this->kill(1);
});

$timeout = $this->timeoutForJob($job, $options);
pcntl_alarm($timeout > 0 ? $timeout + $options->sleep : 0);
pcntl_alarm(
max($this->timeoutForJob($job, $options), 0)
);
}
}

Expand All @@ -171,11 +172,11 @@ protected function timeoutForJob($job, WorkerOptions $options)
*
* @return bool
*/
protected function daemonShouldRun(WorkerOptions $options)
public function daemonShouldRun(WorkerOptions $options)
{
return ! (($this->manager->isDownForMaintenance() && ! $options->force) ||
$this->paused ||
$this->until() === false);
$this->isPaused() ||
$this->until() !== false);
}

/**
Expand Down Expand Up @@ -260,11 +261,11 @@ protected function getNextJob($connection, $queue)
} catch (\Exception $e) {

$this->exceptions->report($e);
$this->stopWorkerIfLostConnection($e);

} catch (\Throwable $e) {

$this->exceptions->report($e = new FatalThrowableError($e));

$this->stopWorkerIfLostConnection($e);
}
}
Expand Down Expand Up @@ -417,8 +418,14 @@ protected function handleJobException($connectionName, $job, WorkerOptions $opti
protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $job, $maxTries)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$timeoutAt = $job->timeout();

if ($timeoutAt && (new \DateTimeImmutable)->getTimestamp() <= $timeoutAt) {

return;
}

if ($maxTries === 0 || $job->attempts() <= $maxTries) {
if (! $timeoutAt && ($maxTries === 0 || $job->attempts() <= $maxTries)) {

return;
}
Expand All @@ -440,7 +447,12 @@ protected function markJobAsFailedIfAlreadyExceedsMaxAttempts($connectionName, $
*/
protected function markJobAsFailedIfWillExceedMaxAttempts($connectionName, $job, $maxTries, $e)
{
$maxTries = !is_null($job->maxTries()) ? $job->maxTries() : $maxTries;
$maxTries = ! is_null($job->maxTries()) ? $job->maxTries() : $maxTries;

if ($job->timeoutAt() && $job->timeoutAt() <= (new \DateTimeImmutable)->getTimestamp()) {

$this->failJob($connectionName, $job, $e);
}

if ($maxTries > 0 && $job->attempts() >= $maxTries) {

Expand Down Expand Up @@ -606,6 +618,8 @@ public function stop($status = 0)
*/
public function kill($status = 0)
{
$this->events->dispatch(EventsList::WORKER_STOPPING, new Event\WorkerStopping);

if (extension_loaded('posix')) {

posix_kill(getmypid(), SIGKILL);
Expand Down Expand Up @@ -662,6 +676,14 @@ protected function until()
return $this->events->dispatch(EventsList::LOOPING, new Event\Looping)->isPropagationStopped();
}

/**
* @return bool
*/
public function isPaused()
{
return $this->paused;
}

/**
* Raise the failed queue job event.
*
Expand Down
4 changes: 2 additions & 2 deletions tests/Type/BeanstalkdQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function testPushProperlyPushesJobOntoBeanstalkd()

$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null]), 1024, 0, 60);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => null, 'timeout' => null, 'timeoutAt' => null]), 1024, 0, 60);
$queue->push('foo', ['data'], 'stack');
$queue->push('foo', ['data']);
}
Expand All @@ -43,7 +43,7 @@ public function testDelayedPushProperlyPushesJobOntoBeanstalkd()
$pheanstalk = $queue->getPheanstalk();
$pheanstalk->shouldReceive('useTube')->once()->with('stack')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('useTube')->once()->with('default')->andReturn($pheanstalk);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR);
$pheanstalk->shouldReceive('put')->twice()->with(json_encode(['job' => 'foo', 'data' => ['data'], 'maxTries' => 1, 'timeout' => 1, 'timeoutAt' => null]), \Pheanstalk\Pheanstalk::DEFAULT_PRIORITY, 5, \Pheanstalk\Pheanstalk::DEFAULT_TTR);
$queue->later(5, 'foo', ['data'], 'stack');
$queue->later(5, 'foo', ['data']);
}
Expand Down
6 changes: 3 additions & 3 deletions tests/Type/RedisQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function testPushProperlyPushesJobOntoRedis()
$queue->setContainer($this->getContainer());

$queue->expects($this->once())->method('getRandomId')->will($this->returnValue('foo'));
$redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}');
$redis->shouldReceive('rpush')->once()->with('queues:default', '{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}');
$id = $queue->push('foo', ['data']);
$this->assertEquals('foo', $id);
}
Expand All @@ -35,7 +35,7 @@ public function testDelayedPushProperlyPushesJobOntoRedis()
$redis->shouldReceive('zadd')->once()->with(
'queues:default:delayed',
2,
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}'
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}'
);
$id = $queue->later(1, 'foo', ['data']);
$this->assertEquals('foo', $id);
Expand All @@ -51,7 +51,7 @@ public function testDelayedPushWithDateTimeProperlyPushesJobOntoRedis()
$redis->shouldReceive('zadd')->once()->with(
'queues:default:delayed',
2,
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"id":"foo","attempts":0}'
'{"job":"foo","data":["data"],"maxTries":null,"timeout":null,"timeoutAt":null,"id":"foo","attempts":0}'
);
$queue->later($date, 'foo', ['data']);
}
Expand Down
31 changes: 31 additions & 0 deletions tests/Util/CacheAdapterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<?php

namespace IdeasBucket\QueueBundle\Util;

use Mockery as m;
use PHPUnit\Framework\TestCase;
use Psr\SimpleCache\CacheInterface;


class CacheAdapterTest extends TestCase
{
public function tearDown()
{
m::close();
}

public function testGetter()
{
$cacheMock = m::mock(CacheInterface::class)
->shouldReceive('get')
->once()
->andReturn('test')
->getMock();

$class = new CacheAdapter($cacheMock);


$this->assertEquals('test', $class->get('test'));
}

}
Loading

0 comments on commit 4578dce

Please sign in to comment.