Skip to content

Commit

Permalink
Merge pull request #126 from Dash-Industry-Forum/status-codes
Browse files Browse the repository at this point in the history
New URL option status code to configure cyclic HTTP error codes
  • Loading branch information
tobbee authored Oct 28, 2023
2 parents 06fbf20 + 795cc45 commit 032e687
Show file tree
Hide file tree
Showing 11 changed files with 463 additions and 26 deletions.
2 changes: 2 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
"Sidx",
"Sntp",
"startrel",
"statuscode",
"stoprel",
"stppd",
"stpptime",
Expand Down Expand Up @@ -128,6 +129,7 @@
"urlgen",
"urlprefix",
"UTCMS",
"vals",
"vodroot",
"vttc",
"vtte",
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## Added

- New highly configurable statuscode option for cyclic bad segment request responses

## Changed

- Upgrade to Go 1.21
Expand Down
27 changes: 24 additions & 3 deletions cmd/livesim2/app/configurl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"math"
"net/http"
"net/url"
"strings"

"github.com/Dash-Industry-Forum/livesim2/pkg/scte35"
Expand Down Expand Up @@ -83,6 +84,19 @@ type ResponseConfig struct {
TimeSubsDurMS int `json:"TimeSubsDurMS,omitempty"`
TimeSubsRegion int `json:"TimeSubsRegion,omitempty"`
Host string `json:"Host,omitempty"`
SegStatusCodes []SegStatusCodes `json:"SegStatus,omitempty"`
}

// SegStatusCodes configures regular extraordinary segment response codes
type SegStatusCodes struct {
// Cycle is cycle length in seconds
Cycle int
// Rsq is relative sequence number (in cycle)
Rsq int
// Code is the HTTP response code
Code int
// Reps is a list of applicable representations (empty means all)
Reps []string
}

// NewResponseConfig returns a new ResponseConfig with default values.
Expand Down Expand Up @@ -132,9 +146,14 @@ func (rc *ResponseConfig) getStartNr() int {
}

// processURLCfg returns all information that can be extracted from url
func processURLCfg(url string, nowMS int) (*ResponseConfig, error) {
func processURLCfg(confURL string, nowMS int) (*ResponseConfig, error) {
// Mimics configprocessor.process_url
urlParts := strings.Split(url, "/")

cfgURL, err := url.QueryUnescape(confURL)
if err != nil {
return nil, fmt.Errorf("url.QueryUnescape: %w", err)
}
urlParts := strings.Split(cfgURL, "/")
cfg := NewResponseConfig()
cfg.URLParts = urlParts
sc := strConvAccErr{}
Expand Down Expand Up @@ -225,6 +244,8 @@ cfgLoop:
cfg.TimeSubsDurMS = sc.Atoi(key, val)
case "timesubsreg": // region (0 or 1)
cfg.TimeSubsRegion = sc.Atoi(key, val)
case "statuscode":
cfg.SegStatusCodes = sc.ParseSegStatusCodes(key, val)
default:
contentStartIdx = i
break cfgLoop
Expand All @@ -237,7 +258,7 @@ cfgLoop:
return nil, fmt.Errorf("no content part")
}

err := verifyAndFillConfig(cfg, nowMS)
err = verifyAndFillConfig(cfg, nowMS)
if err != nil {
return cfg, fmt.Errorf("url config: %w", err)
}
Expand Down
107 changes: 101 additions & 6 deletions cmd/livesim2/app/handler_livesim.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/Dash-Industry-Forum/livesim2/pkg/logging"
"github.com/Eyevinn/dash-mpd/mpd"
)

