Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(commands): Always return score #97

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 116 additions & 51 deletions Packs/VirusTotal/Integrations/VirusTotalV3/VirusTotalV3.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def ip(self, ip: str, relationships: str = '') -> dict:
"""
return self._http_request(
'GET',
f'ip_addresses/{ip}?relationships={relationships}', ok_codes=(429, 200)
f'ip_addresses/{ip}?relationships={relationships}', ok_codes=(404, 429, 200)
)

def file(self, file: str, relationships: str = '') -> dict:
Expand Down Expand Up @@ -158,7 +158,7 @@ def url(self, url: str, relationships: str = ''):
"""
return self._http_request(
'GET',
f'urls/{encode_url_to_base64(url)}?relationships={relationships}', ok_codes=(429, 200)
f'urls/{encode_url_to_base64(url)}?relationships={relationships}', ok_codes=(404, 429, 200)
)

def domain(self, domain: str, relationships: str = '') -> dict:
Expand All @@ -168,7 +168,7 @@ def domain(self, domain: str, relationships: str = '') -> dict:
"""
return self._http_request(
'GET',
f'domains/{domain}?relationships={relationships}', ok_codes=(429, 200)
f'domains/{domain}?relationships={relationships}', ok_codes=(404, 429, 200)
)

# endregion
Expand Down Expand Up @@ -622,6 +622,7 @@ class ScoreCalculator:
Calculating DBotScore of files, ip, etc.
"""
DEFAULT_SUSPICIOUS_THRESHOLD = 5
DEFAULT_RELATIONSHIP_SUSPICIOUS_THRESHOLD = 2

logs: List[str]

Expand All @@ -648,12 +649,6 @@ class ScoreCalculator:

def __init__(self, params: dict):
self.trusted_vendors = argToList(params['preferredVendors'])
trusted_vendors_threshold = arg_to_number_must_int(
params['preferredVendorsThreshold'],
arg_name='Preferred Vendor Threshold',
required=True
)
assert isinstance(trusted_vendors_threshold, int)
self.trusted_vendors_threshold = arg_to_number_must_int(
params['preferredVendorsThreshold'],
arg_name='Preferred Vendor Threshold',
Expand Down Expand Up @@ -717,7 +712,7 @@ def __init__(self, params: dict):
arg_name='Relationship Files Malicious Threshold',
required=True),
'suspicious': arg_to_number_must_int(
params['relationship_suspicious_threshold'],
params['relationship_suspicious_threshold'] or self.DEFAULT_RELATIONSHIP_SUSPICIOUS_THRESHOLD,
arg_name='Relationship Files Suspicious Threshold',
required=True)
}
Expand Down Expand Up @@ -1240,6 +1235,90 @@ def decrease_data_size(data: Union[dict, list]) -> Union[dict, list]:
return data


def _get_error_result(client: Client, ioc_id: str, ioc_type: str, message: str) -> CommandResults:
dbot_type = ioc_type.upper()
assert dbot_type in ('FILE', 'DOMAIN', 'IP', 'URL')
common_type = dbot_type if dbot_type in ('IP', 'URL') else dbot_type.capitalize()
desc = f'{common_type} "{ioc_id}" {message}'
dbot = Common.DBotScore(ioc_id,
getattr(DBotScoreType, dbot_type),
INTEGRATION_NAME,
Common.DBotScore.NONE,
desc,
client.reliability)
options: dict[str, Common.DBotScore | str] = {'dbot_score': dbot}
if dbot_type == 'FILE':
options[get_hash_type(ioc_id)] = ioc_id
else:
options[dbot_type.lower()] = ioc_id
return CommandResults(indicator=getattr(Common, common_type)(**options), readable_output=desc)


def build_unknown_output(client: Client, ioc_id: str, ioc_type: str) -> CommandResults:
return _get_error_result(client, ioc_id, ioc_type, 'was not found in VirusTotal')


