-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathapp_builder.py
159 lines (125 loc) · 5.07 KB
/
app_builder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
import falcon
from typing import Dict, Union
import boto3
import s3fs
import os
from pathlib import Path
from apscheduler.schedulers.background import BackgroundScheduler
import logging
try:
from .app.resources import *
from .app.io import load_fallback_map
except ImportError:
from app.resources import *
from app.io import load_fallback_map
# Configure the root logger at import time so INFO-level messages are emitted.
logging.basicConfig(level=logging.INFO)
# URI scheme prefix used to tell S3 locations apart from local paths.
S3_URI_PREFIX = 's3://'
# A location given either as a pathlib.Path or as a plain string.
PathType = Union[Path, str]
# Module-level S3 filesystem handle, created once at import.
s3 = s3fs.S3FileSystem()
# Module-level boto3 DynamoDB resource, created once at import.
dynamodb = boto3.resource('dynamodb')
def build_single_app(path_tar: PathType):
    """
    Build an API app that serves a single ANN index.

    Args:
        path_tar: path passed to ``ANNResource`` to locate the index

    Returns: app exposing ``/query``, ``/refresh`` and ``/`` (healthcheck)
    """
    api = falcon.API()
    api.req_options.auto_parse_form_urlencoded = True

    # One ANN resource backs all three endpoints
    index_resource = ANNResource(path_tar)
    routes = (
        ('/query', index_resource),
        ('/refresh', RefreshResource(index_resource)),
        ('/', HealthcheckResource(index_resource)),
    )
    for uri_template, resource in routes:
        api.add_route(uri_template, resource)
    return api
def _list_ann_keys(path_ann_dir: str):
    """
    List every ANN tarball found under ``path_ann_dir``.

    Args:
        path_ann_dir: local directory or ``s3://`` prefix to search

    Returns: list of tarball paths as strings; S3 results always carry
        the ``s3://`` scheme
    """
    if path_ann_dir.startswith(S3_URI_PREFIX):
        # S3 keys always use '/', so build the pattern with posixpath
        # instead of the platform-dependent os.path
        import posixpath
        pattern = posixpath.join(path_ann_dir, '**.tar*')
        # glob results may come back without the scheme; normalise them
        return [
            p if p.startswith(S3_URI_PREFIX) else S3_URI_PREFIX + p
            for p in s3.glob(pattern)]
    return [str(p) for p in Path(path_ann_dir).glob('**/*.tar*')]


def _ann_name_from_key(path_tar: str, path_ann_dir: str) -> str:
    """
    Derive the ANN name (used in routes) from a tarball path.

    The name is the part of ``path_tar`` after ``path_ann_dir``, without
    surrounding slashes and cut at the first ``.``, so any sub-directory
    structure below ``path_ann_dir`` is preserved in the name.
    """
    dir_without_scheme = path_ann_dir.split(S3_URI_PREFIX)[-1]
    relative = path_tar.split(dir_without_scheme)[-1]
    return relative.strip('/').split('.')[0]


