Skip to content

Commit

Permalink
feat: traffic pattern intervals with multiple base urls
Browse files Browse the repository at this point in the history
  • Loading branch information
tobbee committed Oct 29, 2023
1 parent e5ac331 commit 1f0cc98
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 4 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Added

- New highly configurable statuscode option for cyclic bad segment request responses
- New highly configurable `statuscode`` parameter for cyclic bad segment request responses
- New URL parameter `traffic` to simulate periodic issues with fetching
segments. Supports multiple parallel BaseURLs.

## Changed

Expand Down
115 changes: 112 additions & 3 deletions cmd/livesim2/app/configurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/Dash-Industry-Forum/livesim2/pkg/scte35"
)
Expand All @@ -24,6 +25,7 @@ const (
timeLineTime liveMPDType = iota
timeLineNumber
segmentNumber
baseURLPrefix = "bu"
)

type UTCTimingMethod string
Expand All @@ -49,7 +51,6 @@ const (
type ResponseConfig struct {
URLParts []string `json:"-"`
URLContentIdx int `json:"-"`
BaseURLs []string `json:"BaseURLs,omitempty"`
UTCTimingMethods []UTCTimingMethod `json:"UTCTimingMethods,omitempty"`
PeriodDurations []int `json:"PeriodDurations,omitempty"`
StartTimeS int `json:"StartTimeS"`
Expand Down Expand Up @@ -85,6 +86,7 @@ type ResponseConfig struct {
TimeSubsRegion int `json:"TimeSubsRegion,omitempty"`
Host string `json:"Host,omitempty"`
SegStatusCodes []SegStatusCodes `json:"SegStatus,omitempty"`
Traffic []LossItvls `json:"Traffic,omitempty"`
}

// SegStatusCodes configures regular extraordinary segment response codes
Expand All @@ -99,6 +101,113 @@ type SegStatusCodes struct {
Reps []string
}

// CreateAllLossItvls creates loss intervals for multiple BaseURLs
func CreateAllLossItvls(pattern string) ([]LossItvls, error) {
if pattern == "" {
return nil, nil
}
nr := strings.Count(pattern, ",") + 1
li := make([]LossItvls, 0, nr)
for _, s := range strings.Split(pattern, ",") {
li1, err := CreateLossItvls(s)
if err != nil {
return nil, err
}
li = append(li, li1)
}
return li, nil
}

type lossState int

const (
lossUnknown lossState = iota
lossNo
loss404
lossSlow // Slow response
lossHang // Hangs for 10s
lossSlowTime = 2 * time.Second
lossHangTime = 10 * time.Second
)

// LossItvls is loss intervals for one BaseURL
type LossItvls struct {
Itvls []LossItvl
}

// CycleDurS returns complete dur of cycle in seconds
func (l LossItvls) CycleDurS() int {
dur := 0
for _, itvl := range l.Itvls {
dur += itvl.durS
}
return dur
}

func (l LossItvls) StateAt(nowS int) lossState {
dur := l.CycleDurS()
rest := nowS % dur
for _, itvl := range l.Itvls {
rest -= itvl.durS
if rest < 0 {
return itvl.state
}
}
return lossUnknown
}

// CreateLossItvls creates a LossItvls from a pattern like u20d10 (20s up, 10 down)
func CreateLossItvls(pattern string) (LossItvls, error) {
li := LossItvls{}
state := lossUnknown
dur := 0
for i := 0; i < len(pattern); i++ {
c := pattern[i]
switch c {
case 'u', 'd', 's', 'h':
if state != lossUnknown {
if dur == 0 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
li.Itvls = append(li.Itvls, LossItvl{durS: dur, state: state})
}
dur = 0
switch c {
case 'u':
state = lossNo
case 'd':
state = loss404
case 's':
state = lossSlow
case 'h':
state = lossHang
}
default:
digit := c - '0'
if digit > 9 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
dur = dur*10 + int(digit)
}
}
if state != lossUnknown {
if dur == 0 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
li.Itvls = append(li.Itvls, LossItvl{durS: dur, state: state})
}
return li, nil
}

type LossItvl struct {
durS int
state lossState
}

func baseURL(nr int) string {
return fmt.Sprintf("bu%d/", nr)
}

// NewResponseConfig returns a new ResponseConfig with default values.
func NewResponseConfig() *ResponseConfig {
c := ResponseConfig{
Expand Down Expand Up @@ -213,8 +322,6 @@ cfgLoop:
cfg.SegTimelineFlag = true
case "segtimelinenr":
cfg.SegTimelineNrFlag = true
case "baseurl": // Add one or more BaseURLs, put all configurations
cfg.BaseURLs = append(cfg.BaseURLs, val)
case "peroff": // Set the period offset
cfg.PeriodOffset = sc.AtoiPtr(key, val)
case "scte35": // Signal this many SCTE-35 ad periods inband (emsg messages) every minute
Expand Down Expand Up @@ -246,6 +353,8 @@ cfgLoop:
cfg.TimeSubsRegion = sc.Atoi(key, val)
case "statuscode":
cfg.SegStatusCodes = sc.ParseSegStatusCodes(key, val)
case "traffic":
cfg.Traffic = sc.ParseLossItvls(key, val)
default:
contentStartIdx = i
break cfgLoop
Expand Down
37 changes: 37 additions & 0 deletions cmd/livesim2/app/configurl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,40 @@ func TestProcessURLCfg(t *testing.T) {
}
}
}

func TestParseLossItvls(t *testing.T) {
cases := []struct {
desc string
patter string
wantedItvls []LossItvls
}{
{"empty", "", nil},
{"up 10s", "u10",
[]LossItvls{
{Itvls: []LossItvl{
{10, lossNo}}},
},
},
{"up20s down3s up12s", "u10d3u12",
[]LossItvls{
{Itvls: []LossItvl{
{10, lossNo},
{3, loss404},
{12, lossNo}}},
},
},
{"up 5s", "u5",
[]LossItvls{
{Itvls: []LossItvl{
{5, lossNo}}},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
gotItvls, err := CreateAllLossItvls(c.patter)
require.NoError(t, err)
require.Equal(t, c.wantedItvls, gotItvls)
})
}
}
41 changes: 41 additions & 0 deletions cmd/livesim2/app/handler_livesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
case ".mp4", ".m4s", ".cmfv", ".cmfa", ".cmft", ".jpg", ".jpeg", ".m4v", ".m4a":
segmentPart := strings.TrimPrefix(contentPart, a.AssetPath) // includes heading slash
if len(cfg.Traffic) > 0 {
var patternNr int
patternNr, segmentPart = extractPattern(segmentPart)
if patternNr >= 0 {
itvls := cfg.Traffic[patternNr]
switch itvls.StateAt(nowMS / 1000) {
case lossNo:
// Just continue
case loss404:
http.Error(w, "Not Found", http.StatusNotFound)
return
case lossSlow:
time.Sleep(lossSlowTime)
case lossHang:
// Get the result, but after 10s
time.Sleep(lossHangTime)
http.Error(w, "Hang", http.StatusServiceUnavailable)
return
default:
http.Error(w, "strange loss state", http.StatusInternalServerError)
return
}
}
}
code, err := writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates)
if err != nil {
var tooEarly errTooEarly
Expand Down Expand Up @@ -120,6 +144,23 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
}

// extractPattern extracts the pattern number and return a modified segmentPart.
func extractPattern(segmentPart string) (int, string) {
parts := strings.Split(segmentPart, "/")
pPart := parts[1]
if !strings.HasPrefix(pPart, baseURLPrefix) {
return -1, segmentPart
}
nr, err := strconv.Atoi(pPart[len(baseURLPrefix):])
if err != nil {
return -1, segmentPart
}
// Remove the base URL part, but leave an empty string at start.
parts = parts[1:]
parts[0] = ""
return nr, strings.Join(parts, "/")
}

func writeLiveMPD(log *slog.Logger, w http.ResponseWriter, cfg *ResponseConfig, a *asset, mpdName string, nowMS int) error {
work := make([]byte, 0, 1024)
buf := bytes.NewBuffer(work)
Expand Down
10 changes: 10 additions & 0 deletions cmd/livesim2/app/handler_urlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ type urlGenData struct {
StopRel string // sets stop-time for time-limited event relative to now (in seconds)
Scte35Var string // SCTE-35 insertion variant
StatusCodes string // comma-separated list of response code patterns to return
Traffic string // comma-separated list of up/down/slow/hang intervals for one or more BaseURLs in MPD
}

var initData urlGenData
Expand Down Expand Up @@ -328,6 +329,15 @@ func createURL(r *http.Request, aInfo assetsInfo) (data urlGenData, err error) {
data.StatusCodes = statusCodes
sb.WriteString(fmt.Sprintf("statuscode_%s/", statusCodes))
}
traffic := q.Get("traffic")
if traffic != "" {
_, err := CreateAllLossItvls(traffic)
if err != nil {
return data, fmt.Errorf("bad traffic patterns: %w", err)
}
data.Traffic = traffic
sb.WriteString(fmt.Sprintf("traffic_%s/", traffic))
}
sb.WriteString(fmt.Sprintf("%s/%s", asset, mpd))
data.URL = sb.String()
data.PlayURL = aInfo.PlayURL
Expand Down
4 changes: 4 additions & 0 deletions cmd/livesim2/app/livempd.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func LiveMPD(a *asset, mpdName string, cfg *ResponseConfig, nowMS int) (*m.MPD,
period.Duration = nil
period.Id = "P0"
period.Start = Ptr(m.Duration(0))
for bNr := 0; bNr < len(cfg.Traffic); bNr++ {
b := m.NewBaseURL(baseURL(bNr))
period.BaseURLs = append(period.BaseURLs, b)
}

adaptationSets := orderAdaptationSetsByContentType(period.AdaptationSets)
var refSegEntries segEntries
Expand Down
12 changes: 12 additions & 0 deletions cmd/livesim2/app/strconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,15 @@ func (s *strConvAccErr) ParseSegStatusCodes(key, val string) []SegStatusCodes {
}
return codes
}

func (s *strConvAccErr) ParseLossItvls(key, val string) []LossItvls {
if s.err != nil {
return nil
}
itvls, err := CreateAllLossItvls(val)
if err != nil {
s.err = fmt.Errorf("key=%s, err=%w", key, err)
return nil
}
return itvls
}
28 changes: 28 additions & 0 deletions cmd/livesim2/app/templates/urlgen.html
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,34 @@ <h1>Livesim2 URL generator</h1>
</p>
<input type="text" id="statuscode" name="statuscode" value="{{.StatusCodes}}" />
</label>
<label for="traffic">
<p><em>Traffic Patterns for one or more BaseURLs</em></p>
<p>
Specify time interval for loss patterns for one or more BaseURLs
with "up (u)", "down (d)", "slow (s)", or "hang (h)" states, like
<pre>u50d10,u10d50</pre>
or
<pre>d1,u1,u45s10h5</pre>
where:<br/>
<ul>
<li><it>u50d10,u10d50</it> is two pairs (two base URLS)</li>
<li><it>u50d10 means 50s up time and 10s downtime (60s cycle)</li>
<li><it>u10d50 means 10s up time and 50s downtime (60s cycle)</li>
<li><it>d1,u1,u45s10h5</it> means three base URLS where the first is always down,
the second is always up,
and the third is up 45s, slow 10s, and hanging 5s every minute</li>
</ul>
<ul>
<li>During a "down (d)" interval, all segment requests will result in a "404 Not Found" response.</li>
<li>During a "slow (s)" interval, all segment responses are delayed by 2s.</li>
<li>During a "hang (s)" interval, all segment responses hang for 10s before resulting in 503.
</li>
</ul>


</p>
<input type="text" id="traffic" name="traffic" value="{{.Traffic}}" />
</label>
</details>


Expand Down

0 comments on commit 1f0cc98

Please sign in to comment.