-
Notifications
You must be signed in to change notification settings - Fork 156
/
killswitch.py
527 lines (389 loc) · 18.1 KB
/
killswitch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
"""
killswitch.py - Fetch kill switches from EDMC Repo.
Copyright (c) EDCD, All Rights Reserved
Licensed under the GNU General Public License.
See LICENSE file.
"""
from __future__ import annotations
import json
import threading
from copy import deepcopy
from typing import (
TYPE_CHECKING, Any, Callable, Mapping, MutableMapping, MutableSequence, NamedTuple, Sequence,
TypedDict, TypeVar, cast, Union
)
import requests
import semantic_version
from semantic_version.base import Version
import config
import EDMCLogging
# Module-wide logger; also the default `log` argument of the check_* helpers below.
logger = EDMCLogging.get_main_logger()

# OLD_KILLSWITCH_URL is passed as the fallback source by setup_main_list();
# DEFAULT_KILLSWITCH_URL is the default fetch target.
OLD_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches.json'
DEFAULT_KILLSWITCH_URL = 'https://raw.githubusercontent.com/EDCD/EDMarketConnector/releases/killswitches_v2.json'
# File-format version parsed natively; version 1 files are upgraded by _upgrade_kill_switch_dict().
CURRENT_KILLSWITCH_VERSION = 2
# Kinds of container that kill switch rules can be applied to.
UPDATABLE_DATA = Union[Mapping, Sequence]  # Have to keep old-style
# Default version used by every lookup in this module when the caller does not pass one.
_current_version: semantic_version.Version = config.appversion_nobuild()
T = TypeVar('T', bound=UPDATABLE_DATA)
class SingleKill(NamedTuple):
    """
    One kill switch entry, optionally carrying rules that make data safe to use.

    Each of the three rule collections stays None when no rule of that kind
    was supplied for this entry.
    """

    match: str
    reason: str
    redact_fields: list[str] | None = None
    delete_fields: list[str] | None = None
    set_fields: dict[str, Any] | None = None

    @property
    def has_rules(self) -> bool:
        """Return True when at least one rule collection is not None (an empty one still counts)."""
        rule_sets = (self.redact_fields, self.delete_fields, self.set_fields)
        return not all(rules is None for rules in rule_sets)

    def apply_rules(self, target: T) -> T:
        """
        Run this entry's rules over some data so that it becomes okay to send.

        Rules run in a fixed order: set_fields first, then redact_fields
        (each path is overwritten with the string "REDACTED"), then delete_fields.

        Note that this MODIFIES DATA IN PLACE.

        :param target: data to apply the rules to
        :return: the same object that was passed in
        :raises: Any and all exceptions _deep_apply and _apply can raise.
        """
        if self.set_fields is not None:
            for path, value in self.set_fields.items():
                _deep_apply(target, path, value)

        if self.redact_fields is not None:
            for path in self.redact_fields:
                _deep_apply(target, path, "REDACTED")

        if self.delete_fields is not None:
            for path in self.delete_fields:
                _deep_apply(target, path, delete=True)

        return target
