Skip to content

Commit

Permalink
adds tests for key rotation, improves fixtures
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Kononykhin <[email protected]>
  • Loading branch information
andkononykhin committed Jan 30, 2019
1 parent a989672 commit d73101f
Showing 1 changed file with 188 additions and 98 deletions.
286 changes: 188 additions & 98 deletions indy_node/test/nym_txn/test_nym_auth_rules.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import sys
import pytest

from enum import Enum, unique
Expand All @@ -14,32 +15,26 @@
from indy_node.test.helper import createHalfKeyIdentifierAndAbbrevVerkey


# FIXME terms
# - add/create/provision
# - remove/demote/revoke/blacklist/suspend/flush
# - owner/holder
# - signer/initiator ???
#
#
# TODO
# - more specific string patterns for auth exc check
# - DRY: send requests and check replies
#
#
# - ptomoter after demotion


# FIXTURES

class EnumBase(Enum):
def __str__(self):
return self.name


@unique
class ActionIds(Enum):
class ActionIds(EnumBase):
add = 0
demote = 1
rotate = 2


@unique
class Demotions(Enum):
class Demotions(EnumBase):
# other DID-without-verkey created by the demoter
self_created_no_verkey = 1
# other DID-with-verkey created by the demoter
Expand All @@ -50,6 +45,21 @@ class Demotions(Enum):
other_created_verkey = 4


@unique
class Rotations(EnumBase):
none_val = 1
val_val = 2
val_none = 3
none_none = 4


@unique
class Rotator(EnumBase):
self = 1
creator = 2
other = 3


# FIXME class name
class DIDWallet(object):
def __init__(self, did=None, role=Roles.CLIENT, verkey=None, creator=None, wallet_handle=None):
Expand All @@ -67,28 +77,57 @@ def wallet_did(self):
def auth_check(action_id, signer, dest):

is_self = signer.did == dest.did
# is_creator = signer.did == dest.creator.did
dest_with_verkey = dest.verkey is not None
is_owner = signer == (dest if dest.verkey is not None else dest.creator)

if action_id == ActionIds.add:
if signer.role == Roles.TRUSTEE:
return True
elif (signer.role == Roles.STEWARD and
dest.role in (Roles.CLIENT, Roles.TRUST_ANCHOR, Roles.NETWORK_MONITOR)):
return True
elif signer.role == Roles.TRUST_ANCHOR and dest.role == Roles.CLIENT:
return True
if dest.role in (Roles.TRUSTEE, Roles.STEWARD):
return signer.role == Roles.TRUSTEE
elif dest.role in (Roles.TRUST_ANCHOR, Roles.NETWORK_MONITOR):
return signer.role in (Roles.TRUSTEE, Roles.STEWARD)
elif dest.role == Roles.CLIENT:
return signer.role in (Roles.TRUSTEE, Roles.STEWARD, Roles.TRUST_ANCHOR)

elif action_id == ActionIds.demote:
if signer.role == Roles.TRUSTEE:
return True
elif (signer.role == Roles.TRUST_ANCHOR and dest.role == Roles.TRUST_ANCHOR and
is_self and dest_with_verkey):
return True
if dest.role in (Roles.TRUSTEE, Roles.STEWARD):
return signer.role == Roles.TRUSTEE
elif dest.role == Roles.TRUST_ANCHOR:
return ((signer.role == Roles.TRUSTEE) or
(signer.role == Roles.TRUST_ANCHOR and
is_self and is_owner))
elif dest.role == Roles.NETWORK_MONITOR:
return signer.role in (Roles.TRUSTEE, Roles.STEWARD)

elif action_id == ActionIds.rotate:
return is_owner

return False


def create_new_did(looper, sdk_pool_handle, creator, role, skipverkey=False):

op = {
'type': NYM,
'role': role.value
}

new_did_verkey = None

if skipverkey:
new_did, _ = createHalfKeyIdentifierAndAbbrevVerkey()
op.update({'dest': new_did})
else:
new_did, new_did_verkey = looper.loop.run_until_complete(
create_and_store_my_did(creator.wallet_handle, "{}"))

op.update({'dest': new_did, 'verkey': new_did_verkey})

req = sdk_sign_and_submit_op(looper, sdk_pool_handle, creator.wallet_did, op)
sdk_get_and_check_replies(looper, [req])

return DIDWallet(did=new_did, role=role, verkey=new_did_verkey,
creator=creator, wallet_handle=creator.wallet_handle)


