Skip to content

Commit

Permalink
Merge pull request #129 from Dash-Industry-Forum/traffic
Browse files Browse the repository at this point in the history
feat: traffic pattern intervals with multiple base urls
  • Loading branch information
tobbee authored Oct 29, 2023
2 parents 3d85b30 + 62a7e46 commit 9bf47d4
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 34 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## Added

- New highly configurable statuscode option for cyclic bad segment request responses
- New highly configurable `statuscode`` parameter for cyclic bad segment request responses
- New URL parameter `traffic` to simulate periodic issues with fetching
segments. Supports multiple parallel BaseURLs.

## Changed

Expand Down
115 changes: 112 additions & 3 deletions cmd/livesim2/app/configurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"github.com/Dash-Industry-Forum/livesim2/pkg/scte35"
)
Expand All @@ -24,6 +25,7 @@ const (
timeLineTime liveMPDType = iota
timeLineNumber
segmentNumber
baseURLPrefix = "bu"
)

type UTCTimingMethod string
Expand All @@ -49,7 +51,6 @@ const (
type ResponseConfig struct {
URLParts []string `json:"-"`
URLContentIdx int `json:"-"`
BaseURLs []string `json:"BaseURLs,omitempty"`
UTCTimingMethods []UTCTimingMethod `json:"UTCTimingMethods,omitempty"`
PeriodDurations []int `json:"PeriodDurations,omitempty"`
StartTimeS int `json:"StartTimeS"`
Expand Down Expand Up @@ -85,6 +86,7 @@ type ResponseConfig struct {
TimeSubsRegion int `json:"TimeSubsRegion,omitempty"`
Host string `json:"Host,omitempty"`
SegStatusCodes []SegStatusCodes `json:"SegStatus,omitempty"`
Traffic []LossItvls `json:"Traffic,omitempty"`
}

// SegStatusCodes configures regular extraordinary segment response codes
Expand All @@ -99,6 +101,113 @@ type SegStatusCodes struct {
Reps []string
}

// CreateAllLossItvls creates loss intervals for multiple BaseURLs
func CreateAllLossItvls(pattern string) ([]LossItvls, error) {
if pattern == "" {
return nil, nil
}
nr := strings.Count(pattern, ",") + 1
li := make([]LossItvls, 0, nr)
for _, s := range strings.Split(pattern, ",") {
li1, err := CreateLossItvls(s)
if err != nil {
return nil, err
}
li = append(li, li1)
}
return li, nil
}

type lossState int

const (
lossUnknown lossState = iota
lossNo
loss404
lossSlow // Slow response
lossHang // Hangs for 10s
lossSlowTime = 2 * time.Second
lossHangTime = 10 * time.Second
)

// LossItvls is loss intervals for one BaseURL
type LossItvls struct {
Itvls []LossItvl
}

// CycleDurS returns complete dur of cycle in seconds
func (l LossItvls) CycleDurS() int {
dur := 0
for _, itvl := range l.Itvls {
dur += itvl.durS
}
return dur
}

func (l LossItvls) StateAt(nowS int) lossState {
dur := l.CycleDurS()
rest := nowS % dur
for _, itvl := range l.Itvls {
rest -= itvl.durS
if rest < 0 {
return itvl.state
}
}
return lossUnknown
}

// CreateLossItvls creates a LossItvls from a pattern like u20d10 (20s up, 10 down)
func CreateLossItvls(pattern string) (LossItvls, error) {
li := LossItvls{}
state := lossUnknown
dur := 0
for i := 0; i < len(pattern); i++ {
c := pattern[i]
switch c {
case 'u', 'd', 's', 'h':
if state != lossUnknown {
if dur == 0 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
li.Itvls = append(li.Itvls, LossItvl{durS: dur, state: state})
}
dur = 0
switch c {
case 'u':
state = lossNo
case 'd':
state = loss404
case 's':
state = lossSlow
case 'h':
state = lossHang
}
default:
digit := c - '0'
if digit > 9 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
dur = dur*10 + int(digit)
}
}
if state != lossUnknown {
if dur == 0 {
return LossItvls{}, fmt.Errorf("invalid loss pattern %q", pattern)
}
li.Itvls = append(li.Itvls, LossItvl{durS: dur, state: state})
}
return li, nil
}

type LossItvl struct {
durS int
state lossState
}

func baseURL(nr int) string {
return fmt.Sprintf("bu%d/", nr)
}

// NewResponseConfig returns a new ResponseConfig with default values.
func NewResponseConfig() *ResponseConfig {
c := ResponseConfig{
Expand Down Expand Up @@ -213,8 +322,6 @@ cfgLoop:
cfg.SegTimelineFlag = true
case "segtimelinenr":
cfg.SegTimelineNrFlag = true
case "baseurl": // Add one or more BaseURLs, put all configurations
cfg.BaseURLs = append(cfg.BaseURLs, val)
case "peroff": // Set the period offset
cfg.PeriodOffset = sc.AtoiPtr(key, val)
case "scte35": // Signal this many SCTE-35 ad periods inband (emsg messages) every minute
Expand Down Expand Up @@ -246,6 +353,8 @@ cfgLoop:
cfg.TimeSubsRegion = sc.Atoi(key, val)
case "statuscode":
cfg.SegStatusCodes = sc.ParseSegStatusCodes(key, val)
case "traffic":
cfg.Traffic = sc.ParseLossItvls(key, val)
default:
contentStartIdx = i
break cfgLoop
Expand Down
37 changes: 37 additions & 0 deletions cmd/livesim2/app/configurl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,40 @@ func TestProcessURLCfg(t *testing.T) {
}
}
}

func TestParseLossItvls(t *testing.T) {
cases := []struct {
desc string
patter string
wantedItvls []LossItvls
}{
{"empty", "", nil},
{"up 10s", "u10",
[]LossItvls{
{Itvls: []LossItvl{
{10, lossNo}}},
},
},
{"up20s down3s up12s", "u10d3u12",
[]LossItvls{
{Itvls: []LossItvl{
{10, lossNo},
{3, loss404},
{12, lossNo}}},
},
},
{"up 5s", "u5",
[]LossItvls{
{Itvls: []LossItvl{
{5, lossNo}}},
},
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
gotItvls, err := CreateAllLossItvls(c.patter)
require.NoError(t, err)
require.Equal(t, c.wantedItvls, gotItvls)
})
}
}
41 changes: 41 additions & 0 deletions cmd/livesim2/app/handler_livesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,30 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
case ".mp4", ".m4s", ".cmfv", ".cmfa", ".cmft", ".jpg", ".jpeg", ".m4v", ".m4a":
segmentPart := strings.TrimPrefix(contentPart, a.AssetPath) // includes heading slash
if len(cfg.Traffic) > 0 {
var patternNr int
patternNr, segmentPart = extractPattern(segmentPart)
if patternNr >= 0 {
itvls := cfg.Traffic[patternNr]
switch itvls.StateAt(nowMS / 1000) {
case lossNo:
// Just continue
case loss404:
http.Error(w, "Not Found", http.StatusNotFound)
return
case lossSlow:
time.Sleep(lossSlowTime)
case lossHang:
// Get the result, but after 10s
time.Sleep(lossHangTime)
http.Error(w, "Hang", http.StatusServiceUnavailable)
return
default:
http.Error(w, "strange loss state", http.StatusInternalServerError)
return
}
}
}
code, err := writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates)
if err != nil {
var tooEarly errTooEarly
Expand Down Expand Up @@ -120,6 +144,23 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
}

// extractPattern extracts the pattern number and return a modified segmentPart.
func extractPattern(segmentPart string) (int, string) {
parts := strings.Split(segmentPart, "/")
pPart := parts[1]
if !strings.HasPrefix(pPart, baseURLPrefix) {
return -1, segmentPart
}
nr, err := strconv.Atoi(pPart[len(baseURLPrefix):])
if err != nil {
return -1, segmentPart
}
// Remove the base URL part, but leave an empty string at start.
parts = parts[1:]
parts[0] = ""
return nr, strings.Join(parts, "/")
}

func writeLiveMPD(log *slog.Logger, w http.ResponseWriter, cfg *ResponseConfig, a *asset, mpdName string, nowMS int) error {
work := make([]byte, 0, 1024)
buf := bytes.NewBuffer(work)
Expand Down
59 changes: 36 additions & 23 deletions cmd/livesim2/app/handler_urlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,7 @@ func (s *Server) urlGenHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
templateName = "mpds"
case "/urlgen/create":
data, err = createURL(r, aInfo)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
data = createURL(r, aInfo)
default:
data, err = createInitData(r, aInfo)
if err != nil {
Expand Down Expand Up @@ -117,15 +113,17 @@ type urlGenData struct {
TimeSubsDur string // cue duration of generated subtitles (in milliseconds)
TimeSubsReg string // 0 for bottom and 1 for top
UTCTiming string
Periods string // number of periods per hour (1-60)
Continuous bool // period continuity signaling
StartNR string // startNumber (default=0) -1 translates to no value in MPD (fallback to default = 1)
Start string // sets timeline start (and availabilityStartTime) relative to Epoch (in seconds)
Stop string // sets stop-time for time-limited event (in seconds)
StartRel string // sets timeline start (and availabilityStartTime) relative to now (in seconds). Normally negative value.
StopRel string // sets stop-time for time-limited event relative to now (in seconds)
Scte35Var string // SCTE-35 insertion variant
StatusCodes string // comma-separated list of response code patterns to return
Periods string // number of periods per hour (1-60)
Continuous bool // period continuity signaling
StartNR string // startNumber (default=0) -1 translates to no value in MPD (fallback to default = 1)
Start string // sets timeline start (and availabilityStartTime) relative to Epoch (in seconds)
Stop string // sets stop-time for time-limited event (in seconds)
StartRel string // sets timeline start (and availabilityStartTime) relative to now (in seconds). Normally negative value.
StopRel string // sets stop-time for time-limited event relative to now (in seconds)
Scte35Var string // SCTE-35 insertion variant
StatusCodes string // comma-separated list of response code patterns to return
Traffic string // comma-separated list of up/down/slow/hang intervals for one or more BaseURLs in MPD
Errors []string // error messages to display due to bad configuration
}

var initData urlGenData
Expand Down Expand Up @@ -175,13 +173,14 @@ func createInitData(r *http.Request, aInfo assetsInfo) (data urlGenData, err err
return data, nil
}

func createURL(r *http.Request, aInfo assetsInfo) (data urlGenData, err error) {
// createURL creates a URL from the request parameters. Errors are returned in ErrorMsg field.
func createURL(r *http.Request, aInfo assetsInfo) urlGenData {
q := r.URL.Query()
var sb strings.Builder // Used to build URL
asset := q.Get("asset")
mpd := q.Get("mpd")
// fmt.Println("create", asset, mpd)
data = initData
data := initData
data.Assets = make([]assetWithSelect, 0, len(aInfo.Assets))
data.MPDs = nil
for i := range aInfo.Assets {
Expand Down Expand Up @@ -320,17 +319,31 @@ func createURL(r *http.Request, aInfo assetsInfo) (data urlGenData, err error) {
}
statusCodes := q.Get("statuscode")
if statusCodes != "" {
s := newStringConverter()
_ = s.ParseSegStatusCodes("statuscode", statusCodes)
if s.err != nil {
return data, fmt.Errorf("bad status codes: %w", s.err)
sc := newStringConverter()
_ = sc.ParseSegStatusCodes("statuscode", statusCodes)
if sc.err != nil {
data.Errors = append(data.Errors, fmt.Sprintf("bad statuscode patterns: %s", sc.err.Error()))
}
data.StatusCodes = statusCodes
sb.WriteString(fmt.Sprintf("statuscode_%s/", statusCodes))
}
traffic := q.Get("traffic")
if traffic != "" {
_, err := CreateAllLossItvls(traffic)
if err != nil {
data.Errors = append(data.Errors, fmt.Sprintf("bad traffic pattern: %s", err.Error()))
}
data.Traffic = traffic
sb.WriteString(fmt.Sprintf("traffic_%s/", traffic))
}
sb.WriteString(fmt.Sprintf("%s/%s", asset, mpd))
data.URL = sb.String()
data.PlayURL = aInfo.PlayURL
if len(data.Errors) > 0 {
data.URL = ""
data.PlayURL = ""
} else {
data.URL = sb.String()
data.PlayURL = aInfo.PlayURL
}
data.Host = aInfo.Host
return data, nil
return data
}
4 changes: 4 additions & 0 deletions cmd/livesim2/app/livempd.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ func LiveMPD(a *asset, mpdName string, cfg *ResponseConfig, nowMS int) (*m.MPD,
period.Duration = nil
period.Id = "P0"
period.Start = Ptr(m.Duration(0))
for bNr := 0; bNr < len(cfg.Traffic); bNr++ {
b := m.NewBaseURL(baseURL(bNr))
period.BaseURLs = append(period.BaseURLs, b)
}

adaptationSets := orderAdaptationSetsByContentType(period.AdaptationSets)
var refSegEntries segEntries
Expand Down
12 changes: 12 additions & 0 deletions cmd/livesim2/app/strconv.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,15 @@ func (s *strConvAccErr) ParseSegStatusCodes(key, val string) []SegStatusCodes {
}
return codes
}

func (s *strConvAccErr) ParseLossItvls(key, val string) []LossItvls {
if s.err != nil {
return nil
}
itvls, err := CreateAllLossItvls(val)
if err != nil {
s.err = fmt.Errorf("key=%s, err=%w", key, err)
return nil
}
return itvls
}
Loading

0 comments on commit 9bf47d4

Please sign in to comment.