def build_quota_exceeded_output(client: Client, ioc_id: str, ioc_type: str) -> CommandResults:
return _get_error_result(client, ioc_id, ioc_type, 'was not enriched. Quota was exceeded.')


def build_error_output(client: Client, ioc_id: str, ioc_type: str) -> CommandResults:
return _get_error_result(client, ioc_id, ioc_type, 'could not be processed')


def build_unknown_file_output(client: Client, file: str) -> CommandResults:
return build_unknown_output(client, file, 'file')


def build_quota_exceeded_file_output(client: Client, file: str) -> CommandResults:
return build_quota_exceeded_output(client, file, 'file')


def build_error_file_output(client: Client, file: str) -> CommandResults:
return build_error_output(client, file, 'file')


def build_unknown_domain_output(client: Client, domain: str) -> CommandResults:
return build_unknown_output(client, domain, 'domain')


def build_quota_exceeded_domain_output(client: Client, domain: str) -> CommandResults:
return build_quota_exceeded_output(client, domain, 'domain')


def build_error_domain_output(client: Client, domain: str) -> CommandResults:
return build_error_output(client, domain, 'domain')


def build_unknown_url_output(client: Client, url: str) -> CommandResults:
return build_unknown_output(client, url, 'url')


def build_quota_exceeded_url_output(client: Client, url: str) -> CommandResults:
return build_quota_exceeded_output(client, url, 'url')


def build_error_url_output(client: Client, url: str) -> CommandResults:
return build_error_output(client, url, 'url')


def build_unknown_ip_output(client: Client, ip: str) -> CommandResults:
return build_unknown_output(client, ip, 'ip')


def build_quota_exceeded_ip_output(client: Client, ip: str) -> CommandResults:
return build_quota_exceeded_output(client, ip, 'ip')


def build_error_ip_output(client: Client, ip: str) -> CommandResults:
return build_error_output(client, ip, 'ip')


def build_skipped_enrichment_ip_output(client: Client, ip: str) -> CommandResults:
return _get_error_result(client, ip, 'ip',
'was not enriched. Reputation lookups have been disabled for private IP addresses.')


