Skip to content

Commit

Permalink
Switch to simple built-in FileFetcher on import (#3881)
Browse files Browse the repository at this point in the history
  • Loading branch information
dafeder authored Dec 14, 2022
1 parent cfcd55c commit edd187c
Show file tree
Hide file tree
Showing 20 changed files with 331 additions and 150 deletions.
1 change: 0 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
"fmizzell/maquina": "^1.1.0",
"getdkan/contracts": "^1.0.0",
"getdkan/csv-parser": "^1.2.3",
"getdkan/file-fetcher" : "^4.1.0",
"getdkan/harvest": "^1.0.0",
"getdkan/json-schema-provider": "^0.1.2",
"getdkan/locker": "^1.1.0",
Expand Down
5 changes: 0 additions & 5 deletions modules/common/common.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,6 @@ services:
factory: entity_type.manager:getStorage
arguments: ['node']

dkan.common.file_fetcher:
class: \Drupal\common\FileFetcher\Factory
arguments:
- '@dkan.common.job_store'

dkan.common.dataset_info:
class: \Drupal\common\DatasetInfo
calls:
Expand Down
56 changes: 0 additions & 56 deletions modules/common/src/FileFetcher/Factory.php

This file was deleted.

4 changes: 2 additions & 2 deletions modules/common/tests/src/Traits/CleanUp.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Drupal\Tests\common\Traits;

use Drupal\node\Entity\Node;
use FileFetcher\FileFetcher;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;

/**
*
Expand Down Expand Up @@ -50,7 +50,7 @@ private function removeAllFileFetchingJobs() {
$jobStoreFactory = \Drupal::service('dkan.common.job_store');

/** @var \Drupal\common\Storage\JobStore $jobStore */
$jobStore = $jobStoreFactory->getInstance(FileFetcher::class);
$jobStore = $jobStoreFactory->getInstance(FileFetcherJob::class);
foreach ($jobStore->retrieveAll() as $id) {
$jobStore->remove($id);
}
Expand Down
18 changes: 9 additions & 9 deletions modules/common/tests/src/Unit/Storage/JobStoreTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
use Drupal\Core\Database\Schema;
use Drupal\Core\Database\StatementWrapper;
use Drupal\common\Storage\JobStore;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;

use Contracts\Mock\Storage\Memory;
use FileFetcher\FileFetcher;
use MockChain\Chain;
use MockChain\Sequence;
use PHPUnit\Framework\TestCase;
Expand All @@ -30,7 +30,7 @@ public function testConstruction() {
->add(Connection::class, "schema", Schema::class)
->add(Schema::class, "tableExists", FALSE);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertTrue(is_object($jobStore));
}

Expand Down Expand Up @@ -59,8 +59,8 @@ public function testRetrieve() {
->add(Connection::class, 'query', StatementWrapper::class)
->add(StatementWrapper::class, 'fetchAll', $fieldInfo);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$this->assertEquals($job_data, $jobStore->retrieve("1", FileFetcher::class));
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertEquals($job_data, $jobStore->retrieve("1", FileFetcherJob::class));
}

/**
Expand Down Expand Up @@ -90,7 +90,7 @@ public function testRetrieveAll() {
->add(Connection::class, 'query', StatementWrapper::class)
->add(StatementWrapper::class, 'fetchAll', $sequence);

$jobStore = new JobStore(FileFetcher::class, $chain->getMock());
$jobStore = new JobStore(FileFetcherJob::class, $chain->getMock());
$this->assertTrue(is_array($jobStore->retrieveAll()));
}

Expand Down Expand Up @@ -127,7 +127,7 @@ public function testStore() {
->add(StatementWrapper::class, 'fetchAll', $fieldInfo)
->getMock();

$jobStore = new JobStore(FileFetcher::class, $connection);
$jobStore = new JobStore(FileFetcherJob::class, $connection);

$this->assertEquals("1", $jobStore->store(json_encode($jobObject), "1"));
}
Expand All @@ -151,16 +151,16 @@ public function testRemove() {
->add(StatementWrapper::class, 'fetchAll', $fieldInfo)
->getMock();

$jobStore = new JobStore(FileFetcher::class, $connection);
$jobStore = new JobStore(FileFetcherJob::class, $connection);

$this->assertEquals("", $jobStore->remove("1", FileFetcher::class));
$this->assertEquals("", $jobStore->remove("1", FileFetcherJob::class));
}

/**
* Private.
*/
private function getFileFetcher() {
return FileFetcher::get("1", new Memory(), ["filePath" => "file://" . __DIR__ . "/../../data/countries.csv"]);
return FileFetcherJob::get("1", new Memory(), ["filePath" => "file://" . __DIR__ . "/../../data/countries.csv"]);
}

}
1 change: 0 additions & 1 deletion modules/datastore/datastore.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ services:
class: \Drupal\datastore\Service\ResourceLocalizer
arguments:
- '@dkan.metastore.resource_mapper'
- '@dkan.common.file_fetcher'
- '@dkan.common.drupal_files'
- '@dkan.common.job_store'