def _apply(target: UPDATABLE_DATA, key: str, to_set: Any = None, delete: bool = False):
"""
Set or delete the given target key on the given target.
:param target: The thing to set data on
:param key: the key or index to set the data to
:param to_set: the data to set, if any, defaults to None
:param delete: whether or not to delete the key or index, defaults to False
:raises ValueError: when an unexpected target type is passed
:raises IndexError: when an invalid index is set or deleted
"""
if isinstance(target, MutableMapping):
if delete:
target.pop(key, None)
else:
target[key] = to_set
elif isinstance(target, MutableSequence):
idx = _get_int(key)
if idx is None:
raise ValueError(f'Cannot use string {key!r} as int for index into Sequence')
if delete and len(target) > 0:
length = len(target)
if idx in range(-length, length):
target.pop(idx)
elif len(target) == idx:
target.append(to_set)
else:
target[idx] = to_set # this can raise, that's fine
else:
raise ValueError(f'Dont know how to apply data to {type(target)} {target!r}')
def _deep_apply(target: UPDATABLE_DATA, path: str, to_set=None, delete=False):  # noqa: CCR001 # Recursive silliness.
    """
    Set the given path to the given value, if it exists.

    if the path has dots (ascii period -- '.'), it will be successively split
    if possible for deeper indices into target

    The walk descends one level per loop iteration and stops as soon as the
    remaining path contains no dot, or is found as-is in the current
    container. The final set/delete is then delegated to _apply.

    :param target: the dict to modify
    :param path: the (possibly dotted) path of the key or index to change
    :param to_set: the data to set, defaults to None
    :param delete: whether or not to delete the key rather than set it
    :raises IndexError: when an invalid index is traversed into
    :raises KeyError: when an invalid key is traversed into
    :raises ValueError: when a sequence is indexed with a non-int key, or when
        a value that is neither a Mapping nor a Sequence has to be traversed
    """
    current = target
    key: str = ""
    while '.' in path:
        if path in current:
            # it exists on this level, dont go further
            # (for a Sequence this `in` is an element-membership test, not an index test)
            break

        if isinstance(current, Mapping) and any('.' in k and path.startswith(k) for k in current.keys()):
            # there is a dotted key in here that can be used for this
            # if there's a dotted key in here (must be a mapping), use that if we can
            keys = current.keys()
            # The first dotted key (in iteration order) that prefixes the path wins.
            for k in filter(lambda x: '.' in x, keys):
                if path.startswith(k):
                    key = k
                    path = path.removeprefix(k)
                    # we assume that the `.` here is for "accessing" the next key.
                    if path[0] == '.':
                        path = path[1:]

                    # NOTE(review): looks unreachable for a Mapping, since a path equal to
                    # an existing key is caught by the `path in current` check above -- confirm.
                    if len(path) == 0:
                        path = key

                    break

        else:
            # No usable dotted key: consume the path up to the first dot.
            key, _, path = path.partition('.')

        # Descend into the child selected by `key`.
        if isinstance(current, Mapping):
            current = current[key]  # type: ignore # I really don't know at this point what you want from me mypy.

        elif isinstance(current, Sequence):
            target_idx = _get_int(key)  # mypy is broken. doesn't like := here.
            if target_idx is not None:
                current = current[target_idx]

            else:
                raise ValueError(f'Cannot index sequence with non-int key {key!r}')

        else:
            raise ValueError(f'Dont know how to index a {type(current)} ({current!r})')

    # `path` is now the last key/index; `current` is the container it lives in.
    _apply(current, path, to_set, delete)
def _get_int(s: str) -> int | None:
try:
return int(s)
except ValueError:
return None
class KillSwitches(NamedTuple):
    """The kill switches attached to one version specification."""

    version: semantic_version.SimpleSpec
    kills: dict[str, SingleKill]

    @staticmethod
    def from_dict(data: KillSwitchSetJSON) -> KillSwitches:
        """
        Build a KillSwitches instance from its JSON-style dictionary form.

        Every entry under data['kills'] becomes a SingleKill keyed by (and
        matching on) its dictionary key; rule fields absent from an entry are
        left as None.
        """
        parsed = {
            name: SingleKill(
                match=name,
                reason=entry['reason'],
                redact_fields=entry.get('redact_fields'),
                set_fields=entry.get('set_fields'),
                delete_fields=entry.get('delete_fields'),
            )
            for name, entry in data['kills'].items()
        }
        spec = semantic_version.SimpleSpec(data['version'])
        return KillSwitches(version=spec, kills=parsed)
class DisabledResult(NamedTuple):
    """Result of an is_disabled-style lookup: a flag plus the kill behind it, if any."""

    disabled: bool
    kill: SingleKill | None

    @property
    def reason(self) -> str:
        """Reason given by the associated kill, or an empty string when there is none."""
        if self.kill is None:
            return ""

        return self.kill.reason

    def has_kill(self) -> bool:
        """Return whether this DisabledResult has a Kill associated with it."""
        return self.kill is not None

    def has_rules(self) -> bool:
        """Return whether the kill on this Result contains rules (False when there is no kill)."""
        kill = self.kill
        if kill is None:
            return False

        return kill.has_rules
