-
Notifications
You must be signed in to change notification settings - Fork 3
/
backup.py
142 lines (98 loc) · 4.18 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import os
import argparse
import json
import requests
import web_constants as WEB_CONSTANTS
import app_constants as APP_CONSTANTS
# Command-line interface. Parsing happens at module level so that `token`
# and `outDir` are available as globals to the functions defined below.
parser = argparse.ArgumentParser(
    description='Backup Slack channel, conversation, Users, and direct messages.',
)
parser.add_argument(
    '-t', '--token',
    dest='token',
    required=True,
    help='Slack api Access token',
)
parser.add_argument(
    '-od', '--outDir',
    dest='outDir',
    default='./output',
    required=False,
    help='Output directory to store JSON backup files.',
)
args = parser.parse_args()

# Access token sent with every API request, and the root directory that all
# output files are written under.
token, outDir = args.token, args.outDir
def getOutputPath(relativePath):
    """Return the location of ``relativePath`` inside the output directory.

    The output directory is the module-level ``outDir`` value.

    Args:
        relativePath: Path of the file relative to the output directory.
            Leading path separators are ignored, so ``'/a.json'`` and
            ``'a.json'`` resolve to the same file.

    Returns:
        ``outDir`` joined with ``relativePath`` using the platform's path
        separator.
    """
    # os.path.join discards everything before a component that looks
    # absolute, so leading separators are stripped before joining.
    separators = os.sep + (os.altsep or '')
    return os.path.join(outDir, relativePath.lstrip(separators))
def parseTemplatedFileName(template, *values):
    """Fill the ``str.format`` placeholders of ``template`` positionally.

    Args:
        template: Format string with positional replacement fields.
        *values: Values substituted into the replacement fields, in order.

    Returns:
        The formatted string.
    """
    formatted = template.format(*values)
    return formatted
def readPostManCollectionJson():
    """Load ``slack-postman.json`` from the current working directory.

    Returns:
        The parsed JSON content of the file.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the locale-dependent
    # default encoding of open().
    with open('slack-postman.json', encoding='utf-8') as fh:
        return json.load(fh)
def readRequestJsonFile():
    """Load ``requests.json`` from the current working directory.

    Returns:
        The parsed JSON content of the file.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If the file does not contain valid JSON.
    """
    # Decode explicitly as UTF-8 instead of relying on the locale-dependent
    # default encoding of open().
    with open('requests.json', encoding='utf-8') as fh:
        return json.load(fh)
def writeJSONFile(jsonObj, filePath):
    """Serialize ``jsonObj`` as JSON to ``filePath`` under the output directory.

    Missing parent directories are created. An existing file at the
    destination is overwritten.

    Args:
        jsonObj: JSON-serializable object to write.
        filePath: Destination path; resolved through ``getOutputPath``.
    """
    outputPath = getOutputPath(filePath)
    dirPath = os.path.dirname(outputPath)
    # dirname() is '' when the path has no directory part, and makedirs('')
    # raises, so only create directories when there is something to create.
    if dirPath:
        # exist_ok avoids the race between a separate exists() check and the
        # directory creation.
        os.makedirs(dirPath, exist_ok=True)
    with open(outputPath, 'w', encoding='utf-8') as fh:
        # One-space indentation per nesting level.
        json.dump(jsonObj, fh, indent=1)
def getChannels(timeout=30):
    """Fetch the channel list from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.CHANNEL_LIST`` with the
    module-level ``token`` as a query parameter.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The value of the ``channels`` field of the JSON response.

    Raises:
        KeyError: If the response has no ``channels`` field. The message
            includes the response's ``error`` field, if present.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.CHANNEL_LIST,
        params={'token': token},
        timeout=timeout,
    )
    payload = response.json()
    if 'channels' not in payload:
        # Surface the API's own error text instead of a bare KeyError.
        raise KeyError(
            "Slack response has no 'channels' field (error: {0})".format(
                payload.get('error')))
    return payload['channels']
def getChannelHistory(channelId, timeout=30):
    """Fetch the message history of one channel from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.CHANNEL_HISTORY`` with the
    module-level ``token`` and the channel id as query parameters.

    Args:
        channelId: Id of the channel whose history is requested.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The complete decoded JSON response, returned as-is.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.CHANNEL_HISTORY,
        params={'token': token, 'channel': channelId},
        timeout=timeout,
    )
    return response.json()
def getGroups(timeout=30):
    """Fetch the group list from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.GROUP_LIST`` with the
    module-level ``token`` as a query parameter.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The value of the ``groups`` field of the JSON response.

    Raises:
        KeyError: If the response has no ``groups`` field. The message
            includes the response's ``error`` field, if present.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.GROUP_LIST,
        params={'token': token},
        timeout=timeout,
    )
    payload = response.json()
    if 'groups' not in payload:
        # Surface the API's own error text instead of a bare KeyError.
        raise KeyError(
            "Slack response has no 'groups' field (error: {0})".format(
                payload.get('error')))
    return payload['groups']