Expand Down
165 changes: 165 additions & 0 deletions modules/datastore/src/Plugin/QueueWorker/FileFetcherJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
<?php

namespace Drupal\datastore\Plugin\QueueWorker;

use GuzzleHttp\Client;
use Procrastinator\Job\AbstractPersistentJob;
use Procrastinator\Result;

/**
 * Job that makes a local copy of a file, a.k.a. fetches a file.
 *
 * The source may be a local path/stream or a remote URL. The copy is written
 * to "<temporary_directory>/<basename of source>". Job state (source,
 * destination, byte counts) is kept as JSON in the job's result data.
 */
class FileFetcherJob extends AbstractPersistentJob {

  /**
   * Constructor.
   *
   * @param string $identifier
   *   Job identifier, passed through to the parent job.
   * @param mixed $storage
   *   Job storage, passed through to the parent job.
   * @param array|null $config
   *   Job configuration. Must contain 'filePath' (the source to copy) and may
   *   contain 'temporaryDirectory' (defaults to '/tmp').
   *
   * @throws \Exception
   *   When $config has no 'filePath' entry.
   */
  public function __construct(string $identifier, $storage, ?array $config = NULL) {
    parent::__construct($identifier, $storage, $config);

    if (!isset($config['filePath'])) {
      throw new \Exception("Constructor missing expected config filePath.");
    }

    // Initial state; byte counts stay at zero until a copy has been attempted.
    $state = [
      'source' => $config['filePath'],
      'total_bytes' => 0,
      'total_bytes_copied' => 0,
      'temporary' => FALSE,
      'temporary_directory' => $config['temporaryDirectory'] ?? '/tmp',
    ];

    $this->getResult()->setData(json_encode($state));
  }

  /**
   * {@inheritdoc}
   */
  protected function runIt() {
    $state = $this->setupState($this->getState());
    // Store the prepared state in the result data before re-reading it.
    // NOTE(review): presumably getState() decodes the result data - confirm
    // against the parent job class.
    $this->getResult()->setData(json_encode($state));
    $info = $this->copy($this->getState(), $this->getResult());
    $this->setState($info['state']);
    return $info['result'];
  }

  /**
   * Set up the job state.
   *
   * Marks the copy as temporary, computes the destination path and sets
   * total_bytes to PHP_INT_MAX until the real size is known after copying.
   *
   * @param array $state
   *   Incoming state array.
   *
   * @return array
   *   Modified state array.
   */
  protected function setupState(array $state): array {
    $state['total_bytes'] = PHP_INT_MAX;
    $state['temporary'] = TRUE;
    $state['destination'] = $this->getTemporaryFilePath($state);

    return $state;
  }

  /**
   * Get the temporary file path for the current source.
   *
   * The original file name (basename of the source) is always kept.
   *
   * @param array $state
   *   State; reads 'source' and 'temporary_directory'.
   *
   * @return string
   *   Temporary file path.
   */
  private function getTemporaryFilePath(array $state): string {
    $file_name = basename($state['source']);
    return "{$state['temporary_directory']}/{$file_name}";
  }

  /**
   * Copy the file to local storage.
   *
   * Dispatches to copyLocal() or copyRemote() depending on whether PHP
   * considers the source a local stream.
   *
   * @param array $state
   *   State array.
   * @param \Procrastinator\Result $result
   *   Job result object.
   *
   * @return array
   *   Array with two elements: state and result.
   */
  public function copy(array $state, Result $result): array {
    if (stream_is_local($state['source'])) {
      return $this->copyLocal($state, $result);
    }
    return $this->copyRemote($state, $result);
  }

