-
Notifications
You must be signed in to change notification settings - Fork 0
/
ingest.py
executable file
·143 lines (126 loc) · 6.37 KB
/
ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#!/usr/bin/env python3
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See LICENSE in the project root for license information.
# --------------------------------------------------------------------------------------------
from typing import Any
from functools import cache
import glob
import sys
import shutil
import os.path
import subprocess
import multiprocessing
import tqdm
import json
from fstar_harness import InsightFile, InsightFileFirstPass
def run_insights(*args):
return subprocess.check_output(['fstar_insights/ocaml/bin/fstar_insights.exe'] + list(args), encoding='utf-8')
def run_digest(fn) -> tuple[str, str]:
return fn, run_insights('--digest', fn)
def run_print_checked_deps(fn) -> tuple[str, Any, str]:
return fn, json.loads(run_insights('--print_checked_deps', fn)), run_insights('--digest', fn)
def run_extract(fn_orig_src_fn: tuple[str, str]):
fn, orig_src_fn = fn_orig_src_fn
try:
out: InsightFileFirstPass = json.loads(run_insights('--include', 'dataset', '--all_defs_and_premises', fn))
except:
sys.stderr.write(f'Cannot extract {fn}\n'); sys.stderr.flush()
return
orig_dir = os.path.dirname(orig_src_fn)
git_rev = subprocess.check_output(['git', 'rev-parse', 'HEAD'], cwd=orig_dir, encoding='utf-8').strip()
git_repo_dir = subprocess.check_output(['git', 'rev-parse', '--show-toplevel'], cwd=orig_dir, encoding='utf-8').strip()
source_file_name = os.path.relpath(os.path.realpath(orig_src_fn), git_repo_dir)
git_url = subprocess.check_output(['git', 'remote', 'get-url', 'origin'], cwd=orig_dir, encoding='utf-8').strip()
git_url = git_url.replace('[email protected]:', 'https://github.com/')
j: InsightFile = {
**out,
'source': {
'project_name': os.path.basename(git_repo_dir),
'file_name': source_file_name,
'git_rev': git_rev,
'git_url': git_url,
},
}
json.dump(j, open(f'dataset/{fn}.json', 'w'))
def main():
os.makedirs('dataset', exist_ok=True)
dirs = sys.argv[1:]
pool = multiprocessing.Pool()
fns = [ fn for dir in dirs for fn in glob.iglob(f'{dir}/**/*.fst*.checked', recursive=True, include_hidden=True) if not os.path.isdir(fn) and os.path.getsize(fn) > 0 ]
checked_deps: list[tuple[str, Any, str]] = \
list(tqdm.tqdm(pool.imap_unordered(run_print_checked_deps, fns), total=len(fns), desc='Parsing checked files'))
dig2checked: dict[str, list[tuple[str, Any]]] = {}
for fn, j, dig in checked_deps:
if dig not in dig2checked: dig2checked[dig] = []
dig2checked[dig].append((fn, j))
expected_source_fns = set(os.path.splitext(os.path.basename(fn))[0] for fn, _, _ in checked_deps)
fns = [ fn for dir in dirs for fn in glob.iglob(f'{dir}/**/*.fst*', recursive=True, include_hidden=True) \
if not os.path.isdir(fn) and os.path.basename(fn) in expected_source_fns ]
digs = list(tqdm.tqdm(pool.imap_unordered(run_digest, fns), total=len(fns), desc='Computing source digests'))
digest2src: dict[str, list[str]] = {}
for fn, dig in digs:
if dig not in digest2src: digest2src[dig] = []
digest2src[dig].append(fn)
basename2files: dict[str, tuple[str, str]] = {}
@cache
def resolve_checked(dig: str) -> bool:
if dig not in dig2checked: return False
error = None
for checked_fn, dep_info in dig2checked[dig]:
basename = os.path.splitext(os.path.basename(checked_fn))[0]
if basename.startswith('Test.fst'):
error = f'Skipping {checked_fn} because name causes lots of shadowing'
continue
if ('FStar.' + basename) in expected_source_fns:
error = f'Skipping {checked_fn} because module name clashes with FStar.{basename} with the default open'
continue
if basename in ('AES.fst', 'SHA1.fst'):
error = f'Skipping {checked_fn} because it shadows modules in HACL*'
continue
if basename in basename2files:
error = f'Skipping duplicate module {checked_fn} in favor of {basename2files[basename]}'
continue
src_fn = None
for src_fn_cand in digest2src.get(dep_info['source_digest'], []):
if os.path.basename(src_fn_cand) == basename:
src_fn = src_fn_cand
break
if src_fn is None:
error = f'Skipping {checked_fn} because of unavailable source file {basename}'
continue
dep_missing = False
for dep in dep_info['deps_digest']:
if dep['module_name'] == 'source':
assert dep['digest'] == dep_info['source_digest'] # duplicate info
else:
if not resolve_checked(dep['digest']):
error = f'Skipping {checked_fn} because of unavailable dependency {dep["module_name"]}'
dep_missing = True
break
if dep_missing: continue
if basename in basename2files:
error = f'Skipping duplicate module {checked_fn} in favor of {basename2files[basename]}'
continue
basename2files[basename] = (checked_fn, src_fn)
return True
print(error)
return False
# We try to resolve the module names to checked files in a priority order,
# preferring real code over tests and examples.
def file_priority(fn) -> int:
if '/examples/' in fn: return -10
if '/tests/' in fn: return -10
if '/FStar/' in fn: return 10
if '/zeta/' in fn: return -1 # zeta duplicates some checked files for evercrypt
if '/tls/cache/Karamel/' in fn: return -5 # prefer the Model variant
return 0
for fn, _, dig in sorted(checked_deps, key = lambda item: -file_priority(item[0])):
resolve_checked(dig)
for checked_fn, src_fn in tqdm.tqdm(basename2files.values(), desc = 'Copying files'):
shutil.copy(src_fn, 'dataset/')
shutil.copy(checked_fn, 'dataset/')
list(tqdm.tqdm(pool.imap_unordered(run_extract, [ (bn, fn[1]) for bn, fn in basename2files.items() ]),
total=len(basename2files), desc='Extracting insights'))
if __name__ == '__main__':
main()