Skip to content

Commit

Permalink
commits history purged
Browse files Browse the repository at this point in the history
  • Loading branch information
yzqzss committed Mar 31, 2024
0 parents commit 0df7971
Show file tree
Hide file tree
Showing 15 changed files with 2,249 additions and 0 deletions.
8 changes: 8 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
.vscode/
dist/
__pycache__/
\!\!\!*
core.html
core_html/
.ia_keys
stop
71 changes: 71 additions & 0 deletions ChinaXivXiv/defines.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional
from bson import ObjectId

# Default upper bound for file ids; used as the default of the
# ``--end_fileid`` command-line option.
END_FILEID = 78000
# HTTP headers merged into the shared HTTP client's default headers at startup.
DEFAULT_HEADERS = {
    "User-Agent": "ChinaXiv Archive Mirror Project/0.1.0 (STW; SaveTheWeb; +github.com/saveweb; [email protected]) (qos-rate-limit: 3q/s)",
}
# Debug switch: when truthy, the entry point starts a single coroutine for
# each multi-coroutine worker type instead of the full pool.
DEBUG = 1

class Status:
    # Namespace of task-state string constants; each value equals its name.
    # NOTE: deliberately documented with comments only, so the contents of
    # ``Status.__dict__`` are unchanged for code that inspects them.

    # --- generic lifecycle ---
    TODO = "TODO"  # task was just created and is waiting to be claimed
    PROCESSING = "PROCESSING"
    DONE = "DONE"
    EMPTY = "EMPTY"  # no data; possibly nonexistent or deleted (?)
    FAIL = "FAIL"
    # Not defined (commented out in the original): FEZZ = "FEZZ",
    # described as "special: task frozen".

    # --- file download stage (no DOWNLOAD_TODO constant is defined) ---
    DOWNLOAD_PROCESSING = "DOWNLOAD_PROCESSING"
    DOWNLOAD_DONE = "DOWNLOAD_DONE"
    DOWNLOAD_EMPTY = "DOWNLOAD_EMPTY"
    DOWNLOAD_FAIL = "DOWNLOAD_FAIL"

    # --- metadata stage (no METADATA_TODO constant is defined) ---
    METADATA_PROCESSING = "METADATA_PROCESSING"
    METADATA_DONE = "METADATA_DONE"
    METADATA_EMPTY = "METADATA_EMPTY"
    METADATA_FAIL = "METADATA_FAIL"

    # --- upload-to-IA stage (no UPLOADTOIA_TODO / UPLOADTOIA_EMPTY) ---
    UPLOADTOIA_PROCESSING = "UPLOADTOIA_PROCESSING"
    UPLOADTOIA_DONE = "UPLOADTOIA_DONE"
    UPLOADTOIA_FAIL = "UPLOADTOIA_FAIL"


@dataclass
class Task:
    """A queue document describing the processing state of one file id.

    The first three fields are required; all others default to ``None``
    until they are filled in.

    Raises:
        AssertionError: on construction, if ``status`` is not one of the
            public string constants defined on ``Status``.
    """

    _id: ObjectId  # MongoDB document id
    id: int  # numeric file id
    status: str  # one of the ``Status`` constants (plain strings)

    claim_at: Optional[datetime] = None
    update_at: Optional[datetime] = None

    # downloading: values taken from the HTTP response headers
    content_type: Optional[str] = None
    content_length: Optional[int] = None
    content_disposition: Optional[str] = None
    content_disposition_filename: Optional[str] = None

    # metadata: free-form dict. Keys listed by the original author:
    #   - title: Optional[str]
    #   - authors: Optional[List[str]]
    #   - journal: Optional[str]
    #   - pubyear: Optional[int]
    #   - version: Optional[int]
    #   - csoaid: Optional[str]
    #   - copyQuotation: Optional[str]
    metadata: Optional[Dict] = None

    def __post_init__(self):
        # Only public attributes of Status are real states. The raw class
        # __dict__ also carries dunder entries (e.g. __doc__ is None and
        # __module__ is a string), which must not be accepted as a status.
        valid_statuses = tuple(
            value for name, value in vars(Status).items()
            if not name.startswith("_")
        )
        assert self.status in valid_statuses, f"unknown task status: {self.status!r}"
