Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't force fuzzing auth headers for API endpoints #17358

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
62 changes: 60 additions & 2 deletions w3af/core/data/parsers/doc/open_api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def __init__(self, http_response, no_validation=False, discover_fuzzable_headers
self.api_calls = []
self.no_validation = no_validation
self.discover_fuzzable_headers = discover_fuzzable_headers
self.fuzzed_auth_headers = []

@staticmethod
def content_type_match(http_resp):
Expand Down Expand Up @@ -146,10 +147,11 @@ def parse(self):
specification_handler = SpecificationHandler(self.get_http_response(),
self.no_validation)

self.fuzzed_auth_headers = []
for data in specification_handler.get_api_information():
try:
request_factory = RequestFactory(*data)
fuzzable_request = request_factory.get_fuzzable_request(self.discover_fuzzable_headers)
fuzzable_request = request_factory.get_fuzzable_request()
except Exception, e:
#
# This is a strange situation because parsing of the OpenAPI
Expand Down Expand Up @@ -178,9 +180,65 @@ def parse(self):
if not self._should_audit(fuzzable_request):
continue

if self.discover_fuzzable_headers:
operation = data[4]
headers = self._get_parameter_headers(operation)
fuzzable_request.set_force_fuzzing_headers(headers)

self.api_calls.append(fuzzable_request)

def _should_audit(self, fuzzable_request):
def _get_parameter_headers(self, operation):
"""
Looks for all parameters which are passed to the endpoint via headers.
The method filters out headers with auth info
which are defined in 'securityDefinitions' section of the API spec.

:param operation: An instance of Operation class
which represents the API endpoint.
:return: A list of unique header names.
"""
parameter_headers = set()
for parameter_name, parameter in operation.params.iteritems():
if parameter.location == 'header':

# Fuzz auth headers only once.
if self._is_already_fuzzed_auth_header(parameter.name, operation.swagger_spec):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parser shouldn't have any responsibility associated with fuzzing. The parser should just return what is found in the OpenAPI, then other parts of the code should be responsible of fuzzing (or not) the authentication header.

Not sure where to put this... maybe the solution is to modify the fuzzer.py code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point. I wanted to have only API scans in scope. Then, I think the logic should look like the following:

  • We need to maintain a list of parameters which is used for auth. The list may be stored in the KB.
  • The OpenAPI plugin should add auth parameters to this list (it should consider parameters in headers and query string)
  • Probably we should also consider auth settings defined by http-settings and auth plugins but I would implement it separately.
  • create_mutants() method should take the list into account, or we can update HeadersMutant and QSMutant classes to take care about fuzzing auth parameters only once.

Copy link
Contributor Author

@artem-smotrakov artem-smotrakov Nov 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it sound okay? If yes, I'll try to implement it.

continue

parameter_headers.add(parameter.name)
om.out.debug('Found a parameter header for %s endpoint: %s'
% (operation.path_name, parameter.name))

return list(parameter_headers)

def _is_already_fuzzed_auth_header(self, name, spec):
if not self._is_auth_header(name, spec):
return False

if name in self.fuzzed_auth_headers:
return True

self.fuzzed_auth_headers.append(name)
return False

@staticmethod
def _is_auth_header(name, spec):
"""
:param name: Header name.
:param spec: API specification.
:return: True if this is an auth header, False otherwise.
"""
for key, auth in spec.security_definitions.iteritems():
if not hasattr(auth, 'location') or not hasattr(auth, 'name'):
continue

if auth.location == 'header' and auth.name == name:
return True

return False

@staticmethod
def _should_audit(fuzzable_request):
"""
We want to make sure that w3af doesn't delete all the items from the
REST API, so we ignore DELETE calls.
Expand Down
34 changes: 5 additions & 29 deletions w3af/core/data/parsers/doc/open_api/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,45 +56,21 @@ def __init__(self, spec, api_resource_name, resource, operation_name,
self.operation = operation
self.parameters = parameters

def get_fuzzable_request(self, discover_fuzzable_headers=False):
def get_fuzzable_request(self):
"""
Creates a fuzzable request by querying different parts of the spec
parameters, operation, etc.

:param discover_fuzzable_headers: If it's set to true,
then all fuzzable headers will be added to the fuzzable request.
:return: A fuzzable request.
"""
method = self.get_method()
uri = self.get_uri()
headers = self.get_headers()
data_container = self.get_data_container(headers)

fuzzable_request = FuzzableRequest(uri,
headers=headers,
post_data=data_container,
method=method)

if discover_fuzzable_headers:
fuzzable_request.set_force_fuzzing_headers(self._get_parameter_headers())

return fuzzable_request

def _get_parameter_headers(self):
"""
Looks for all parameters which are passed to the endpoint via headers.

:return: A list of unique header names.
"""
parameter_headers = set()
for parameter_name in self.parameters:
parameter = self.parameters[parameter_name]
if parameter.location == 'header':
parameter_headers.add(parameter.name)
om.out.debug('Found a parameter header for %s endpoint: %s'
% (self.operation.path_name, parameter.name))

return list(parameter_headers)
return FuzzableRequest(uri,
headers=headers,
post_data=data_container,
method=method)

def _bravado_construct_request(self):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ def get_specification(self):
return file('%s/data/multiple_paths_and_headers.json' % CURRENT_PATH).read()


class PetstoreModel(object):

@staticmethod
def get_specification():
return file('%s/data/swagger.json' % CURRENT_PATH).read()


class IntParamPath(object):
def get_specification(self):
spec = APISpec(
Expand Down
27 changes: 26 additions & 1 deletion w3af/core/data/parsers/doc/open_api/tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from w3af.core.data.parsers.doc.open_api import OpenAPI
from w3af.core.data.url.HTTPResponse import HTTPResponse

from w3af.core.data.parsers.doc.open_api.tests.example_specifications import PetstoreModel


# Order them to be able to easily assert things
def by_path(fra, frb):
Expand Down Expand Up @@ -434,7 +436,30 @@ def test_is_valid_json_or_yaml_false(self):
http_resp = self.generate_response('"', 'image/jpeg')
self.assertFalse(OpenAPI.is_valid_json_or_yaml(http_resp))

def generate_response(self, specification_as_string, content_type='application/json'):
def test_no_forcing_fuzzing_auth_headers(self):
specification_as_string = PetstoreModel().get_specification()
http_response = self.generate_response(specification_as_string)
parser = OpenAPI(http_response)
parser.parse()
api_calls = parser.get_api_calls()

self.assertTrue(len(api_calls) > 0)
found_api_key = False
for fuzzable_request in api_calls:
fuzzing_headers = fuzzable_request.get_force_fuzzing_headers()
self.assertNotIn('Authorization', fuzzing_headers)
if found_api_key:
self.assertNotIn('api_key', fuzzing_headers)
else:
found_api_key = 'api_key' in fuzzing_headers

# Make sure that we forced to fuzz 'api_key' header once.
self.assertTrue(found_api_key)
self.assertEquals(1, len(parser.fuzzed_auth_headers))
self.assertIn('api_key', parser.fuzzed_auth_headers)

@staticmethod
def generate_response(specification_as_string, content_type='application/json'):
url = URL('http://www.w3af.com/swagger.json')
headers = Headers([('content-type', content_type)])
return HTTPResponse(200, specification_as_string, headers,
Expand Down
5 changes: 4 additions & 1 deletion w3af/core/data/parsers/doc/open_api/tests/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@


class TestRequests(unittest.TestCase):
def generate_response(self, specification_as_string):

@staticmethod
def generate_response(specification_as_string):
url = URL('http://www.w3af.com/swagger.json')
headers = Headers([('content-type', 'application/json')])
return HTTPResponse(200, specification_as_string, headers,
Expand Down Expand Up @@ -380,3 +382,4 @@ def test_dereferenced_pet_store(self):
self.assertEqual(fuzzable_request.get_uri().url_string, e_url)
self.assertEqual(fuzzable_request.get_headers(), e_headers)
self.assertEqual(fuzzable_request.get_data(), e_data)

81 changes: 78 additions & 3 deletions w3af/plugins/tests/crawl/test_open_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
along with w3af; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
"""
import json
import re
import urllib

from w3af.plugins.audit.sqli import sqli
from w3af.plugins.tests.helper import PluginTest, PluginConfig, MockResponse
from w3af.core.data.dc.headers import Headers
from w3af.core.data.parsers.doc.open_api.tests.example_specifications import (IntParamQueryString,
NestedModel)
NestedModel,
PetstoreModel)


