Skip to content

Commit

Permalink
adds separate auth tests for NYM
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Kononykhin <[email protected]>
  • Loading branch information
andkononykhin committed Jan 27, 2019
1 parent 3ca8018 commit a989672
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 97 deletions.
6 changes: 6 additions & 0 deletions indy_common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,12 @@
ENDPOINT = "endpoint"

# Roles

# FIXME can be automated by oteration through Roles
# but it would be less self-descriptive
CLIENT = Roles.CLIENT.value
CLIENT_STRING = None

TRUST_ANCHOR = Roles.TRUST_ANCHOR.value
TRUST_ANCHOR_STRING = 'TRUST_ANCHOR'

Expand Down
1 change: 1 addition & 0 deletions indy_common/roles.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Roles(Enum):
# These numeric constants CANNOT be changed once they have been used,
# because that would break backwards compatibility with the ledger
# Also the numeric constants CANNOT collide with the roles in plenum
CLIENT = None # FIXME might makes sense to move to plenum
TRUSTEE = Roles.TRUSTEE.value
STEWARD = Roles.STEWARD.value
TRUST_ANCHOR = "101"
Expand Down
2 changes: 1 addition & 1 deletion indy_node/test/nym_txn/test_demote_network_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,4 @@ def test_network_monitor_suspension_by_itself(looper,
op['verkey'] = new_network_monitor_verkey
req = sdk_sign_and_submit_op(looper, sdk_pool_handle, (sdk_wallet_handle, new_network_monitor_did), op)
with pytest.raises(RequestRejectedException):
sdk_get_and_check_replies(looper, [req])
sdk_get_and_check_replies(looper, [req])
21 changes: 1 addition & 20 deletions indy_node/test/nym_txn/test_nym.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,10 @@
from plenum.test.pool_transactions.helper import sdk_add_new_nym


def test_non_steward_cannot_create_trust_anchor(
nodeSet, looper, sdk_pool_handle, sdk_wallet_steward):
sdk_wallet_client = sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_steward)
with pytest.raises(RequestRejectedException) as e:
sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_client, role=TRUST_ANCHOR_STRING)
e.match('There is no accepted constraint')


def testStewardCreatesATrustAnchor(looper, sdk_pool_handle, sdk_wallet_steward):
sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_steward, role=TRUST_ANCHOR_STRING)


# FIXME why is it necessary to check
def testStewardCreatesAnotherTrustAnchor(looper, sdk_pool_handle, sdk_wallet_steward):
sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_steward, role=TRUST_ANCHOR_STRING)


def test_non_trust_anchor_cannot_create_user(
nodeSet, looper, sdk_pool_handle, sdk_wallet_steward):
sdk_wallet_client = sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_steward)
with pytest.raises(RequestRejectedException) as e:
sdk_add_new_nym(looper, sdk_pool_handle, sdk_wallet_client)
e.match('There is no accepted constraint')


def testTrustAnchorCreatesAUser(sdk_user_wallet_a):
pass
228 changes: 228 additions & 0 deletions indy_node/test/nym_txn/test_nym_auth_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import pytest

from enum import Enum, unique

from indy.did import create_and_store_my_did

from plenum.common.constants import TRUSTEE, STEWARD, NYM
from plenum.common.exceptions import RequestRejectedException
from plenum.test.helper import sdk_sign_and_submit_op, sdk_get_and_check_replies
from plenum.test.pool_transactions.helper import sdk_add_new_nym

from indy_common.constants import CLIENT, TRUST_ANCHOR, NETWORK_MONITOR
from indy_common.roles import Roles
from indy_node.test.helper import createHalfKeyIdentifierAndAbbrevVerkey


# FIXME terms
# - add/create/provision
# - remove/demote/revoke/blacklist/suspend/flush
# - owner/holder
# - signer/initiator ???
#
#
# TODO
# - more specific string patterns for auth exc check
# - DRY: send requests and check replies
#
#
# - ptomoter after demotion


# FIXTURES


@unique
class ActionIds(Enum):
add = 0
demote = 1


@unique
class Demotions(Enum):
# other DID-without-verkey created by the demoter
self_created_no_verkey = 1
# other DID-with-verkey created by the demoter
self_created_verkey = 2
# other DID-without-verkey created by other
other_created_no_verkey = 3
# other DID-with-verkey created by other
other_created_verkey = 4


# FIXME class name
class DIDWallet(object):
def __init__(self, did=None, role=Roles.CLIENT, verkey=None, creator=None, wallet_handle=None):
self.did = did
self.role = role
self.verkey = verkey
self.creator = creator
self.wallet_handle = wallet_handle

@property
def wallet_did(self):
return (self.wallet_handle, self.did)


