Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sending JWT headers to user_loader function #37

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,10 @@ language: python

matrix:
include:
- python: 2.6
env:
- WITH_JWT=false
WITH_HAWK=true
- python: 2.7
env:
- WITH_JWT=true
WITH_HAWK=true
- python: 3.3
env:
- WITH_JWT=false
WITH_HAWK=false
SETUPTOOLS="setuptools<40.0.0"
- python: 3.4
env:
- WITH_JWT=true
Expand Down
35 changes: 33 additions & 2 deletions falcon_auth/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@
except ImportError:
pass

try:
# Optional dependencies for key discovery JWT backend
import urllib.request
import json
from jwt.algorithms import RSAAlgorithm
except ImportError:
pass

from falcon_auth.serializer import ExtendedJSONEncoder


Expand Down Expand Up @@ -153,13 +161,20 @@ class JWTAuthBackend(AuthBackend):
as value of ``iss`` field in the jwt payload. It will also be checked
against the ``iss`` field while decoding.

key_discovery_url(string, optional): Specifies the URL that will be used
to determine which public key to verify incoming JWT with. A JSON
array from this URL will be parsed and the ``kid`` (key id) field of
each object in the array will be compared against the ``kid`` field
of the incoming JWT.

"""

def __init__(self, user_loader, secret_key,
algorithm='HS256', auth_header_prefix='jwt',
leeway=0, expiration_delta=24 * 60 * 60,
audience=None, issuer=None,
verify_claims=None, required_claims=None):
verify_claims=None, required_claims=None,
key_discovery_url=None):

try:
jwt
Expand All @@ -176,6 +191,7 @@ def __init__(self, user_loader, secret_key,
self.issuer = issuer
self.verify_claims = verify_claims or ['signature', 'exp', 'nbf', 'iat']
self.required_claims = required_claims or ['exp', 'iat', 'nbf']
self.key_discovery_url = key_discovery_url

if 'aud' in self.verify_claims and not audience:
raise ValueError('Audience parameter must be provided if '
Expand All @@ -185,13 +201,28 @@ def __init__(self, user_loader, secret_key,
raise ValueError('Issuer parameter must be provided if '
'`iss` claim needs to be verified')

def _get_discovery_json(self):
with urllib.request.urlopen(self.key_discovery_url) as url:
return json.loads(url.read().decode())

def _discover_key(self, key_id):
data = self._get_discovery_json()
for key in data["keys"]:
if key_id == key["kid"]:
self.secret_key = RSAAlgorithm.from_jwk(json.dumps(key))

def _decode_jwt_token(self, req):

# Decodes the jwt token into a payload
auth_header = req.get_header('Authorization')
token = self.parse_auth_token_from_request(auth_header=auth_header)

options = dict(('verify_' + claim, True) for claim in self.verify_claims)
if(self.key_discovery_url):
headers = jwt.get_unverified_header(token)
self._discover_key(headers["kid"])

options = dict(('verify_' + claim, True)
for claim in self.verify_claims)

options.update(
dict(('require_' + claim, True) for claim in self.required_claims)
Expand Down
2 changes: 2 additions & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,5 @@ codecov==2.0.3
coverage==4.0.3
python-coveralls==2.9.0
PyYAML<=3.13
urllib3==1.25.7
cryptography==2.8
70 changes: 67 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

try:
import jwt
import json
from jwt.algorithms import RSAAlgorithm
jwt_available = pytest.mark.skipif(False, reason="jwt not installed")
except ImportError:
jwt_available = pytest.mark.skipif(True, reason="jwt not installed")
Expand Down Expand Up @@ -160,7 +162,7 @@ def user_loader(payload):
return JWTAuthBackend(user_loader, SECRET_KEY)


def get_jwt_token(user, prefix='JWT'):
def get_jwt_token(user, prefix='JWT', algorithm = "HS256", secret_key = SECRET_KEY):
now = datetime.utcnow()
payload = {
'user': {
Expand All @@ -172,8 +174,8 @@ def get_jwt_token(user, prefix='JWT'):
'exp': now + timedelta(seconds=EXPIRATION_DELTA)
}

jwt_token = jwt.encode(payload, SECRET_KEY,
json_encoder=ExtendedJSONEncoder).decode('utf-8')
jwt_token = jwt.encode(payload, secret_key,
json_encoder=ExtendedJSONEncoder, algorithm=algorithm, headers={'kid':'KeyID'}).decode('utf-8')
return '{prefix} {jwt_token}'.format(prefix=prefix, jwt_token=jwt_token)


Expand All @@ -188,6 +190,68 @@ def auth_token(self, user):

return get_jwt_token(user)


class KeyDiscoveryJWTAuthFixture:

@pytest.fixture(scope='function')
def auth_token(self, user):
# Need to sign the generated JWT with a known private key so it can be verified later
return get_jwt_token(user, algorithm = "RS256", secret_key = '''-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAqOvwMnXHNjseNWf1La5cnD3O3KCiFCafRuQYXtHMPZbzge3D
i9MVoXzoDveXbxGGJtLWoIrEy2OXfzTxP5z5irLvAVRca6h3/d4Fe37x0PH8Atel
nYZu3lKZGGPCVRwitX23qqqA/WWHfxUdrflD1V2ipUmWxYsb/NV8MZ9pUmedwpG7
GjAOrpNP+9aSwUrYys08iCHCwLQpLEiwtneHxdVUvd44mJtpBsxbBLQjwHW2CkPq
mkZv4Tb7451EOCMRIGMJLnUrc0ESGmXm8NMmdL3Hbdu0sg3lrIZiiG1twLaO4Y6z
JSRjvkXKpMakFiJTH+FsGI0/U+ntxXC1F4wL1wIDAQABAoIBAQCHuoSW1wIJtjjQ
qsZbPTXWqOc1abCxxlLG0HIwhhy5BDiHFre/+wzvZADGPfUk3ozPVyvzdW0pC83n
/W83MPdlld7rT5CvRH+dsa7wCxFcVYOr+QBu8VzWMMIo0ceNQX02HVzdugDJGrJj
z2C4sIfrwj/01YtbESqc3iDbcn5bIezwVmWunAv0JtszGecwVaTHw//7meXIRRFT
/jzf7dIiNQCX8UbI4/XsL29iNEL6uaK9V3uYN0kEWhBtRoShYWI2M8zOr98u1SQd
JIe+vaxTpFwYQI39tArcjZCL0D6rK8Fvx1YxzGDMsmddjKxkIUU3AWe604ord1xZ
z1p892PZAoGBAOTNMxJOubO0eLaJZkHIZ2SyLO3ryxxlADUT3N+wuRV+ovGZ/bbI
sC7kaAwCJilC79uIlOVFsIijkM5HjdP7l+I9nSEv5o2mO5GzdP+oz6srR23iDuqD
O2F1Q9sAfwWCjFHmFPdsTzIkVLOdGgWHUCoBSc1dVqS6UtlIF0pcngj1AoGBAL0A
fvuwB8dQVSnuFvOO11zLDsVC+IA0YRd8kIunQggLkf0uTPQB2itKpJ5DbEIpukXB
aVx1hwasBgNqUCyW9QnpWb+N52RtYVl61fxtZ+3MGyUkMHBe82CS1BJQdoQmhsc0
sPRvar/AEbrDsPw5iHMKqECd9eXWA/9K9ixo1HIbAoGAX6lv5gKmX/1fzyoJaA2r
NQ3N/Tft9xQ/jvGcEqan69XDuPIigy7Lgv+ahRLM88l50bb8UhPeKHMC00xVf0Ed
EsmiDcMiSS0skNGQZGgnU7DHr6ipheGSjT/jPAisExivJHrnXz+YqSVJiMNxosgd
e0KIoeWZmUwR4ajjnAK3TJUCgYBG/6e0DpVtdyz22ly+07rtPc5npdfJ+WM7umxm
OcehVA9cZ4c65nM5bgnW9gb198zkpVpaBEBb7kU4BTjm9zJHreQsBDeXT0uRnIZE
FClFeDX+RtD3dYPBlIab9qP+0qYwsQeEW1Jjg9hlK1wR897hMHCyDWSxGStZPKSr
XBnqXwKBgQDGu4MZlszh2tZrlQBoJEQwWJWQi3JT3W3FQYVtRNhJD9op8l2aBERU
Ia750TQvkhiTz537ukiHFBZRZAwaus85XLXGHqyXf2DboEjpGc/Tfy0PgogRGrLN
BTyTyCCAxb88aYZ9XCibjNA/ik8wB2jvfz7dOTv2Jop1imRYOmauww==
-----END RSA PRIVATE KEY-----
''')

@pytest.fixture(scope='function')
def backend(self, user):
return self.key_discovery_jwt_backend(user)

@pytest.fixture(scope='function')
def key_discovery_json(self):
return {
'keys':[
{
"kty": "RSA",
"e": "AQAB",
"use": "sig",
"kid": "KeyID",
"alg": "RS256",
"n": "qOvwMnXHNjseNWf1La5cnD3O3KCiFCafRuQYXtHMPZbzge3Di9MVoXzoDveXbxGGJtLWoIrEy2OXfzTxP5z5irLvAVRca6h3_d4Fe37x0PH8AtelnYZu3lKZGGPCVRwitX23qqqA_WWHfxUdrflD1V2ipUmWxYsb_NV8MZ9pUmedwpG7GjAOrpNP-9aSwUrYys08iCHCwLQpLEiwtneHxdVUvd44mJtpBsxbBLQjwHW2CkPqmkZv4Tb7451EOCMRIGMJLnUrc0ESGmXm8NMmdL3Hbdu0sg3lrIZiiG1twLaO4Y6zJSRjvkXKpMakFiJTH-FsGI0_U-ntxXC1F4wL1w"
}
]
}

def key_discovery_jwt_backend(self, user):
def user_loader(payload):
if user.id == payload['user']['id']:
return user
return None

return JWTAuthBackend(user_loader, "", "RS256", key_discovery_url="https://test.discovery.com")


@pytest.fixture(scope='function')
def hawk_backend(user):
Expand Down
11 changes: 11 additions & 0 deletions tests/test_auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,17 @@ def test_backend_get_auth_token(self, user, backend):
auth_token = backend.get_auth_token(user_payload)
decoded_token = jwt.decode(auth_token, SECRET_KEY)
assert decoded_token['user'] == user_payload

@jwt_available
class TestWithKeyDiscoveryUrlJwtAuth(KeyDiscoveryJWTAuthFixture, ResourceFixture):

def test_x(self, mocker, client, user, backend, auth_token, key_discovery_json):
mocker.patch.object(backend, '_get_discovery_json')
backend._get_discovery_json.return_value = key_discovery_json

resp = simulate_request(client, '/auth', auth_token=auth_token)
assert resp.status_code == 200
assert resp.json == user.to_dict()


@hawk_available
Expand Down