# scheduler.py
import math

from torch.optim.lr_scheduler import _LRScheduler


class CircularLRBeta:
    """A learning rate updater that implements the circular learning rate (CLR) scheme.

    The learning rate is increased and then decreased linearly within each cycle,
    followed by a final phase that anneals it further down.

    Args:
        optimizer (torch.optim.Optimizer): Wrapped optimizer whose learning rate
            (and, optionally, momentum) is updated in place.
        lr_max (float): The highest LR in the schedule.
        lr_divider (int): Determines the first iteration's LR; the schedule starts
            from lr_max / lr_divider.
        cut_point (int): Percentage of step_size reserved for the final annealing
            phase, during which the LR decays below lr_max / lr_divider.
        step_size (int): Total number of iterations in one full cycle; the internal
            counter resets after this many steps.
        momentum (list, optional): Two momentum values; momentum[0] is applied at
            the low-LR ends of the cycle and momentum[1] at peak LR. Written into
            the first Adam beta.
    """

    def __init__(
        self, optimizer, lr_max, lr_divider, cut_point, step_size, momentum=None
    ):
        self.lr_max = lr_max
        self.lr_divider = lr_divider
        self.cut_point = step_size // cut_point
        self.step_size = step_size
        self.iteration = 0
        # Length (in iterations) of the increasing ramp and of the decreasing ramp.
        self.cycle_step = int(step_size * (1 - cut_point / 100) / 2)
        self.momentum = momentum
        self.optimizer = optimizer

    def get_lr(self):
        if self.iteration > 2 * self.cycle_step:
            # Final phase: anneal from lr_max / lr_divider down to 1% of it.
            cut = (self.iteration - 2 * self.cycle_step) / (
                self.step_size - 2 * self.cycle_step
            )
            lr = self.lr_max * (1 + (cut * (1 - 100) / 100)) / self.lr_divider
        elif self.iteration > self.cycle_step:
            # Decreasing ramp: from lr_max back down to lr_max / lr_divider.
            cut = 1 - (self.iteration - self.cycle_step) / self.cycle_step
            lr = self.lr_max * (1 + cut * (self.lr_divider - 1)) / self.lr_divider
        else:
            # Increasing ramp: from lr_max / lr_divider up to lr_max.
            cut = self.iteration / self.cycle_step
            lr = self.lr_max * (1 + cut * (self.lr_divider - 1)) / self.lr_divider
        return lr

    def get_momentum(self):
        if self.iteration > 2 * self.cycle_step:
            momentum = self.momentum[0]
        elif self.iteration > self.cycle_step:
            cut = 1 - (self.iteration - self.cycle_step) / self.cycle_step
            momentum = self.momentum[0] + cut * (self.momentum[1] - self.momentum[0])
        else:
            cut = self.iteration / self.cycle_step
            momentum = self.momentum[0] + cut * (self.momentum[1] - self.momentum[0])
        return momentum

    def step(self):
        lr = self.get_lr()
        if self.momentum is not None:
            momentum = self.get_momentum()
        self.iteration += 1
        # Restart the cycle once step_size iterations have been taken.
        if self.iteration == self.step_size:
            self.iteration = 0
        for group in self.optimizer.param_groups:
            group['lr'] = lr
            if self.momentum is not None:
                group['betas'] = (momentum, group['betas'][1])
        return lr
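

# Illustrative usage sketch (not part of the original module). The model,
# optimizer, and hyper-parameter values below are hypothetical; they only show
# how CircularLRBeta is typically driven once per training iteration.
def _example_circular_lr_beta():
    import torch

    model = torch.nn.Linear(10, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    total_steps = 1000  # one full cycle
    scheduler = CircularLRBeta(
        optimizer,
        lr_max=1e-3,
        lr_divider=10,          # start at lr_max / 10
        cut_point=10,           # last 10% of the cycle anneals below lr_max / 10
        step_size=total_steps,
        momentum=[0.95, 0.85],  # momentum[0] at low LR, momentum[1] at peak LR
    )
    for _ in range(total_steps):
        # ... forward / backward / optimizer.step() would go here ...
        scheduler.step()  # updates group['lr'] (and the first Adam beta)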


class CosineAnnealingWithWarmupAndHardRestart(_LRScheduler):
    """Cosine annealing schedule with linear warmup and hard restarts.

    The LR rises linearly from min_lr to max_lr over warmup_steps, then follows
    a cosine curve from max_lr down to min_lr within each cycle of cycle_steps,
    jumping back to max_lr at the start of every new cycle.

    Args:
        optimizer (Optimizer): Wrapped optimizer.
        warmup_steps (int): Linear warmup step size.
        cycle_steps (int): Cycle step size.
        max_lr (float): Each cycle's max learning rate.
        min_lr (float, optional): Min learning rate. Defaults to max_lr / 50.
    """

    def __init__(
        self, optimizer, warmup_steps, cycle_steps, max_lr, min_lr=None,
    ):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.cycle_steps = cycle_steps
        self.max_lr = max_lr
        self.min_lr = min_lr if min_lr is not None else max_lr / 50
        super(CosineAnnealingWithWarmupAndHardRestart, self).__init__(optimizer=optimizer)
        self.init_lr()

    def init_lr(self):
        self.lrs = []
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.min_lr
            self.lrs.append(self.min_lr)

    def get_lr(self):
        if self._step_count < self.warmup_steps:
            # Linear warmup from min_lr to max_lr.
            return (
                self.min_lr +
                (self.max_lr - self.min_lr) / self.warmup_steps * self._step_count
            )
        else:
            # Cosine annealing within the current cycle; x wraps back to 0 every
            # cycle_steps iterations, which restarts the LR at max_lr.
            x = (self._step_count - self.warmup_steps) % self.cycle_steps
            return (
                self.min_lr +
                0.5 * (self.max_lr - self.min_lr) * (1 + math.cos(math.pi / self.cycle_steps * x))
            )

    def step(self):
        self.lr = self.get_lr()
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.lr
        self._step_count += 1
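

# Illustrative usage sketch (not part of the original module). The step counts
# below are hypothetical; they show the warmup phase followed by repeated
# cosine cycles that restart at max_lr every cycle_steps iterations.
def _example_cosine_annealing_with_hard_restart():
    import torch

    model = torch.nn.Linear(10, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    scheduler = CosineAnnealingWithWarmupAndHardRestart(
        optimizer, warmup_steps=500, cycle_steps=2000, max_lr=1e-3, min_lr=1e-5
    )
    for _ in range(10_000):
        # ... forward / backward / optimizer.step() would go here ...
        scheduler.step()  # sets every param_group's lr for this iteration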


class CosineDecayWithWarmup(_LRScheduler):
    """Cosine decay schedule with linear warmup.

    The LR rises linearly from min_lr to max_lr over warmup_steps, then decays
    along a single cosine curve toward min_lr over the remaining steps.

    Args:
        optimizer (Optimizer): Wrapped optimizer.
        warmup_steps (int): Linear warmup step size.
        total_steps (int): Total step size.
        max_lr (float): Max learning rate.
        min_lr (float, optional): Min learning rate. Defaults to max_lr / 50.
    """

    def __init__(
        self, optimizer, warmup_steps, total_steps, max_lr, min_lr=None,
    ):
        self.optimizer = optimizer
        self.warmup_steps = warmup_steps
        self.total_steps = total_steps
        self.max_lr = max_lr
        self.min_lr = min_lr if min_lr is not None else max_lr / 50
        super(CosineDecayWithWarmup, self).__init__(optimizer=optimizer)
        self.init_lr()

    def init_lr(self):
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.min_lr

    def get_lr(self):
        if self._step_count < self.warmup_steps:
            # Linear warmup from min_lr to max_lr.
            return (
                self.min_lr +
                (self.max_lr - self.min_lr) / self.warmup_steps * self._step_count
            )
        else:
            # Single cosine decay from max_lr toward min_lr over the steps
            # remaining after warmup.
            x = self._step_count - self.warmup_steps
            return (
                self.min_lr +
                (self.max_lr - self.min_lr) / 2 *
                (1 + math.cos(x / (self.total_steps - self.warmup_steps) * math.pi))
            )

    def step(self):
        self.lr = self.get_lr()
        for param_group in self.optimizer.param_groups:
            param_group['lr'] = self.lr
        self._step_count += 1
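

# Illustrative usage sketch (not part of the original module). The values are
# hypothetical; after warmup_steps of linear warmup, the LR follows a single
# cosine decay from max_lr toward min_lr over the remaining total_steps.
def _example_cosine_decay_with_warmup():
    import torch

    model = torch.nn.Linear(10, 2)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    total_steps = 10_000
    scheduler = CosineDecayWithWarmup(
        optimizer, warmup_steps=500, total_steps=total_steps, max_lr=1e-3, min_lr=1e-5
    )
    for _ in range(total_steps):
        # ... forward / backward / optimizer.step() would go here ...
        scheduler.step()  # sets every param_group's lr for this iteration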