Skip to content

Commit

Permalink
Merge pull request #124 from NicolasConstant/develop
Browse files Browse the repository at this point in the history
0.19.0 PR
  • Loading branch information
NicolasConstant authored Nov 19, 2021
2 parents 22b0d6d + 1855830 commit a21b493
Show file tree
Hide file tree
Showing 41 changed files with 1,149 additions and 100 deletions.
1 change: 1 addition & 0 deletions VARIABLES.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ If both whitelisting and blacklisting are set, only the whitelisting will be act
* `Instance:PublishReplies` (default: false) to enable or disable replies publishing.
* `Instance:UnlistedTwitterAccounts` (default: null) to enable unlisted publication for selected twitter accounts, separated by `;` (please limit this to brands and other public profiles).
* `Instance:SensitiveTwitterAccounts` (default: null) mark all media from given accounts as sensitive by default, separated by `;`.
* `Instance:FailingTwitterUserCleanUpThreshold` (default: 700) set the max allowed errors (due to a banned/deleted/private account) from a Twitter Account retrieval before auto-removal. (by default an account is called every 15 mins)

# Docker Compose full example

Expand Down
12 changes: 10 additions & 2 deletions src/BSLManager/App.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,23 +146,31 @@ private void OpenFollowerDialog(int selectedIndex)
Width = Dim.Fill(),
Height = 1
};
var inbox = new Label($"Inbox: {follower.InboxRoute}")
var errors = new Label($"Posting Errors: {follower.PostingErrorCount}")
{
X = 1,
Y = 4,
Width = Dim.Fill(),
Height = 1
};
var sharedInbox = new Label($"Shared Inbox: {follower.SharedInboxRoute}")
var inbox = new Label($"Inbox: {follower.InboxRoute}")
{
X = 1,
Y = 5,
Width = Dim.Fill(),
Height = 1
};
var sharedInbox = new Label($"Shared Inbox: {follower.SharedInboxRoute}")
{
X = 1,
Y = 6,
Width = Dim.Fill(),
Height = 1
};

dialog.Add(name);
dialog.Add(following);
dialog.Add(errors);
dialog.Add(inbox);
dialog.Add(sharedInbox);
dialog.Add(close);
Expand Down
2 changes: 1 addition & 1 deletion src/BSLManager/Domain/FollowersListState.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ private void ResetLists()

foreach (var follower in _sourceUserList)
{
var displayedUser = $"{GetFullHandle(follower)} ({follower.Followings.Count})";
var displayedUser = $"{GetFullHandle(follower)} ({follower.Followings.Count}) (err:{follower.PostingErrorCount})";
_filteredDisplayableUserList.Add(displayedUser);
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/BirdsiteLive.Common/Settings/InstanceSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ public class InstanceSettings

public string UnlistedTwitterAccounts { get; set; }
public string SensitiveTwitterAccounts { get; set; }

public int FailingTwitterUserCleanUpThreshold { get; set; }
}
}
1 change: 1 addition & 0 deletions src/BirdsiteLive.Pipeline/BirdsiteLive.Pipeline.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

<ItemGroup>
<ProjectReference Include="..\BirdsiteLive.Domain\BirdsiteLive.Domain.csproj" />
<ProjectReference Include="..\BirdsiteLive.Moderation\BirdsiteLive.Moderation.csproj" />
<ProjectReference Include="..\BirdsiteLive.Twitter\BirdsiteLive.Twitter.csproj" />
<ProjectReference Include="..\DataAccessLayers\BirdsiteLive.DAL\BirdsiteLive.DAL.csproj" />
</ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Pipeline.Models;

