-
Notifications
You must be signed in to change notification settings - Fork 0
/
demo_config.py
100 lines (73 loc) · 2.9 KB
/
demo_config.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import dns.resolver
import random
import ipaddress
from sys import exit as die
# Bondy connection settings. Each value is read from the environment variable
# of the same name and falls back to the default given here.
# NOTE(review): values coming from the environment are always strings, while
# the BONDY_TCP_PORT and BONDY_IP_VERSION defaults are ints - confirm that
# consumers cope with both types.
BONDY_DNS_QUERY = os.environ.get("BONDY_DNS_QUERY")
BONDY_DNS_RECORD_TYPE = os.environ.get("BONDY_DNS_RECORD_TYPE", "A")
BONDY_TCP_PORT = os.environ.get("BONDY_TCP_PORT", 18082)
BONDY_IP_VERSION = os.environ.get("BONDY_IP_VERSION", 4)
BONDY_URL = os.environ.get("BONDY_URL", "ws://localhost:18080/ws")
BONDY_REALM = os.environ.get("BONDY_REALM", "com.market.demo")
# RPCs: dotted URIs naming the market's remote procedures.
# (presumably the procedures registered/called over the Bondy router - verify
# against the modules that import these names)
MARKET_BIDDER_ADD = "com.market.bidder.add"
MARKET_BIDDER_GONE = "com.market.bidder.gone"
MARKET_GET = "com.market.get"
MARKET_ITEM_BID = "com.market.item.bid"
MARKET_ITEM_GET = "com.market.item.get"
MARKET_ITEM_SELL = "com.market.item.sell"
# PubSubs: dotted URIs naming the market's publish/subscribe topics.
# (presumably one topic per market event - verify against the publishers)
MARKET_ITEM_ADDED = "com.market.item.added"
MARKET_ITEM_EXPIRED = "com.market.item.expired"
MARKET_ITEM_NEW_PRICE = "com.market.item.new_price"
MARKET_ITEM_SOLD = "com.market.item.sold"
MARKET_OPENED = "com.market.opened"
# Cryptosign users: maps a user id to the NAME of the environment variable
# that holds that user's private key (the key itself is never stored here).
# create_auth_config() looks a user id up in this table; ids that are absent
# connect anonymously.
# NOTE(review): the constant's name is misspelled ("CRYTOSIGN"); it is left
# unchanged because other modules may import it under this name.
CRYTOSIGN_PRIVATE_KEY_VARS = {
    "bot": "BOT_PRIVKEY",
    "market": "MARKET_PRIVKEY",
}
def create_transport_config(url=BONDY_URL):
    """Build the transport configuration dict for connecting to Bondy.

    When ``BONDY_DNS_QUERY`` is set, the name is resolved using record type
    ``BONDY_DNS_RECORD_TYPE``, one answer is chosen at random, and a
    rawsocket transport pointing at that address is returned; ``url`` is
    ignored in that case. Otherwise a websocket transport for ``url`` is
    returned.

    Args:
        url: WebSocket URL used when no DNS query is configured. Defaults to
            ``BONDY_URL``.

    Returns:
        A dict with the keys ``type``, ``url`` and either ``serializer``
        (rawsocket) or ``serializers`` (websocket).

    Raises:
        ValueError: If neither a DNS query nor a URL is available, or if the
            DNS query produced no answers.

    Exceptions raised by ``dns.resolver.resolve`` and ``rawsocket_url`` are
    not caught here and propagate to the caller.
    """
    # Validate the URL that was actually passed in, not the module-level
    # default: the caller may have overridden it.
    if not BONDY_DNS_QUERY and not url:
        raise ValueError(
            "At least one of URL or DNS query must be provided "
            "via environment variables."
        )

    if BONDY_DNS_QUERY:
        # NOTE (from the original author): Autobahn Python does not support
        # TLS over IPv6.
        answers = dns.resolver.resolve(BONDY_DNS_QUERY, BONDY_DNS_RECORD_TYPE)
        if not answers:
            raise ValueError(
                f"No addresses found for DNS query {BONDY_DNS_QUERY}"
            )

        # Any one of the resolved addresses will do; choose at random.
        address = random.choice(answers).to_text()
        config = {
            "type": "rawsocket",
            "url": rawsocket_url(address),
            "serializer": "json",
        }
    else:
        config = {"type": "websocket", "url": url, "serializers": ["json"]}

    print(f"Will use transport '{config}'")
    return config