class KillSwitchSet:
    """Queryable set of kill switches."""

    def __init__(self, kill_switches: list[KillSwitches]) -> None:
        self.kill_switches = kill_switches

    def get_disabled(self, id: str, *, version: Union[Version, str] = _current_version) -> DisabledResult:
        """
        Return whether the given feature ID is disabled by a killswitch for the given version.

        Every entry whose version spec matches is consulted, in order; the
        first one that lists the ID wins. An entry that matches the version
        but does not list the ID no longer hides a kill in a later entry.

        :param id: The feature ID to check
        :param version: The version to check killswitches for, defaults to the
            current EDMC version. Strings are coerced to a Version.
        :return: a namedtuple indicating status and reason, if any
        """
        if isinstance(version, str):
            version = semantic_version.Version.coerce(version)

        for ks in self.kill_switches:
            if version not in ks.version:
                continue

            if id in ks.kills:
                return DisabledResult(True, ks.kills[id])

        return DisabledResult(False, None)

    def is_disabled(self, id: str, *, version: semantic_version.Version = _current_version) -> bool:
        """Return whether a given feature ID is disabled for the given version."""
        return self.get_disabled(id, version=version).disabled

    def get_reason(self, id: str, version: semantic_version.Version = _current_version) -> str:
        """Return a reason for why the given id is disabled for the given version, if any."""
        return self.get_disabled(id, version=version).reason

    def kills_for_version(self, version: semantic_version.Version = _current_version) -> list[KillSwitches]:
        """
        Get all killswitch entries that apply to the given version.

        :param version: the version to check against, defaults to the current EDMC version
        :return: the matching kill switches
        """
        return [k for k in self.kill_switches if version in k.version]

    def check_killswitch(
        self, name: str, data: T, log=logger, version=_current_version
    ) -> tuple[bool, T]:
        """
        Check whether a killswitch is enabled. If it is, apply rules if any.

        :param name: The killswitch to check
        :param data: The data to modify if needed
        :param log: the logger every message of this check is written to
        :param version: the version to check the killswitch for
        :return: A two tuple consisting of: A bool indicating if the caller should return, and either the
            original data or a *COPY* that has been modified by rules
        """
        res = self.get_disabled(name, version=version)
        if not res.disabled:
            return False, data

        log.info(f'Killswitch {name} is enabled. Checking if rules exist to make use safe')
        if not res.has_rules():
            # Use the caller-supplied logger here as well, like every other message in this method.
            log.info('No rules exist. Stopping processing')
            return True, data

        if TYPE_CHECKING:  # pyright, mypy, please -_-
            assert res.kill is not None

        try:
            # Rules mutate their target, so work on a copy and leave the caller's data alone.
            new_data = res.kill.apply_rules(deepcopy(data))

        except Exception as e:
            # A rule that cannot be applied means the data cannot be made safe: tell the caller to stop.
            log.exception(f'Exception occurred while attempting to apply rules! bailing out! {e=}')
            return True, data

        log.info('Rules applied successfully, allowing execution to continue')
        return False, new_data

    def check_multiple_killswitches(self, data: T, *names: str, log=logger, version=_current_version) -> tuple[bool, T]:
        """
        Check multiple killswitches in order.

        Note that the names are applied in the order passed, and that the first true
        return from check_killswitch causes this to return

        :param data: the data to update
        :param names: the killswitches to check, in order
        :param log: the logger to use, defaults to the standard EDMC main logger
        :param version: the version to check the killswitches for
        :return: A two tuple of bool and updated data, where the bool is true when the caller _should_ halt processing
        """
        for name in names:
            should_return, data = self.check_killswitch(name=name, data=data, log=log, version=version)

            if should_return:
                return True, data

        return False, data

    def __str__(self) -> str:
        """Return a string representation of KillSwitchSet."""
        return f'KillSwitchSet: {str(self.kill_switches)}'

    def __repr__(self) -> str:
        """Return __repr__ for KillSwitchSet."""
        return f'KillSwitchSet(kill_switches={self.kill_switches!r})'
class BaseSingleKillSwitch(TypedDict):  # noqa: D101
    # Always-present part of one kill switch entry in the JSON data.
    reason: str  # explanation stored as SingleKill.reason
class SingleKillSwitchJSON(BaseSingleKillSwitch, total=False):  # noqa: D101
    # One kill switch entry as it appears in the JSON data: `reason` plus optional rules.
    # total=False makes the three rule keys below optional.
    redact_fields: list[str]  # set fields to "REDACTED"
    delete_fields: list[str]  # remove fields entirely
    set_fields: dict[str, Any]  # set fields to given data
class KillSwitchSetJSON(TypedDict):  # noqa: D101
    # JSON form of one KillSwitches entry; consumed by KillSwitches.from_dict.
    version: str  # version spec string, parsed with semantic_version.SimpleSpec
    kills: dict[str, SingleKillSwitchJSON]  # kill switch ID -> its entry
