Skip to content

Commit

Permalink
Optimize Chunkstore mongo read query
Browse files: browse the repository at this point in the history
  • Loading branch information
BaiBaiHi committed Apr 2, 2020
1 parent 684bc8c commit 2fd8f02
Showing 1 changed file with 19 additions and 15 deletions.
34 changes: 19 additions & 15 deletions arctic/chunkstore/chunkstore.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -262,22 +262,26 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
if chunk_range is not None:
spec.update(chunker.to_mongo(chunk_range))

by_start_segment = [(SYMBOL, pymongo.ASCENDING),
(START, pymongo.ASCENDING),
(SEGMENT, pymongo.ASCENDING)]
segment_cursor = self._collection.find(spec, sort=by_start_segment)

chunks = defaultdict(list)
for _, segments in groupby(segment_cursor, key=lambda x: (x[START], x[SYMBOL])):
segments = list(segments)
mdata = self._mdata.find_one({SYMBOL: segments[0][SYMBOL],
START: segments[0][START],
END: segments[0][END]})

# when len(segments) == 1, this is essentially a no-op
# otherwise, take all segments and reassemble the data to one chunk
chunk_data = b''.join([doc[DATA] for doc in segments])
chunks[segments[0][SYMBOL]].append({DATA: chunk_data, METADATA: mdata})
joined = self._collection.aggregate([
{'$match': spec},
{'$lookup': {
'from': self._mdata.name,
'let': {'symbol': '${}'.format(SYMBOL),
'start': '${}'.format(START)},
'pipeline': [
{'$match': {
'$expr': {
'$and': [{'$eq': ['${}'.format(SYMBOL), '$$symbol']},
{'$eq': ['${}'.format(START), '$$start']}]}}}],
'as': METADATA}},
{'$sort': {START: 1,
SEGMENT: 1}}])

for (_, sym), segment in groupby(joined, key=lambda x: (x[START], x[SYMBOL])):
segment = list(segment)
chunks[sym].append({DATA: b''.join([doc[DATA] for doc in segment]),
METADATA: segment[0][METADATA][0]})

skip_filter = not filter_data or chunk_range is None

Expand Down

0 comments on commit 2fd8f02

Please sign in to comment.