diff --git a/.vscode/settings.json b/.vscode/settings.json index 9eb6b01..286b461 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -94,6 +94,7 @@ "Sidx", "Sntp", "startrel", + "statuscode", "stoprel", "stppd", "stpptime", @@ -128,6 +129,7 @@ "urlgen", "urlprefix", "UTCMS", + "vals", "vodroot", "vttc", "vtte", diff --git a/CHANGELOG.md b/CHANGELOG.md index b39a02c..30a32f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## Added + +- New highly configurable statuscode option for cyclic bad segment request responses + ## Changed - Upgrade to Go 1.21 diff --git a/cmd/livesim2/app/configurl.go b/cmd/livesim2/app/configurl.go index 6e092c0..ba73251 100644 --- a/cmd/livesim2/app/configurl.go +++ b/cmd/livesim2/app/configurl.go @@ -8,6 +8,7 @@ import ( "fmt" "math" "net/http" + "net/url" "strings" "github.com/Dash-Industry-Forum/livesim2/pkg/scte35" @@ -83,6 +84,19 @@ type ResponseConfig struct { TimeSubsDurMS int `json:"TimeSubsDurMS,omitempty"` TimeSubsRegion int `json:"TimeSubsRegion,omitempty"` Host string `json:"Host,omitempty"` + SegStatusCodes []SegStatusCodes `json:"SegStatus,omitempty"` +} + +// SegStatusCodes configures regular extraordinary segment response codes +type SegStatusCodes struct { + // Cycle is cycle length in seconds + Cycle int + // Rsq is relative sequence number (in cycle) + Rsq int + // Code is the HTTP response code + Code int + // Reps is a list of applicable representations (empty means all) + Reps []string } // NewResponseConfig returns a new ResponseConfig with default values. @@ -132,9 +146,14 @@ func (rc *ResponseConfig) getStartNr() int { } // processURLCfg returns all information that can be extracted from url -func processURLCfg(url string, nowMS int) (*ResponseConfig, error) { +func processURLCfg(confURL string, nowMS int) (*ResponseConfig, error) { // Mimics configprocessor.process_url - urlParts := strings.Split(url, "/") + + cfgURL, err := url.QueryUnescape(confURL) + if err != nil { + return nil, fmt.Errorf("url.QueryUnescape: %w", err) + } + urlParts := strings.Split(cfgURL, "/") cfg := NewResponseConfig() cfg.URLParts = urlParts sc := strConvAccErr{} @@ -225,6 +244,8 @@ cfgLoop: cfg.TimeSubsDurMS = sc.Atoi(key, val) case "timesubsreg": // region (0 or 1) cfg.TimeSubsRegion = sc.Atoi(key, val) + case "statuscode": + cfg.SegStatusCodes = sc.ParseSegStatusCodes(key, val) default: contentStartIdx = i break cfgLoop @@ -237,7 +258,7 @@ cfgLoop: return nil, fmt.Errorf("no content part") } - err := verifyAndFillConfig(cfg, nowMS) + err = verifyAndFillConfig(cfg, nowMS) if err != nil { return cfg, fmt.Errorf("url config: %w", err) } diff --git a/cmd/livesim2/app/handler_livesim.go b/cmd/livesim2/app/handler_livesim.go index 44fc607..f3041e4 100644 --- a/cmd/livesim2/app/handler_livesim.go +++ b/cmd/livesim2/app/handler_livesim.go @@ -21,6 +21,7 @@ import ( "time" "github.com/Dash-Industry-Forum/livesim2/pkg/logging" + "github.com/Eyevinn/dash-mpd/mpd" ) // livesimHandlerFunc handles mpd and segment requests. @@ -90,7 +91,7 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) { } case ".mp4", ".m4s", ".cmfv", ".cmfa", ".cmft", ".jpg", ".jpeg", ".m4v", ".m4a": segmentPart := strings.TrimPrefix(contentPart, a.AssetPath) // includes heading slash - err = writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates) + code, err := writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates) if err != nil { var tooEarly errTooEarly switch { @@ -108,6 +109,11 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) { return } } + if code != 0 { + log.Debug("special return code", "code", code) + http.Error(w, "triggered code", code) + return + } default: http.Error(w, "unknown file extension", http.StatusNotFound) return @@ -141,19 +147,108 @@ func writeLiveMPD(log *slog.Logger, w http.ResponseWriter, cfg *ResponseConfig, return nil } +// writeSegment writes a segment to the response writer, but may also return a special status code if configured. func writeSegment(ctx context.Context, w http.ResponseWriter, log *slog.Logger, cfg *ResponseConfig, vodFS fs.FS, a *asset, - segmentPart string, nowMS int, tt *template.Template) error { + segmentPart string, nowMS int, tt *template.Template) (code int, err error) { // First check if init segment and return isInitSegment, err := writeInitSegment(w, cfg, vodFS, a, segmentPart) if err != nil { - return fmt.Errorf("writeInitSegment: %w", err) + return 0, fmt.Errorf("writeInitSegment: %w", err) } if isInitSegment { - return nil + return 0, nil + } + if len(cfg.SegStatusCodes) > 0 { + code, err = calcStatusCode(cfg, vodFS, a, segmentPart, nowMS) + if err != nil { + return 0, err + } + if code != 0 { + return code, nil + } } if cfg.AvailabilityTimeCompleteFlag { - return writeLiveSegment(w, cfg, vodFS, a, segmentPart, nowMS, tt) + return 0, writeLiveSegment(w, cfg, vodFS, a, segmentPart, nowMS, tt) } // Chunked low-latency mode - return writeChunkedSegment(ctx, w, log, cfg, vodFS, a, segmentPart, nowMS) + return 0, writeChunkedSegment(ctx, w, log, cfg, vodFS, a, segmentPart, nowMS) +} + +// calcStatusCode returns the configured status code for the segment or 0 if none. +func calcStatusCode(cfg *ResponseConfig, vodFS fs.FS, a *asset, segmentPart string, nowMS int) (int, error) { + rep, _, err := findRepAndSegmentID(a, segmentPart) + if err != nil { + return 0, fmt.Errorf("findRepAndSegmentID: %w", err) + } + + // segMeta is to be used for all look up. For audio it uses reference (video) track + segMeta, err := findSegMeta(vodFS, a, cfg, segmentPart, nowMS) + if err != nil { + return 0, fmt.Errorf("findSegMeta: %w", err) + } + startTime := int(segMeta.newTime) + repTimescale := int(segMeta.timescale) + for _, ss := range cfg.SegStatusCodes { + if !repInReps(a, rep.ID, ss.Reps) { + continue + } + // Then move to the reference track and relate to cycles + // From segment number we calculate a start time + // The time gives us how many cycles we have passed (time / cycleDuration) + cycle := ss.Cycle + cycleInTimescale := cycle * repTimescale + nrWraps := startTime / cycleInTimescale + wrapStartS := nrWraps * cycle + // Next we need to find the number after wrap + // For that we need to find the first segment nr after wrapStart + // Use nowMS = cycleStart to look up the latest segment published at that time + firstNr := 0 + if nrWraps > 0 { + lastNr := findLastSegNr(cfg, a, wrapStartS*1000, segMeta.rep) + firstNr = lastNr + 1 + } + segTime := findSegStartTime(a, cfg, firstNr, segMeta.rep) + if segTime < wrapStartS*repTimescale { + firstNr += 1 + } + idx := int(segMeta.newNr) - firstNr + if idx < 0 { + return 0, fmt.Errorf("segment %d is before first segment %d", segMeta.newNr, firstNr) + } + if idx == ss.Rsq { + return ss.Code, nil + } + } + return 0, nil +} + +func findLastSegNr(cfg *ResponseConfig, a *asset, nowMS int, rep *RepData) int { + wTimes := calcWrapTimes(a, cfg, nowMS, mpd.Duration(60*time.Second)) + timeLineEntries := a.generateTimelineEntries(rep.ID, wTimes, 0) + return timeLineEntries.lastNr() +} + +func findSegStartTime(a *asset, cfg *ResponseConfig, nr int, rep *RepData) int { + wrapLen := len(rep.Segments) + startNr := cfg.getStartNr() + nrAfterStart := int(nr) - startNr + nrWraps := nrAfterStart / wrapLen + relNr := nrAfterStart - nrWraps*wrapLen + wrapDur := a.LoopDurMS * rep.MediaTimescale / 1000 + wrapTime := nrWraps * wrapDur + seg := rep.Segments[relNr] + return wrapTime + int(seg.StartTime) +} + +func repInReps(a *asset, segmentPart string, reps []string) bool { + // TODO. Make better + if len(reps) == 0 { + return true + } + for _, rep := range reps { + if strings.Contains(segmentPart, rep) { + return true + } + } + return false } diff --git a/cmd/livesim2/app/handler_urlgen.go b/cmd/livesim2/app/handler_urlgen.go index d1fe645..45cd784 100644 --- a/cmd/livesim2/app/handler_urlgen.go +++ b/cmd/livesim2/app/handler_urlgen.go @@ -125,6 +125,7 @@ type urlGenData struct { StartRel string // sets timeline start (and availabilityStartTime) relative to now (in seconds). Normally negative value. StopRel string // sets stop-time for time-limited event relative to now (in seconds) Scte35Var string // SCTE-35 insertion variant + StatusCodes string // comma-separated list of response code patterns to return } var initData urlGenData @@ -317,6 +318,16 @@ func createURL(r *http.Request, aInfo assetsInfo) (data urlGenData, err error) { data.Scte35Var = scte35 sb.WriteString(fmt.Sprintf("scte35_%s/", scte35)) } + statusCodes := q.Get("statuscode") + if statusCodes != "" { + s := newStringConverter() + _ = s.ParseSegStatusCodes("statuscode", statusCodes) + if s.err != nil { + return data, fmt.Errorf("bad status codes: %w", s.err) + } + data.StatusCodes = statusCodes + sb.WriteString(fmt.Sprintf("statuscode_%s/", statusCodes)) + } sb.WriteString(fmt.Sprintf("%s/%s", asset, mpd)) data.URL = sb.String() data.PlayURL = aInfo.PlayURL diff --git a/cmd/livesim2/app/livempd.go b/cmd/livesim2/app/livempd.go index c98b27a..7049666 100644 --- a/cmd/livesim2/app/livempd.go +++ b/cmd/livesim2/app/livempd.go @@ -375,6 +375,14 @@ type segEntries struct { mediaTimescale uint32 } +func (s segEntries) lastNr() int { + nrSegs := 0 + for _, e := range s.entries { + nrSegs += int(e.R) + 1 + } + return s.startNr + nrSegs - 1 +} + // setOffsetInAdaptationSet sets the availabilityTimeOffset in the AdaptationSet. // Returns ErrAtoInfTimeline if infinite ato set with timeline. func setOffsetInAdaptationSet(cfg *ResponseConfig, a *asset, as *m.AdaptationSetType) (atoMS int, err error) { diff --git a/cmd/livesim2/app/livesegment.go b/cmd/livesim2/app/livesegment.go index 1aec090..28b9030 100644 --- a/cmd/livesim2/app/livesegment.go +++ b/cmd/livesim2/app/livesegment.go @@ -371,33 +371,55 @@ func createOutSeg(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string return so, nil } -func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segOut, error) { - refRep := a.refRep - refTimescale := uint64(refRep.MediaTimescale) - var refMeta segMeta - var err error - var so segOut - switch cfg.getRepType(segmentPart) { - case segmentNumber, timeLineNumber: - outSegNr := uint32(segID) - refMeta, err = findSegMetaFromNr(a, refRep, outSegNr, cfg, nowMS) +func findSegMeta(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int) (segMeta, error) { + var sm segMeta + rep, segID, err := findRepAndSegmentID(a, segmentPart) + if err != nil { + return sm, fmt.Errorf("findRepAndSegmentID: %w", err) + } + + if rep.ContentType == "audio" { + sm, err := findRefSegMeta(vodFS, a, cfg, segmentPart, nowMS, rep, segID) if err != nil { - return so, fmt.Errorf("findSegMetaFromNr from reference: %w", err) + return sm, fmt.Errorf("findRefSegMeta: %w", err) + } + return sm, nil + } else { + switch cfg.getRepType(segmentPart) { + case segmentNumber, timeLineNumber: + nr := uint32(segID) + if nr < uint32(cfg.getStartNr()) { + return sm, errNotFound + } + sm, err = findSegMetaFromNr(a, rep, nr, cfg, nowMS) + case timeLineTime: + time := uint64(segID) + sm, err = findSegMetaFromTime(a, rep, time, cfg, nowMS) + default: + return sm, fmt.Errorf("unknown liveMPD type") } - case timeLineTime: - time := int(segID) - refMeta, err = findRefSegMetaFromTime(a, rep, uint64(time), cfg, nowMS) if err != nil { - return so, fmt.Errorf("findSegMetaFromNr from reference: %w", err) + return sm, err } - default: - return so, fmt.Errorf("unknown liveMPD type") } + return sm, nil +} + +func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segOut, error) { + refRep := a.refRep + refTimescale := uint64(refRep.MediaTimescale) + + refMeta, err := findRefSegMeta(vodFS, a, cfg, segmentPart, nowMS, rep, segID) + if err != nil { + return segOut{}, fmt.Errorf("findRefSegMeta: %w", err) + } + recipe := calcAudioSegRecipe(refMeta.newNr, refMeta.newTime, refMeta.newTime+uint64(refMeta.newDur), uint64(refRep.duration()), refTimescale, rep) + var so segOut so.seg, err = createAudioSeg(vodFS, a, cfg, recipe) if err != nil { return so, fmt.Errorf("createAudioSeg: %w", err) @@ -413,6 +435,29 @@ func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart return so, nil } +// findRefSegMeta finds the reference track meta data given other following track like audio +func findRefSegMeta(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segMeta, error) { + var refMeta segMeta + var err error + switch cfg.getRepType(segmentPart) { + case segmentNumber, timeLineNumber: + outSegNr := uint32(segID) + refMeta, err = findSegMetaFromNr(a, a.refRep, outSegNr, cfg, nowMS) + if err != nil { + return refMeta, fmt.Errorf("findSegMetaFromNr from reference: %w", err) + } + case timeLineTime: + time := int(segID) + refMeta, err = findRefSegMetaFromTime(a, rep, uint64(time), cfg, nowMS) + if err != nil { + return refMeta, fmt.Errorf("findSegMetaFromNr from reference: %w", err) + } + default: + return refMeta, fmt.Errorf("unknown liveMPD type") + } + return refMeta, nil +} + // writeChunkedSegment splits a segment into chunks and send them as they become available timewise. // // nowMS servers as reference for the current time and can be set to any value. Media time will diff --git a/cmd/livesim2/app/livesegment_test.go b/cmd/livesim2/app/livesegment_test.go index 84a1718..40a9b36 100644 --- a/cmd/livesim2/app/livesegment_test.go +++ b/cmd/livesim2/app/livesegment_test.go @@ -558,3 +558,115 @@ func TestLLSegmentAvailability(t *testing.T) { require.Equal(t, tc.expectedDecodeTime, int(decodeTime), "response segment decode time") } } + +func TestSegmentStatusCodeResponse(t *testing.T) { + vodFS := os.DirFS("testdata/assets") + am := newAssetMgr(vodFS, "", false) + err := am.discoverAssets() + require.NoError(t, err) + + cases := []struct { + desc string + asset string + media string + mpdType string + nrOrTime int + ss []SegStatusCodes + nowMS int + expCode int + }{ + { + desc: "hit cyclic 404", + asset: "testpic_2s", + media: "V300/$NrOrTime$.m4s", + mpdType: "Number", + nrOrTime: 30, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404, Rsq: 0}, + }, + expCode: 404, + }, + { + desc: "miss cyclic 404", + asset: "testpic_2s", + media: "V300/$NrOrTime$.m4s", + mpdType: "Number", + nrOrTime: 31, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404, Rsq: 0}, + }, + expCode: 0, + }, + { + desc: "hit cyclic 404 for timeline time", + asset: "testpic_2s", + media: "V300/$NrOrTime$.m4s", + mpdType: "TimelineTime", + nrOrTime: 90000 * 30, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404}, + }, + expCode: 404, + }, + { + desc: "miss cyclic 404 for timeline time", + asset: "testpic_2s", + media: "V300/$NrOrTime$.m4s", + mpdType: "TimelineTime", + nrOrTime: 90000 * 32, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404}, + }, + expCode: 0, + }, + { + desc: "hit cyclic 404 with specific rep", + asset: "testpic_2s", + media: "A48/$NrOrTime$.m4s", + mpdType: "Number", + nrOrTime: 30, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404, Reps: []string{"A48"}}, + }, + expCode: 404, + }, + { + desc: "hit cyclic 404 but not specific rep", + asset: "testpic_2s", + media: "A48/$NrOrTime$.m4s", + mpdType: "Number", + nrOrTime: 30, + nowMS: 90_000, + ss: []SegStatusCodes{ + {Cycle: 30, Code: 404, Reps: []string{"V300"}}, + }, + expCode: 0, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + asset, ok := am.findAsset(tc.asset) + require.True(t, ok) + require.NoError(t, err) + cfg := NewResponseConfig() + switch tc.mpdType { + case "Number": + case "TimelineTime": + cfg.SegTimelineFlag = true + case "TimelineNumber": + cfg.SegTimelineNrFlag = true + } + cfg.SegStatusCodes = tc.ss + media := strings.Replace(tc.media, "$NrOrTime$", fmt.Sprintf("%d", tc.nrOrTime), -1) + rr := httptest.NewRecorder() + code, err := writeSegment(context.TODO(), rr, slog.Default(), cfg, vodFS, asset, media, tc.nowMS, nil) + require.NoError(t, err) + require.Equal(t, tc.expCode, code) + }) + } +} diff --git a/cmd/livesim2/app/strconv.go b/cmd/livesim2/app/strconv.go index ca35cf9..5b37fad 100644 --- a/cmd/livesim2/app/strconv.go +++ b/cmd/livesim2/app/strconv.go @@ -15,6 +15,11 @@ type strConvAccErr struct { err error } +// newStringConverter returns a new string converter for URL parsing. +func newStringConverter() *strConvAccErr { + return &strConvAccErr{} +} + func (s *strConvAccErr) Atoi(key, val string) int { if s.err != nil { return 0 @@ -104,3 +109,55 @@ func (s *strConvAccErr) AtofInf(key, val string) float64 { } return valFloat } + +// ParseSegStatusCodes parses a command line [{cycle:30, rsq: 0, code: 404, rep:video}] +func (s *strConvAccErr) ParseSegStatusCodes(key, val string) []SegStatusCodes { + if s.err != nil { + return nil + } + // remove all spaces, remove start [{ and end }], split on },{, + trimmed := strings.ReplaceAll(val, " ", "") + if len(trimmed) < 4 { + s.err = fmt.Errorf("val=%q for key %q is too short", val, key) + return nil + } + trimmed = trimmed[2 : len(trimmed)-2] + parts := strings.Split(trimmed, "},{") + codes := make([]SegStatusCodes, len(parts)) + for i, part := range parts { + // split on , and : + pairs := strings.Split(part, ",") + for _, p := range pairs { + kv := strings.Split(p, ":") + if len(kv) != 2 { + s.err = fmt.Errorf("val=%q for key %q is not a valid. Bad pair", val, key) + return nil + } + switch kv[0] { + case "cycle": + codes[i].Cycle = s.Atoi("cycle", kv[1]) + case "rsq": + codes[i].Rsq = s.Atoi("rsq", kv[1]) + case "code": + codes[i].Code = s.Atoi("code", kv[1]) + case "rep": + if kv[1] != "*" { // * and empty means all reps + reps := strings.Split(kv[1], ",") + codes[i].Reps = reps + } + default: + s.err = fmt.Errorf("val=%q for key %q is not a valid. Unknown key", val, key) + } + } + if codes[i].Cycle <= 0 { + s.err = fmt.Errorf("val=%q for key %q is not a valid. cycle is too small", val, key) + } + if codes[i].Rsq < 0 { + s.err = fmt.Errorf("val=%q for key %q is not a valid. rsq is too small", val, key) + } + if codes[i].Code < 400 || codes[i].Code > 599 { + s.err = fmt.Errorf("val=%q for key %q is not a valid. code is not in range 400-599", val, key) + } + } + return codes +} diff --git a/cmd/livesim2/app/strconv_test.go b/cmd/livesim2/app/strconv_test.go new file mode 100644 index 0000000..b46891f --- /dev/null +++ b/cmd/livesim2/app/strconv_test.go @@ -0,0 +1,62 @@ +package app + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestParseSegStatusCodes(t *testing.T) { + key := "statuscode" + cases := []struct { + desc string + val string + wantErr string + want []SegStatusCodes + }{ + { + desc: "empty", + val: "", + wantErr: `val="" for key "statuscode" is too short`, + }, + { + desc: "404 on first video packet", + val: "[{cycle:30, rsq:0, code:404, rep:video}]", + want: []SegStatusCodes{ + {Cycle: 30, Rsq: 0, Code: 404, Reps: []string{"video"}}, + }, + }, + { + desc: "* for reps", + val: "[{cycle:30, rsq:2, code:404, rep:*}]", + want: []SegStatusCodes{ + {Cycle: 30, Rsq: 2, Code: 404, Reps: nil}, + }, + }, + { + desc: "no reps", + val: "[{cycle:30, rsq:1, code:404}]", + want: []SegStatusCodes{ + {Cycle: 30, Rsq: 1, Code: 404, Reps: nil}, + }, + }, + { + desc: "bad code", + val: "[{cycle:30, rsq:2, code:600, rep:*}]", + wantErr: `val="[{cycle:30, rsq:2, code:600, rep:*}]" for key "statuscode" is not a valid. code is not in range 400-599`, + }, + } + + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + s := strConvAccErr{} + got := s.ParseSegStatusCodes(key, c.val) + if c.wantErr != "" { + require.Equal(t, c.wantErr, s.err.Error()) + return + } + require.NoError(t, s.err) + require.Equal(t, c.want, got) + }) + } +} diff --git a/cmd/livesim2/app/templates/urlgen.html b/cmd/livesim2/app/templates/urlgen.html index b40f85c..9553975 100644 --- a/cmd/livesim2/app/templates/urlgen.html +++ b/cmd/livesim2/app/templates/urlgen.html @@ -192,6 +192,26 @@