@pytest.fixture(scope="module")
def client(sdk_wallet_client):
return DIDWallet(did=sdk_wallet_client[1], role=Roles.CLIENT, wallet_handle=sdk_wallet_client[0])
Expand All @@ -104,76 +143,111 @@ def steward(sdk_wallet_steward):
return DIDWallet(did=sdk_wallet_steward[1], role=Roles.STEWARD, wallet_handle=sdk_wallet_steward[0])


def idfn_enum(item):
return item.name
@pytest.fixture(scope="module", params=list(Roles))
def role(request):
return request.param


def _create_new_nym(looper, sdk_pool_handle, creator, role, *args, **kwargs):
new_did, new_did_verkey = looper.loop.run_until_complete(
create_and_store_my_did(creator.wallet_handle, "{}"))
def did_fixture_wrapper():
def _fixture(looper, sdk_pool_handle, txnPoolNodeSet, trustee, request):
marker = request.node.get_marker('skip_did_verkey')
return create_new_did(looper, sdk_pool_handle, trustee, request.param,
skipverkey=(marker is not None))
return _fixture

op = {'type': NYM,
'dest': new_did,
'role': role.value,
'verkey': new_did_verkey}
req = sdk_sign_and_submit_op(looper, sdk_pool_handle, (creator.wallet_handle, creator.did), op)
sdk_get_and_check_replies(looper, [req])

return DIDWallet(did=new_did, role=role, verkey=new_did_verkey, creator=creator, wallet_handle=creator.wallet_handle)
for scope in ('module', 'function'):
setattr(
sys.modules[__name__],
"did_per_{}".format(scope),
pytest.fixture(scope=scope, params=list(Roles))(did_fixture_wrapper()))


@pytest.fixture(scope="module",
params=[Roles.CLIENT, Roles.TRUSTEE, Roles.STEWARD],
ids=idfn_enum)
def provisioner(request, client, trustee, steward):
# TODO
# - wallets for TRUST_ANCHOR and NETWORK_MONITOR
return {
Roles.CLIENT: client,
Roles.TRUSTEE: trustee,
Roles.STEWARD: steward,
}[request.param]
@pytest.fixture(scope="module")
def provisioner(did_per_module):
return did_per_module


@pytest.fixture(scope="module", params=list(Roles) + [None],
ids=lambda r: str(r) if r else 'omitted_role')
def provisioned_role(request):
return request.param


@pytest.fixture(scope="module")
def provisioned(provisioned_role):
did, verkey = createHalfKeyIdentifierAndAbbrevVerkey()
return (DIDWallet(did=did,
role=provisioned_role if provisioned_role else Roles.CLIENT,
verkey=verkey),
provisioned_role is None)


# scope is 'function' since demoter demotes
# themselves at the end of the each demotion test
@pytest.fixture(scope="function",
params=list(Roles),
ids=idfn_enum)
def demoter(looper, sdk_pool_handle, txnPoolNodeSet, trustee, request):
return _create_new_nym(looper, sdk_pool_handle, trustee, request.param)
@pytest.fixture(scope="function")
def demoter(did_per_function):
return did_per_function


@pytest.fixture(scope="module", params=list(Roles), ids=idfn_enum)
def role(request):
@pytest.fixture(scope="function",
params=[(x, y) for x in Demotions for y in Roles] + [None],
ids=lambda p: "{}-{}".format(p[0], p[1]) if p else 'self')
def demotion(request):
return request.param


@pytest.fixture(scope="function")
def nym_op():
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
return {
'type': NYM,
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
}
def demoted(looper, sdk_pool_handle, txnPoolNodeSet, trustee, demoter, demotion):
if demotion is None: # self demotion
return demoter
else:
demotion_type, role = demotion
if demotion_type == Demotions.self_created_no_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return create_new_did(looper, sdk_pool_handle, demoter, role, skipverkey=True)
elif demotion_type == Demotions.self_created_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return create_new_did(looper, sdk_pool_handle, demoter, role)
elif demotion_type == Demotions.other_created_no_verkey:
return create_new_did(looper, sdk_pool_handle, trustee, role, skipverkey=True)
elif demotion_type == Demotions.other_created_verkey:
return create_new_did(looper, sdk_pool_handle, trustee, role)


# Note. dedicated trustee is used to test rotations by other
# (not creator and not self). Other other-rotators (e.g. TRUST_ANCHOR)
# are ignored as less powerful.
@pytest.fixture(scope="module")
def trustee_not_creator(looper, sdk_pool_handle, txnPoolNodeSet, trustee):
return create_new_did(looper, sdk_pool_handle, trustee, Roles.TRUSTEE)