def rawsocket_url(address, port=None):
    """Return the ``rs://`` rawsocket URL for an IP address.

    IPv4 addresses are rendered as ``rs://<address>:<port>``; IPv6 addresses
    are wrapped in square brackets, ``rs://[<address>]:<port>``.

    Args:
        address: Textual IPv4 or IPv6 address.
        port: TCP port to put in the URL. When ``None`` (the default), the
            module-level ``BONDY_TCP_PORT`` is used.

    Returns:
        The rawsocket URL as a string.

    Raises:
        ipaddress.AddressValueError: If ``address`` is neither a valid IPv4
            nor a valid IPv6 address.
    """
    if port is None:
        port = BONDY_TCP_PORT

    try:
        ipaddress.IPv4Address(address)
    except ipaddress.AddressValueError:
        pass
    else:
        return f"rs://{address}:{port}"

    try:
        ipaddress.IPv6Address(address)
    except ipaddress.AddressValueError:
        # Report the failure in terms of the input, not as an IPv6-specific
        # parse error chained onto the earlier IPv4 failure.
        raise ipaddress.AddressValueError(
            f"Not a valid IPv4 or IPv6 address: {address!r}"
        ) from None

    # IPv6 literals must be bracketed so the port separator is unambiguous.
    return f"rs://[{address}]:{port}"
def create_cryptosign_config(user_id, private_key):
    """Wrap a user's credentials in a cryptosign authentication dict.

    Args:
        user_id: Value stored under the ``authid`` key.
        private_key: Value stored under the ``privkey`` key.

    Returns:
        ``{"cryptosign": {"authid": user_id, "privkey": private_key}}``.
    """
    credentials = dict(authid=user_id, privkey=private_key)
    return dict(cryptosign=credentials)
def create_auth_config(user_id=None):
    """Return the authentication config for ``user_id``.

    Users listed in ``CRYTOSIGN_PRIVATE_KEY_VARS`` authenticate with
    cryptosign, using the private key read from the environment variable
    named in that table. Any other user id (including ``None``) yields
    ``None``, i.e. an anonymous connection.

    Args:
        user_id: Id of the user to authenticate as, or ``None``.

    Returns:
        A cryptosign config dict, or ``None`` for anonymous access.

    Raises:
        SystemExit: If the user is a cryptosign user but the environment
            variable holding the private key is unset or empty.
    """
    env_name = CRYTOSIGN_PRIVATE_KEY_VARS.get(user_id)
    if not env_name:
        # Connect anonymously by default
        return None

    # Cryptosign users
    secret = os.getenv(env_name)
    if not secret:
        die(f"Missing env {env_name}")
    return create_cryptosign_config(user_id, secret)
def create_autobahn_component_config(url=BONDY_URL, user_id=None, realm=BONDY_REALM):
    """Assemble the transports/authentication/realm configuration dict.

    Presumably the result is passed to an Autobahn component - confirm
    against the callers.

    Args:
        url: Forwarded to ``create_transport_config``. Defaults to
            ``BONDY_URL``.
        user_id: Forwarded to ``create_auth_config``. Defaults to ``None``.
        realm: Stored under the ``realm`` key. Defaults to ``BONDY_REALM``.

    Returns:
        A dict with the keys ``transports`` (a one-element list),
        ``authentication`` and ``realm``.
    """
    # Build the transport first, then the authentication, matching the order
    # in which the dict entries are evaluated.
    transport = create_transport_config(url)
    authentication = create_auth_config(user_id)

    component_config = {"transports": [transport]}
    component_config["authentication"] = authentication
    component_config["realm"] = realm
    return component_config