3 changes: 3 additions & 0 deletions ChinaXivXiv/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class EmptyContent(Exception):
    """Signals that a requested resource had no content (empty / 404)."""
78 changes: 78 additions & 0 deletions ChinaXivXiv/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import asyncio
from datetime import datetime
import os
import time
import motor.motor_asyncio
import httpx

from ChinaXivXiv.defines import DEBUG, DEFAULT_HEADERS, Status
from ChinaXivXiv.mongo_ops import create_fileids_queue_index, find_max_id, init_queue
from ChinaXivXiv.util import arg_parser
from ChinaXivXiv.workers.IA_uploader import IA_upload_worker
from ChinaXivXiv.workers.file_downloader import file_downloader_worker
from ChinaXivXiv.workers.fileid_finder import fileid_finder_worker
from ChinaXivXiv.workers.metadata_scraper import metadata_scraper_worker
from ChinaXivXiv.workers.status_mover import status_mover_worker
from ChinaXivXiv.workers.task_provider import task_provider_worker


async def main():
    """Parse the command line and run the selected worker(s).

    A shared HTTP client (60 s timeout, 3 transport-level retries, project
    default headers) and a MongoDB client are created, then exactly one
    worker type is started, chosen by the first matching flag in this order:
    task provider, file-id finder, file downloader, metadata scraper,
    IA uploader. If no flag is given, a notice is printed.

    Both clients are closed when the function exits, whether normally or
    through an exception or cancellation.

    Returns:
        Whatever the selected branch returns: the gathered results for the
        metadata-scraper / IA-uploader / status-mover pools, the downloader's
        result, or ``None`` for the other branches.
    """
    args = arg_parser()
    transport = httpx.AsyncHTTPTransport(retries=3)

    # `async with` guarantees the connection pool is released on every exit path.
    async with httpx.AsyncClient(timeout=60, transport=transport) as h_client:
        h_client.headers.update(DEFAULT_HEADERS)
        m_client = motor.motor_asyncio.AsyncIOMotorClient(args.mongo)
        try:
            db = m_client["chinaxiv"]
            fileids_queue_collection = db["fileids_queue"]

            # Manual maintenance switch: set to a truthy value to move tasks
            # from one status to another instead of running a normal worker.
            MOVER = 0
            if MOVER:
                cors = [
                    status_mover_worker(
                        c_queue=fileids_queue_collection,
                        FROM=Status.UPLOADTOIA_FAIL,
                        TO=Status.METADATA_DONE,
                    )
                    for _ in range(1 if DEBUG else 50)
                ]
                return await asyncio.gather(*cors)

            if args.task_provider:
                await task_provider_worker(
                    fileids_queue_collection=fileids_queue_collection,
                    args=args,
                )
            elif args.fileid_finder:
                cors = [
                    fileid_finder_worker(
                        c_queue=fileids_queue_collection,
                        client=h_client,
                    )
                    for _ in range(1 if DEBUG else 2)
                ]
                await asyncio.gather(*cors)
            elif args.file_downloader:
                return await file_downloader_worker(
                    c_queue=fileids_queue_collection,
                    client=h_client,
                    qos=args.qos,
                )
            elif args.metadata_scraper:
                cors = [
                    metadata_scraper_worker(
                        c_queue=fileids_queue_collection,
                        client=h_client,
                    )
                    for _ in range(1 if DEBUG else 3)
                ]
                return await asyncio.gather(*cors)
            elif args.ia_uploader:
                # One uploader coroutine per unit of --qos (a float, hence int()).
                cors = [
                    IA_upload_worker(
                        client=h_client,
                        c_queue=fileids_queue_collection,
                        args=args,
                    )
                    for _ in range(1 if DEBUG else int(args.qos))
                ]
                return await asyncio.gather(*cors)
            else:
                print("no worker specified")
        finally:
            # Release MongoDB sockets even if a worker raised or was cancelled.
            m_client.close()

# Script entry point: when executed directly (not imported), run the async
# main() coroutine to completion with asyncio.run().
if __name__ == '__main__':
    asyncio.run(main())