def build_many_app(path_ann_dir: PathType,
                   ooi_table_name: str = None,
                   path_fallback_map: PathType = None,
                   check_reload_interval: int = 3600,
                   ):
    """
    Build an API app that serves every ANN index found in a directory.

    Args:
        path_ann_dir: local or s3 remote directory that is searched for
            tarballs (``*.tar*``), one per ANN index
        ooi_table_name: name of dynamo table to grab out-of-index
            vectors from; if no dynamo table has this name it is treated
            as the name of a loaded ANN to use for the lookup instead
        path_fallback_map: path to mapping from ANN name
            to name of fallback ANN
        check_reload_interval: if >0, indicates the number of seconds
            between checking for stale indexes and reloading

    Returns: ANN api app

    Raises:
        KeyError: if the fallback map names a fallback ANN that was
            not loaded
    """
    # PathType admits pathlib.Path, but the code below needs str methods
    path_ann_dir = str(path_ann_dir)

    scheduler = BackgroundScheduler()
    app = falcon.API()

    ann_keys = _list_ann_keys(path_ann_dir)
    logging.info(f'{len(ann_keys)} ann indexes detected')

    # Decide where out-of-index vectors come from: a dynamo table if one
    # with that name exists, otherwise an ANN with that name
    ooi_dynamo_table = None
    ooi_ann_name = None
    if ooi_table_name:
        dynamo_tables = {t.name for t in dynamodb.tables.all()}
        if ooi_table_name in dynamo_tables:
            logging.info(f'Using {ooi_table_name} dynamo table for OOI lookup')
            ooi_dynamo_table = dynamodb.Table(ooi_table_name)
        else:
            logging.info(f'Using {ooi_table_name} ANN (if exists) '
                         f'for OOI lookup ')
            ooi_ann_name = ooi_table_name

    app.req_options.auto_parse_form_urlencoded = True

    ann_d: Dict[str, ANNResource] = {}
    for path_tar in ann_keys:
        ann_name = _ann_name_from_key(path_tar, path_ann_dir)
        ann_r = ANNResource(path_tar,
                            ooi_dynamo_table=ooi_dynamo_table,
                            name=ann_name)
        refresh_r = RefreshResource(ann_r)
        ann_health_r = ANNHealthcheckResource(ann_r)

        app.add_route(f"/ann/{ann_name}/query", ann_r)
        app.add_route(f"/ann/{ann_name}/refresh", refresh_r)
        app.add_route(f"/ann/{ann_name}/", ann_health_r)

        ann_d[ann_name] = ann_r

        if check_reload_interval > 0:
            scheduler.add_job(
                func=ann_r.maybe_reload,
                trigger='interval',
                seconds=check_reload_interval)

    ann_name_l = [falcon.uri.encode(n) for n in ann_d]
    logging.info('***Done loading all indexes***')

    if ooi_ann_name:
        # Linking OOI ann
        # (if the query is OOI for an ann, look at this other ann for the emb)
        logging.info('Linking ooi_ann to resources...')
        ooi_ann = ann_d.get(ooi_ann_name)
        if ooi_ann is None:
            # The OOI ANN is optional ("if exists"), so do not crash
            logging.warning(f'OOI ANN {ooi_ann_name} was not loaded; '
                            f'skipping OOI linking')
        else:
            for name, ann_r in ann_d.items():
                if name != ooi_ann_name:
                    ann_r.ooi_ann = ooi_ann
            logging.info('... done linking ooi_ann_name')

    if path_fallback_map:
        # Linking fallbacks
        logging.info('Linking fallback resources...')
        # TODO: should check that there are no loops in fallback map
        fallback_map = load_fallback_map(path_fallback_map)
        for child, parent in fallback_map.items():
            if child not in ann_d:
                continue
            if parent not in ann_d:
                # Same exception type as the bare lookup, with context
                raise KeyError(
                    f'Fallback ANN {parent!r} for {child!r} was not loaded')
            ann_d[child].set_fallback(ann_d[parent])
        logging.info('... done linking fallbacks')

    # App-wide endpoints that operate across all loaded indexes
    cross_r = CrossANNResource(list(ann_d.values()),
                               fallback_dynamo_table=ooi_dynamo_table)
    app.add_route('/crossq', cross_r)

    scoring_r = ScoringResource(list(ann_d.values()))
    app.add_route('/score', scoring_r)

    healthcheck_r = HealthcheckResource(ann_name_l)
    app.add_route('/', healthcheck_r)

    maybe_refresh_all_r = MaybeRefreshAllResource(list(ann_d.values()))
    app.add_route('/refresh-all', maybe_refresh_all_r)

    tmpspace_r = TmpSpaceResource()
    app.add_route('/tmp', tmpspace_r)

    sleep_r = SleepResource()
    app.add_route('/sleep', sleep_r)

    if check_reload_interval > 0:
        scheduler.start()

    return app