class KillSwitchJSONFile(TypedDict):  # noqa: D101
    # Top-level layout of a kill switch file.
    version: int  # file-format version, compared against CURRENT_KILLSWITCH_VERSION
    last_updated: str  # only logged when the file is parsed
    kill_switches: list[KillSwitchSetJSON]  # one entry per version spec
def fetch_kill_switches(target=DEFAULT_KILLSWITCH_URL) -> KillSwitchJSONFile | None:
    """
    Fetch the JSON representation of our kill switches.

    A target starting with 'file:' is read from the local filesystem (the
    prefix is stripped and the rest is used as the path); anything else is
    fetched over HTTP with a 10 second timeout. Every failure is logged as a
    warning and reported as None rather than raised.

    :param target: the URL to fetch the kill switch list from, defaults to DEFAULT_KILLSWITCH_URL
    :return: the decoded kill switch file, or None if it could not be read or decoded
    """
    logger.info("Attempting to fetch kill switches")
    if target.startswith('file:'):
        # Strip only the leading scheme; a later 'file:' inside the path belongs to the path.
        target = target.removeprefix('file:')
        try:
            # JSON is UTF-8; do not depend on the platform's default encoding.
            with open(target, encoding='utf-8') as t:
                return json.load(t)

        except FileNotFoundError:
            logger.warning(f"No such file '{target}'")

        except OSError as e:
            logger.warning(f"Unable to read kill switch file {target!r}: {e}")

        except ValueError as e:
            # Covers both malformed JSON and undecodable bytes, matching the network path below.
            logger.warning(f"Failed to get kill switches, data was invalid: {e}")

        return None

    try:
        data = requests.get(target, timeout=10).json()

    except ValueError as e:
        logger.warning(f"Failed to get kill switches, data was invalid: {e}")
        return None

    except requests.exceptions.RequestException as requ_err:
        logger.warning(f"unable to connect to {target!r}: {requ_err}")
        return None

    return data
class _KillSwitchV1(TypedDict):
    # One kill switch set in the version 1 file format.
    version: str
    kills: dict[str, str]  # kill switch ID -> reason text (see _upgrade_kill_switch_dict)
class _KillSwitchJSONFileV1(TypedDict):
    # Top-level layout of a version 1 kill switch file; only used while upgrading.
    version: int
    last_updated: str
    kill_switches: list[_KillSwitchV1]
def _upgrade_kill_switch_dict(data: KillSwitchJSONFile) -> KillSwitchJSONFile:
    """
    Bring kill switch file data up to CURRENT_KILLSWITCH_VERSION.

    Data already at the current version is returned as-is. Version 1 data is
    converted into an upgraded copy, wrapping every plain reason string in a
    {'reason': ...} dict; the input itself is not modified.

    :param data: the decoded kill switch file
    :return: data in the current format
    :raises ValueError: when the file's version is neither current nor 1
    """
    version = data['version']
    if version == CURRENT_KILLSWITCH_VERSION:
        return data

    if version != 1:
        raise ValueError(f'Unknown Killswitch version {data["version"]}')

    logger.info('Got an old version killswitch file (v1) upgrading!')
    upgraded: KillSwitchJSONFile = deepcopy(data)
    legacy = cast(_KillSwitchJSONFileV1, data)

    converted = []
    for old_set in legacy['kill_switches']:
        # The literal is not statically a KillSwitchSetJSON, hence the cast.
        converted.append(cast(KillSwitchSetJSON, {
            'version': old_set['version'],
            'kills': {match: {'reason': reason} for match, reason in old_set['kills'].items()},
        }))

    upgraded['kill_switches'] = converted
    upgraded['version'] = CURRENT_KILLSWITCH_VERSION
    return upgraded
def parse_kill_switches(data: KillSwitchJSONFile) -> list[KillSwitches]:
    """
    Turn a decoded kill switch file into a list of KillSwitches.

    The data is first upgraded to the current file format. Entries that fail
    to parse are logged and skipped; they do not abort the remaining entries.

    :param data: dict containing raw killswitch data
    :return: every kill switch entry that parsed successfully
    """
    data = _upgrade_kill_switch_dict(data)
    last_updated = data['last_updated']
    ks_version = data['version']
    logger.info(f'Kill switches last updated {last_updated}')

    if ks_version != CURRENT_KILLSWITCH_VERSION:
        logger.warning(f'Unknown killswitch version {ks_version} (expected {CURRENT_KILLSWITCH_VERSION}). Bailing out')
        return []

    parsed_sets = []
    for idx, raw_set in enumerate(data['kill_switches']):
        try:
            parsed = KillSwitches.from_dict(raw_set)

        except Exception as e:
            logger.exception(f'Could not parse killswitch idx {idx}: {e}')

        else:
            parsed_sets.append(parsed)

    return parsed_sets