89 changes: 89 additions & 0 deletions ChinaXivXiv/mongo_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from datetime import datetime
import time
from typing import Optional
import httpx
import motor.motor_asyncio

from ChinaXivXiv.defines import Status, Task


async def init_queue(queue_col: motor.motor_asyncio.AsyncIOMotorCollection, start_id: int, end_id: int, status: str = Status.TODO):
    """Bulk-insert queue documents for the half-open id range [start_id, end_id).

    Example: ``start_id=1, end_id=5`` creates ids 1, 2, 3, 4.
    Each document has the shape ``{"id": int, "status": str}``.

    Args:
        queue_col: target collection; its name must be ``"fileids_queue"``.
        start_id: first id to create (must be > 0).
        end_id: one past the last id to create (must be >= start_id).
        status: initial status for every document; must be a public
            ``Status`` constant.

    Raises:
        AssertionError: if any of the preconditions above is violated.
    """
    assert queue_col.name == "fileids_queue"
    # Check against public Status constants only: the raw class __dict__ also
    # holds dunder values (e.g. __doc__ is None) that are not real statuses.
    valid_statuses = tuple(
        value for name, value in vars(Status).items()
        if not name.startswith("_")
    )
    assert status in valid_statuses
    assert start_id > 0
    assert start_id <= end_id
    if start_id == end_id:
        print(f"start_id == end_id: {start_id}")
        return

    batch_size = 100000  # documents per insert_many call
    docs = []
    for i in range(start_id, end_id):
        docs.append({
            "id": i,
            "status": status,
        })
        if len(docs) == batch_size:
            s_time = time.time()
            await queue_col.insert_many(docs, ordered=False)
            e_time = time.time()
            docs = []
            print(f"inserted c_queue={i} | {e_time - s_time}", end="\r")
    if docs:
        # Trailing partial batch: unordered, consistent with the full batches.
        await queue_col.insert_many(docs, ordered=False)
        print(f"inserted c_queue={end_id}", end="\r")


async def claim_task(queue: motor.motor_asyncio.AsyncIOMotorCollection,
                     status_from: str = Status.TODO,
                     status_to: str = Status.PROCESSING) -> Optional[Task]:
    """Atomically claim one task whose status is ``status_from``.

    The matched document's status is set to ``status_to`` and its
    ``claim_at`` / ``update_at`` fields are set to the same UTC timestamp.

    Args:
        queue: the task queue collection.
        status_from: status a task must currently have to be claimed.
        status_to: status written to the claimed task.

    Returns:
        A ``Task`` built from the document returned by the driver, or
        ``None`` when no document matched.
        NOTE(review): with the driver's default return-document setting this
        is presumably the pre-update document (old status) — confirm.

    Raises:
        AssertionError: if either status is not a public ``Status`` constant.
    """
    # Local import: only `datetime` itself is imported at file level.
    from datetime import timezone

    # Check against public Status constants only: the raw class __dict__ also
    # holds dunder values (e.g. __doc__ is None) that are not real statuses.
    valid_statuses = tuple(
        value for name, value in vars(Status).items()
        if not name.startswith("_")
    )
    assert status_from in valid_statuses
    assert status_to in valid_statuses

    # Read the clock once so claim_at and update_at are identical, and use an
    # aware UTC datetime (datetime.utcnow() is deprecated).
    now = datetime.now(timezone.utc)
    TASK = await queue.find_one_and_update(
        filter={"status": status_from},
        update={"$set": {
            "status": status_to,
            "claim_at": now,
            "update_at": now,
        }},
        # sort=[("id", -1)],
    )
    return Task(**TASK) if TASK else None