class TestOpenAPIFindAllEndpointsWithAuth(PluginTest):
Expand Down Expand Up @@ -126,13 +127,14 @@ class TestOpenAPINestedModelSpec(PluginTest):
}

class SQLIMockResponse(MockResponse):

def get_response(self, http_request, uri, response_headers):
basic = http_request.headers.get('Basic', '')
if basic != TestOpenAPINestedModelSpec.BEARER:
return 401, response_headers, ''

# The body is in json format, need to escape my double quotes
request_body = str(http_request.parsed_body)
request_body = json.dumps(http_request.parsed_body)
payloads = [p.replace('"', '\\"') for p in sqli.SQLI_STRINGS]

response_body = 'Sunny outside'
Expand Down Expand Up @@ -203,6 +205,79 @@ def by_path(fra, frb):
self.assertEqual(len(vulns), 2)


class TestOpenAPIFuzzAuthHeaders(PluginTest):

api_key = 'zzz'
target_url = 'http://petstore.swagger.io/'

_run_configs = {
'cfg': {
'target': target_url,
'plugins': {'crawl': (PluginConfig('open_api',

('header_auth',
'api_key: %s' % api_key,
PluginConfig.HEADER),

),),
'audit': (PluginConfig('sqli'),)}
}
}

class SQLIMockResponse(MockResponse):

def get_response(self, http_request, uri, response_headers):
api_key = http_request.headers.get('api_key', '')

for payload in sqli.SQLI_STRINGS:
if payload in api_key:
return self.status, response_headers, 'PostgreSQL query failed:'

if api_key != TestOpenAPIFuzzAuthHeaders.api_key:
return 401, response_headers, ''

return self.status, response_headers, 'Sunny outside'

MOCK_RESPONSES = [MockResponse('http://petstore.swagger.io/openapi.json',
PetstoreModel().get_specification(),
content_type='application/json'),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='GET',
status=200),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='POST',
status=200),

SQLIMockResponse(re.compile('http://petstore.swagger.io/v2/.*'),
body=None,
method='PUT',
status=200)
]

def test_fuzz_auth_header_only_once(self):
cfg = self._run_configs['cfg']
self._scan(cfg['target'], cfg['plugins'])

#
# Since we configured authentication we should only get one of the Info
#
infos = self.kb.get('open_api', 'open_api')
self.assertEqual(len(infos), 1, infos)

info_i = infos[0]
self.assertEqual(info_i.get_name(), 'Open API specification found')

vulns = self.kb.get('sqli', 'sqli')
self.assertEqual(len(vulns), 1)

vuln = vulns[0]
self.assertEquals('SQL injection', vuln.get_name())


class TestOpenAPIRaisesWarningIfNoAuth(PluginTest):
target_url = 'http://w3af.org/'

Expand Down