diff --git a/src/Service/QueueManager.php b/src/Service/QueueManager.php index 6d11660..5484fdd 100644 --- a/src/Service/QueueManager.php +++ b/src/Service/QueueManager.php @@ -219,29 +219,22 @@ private function processBatch(): \Generator { $this->logger->debug('Processing batch'); $result = yield $this->elasticSearch->bulkIndexJobs($this->batch, $this->flowConfig->getTube()); - $successfullyIndexedJobs = $this->batch; - $notIndexedJobs = []; if ($result['errors'] === true) { - $successfullyIndexedJobs = array_filter($this->batch, function ($job) use ($result) { - return $result['items'][$job->getUuid()]['index']['status'] === 201; - }); - - $notIndexedJobs = array_filter($this->batch, function ($job) use ($result) { - return $result['items'][$job->getUuid()]['index']['status'] !== 201; - }); - } - - foreach ($notIndexedJobs as $singleJob) { - $this->logger->error( - sprintf( - 'Job with UUID "%s" could not be indexed in ElasticSearch', - $singleJob->getUuid() - ) - ); + foreach ($result['items'] as $item) { + // TODO: Handle missing keys in result array + if (!$this->isSuccessfulStatusCode($item['index']['status'])) { + $uuid = $item['index']['_id']; + unset($this->batch[$uuid]); + $this->logger->error( + 'Job could not be indexed in ElasticSearch', + ['bulk_index_response_item' => $item] + ); + } + } } - foreach ($successfullyIndexedJobs as $singleJob) { + foreach ($this->batch as $singleJob) { yield $this->beanstalkClient->put( $singleJob->getUuid(), $singleJob->getTimeout(), @@ -275,4 +268,9 @@ private function getJobBeanstalkId(JobInterface $job): int throw new \RuntimeException("Unknown Beanstalk id for job {$uuid}"); } + + public function isSuccessfulStatusCode(int $statusCode): bool + { + return $statusCode >= 200 && $statusCode < 300; + } } diff --git a/tests/DummyFilesystemRepeatProducer.php b/tests/DummyFilesystemRepeatProducer.php index a43f631..2672f3d 100644 --- a/tests/DummyFilesystemRepeatProducer.php +++ b/tests/DummyFilesystemRepeatProducer.php @@ -65,7 +65,12 @@ public function produce($data = null): Iterator continue; } yield $this->longRunningOperation(); - yield $emit(new Job(['file' => $file, 'data' => (yield File\read($file))])); + $fileContent = yield File\read($file); + $fileContentAsArray = json_decode($fileContent, true); + $payloadData = is_array($fileContentAsArray) ? + $fileContentAsArray : + ['file' => $file, 'data' => $fileContent]; + yield $emit(new Job($payloadData)); yield \Amp\File\deleteFile($file); } }); diff --git a/tests/Integration/ElasticSearchIndexingTest.php b/tests/Integration/ElasticSearchIndexingTest.php index 1b6df61..21ebded 100644 --- a/tests/Integration/ElasticSearchIndexingTest.php +++ b/tests/Integration/ElasticSearchIndexingTest.php @@ -14,13 +14,10 @@ use Webgriffe\Esb\KernelTestCase; use Webgriffe\Esb\Model\ErroredJobEvent; use Webgriffe\Esb\Model\Job; -use Webgriffe\Esb\Model\JobEventInterface; use Webgriffe\Esb\Model\ProducedJobEvent; use Webgriffe\Esb\Model\ReservedJobEvent; use Webgriffe\Esb\Model\WorkedJobEvent; use Webgriffe\Esb\TestUtils; -use Webgriffe\Esb\Unit\Model\DummyJobEvent; -use function Amp\Http\formatDateHeader; class ElasticSearchIndexingTest extends KernelTestCase { @@ -199,16 +196,9 @@ public function itLogsAndSkipsJobsThatCouldNotBeIndexedOntoElasticSearchWithAllE Loop::delay( 200, function () use ($producerDir) { - touch($producerDir . DIRECTORY_SEPARATOR . 'job1'); - // TODO: It needs to become a document with more than 1000 fields - $veryLargeDocument = 'TODO'; + $veryLargeDocument = json_encode(array_fill_keys(range(1, 1001), 'value')); file_put_contents($producerDir . DIRECTORY_SEPARATOR . 'job1', $veryLargeDocument); - Loop::delay( - 200, - function () use ($producerDir) { - touch($producerDir . DIRECTORY_SEPARATOR . 'job2'); - } - ); + touch($producerDir . DIRECTORY_SEPARATOR . 'job2'); } ); $this->stopWhen(function () { @@ -224,8 +214,8 @@ function ($log) { Promise\wait($this->esClient->refresh()); $search = Promise\wait($this->esClient->uriSearchOneIndex(self::FLOW_CODE, '')); - $this->assertCount(1, $search['hits']['hits']); // TODO: Make it green - // TODO: Add assertions on logs + $this->assertCount(1, $search['hits']['hits']); + $this->assertTrue($this->logHandler()->hasErrorThatContains('Job could not be indexed in ElasticSearch')); } private function assertForEachJob(callable $callable, array $jobsData)