Skip to content

Commit

Permalink
Optimize Chunkstore mongo read query
Browse files — browse the repository at this point in the history
  • Loading branch information
BaiBaiHi committed Apr 7, 2020
1 parent fcc00c6 commit ed84b68
Showing 1 changed file with 22 additions and 15 deletions.
37 changes: 22 additions & 15 deletions arctic/chunkstore/chunkstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,22 +262,29 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
if chunk_range is not None:
spec.update(chunker.to_mongo(chunk_range))

by_start_segment = [(SYMBOL, pymongo.ASCENDING),
(START, pymongo.ASCENDING),
(SEGMENT, pymongo.ASCENDING)]
segment_cursor = self._collection.find(spec, sort=by_start_segment)

chunks = defaultdict(list)
for _, segments in groupby(segment_cursor, key=lambda x: (x[START], x[SYMBOL])):
segments = list(segments)
mdata = self._mdata.find_one({SYMBOL: segments[0][SYMBOL],
START: segments[0][START],
END: segments[0][END]})

# when len(segments) == 1, this is essentially a no-op
# otherwise, take all segments and reassemble the data to one chunk
chunk_data = b''.join([doc[DATA] for doc in segments])
chunks[segments[0][SYMBOL]].append({DATA: chunk_data, METADATA: mdata})
# Join metadata collection to top level collection on "sym" and "s"(start).
joined = self._collection.aggregate([
{'$match': spec},
{'$lookup': {
'from': self._mdata.name,
'let': {'symbol': '${}'.format(SYMBOL),
'start': '${}'.format(START)},
'pipeline': [
{'$match': {
'$expr': {
'$and': [{'$eq': ['${}'.format(SYMBOL), '$$symbol']},
{'$eq': ['${}'.format(START), '$$start']}]}}},
{'$project': {'_id': 0, 'm': 0}}],
'as': METADATA}},
{'$sort': {START: 1,
SEGMENT: 1}}],
allowDiskUse=True) # mongo aggregate pipeline stages have a 100MB RAM limit so we enable this flag to handle larger datasets.

for (_, sym), segment in groupby(joined, key=lambda x: (x[START], x[SYMBOL])):
segment = list(segment)
chunks[sym].append({DATA: b''.join([doc[DATA] for doc in segment]),
METADATA: segment[0][METADATA][0]})

skip_filter = not filter_data or chunk_range is None

Expand Down

0 comments on commit ed84b68

Please sign in to comment.