def auth_check(action_id, signer, dest):

is_self = signer.did == dest.did
# is_creator = signer.did == dest.creator.did
dest_with_verkey = dest.verkey is not None

if action_id == ActionIds.add:
if signer.role == Roles.TRUSTEE:
return True
elif (signer.role == Roles.STEWARD and
dest.role in (Roles.CLIENT, Roles.TRUST_ANCHOR, Roles.NETWORK_MONITOR)):
return True
elif signer.role == Roles.TRUST_ANCHOR and dest.role == Roles.CLIENT:
return True

elif action_id == ActionIds.demote:
if signer.role == Roles.TRUSTEE:
return True
elif (signer.role == Roles.TRUST_ANCHOR and dest.role == Roles.TRUST_ANCHOR and
is_self and dest_with_verkey):
return True

return False


@pytest.fixture(scope="module")
def client(sdk_wallet_client):
return DIDWallet(did=sdk_wallet_client[1], role=Roles.CLIENT, wallet_handle=sdk_wallet_client[0])


@pytest.fixture(scope="module")
def trustee(sdk_wallet_trustee):
return DIDWallet(did=sdk_wallet_trustee[1], role=Roles.TRUSTEE, wallet_handle=sdk_wallet_trustee[0])


@pytest.fixture(scope="module")
def steward(sdk_wallet_steward):
return DIDWallet(did=sdk_wallet_steward[1], role=Roles.STEWARD, wallet_handle=sdk_wallet_steward[0])


def idfn_enum(item):
return item.name


def _create_new_nym(looper, sdk_pool_handle, creator, role, *args, **kwargs):
new_did, new_did_verkey = looper.loop.run_until_complete(
create_and_store_my_did(creator.wallet_handle, "{}"))

op = {'type': NYM,
'dest': new_did,
'role': role.value,
'verkey': new_did_verkey}
req = sdk_sign_and_submit_op(looper, sdk_pool_handle, (creator.wallet_handle, creator.did), op)
sdk_get_and_check_replies(looper, [req])

return DIDWallet(did=new_did, role=role, verkey=new_did_verkey, creator=creator, wallet_handle=creator.wallet_handle)


@pytest.fixture(scope="module",
params=[Roles.CLIENT, Roles.TRUSTEE, Roles.STEWARD],
ids=idfn_enum)
def provisioner(request, client, trustee, steward):
# TODO
# - wallets for TRUST_ANCHOR and NETWORK_MONITOR
return {
Roles.CLIENT: client,
Roles.TRUSTEE: trustee,
Roles.STEWARD: steward,
}[request.param]


# scope is 'function' since demoter demotes
# themselves at the end of the each demotion test
@pytest.fixture(scope="function",
params=list(Roles),
ids=idfn_enum)
def demoter(looper, sdk_pool_handle, txnPoolNodeSet, trustee, request):
return _create_new_nym(looper, sdk_pool_handle, trustee, request.param)


@pytest.fixture(scope="module", params=list(Roles), ids=idfn_enum)
def role(request):
return request.param


@pytest.fixture(scope="function")
def nym_op():
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
return {
'type': NYM,
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
}


@pytest.fixture(scope="function",
params=list(Demotions),
ids=idfn_enum)
# TODO parametrize by verkey in op
def demoted(looper, sdk_pool_handle, txnPoolNodeSet, trustee, demoter, role, request):
if request.param == Demotions.self_created_no_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return _create_new_nym(looper, sdk_pool_handle, demoter, role, skipverkey=True)
elif request.param == Demotions.self_created_verkey:
if auth_check(ActionIds.add, demoter, DIDWallet(role=role)):
return _create_new_nym(looper, sdk_pool_handle, demoter, role)
elif request.param == Demotions.other_created_no_verkey:
return _create_new_nym(looper, sdk_pool_handle, trustee, role, skipverkey=True)
elif request.param == Demotions.other_created_verkey:
return _create_new_nym(looper, sdk_pool_handle, trustee, role)


# TEST HELPERS

def sign_submit_check(looper, sdk_pool_handle, signer, dest, action_id, op):
req = sdk_sign_and_submit_op(looper, sdk_pool_handle, signer.wallet_did, op)

if auth_check(action_id, signer, dest):
sdk_get_and_check_replies(looper, [req])
else:
with pytest.raises(RequestRejectedException) as excinfo:
sdk_get_and_check_replies(looper, [req])
excinfo.match('UnauthorizedClientRequest')


def demote(looper, sdk_pool_handle, txnPoolNodeSet,
demoter, demoted):

op = {
'type': NYM,
'dest': demoted.did,
'role': None
}

sign_submit_check(looper, sdk_pool_handle, demoter,
demoted, ActionIds.demote, op)

