Skip to content

Commit

Permalink
using a ShardedIndex for Playlists
Browse files Browse the repository at this point in the history
fixes see #8
  • Loading branch information
h0lg committed Oct 5, 2024
1 parent 226de82 commit 2409539
Show file tree
Hide file tree
Showing 9 changed files with 539 additions and 119 deletions.
4 changes: 2 additions & 2 deletions Shell/Program.ListKeywords.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ private static async Task ListKeywordsAsync(ListKeywords command, string origina
{
Prevalidate.Scopes(command);

await OutputAsync(command, originalCommand, async (youtube, cancellation, outputs) =>
await OutputAsync(command, originalCommand, async (youtube, cancellation, outputs, notifyCaller) =>
{
var resultDisplayed = false;
Dictionary<CommandScope, Dictionary<string, List<string>>> scopes = [];

await foreach (var (keyword, videoId, scope) in youtube.ListKeywordsAsync(command).WithCancellation(cancellation))
await foreach (var (keyword, videoId, scope) in youtube.ListKeywordsAsync(command, notifyCaller).WithCancellation(cancellation))
Youtube.AggregateKeywords(keyword, videoId, scope, scopes);

if (scopes.Any())
Expand Down
12 changes: 10 additions & 2 deletions Shell/Program.OutputCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace SubTubular.Shell;
static partial class Program
{
private static async Task OutputAsync(OutputCommand command, string originalCommand,
Func<Youtube, CancellationToken, List<OutputWriter>, Task<bool>> runCommand)
Func<Youtube, CancellationToken, List<OutputWriter>, Action<string, string>, Task<bool>> runCommand)
{
//inspired by https://johnthiriet.com/cancel-asynchronous-operation-in-csharp/
using var cancellation = new CancellationTokenSource();
Expand Down Expand Up @@ -53,7 +53,15 @@ private static async Task OutputAsync(OutputCommand command, string originalComm
{
/* passing token into command for it to react to cancellation,
see https://docs.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8#a-tour-through-async-enumerables */
resultDisplayed = await runCommand(youtube, cancellation.Token, outputs);
resultDisplayed = await runCommand(youtube, cancellation.Token, outputs,
// notification channel
(title, message) => outputs.ForEach(o =>
{
o.WriteLine();
o.WriteLine(title);
o.WriteLine(message);
o.WriteLine();
}));
}
catch (OperationCanceledException) { Console.WriteLine("The operation was cancelled."); }
finally // write output file even if exception occurs
Expand Down
4 changes: 2 additions & 2 deletions Shell/Program.Search.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ private static async Task SearchAsync(SearchCommand command, string originalComm
{
Prevalidate.Search(command);

await OutputAsync(command, originalCommand, async (youtube, cancellation, outputs) =>
await OutputAsync(command, originalCommand, async (youtube, cancellation, outputs, notifyCaller) =>
{
var resultDisplayed = false;
var tracksWithErrors = new List<CaptionTrack>();

await foreach (var result in youtube.SearchAsync(command).WithCancellation(cancellation))
await foreach (var result in youtube.SearchAsync(command, notifyCaller).WithCancellation(cancellation))
{
outputs.ForEach(o => o.WriteVideoResult(result, command.Padding));
resultDisplayed = true;
Expand Down
3 changes: 3 additions & 0 deletions SubTubular/Extensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ internal static bool ContainsAny<T>(this IEnumerable<T> collection, IEnumerable<

public static IEnumerable<T> WithValue<T>(this IEnumerable<T?> nullables)
=> nullables.Where(v => v != null).Select(v => v!);

public static IEnumerable<T> WithValue<T>(this IEnumerable<T?> nullables) where T : struct
=> nullables.Where(v => v.HasValue).Select(v => v!.Value);
}

internal static class HashCodeExtensions
Expand Down
183 changes: 170 additions & 13 deletions SubTubular/Playlist.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,189 @@ namespace SubTubular;

public sealed class Playlist
{
public static ushort ShardSize = 200;
private bool hasUnsavedChanges;

[JP("t")] public required string Title { get; set; }
[JP("u")] public required string ThumbnailUrl { get; set; }
[JP("c")] public string? Channel { get; set; }
[JP("l")] public DateTime Loaded { get; set; }

/// <summary>The <see cref="Video.Id"/>s and (optional) upload dates
/// of the videos included in the <see cref="Playlist" />.</summary>
[JP("v")] public IDictionary<string, DateTime?> Videos { get; set; } = new Dictionary<string, DateTime?>();
[JsonInclude, JP("v")] private List<VideoInfo> videos = [];

private VideoInfo? GetVideo(string videoId) => videos.SingleOrDefault(s => s.Id == videoId);

/// <summary>The videos included in the <see cref="Playlist" /> (i.e. excluding dropped)
/// ordered by <see cref="VideoInfo.PlaylistIndex"/>.</summary>
public IOrderedEnumerable<VideoInfo> GetVideos()
{
changeToken?.Wait();
try { return videos.Where(v => v.PlaylistIndex.HasValue).OrderBy(v => v.PlaylistIndex); }
finally { changeToken?.Release(); }
}

internal uint GetVideoCount()
{
changeToken?.Wait();
try { return (uint)videos.Count; }
finally { changeToken?.Release(); }
}

// Retrieve all video IDs from all shards
internal IEnumerable<string> GetVideoIds()
{
changeToken?.Wait();
try { return videos.Select(v => v.Id); }
finally { changeToken?.Release(); }
}

private SemaphoreSlim? changeToken; // ensures safe concurrent access during the update phase

// creates the changeToken required for changes
public IDisposable CreateChangeToken()
{
changeToken = new(1, 1);
return changeToken;
}

/// <summary>Tries inserting or moving <paramref name="videoId"/>
/// at or to <paramref name="newIndex"/> in <see cref="GetVideos()"/>
/// and returns whether the operation resulted in any changes.</summary>
public bool TryAddVideoId(string videoId, uint newIndex)
{
changeToken!.Wait();

try
{
var video = GetVideo(videoId);

if (video == null)
{
foreach (var dropped in videos.Where(v => newIndex == v.PlaylistIndex))
dropped.PlaylistIndex = null;

video = new VideoInfo { Id = videoId, PlaylistIndex = newIndex };
videos.Add(video);
hasUnsavedChanges = true;
return true;
}
else
{
if (newIndex == video.PlaylistIndex) return false;

if (video.PlaylistIndex == null)
foreach (var dropped in videos.Where(v => newIndex == v.PlaylistIndex))
dropped.PlaylistIndex = null;
else
foreach (var dropped in videos.Where(v => newIndex <= v.PlaylistIndex))
dropped.PlaylistIndex = null;

video.PlaylistIndex = newIndex;

hasUnsavedChanges = true;
return true;
}
}
finally { changeToken.Release(); }
}

internal bool SetUploaded(Video loadedVideo)
{
changeToken!.Wait();

try
{
VideoInfo? video = GetVideo(loadedVideo.Id);

// should not happen, just as a fall-back
if (video == null)
{
video = new VideoInfo { Id = loadedVideo.Id };
videos.Add(video);
UpdateShardNumbers();
hasUnsavedChanges = true;
}

bool madeChanges = false;

if (video.Uploaded != loadedVideo.Uploaded)
{
video.Uploaded = loadedVideo.Uploaded;
hasUnsavedChanges = true;
madeChanges = true;
}

return madeChanges;
}
finally { changeToken.Release(); }
}

public void UpdateShardNumbers()
{
changeToken!.Wait();

try
{
videos = videos.OrderBy(v => v.PlaylistIndex).ToList();
var firstLoaded = videos.Find(v => v.ShardNumber == 0);
var indexOfFirstLoaded = firstLoaded == null ? 0 : videos.IndexOf(firstLoaded);

foreach (var video in videos.Where(v => v.ShardNumber == null))
{
var index = videos.IndexOf(video);
int translatedIndex = index - indexOfFirstLoaded;
var shardNumber = (short?)(translatedIndex < 0 ? ((translatedIndex + 1) / ShardSize) - 1 : translatedIndex / ShardSize);

internal IEnumerable<string> GetVideoIds() => Videos.Keys;
internal DateTime? GetVideoUploaded(string id) => Videos.TryGetValue(id, out var uploaded) ? uploaded : null;
if (video.ShardNumber != shardNumber)
{
video.ShardNumber = shardNumber;
hasUnsavedChanges = true;
}
}
}
finally { changeToken.Release(); }
}

internal void AddVideoIds(string[] freshKeys)
internal void UpdateLoaded()
{
// use new order but append older entries; note that this leaves remotely deleted videos in the playlist
Videos = freshKeys.Concat(GetVideoIds().Except(freshKeys)).ToDictionary(id => id, GetVideoUploaded);
changeToken!.Wait();
Loaded = DateTime.UtcNow;
hasUnsavedChanges = true;
changeToken.Release();
}

internal bool SetUploaded(Video video)
internal async ValueTask SaveAsync(Func<Task> save)
{
if (Videos[video.Id] != video.Uploaded)
if (!hasUnsavedChanges) return;
await changeToken!.WaitAsync();

try
{
await save();
hasUnsavedChanges = false;
}
finally
{
Videos[video.Id] = video.Uploaded;
return true;
changeToken.Release();
}
}

public sealed class VideoInfo
{
[JP("i")] public required string Id { get; set; }
[JP("s")] public short? ShardNumber { get; set; }
[JP("u")] public DateTime? Uploaded { get; set; }

return false;
//set if included, null if dropped
[JP("n")] public uint? PlaylistIndex { get; set; }

// used for debugging
public override string ToString()
{
var shardNumber = ShardNumber.HasValue ? " s" + ShardNumber : null;
var playlistIndex = PlaylistIndex.HasValue ? " n" + PlaylistIndex : null;
var uploaded = Uploaded.HasValue ? $" {Uploaded:d}" : null;
return Id + playlistIndex + shardNumber + uploaded;
}
}
}
40 changes: 22 additions & 18 deletions SubTubular/VideoIndex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,9 @@ internal VideoIndex Build(string key)
// see https://mikegoatly.github.io/lifti/docs/index-construction/withindexmodificationaction/
FullTextIndex<string> index = CreateIndexBuilder().WithIndexModificationAction(async indexSnapshot =>
{
try
{
await videoIndex!.AccessToken.WaitAsync();
await SaveAsync(indexSnapshot, key);
}
finally
{
videoIndex?.AccessToken.Release();
}
await videoIndex!.AccessToken.WaitAsync();
try { await SaveAsync(indexSnapshot, key); }
finally { videoIndex!.AccessToken.Release(); }
}).Build();

videoIndex = new VideoIndex(index);
Expand Down Expand Up @@ -90,6 +84,15 @@ internal VideoIndex Build(string key)
}
}

// see https://github.com/mikegoatly/lifti/issues/32 and https://github.com/mikegoatly/lifti/issues/74
internal async ValueTask<VideoIndex> GetIndexShardAsync(string playlistKey, int shardNumber)
{
string key = playlistKey + "." + shardNumber;
var index = await GetAsync(key);
index ??= Build(key);
return index;
}

private async Task SaveAsync(IIndexSnapshot<string> indexSnapshot, string key)
{
// see https://mikegoatly.github.io/lifti/docs/serialization/
Expand Down Expand Up @@ -131,13 +134,13 @@ internal async Task AddAsync(Video video, CancellationToken cancellation)
/// accompanied by their corresponding <see cref="Video.Uploaded"/> dates, if known.
/// The latter are only used for <see cref="SearchCommand.OrderOptions.uploaded"/>
/// and missing dates are determined by loading the videos using <paramref name="getVideoAsync"/>.</param>
/// <param name="updatePlaylistVideosUploaded">A callback for updating the <see cref="Playlist.Videos"/>
/// <param name="playlist">Allows updating the <see cref="Playlist.GetVideos()"/>
/// with the <see cref="Video.Uploaded"/> dates after loading them for
/// <see cref="SearchCommand.OrderOptions.uploaded"/>.</param>
internal async IAsyncEnumerable<VideoSearchResult> SearchAsync(SearchCommand command,
Func<string, CancellationToken, Task<Video>> getVideoAsync,
IDictionary<string, DateTime?>? relevantVideos = default,
Func<IEnumerable<Video>, Task>? updatePlaylistVideosUploaded = default,
Playlist? playlist = default,
[EnumeratorCancellation] CancellationToken cancellation = default)
{
cancellation.ThrowIfCancellationRequested();
Expand All @@ -164,7 +167,7 @@ internal async IAsyncEnumerable<VideoSearchResult> SearchAsync(SearchCommand com
var matchesForVideosWithoutUploadDate = matches.Where(m =>
!relevantVideos.ContainsKey(m.Key) || relevantVideos[m.Key] == null).ToArray();

// get upload dates for videos that we don't know it of
// get upload dates for videos that we don't know it of (may occur if index remembers a video the Playlist forgot about)
if (matchesForVideosWithoutUploadDate.Length != 0)
{
var getVideos = matchesForVideosWithoutUploadDate.Select(m => getVideoAsync(m.Key, cancellation)).ToArray();
Expand All @@ -173,10 +176,11 @@ internal async IAsyncEnumerable<VideoSearchResult> SearchAsync(SearchCommand com
unIndexedVideos.AddRange(videosWithoutUploadDate.Where(v => v.UnIndexed));

foreach (var match in matchesForVideosWithoutUploadDate)
relevantVideos[match.Key] = videosWithoutUploadDate.Single(v => v.Id == match.Key).Uploaded;

if (updatePlaylistVideosUploaded != default)
await updatePlaylistVideosUploaded(videosWithoutUploadDate);
{
Video video = videosWithoutUploadDate.Single(v => v.Id == match.Key);
relevantVideos[match.Key] = video.Uploaded;
playlist?.SetUploaded(video);
}
}
}

Expand Down Expand Up @@ -277,7 +281,7 @@ internal async IAsyncEnumerable<VideoSearchResult> SearchAsync(SearchCommand com

await foreach (var result in SearchAsync(command, GetReIndexedVideoAsync,
unIndexedVideos.ToDictionary(v => v.Id, v => v.Uploaded as DateTime?),
updatePlaylistVideosUploaded, cancellation))
playlist, cancellation))
yield return result;

// re-trigger search for re-indexed videos only
Expand Down Expand Up @@ -307,4 +311,4 @@ public void Dispose()
Index.Dispose();
AccessToken.Dispose();
}
}
}
Loading

0 comments on commit 2409539

Please sign in to comment.