From 19a47f0bd4ddb61950b7ab598296d399502416ac Mon Sep 17 00:00:00 2001 From: Volara Date: Sun, 6 Oct 2024 11:31:50 -0400 Subject: [PATCH] Move to sync --- volara_proof/__main__.py | 2 +- volara_proof/extract/__init__.py | 2 +- volara_proof/proof.py | 11 +-------- volara_proof/proofs/proof.py | 8 +++--- volara_proof/proofs/proof_of_quality.py | 33 +++++++++---------------- volara_proof/storage/rewards.py | 30 ++++++++-------------- volara_proof/storage/tweet_info.py | 23 +++++++++-------- 7 files changed, 40 insertions(+), 69 deletions(-) diff --git a/volara_proof/__main__.py b/volara_proof/__main__.py index e67e72e..5f6e6ae 100644 --- a/volara_proof/__main__.py +++ b/volara_proof/__main__.py @@ -8,7 +8,7 @@ from volara_proof.proof import Proof -INPUT_DIR, OUTPUT_DIR, SEALED_DIR = "/input", "/output", "/sealed" +INPUT_DIR, OUTPUT_DIR, SEALED_DIR = "./demo/input", "./demo/output", "./demo/sealed" logging.basicConfig(level=logging.INFO, format="%(message)s") diff --git a/volara_proof/extract/__init__.py b/volara_proof/extract/__init__.py index 413deb0..cc85e87 100644 --- a/volara_proof/extract/__init__.py +++ b/volara_proof/extract/__init__.py @@ -2,7 +2,7 @@ from volara_proof.buffers.tweets import Tweets -async def extract_data(zip_file_path: str): +def extract_data(zip_file_path: str): """ Extracts the data from the zip file :param zip_file_path: Path to the zip file diff --git a/volara_proof/proof.py b/volara_proof/proof.py index 3e2dc5a..cc087f3 100644 --- a/volara_proof/proof.py +++ b/volara_proof/proof.py @@ -1,13 +1,11 @@ import logging import os -import asyncio from typing import Dict, Any from volara_proof.proofs.proof import proof from volara_proof.models.proof_response import ProofResponse from volara_proof.models.proof_config import ProofConfig -import volara_proof.exceptions as exceptions class Proof: @@ -23,13 +21,6 @@ def generate(self) -> ProofResponse: input_file = os.path.join(self.config.input_dir, input_filename) break - try: - output_message = asyncio.run( - proof(input_file, self.proof_response, self.config) - ) - except exceptions.ValidatorEphemeralException as e: - logging.exception("[EPHEMERAL ERROR]: Error during proof of contribution:") - # TODO what happens if we ephemerally fail? - return self.proof_response + output_message = proof(input_file, self.proof_response, self.config) return output_message diff --git a/volara_proof/proofs/proof.py b/volara_proof/proofs/proof.py index 31fdc21..34bea99 100644 --- a/volara_proof/proofs/proof.py +++ b/volara_proof/proofs/proof.py @@ -10,13 +10,13 @@ rewards_storage = RewardsStorage() -async def proof( +def proof( input_file: str, proof_response: ProofResponse, config: ProofConfig ) -> ProofResponse: proof_response = copy.deepcopy(proof_response) - tweets_data = await extract_data(input_file) + tweets_data = extract_data(input_file) is_valid, file_score, tweet_info, unique_tweets, total_tweets = ( - await proof_of_quality(tweets_data, config.file_id, config) + proof_of_quality(tweets_data, config.file_id, config) ).values() proof_response.valid = is_valid proof_response.score = file_score @@ -25,7 +25,7 @@ async def proof( proof_response.quality = file_score proof_response.uniqueness = unique_tweets / total_tweets if total_tweets > 0 else 0 if is_valid and file_score > 0: - await rewards_storage.post_rewards( + rewards_storage.post_rewards( config.file_id, config.miner_address, file_score, tweet_info ) return proof_response diff --git a/volara_proof/proofs/proof_of_quality.py b/volara_proof/proofs/proof_of_quality.py index e53914b..8671dca 100644 --- a/volara_proof/proofs/proof_of_quality.py +++ b/volara_proof/proofs/proof_of_quality.py @@ -37,16 +37,16 @@ def get_scraper(config: ProofConfig): } -async def proof_of_quality(tweets_data: Tweets, file_id: str, config: ProofConfig): +def proof_of_quality(tweets_data: Tweets, file_id: str, config: ProofConfig): if not _no_duplicates(tweets_data): return NIL_RESPONSE_INVALID - unique_tweets = await _unique_tweets(tweets_data, file_id) + unique_tweets = _unique_tweets(tweets_data, file_id) if len(unique_tweets) == 0: return NIL_RESPONSE_VALID - tweets_validated = await _validate_tweets(tweets_data, unique_tweets, config) + tweets_validated = _validate_tweets(tweets_data, unique_tweets, config) if not tweets_validated: return NIL_RESPONSE_INVALID - tweet_info = await _score_tweets(tweets_data, unique_tweets) + tweet_info = _score_tweets(tweets_data, unique_tweets) file_score = min(sum([tweet.score for tweet in tweet_info]) / 100_000, 1) return { "is_valid": True, @@ -57,7 +57,7 @@ async def proof_of_quality(tweets_data: Tweets, file_id: str, config: ProofConfi } -async def _validate_tweets( +def _validate_tweets( tweets_data: Tweets, unique_tweets: T.Set[str], config: ProofConfig ): unique_tweet_data = [ @@ -71,7 +71,7 @@ async def _validate_tweets( tweet_sample = sample(range(len(unique_tweet_data)), sample_count) tweet_ids = [unique_tweet_data[i].TweetId().decode() for i in tweet_sample] - scraped_tweets = await _scrape_tweets(tweet_ids, config) + scraped_tweets = _scrape_tweets(tweet_ids, config) for tweet_data_i, tweet in zip(tweet_sample, scraped_tweets): if "result" not in tweet: # Does this allow fake tweet attacks? @@ -88,18 +88,11 @@ async def _validate_tweets( return True -async def _scrape_tweets( - tweet_ids: list[str], config: ProofConfig -) -> list[dict[str, T.Any]]: +def _scrape_tweets(tweet_ids: list[str], config: ProofConfig) -> list[dict[str, T.Any]]: try: - operation = Operation.TweetResultsByRestIds - queries = list(batch_ids(tweet_ids)) - keys, qid, name = operation - _queries = [{k: q} for q in queries for k, v in keys.items()] scraper = get_scraper(config) - scraped_tweets_raw = await scraper._process(operation, _queries) - scraped_tweets = scraped_tweets_raw[0][0].json() - tweets = scraped_tweets["data"]["tweetResult"] + scraped_tweets = scraper.tweets_by_ids(tweet_ids) + tweets = scraped_tweets[0]["data"]["tweetResult"] return tweets except Exception as e: logging.exception( @@ -117,12 +110,12 @@ def _no_duplicates(tweets_data: Tweets) -> bool: return True -async def _unique_tweets(tweets_data: Tweets, file_id: str) -> T.Set[str]: +def _unique_tweets(tweets_data: Tweets, file_id: str) -> T.Set[str]: tweet_ids = [ str(tweets_data.Tweets(i).TweetId().decode()) for i in range(tweets_data.TweetsLength()) ] - tweet_id_existance = await tweet_info_storage.get(tweet_ids, file_id) + tweet_id_existance = tweet_info_storage.get(tweet_ids, file_id) unique_tweets = set( [ tweet_id @@ -143,9 +136,7 @@ def _form_tweet_info(tweet: T.Optional[Tweet], score: float) -> TweetInfo: ) -async def _score_tweets( - tweets_data: Tweets, unique_tweets: T.Set[str] -) -> list[TweetInfo]: +def _score_tweets(tweets_data: Tweets, unique_tweets: T.Set[str]) -> list[TweetInfo]: # TODO: Evaluate dynamic score scored_tweets: list[TweetInfo] = [] for i in range(tweets_data.TweetsLength()): diff --git a/volara_proof/storage/rewards.py b/volara_proof/storage/rewards.py index 8991912..df9b0f9 100644 --- a/volara_proof/storage/rewards.py +++ b/volara_proof/storage/rewards.py @@ -1,16 +1,13 @@ import logging import os -import json -from uuid import uuid1 - -from aiohttp import ClientSession, ClientTimeout +import requests from volara_proof.models.tweet_info import TweetInfo from volara_proof.constants import VOLARA_API_URL class RewardsStorage: - async def post_rewards( + def post_rewards( self, file_id: str, miner_address: str, @@ -33,23 +30,16 @@ async def post_rewards( "tweetRecords": tweet_records, } try: - async with ClientSession(timeout=ClientTimeout(30)) as session: - async with session.post( - url=f"{VOLARA_API_URL}/v1/validator/submit-validation", - json=request_body, - headers={"Authorization": f"Bearer {os.environ['VOLARA_API_KEY']}"}, - ) as resp: - resp.raise_for_status() - resp = await resp.json() - logging.info("Succesfully uploaded rewards to validator.") + resp = requests.post( + url=f"{VOLARA_API_URL}/v1/validator/submit-validation", + json=request_body, + headers={"Authorization": f"Bearer {os.environ['VOLARA_API_KEY']}"}, + ) + resp.raise_for_status() + logging.info("Succesfully uploaded rewards to validator.") + return resp.json() except Exception: logging.exception( "[CRITICAL FAILURE] Failed to upload rewards to the director." ) - try: - os.mkdir(".critical_reward_failures/") - except FileExistsError: - pass - with open(f".critical_reward_failures/{str(uuid1())}", "+w") as f: - f.write(json.dumps(request_body)) raise diff --git a/volara_proof/storage/tweet_info.py b/volara_proof/storage/tweet_info.py index 5758599..79e4949 100644 --- a/volara_proof/storage/tweet_info.py +++ b/volara_proof/storage/tweet_info.py @@ -1,4 +1,4 @@ -from aiohttp import ClientSession, ClientTimeout +import requests import os import volara_proof.exceptions @@ -7,23 +7,22 @@ class TweetInfoStorage: - async def get_info(self, tweet_info: list[TweetInfo], file_id: str) -> list[bool]: - return await self.get([tweet.tweet_id for tweet in tweet_info], file_id) + def get_info(self, tweet_info: list[TweetInfo], file_id: str) -> list[bool]: + return self.get([tweet.tweet_id for tweet in tweet_info], file_id) - async def get(self, tweet_ids: list[str], file_id: str) -> list[bool]: + def get(self, tweet_ids: list[str], file_id: str) -> list[bool]: """ Returns a list of booleans indicating whether each tweet ID exists in the index. """ if len(tweet_ids) == 0: return [] try: - async with ClientSession(timeout=ClientTimeout(10)) as session: - async with session.post( - f"{VOLARA_API_URL}/v1/validator/unique", - json={"tweetIds": tweet_ids, "fileId": file_id}, - headers={"Authorization": f"Bearer {os.environ['VOLARA_API_KEY']}"}, - ) as resp: - resp.raise_for_status() - return [False for unique in await resp.json()] + resp = requests.post( + f"{VOLARA_API_URL}/v1/validator/unique", + json={"tweetIds": tweet_ids, "fileId": file_id}, + headers={"Authorization": f"Bearer {os.environ['VOLARA_API_KEY']}"}, + ) + resp.raise_for_status() + return [not unique for unique in resp.json()] except Exception as e: raise volara_proof.exceptions.VolaraApiServerException(e)