diff --git a/.github/workflows/build_test.yml b/.github/workflows/build_test.yml index 386ae65..a713017 100644 --- a/.github/workflows/build_test.yml +++ b/.github/workflows/build_test.yml @@ -18,7 +18,7 @@ jobs: go-version: '1.20' - name: Install dependencies - run: sudo apt-get update && sudo apt-get install -y libasound2-dev + run: sudo apt-get update && sudo apt-get install -y libasound2-dev libvorbis-dev libogg-dev - name: Build daemon run: go build -v ./cmd/daemon diff --git a/cmd/daemon/controls.go b/cmd/daemon/controls.go index 6fb54c9..96b0984 100644 --- a/cmd/daemon/controls.go +++ b/cmd/daemon/controls.go @@ -186,15 +186,12 @@ func (s *Session) loadCurrentTrack() error { }, }) - stream, err := s.player.NewStream(trackId, s.app.cfg.Bitrate) + stream, err := s.player.NewStream(trackId, s.app.cfg.Bitrate, trackPosition) if err != nil { return fmt.Errorf("failed creating stream: %w", err) } - log.Debugf("seek track to %dms", trackPosition) - if err := stream.SeekMs(trackPosition); err != nil { - return fmt.Errorf("failed seeking track: %w", err) - } + log.Infof("loaded track \"%s\" (position: %dms, duration: %dms)", *stream.Track.Name, trackPosition, *stream.Track.Duration) s.updateState(func(s *State) { s.playerState.Duration = int64(*stream.Track.Duration) @@ -216,10 +213,10 @@ func (s *Session) play() error { return fmt.Errorf("no stream") } - log.Debug("resume track") s.stream.Play() streamPos := s.stream.PositionMs() + log.Debugf("resume track at %dms", streamPos) s.updateState(func(s *State) { s.playerState.Timestamp = time.Now().UnixMilli() @@ -234,10 +231,10 @@ func (s *Session) pause() error { return fmt.Errorf("no stream") } - log.Debug("pause track") s.stream.Pause() streamPos := s.stream.PositionMs() + log.Debugf("pause track at %dms", streamPos) s.updateState(func(s *State) { s.playerState.Timestamp = time.Now().UnixMilli() @@ -280,6 +277,10 @@ func (s *Session) seek(position int64) error { } func (s *Session) skipPrev() error { + if s.stream.PositionMs() > 3000 { + return s.seek(0) + } + var paused bool s.withState(func(s *State) { paused = s.playerState.IsPaused diff --git a/cmd/daemon/session.go b/cmd/daemon/session.go index 85a21b2..bb69598 100644 --- a/cmd/daemon/session.go +++ b/cmd/daemon/session.go @@ -245,7 +245,6 @@ func (s *Session) handlePlayerCommand(req dealer.RequestPayload) error { return nil case "skip_prev": - // TODO: handle rewinding track if pos < 3000ms return s.skipPrev() case "skip_next": return s.skipNext() diff --git a/go.mod b/go.mod index bae6244..e127a84 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,8 @@ require ( github.com/cenkalti/backoff/v4 v4.2.1 github.com/devgianlu/shannon v0.0.0-20230613115856-82ec90b7fa7e github.com/grandcat/zeroconf v1.0.0 - github.com/hajimehoshi/oto/v2 v2.4.0 - github.com/jfreymuth/oggvorbis v1.0.5 github.com/sirupsen/logrus v1.9.3 + github.com/xlab/vorbis-go v0.0.0-20210911202351-b5b85f1ec645 golang.org/x/crypto v0.10.0 golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 google.golang.org/protobuf v1.30.0 @@ -18,8 +17,6 @@ require ( require ( github.com/cenkalti/backoff v2.2.1+incompatible // indirect - github.com/ebitengine/purego v0.4.0-alpha.5 // indirect - github.com/jfreymuth/vorbis v1.0.2 // indirect github.com/klauspost/compress v1.10.3 // indirect github.com/miekg/dns v1.1.54 // indirect golang.org/x/mod v0.11.0 // indirect @@ -27,5 +24,3 @@ require ( golang.org/x/sys v0.10.0 // indirect golang.org/x/tools v0.10.0 // indirect ) - -replace github.com/hajimehoshi/oto/v2 v2.4.0 => github.com/devgianlu/oto/v2 v2.0.0-20230905080002-01f543acf2b9 diff --git a/go.sum b/go.sum index 0d4c400..b8daab6 100644 --- a/go.sum +++ b/go.sum @@ -5,12 +5,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/devgianlu/oto/v2 v2.0.0-20230905080002-01f543acf2b9 h1:gRg5jfSLzb4xZpmm9dWSzS/4RsCPkitljfq/dH7QKho= -github.com/devgianlu/oto/v2 v2.0.0-20230905080002-01f543acf2b9/go.mod h1:XoAqcfGced8SVzCvx7DO2YA69Zpr1NiHNhnxJawxaZ0= github.com/devgianlu/shannon v0.0.0-20230613115856-82ec90b7fa7e h1:OoETp+L//8ZDtd5BWKaogHQjgA104yF4a2yqjfaG3mE= github.com/devgianlu/shannon v0.0.0-20230613115856-82ec90b7fa7e/go.mod h1:m5DMFz6BcaKJwxxPaSh9MxwPzK2GPSt1KRFC8Imf0ik= -github.com/ebitengine/purego v0.4.0-alpha.5 h1:1jd+ClW6gD2x2F/+j4lCGc3TQMWL3hNYgLu/xF9Jz0g= -github.com/ebitengine/purego v0.4.0-alpha.5/go.mod h1:ah1In8AOtksoNK6yk5z1HTJeUkC1Ez4Wk2idgGslMwQ= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14= @@ -74,6 +70,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo= github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw= github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= +github.com/xlab/vorbis-go v0.0.0-20210911202351-b5b85f1ec645 h1:lYg/+vV/Fd5WM1+Ptg54Am3y4mDXaMSrT+mKUHV5uVc= +github.com/xlab/vorbis-go v0.0.0-20210911202351-b5b85f1ec645/go.mod h1:AMqfx3jFwPqem3u8mF2lsRodZs30jG/Mag5HZ3mB3sA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.10.0 h1:LKqV2xt9+kDzSTfOhx4FrkEBcMrAgHSYgzywV9zcGmM= diff --git a/output.go b/output.go new file mode 100644 index 0000000..0bdfe13 --- /dev/null +++ b/output.go @@ -0,0 +1,16 @@ +package go_librespot + +type Float32Reader interface { + Read([]float32) (n int, err error) +} + +type AudioSource interface { + // SetPositionMs sets the new position in samples + SetPositionMs(int64) error + + // PositionMs gets the position in samples + PositionMs() int64 + + // Read reads 32bit little endian floats from the stream + Read([]float32) (int, error) +} diff --git a/output/driver_unix.go b/output/driver_unix.go new file mode 100644 index 0000000..8ad5b46 --- /dev/null +++ b/output/driver_unix.go @@ -0,0 +1,284 @@ +//go:build !android && !darwin && !js && !windows && !nintendosdk + +package output + +// #cgo pkg-config: alsa +// +// #include +import "C" +import ( + "errors" + "fmt" + librespot "go-librespot" + "io" + "sync" + "unsafe" +) + +const ( + DisableHardwarePause = true // FIXME: should we fix this? + ReleasePcmOnPause = true +) + +type output struct { + channels int + sampleRate int + device string + reader librespot.Float32Reader + done chan error + + cond *sync.Cond + + handle *C.snd_pcm_t + canPause bool + + volume float32 + paused bool + closed bool + eof bool + released bool +} + +func alsaError(name string, err C.int) error { + return fmt.Errorf("ALSA error at %s: %s", name, C.GoString(C.snd_strerror(err))) +} + +func newOutput(reader librespot.Float32Reader, sampleRate int, channels int, device string) (*output, error) { + out := &output{ + reader: reader, + channels: channels, + sampleRate: sampleRate, + device: device, + volume: 1, + cond: sync.NewCond(&sync.Mutex{}), + done: make(chan error, 1), + } + + if err := out.openAndSetup(); err != nil { + return nil, err + } + + go func() { + err := out.loop() + _ = out.Close() + + if err != nil { + out.done <- err + } else { + out.done <- nil + } + }() + + return out, nil +} + +func (out *output) openAndSetup() error { + cdevice := C.CString(out.device) + defer C.free(unsafe.Pointer(cdevice)) + if err := C.snd_pcm_open(&out.handle, cdevice, C.SND_PCM_STREAM_PLAYBACK, 0); err < 0 { + return alsaError("snd_pcm_open", err) + } + + var params *C.snd_pcm_hw_params_t + C.snd_pcm_hw_params_malloc(¶ms) + defer C.free(unsafe.Pointer(params)) + + if err := C.snd_pcm_hw_params_any(out.handle, params); err < 0 { + return alsaError("snd_pcm_hw_params_any", err) + } + + if err := C.snd_pcm_hw_params_set_access(out.handle, params, C.SND_PCM_ACCESS_RW_INTERLEAVED); err < 0 { + return alsaError("snd_pcm_hw_params_set_access", err) + } + + if err := C.snd_pcm_hw_params_set_format(out.handle, params, C.SND_PCM_FORMAT_FLOAT_LE); err < 0 { + return alsaError("snd_pcm_hw_params_set_format", err) + } + + if err := C.snd_pcm_hw_params_set_channels(out.handle, params, C.unsigned(out.channels)); err < 0 { + return alsaError("snd_pcm_hw_params_set_channels", err) + } + + if err := C.snd_pcm_hw_params_set_rate_resample(out.handle, params, 1); err < 0 { + return alsaError("snd_pcm_hw_params_set_rate_resample", err) + } + + sr := C.unsigned(out.sampleRate) + if err := C.snd_pcm_hw_params_set_rate_near(out.handle, params, &sr, nil); err < 0 { + return alsaError("snd_pcm_hw_params_set_rate_near", err) + } + + if err := C.snd_pcm_hw_params(out.handle, params); err < 0 { + return alsaError("snd_pcm_hw_params", err) + } + + if DisableHardwarePause { + out.canPause = false + } else { + if err := C.snd_pcm_hw_params_can_pause(params); err < 0 { + return alsaError("snd_pcm_hw_params_can_pause", err) + } else { + out.canPause = err == 1 + } + } + + return nil +} + +func (out *output) loop() error { + floats := make([]float32, out.channels*16*1024) + + for { + n, err := out.reader.Read(floats) + if errors.Is(err, io.EOF) { + out.eof = true + + // drain pcm ignoring errors + out.cond.L.Lock() + C.snd_pcm_drain(out.handle) + out.cond.L.Unlock() + + return nil + } else if err != nil { + return fmt.Errorf("failed reading source: %w", err) + } + + if n%out.channels != 0 { + return fmt.Errorf("invalid read amount: %d", n) + } + + for i := 0; i < n; i++ { + floats[i] *= out.volume + } + + out.cond.L.Lock() + for !(!out.paused || out.closed || !out.released) { + out.cond.Wait() + } + + if out.closed { + out.cond.L.Unlock() + return nil + } + + if nn := C.snd_pcm_writei(out.handle, unsafe.Pointer(&floats[0]), C.snd_pcm_uframes_t(n/out.channels)); nn < 0 { + nn = C.long(C.snd_pcm_recover(out.handle, C.int(nn), 1)) + if nn < 0 { + out.cond.L.Unlock() + return alsaError("snd_pcm_recover", C.int(nn)) + } + } + + out.cond.L.Unlock() + } +} + +func (out *output) Pause() error { + // Do not use snd_pcm_drop as this might hang (https://github.com/libsdl-org/SDL/blob/a5c610b0a3857d3138f3f3da1f6dc3172c5ea4a8/src/audio/alsa/SDL_alsa_audio.c#L478). + + out.cond.L.Lock() + defer out.cond.L.Unlock() + + if out.closed { + return nil + } + + if ReleasePcmOnPause { + if !out.released { + if err := C.snd_pcm_close(out.handle); err < 0 { + return alsaError("snd_pcm_close", err) + } + } + + out.released = true + } else if out.canPause { + if C.snd_pcm_state(out.handle) != C.SND_PCM_STATE_RUNNING { + return nil + } + + if err := C.snd_pcm_pause(out.handle, 1); err < 0 { + return alsaError("snd_pcm_pause", err) + } + } + + out.paused = true + + return nil +} + +func (out *output) Resume() error { + out.cond.L.Lock() + defer out.cond.L.Unlock() + + if out.closed { + return nil + } + + if ReleasePcmOnPause { + if out.released { + if err := out.openAndSetup(); err != nil { + return err + } + } + + out.released = false + } else if out.canPause { + if C.snd_pcm_state(out.handle) != C.SND_PCM_STATE_PAUSED { + return nil + } + + if err := C.snd_pcm_pause(out.handle, 0); err < 0 { + return alsaError("snd_pcm_pause", err) + } + } + + out.paused = false + out.cond.Signal() + + return nil +} + +func (out *output) SetVolume(vol float32) { + if vol < 0 || vol > 1 { + panic(fmt.Sprintf("invalid volume value: %0.2f", vol)) + } + + out.volume = vol +} + +func (out *output) WaitDone() <-chan error { + out.cond.L.Lock() + defer out.cond.L.Unlock() + + if out.closed { + return nil + } + + return out.done +} + +func (out *output) IsEOF() bool { + out.cond.L.Lock() + defer out.cond.L.Unlock() + + return out.eof +} + +func (out *output) Close() error { + out.cond.L.Lock() + defer out.cond.L.Unlock() + + if out.closed || out.released { + out.closed = true + return nil + } + + if err := C.snd_pcm_close(out.handle); err < 0 { + return alsaError("snd_pcm_close", err) + } + + out.closed = true + out.cond.Signal() + + return nil +} diff --git a/output/output.go b/output/output.go new file mode 100644 index 0000000..7968d24 --- /dev/null +++ b/output/output.go @@ -0,0 +1,75 @@ +package output + +import ( + librespot "go-librespot" +) + +type Output struct { + *output +} + +type NewOutputOptions struct { + // Reader provides data for the output device. + // + // The format of data is as follows: + // + // [data] = [sample 1] [sample 2] [sample 3] ... + // [sample *] = [channel 1] [channel 2] ... + // [channel *] = [byte 1] [byte 2] ... + // + // Byte ordering is little endian. + Reader librespot.Float32Reader + + // SampleRate specifies the number of samples that should be played during one second. + // Usual numbers are 44100 or 48000. One context has only one sample rate. You cannot play multiple audio + // sources with different sample rates at the same time. + SampleRate int + + // ChannelCount specifies the number of channels. One channel is mono playback. Two + // channels are stereo playback. No other values are supported. + ChannelCount int + + // Device specifies the audio device name. + // + // This feature is support only for the unix driver. + Device string +} + +func NewOutput(options *NewOutputOptions) (*Output, error) { + out, err := newOutput(options.Reader, options.SampleRate, options.ChannelCount, options.Device) + if err != nil { + return nil, err + } + + return &Output{out}, nil +} + +// Pause pauses the output. +func (c *Output) Pause() error { + return c.output.Pause() +} + +// Resume resumes the output. +func (c *Output) Resume() error { + return c.output.Resume() +} + +// SetVolume sets the volume (0-1). +func (c *Output) SetVolume(vol float32) { + c.output.SetVolume(vol) +} + +// WaitDone waits for the playback loop to exit. +func (c *Output) WaitDone() <-chan error { + return c.output.WaitDone() +} + +// IsEOF returns whether the reader reached EOF. +func (c *Output) IsEOF() bool { + return c.output.IsEOF() +} + +// Close closes the output. +func (c *Output) Close() error { + return c.output.Close() +} diff --git a/player/decoder.go b/player/decoder.go deleted file mode 100644 index cd1e8aa..0000000 --- a/player/decoder.go +++ /dev/null @@ -1,118 +0,0 @@ -package player - -import ( - "encoding/binary" - "errors" - "github.com/jfreymuth/oggvorbis" - log "github.com/sirupsen/logrus" - "go-librespot/audio" - "io" - "math" - "sync" -) - -type sampleDecoder struct { - reader *oggvorbis.Reader - once sync.Once - ch chan [Channels * 4]byte - norm float32 - seeked bool - done bool - stop bool -} - -const bufferSizeSamples = 65536 - -func newSampleDecoder(reader *oggvorbis.Reader, norm *audio.ReplayGain) *sampleDecoder { - return &sampleDecoder{ - reader: reader, - norm: norm.GetTrackFactor(1), - ch: make(chan [Channels * 4]byte, bufferSizeSamples), - } -} - -func (s *sampleDecoder) decodeLoop() { - samples := make([]float32, Channels) - - for !s.stop { - samplesN, err := s.reader.Read(samples) - if errors.Is(err, io.EOF) { - // exit loop cleanly - break - } else if err != nil { - log.WithError(err).Error("exiting decoder loop") - break - } - - buf := [Channels * 4]byte{} - for i := 0; i < samplesN; i++ { - sample := samples[i] - sample *= s.norm - binary.LittleEndian.PutUint32(buf[i*4:(i+1)*4], math.Float32bits(sample)) - } - - // if we seeked, throw away the channel and make a new one - if s.seeked { - close(s.ch) - s.ch = make(chan [Channels * 4]byte, bufferSizeSamples) - s.seeked = false - } - - s.ch <- buf - } - - s.done = true - close(s.ch) -} - -func (s *sampleDecoder) Read(p []byte) (n int, err error) { - s.once.Do(func() { go s.decodeLoop() }) - - n = 0 - for n < len(p) { - if n+Channels*4 > len(p) { - return n, nil - } - - frame, ok := <-s.ch - if !ok { - // return EOF only if we are done, otherwise we might just be seeking - if s.done { - return n, io.EOF - } else { - return n, nil - } - } - - copy(p[n:], frame[:]) - n += Channels * 4 - } - - return n, nil -} - -func (s *sampleDecoder) Close() error { - s.stop = true - return nil -} - -// Seek will seek the stream to the offset position in milliseconds. -func (s *sampleDecoder) Seek(offset int64, whence int) (int64, error) { - if whence != io.SeekStart { - panic("unsupported seek whence") // TODO - } - - // signal that the channel should be cleared - s.seeked = true - - pos := offset * SampleRate / 1000 - if err := s.reader.SetPosition(pos); err != nil { - return 0, err - } - - return offset, nil -} - -func (s *sampleDecoder) Position() int64 { - return (s.reader.Position() - int64(bufferSizeSamples*Channels)) * 1000 / SampleRate -} diff --git a/player/player.go b/player/player.go index c1e53f4..81c7dff 100644 --- a/player/player.go +++ b/player/player.go @@ -2,13 +2,14 @@ package player import ( "fmt" - "github.com/hajimehoshi/oto/v2" - "github.com/jfreymuth/oggvorbis" + log "github.com/sirupsen/logrus" librespot "go-librespot" "go-librespot/audio" + "go-librespot/output" downloadpb "go-librespot/proto/spotify/download" metadatapb "go-librespot/proto/spotify/metadata" "go-librespot/spclient" + "go-librespot/vorbis" "io" "time" ) @@ -16,14 +17,14 @@ import ( const SampleRate = 44100 const Channels = 2 -const MaxPlayers = 4 const MaxStateVolume = 65535 type Player struct { sp *spclient.Spclient audioKey *audio.KeyProvider - oto *oto.Context + newOutput func(source librespot.Float32Reader) (*output.Output, error) + cmd chan playerCmd ev chan Event @@ -35,7 +36,7 @@ type Player struct { type playerCmdType int const ( - playerCmdNew playerCmdType = iota + playerCmdSet playerCmdType = iota playerCmdPlay playerCmdPause playerCmdStop @@ -51,125 +52,127 @@ type playerCmd struct { resp chan any } -func NewPlayer(sp *spclient.Spclient, audioKey *audio.KeyProvider, preferredDevice string, volumeSteps uint32) (*Player, error) { - otoCtx, readyChan, err := oto.NewContextWithOptions(&oto.NewContextOptions{ - SampleRate: SampleRate, - ChannelCount: Channels, - Format: oto.FormatFloat32LE, - BufferSize: 100 * time.Millisecond, - PreferredDevice: preferredDevice, - }) - if err != nil { - return nil, fmt.Errorf("failed initializing oto context: %w", err) - } - - <-readyChan - +func NewPlayer(sp *spclient.Spclient, audioKey *audio.KeyProvider, device string, volumeSteps uint32) (*Player, error) { p := &Player{ - sp: sp, - audioKey: audioKey, - oto: otoCtx, + sp: sp, + audioKey: audioKey, + newOutput: func(reader librespot.Float32Reader) (*output.Output, error) { + return output.NewOutput(&output.NewOutputOptions{ + Reader: reader, + SampleRate: SampleRate, + ChannelCount: Channels, + Device: device, + }) + }, volumeSteps: volumeSteps, cmd: make(chan playerCmd), ev: make(chan Event, 128), // FIXME: is too messy? } + go p.manageLoop() return p, nil } func (p *Player) manageLoop() { - players := [MaxPlayers]oto.Player{} - started := [MaxPlayers]bool{} + var out *output.Output + var source librespot.AudioSource + var done <-chan error loop: for { select { case cmd := <-p.cmd: switch cmd.typ { - case playerCmdNew: - for i := 0; i < MaxPlayers; i++ { - if players[i] == nil { - players[i] = cmd.data.(oto.Player) - - if p.startedPlaying.IsZero() { - p.startedPlaying = time.Now() - } - cmd.resp <- i - break + case playerCmdSet: + if out != nil { + _ = out.Close() + } + + source = cmd.data.(librespot.AudioSource) + + var err error + out, err = p.newOutput(source) + if err != nil { + source = nil + cmd.resp <- err + } else { + done = out.WaitDone() + + if p.startedPlaying.IsZero() { + p.startedPlaying = time.Now() } + + cmd.resp <- nil } case playerCmdPlay: - pp := players[cmd.data.(int)] - pp.Play() - _ = p.oto.Resume() - started[cmd.data.(int)] = true + if out != nil { + if err := out.Resume(); err != nil { + log.WithError(err).Errorf("failed resuming playback") + } + } + cmd.resp <- struct{}{} p.ev <- Event{Type: EventTypePlaying} case playerCmdPause: - pp := players[cmd.data.(int)] - pp.Pause() - _ = p.oto.Suspend() - started[cmd.data.(int)] = false + if out != nil { + if err := out.Pause(); err != nil { + log.WithError(err).Errorf("failed pausing playback") + } + } + cmd.resp <- struct{}{} p.ev <- Event{Type: EventTypePaused} case playerCmdStop: - pp := players[cmd.data.(int)] - _ = pp.Close() - started[cmd.data.(int)] = false - players[cmd.data.(int)] = nil + if out != nil { + _ = out.Close() + } + cmd.resp <- struct{}{} p.ev <- Event{Type: EventTypeStopped} case playerCmdSeek: - // seek directly with milliseconds - pp := players[cmd.data.(playerCmdSeekData).idx] - vol := pp.Volume() - pp.SetVolume(0) - _, err := pp.(io.Seeker).Seek(cmd.data.(playerCmdSeekData).pos, io.SeekStart) - time.AfterFunc(time.Second, func() { pp.SetVolume(vol) }) // FIXME: terrible hack, but works - cmd.resp <- err + if source != nil { + err := source.SetPositionMs(cmd.data.(int64)) + cmd.resp <- err + } else { + cmd.resp <- nil + } case playerCmdPosition: - pp := players[cmd.data.(int)] - pos := pp.(oto.ReaderGetter).Reader().(*sampleDecoder).Position() - cmd.resp <- pos + if source != nil { + cmd.resp <- source.PositionMs() + } else { + cmd.resp <- int64(0) + } case playerCmdVolume: - vol := cmd.data.(float64) - for _, pp := range players { - if pp == nil { - continue - } - - pp.SetVolume(vol) + if source != nil { + vol := cmd.data.(float32) + out.SetVolume(vol) } case playerCmdClose: break loop default: panic("unknown player command") } - default: - // FIXME: this is all awful - for i, pp := range players { - if pp == nil || !started[i] { - continue - } - - if !pp.IsPlaying() { - p.ev <- Event{Type: EventTypeNotPlaying} - } + case err := <-done: + if err != nil { + log.WithError(err).Errorf("playback failed") } - time.Sleep(10 * time.Millisecond) + done = nil + if err != nil || out.IsEOF() { + p.ev <- Event{Type: EventTypeNotPlaying} + } } } close(p.cmd) // teardown - for _, pp := range players { - _ = pp.Close() + if s, ok := source.(io.Closer); ok { + _ = s.Close() } - _ = p.oto.Close() + _ = out.Close() } func (p *Player) StartedPlayingAt() time.Time { @@ -193,17 +196,11 @@ func (p *Player) Close() { } func (p *Player) SetVolume(val uint32) { - vol := float64(val) / MaxStateVolume + vol := float32(val) / MaxStateVolume p.cmd <- playerCmd{typ: playerCmdVolume, data: vol} } -func (p *Player) newStream(pp oto.Player) int { - resp := make(chan any, 1) - p.cmd <- playerCmd{typ: playerCmdNew, data: pp, resp: resp} - return (<-resp).(int) -} - -func (p *Player) NewStream(tid librespot.TrackId, bitrate int) (*Stream, error) { +func (p *Player) NewStream(tid librespot.TrackId, bitrate int, trackPosition int64) (*Stream, error) { trackMeta, err := p.sp.MetadataForTrack(tid) if err != nil { return nil, fmt.Errorf("failed getting track metadata: %w", err) @@ -273,21 +270,24 @@ func (p *Player) NewStream(tid librespot.TrackId, bitrate int) (*Stream, error) return nil, fmt.Errorf("failed reading ReplayGain metadata: %w", err) } - stream, err := oggvorbis.NewReader(audioStream) + stream, err := vorbis.New(audioStream, *trackMeta.Duration, rawStream.Size(), norm.GetTrackFactor(1)) if err != nil { return nil, fmt.Errorf("failed initializing ogg vorbis stream: %w", err) } - if stream.SampleRate() != SampleRate { - return nil, fmt.Errorf("unsupported sample rate: %d", stream.SampleRate()) - } else if stream.Channels() != Channels { - return nil, fmt.Errorf("unsupported channels: %d", stream.Channels()) + if stream.Info().SampleRate != SampleRate { + return nil, fmt.Errorf("unsupported sample rate: %d", stream.Info().SampleRate) + } else if stream.Info().Channels != Channels { + return nil, fmt.Errorf("unsupported channels: %d", stream.Info().Channels) } - idx := p.newStream(p.oto.NewPlayer(newSampleDecoder(stream, norm))) - if idx == -1 { - return nil, fmt.Errorf("too many players") + if err := stream.SetPositionMs(trackPosition); err != nil { + return nil, fmt.Errorf("failed seeking stream: %w", err) } - return &Stream{p: p, idx: idx, Track: trackMeta, File: file}, nil + resp := make(chan any) + p.cmd <- playerCmd{typ: playerCmdSet, data: stream, resp: resp} + <-resp + + return &Stream{p: p, Track: trackMeta, File: file}, nil } diff --git a/player/stream.go b/player/stream.go index cb95605..0273b53 100644 --- a/player/stream.go +++ b/player/stream.go @@ -2,14 +2,8 @@ package player import metadatapb "go-librespot/proto/spotify/metadata" -type playerCmdSeekData struct { - idx int - pos int64 -} - type Stream struct { - p *Player - idx int + p *Player Track *metadatapb.Track File *metadatapb.AudioFile @@ -17,25 +11,29 @@ type Stream struct { func (s *Stream) Play() { resp := make(chan any, 1) - s.p.cmd <- playerCmd{typ: playerCmdPlay, data: s.idx, resp: resp} + s.p.cmd <- playerCmd{typ: playerCmdPlay, resp: resp} <-resp } func (s *Stream) Pause() { resp := make(chan any, 1) - s.p.cmd <- playerCmd{typ: playerCmdPause, data: s.idx, resp: resp} + s.p.cmd <- playerCmd{typ: playerCmdPause, resp: resp} <-resp } func (s *Stream) Stop() { resp := make(chan any, 1) - s.p.cmd <- playerCmd{typ: playerCmdStop, data: s.idx, resp: resp} + s.p.cmd <- playerCmd{typ: playerCmdStop, resp: resp} <-resp } func (s *Stream) SeekMs(pos int64) error { + if pos < 0 { + pos = 0 + } + resp := make(chan any, 1) - s.p.cmd <- playerCmd{typ: playerCmdSeek, data: playerCmdSeekData{s.idx, pos}, resp: resp} + s.p.cmd <- playerCmd{typ: playerCmdSeek, data: pos, resp: resp} if err := <-resp; err != nil { return err.(error) } @@ -45,7 +43,7 @@ func (s *Stream) SeekMs(pos int64) error { func (s *Stream) PositionMs() int64 { resp := make(chan any, 1) - s.p.cmd <- playerCmd{typ: playerCmdPosition, data: s.idx, resp: resp} + s.p.cmd <- playerCmd{typ: playerCmdPosition, resp: resp} pos := <-resp return pos.(int64) } diff --git a/vorbis/decoder.go b/vorbis/decoder.go new file mode 100644 index 0000000..e9b79ce --- /dev/null +++ b/vorbis/decoder.go @@ -0,0 +1,351 @@ +package vorbis + +import ( + "errors" + "fmt" + log "github.com/sirupsen/logrus" + "io" + "sync" + + "github.com/xlab/vorbis-go/vorbis" +) + +const ( + // DataChunkSize represents the amount of data read from physical bitstream on each iteration. + DataChunkSize = 4096 // could be also 8192 +) + +// Decoder implements an OggVorbis decoder. +type Decoder struct { + sync.Mutex + + // gain is the default track gain. + gain float32 + + // size is the input length in bytes. + size int64 + + // duration is the input length in milliseconds. + duration int32 + + // syncState tracks the synchronization of the current page. It is used during + // decoding to track the status of data as it is read in, synchronized, verified, + // and parsed into pages belonging to the various logical bistreams + // in the current physical bitstream link. + syncState vorbis.OggSyncState + + // streamState tracks the current decode state of the current logical bitstream. + streamState vorbis.OggStreamState + + // page encapsulates the data for an Ogg page. Ogg pages are the fundamental unit + // of framing and interleave in an Ogg bitstream. + page vorbis.OggPage + + // packet encapsulates the data for a single raw packet of data and is used to transfer + // data between the Ogg framing layer and the handling codec. + packet vorbis.OggPacket + + // info contains basic information about the audio in a vorbis bitstream. + info vorbis.Info + + // comment stores all the bitstream user comments as Ogg Vorbis comment. + comment vorbis.Comment + + // dspState is the state for one instance of the Vorbis decoder. + // This structure is intended to be private. + dspState vorbis.DspState + + // block holds the data for a single block of audio. One Vorbis block translates to one codec packet. + // The decoding process consists of decoding the packets into blocks and reassembling the audio from the blocks. + // This structure is intended to be private. + block vorbis.Block + + input io.ReadSeeker + pcm [][][]float32 + buf []float32 + stopChan chan struct{} + closed bool + + lastGranulepos vorbis.OggInt64 +} + +// Info represents basic information about the audio in a Vorbis bitstream. +type Info struct { + Channels int32 + SampleRate int32 + Comments []string + Vendor string +} + +// New creates and initialises a new OggVorbis decoder for the provided bytestream. +func New(r io.ReadSeeker, duration int32, size int64, gain float32) (*Decoder, error) { + d := &Decoder{ + input: r, + size: size, + duration: duration, + gain: gain, + stopChan: make(chan struct{}), + } + + vorbis.OggSyncInit(&d.syncState) + + if err := d.readStreamHeaders(); err != nil { + d.decoderStateCleanup() + return nil, err + } + + d.pcm = [][][]float32{ + make([][]float32, d.info.Channels), + } + + if ret := vorbis.SynthesisInit(&d.dspState, &d.info); ret < 0 { + d.decoderStateCleanup() + return nil, errors.New("vorbis: error during playback initialization") + } + + vorbis.BlockInit(&d.dspState, &d.block) + + return d, nil +} + +// Info returns some basic info about the Vorbis stream the decoder was fed with. +func (d *Decoder) Info() Info { + return ReadInfo(&d.info, &d.comment) +} + +// Close stops and finalizes the decoding process, releases the allocated resources. +// Puts the decoder into an unrecoverable state. +func (d *Decoder) Close() { + if !d.stopRequested() { + close(d.stopChan) + } + d.Lock() + defer d.Unlock() + if d.closed { + return + } + d.closed = true + d.decoderStateCleanup() +} + +func (d *Decoder) decoderStateCleanup() { + vorbis.OggStreamClear(&d.streamState) + d.streamState.Free() + + vorbis.CommentClear(&d.comment) + d.comment.Free() + + vorbis.InfoClear(&d.info) + d.info.Free() + + vorbis.OggSyncClear(&d.syncState) + d.syncState.Free() + + vorbis.DspClear(&d.dspState) + d.dspState.Free() + + vorbis.BlockClear(&d.block) + d.block.Free() + + vorbis.OggPacketClear(&d.packet) + d.packet.Free() + + // clear up all remaining refs + d.page.Free() +} + +func (d *Decoder) stopRequested() bool { + select { + case <-d.stopChan: + return true + default: + return false + } +} + +func (d *Decoder) readChunk() (n int, err error) { + buf := vorbis.OggSyncBuffer(&d.syncState, DataChunkSize) + n, err = io.ReadFull(d.input, buf[:DataChunkSize]) + vorbis.OggSyncWrote(&d.syncState, n) + if errors.Is(err, io.ErrUnexpectedEOF) { + return n, io.EOF + } + return n, err +} + +func (d *Decoder) readStreamHeaders() error { + if _, err := d.readChunk(); err != nil { + return fmt.Errorf("vorbis: failed reading chunk: %w", err) + } + + // Read the first page + if ret := vorbis.OggSyncPageout(&d.syncState, &d.page); ret != 1 { + return errors.New("vorbis: not a valid Ogg bitstream") + } + + // Init the logical bitstream with serial number stored in the page + vorbis.OggStreamInit(&d.streamState, vorbis.OggPageSerialno(&d.page)) + + vorbis.InfoInit(&d.info) + vorbis.CommentInit(&d.comment) + + // Add a complete page to the bitstream + if ret := vorbis.OggStreamPagein(&d.streamState, &d.page); ret < 0 { + return errors.New("vorbis: the supplied page does not belong this Vorbis stream") + } + // Get the first packet + if ret := vorbis.OggStreamPacketout(&d.streamState, &d.packet); ret != 1 { + return errors.New("vorbis: unable to fetch initial Vorbis packet from the first page") + } + // Finally decode the header packet + if ret := vorbis.SynthesisHeaderin(&d.info, &d.comment, &d.packet); ret < 0 { + return fmt.Errorf("vorbis: unable to decode the initial Vorbis header: %d", ret) + } + + var headersRead int +forPage: + for headersRead < 2 { + if res := vorbis.OggSyncPageout(&d.syncState, &d.page); res < 0 { + // bytes have been skipped, try to sync again + continue forPage + } else if res == 0 { + // go get more data + if _, err := d.readChunk(); err != nil { + return errors.New("vorbis: got EOF while reading Vorbis headers") + } + continue forPage + } + // page is synced at this point + vorbis.OggStreamPagein(&d.streamState, &d.page) + for headersRead < 2 { + if ret := vorbis.OggStreamPacketout(&d.streamState, &d.packet); ret < 0 { + return errors.New("vorbis: data is missing near the secondary Vorbis header") + } else if ret == 0 { + // no packets left on the page, go get a new one + continue forPage + } + if ret := vorbis.SynthesisHeaderin(&d.info, &d.comment, &d.packet); ret < 0 { + return errors.New("vorbis: unable to read the secondary Vorbis header") + } + headersRead++ + } + } + + d.info.Deref() + d.comment.Deref() + d.comment.UserComments = make([][]byte, d.comment.Comments) + d.comment.Deref() + return nil +} + +func (d *Decoder) Read(p []float32) (n int, err error) { + d.Lock() + defer d.Unlock() + if d.closed { + return 0, errors.New("decoder: decoder has already been closed") + } + + n = 0 + for n < len(p) { + // read from page buffer + if len(d.buf) > 0 { + copied := copy(p[n:], d.buf) + d.buf = d.buf[copied:] + n += copied + } + + // decode another page + err = d.readNextPage() + if err != nil { + return n, err + } + } + return n, nil +} + +func (d *Decoder) readNextPage() (err error) { + for { + if ret := vorbis.OggSyncPageout(&d.syncState, &d.page); ret < 0 { + log.Debugf("vorbis: corrupt or missing data in bitstream") + continue + } else if ret == 0 { + // need more data + _, err = d.readChunk() + if err != nil { + return err + } + } else { + // we have read the page + break + } + } + + // page is synced at this point + vorbis.OggStreamPagein(&d.streamState, &d.page) + + for { + if ret := vorbis.OggStreamPacketout(&d.streamState, &d.packet); ret < 0 { + // skip this packet + continue + } else if ret == 0 { + // no packets left on the page + break + } + + if vorbis.Synthesis(&d.block, &d.packet) == 0 { + vorbis.SynthesisBlockin(&d.dspState, &d.block) + } + + samples := vorbis.SynthesisPcmout(&d.dspState, d.pcm) + for ; samples > 0; samples = vorbis.SynthesisPcmout(&d.dspState, d.pcm) { + for i := 0; i < int(samples); i++ { + for j := 0; j < int(d.info.Channels); j++ { + d.buf = append(d.buf, d.pcm[0][j][:samples][i]*d.gain) + } + } + vorbis.SynthesisRead(&d.dspState, samples) + } + + // save last observed position + d.lastGranulepos = vorbis.OggPageGranulepos(&d.page) + } + + if vorbis.OggPageEos(&d.page) == 1 { + return io.EOF + } + + return nil +} + +func (d *Decoder) SetPositionMs(pos int64) (err error) { + d.Lock() + defer d.Unlock() + + _, err = d.input.Seek(pos*d.size/int64(d.duration), io.SeekStart) + if err != nil { + return fmt.Errorf("failed seeking input: %w", err) + } + + // read data at seek offset + if _, err = d.readChunk(); err != nil { + return fmt.Errorf("failed reading chunk: %w", err) + } + + for { + err = d.readNextPage() + if err != nil { + return fmt.Errorf("failed reading page: %w", err) + } + + if d.PositionMs() >= pos { + break + } + } + + return nil +} + +func (d *Decoder) PositionMs() int64 { + // TODO: account for alsa delay with snd_pcm_delay + return int64(vorbis.GranuleTime(&d.dspState, d.lastGranulepos) * 1000) +} diff --git a/vorbis/utils.go b/vorbis/utils.go new file mode 100644 index 0000000..b5956d2 --- /dev/null +++ b/vorbis/utils.go @@ -0,0 +1,93 @@ +package vorbis + +import ( + "errors" + "fmt" + + "github.com/xlab/vorbis-go/vorbis" +) + +// ReadInfo reads info and comment into Info, a go-friendly struct. +func ReadInfo(vi *vorbis.Info, vc *vorbis.Comment) Info { + info := Info{ + Channels: vi.Channels, + SampleRate: int32(vi.Rate), + Vendor: toString(vc.Vendor, 256), + } + lengths := vc.CommentLengths[:vc.Comments] + userComments := vc.UserComments[:vc.Comments] + for i, text := range userComments { + info.Comments = append(info.Comments, string(text[:lengths[i]])) + } + return info +} + +// ReadHeaders allows to init info and comment from a private codec data payload +// when no Ogg stream is available. Info and comment must be initialised beforehand. +func ReadHeaders(codecPriv []byte, info *vorbis.Info, comment *vorbis.Comment) error { + if len(codecPriv) == 0 { + return errors.New("vorbis decoder: no codec private data") + } + headerSizes := make([]int, 3) + headers := make([][]byte, 3) + p := codecPriv + + if p[0] == 0x00 && p[1] == 0x30 { + for i := 0; i < 3; i++ { + headerSizes[i] = int(uint16(p[0])<<8 | uint16(p[1])) + headers[i] = p + p = p[headerSizes[i]:] + } + } else if p[0] == 0x02 { + offset := 1 + p = p[1:] + for i := 0; i < 2; i++ { + headerSizes[i] = 0 + for (p[0] == 0xFF) && offset < len(codecPriv) { + headerSizes[i] += 0xFF + offset++ + p = p[1:] + } + if offset >= len(codecPriv)-1 { + return errors.New("vorbis decoder: header sizes damaged") + } + headerSizes[i] += int(p[0]) + offset++ + p = p[1:] + } + headerSizes[2] = len(codecPriv) - headerSizes[0] - headerSizes[1] - offset + headers[0] = codecPriv[offset:] + headers[1] = codecPriv[offset+headerSizes[0]:] + headers[2] = codecPriv[offset+headerSizes[0]+headerSizes[1]:] + } else { + return fmt.Errorf("vorbis decoder: initial header len is wrong: %d", p[0]) + } + for i := 0; i < 3; i++ { + packet := vorbis.OggPacket{ + BOS: b(i == 0), + Bytes: headerSizes[i], + Packet: headers[i], + } + if ret := vorbis.SynthesisHeaderin(info, comment, &packet); ret < 0 { + return fmt.Errorf("vorbis decoder: %d. header damaged", i+1) + } + } + return nil +} + +func b(v bool) int { + if v { + return 1 + } + return 0 +} + +func toString(buf []byte, maxlen int) string { + buf = buf[:maxlen] + for i := range buf { + if buf[i] == 0 { + return string(buf[:i:i]) + } + } + return "" +}