-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathsnyk-quiet.py
148 lines (120 loc) · 4.59 KB
/
snyk-quiet.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#
# Example: Turn off all notifications for the user that invokes the script
#
# WARNING:
# This script will disable *all* Snyk notifications for the invoking user.
# It will *not* ask for confirmation. Be sure this is what you want.
#
import aiohttp
import asyncio
import click
import json
import logging
import os
import snyk
import sys
import traceback
# Keys for the config dict built by get_config() and read by main()
# bool: set to True only once get_config() has finished building the config
key_is_configured = "is_configured"
# str: status/error message; main() passes it to log() when it is non-empty
key_config_status_msg = "config_status_msg"
# value read from the SNYK_TOKEN environment variable
key_snyktoken = "snyk_token"
# base URL of the Snyk REST API
key_snyk_rest_api = "snyk_rest_api"
# per-host connection limit handed to aiohttp.TCPConnector
key_concurrent_connection_limit = "concurrent_connection_limit"
async def main():
    """Disable all Snyk notifications for the user that owns SNYK_TOKEN.

    Loads the configuration, lists the organizations through the pysnyk v1
    client and hands them to deactivate_user_notifications() together with
    an authenticated aiohttp session.

    Exits the process with status 1 when the configuration is incomplete.
    Any other exception is reported with a traceback and swallowed. The
    aiohttp session, if one was created, is always closed before returning.
    """
    session = None
    try:
        # Configuration
        config = get_config()
        if not config[key_is_configured]:
            log(config[key_config_status_msg])
            # sys.exit() instead of the site-provided exit(): the latter is
            # meant for the interactive shell and is not always available.
            # SystemExit is not an Exception subclass, so it passes through
            # the handler below while the finally clause still runs.
            sys.exit(1)
        if config[key_config_status_msg] != "":
            log(config[key_config_status_msg])
        snyktoken = config[key_snyktoken]

        # Client initialization
        # pysnyk client for the v1 API, constructed with retry settings
        v1client = snyk.SnykClient(snyktoken, tries=3, delay=1, backoff=2)

        # aiohttp session for the endpoints called directly by this script
        conn = aiohttp.TCPConnector(
            limit_per_host=config[key_concurrent_connection_limit]
        )
        session_headers = {
            "Authorization": f"token {snyktoken}",
            "Content-Type": "application/json"
        }
        session = aiohttp.ClientSession(connector=conn, headers=session_headers)

        # Get orgs
        orgs = v1client.organizations.all()
        await deactivate_user_notifications(session, orgs)
    except Exception:
        traceback.print_exc()
    finally:
        try:
            if session is not None:
                await session.close()
        except Exception:
            # A failure while closing must not mask the outcome above.
            traceback.print_exc()
#######################################################################
def get_config():
    """Return a dictionary of configuration settings.

    The result always contains ``key_is_configured``,
    ``key_config_status_msg`` and ``key_snyktoken``. When the SNYK_TOKEN
    environment variable is missing or empty, ``key_is_configured`` stays
    False and ``key_config_status_msg`` explains why; otherwise the static
    settings (REST API URL, connection limit) are added and
    ``key_is_configured`` is True.
    """
    config = {
        key_is_configured: False,
        key_config_status_msg: "",
    }

    # os.getenv() returns None for an unset variable instead of raising, so
    # the value itself has to be checked.
    snyktoken = os.getenv('SNYK_TOKEN')
    config[key_snyktoken] = snyktoken
    if not snyktoken:
        config[key_config_status_msg] += "\nUnable to detect SNYK_TOKEN from environment. Exiting."
        return config

    # static config
    config[key_snyk_rest_api] = "https://api.snyk.io/rest"
    config[key_concurrent_connection_limit] = 10
    config[key_is_configured] = True
    return config
#######################################################################
async def deactivate_user_notifications(session, orgs):
    """Turn off the notification settings of every organization in *orgs*.

    Args:
        session: client session whose ``put()`` is used as an async context
            manager (main() passes an aiohttp.ClientSession).
        orgs: sized iterable of objects exposing an ``id`` attribute.

    One PUT request is sent per organization, one after another, while a
    click progress bar tracks the loop. A response status other than 200 is
    reported on stdout and processing continues with the next organization.
    """
    endpoint_base = "https://snyk.io/api/v1/user/me/notification-settings/org"
    org_count = len(orgs)
    print(f"Deactivating your notifications for {org_count} organizations...")

    # The same settings are sent for every organization: serialize them once.
    disabled_settings = json.dumps({
        "new-issues-remediations": {
            "enabled": False,
            "issueSeverity": "all",
            "issueType": "all",
        },
        "project-imported": {"enabled": False},
        "test-limit": {"enabled": False},
        "weekly-report": {"enabled": False},
    })

    progress = click.progressbar(
        orgs,
        length=org_count,
        show_pos=True,
        show_percent=True,
        fill_char=click.style('='),
    )
    with progress as tracked_orgs:
        for org in tracked_orgs:
            target = f"{endpoint_base}/{org.id}"
            async with session.put(target, data=disabled_settings) as response:
                if response.status == 200:
                    continue
                print(f"{org.id}: Error {response.status}")
#######################################################################
def log(message):
    """Write *message* to standard error, prefixed with a single space."""
    indented = " " + message
    print(indented, file=sys.stderr)
#######################################################################
# Run only when executed as a script, so that importing this module does not
# immediately disable the user's notifications.
if __name__ == "__main__":
    asyncio.run(main())