// livesimHandlerFunc handles mpd and segment requests.
Expand Down Expand Up @@ -90,7 +91,7 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
}
case ".mp4", ".m4s", ".cmfv", ".cmfa", ".cmft", ".jpg", ".jpeg", ".m4v", ".m4a":
segmentPart := strings.TrimPrefix(contentPart, a.AssetPath) // includes heading slash
err = writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates)
code, err := writeSegment(r.Context(), w, log, cfg, s.assetMgr.vodFS, a, segmentPart[1:], nowMS, s.textTemplates)
if err != nil {
var tooEarly errTooEarly
switch {
Expand All @@ -108,6 +109,11 @@ func (s *Server) livesimHandlerFunc(w http.ResponseWriter, r *http.Request) {
return
}
}
if code != 0 {
log.Debug("special return code", "code", code)
http.Error(w, "triggered code", code)
return
}
default:
http.Error(w, "unknown file extension", http.StatusNotFound)
return
Expand Down Expand Up @@ -141,19 +147,108 @@ func writeLiveMPD(log *slog.Logger, w http.ResponseWriter, cfg *ResponseConfig,
return nil
}

// writeSegment writes a segment to the response writer, but may also return a special status code if configured.
func writeSegment(ctx context.Context, w http.ResponseWriter, log *slog.Logger, cfg *ResponseConfig, vodFS fs.FS, a *asset,
segmentPart string, nowMS int, tt *template.Template) error {
segmentPart string, nowMS int, tt *template.Template) (code int, err error) {
// First check if init segment and return
isInitSegment, err := writeInitSegment(w, cfg, vodFS, a, segmentPart)
if err != nil {
return fmt.Errorf("writeInitSegment: %w", err)
return 0, fmt.Errorf("writeInitSegment: %w", err)
}
if isInitSegment {
return nil
return 0, nil
}
if len(cfg.SegStatusCodes) > 0 {
code, err = calcStatusCode(cfg, vodFS, a, segmentPart, nowMS)
if err != nil {
return 0, err
}
if code != 0 {
return code, nil
}
}
if cfg.AvailabilityTimeCompleteFlag {
return writeLiveSegment(w, cfg, vodFS, a, segmentPart, nowMS, tt)
return 0, writeLiveSegment(w, cfg, vodFS, a, segmentPart, nowMS, tt)
}
// Chunked low-latency mode
return writeChunkedSegment(ctx, w, log, cfg, vodFS, a, segmentPart, nowMS)
return 0, writeChunkedSegment(ctx, w, log, cfg, vodFS, a, segmentPart, nowMS)
}

// calcStatusCode returns the configured status code for the segment or 0 if none.
func calcStatusCode(cfg *ResponseConfig, vodFS fs.FS, a *asset, segmentPart string, nowMS int) (int, error) {
rep, _, err := findRepAndSegmentID(a, segmentPart)
if err != nil {
return 0, fmt.Errorf("findRepAndSegmentID: %w", err)
}

// segMeta is to be used for all look up. For audio it uses reference (video) track
segMeta, err := findSegMeta(vodFS, a, cfg, segmentPart, nowMS)
if err != nil {
return 0, fmt.Errorf("findSegMeta: %w", err)
}
startTime := int(segMeta.newTime)
repTimescale := int(segMeta.timescale)
for _, ss := range cfg.SegStatusCodes {
if !repInReps(a, rep.ID, ss.Reps) {
continue
}
// Then move to the reference track and relate to cycles
// From segment number we calculate a start time
// The time gives us how many cycles we have passed (time / cycleDuration)
cycle := ss.Cycle
cycleInTimescale := cycle * repTimescale
nrWraps := startTime / cycleInTimescale
wrapStartS := nrWraps * cycle
// Next we need to find the number after wrap
// For that we need to find the first segment nr after wrapStart
// Use nowMS = cycleStart to look up the latest segment published at that time
firstNr := 0
if nrWraps > 0 {
lastNr := findLastSegNr(cfg, a, wrapStartS*1000, segMeta.rep)
firstNr = lastNr + 1
}
segTime := findSegStartTime(a, cfg, firstNr, segMeta.rep)
if segTime < wrapStartS*repTimescale {
firstNr += 1
}
idx := int(segMeta.newNr) - firstNr
if idx < 0 {
return 0, fmt.Errorf("segment %d is before first segment %d", segMeta.newNr, firstNr)
}
if idx == ss.Rsq {
return ss.Code, nil
}
}
return 0, nil
}

func findLastSegNr(cfg *ResponseConfig, a *asset, nowMS int, rep *RepData) int {
wTimes := calcWrapTimes(a, cfg, nowMS, mpd.Duration(60*time.Second))
timeLineEntries := a.generateTimelineEntries(rep.ID, wTimes, 0)
return timeLineEntries.lastNr()
}

func findSegStartTime(a *asset, cfg *ResponseConfig, nr int, rep *RepData) int {
wrapLen := len(rep.Segments)
startNr := cfg.getStartNr()
nrAfterStart := int(nr) - startNr
nrWraps := nrAfterStart / wrapLen
relNr := nrAfterStart - nrWraps*wrapLen
wrapDur := a.LoopDurMS * rep.MediaTimescale / 1000
wrapTime := nrWraps * wrapDur
seg := rep.Segments[relNr]
return wrapTime + int(seg.StartTime)
}

func repInReps(a *asset, segmentPart string, reps []string) bool {
// TODO. Make better
if len(reps) == 0 {
return true
}
for _, rep := range reps {
if strings.Contains(segmentPart, rep) {
return true
}
}
return false
}
11 changes: 11 additions & 0 deletions cmd/livesim2/app/handler_urlgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ type urlGenData struct {
StartRel string // sets timeline start (and availabilityStartTime) relative to now (in seconds). Normally negative value.
StopRel string // sets stop-time for time-limited event relative to now (in seconds)
Scte35Var string // SCTE-35 insertion variant
StatusCodes string // comma-separated list of response code patterns to return
}

var initData urlGenData
Expand Down Expand Up @@ -317,6 +318,16 @@ func createURL(r *http.Request, aInfo assetsInfo) (data urlGenData, err error) {
data.Scte35Var = scte35
sb.WriteString(fmt.Sprintf("scte35_%s/", scte35))
}
statusCodes := q.Get("statuscode")
if statusCodes != "" {
s := newStringConverter()
_ = s.ParseSegStatusCodes("statuscode", statusCodes)
if s.err != nil {
return data, fmt.Errorf("bad status codes: %w", s.err)
}
data.StatusCodes = statusCodes
sb.WriteString(fmt.Sprintf("statuscode_%s/", statusCodes))
}
sb.WriteString(fmt.Sprintf("%s/%s", asset, mpd))
data.URL = sb.String()
data.PlayURL = aInfo.PlayURL
Expand Down
8 changes: 8 additions & 0 deletions cmd/livesim2/app/livempd.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ type segEntries struct {
mediaTimescale uint32
}

func (s segEntries) lastNr() int {
nrSegs := 0
for _, e := range s.entries {
nrSegs += int(e.R) + 1
}
return s.startNr + nrSegs - 1
}

// setOffsetInAdaptationSet sets the availabilityTimeOffset in the AdaptationSet.
// Returns ErrAtoInfTimeline if infinite ato set with timeline.
func setOffsetInAdaptationSet(cfg *ResponseConfig, a *asset, as *m.AdaptationSetType) (atoMS int, err error) {
Expand Down
79 changes: 62 additions & 17 deletions cmd/livesim2/app/livesegment.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,33 +371,55 @@ func createOutSeg(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string
return so, nil
}

func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segOut, error) {
refRep := a.refRep
refTimescale := uint64(refRep.MediaTimescale)
var refMeta segMeta
var err error
var so segOut
switch cfg.getRepType(segmentPart) {
case segmentNumber, timeLineNumber:
outSegNr := uint32(segID)
refMeta, err = findSegMetaFromNr(a, refRep, outSegNr, cfg, nowMS)
func findSegMeta(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int) (segMeta, error) {
var sm segMeta
rep, segID, err := findRepAndSegmentID(a, segmentPart)
if err != nil {
return sm, fmt.Errorf("findRepAndSegmentID: %w", err)
}

if rep.ContentType == "audio" {
sm, err := findRefSegMeta(vodFS, a, cfg, segmentPart, nowMS, rep, segID)
if err != nil {
return so, fmt.Errorf("findSegMetaFromNr from reference: %w", err)
return sm, fmt.Errorf("findRefSegMeta: %w", err)
}
return sm, nil
} else {
switch cfg.getRepType(segmentPart) {
case segmentNumber, timeLineNumber:
nr := uint32(segID)
if nr < uint32(cfg.getStartNr()) {
return sm, errNotFound
}
sm, err = findSegMetaFromNr(a, rep, nr, cfg, nowMS)
case timeLineTime:
time := uint64(segID)
sm, err = findSegMetaFromTime(a, rep, time, cfg, nowMS)
default:
return sm, fmt.Errorf("unknown liveMPD type")
}
case timeLineTime:
time := int(segID)
refMeta, err = findRefSegMetaFromTime(a, rep, uint64(time), cfg, nowMS)
if err != nil {
return so, fmt.Errorf("findSegMetaFromNr from reference: %w", err)
return sm, err
}
default:
return so, fmt.Errorf("unknown liveMPD type")
}
return sm, nil
}

func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segOut, error) {
refRep := a.refRep
refTimescale := uint64(refRep.MediaTimescale)

refMeta, err := findRefSegMeta(vodFS, a, cfg, segmentPart, nowMS, rep, segID)
if err != nil {
return segOut{}, fmt.Errorf("findRefSegMeta: %w", err)
}

recipe := calcAudioSegRecipe(refMeta.newNr,
refMeta.newTime,
refMeta.newTime+uint64(refMeta.newDur),
uint64(refRep.duration()),
refTimescale, rep)
var so segOut
so.seg, err = createAudioSeg(vodFS, a, cfg, recipe)
if err != nil {
return so, fmt.Errorf("createAudioSeg: %w", err)
Expand All @@ -413,6 +435,29 @@ func createAudioSegment(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart
return so, nil
}

// findRefSegMeta finds the reference track meta data given other following track like audio
func findRefSegMeta(vodFS fs.FS, a *asset, cfg *ResponseConfig, segmentPart string, nowMS int, rep *RepData, segID int) (segMeta, error) {
var refMeta segMeta
var err error
switch cfg.getRepType(segmentPart) {
case segmentNumber, timeLineNumber:
outSegNr := uint32(segID)
refMeta, err = findSegMetaFromNr(a, a.refRep, outSegNr, cfg, nowMS)
if err != nil {
return refMeta, fmt.Errorf("findSegMetaFromNr from reference: %w", err)
}
case timeLineTime:
time := int(segID)
refMeta, err = findRefSegMetaFromTime(a, rep, uint64(time), cfg, nowMS)
if err != nil {
return refMeta, fmt.Errorf("findSegMetaFromNr from reference: %w", err)
}
default:
return refMeta, fmt.Errorf("unknown liveMPD type")
}
return refMeta, nil
}

// writeChunkedSegment splits a segment into chunks and send them as they become available timewise.
//
// nowMS servers as reference for the current time and can be set to any value. Media time will
Expand Down
Loading

0 comments on commit 032e687

Please sign in to comment.