Skip to content

Commit

Permalink
Merge pull request #27 from efabless/tokens_proxies
Browse files Browse the repository at this point in the history
SOCKS5 Proxy Support, Better Token Support
  • Loading branch information
marwaneltoukhy authored Aug 31, 2024
2 parents 293766a + 7b56d7e commit c553759
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 72 deletions.
175 changes: 134 additions & 41 deletions ipm/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,30 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
import re
import sys
import json
import shutil
import tarfile
import tempfile
import hashlib
import pathlib
import requests
import subprocess
from dataclasses import dataclass
from typing import Callable, ClassVar, Dict, Iterable, Optional, Tuple
from ipm.version_check import check_for_updates

import ssl
import click
import httpx
import yaml
from rich.console import Console
from rich.table import Table
from bs4 import BeautifulSoup

from .__version__ import __version__
from .version_check import check_for_updates

# import bus_wrapper_gen

VERIFIED_JSON_FILE_URL = (
Expand All @@ -54,15 +59,83 @@ def opt_ipm_root(function: Callable):


class GitHubSession(httpx.Client):
def __init__(self, follow_redirects=True, **kwargs) -> None:
super().__init__(follow_redirects=follow_redirects, **kwargs)
headers_raw = {
"User-Agent": "Efabless IPM",
class Token(object):
override: ClassVar[Optional[str]] = None

@classmethod
def get_gh_token(Self) -> Optional[str]:
token = None

# 0. Lowest priority: ghcli
try:
token = subprocess.check_output(
[
"gh",
"auth",
"token",
],
encoding="utf8",
).strip()
except FileNotFoundError:
pass
except subprocess.CalledProcessError:
pass

# 1. Higher priority: environment GITHUB_TOKEN
env_token = os.getenv("GITHUB_TOKEN")
if env_token is not None and env_token.strip() != "":
token = env_token

# 2. Highest priority: the -t flag
if Self.override is not None:
token = Self.override

return token

def __init__(
self,
*,
follow_redirects: bool = True,
github_token: Optional[str] = None,
ssl_context=None,
**kwargs,
):
if ssl_context is None:
try:
import truststore

ssl_context = truststore.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
except ImportError:
pass

try:
super().__init__(
follow_redirects=follow_redirects,
verify=ssl_context,
**kwargs,
)
except ValueError as e:
if "Unknown scheme for proxy URL" in e.args[0] and "socks://" in e.args[0]:
print(
f"Invalid SOCKS proxy: IPM only supports http://, https:// and socks5:// schemes: {e.args[0]}",
file=sys.stderr,
)
exit(-1)
else:
raise e from None
github_token = github_token or GitHubSession.Token.get_gh_token()
self.github_token = github_token

raw_headers = {
"User-Agent": type(self).get_user_agent(),
}
token = os.getenv("GITHUB_TOKEN", None)
if token is not None:
headers_raw["Authorization"] = f"Bearer {token}"
self.headers = httpx.Headers(headers_raw)
if github_token is not None:
raw_headers["Authorization"] = f"Bearer {github_token}"
self.headers = httpx.Headers(raw_headers)

@classmethod
def get_user_agent(Self) -> str:
return f"ipm/{__version__}"

def throw_status(self, r: httpx.Response, purpose: str):
try:
Expand Down Expand Up @@ -760,11 +833,20 @@ def download_tarball(self, dest_path, no_verify_hash=False):


class Checks:
def __init__(self, ip_root, ip_name, version=None, category=None, maturity=None, technology=None, type=None) -> None:
def __init__(
self,
ip_root,
ip_name,
version=None,
category=None,
maturity=None,
technology=None,
type=None,
) -> None:
self.version = version
self.ip_name = ip_name
self.category= category
self.maturity= maturity
self.category = category
self.maturity = maturity
self.technology = technology
self.type = type
self.ip_root = ip_root
Expand All @@ -780,8 +862,8 @@ def check_url(self, url):
"""

logger = Logger()
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
if not url.startswith(("http://", "https://")):
url = "https://" + url

try:
repo_response = GitHubSession().get(url)
Expand Down Expand Up @@ -824,7 +906,7 @@ def check_yaml(self):
"digital_supply_voltage",
"analog_supply_voltage",
"clock_freq_mhz",
"cell_count"
"cell_count",
]
flag = True
with open(yaml_path) as json_file:
Expand All @@ -836,11 +918,9 @@ def check_yaml(self):
f"The given IP name {self.ip_name} is not the same as the one in yaml file"
)
flag = False

if not self.check_url(info['repo']):
logger.print_err(
f"The repo {info['repo']} is incorrect"
)

if not self.check_url(info["repo"]):
logger.print_err(f"The repo {info['repo']} is incorrect")
flag = False

for field in yaml_fields:
Expand All @@ -849,14 +929,14 @@ def check_yaml(self):
f"The field '{field}' was not included in the {self.ip_name}.yaml file"
)
flag = False

if flag:
self.gh_url = info['repo']
self.version = info['version']
self.maturity = info['maturity']
self.category = info['category']
self.technology = info['technology']
self.type = info['type']
self.gh_url = info["repo"]
self.version = info["version"]
self.maturity = info["maturity"]
self.category = info["category"]
self.technology = info["technology"]
self.type = info["type"]

return flag

Expand Down Expand Up @@ -899,10 +979,10 @@ def check_hierarchy(self):
)
flag = False
return flag

def read_readme(self, file_path):
"""Reads the content of a README file."""
with open(file_path, 'r', encoding='utf-8') as file:
with open(file_path, "r", encoding="utf-8") as file:
return file.read()

def has_minimum_word_count(self, content, min_words=100):
Expand All @@ -914,7 +994,9 @@ def check_required_sections(self, content, sections):
"""Checks if the content includes specific sections and returns any missing ones."""
missing_sections = []
for section in sections:
if not re.search(rf'^\s*#*\s*{re.escape(section)}', content, re.MULTILINE | re.IGNORECASE):
if not re.search(
rf"^\s*#*\s*{re.escape(section)}", content, re.MULTILINE | re.IGNORECASE
):
missing_sections.append(section)
return missing_sections

Expand All @@ -925,21 +1007,31 @@ def analyze_readme(self):
logger = Logger()
# Check if README has significant content
if not self.has_minimum_word_count(content):
logger.print_err(f"The README file does not meet the minimum word count requirement.")
logger.print_err(
f"The README file does not meet the minimum word count requirement."
)
return False

# Define required sections
required_sections = [
"Overview", "Installation", "Features", "Block Diagram",
"Pin Description", "Specifications", "Timing Diagram", "Tapeout History"
"Overview",
"Installation",
"Features",
"Block Diagram",
"Pin Description",
"Specifications",
"Timing Diagram",
"Tapeout History",
]

# Check for missing sections
missing_sections = self.check_required_sections(content, required_sections)
if missing_sections:
logger.print_err(f"The README file is missing the following sections: {', '.join(missing_sections)}")
logger.print_err(
f"The README file is missing the following sections: {', '.join(missing_sections)}"
)
return False

return True


Expand Down Expand Up @@ -1226,6 +1318,7 @@ def update_ips(ipm_root, ip_root=None, ip_to_update=None):
else:
logger.print_warn("No IPs in your project to be updated.")


def check_ip(ip_root, ip_name=None):
logger = Logger()

Expand All @@ -1240,7 +1333,7 @@ def check_ip(ip_root, ip_name=None):
else:
logger.print_err(f"Can't find Yaml file at {yaml_info_file}")
exit(1)

# Step 2: Check content of Yaml file
logger.print_step("[STEP 2]: Check content of Yaml file")
checker = Checks(ip_root, ip_name)
Expand All @@ -1249,15 +1342,15 @@ def check_ip(ip_root, ip_name=None):
else:
logger.print_err("Yaml content check failed")
exit(1)

# Step 3: Check Hierarchy
logger.print_step("[STEP 3]: Check IP hierarchy")
if checker.check_hierarchy():
logger.print_success("Hierarchy check passed")
else:
logger.print_err("Hierarchy check failed")
exit(1)

# Step 3: Check README
logger.print_step("[STEP 4]: check README documentation")
if checker.analyze_readme():
Expand Down
Loading

0 comments on commit c553759

Please sign in to comment.