Skip to content

Commit

Permalink
make configurable fields exported
Browse files Browse the repository at this point in the history
  • Loading branch information
vivek-ng committed Nov 24, 2020
1 parent b442305 commit c29c716
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 47 deletions.
44 changes: 15 additions & 29 deletions priority/priorityRateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,14 @@ const (
High PriorityValue = 4
)

// PriorityLimiter
// limit: max number of concurrent goroutines that can access aresource
//
// count: current number of goroutines accessing a resource
//
// waitList: Priority queue of goroutines waiting to access a resource. Goroutines will be added to
// this list if the number of concurrent requests are greater than the limit specified. Greater value for priority means
// higher priority for that particular goroutine.
//
// dynamicPeriod: If this field is specified , priority is increased for low priority goroutines periodically by the
// interval specified by dynamicPeriod (in ms)
//
// timeout: If this field is specified , goroutines will be automatically removed from the waitlist
// after the time passes the timeout specified even if the number of concurrent requests is greater than the limit. (in ms)
// PriorityLimiter ....
// PriorityLimiter stores the configuration need for priority concurrency limiter....
type PriorityLimiter struct {
count int
limit int
Limit int
mu sync.Mutex
waitList queue.PriorityQueue
dynamicPeriod *int
timeout *int
DynamicPeriod *int
Timeout *int
}

// Option is a type to configure the Limiter struct....
Expand All @@ -56,7 +42,7 @@ type Option func(*PriorityLimiter)
func NewLimiter(limit int, options ...Option) *PriorityLimiter {
pq := make(queue.PriorityQueue, 0)
nl := &PriorityLimiter{
limit: limit,
Limit: limit,
waitList: pq,
}

Expand All @@ -72,15 +58,15 @@ func NewLimiter(limit int, options ...Option) *PriorityLimiter {
// interval specified by dynamicPeriod
func WithDynamicPriority(dynamicPeriod int) func(*PriorityLimiter) {
return func(p *PriorityLimiter) {
p.dynamicPeriod = &dynamicPeriod
p.DynamicPeriod = &dynamicPeriod
}
}

// WithTimeout : If this field is specified , goroutines will be automatically removed from the waitlist
// after the time passes the timeout specified even if the number of concurrent requests is greater than the limit.
func WithTimeout(timeout int) func(*PriorityLimiter) {
return func(p *PriorityLimiter) {
p.timeout = &timeout
p.Timeout = &timeout
}
}

Expand All @@ -99,7 +85,7 @@ func (p *PriorityLimiter) Wait(ctx context.Context, priority PriorityValue) {
return
}

if p.dynamicPeriod == nil && p.timeout == nil {
if p.DynamicPeriod == nil && p.Timeout == nil {
select {
case <-w.Done:
case <-ctx.Done():
Expand All @@ -108,12 +94,12 @@ func (p *PriorityLimiter) Wait(ctx context.Context, priority PriorityValue) {
return
}

if p.dynamicPeriod != nil && p.timeout != nil {
if p.DynamicPeriod != nil && p.Timeout != nil {
p.dynamicPriorityAndTimeout(ctx, w)
return
}

if p.timeout != nil {
if p.Timeout != nil {
p.handleTimeout(ctx, w)
return
}
Expand All @@ -122,8 +108,8 @@ func (p *PriorityLimiter) Wait(ctx context.Context, priority PriorityValue) {
}

func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queue.Item) {
ticker := time.NewTicker(time.Duration(*p.dynamicPeriod) * time.Millisecond)
timer := time.NewTimer(time.Duration(*p.timeout) * time.Millisecond)
ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond)
timer := time.NewTimer(time.Duration(*p.Timeout) * time.Millisecond)
for {
select {
case <-w.Done:
Expand Down Expand Up @@ -153,7 +139,7 @@ func (p *PriorityLimiter) dynamicPriorityAndTimeout(ctx context.Context, w *queu
}

func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.Item) {
ticker := time.NewTicker(time.Duration(*p.dynamicPeriod) * time.Millisecond)
ticker := time.NewTicker(time.Duration(*p.DynamicPeriod) * time.Millisecond)
for {
select {
case <-w.Done:
Expand All @@ -175,7 +161,7 @@ func (p *PriorityLimiter) handleDynamicPriority(ctx context.Context, w *queue.It
func (p *PriorityLimiter) handleTimeout(ctx context.Context, w *queue.Item) {
select {
case <-w.Done:
case <-time.After(time.Duration(*p.timeout) * time.Millisecond):
case <-time.After(time.Duration(*p.Timeout) * time.Millisecond):
p.removeWaiter(w)
case <-ctx.Done():
p.removeWaiter(w)
Expand All @@ -197,7 +183,7 @@ func (p *PriorityLimiter) proceed(priority PriorityValue) (bool, *queue.Item) {
p.mu.Lock()
defer p.mu.Unlock()

if p.count < p.limit {
if p.count < p.Limit {
p.count++
return true, nil
}
Expand Down
26 changes: 8 additions & 18 deletions rateLimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,13 @@ type waiter struct {
done chan struct{}
}

// Limiter
// limit: max number of concurrent goroutines that can access aresource
//
// count: current number of goroutines accessing a resource
//
// waitList: list of goroutines waiting to access a resource. Goroutines will be added to
// this list if the number of concurrent requests are greater than the limit specified
//
// timeout: If this field is specified , goroutines will be automatically removed from the waitlist
// after the time passes the timeout specified even if the number of concurrent requests is greater than the limit. (in ms)
// Limiter ....
// Limiter stores the configuration need for concurrency limiter....
type Limiter struct {
count int
limit int
Limit int
mu sync.Mutex
waitList list.List
timeout *int
Timeout *int
}

// Option is a type to configure the Limiter struct....
Expand All @@ -39,7 +29,7 @@ type Option func(*Limiter)
// Example: limiter.New(4, WithTimeout(5))
func New(limit int, options ...Option) *Limiter {
l := &Limiter{
limit: limit,
Limit: limit,
}

for _, o := range options {
Expand All @@ -52,7 +42,7 @@ func New(limit int, options ...Option) *Limiter {
// after the time passes the timeout specified even if the number of concurrent requests is greater than the limit.
func WithTimeout(timeout int) func(*Limiter) {
return func(l *Limiter) {
l.timeout = &timeout
l.Timeout = &timeout
}
}

Expand All @@ -64,10 +54,10 @@ func (l *Limiter) Wait(ctx context.Context) {
if ok {
return
}
if l.timeout != nil {
if l.Timeout != nil {
select {
case <-ch:
case <-time.After((time.Duration(*l.timeout) * time.Millisecond)):
case <-time.After((time.Duration(*l.Timeout) * time.Millisecond)):
l.removeWaiter(ch)
case <-ctx.Done():
}
Expand Down Expand Up @@ -101,7 +91,7 @@ func (l *Limiter) proceed() (bool, chan struct{}) {
l.mu.Lock()
defer l.mu.Unlock()

if l.count < l.limit {
if l.count < l.Limit {
l.count++
return true, nil
}
Expand Down

0 comments on commit c29c716

Please sign in to comment.