diff --git a/arctic/chunkstore/chunkstore.py b/arctic/chunkstore/chunkstore.py
index a0f2dab25..1f5a557cb 100644
--- a/arctic/chunkstore/chunkstore.py
+++ b/arctic/chunkstore/chunkstore.py
@@ -262,22 +262,29 @@ def read(self, symbol, chunk_range=None, filter_data=True, **kwargs):
         if chunk_range is not None:
             spec.update(chunker.to_mongo(chunk_range))
 
-        by_start_segment = [(SYMBOL, pymongo.ASCENDING),
-                            (START, pymongo.ASCENDING),
-                            (SEGMENT, pymongo.ASCENDING)]
-        segment_cursor = self._collection.find(spec, sort=by_start_segment)
-        chunks = defaultdict(list)
-        for _, segments in groupby(segment_cursor, key=lambda x: (x[START], x[SYMBOL])):
-            segments = list(segments)
-            mdata = self._mdata.find_one({SYMBOL: segments[0][SYMBOL],
-                                          START: segments[0][START],
-                                          END: segments[0][END]})
-
-            # when len(segments) == 1, this is essentially a no-op
-            # otherwise, take all segments and reassemble the data to one chunk
-            chunk_data = b''.join([doc[DATA] for doc in segments])
-            chunks[segments[0][SYMBOL]].append({DATA: chunk_data, METADATA: mdata})
+        # Join the metadata collection to the top-level collection on "sym" and "s" (start)
+        joined = self._collection.aggregate([
+            {'$match': spec},
+            {'$lookup': {
+                'from': self._mdata.name,
+                'let': {'symbol': '${}'.format(SYMBOL),
+                        'start': '${}'.format(START)},
+                'pipeline': [
+                    {'$match': {
+                        '$expr': {
+                            '$and': [{'$eq': ['${}'.format(SYMBOL), '$$symbol']},
+                                     {'$eq': ['${}'.format(START), '$$start']}]}}},
+                    {'$project': {'_id': 0, 'm': 0}}],
+                'as': METADATA}},
+            {'$sort': {START: 1,
+                       SEGMENT: 1}}],
+            allowDiskUse=True)  # aggregation pipeline stages are limited to 100MB of RAM; allow spilling to disk for larger result sets
+
+        for (_, sym), segment in groupby(joined, key=lambda x: (x[START], x[SYMBOL])):
+            segment = list(segment)
+            chunks[sym].append({DATA: b''.join([doc[DATA] for doc in segment]),
+                                METADATA: segment[0][METADATA][0]})
 
         skip_filter = not filter_data or chunk_range is None
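
Note (not part of the patch): the $lookup stage above is a correlated-subquery join, available since MongoDB 3.6, which attaches each segment's metadata document server-side instead of issuing one find_one() per chunk from the client. A minimal standalone sketch of the same join shape follows; the collection and field names ("demo_chunks", "demo_metadata", "sym", "s", "sg", "d", "md") are illustrative stand-ins rather than arctic's actual constants, and a local mongod is assumed.

import pymongo

client = pymongo.MongoClient()          # assumes a local mongod for demonstration
db = client['lookup_demo']
db.demo_chunks.drop()
db.demo_metadata.drop()

# Two data segments belonging to one chunk, plus that chunk's metadata document.
db.demo_chunks.insert_many([
    {'sym': 'AAPL', 's': 0, 'sg': 0, 'd': b'abc'},
    {'sym': 'AAPL', 's': 0, 'sg': 1, 'd': b'def'},
])
db.demo_metadata.insert_one({'sym': 'AAPL', 's': 0, 'meta': 'chunk-level metadata'})

joined = db.demo_chunks.aggregate([
    {'$match': {'sym': 'AAPL'}},
    # For every segment, pull the metadata document whose symbol and start
    # match the segment's, exposing it as a one-element list under 'md'.
    {'$lookup': {
        'from': 'demo_metadata',
        'let': {'symbol': '$sym', 'start': '$s'},
        'pipeline': [
            {'$match': {'$expr': {'$and': [{'$eq': ['$sym', '$$symbol']},
                                           {'$eq': ['$s', '$$start']}]}}},
            {'$project': {'_id': 0}}],
        'as': 'md'}},
    {'$sort': {'s': 1, 'sg': 1}}],
    allowDiskUse=True)                  # let the sort spill to disk past the 100MB per-stage limit

for doc in joined:
    # Each segment document now carries its joined metadata; reassembly then
    # concatenates doc['d'] across a chunk's segments and reads doc['md'][0] once.
    print(doc['sg'], doc['d'], doc['md'][0])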