async def update_task(queue: motor.motor_asyncio.AsyncIOMotorCollection, TASK: Task, status: str,
                      headers: Optional[httpx.Headers] = None,
                      metadata: Optional[dict] = None):
    """Write a new status (and optional download/metadata details) to a task.

    Args:
        queue: the task queue collection.
        TASK: the task to update; matched by its ``_id``.
        status: new status; must be a public ``Status`` constant.
        headers: if given and non-empty, ``Content-Type``, ``Content-Length``
            and ``Content-Disposition`` are stored, plus the text between the
            first pair of double quotes in ``Content-Disposition`` as the
            filename.
        metadata: if given and non-empty, stored under ``metadata``.

    Raises:
        AssertionError: if ``status`` is not a public ``Status`` constant.
        KeyError: if ``headers`` is non-empty but lacks one of the three
            headers above.
        ValueError: if ``Content-Length`` is not an integer.
        IndexError: if ``Content-Disposition`` contains no double quote.
    """
    # Local import: only `datetime` itself is imported at file level.
    from datetime import timezone

    # Check against public Status constants only: the raw class __dict__ also
    # holds dunder values (e.g. __doc__ is None) that are not real statuses.
    valid_statuses = tuple(
        value for name, value in vars(Status).items()
        if not name.startswith("_")
    )
    assert status in valid_statuses

    changes = {
        "status": status,
        # Aware UTC timestamp; datetime.utcnow() is deprecated.
        "update_at": datetime.now(timezone.utc),
    }
    if headers:
        # Strict on purpose: a missing or malformed header raises rather than
        # silently storing partial download information.
        changes["content_type"] = headers["Content-Type"]
        changes["content_length"] = int(headers["Content-Length"])
        content_disposition = headers["Content-Disposition"]
        changes["content_disposition"] = content_disposition
        changes["content_disposition_filename"] = content_disposition.split('"')[1]
    if metadata:
        changes["metadata"] = metadata

    await queue.update_one(
        filter={"_id": TASK._id},
        update={"$set": changes},
    )


async def create_fileids_queue_index(collection: motor.motor_asyncio.AsyncIOMotorCollection):
    """Create the queue's indexes: a plain one on ``status``, a unique one on ``id``."""
    index_specs = (
        ("status", {}),
        ("id", {"unique": True}),
    )
    # Created one after another, in the order listed above.
    for key, options in index_specs:
        await collection.create_index(key, **options)

async def find_max_id(collection: motor.motor_asyncio.AsyncIOMotorCollection):
    """Return the largest ``id`` stored in the collection, or 0 if none is found."""
    newest = await collection.find_one(sort=[("id", -1)])
    if not newest:
        return 0
    return newest["id"]
36 changes: 36 additions & 0 deletions ChinaXivXiv/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import argparse
from dataclasses import dataclass

from ChinaXivXiv.defines import END_FILEID

@dataclass
class Args:
    # Parsed command-line options; field names match the CLI option names.

    # MongoDB connection URI, e.g. mongodb://xxx:yy@zzz:1111
    mongo: str = "mongodb://localhost:27017"
    # Run as the task provider; there must be only one globally.
    task_provider: bool = False
    # Approximate id at which the task queue ends (task provider);
    # accurate to within +/- qos.
    end_fileid: int = END_FILEID
    # Number of tasks generated per second (task provider).
    qos: float = 3.0
    # Run the file-id sniffer.
    fileid_finder: bool = False
    # Run as a file downloader.
    file_downloader: bool = False
    # Run as a metadata fetcher.
    metadata_scraper: bool = False
    # Upload files to IA.
    ia_uploader: bool = False


def arg_parser():
    """Parse ``sys.argv`` and return the options as an ``Args`` instance.

    Every option's help text is simply the string form of its default on
    ``Args``. Options are registered in the order listed below.
    """
    # Options that take a value, mapped to their converter; every other
    # option is an on/off flag.
    value_types = {"mongo": str, "end_fileid": int, "qos": float}
    option_names = (
        "mongo",
        "task_provider",
        "end_fileid",
        "qos",
        "fileid_finder",
        "file_downloader",
        "metadata_scraper",
        "ia_uploader",
    )

    cli = argparse.ArgumentParser()
    for option in option_names:
        class_default = getattr(Args, option)
        flag = f"--{option}"
        if option in value_types:
            cli.add_argument(flag, type=value_types[option], default=class_default, help=str(class_default))
        else:
            cli.add_argument(flag, action="store_true", default=False, help=str(class_default))

    namespace = cli.parse_args()
    return Args(**vars(namespace))
Loading

0 comments on commit 0df7971

Please sign in to comment.