@pytest.fixture(scope="function",
params=list(Demotions),
ids=idfn_enum)
# TODO parametrize by verkey in op
def demoted(looper, sdk_pool_handle, txnPoolNodeSet, trustee, demoter, role, request):
if request.param == Demotions.self_created_no_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return _create_new_nym(looper, sdk_pool_handle, demoter, role, skipverkey=True)
elif request.param == Demotions.self_created_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return _create_new_nym(looper, sdk_pool_handle, demoter, role)
elif request.param == Demotions.other_created_no_verkey:
return _create_new_nym(looper, sdk_pool_handle, trustee, role, skipverkey=True)
elif request.param == Demotions.other_created_verkey:
return _create_new_nym(looper, sdk_pool_handle, trustee, role)
@pytest.fixture(scope="function", params=list(Rotations))
def rotation_verkey(request):
if request.param in (Rotations.none_none, Rotations.none_val):
request.node.add_marker('skip_did_verkey')

verkey = None
if request.param in (Rotations.val_val, Rotations.none_val):
_, verkey_ = createHalfKeyIdentifierAndAbbrevVerkey()

return verkey


@pytest.fixture(scope="function", params=list(Rotator))
def rotator(did_per_function, trustee_not_creator, request):
if request.param == Rotator.self:
return did_per_function
elif request.param == Rotator.creator:
return did_per_function.creator
elif request.param == Rotator.other:
return trustee_not_creator


@pytest.fixture(scope="function")
def rotated(did_per_function):
return did_per_function


# TEST HELPERS
Expand All @@ -189,9 +263,20 @@ def sign_submit_check(looper, sdk_pool_handle, signer, dest, action_id, op):
excinfo.match('UnauthorizedClientRequest')


def demote(looper, sdk_pool_handle, txnPoolNodeSet,
demoter, demoted):
def add(looper, sdk_pool_handle, provisioner, provisioned, omit_role=False):
op = {
'type': NYM,
'dest': provisioned.did,
'verkey': provisioned.verkey,
}

if not omit_role:
op['role'] = provisioned.role.value

sign_submit_check(looper, sdk_pool_handle, provisioner, provisioned, ActionIds.add, op)


def demote(looper, sdk_pool_handle, demoter, demoted):
op = {
'type': NYM,
'dest': demoted.did,
Expand All @@ -201,28 +286,33 @@ def demote(looper, sdk_pool_handle, txnPoolNodeSet,
sign_submit_check(looper, sdk_pool_handle, demoter,
demoted, ActionIds.demote, op)

# TESTS

def rotate(looper, sdk_pool_handle, rotator, rotated, new_verkey):
op = {
'type': NYM,
'dest': rotated.did,
'verkey': new_verkey
}

def test_add_nym(looper, sdk_pool_handle, txnPoolNodeSet, nym_op, provisioner, role):
nym_op['role'] = role.value
sign_submit_check(looper, sdk_pool_handle, provisioner, DIDWallet(role=role), ActionIds.add, nym_op)

sign_submit_check(looper, sdk_pool_handle, rotator,
rotated, ActionIds.rotate, op)

def test_add_nym_omitted_role(looper, sdk_pool_handle, txnPoolNodeSet, nym_op, provisioner):
sign_submit_check(looper, sdk_pool_handle, provisioner, DIDWallet(role=role), ActionIds.add, nym_op)

# TESTS

# TODO parametrize by verkey in op
def test_demote_self_nym(
looper, sdk_pool_handle, txnPoolNodeSet,
demoter):
demote(looper, sdk_pool_handle, txnPoolNodeSet, demoter, demoter)
def test_nym_add(looper, sdk_pool_handle, txnPoolNodeSet, provisioner, provisioned):
provisioned, omit_role = provisioned
add(looper, sdk_pool_handle, provisioner, provisioned, omit_role=omit_role)


# TODO parametrize by verkey in op
def test_demote_nym(
looper, sdk_pool_handle, txnPoolNodeSet,
demoter, demoted):
# Demotion is considered as NYM with only 'role' field specified and it's None.
# If NYM includes 'verkey' field as well it mixes role demotion/promotion and
# verkey rotation and should be checked separately. TODO mixed cases
def test_nym_demote(looper, sdk_pool_handle, txnPoolNodeSet, demoter, demoted):
# might be None for cases 'self_created_no_verkey' and 'self_created_verkey' or self demotion
if demoted:
demote(looper, sdk_pool_handle, txnPoolNodeSet, demoter, demoted)
demote(looper, sdk_pool_handle, demoter, demoted)


def test_nym_rotate(looper, sdk_pool_handle, txnPoolNodeSet, rotator, rotated, rotation_verkey):
rotate(looper, sdk_pool_handle, rotator, rotated, rotation_verkey)

0 comments on commit d73101f

Please sign in to comment.