feat: expose ds uploader and upload tasks (#142)
Signed-off-by: Michele Dolfi <[email protected]>
Signed-off-by: Panos Vagenas <[email protected]>
Co-authored-by: Panos Vagenas <[email protected]>
dolfim-ibm and vagenas authored Oct 23, 2023
1 parent b47fbe1 commit a00fc03
Showing 5 changed files with 119 additions and 33 deletions.
3 changes: 3 additions & 0 deletions deepsearch/cps/client/api.py
@@ -24,6 +24,7 @@
CpsApiTasks,
DSApiDocuments,
)
from deepsearch.cps.client.components.uploader import DSApiUploader


class CpsApiClient:
@@ -101,6 +102,7 @@ class CpsApi:
elastic: CpsApiElastic
data_indices: CpsApiDataIndices
documents: DSApiDocuments
uploader: DSApiUploader

def __init__(self, client: CpsApiClient) -> None:
self.client = client
@@ -115,6 +117,7 @@ def _create_members(self):
self.elastic = CpsApiElastic(self)
self.data_indices = CpsApiDataIndices(self)
self.documents = DSApiDocuments(self)
self.uploader = DSApiUploader(self)

def refresh_token(self, admin: bool = False):
"""Refresh access token
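
With this change the uploader component is reachable directly from a CpsApi instance, alongside the existing components. A minimal sketch, assuming a configured Deep Search profile and the toolkit's CpsApi.from_env() constructor:

from deepsearch.cps.client import CpsApi

# assumes credentials/profile are already configured for the toolkit
api = CpsApi.from_env()

# the new component, created in _create_members() like the others
uploader = api.uploader
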
46 changes: 43 additions & 3 deletions deepsearch/cps/client/components/data_indices.py
@@ -5,6 +5,7 @@
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from urllib.parse import urlparse

import requests
from pydantic import BaseModel
@@ -13,6 +14,7 @@
from deepsearch.cps.apis.public.models.attachment_upload_data import (
AttachmentUploadData,
)
from deepsearch.cps.apis.public.models.task import Task
from deepsearch.cps.apis.public.models.token_response import TokenResponse
from deepsearch.cps.client.components.api_object import ApiConnectedObject

@@ -109,13 +111,51 @@ def upload_file(
coords: ElasticProjectDataCollectionSource,
body: Dict[str, Any],
) -> str:
"""
Deprecated. Use upload_and_convert() instead.
"""
return self.upload_and_convert(coords, body).task_id

def upload_and_convert(
self,
coords: ElasticProjectDataCollectionSource,
body: Dict[str, Any],
) -> Task:
"""
Call api for converting and uploading file to a project's data index.
"""
- task_id = self.sw_api.ccs_convert_upload_file_project_data_index(
+ task: Task = self.sw_api.ccs_convert_upload_file_project_data_index(
proj_key=coords.proj_key, index_key=coords.index_key, body=body
- ).task_id
- return task_id
+ )
+ return task

def upload(
self,
coords: ElasticProjectDataCollectionSource,
source: Union[Path, str],
) -> Task:
"""
Call api for uploading files to a project's data index.
The source files can be provided by local path or URL via `source`.
"""

parsed = urlparse(str(source))
if parsed.scheme and parsed.netloc: # is url
source_url = source
else:
uploaded_file = self.api.uploader.upload_file(
project=coords.proj_key, source_path=source
)
source_url = uploaded_file.internal_url

task = self.sw_api.upload_project_data_index_file(
proj_key=coords.proj_key,
index_key=coords.index_key,
params={
"file_url": source_url,
},
)
return task


class ElasticProjectDataCollectionSource(BaseModel):
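
A hedged usage sketch of the two new entry points (project and index keys are placeholders; the body shape mirrors the payload built in deepsearch/cps/data_indices/utils.py, and CpsApi.from_env() is assumed as the constructor):

from deepsearch.cps.client import CpsApi
from deepsearch.cps.client.components.data_indices import (
    ElasticProjectDataCollectionSource,
)

api = CpsApi.from_env()  # assumed constructor; any configured CpsApi works
coords = ElasticProjectDataCollectionSource(
    proj_key="<proj-key>",    # placeholder
    index_key="<index-key>",  # placeholder
)

# upload without conversion: a local path is pushed to scratch storage first,
# a URL is passed through unchanged
upload_task = api.data_indices.upload(coords, source="report.pdf")

# upload and convert in one call; returns the full Task, not just its id
convert_task = api.data_indices.upload_and_convert(
    coords, body={"file_url": ["https://example.com/report.pdf"]}
)
print(convert_task.task_id)
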
67 changes: 67 additions & 0 deletions deepsearch/cps/client/components/uploader.py
@@ -0,0 +1,67 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING, Union

import requests
from pydantic import BaseModel

from deepsearch.cps.apis import public as sw_client
from deepsearch.cps.apis.public.models.temporary_upload_file_result import (
TemporaryUploadFileResult,
)
from deepsearch.cps.client.components.projects import Project

if TYPE_CHECKING:
from deepsearch.cps.client import CpsApi


class UploadedFile(BaseModel):
download_url: str
internal_url: str


class DSApiUploader:
def __init__(self, api: CpsApi) -> None:
self.api = api
self.upload_api = sw_client.UploadsApi(self.api.client.swagger_client)

def upload_file(
self,
project: Union[Project, str],
source_path: Union[Path, str],
tls_verify: bool = True,
) -> UploadedFile:
"""
Upload a file to the scratch storage of Deep Search.
The returned object provides the `download_url` and `internal_url`, which can be
used for retrieving the file or submitting it to other Deep Search APIs, respectively.
"""

proj_key = project.key if isinstance(project, Project) else project
source_path = Path(source_path)

# Register file
source_basename = source_path.name
scratch_specs: TemporaryUploadFileResult = (
self.upload_api.create_project_scratch_file(
proj_key=proj_key, filename=source_basename
)
)

# Upload file
upload_specs = scratch_specs.upload
with source_path.open("rb") as f:
files = {"file": (source_basename, f)}
request_upload = requests.post(
url=upload_specs.url,
data=upload_specs.fields,
files=files,
verify=tls_verify,
)
request_upload.raise_for_status()

return UploadedFile(
download_url=scratch_specs.download.url,
internal_url=scratch_specs.download_private.url,
)
6 changes: 3 additions & 3 deletions deepsearch/cps/data_indices/utils.py
@@ -147,10 +147,10 @@ def process_local_file(
# loop over all files
for single_zip in files_zip:
# upload file
- private_download_url = convert.upload_single_file(
- api=api, cps_proj_key=coords.proj_key, source_path=Path(single_zip)
+ uploaded_file = api.uploader.upload_file(
+ project=coords.proj_key, source_path=Path(single_zip)
)
- file_url_array = [private_download_url]
+ file_url_array = [uploaded_file.internal_url]
payload: Dict[str, Any] = {
"file_url": file_url_array,
}
30 changes: 3 additions & 27 deletions deepsearch/documents/core/convert.py
@@ -148,16 +148,16 @@ def send_files_for_conversion(
# loop over all files
for single_zip in files_zip:
# upload file
- private_download_url = upload_single_file(
- api=api, cps_proj_key=cps_proj_key, source_path=Path(single_zip)
+ uploaded_file = api.uploader.upload_file(
+ project=cps_proj_key, source_path=Path(single_zip)
)
# submit url for conversion
task_id = submit_conversion_payload(
api=api,
cps_proj_key=cps_proj_key,
source={
"type": "url",
"download_url": private_download_url,
"download_url": uploaded_file.internal_url,
},
target=target,
conversion_settings=conversion_settings,
@@ -296,30 +296,6 @@ def download_converted_documents(
return


- def upload_single_file(api: CpsApi, cps_proj_key: str, source_path: Path) -> str:
- """
- Uploads a single file. Return internal download url.
- """
- filename = os.path.basename(source_path)
- sw_api = sw_client.UploadsApi(api.client.swagger_client)
-
- get_pointer: TemporaryUploadFileResult = sw_api.create_project_scratch_file(
- proj_key=cps_proj_key, filename=filename
- )
- # upload file
- upload = get_pointer.upload
- private_download_url = get_pointer.download_private.url
-
- with open(source_path, "rb") as f:
- files = {"file": (os.path.basename(source_path), f)}
- request_upload = requests.post(
- url=upload.url, data=upload.fields, files=files, verify=False
- )
- request_upload.raise_for_status()
-
- return private_download_url


def send_urls_for_conversion(
api: CpsApi,
cps_proj_key: str,
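
For downstream code that imported the removed upload_single_file helper from deepsearch.documents.core.convert, a drop-in equivalent can be built on the new uploader (a sketch; the commit itself simply calls api.uploader.upload_file at each former call site):

from pathlib import Path

from deepsearch.cps.client import CpsApi


def upload_single_file(api: CpsApi, cps_proj_key: str, source_path: Path) -> str:
    # drop-in replacement for the removed helper, built on the new uploader
    uploaded_file = api.uploader.upload_file(
        project=cps_proj_key, source_path=source_path
    )
    return uploaded_file.internal_url
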
