forked from amirziai/flatten
-
Notifications
You must be signed in to change notification settings - Fork 1
/
flatten_json.py
144 lines (114 loc) · 5.27 KB
/
flatten_json.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from collections import Iterable
from util import check_if_numbers_are_consecutive
def _construct_key(previous_key, separator, new_key):
"""
Returns the new_key if no previous key exists, otherwise concatenates previous key, separator, and new_key
:param previous_key:
:param separator:
:param new_key:
:return: a string if previous_key exists and simply passes through the new_key otherwise
"""
if previous_key:
return u"{}{}{}".format(previous_key, separator, new_key)
else:
return new_key
def flatten(nested_dict, separator="_", root_keys_to_ignore=set()):
"""
Flattens a dictionary with nested structure to a dictionary with no hierarchy
Consider ignoring keys that you are not interested in to prevent unnecessary processing
This is specially true for very deep objects
:param nested_dict: dictionary we want to flatten
:param separator: string to separate dictionary keys by
:param root_keys_to_ignore: set of root keys to ignore from flattening
:return: flattened dictionary
"""
assert isinstance(nested_dict, dict), "flatten requires a dictionary input"
assert isinstance(separator, str), "separator must be a string"
# This global dictionary stores the flattened keys and values and is ultimately returned
flattened_dict = dict()
def _flatten(object_, key):
"""
For dict, list and set objects_ calls itself on the elements and for other types assigns the object_ to
the corresponding key in the global flattened_dict
:param object_: object to flatten
:param key: carries the concatenated key for the object_
:return: None
"""
if isinstance(object_, dict):
for object_key in object_:
if not (not key and object_key in root_keys_to_ignore):
_flatten(object_[object_key], _construct_key(key, separator, object_key))
elif isinstance(object_, list) or isinstance(object_, set):
for index, item in enumerate(object_):
_flatten(item, _construct_key(key, separator, index))
else:
flattened_dict[key] = object_
_flatten(nested_dict, None)
return flattened_dict
flatten_json = flatten
def _unflatten_asserts(flat_dict, separator):
assert isinstance(flat_dict, dict), "un_flatten requires a dictionary input"
assert isinstance(separator, str), "separator must be a string"
assert all((not isinstance(value, Iterable) or isinstance(value, str)
for value in flat_dict.values())), "provided dictionary is not flat"
def unflatten(flat_dict, separator='_'):
"""
Creates a hierarchical dictionary from a flattened dictionary
Assumes no lists are present
:param flat_dict: a dictionary with no hierarchy
:param separator: a string that separates keys
:return: a dictionary with hierarchy
"""
_unflatten_asserts(flat_dict, separator)
# This global dictionary is mutated and returned
unflattened_dict = dict()
def _unflatten(dic, keys, value):
for key in keys[:-1]:
dic = dic.setdefault(key, {})
dic[keys[-1]] = value
for item in flat_dict:
_unflatten(unflattened_dict, item.split(separator), flat_dict[item])
return unflattened_dict
def unflatten_list(flat_dict, separator='_'):
"""
Unflattens a dictionary, first assuming no lists exist and then tries to identify lists and replaces them
This is probably not very efficient and has not been tested extensively
Feel free to add test cases or rewrite the logic
Issues that stand out to me:
- Sorting all the keys in the dictionary, which specially for the root dictionary can be a lot of keys
- Checking that numbers are consecutive is O(N) in number of keys
:param flat_dict: dictionary with no hierarchy
:param separator: a string that separates keys
:return: a dictionary with hierarchy
"""
_unflatten_asserts(flat_dict, separator)
# First unflatten the dictionary assuming no lists exist
unflattened_dict = unflatten(flat_dict, separator)
def _convert_dict_to_list(object_, parent_object, parent_object_key):
if isinstance(object_, dict):
try:
keys = [int(key) for key in object_]
keys.sort()
except (ValueError, TypeError):
keys = []
keys_len = len(keys)
if (keys_len > 0 and sum(keys) == int(((keys_len - 1) * keys_len) / 2) and keys[0] == 0 and
keys[-1] == keys_len - 1 and check_if_numbers_are_consecutive(keys)):
# The dictionary looks like a list so we're going to replace it as one
parent_object[parent_object_key] = [object_[str(key)] for key in keys]
for key in object_:
if isinstance(object_[key], dict):
_convert_dict_to_list(object_[key], object_, key)
_convert_dict_to_list(unflattened_dict, None, None)
return unflattened_dict
def cli(input_stream=sys.stdin, output_stream=sys.stdout):
import json
raw = input_stream.read()
input_json = json.loads(raw)
output = json.dumps(flatten(input_json))
output_stream.write(output)
if __name__ == '__main__':
cli()