-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDoor-Access.py
135 lines (105 loc) · 3.85 KB
/
Door-Access.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import RPi.GPIO as GPIO
from mfrc522 import SimpleMFRC522
from time import sleep
from datetime import datetime
import MySQLdb
from typing import Type
# from cachetools import cached, TTLCache
import threading
# import threading
# import time
DOORPIN = 7
RELAY_TIMEOUT = 2
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)
GPIO.setup(DOORPIN, GPIO.OUT)
dbHost = "localhost"
dbName = "pi-access"
dbUser = "accessadmin"
dbPass = "accesspassword"
dbExpireTime = 60 # minutes
dbRefreshInterval = 10 # minutes
reader = SimpleMFRC522(delay=0.4)
class user_info: # pylint: disable=too-few-public-methods
"""A wrapper for the returned SQL row for user access
Attributes:
name: A string representing the user's name.
has_access: A bool representing the user's access.
is_admin: A bool representing the user's admin status.
"""
name = "UNKNOWN"
has_access = False
is_admin = False
def __init__(self, results, card_key, card_id):
db_row = None
# results = db_cursor.fetchall()
if (datetime.now() - results.last_refresh).seconds < (dbExpireTime * 60):
for row in results.results:
if row["card_key"] == card_key and row["card_id"] == str(card_id):
db_row = row
break
if db_row is not None:
# if db_cursor.rowcount != 1:
# print("WARNING: Multiple matches returned for: {card_id}, {card_key}")
self.name: str = db_row["name"]
self.has_access: bool = int(db_row["access"]) == 1
self.is_admin: bool = int(db_row["admin"]) == 1
self.card_key: str = card_key[:36] if len(card_key) > 36 else card_key
self.card_id: int = card_id
class door:
@staticmethod
def open(timeout=RELAY_TIMEOUT):
GPIO.output(DOORPIN, True)
if timeout is not None:
sleep(timeout)
GPIO.output(DOORPIN, False)
@staticmethod
def close():
GPIO.output(DOORPIN, False)
class _cachedDB: # pylint: disable=too-few-public-methods
results = None
last_refresh = datetime(2000, 1, 1, 1, 1, 1)
def __init__(self):
threading.Thread(target=self.updateDB).start()
def updateDB(self):
while True:
dbConnection = MySQLdb.connect(host=dbHost, user=dbUser, passwd=dbPass, db=dbName)
cur = dbConnection.cursor(MySQLdb.cursors.DictCursor)
cur.execute(f"SELECT * FROM access_list")
dbConnection.close()
self.results = cur.fetchall()
self.last_refresh = datetime.now()
sleep(dbRefreshInterval * 60)
def ReaderAccess():
cachedDB = _cachedDB()
def log_access(u: Type[user_info]):
dbConnection = MySQLdb.connect(host=dbHost, user=dbUser, passwd=dbPass, db=dbName)
cur = dbConnection.cursor(MySQLdb.cursors.DictCursor)
cur.execute(
f"INSERT INTO access_log SET card_key_presented = '{u.card_key}', card_key_presented_datetime = NOW(), card_id_presented = '{u.card_id}', access_granted = '{1 if u.has_access else 0}', name_presented = '{u.name}'"
)
dbConnection.commit()
pass
door.close()
while True:
print("Waiting for key card...")
card_id, card_key = reader.read()
card_key = card_key.strip()
u = user_info(cachedDB, card_key, card_id)
if u.has_access:
print(f"\tACCESS GRANTED for {u.name}")
door.open()
else:
print("\tACCESS DENIED")
sleep(RELAY_TIMEOUT)
threading.Thread(target=log_access, args=[u]).start()
# dbConnection.commit()
if __name__ == "__main__":
try:
ReaderAccess()
except KeyboardInterrupt:
print("Stopped because of Keyboard Inturrupt")
except Exception as e:
print(e)
finally:
GPIO.cleanup()