def build_domain_output(
client: Client,
score_calculator: ScoreCalculator,
Expand Down Expand Up @@ -1525,12 +1604,6 @@ def build_file_output(
)


def build_unknown_file_output(client: Client, file_hash: str) -> CommandResults:
desc = f'File "{file_hash}" was not found in VirusTotal'
dbot = Common.DBotScore(file_hash, DBotScoreType.FILE, INTEGRATION_NAME, 0, desc, client.reliability)
return CommandResults(indicator=Common.File(dbot), readable_output=desc)


def build_private_file_output(file_hash: str, raw_response: dict) -> CommandResults:
data = raw_response.get('data', {})
attributes = data.get('attributes', {})
Expand Down Expand Up @@ -1692,29 +1765,27 @@ def ip_command(client: Client, score_calculator: ScoreCalculator, args: dict,
for ip in ips:
raise_if_ip_not_valid(ip)
if disable_private_ip_lookup and ipaddress.ip_address(ip).is_private and not override_private_lookup:
result = CommandResults(
readable_output=f'Reputation lookups have been disabled for private IP addresses. Enrichment skipped for {ip}')
results.append(result)
results.append(build_skipped_enrichment_ip_output(client, ip))
execution_metrics.success += 1
continue
try:
raw_response = client.ip(ip, relationships)
if raw_response.get('error', {}).get('code') == "QuotaExceededError":
execution_metrics.quota_error += 1
result = CommandResults(readable_output=f'Quota exceeded for IP: {ip}')
results.append(result)
results.append(build_quota_exceeded_ip_output(client, ip))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_ip_output(client, ip))
continue
except Exception as exception:
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process IP: "{ip}"\n {str(exception)}')
demisto.debug(f'Could not process IP: "{ip}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_ip_output(client, ip))
continue
execution_metrics.success += 1
results.append(
build_ip_output(client, score_calculator, ip, raw_response, argToBoolean(args.get('extended_data', False))))
if len(results) == 0:
result = CommandResults(readable_output='No IPs were found.').to_context()
results.append(result)
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
Expand All @@ -1737,8 +1808,7 @@ def file_command(client: Client, score_calculator: ScoreCalculator, args: dict,
raw_response = client.file(file, relationships)
if raw_response.get('error', {}).get('code') == "QuotaExceededError":
execution_metrics.quota_error += 1
result = CommandResults(readable_output=f'Quota exceeded for file: {file}')
results.append(result)
results.append(build_quota_exceeded_file_output(client, file))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_file_output(client, file))
Expand All @@ -1749,10 +1819,8 @@ def file_command(client: Client, score_calculator: ScoreCalculator, args: dict,
# If anything happens, just keep going
demisto.debug(f'Could not process file: "{file}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_file_output(client, file))
continue
if len(results) == 0:
result = CommandResults(readable_output='No files were found.')
results.append(result)
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
Expand All @@ -1774,22 +1842,19 @@ def private_file_command(client: Client, args: dict) -> List[CommandResults]:
raw_response = client.private_file(file)
if raw_response.get('error', {}).get('code') == "QuotaExceededError":
execution_metrics.quota_error += 1
result = CommandResults(readable_output=f'Quota exceeded for file: {file}')
results.append(result)
results.append(build_quota_exceeded_file_output(client, file))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(CommandResults(readable_output=f'File "{file}" was not found in VirusTotal'))
results.append(build_unknown_file_output(client, file))
continue
results.append(build_private_file_output(file, raw_response))
execution_metrics.success += 1
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process file: "{file}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_file_output(client, file))
continue
if len(results) == 0:
result = CommandResults(readable_output='No files were found.')
results.append(result)
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
Expand All @@ -1811,19 +1876,19 @@ def url_command(client: Client, score_calculator: ScoreCalculator, args: dict, r
raw_response = client.url(url, relationships)
if raw_response.get('error', {}).get('code') == "QuotaExceededError":
execution_metrics.quota_error += 1
result = CommandResults(readable_output=f'Quota exceeded for url: {url}')
results.append(result)
results.append(build_quota_exceeded_url_output(client, url))
continue
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_url_output(client, url))
continue
except Exception as exception:
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process URL: "{url}".\n {str(exception)}')
demisto.debug(f'Could not process URL: "{url}".\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_url_output(client, url))
continue
execution_metrics.success += 1
results.append(build_url_output(client, score_calculator, url, raw_response, extended_data))
if len(results) == 0:
result = CommandResults(readable_output='No urls were found.')
results.append(result)
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
Expand All @@ -1844,21 +1909,21 @@ def domain_command(client: Client, score_calculator: ScoreCalculator, args: dict
raw_response = client.domain(domain, relationships)
if raw_response.get('error', {}).get('code') == "QuotaExceededError":
execution_metrics.quota_error += 1
result = CommandResults(readable_output=f'Quota exceeded for domain: {domain}')
results.append(result)
results.append(build_quota_exceeded_domain_output(client, domain))
continue
except Exception as exception:
if raw_response.get('error', {}).get('code') == 'NotFoundError':
results.append(build_unknown_domain_output(client, domain))
continue
except Exception as exc:
# If anything happens, just keep going
demisto.debug(f'Could not process domain: "{domain}"\n {str(exception)}')
demisto.debug(f'Could not process domain: "{domain}"\n {str(exc)}')
execution_metrics.general_error += 1
results.append(build_error_domain_output(client, domain))
continue
execution_metrics.success += 1
result = build_domain_output(client, score_calculator, domain, raw_response,
argToBoolean(args.get('extended_data', False)))
results.append(result)
if len(results) == 0:
result = CommandResults(readable_output='No domains were found.')
results.append(result)
if execution_metrics.is_supported():
_metric_results = execution_metrics.metrics
metric_results = cast(CommandResults, _metric_results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1666,7 +1666,7 @@ script:
description: The analysis ID.
type: String

dockerimage: demisto/python3:3.10.13.83255
dockerimage: demisto/python3:3.10.13.86272
tests:
- VirusTotalV3-test
- VirusTotal (API v3) Detonate Test
51 changes: 49 additions & 2 deletions Packs/VirusTotal/Integrations/VirusTotalV3/VirusTotalV3_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,18 @@ def test_domain_command(mocker, requests_mock):
assert results[0].execution_metrics is None
assert results[0].outputs == expected_results

mock_response = {'error': {'code': 'NotFoundError'}}
requests_mock.get(f'https://www.virustotal.com/api/v3/domains/testing.com?relationships={domain_relationships}',
json=mock_response)

results = domain_command(
client=client, score_calculator=mocked_score_calculator,
args=demisto.args(), relationships=domain_relationships)

assert results[0].execution_metrics is None
assert results[0].readable_output == 'Domain "testing.com" was not found in VirusTotal'
assert results[0].indicator.dbot_score.score == 0


def test_ip_command(mocker, requests_mock):
"""
Expand Down Expand Up @@ -327,8 +339,8 @@ def test_ip_command(mocker, requests_mock):

assert results[1].execution_metrics == [{'APICallsCount': 1, 'Type': 'Successful'}]
assert results[0].execution_metrics is None
assert results[0].readable_output == ('Reputation lookups have been disabled for private IP addresses. '
'Enrichment skipped for 192.168.0.1')
assert results[0].readable_output == ('IP "192.168.0.1" was not enriched. '
'Reputation lookups have been disabled for private IP addresses.')

# Run command but enabling private IP enrichment after disabling it
mocker.patch.object(demisto, 'args', return_value={'ip': '192.168.0.1', 'extended_data': 'false',
Expand All @@ -342,6 +354,19 @@ def test_ip_command(mocker, requests_mock):
assert results[0].execution_metrics is None
assert results[0].outputs == expected_results

mock_response = {'error': {'code': 'NotFoundError'}}
requests_mock.get(f'https://www.virustotal.com/api/v3/ip_addresses/192.168.0.1?relationships={ip_relationships}',
json=mock_response)

results = ip_command(
client=client, score_calculator=mocked_score_calculator,
args=demisto.args(), relationships=ip_relationships,
disable_private_ip_lookup=True)

assert results[0].execution_metrics is None
assert results[0].readable_output == 'IP "192.168.0.1" was not found in VirusTotal'
assert results[0].indicator.dbot_score.score == 0


def test_url_command_success(mocker, requests_mock):
"""
Expand Down Expand Up @@ -383,6 +408,18 @@ def test_url_command_success(mocker, requests_mock):
assert results[0].execution_metrics is None
assert results[0].outputs == expected_results

mock_response = {'error': {'code': 'NotFoundError'}}
requests_mock.get(f'https://www.virustotal.com/api/v3/urls/{encode_url_to_base64(testing_url)}'
f'?relationships={url_relationships}', json=mock_response)

results = url_command(
client=client, score_calculator=mocked_score_calculator,
args=demisto.args(), relationships=url_relationships)

assert results[0].execution_metrics is None
assert results[0].readable_output == f'URL "{testing_url}" was not found in VirusTotal'
assert results[0].indicator.dbot_score.score == 0


def test_private_file_command(mocker, requests_mock):
"""
Expand Down Expand Up @@ -419,3 +456,13 @@ def test_private_file_command(mocker, requests_mock):
assert results[1].execution_metrics == [{'APICallsCount': 1, 'Type': 'Successful'}]
assert results[0].execution_metrics is None
assert results[0].outputs == expected_results

mock_response = {'error': {'code': 'NotFoundError'}}
requests_mock.get(f'https://www.virustotal.com/api/v3/private/files/{sha256}',
json=mock_response)

results = private_file_command(client=client, args=demisto.args())

assert results[0].execution_metrics is None
assert results[0].readable_output == f'File "{sha256}" was not found in VirusTotal'
assert results[0].indicator.dbot_score.score == 0
Loading
Loading