Skip to content

Commit

Permalink
fix(multiple): July/August 2024 wave (#536)
Browse files Browse the repository at this point in the history
* fix(multiple): July 2024 wave

* feat(middleware): add Expires support for cache-tests, add current storer name in the Cache-Status detail parameter

* fix(ci): Unit tests for beego & gin, Traefik and Tyk sync

* feat(traefik): loads well, support v3.1

* fix(surrogate): purge tags

* fix(surrogate): invalidation

* fix(traefik): use latest surrogate API

* fix: use protobuf instead of gob

* fix(chore): set back the storer name in the detail directive

* fix: golangci-lint
  • Loading branch information
darkweak authored Aug 14, 2024
1 parent 4ad3d0b commit 65cb241
Show file tree
Hide file tree
Showing 47 changed files with 226 additions and 1,078 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/non-regression.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
with:
go-version: ${{ env.GO_VERSION }}
- name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v6
with:
args: --timeout=240s
unit-test-golang:
Expand Down
5 changes: 3 additions & 2 deletions .github/workflows/plugin_template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ jobs:
uses: actions/checkout@v4
-
name: golangci-lint
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v6
with:
version: v1.59.1
working-directory: plugins/${{ inputs.LOWER_NAME }}
args: --skip-dirs=override --timeout=240s
args: --exclude-dirs=override --timeout=240s
-
name: Run ${{ inputs.CAPITALIZED_NAME }} tests
run: cd plugins/${{ inputs.LOWER_NAME }} && go test -v .
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ generate-workflow: ## Generate plugin workflow
bash .github/workflows/workflow_plugins_generator.sh

golangci-lint: ## Run golangci-lint to ensure the code quality
docker run --rm -v $(PWD):/app -w /app golangci/golangci-lint:v1.57.2 golangci-lint run -v --timeout 180s ./...
docker run --rm -v $(PWD):/app -w /app golangci/golangci-lint:v1.59.1 golangci-lint run -v --timeout 180s ./...
for plugin in $(PLUGINS_LIST) ; do \
echo "Starting lint $$plugin \n" && docker run --rm -v $(PWD):/app -w /app golangci/golangci-lint:v1.57.2 golangci-lint run -v --skip-dirs=override --timeout 240s ./plugins/$$plugin; \
echo "Starting lint $$plugin \n" && docker run --rm -v $(PWD):/app -w /app golangci/golangci-lint:v1.59.1 golangci-lint run -v --exclude-dirs=override --timeout 240s ./plugins/$$plugin; \
done
cd plugins/caddy && go mod tidy && go mod download

Expand Down
2 changes: 1 addition & 1 deletion docs/website/content/docs/middlewares/roadrunner.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ tags = ["Beginners", "Advanced"]
+++

## Build the roadrunner binary
First you need to build your roadrunner instance with the cache dependency. You should use [velox](github.com/roadrunner-server/velox) for that.
First you need to build your roadrunner instance with the cache dependency. You should use [velox](https://github.com/roadrunner-server/velox) for that.

Define a `configuration.toml` file to tell velox what and how it must build.
```toml
Expand Down
23 changes: 14 additions & 9 deletions pkg/api/souin.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,25 @@ func (s *SouinAPI) BulkDelete(key string, purge bool) {
}
}

if purge {
current.Delete(core.MappingKeyPrefix + key)
} else {
if !purge {
newFreshTime := time.Now()
for k, v := range mapping.Mapping {
v.FreshTime = timestamppb.New(newFreshTime)
mapping.Mapping[k] = v
}

v, e := proto.Marshal(&mapping)
if e != nil {
fmt.Println("Impossible to re-encode the mapping", core.MappingKeyPrefix+key)
current.Delete(core.MappingKeyPrefix + key)
}
_ = current.Set(core.MappingKeyPrefix+key, v, storageToInfiniteTTLMap[current.Name()])
}
}

if purge {
current.Delete(core.MappingKeyPrefix + key)
}
}

s.Delete(key)
Expand All @@ -96,13 +105,8 @@ func (s *SouinAPI) BulkDelete(key string, purge bool) {
// Delete will delete a record into the provider cache system and will update the Souin API if enabled
// The key can be a regexp to delete multiple items
func (s *SouinAPI) Delete(key string) {
_, err := regexp.Compile(key)
for _, current := range s.storers {
if err != nil {
current.DeleteMany(key)
} else {
current.Delete(key)
}
current.Delete(key)
}
}

Expand Down Expand Up @@ -144,6 +148,7 @@ func (s *SouinAPI) listKeys(search string) []string {
var storageToInfiniteTTLMap = map[string]time.Duration{
"BADGER": types.OneYearDuration,
"ETCD": types.OneYearDuration,
"GO-REDIS": 0,
"NUTS": 0,
"OLRIC": types.OneYearDuration,
"OTTER": types.OneYearDuration,
Expand Down
73 changes: 46 additions & 27 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ func (s *SouinBaseHandler) Store(
rq *http.Request,
requestCc *cacheobject.RequestCacheDirectives,
cachedKey string,
uri string,
) error {
statusCode := customWriter.GetStatusCode()
if !isCacheableCode(statusCode) {
Expand Down Expand Up @@ -237,28 +238,40 @@ func (s *SouinBaseHandler) Store(
}
}

hasFreshness := false
ma := currentMatchedURL.TTL.Duration
if responseCc.SMaxAge >= 0 {
ma = time.Duration(responseCc.SMaxAge) * time.Second
} else if responseCc.MaxAge >= 0 {
ma = time.Duration(responseCc.MaxAge) * time.Second
} else if customWriter.Header().Get("Expires") != "" {
exp, err := time.Parse(time.RFC1123, customWriter.Header().Get("Expires"))
if err != nil {
return nil
}

duration := time.Until(exp)
if duration <= 0 || duration > 10*types.OneYearDuration {
return nil
}

date, _ := time.Parse(time.RFC1123, customWriter.Header().Get("Date"))
if date.Sub(exp) > 0 {
return nil
}

ma = duration
hasFreshness = true
}

now := rq.Context().Value(context.Now).(time.Time)
date, _ := http.ParseTime(now.Format(http.TimeFormat))
customWriter.Header().Set(rfc.StoredTTLHeader, ma.String())
ma = ma - time.Since(date)

if exp := customWriter.Header().Get("Expires"); exp != "" {
delta, _ := time.Parse(exp, time.RFC1123)
if sub := delta.Sub(now); sub > 0 {
ma = sub
}
}

status := fmt.Sprintf("%s; fwd=uri-miss", rq.Context().Value(context.CacheName))
if (modeContext.Bypass_request || !requestCc.NoStore) &&
(modeContext.Bypass_response || !responseCc.NoStore) {
(modeContext.Bypass_response || !responseCc.NoStore || hasFreshness) {
headers := customWriter.Header().Clone()
for hname, shouldDelete := range responseCc.NoCache {
if shouldDelete {
Expand Down Expand Up @@ -329,6 +342,7 @@ func (s *SouinBaseHandler) Store(
variedKey,
) == nil {
s.Configuration.GetLogger().Debugf("Stored the key %s in the %s provider", variedKey, currentStorer.Name())
res.Request = rq
} else {
mu.Lock()
fails = append(fails, fmt.Sprintf("; detail=%s-INSERTION-ERROR", currentStorer.Name()))
Expand All @@ -339,9 +353,9 @@ func (s *SouinBaseHandler) Store(

wg.Wait()
if len(fails) < s.storersLen {
go func(rs http.Response, key string) {
_ = s.SurrogateKeyStorer.Store(&rs, key)
}(res, variedKey)
go func(rs http.Response, key string, basekey string) {
_ = s.SurrogateKeyStorer.Store(&rs, key, uri, basekey)
}(res, variedKey, cachedKey)
status += "; stored"
}

Expand Down Expand Up @@ -375,6 +389,7 @@ func (s *SouinBaseHandler) Upstream(
next handlerFunc,
requestCc *cacheobject.RequestCacheDirectives,
cachedKey string,
uri string,
) error {
s.Configuration.GetLogger().Debug("Request the upstream server")
prometheus.Increment(prometheus.RequestCounter)
Expand Down Expand Up @@ -422,7 +437,7 @@ func (s *SouinBaseHandler) Upstream(
customWriter.Header().Set(headerName, s.DefaultMatchedUrl.DefaultCacheControl)
}

err := s.Store(customWriter, rq, requestCc, cachedKey)
err := s.Store(customWriter, rq, requestCc, cachedKey, uri)
defer customWriter.Buf.Reset()

return singleflightValue{
Expand All @@ -446,7 +461,7 @@ func (s *SouinBaseHandler) Upstream(
for _, vh := range variedHeaders {
if rq.Header.Get(vh) != sfWriter.requestHeaders.Get(vh) {
// cachedKey += rfc.GetVariedCacheKey(rq, variedHeaders)
return s.Upstream(customWriter, rq, next, requestCc, cachedKey)
return s.Upstream(customWriter, rq, next, requestCc, cachedKey, uri)
}
}
}
Expand All @@ -462,7 +477,7 @@ func (s *SouinBaseHandler) Upstream(
return nil
}

func (s *SouinBaseHandler) Revalidate(validator *core.Revalidator, next handlerFunc, customWriter *CustomWriter, rq *http.Request, requestCc *cacheobject.RequestCacheDirectives, cachedKey string) error {
func (s *SouinBaseHandler) Revalidate(validator *core.Revalidator, next handlerFunc, customWriter *CustomWriter, rq *http.Request, requestCc *cacheobject.RequestCacheDirectives, cachedKey string, uri string) error {
s.Configuration.GetLogger().Debug("Revalidate the request with the upstream server")
prometheus.Increment(prometheus.RequestRevalidationCounter)

Expand All @@ -484,7 +499,7 @@ func (s *SouinBaseHandler) Revalidate(validator *core.Revalidator, next handlerF
}

if statusCode != http.StatusNotModified {
err = s.Store(customWriter, rq, requestCc, cachedKey)
err = s.Store(customWriter, rq, requestCc, cachedKey, uri)
}
}

Expand Down Expand Up @@ -604,6 +619,8 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
}
cachedKey := req.Context().Value(context.Key).(string)

// Need to copy URL path before calling next because it can alter the URI
uri := req.URL.Path
bufPool := s.bufPool.Get().(*bytes.Buffer)
bufPool.Reset()
defer s.bufPool.Put(bufPool)
Expand All @@ -618,6 +635,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
if modeContext.Bypass_request || !requestCc.NoCache {
validator := rfc.ParseRequest(req)
var fresh, stale *http.Response
var storerName string
finalKey := cachedKey
if req.Context().Value(context.Hashed).(bool) {
finalKey = fmt.Sprint(xxhash.Sum64String(finalKey))
Expand All @@ -626,7 +644,8 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
fresh, stale = currentStorer.GetMultiLevel(finalKey, req, validator)

if fresh != nil || stale != nil {
s.Configuration.GetLogger().Debugf("Found at least one valid response in the %s storage", currentStorer.Name())
storerName = currentStorer.Name()
s.Configuration.GetLogger().Debugf("Found at least one valid response in the %s storage", storerName)
break
}
}
Expand All @@ -635,7 +654,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
if fresh != nil && (!modeContext.Strict || rfc.ValidateCacheControl(fresh, requestCc)) {
response := fresh
if validator.ResponseETag != "" && validator.Matched {
rfc.SetCacheStatusHeader(response)
rfc.SetCacheStatusHeader(response, storerName)
for h, v := range response.Header {
customWriter.Header()[h] = v
}
Expand All @@ -655,19 +674,19 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
}

if validator.NeedRevalidation {
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey)
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey, uri)
_, _ = customWriter.Send()

return err
}
if resCc, _ := cacheobject.ParseResponseCacheControl(rfc.HeaderAllCommaSepValuesString(response.Header, headerName)); resCc.NoCachePresent {
prometheus.Increment(prometheus.NoCachedResponseCounter)
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey)
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey, uri)
_, _ = customWriter.Send()

return err
}
rfc.SetCacheStatusHeader(response)
rfc.SetCacheStatusHeader(response, storerName)
if !modeContext.Strict || rfc.ValidateMaxAgeCachedResponse(requestCc, response) != nil {
for h, v := range response.Header {
customWriter.Header()[h] = v
Expand All @@ -685,7 +704,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n

if nil != response && (!modeContext.Strict || rfc.ValidateCacheControl(response, requestCc)) {
addTime, _ := time.ParseDuration(response.Header.Get(rfc.StoredTTLHeader))
rfc.SetCacheStatusHeader(response)
rfc.SetCacheStatusHeader(response, storerName)

responseCc, _ := cacheobject.ParseResponseCacheControl(rfc.HeaderAllCommaSepValuesString(response.Header, "Cache-Control"))
if responseCc.StaleWhileRevalidate > 0 {
Expand All @@ -697,9 +716,9 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
_, _ = io.Copy(customWriter.Buf, response.Body)
_, err := customWriter.Send()
customWriter = NewCustomWriter(req, rw, bufPool)
go func(v *core.Revalidator, goCw *CustomWriter, goRq *http.Request, goNext func(http.ResponseWriter, *http.Request) error, goCc *cacheobject.RequestCacheDirectives, goCk string) {
_ = s.Revalidate(v, goNext, goCw, goRq, goCc, goCk)
}(validator, customWriter, req, next, requestCc, cachedKey)
go func(v *core.Revalidator, goCw *CustomWriter, goRq *http.Request, goNext func(http.ResponseWriter, *http.Request) error, goCc *cacheobject.RequestCacheDirectives, goCk string, goUri string) {
_ = s.Revalidate(v, goNext, goCw, goRq, goCc, goCk, goUri)
}(validator, customWriter, req, next, requestCc, cachedKey, uri)
buf := s.bufPool.Get().(*bytes.Buffer)
buf.Reset()
defer s.bufPool.Put(buf)
Expand All @@ -709,7 +728,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n

if responseCc.MustRevalidate || responseCc.NoCachePresent || validator.NeedRevalidation {
req.Header["If-None-Match"] = append(req.Header["If-None-Match"], validator.ResponseETag)
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey)
err := s.Revalidate(validator, next, customWriter, req, requestCc, cachedKey, uri)
statusCode := customWriter.GetStatusCode()
if err != nil {
if responseCc.StaleIfError > -1 || requestCc.StaleIfError > 0 {
Expand All @@ -732,7 +751,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n

if statusCode == http.StatusNotModified {
if !validator.Matched {
rfc.SetCacheStatusHeader(response)
rfc.SetCacheStatusHeader(response, storerName)
customWriter.WriteHeader(response.StatusCode)
maps.Copy(customWriter.Header(), response.Header)
_, _ = io.Copy(customWriter.Buf, response.Body)
Expand Down Expand Up @@ -771,7 +790,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
errorCacheCh := make(chan error)
go func(vr *http.Request, cw *CustomWriter) {
prometheus.Increment(prometheus.NoCachedResponseCounter)
errorCacheCh <- s.Upstream(cw, vr, next, requestCc, cachedKey)
errorCacheCh <- s.Upstream(cw, vr, next, requestCc, cachedKey, uri)
}(req, customWriter)

select {
Expand Down
4 changes: 3 additions & 1 deletion pkg/middleware/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ type CustomWriter struct {
func (r *CustomWriter) Header() http.Header {
r.mutex.Lock()
defer r.mutex.Unlock()
if r.headersSent {

if r.headersSent || r.Req.Context().Err() != nil {
return http.Header{}
}

return r.Rw.Header()
}

Expand Down
8 changes: 4 additions & 4 deletions pkg/rfc/cache_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func HitStaleCache(h *http.Header) {
h.Set("Cache-Status", h.Get("Cache-Status")+"; fwd=stale")
}

func manageAge(h *http.Header, ttl time.Duration, cacheName, key string) {
func manageAge(h *http.Header, ttl time.Duration, cacheName, key, storerName string) {
utc1 := time.Now().UTC()
dh := h.Get("Date")
if dh == "" {
Expand Down Expand Up @@ -119,19 +119,19 @@ func manageAge(h *http.Header, ttl time.Duration, cacheName, key string) {
age := strconv.Itoa(oldAge + cage)
h.Set("Age", age)
ttlValue := strconv.Itoa(int(ttl.Seconds()) - cage)
h.Set("Cache-Status", cacheName+"; hit; ttl="+ttlValue+"; key="+key)
h.Set("Cache-Status", cacheName+"; hit; ttl="+ttlValue+"; key="+key+"; detail="+storerName)
}

func setMalformedHeader(headers *http.Header, header, cacheName string) {
SetRequestCacheStatus(headers, "MALFORMED-"+header, cacheName)
}

// SetCacheStatusHeader set the Cache-Status header
func SetCacheStatusHeader(resp *http.Response) *http.Response {
func SetCacheStatusHeader(resp *http.Response, storerName string) *http.Response {
h := resp.Header
cacheName := resp.Request.Context().Value(context.CacheName).(string)
validateEmptyHeaders(&h, cacheName)
manageAge(&h, 0, cacheName, GetCacheKeyFromCtx(resp.Request.Context()))
manageAge(&h, 0, cacheName, GetCacheKeyFromCtx(resp.Request.Context()), storerName)

resp.Header = h
return resp
Expand Down
4 changes: 2 additions & 2 deletions pkg/surrogate/providers/akamai.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@ func (*AkamaiSurrogateStorage) getHeaderSeparator() string {
}

// Store stores the response tags located in the first non empty supported header
func (a *AkamaiSurrogateStorage) Store(response *http.Response, cacheKey string) error {
func (a *AkamaiSurrogateStorage) Store(response *http.Response, cacheKey, uri, basekey string) error {
defer func() {
response.Header.Del(surrogateKey)
response.Header.Del(surrogateControl)
}()
e := a.baseStorage.Store(response, cacheKey)
e := a.baseStorage.Store(response, cacheKey, uri, basekey)
response.Header.Set(edgeCacheTag, response.Header.Get(surrogateKey))

return e
Expand Down
Loading

0 comments on commit 65cb241

Please sign in to comment.