def getGroupHistory(groupId, timeout=30):
    """Fetch the message history of one group from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.GROUP_HISTORY`` with the
    module-level ``token`` and the group id (passed as the ``channel``
    query parameter).

    Args:
        groupId: Id of the group whose history is requested.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The complete decoded JSON response, returned as-is.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.GROUP_HISTORY,
        params={'token': token, 'channel': groupId},
        timeout=timeout,
    )
    return response.json()
def getOneToOneConversations(timeout=30):
    """Fetch the list of one-to-one conversations from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.CONVERSATION_LIST`` with the
    module-level ``token`` as a query parameter.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The value of the ``channels`` field of the JSON response.

    Raises:
        KeyError: If the response has no ``channels`` field. The message
            includes the response's ``error`` field, if present.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.CONVERSATION_LIST,
        # The 'im' type restricts the listing to one-to-one conversations.
        params={'token': token, 'types': 'im'},
        timeout=timeout,
    )
    payload = response.json()
    if 'channels' not in payload:
        # Surface the API's own error text instead of a bare KeyError.
        raise KeyError(
            "Slack response has no 'channels' field (error: {0})".format(
                payload.get('error')))
    return payload['channels']
def getUsers(timeout=30):
    """Fetch the user list from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.USERS_LIST`` with the
    module-level ``token`` as a query parameter.

    Args:
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The value of the ``members`` field of the JSON response.

    Raises:
        KeyError: If the response has no ``members`` field. The message
            includes the response's ``error`` field, if present.
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.USERS_LIST,
        params={'token': token},
        timeout=timeout,
    )
    payload = response.json()
    if 'members' not in payload:
        # Surface the API's own error text instead of a bare KeyError.
        raise KeyError(
            "Slack response has no 'members' field (error: {0})".format(
                payload.get('error')))
    return payload['members']
def getConversationHistory(conversationId, timeout=30):
    """Fetch the message history of one conversation from the Slack API.

    Sends a GET request to ``WEB_CONSTANTS.CONVERSATION_HISTORY`` with the
    module-level ``token`` and the conversation id (passed as the
    ``channel`` query parameter).

    Args:
        conversationId: Id of the conversation whose history is requested.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` can block indefinitely on a stalled
            connection.

    Returns:
        The complete decoded JSON response, returned as-is.

    Raises:
        requests.exceptions.RequestException: On connection failure or
            timeout.
    """
    response = requests.get(
        WEB_CONSTANTS.CONVERSATION_HISTORY,
        params={'token': token, 'channel': conversationId},
        timeout=timeout,
    )
    return response.json()
def run():
    """Back up channels, groups, users and one-to-one conversations as JSON.

    For channels, groups and one-to-one conversations the list itself is
    written first, followed by one history file per entry. File locations
    come from the templates in ``APP_CONSTANTS`` and are written through
    ``writeJSONFile``.
    """
    # Channels: the list, then one history file per channel name.
    channels = getChannels()
    writeJSONFile(channels, APP_CONSTANTS.CHANNEL_LIST_FILE)
    for channel in channels:
        channelId = channel['id']
        channelName = channel['name']
        channelHistory = getChannelHistory(channelId)
        channelHistoryFilename = parseTemplatedFileName(
            APP_CONSTANTS.CHANNEL_HISTORY_FILE, channelName)
        writeJSONFile(channelHistory, channelHistoryFilename)

    # Groups: the list, then one history file per group name.
    groups = getGroups()
    writeJSONFile(groups, APP_CONSTANTS.GROUP_LIST_FILE)
    for group in groups:
        groupId = group['id']
        groupName = group['name']
        groupHistory = getGroupHistory(groupId)
        groupHistoryFilename = parseTemplatedFileName(
            APP_CONSTANTS.GROUP_HISTORY_FILE, groupName)
        writeJSONFile(groupHistory, groupHistoryFilename)

    # Users: the list is saved and also indexed by id, so conversation
    # files can be named after the other participant.
    users = getUsers()
    writeJSONFile(users, APP_CONSTANTS.USER_LIST_FILE)
    userIdToNameDict = {user['id']: user['name'] for user in users}

    # One-to-one conversations: the list, then one history file per
    # conversation.
    oneToOneConversations = getOneToOneConversations()
    writeJSONFile(oneToOneConversations,
                  APP_CONSTANTS.ONE_TO_ONE_CONVERSATION_LIST_FILE)
    for conversation in oneToOneConversations:
        conversationId = conversation['id']
        userId = conversation['user']
        # Fall back to the raw id when the participant is missing from the
        # fetched user list, so one unknown user does not abort the backup.
        userName = userIdToNameDict.get(userId, userId)
        conversationHistory = getConversationHistory(conversationId)
        conversationHistoryFilename = parseTemplatedFileName(
            APP_CONSTANTS.ONE_TO_ONE_CONVERSATION_HISTORY_FILE,
            userName, userId)
        writeJSONFile(conversationHistory, conversationHistoryFilename)
# Start the backup only when this file is executed as a script.
if '__main__' == __name__:
    run()