# TESTS


def test_add_nym(looper, sdk_pool_handle, txnPoolNodeSet, nym_op, provisioner, role):
nym_op['role'] = role.value
sign_submit_check(looper, sdk_pool_handle, provisioner, DIDWallet(role=role), ActionIds.add, nym_op)


def test_add_nym_omitted_role(looper, sdk_pool_handle, txnPoolNodeSet, nym_op, provisioner):
sign_submit_check(looper, sdk_pool_handle, provisioner, DIDWallet(role=role), ActionIds.add, nym_op)


# TODO parametrize by verkey in op
def test_demote_self_nym(
looper, sdk_pool_handle, txnPoolNodeSet,
demoter):
demote(looper, sdk_pool_handle, txnPoolNodeSet, demoter, demoter)


# TODO parametrize by verkey in op
def test_demote_nym(
looper, sdk_pool_handle, txnPoolNodeSet,
demoter, demoted):
if demoted:
demote(looper, sdk_pool_handle, txnPoolNodeSet, demoter, demoted)
9 changes: 4 additions & 5 deletions indy_node/test/nym_txn/test_nym_blacklisting.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ def test_steward_suspension_by_another_trustee(looper,
sdk_get_and_check_replies(looper, [req])


def test_steward_cannot_create_trust_anchors_after_demote (looper,
sdk_pool_handle,
sdk_wallet_trustee,
sdk_wallet_handle):
def test_steward_cannot_create_trust_anchors_after_demote(looper,
sdk_pool_handle,
sdk_wallet_trustee,
sdk_wallet_handle):
new_steward_did, new_steward_verkey = looper.loop.run_until_complete(
did.create_and_store_my_did(sdk_wallet_trustee[0], "{}"))
new_ta_did, new_ta_verkey = looper.loop.run_until_complete(
Expand Down Expand Up @@ -79,4 +79,3 @@ def test_steward_cannot_create_trust_anchors_after_demote (looper,
'newSteward',
TRUST_ANCHOR_STRING,
verkey=new_ta_2_verkey, dest=new_ta_2_did)

71 changes: 0 additions & 71 deletions indy_node/test/nym_txn/test_send_nym_validation.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,77 +69,6 @@ def testSendNymSucceedsForHalfKeyIdentifierAndAbbrevVerkey(
sdk_get_and_check_replies(looper, [request_couple])


def testSendNymSucceedsForTrusteeRole(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
parameters = {
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
'role': TRUSTEE
}

nym_request[OPERATION].update(parameters)
request_couple = sdk_sign_and_send_prepared_request(looper, sdk_wallet_trustee,
sdk_pool_handle, json.dumps(nym_request))
sdk_get_and_check_replies(looper, [request_couple])


def testSendNymSucceedsForStewardRole(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
parameters = {
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
'role': STEWARD
}
nym_request[OPERATION].update(parameters)
request_couple = sdk_sign_and_send_prepared_request(looper, sdk_wallet_trustee,
sdk_pool_handle, json.dumps(nym_request))
sdk_get_and_check_replies(looper, [request_couple])


def testSendNymSucceedsForTrustAnchorRole(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
parameters = {
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
'role': TRUST_ANCHOR
}
nym_request[OPERATION].update(parameters)
request_couple = sdk_sign_and_send_prepared_request(looper, sdk_wallet_trustee,
sdk_pool_handle, json.dumps(nym_request))
sdk_get_and_check_replies(looper, [request_couple])


def testSendNymSucceedsForOmittedRole(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
parameters = {
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey
}
del nym_request[OPERATION][ROLE]
nym_request[OPERATION].update(parameters)
request_couple = sdk_sign_and_send_prepared_request(looper, sdk_wallet_trustee,
sdk_pool_handle, json.dumps(nym_request))
sdk_get_and_check_replies(looper, [request_couple])


def testSendNymSucceedsForNoneRole(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
halfKeyIdentifier, abbrevVerkey = createHalfKeyIdentifierAndAbbrevVerkey()
parameters = {
'dest': halfKeyIdentifier,
'verkey': abbrevVerkey,
'role': None
}
nym_request[OPERATION].update(parameters)
request_couple = sdk_sign_and_send_prepared_request(looper, sdk_wallet_trustee,
sdk_pool_handle, json.dumps(nym_request))
sdk_get_and_check_replies(looper, [request_couple])


@pytest.mark.skip(reason='INDY-210')
def testSendNymFailsForCryptonymIdentifierAnsdk_pool_handlemittedVerkey(
looper, sdk_pool_handle, txnPoolNodeSet, nym_request, sdk_wallet_trustee):
Expand Down

0 comments on commit a989672

Please sign in to comment.