namespace BirdsiteLive.Pipeline.Contracts
{
public interface IRefreshTwitterUserStatusProcessor
{
Task<UserWithDataToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ namespace BirdsiteLive.Pipeline.Contracts
{
public interface IRetrieveFollowersProcessor
{
Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct);
Task<IEnumerable<UserWithDataToSync>> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct);
//IAsyncEnumerable<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{
public interface IRetrieveTweetsProcessor
{
Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct);
Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{
public interface ISaveProgressionProcessor
{
Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct);
Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ namespace BirdsiteLive.Pipeline.Contracts
{
public interface ISendTweetsToFollowersProcessor
{
Task<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct);
Task<UserWithDataToSync> ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace BirdsiteLive.Pipeline.Models
{
public class UserWithTweetsToSync
public class UserWithDataToSync
{
public SyncTwitterUser User { get; set; }
public ExtractedTweet[] Tweets { get; set; }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using BirdsiteLive.Common.Settings;
using BirdsiteLive.DAL.Contracts;
using BirdsiteLive.DAL.Models;
using BirdsiteLive.Moderation.Actions;
using BirdsiteLive.Pipeline.Contracts;
using BirdsiteLive.Pipeline.Models;
using BirdsiteLive.Twitter;

namespace BirdsiteLive.Pipeline.Processors
{
public class RefreshTwitterUserStatusProcessor : IRefreshTwitterUserStatusProcessor
{
private readonly ICachedTwitterUserService _twitterUserService;
private readonly ITwitterUserDal _twitterUserDal;
private readonly IRemoveTwitterAccountAction _removeTwitterAccountAction;
private readonly InstanceSettings _instanceSettings;

#region Ctor
public RefreshTwitterUserStatusProcessor(ICachedTwitterUserService twitterUserService, ITwitterUserDal twitterUserDal, IRemoveTwitterAccountAction removeTwitterAccountAction, InstanceSettings instanceSettings)
{
_twitterUserService = twitterUserService;
_twitterUserDal = twitterUserDal;
_removeTwitterAccountAction = removeTwitterAccountAction;
_instanceSettings = instanceSettings;
}
#endregion

public async Task<UserWithDataToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct)
{
var usersWtData = new List<UserWithDataToSync>();

foreach (var user in syncTwitterUsers)
{
var userView = _twitterUserService.GetUser(user.Acct);
if (userView == null)
{
await AnalyseFailingUserAsync(user);
}
else if (!userView.Protected)
{
user.FetchingErrorCount = 0;
var userWtData = new UserWithDataToSync
{
User = user
};
usersWtData.Add(userWtData);
}
}

return usersWtData.ToArray();
}

private async Task AnalyseFailingUserAsync(SyncTwitterUser user)
{
var dbUser = await _twitterUserDal.GetTwitterUserAsync(user.Acct);
dbUser.FetchingErrorCount++;

if (dbUser.FetchingErrorCount > _instanceSettings.FailingTwitterUserCleanUpThreshold)
{
await _removeTwitterAccountAction.ProcessAsync(user);
}
else
{
await _twitterUserDal.UpdateTwitterUserAsync(dbUser);
}

// Purge
_twitterUserService.PurgeUser(user.Acct);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public RetrieveFollowersProcessor(IFollowersDal followersDal)
}
#endregion

public async Task<IEnumerable<UserWithTweetsToSync>> ProcessAsync(UserWithTweetsToSync[] userWithTweetsToSyncs, CancellationToken ct)
public async Task<IEnumerable<UserWithDataToSync>> ProcessAsync(UserWithDataToSync[] userWithTweetsToSyncs, CancellationToken ct)
{
//TODO multithread this
foreach (var user in userWithTweetsToSyncs)
Expand Down
25 changes: 9 additions & 16 deletions src/BirdsiteLive.Pipeline/Processors/RetrieveTweetsProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,33 +31,30 @@ public RetrieveTweetsProcessor(ITwitterTweetsService twitterTweetsService, ITwit
}
#endregion

public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwitterUsers, CancellationToken ct)
public async Task<UserWithDataToSync[]> ProcessAsync(UserWithDataToSync[] syncTwitterUsers, CancellationToken ct)
{
var usersWtTweets = new List<UserWithTweetsToSync>();
var usersWtTweets = new List<UserWithDataToSync>();

//TODO multithread this
foreach (var user in syncTwitterUsers)
foreach (var userWtData in syncTwitterUsers)
{
var user = userWtData.User;
var tweets = RetrieveNewTweets(user);
if (tweets.Length > 0 && user.LastTweetPostedId != -1)
{
var userWtTweets = new UserWithTweetsToSync
{
User = user,
Tweets = tweets
};
usersWtTweets.Add(userWtTweets);
userWtData.Tweets = tweets;
usersWtTweets.Add(userWtData);
}
else if (tweets.Length > 0 && user.LastTweetPostedId == -1)
{
var tweetId = tweets.Last().Id;
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, now);
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, tweetId, tweetId, user.FetchingErrorCount, now);
}
else
{
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, now);
await _twitterUserDal.UpdateTwitterUserAsync(user.Id, user.LastTweetPostedId, user.LastTweetSynchronizedForAllFollowersId, user.FetchingErrorCount, now);
}
}

Expand All @@ -67,11 +64,7 @@ public async Task<UserWithTweetsToSync[]> ProcessAsync(SyncTwitterUser[] syncTwi
private ExtractedTweet[] RetrieveNewTweets(SyncTwitterUser user)
{
var tweets = new ExtractedTweet[0];

// Don't retrieve TL if protected
var userView = _twitterUserService.GetUser(user.Acct);
if (userView == null || userView.Protected) return tweets;


try
{
if (user.LastTweetPostedId == -1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ public async Task GetTwitterUsersAsync(BufferBlock<SyncTwitterUser[]> twitterUse
}

var splitCount = splitUsers.Count();
if (splitCount < 15) await Task.Delay((15 - splitCount) * WaitFactor, ct);
if (splitCount < 15) await Task.Delay((15 - splitCount) * WaitFactor, ct); //Always wait 15min

//// Extra wait time to fit 100.000/day limit
//var extraWaitTime = (int)Math.Ceiling((60 / ((100000d / 24) / userCount)) - 15);
//if (extraWaitTime < 0) extraWaitTime = 0;
//await Task.Delay(extraWaitTime * 1000, ct);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public SaveProgressionProcessor(ITwitterUserDal twitterUserDal, ILogger<SaveProg
}
#endregion

public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct)
public async Task ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct)
{
try
{
Expand All @@ -49,7 +49,7 @@ public async Task ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, Cancel
var lastPostedTweet = userWithTweetsToSync.Tweets.Select(x => x.Id).Max();
var minimumSync = followingSyncStatuses.Min();
var now = DateTime.UtcNow;
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, now);
await _twitterUserDal.UpdateTwitterUserAsync(userId, lastPostedTweet, minimumSync, userWithTweetsToSync.User.FetchingErrorCount, now);
}
catch (Exception e)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,37 +22,39 @@ public class SendTweetsToFollowersProcessor : ISendTweetsToFollowersProcessor
{
private readonly ISendTweetsToInboxTask _sendTweetsToInboxTask;
private readonly ISendTweetsToSharedInboxTask _sendTweetsToSharedInbox;
private readonly IFollowersDal _followersDal;
private readonly ILogger<SendTweetsToFollowersProcessor> _logger;

#region Ctor
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, ILogger<SendTweetsToFollowersProcessor> logger)
public SendTweetsToFollowersProcessor(ISendTweetsToInboxTask sendTweetsToInboxTask, ISendTweetsToSharedInboxTask sendTweetsToSharedInbox, IFollowersDal followersDal, ILogger<SendTweetsToFollowersProcessor> logger)
{
_sendTweetsToInboxTask = sendTweetsToInboxTask;
_sendTweetsToSharedInbox = sendTweetsToSharedInbox;
_logger = logger;
_followersDal = followersDal;
}
#endregion

public async Task<UserWithTweetsToSync> ProcessAsync(UserWithTweetsToSync userWithTweetsToSync, CancellationToken ct)
public async Task<UserWithDataToSync> ProcessAsync(UserWithDataToSync userWithTweetsToSync, CancellationToken ct)
{
var user = userWithTweetsToSync.User;

// Process Shared Inbox
var followersWtSharedInbox = userWithTweetsToSync.Followers
.Where(x => !string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithSharedInbox(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);
await ProcessFollowersWithSharedInboxAsync(userWithTweetsToSync.Tweets, followersWtSharedInbox, user);

// Process Inbox
var followerWtInbox = userWithTweetsToSync.Followers
.Where(x => string.IsNullOrWhiteSpace(x.SharedInboxRoute))
.ToList();
await ProcessFollowersWithInbox(userWithTweetsToSync.Tweets, followerWtInbox, user);
await ProcessFollowersWithInboxAsync(userWithTweetsToSync.Tweets, followerWtInbox, user);

return userWithTweetsToSync;
}

private async Task ProcessFollowersWithSharedInbox(ExtractedTweet[] tweets, List<Follower> followers, SyncTwitterUser user)
private async Task ProcessFollowersWithSharedInboxAsync(ExtractedTweet[] tweets, List<Follower> followers, SyncTwitterUser user)
{
var followersPerInstances = followers.GroupBy(x => x.Host);

Expand All @@ -61,28 +63,51 @@ private async Task ProcessFollowersWithSharedInbox(ExtractedTweet[] tweets, List
try
{
await _sendTweetsToSharedInbox.ExecuteAsync(tweets, user, followersPerInstance.Key, followersPerInstance.ToArray());

foreach (var f in followersPerInstance)
await ProcessWorkingUserAsync(f);
}
catch (Exception e)
{
var follower = followersPerInstance.First();
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.SharedInboxRoute);

foreach (var f in followersPerInstance)
await ProcessFailingUserAsync(f);
}
}
}

private async Task ProcessFollowersWithInbox(ExtractedTweet[] tweets, List<Follower> followerWtInbox, SyncTwitterUser user)
private async Task ProcessFollowersWithInboxAsync(ExtractedTweet[] tweets, List<Follower> followerWtInbox, SyncTwitterUser user)
{
foreach (var follower in followerWtInbox)
{
try
{
await _sendTweetsToInboxTask.ExecuteAsync(tweets, follower, user);
await ProcessWorkingUserAsync(follower);
}
catch (Exception e)
{
_logger.LogError(e, "Posting to {Host}{Route} failed", follower.Host, follower.InboxRoute);
await ProcessFailingUserAsync(follower);
}
}
}

private async Task ProcessWorkingUserAsync(Follower follower)
{
if (follower.PostingErrorCount > 0)
{
follower.PostingErrorCount = 0;
await _followersDal.UpdateFollowerAsync(follower);
}
}

private async Task ProcessFailingUserAsync(Follower follower)
{
follower.PostingErrorCount++;
await _followersDal.UpdateFollowerAsync(follower);
}
}
}
Loading

0 comments on commit a21b493

Please sign in to comment.