def get_kill_switches(target=DEFAULT_KILLSWITCH_URL, fallback: str | None = None) -> KillSwitchSet | None:
    """
    Fetch and parse kill switches into a KillSwitchSet.

    :param target: the URL to fetch the killswitch JSON from, defaults to DEFAULT_KILLSWITCH_URL
    :param fallback: a second URL to try when target yields nothing, defaults to None
    :return: the KillSwitchSet for the fetched data, or None if nothing could be fetched
    """
    data = fetch_kill_switches(target)
    if data is None and fallback is not None:
        logger.warning('could not get killswitches, trying fallback')
        data = fetch_kill_switches(fallback)

    if data is None:
        logger.warning('Could not get killswitches.')
        return None

    return KillSwitchSet(parse_kill_switches(data))
def get_kill_switches_thread(
    target, callback: Callable[[KillSwitchSet | None], None], fallback: str | None = None,
) -> None:
    """
    Run get_kill_switches on a daemon thread and hand its result to a callback.

    The callback is invoked on the worker thread, not on the caller's thread.

    :param target: Target killswitch file
    :param callback: The callback to pass the newly created KillSwitchSet (or None) to
    :param fallback: Fallback killswitch file, if any, defaults to None
    """
    def fetch_and_notify():
        result = get_kill_switches(target, fallback=fallback)
        callback(result)

    worker = threading.Thread(target=fetch_and_notify, daemon=True)
    worker.start()
# Global set queried by the module-level helper functions; starts empty and is replaced by setup_main_list().
active: KillSwitchSet = KillSwitchSet([])
def setup_main_list(filename: str | None):
    """
    Load the global set of kill switches used by the module-level query helpers.

    Plugins should NOT call this EVER.

    :param filename: where to fetch the kill switches from; None means DEFAULT_KILLSWITCH_URL.
        OLD_KILLSWITCH_URL is always used as the fallback source.
    """
    global active

    source = DEFAULT_KILLSWITCH_URL if filename is None else filename
    fetched = get_kill_switches(source, OLD_KILLSWITCH_URL)
    if fetched is None:
        # NOTE(review): the message says the set is emptied, but `active` is left as it was.
        logger.warning("Unable to fetch kill switches. Setting global set to an empty set")
        return

    active = fetched
    logger.trace(f'{len(active.kill_switches)} Active Killswitches:')
    for entry in active.kill_switches:
        logger.trace(entry)
def get_disabled(id: str, *, version: semantic_version.Version = _current_version) -> DisabledResult:
    """
    Ask the global KillSwitchSet whether the given ID is disabled for the given version.

    See KillSwitchSet.get_disabled for more information
    """
    result = active.get_disabled(id, version=version)
    return result
def check_killswitch(name: str, data: T, log=logger) -> tuple[bool, T]:
    """Run KillSwitchSet.check_killswitch on the global KillSwitchSet."""
    return active.check_killswitch(name=name, data=data, log=log)
def check_multiple_killswitches(data: T, *names: str, log=logger) -> tuple[bool, T]:
    """Run KillSwitchSet.check_multiple_killswitches on the global KillSwitchSet."""
    outcome = active.check_multiple_killswitches(data, *names, log=log)
    return outcome
def is_disabled(id: str, *, version: semantic_version.Version = _current_version) -> bool:
    """Run KillSwitchSet.is_disabled on the global KillSwitchSet."""
    disabled = active.is_disabled(id, version=version)
    return disabled
def get_reason(id: str, *, version: semantic_version.Version = _current_version) -> str:
    """Run KillSwitchSet.get_reason on the global KillSwitchSet."""
    reason = active.get_reason(id, version=version)
    return reason
def kills_for_version(version: semantic_version.Version = _current_version) -> list[KillSwitches]:
    """Return the entries of the global KillSwitchSet whose version spec matches `version`."""
    return active.kills_for_version(version=version)