  /**
   * Copy local file to proper local storage.
   *
   * @param array $state
   *   State array.
   * @param \Procrastinator\Result $result
   *   Job result object.
   *
   * @return array
   *   Array with two elements: state and result.
   *
   * @throws \Exception
   *   When the destination cannot be created or the copy fails.
   */
  protected function copyLocal(array $state, Result $result): array {
    // Only needed to verify the destination is writable; copy() opens the
    // destination itself, so release the handle right away.
    fclose($this->ensureCreatingForWriting($state['destination']));

    if (!copy($state['source'], $state['destination'])) {
      throw new \Exception("File copy failed.");
    }
    $result->setStatus(Result::DONE);

    $state['total_bytes_copied'] = $state['total_bytes'] = $this->getFileSize($state['destination']);
    return ['state' => $state, 'result' => $result];
  }

  /**
   * Copy remote file to local storage.
   *
   * Failures (including an unwritable destination) do not throw; they are
   * recorded on the result as an error status and message.
   *
   * @param array $state
   *   State array.
   * @param \Procrastinator\Result $result
   *   Job result object.
   *
   * @return array
   *   Array with two elements: state and result.
   */
  protected function copyRemote(array $state, Result $result): array {
    $client = new Client();
    $fout = NULL;
    try {
      $fout = $this->ensureCreatingForWriting($state['destination']);
      $client->get($state['source'], ['sink' => $fout]);
      $result->setStatus(Result::DONE);
    }
    catch (\Exception $e) {
      $result->setStatus(Result::ERROR);
      $result->setError($e->getMessage());
    }
    finally {
      // is_resource() is FALSE for an already-closed handle, so this is safe
      // even if the HTTP client closed the sink itself.
      if (is_resource($fout)) {
        fclose($fout);
      }
    }

    $state['total_bytes_copied'] = $state['total_bytes'] = $this->getFileSize($state['destination']);
    return ['state' => $state, 'result' => $result];
  }

  /**
   * Ensure the destination file can be created.
   *
   * @param string $to
   *   The destination filename.
   *
   * @return resource
   *   File resource opened for writing.
   *
   * @throws \Exception
   *   When the destination cannot be opened for writing.
   */
  private function ensureCreatingForWriting(string $to) {
    // Delete destination first to avoid appending if existing.
    if (file_exists($to)) {
      unlink($to);
    }
    $fout = fopen($to, "w");
    // Do not hand FALSE back to callers: used as an HTTP sink it would let
    // the download "succeed" without writing the destination file.
    if ($fout === FALSE) {
      throw new \Exception("Unable to open destination file for writing: {$to}");
    }
    return $fout;
  }

  /**
   * Get the current size of a file, or 0 when it is missing or unreadable.
   *
   * @param string $path
   *   File path.
   *
   * @return int
   *   Size in bytes.
   */
  private function getFileSize(string $path): int {
    // Drop any cached stat data so the size reflects what was just written.
    clearstatcache(TRUE, $path);
    if (!file_exists($path)) {
      return 0;
    }
    $size = filesize($path);
    return $size === FALSE ? 0 : $size;
  }

}
2 changes: 1 addition & 1 deletion modules/datastore/src/Plugin/QueueWorker/ImportJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ protected function setStorageSchema(array $header) {
* Verify headers are unique.
*
* @param array $header
* List of strings
* List of strings.
*
* @throws \Exception
*/
Expand Down
11 changes: 9 additions & 2 deletions modules/datastore/src/Service.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Drupal\datastore;

use Drupal\common\DataResource;
use Drupal\datastore\Plugin\QueueWorker\FileFetcherJob;
use Drupal\common\Storage\JobStoreFactory;
use Procrastinator\Result;
use Symfony\Component\DependencyInjection\ContainerInterface;
Expand All @@ -12,7 +13,6 @@
use Drupal\datastore\Service\ResourceLocalizer;
use Drupal\datastore\Service\Factory\ImportFactoryInterface;
use Drupal\datastore\Service\Info\ImportInfoList;
use FileFetcher\FileFetcher;

/**
* Main services for the datastore.
Expand All @@ -33,6 +33,13 @@ class Service implements ContainerInjectionInterface {
*/
private $importServiceFactory;

/**
* Import info list service.
*
* @var \Drupal\datastore\Service\Info\ImportInfoList
*/
private ImportInfoList $importInfoList;

/**
* Drupal queue.
*
Expand Down Expand Up @@ -204,7 +211,7 @@ public function drop(string $identifier, ?string $version = NULL, bool $local_re
if ($local_resource) {
$this->resourceLocalizer->remove($identifier, $version);
$this->jobStoreFactory
->getInstance(FileFetcher::class)
->getInstance(FileFetcherJob::class)
->remove(substr(str_replace('__', '_', $resource_id), 0, -11));
}
}
Expand Down
Loading

0 comments on commit edd187c

Please sign in to comment.