-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathProgram.cs
248 lines (208 loc) · 12 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json.Linq;
using static ClassTranscribeDatabase.CommonUtils;
using ClassTranscribeDatabase;
using ClassTranscribeDatabase.Services;
using ClassTranscribeDatabase.Services.MSTranscription;
using TaskEngine.Tasks;
namespace TaskEngine
{
/// <summary>
/// Process-wide holder for TaskEngine state that is shared through static access.
/// </summary>
public static class TaskEngineGlobals
{
/// <summary>
/// The shared <see cref="KeyProvider"/>. Program.SetupServices() assigns it from
/// Globals.appSettings; it is null until that assignment has run.
/// </summary>
public static KeyProvider KeyProvider { get; set; }
}
class Program
{
// Concurrency values handed to each task's Consume() call in createTaskQueues().
// Per the original note, this is the maximum number of jobs in parallel *per queue* (= per task),
// and these constants are the fallbacks used when nothing is set in the environment.
private const ushort NO_CONCURRENCY = 1; // One job at a time: some tasks should be serialized
private const ushort MIN_CONCURRENCY = 2; // Smallest value that still allows jobs to run in parallel
private const ushort DISABLED_TASK = 0; // Task is disabled here; an external task agent is expected to handle it
// Service provider built by SetupServices(); null until then.
public static ServiceProvider _serviceProvider;
// Logger resolved in SetupServices(); null until then, so early-startup code must null-check it.
public static ILogger<Program> _logger;
/// <summary>
/// Process entry point: configures services, starts the task queue consumers and then
/// runs the periodic-check loop.
/// </summary>
public static void Main()
{
    Console.WriteLine("TaskEngine.Main starting up -GetConfigurations...");
    try
    {
        SetupServices();
        createTaskQueues();
        // This call loops forever; control only leaves it through an exception.
        runQueueAwakerForever();
    }
    catch (Exception startupError)
    {
        // Paranoia: a logger and an unhandled-exception handler should already be in place,
        // so this only exists to surface unexpected startup errors that might otherwise be silent.
        Console.WriteLine($"Unhandled Exception Caught {startupError.Message}\n{startupError}\n");
        // The logger is still null if the failure happened before SetupServices() created it.
        _logger?.LogError(startupError, "Unhandled Exception Caught");
    }
}
/// <summary>
/// Builds the dependency-injection container, publishes the logger, application settings and
/// key provider through static members, installs the unhandled-exception handler and runs the
/// database seeder.
/// </summary>
public static void SetupServices()
{
    var configuration = CTDbContext.GetConfigurations();

    // This project relies on Dependency Injection to configure its various services.
    // For more info, https://docs.microsoft.com/en-us/aspnet/core/fundamentals/dependency-injection?view=aspnetcore-3.1
    // Every service used by the TaskEngine is registered on this collection.
    Console.WriteLine("SetupServices() - starting");
    var services = new ServiceCollection();

    services.AddLogging(logging =>
    {
        logging.AddConsole();
        logging.AddFilter<Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider>(
            "", LogLevel.Warning);
        // Application Insights is not wired up here. If it is added in the future, use the
        // AddApplicationInsights() overload which accepts Action<TelemetryConfiguration> and set
        // TelemetryConfiguration.ConnectionString (previously keyed off APPLICATION_INSIGHTS_KEY).
        // See https://github.com/microsoft/ApplicationInsights-dotnet/issues/2560 for more details.
    });
    services.AddOptions();
    services.Configure<AppSettings>(configuration);
    services.AddDbContext<CTDbContext>(options => options.UseLazyLoadingProxies().UseNpgsql(CTDbContext.ConnectionStringBuilder()));

    // Messaging, queries and task singletons.
    // (GenerateVTTFileTask and UpdateBoxTokenTask are intentionally not registered.)
    services.AddSingleton<RabbitMQConnection>();
    services.AddSingleton<CaptionQueries>();
    services.AddSingleton<DownloadPlaylistInfoTask>();
    services.AddSingleton<DownloadMediaTask>();
    services.AddSingleton<ConvertVideoToWavTask>();
    services.AddSingleton<LocalTranscriptionTask>();
    services.AddSingleton<AzureTranscriptionTask>();
    services.AddSingleton<QueueAwakerTask>();
    services.AddSingleton<RpcClient>();
    services.AddSingleton<ProcessVideoTask>();
    services.AddSingleton<MSTranscriptionService>();
    services.AddSingleton<SceneDetectionTask>();
    services.AddSingleton<PythonCrawlerTask>();
    services.AddSingleton<DescribeVideoTask>();
    services.AddSingleton<DescribeImageTask>();
    services.AddSingleton<CreateBoxTokenTask>();
    services.AddSingleton<BuildElasticIndexTask>();
    services.AddSingleton<ExampleTask>();
    services.AddSingleton<CleanUpElasticIndexTask>();
    services.AddSingleton<BoxAPI>();
    services.AddScoped<Seeder>();
    services.AddScoped<SlackLogger>();
    services.AddSingleton<TempCode>();

    _serviceProvider = services.BuildServiceProvider();

    // Publish the shared logger, settings and key provider.
    _logger = _serviceProvider.GetRequiredService<ILogger<Program>>();
    Globals.appSettings = _serviceProvider.GetService<IOptions<AppSettings>>().Value;
    TaskEngineGlobals.KeyProvider = new KeyProvider(Globals.appSettings);

    // From here on, unhandled exceptions in this AppDomain are reported by ExceptionHandler.
    AppDomain.CurrentDomain.UnhandledException += new UnhandledExceptionEventHandler(ExceptionHandler);

    // Seed the database with some initial data.
    _logger.LogInformation("Seeding database");
    _serviceProvider.GetService<Seeder>().Seed();
}
/// <summary>
/// Publishes a <c>PeriodicCheck</c> message through the <see cref="QueueAwakerTask"/> and then
/// repeats that every PERIODIC_CHECK_EVERY_MINUTES minutes (clamped to at least 1).
/// This method never returns; a failure while publishing is logged and the loop continues.
/// </summary>
static void runQueueAwakerForever()
{
    _logger.LogInformation("runQueueAwakerForever - start");
    QueueAwakerTask queueAwakerTask = _serviceProvider.GetService<QueueAwakerTask>();

    // Both settings are clamped to a minimum of one minute.
    int periodicCheck = Math.Max(1, Convert.ToInt32(Globals.appSettings.PERIODIC_CHECK_EVERY_MINUTES));
    int initialPauseMinutes = Math.Max(1, Convert.ToInt32(Globals.appSettings.INITIAL_TASKENGINE_PAUSE_MINUTES));
    _logger.LogInformation("Periodic Check Every {0} minutes", periodicCheck);
    var timeInterval = new TimeSpan(0, periodicCheck, 0);

    // Log the minute count (not a TimeSpan) so the message reads e.g. "Pausing 1 minutes ...".
    // NOTE(review): the initial pause itself is currently disabled (the Thread.Sleep / Task.Delay
    // call was commented out), so the first check below runs immediately - confirm whether the
    // pause should be restored or this message dropped.
    _logger.LogInformation("Pausing {0} minutes before first periodicCheck", initialPauseMinutes);

    // Check for new tasks every "timeInterval".
    // The periodic check will discover all undone tasks.
    // TODO/REVIEW: However some tasks also publish the next items
    while (true)
    {
        try
        {
            _logger.LogInformation("Periodic Check");
            queueAwakerTask.Publish(new JObject
            {
                { "Type", TaskType.PeriodicCheck.ToString() }
            });
        }
        catch (Exception e)
        {
            // Keep the loop alive: one failed publish must not stop future periodic checks.
            _logger.LogError(e, "Error in Periodic Check");
        }

        // Announce the pause before it starts rather than after it has already elapsed.
        _logger.LogInformation("Pausing {0} minutes before next periodicCheck", periodicCheck);
        Task.Delay(timeInterval).Wait();
    }
}
/// <summary>
/// Starts a consumer for every task queue, passing each task its concurrency value.
/// Video, sync and transcription concurrency come from application settings (with fallbacks);
/// every other task uses a fixed value.
/// </summary>
static void createTaskQueues()
{
    _logger.LogInformation("createTaskQueues() -starting");

    // The connection is resolved here but not used directly in this method.
    // NOTE(review): presumably this forces the singleton to be created up front - confirm.
    // Active queues managed by C# (concurrency > 0) are now purged after the queue is created
    // and before messages are processed.
    _ = _serviceProvider.GetService<RabbitMQConnection>();

    // Configurable limits; an unset setting falls back to the given default.
    ushort videoConcurrency = ToUInt16(Globals.appSettings.MAX_CONCURRENT_VIDEO_TASKS, NO_CONCURRENCY);
    ushort syncConcurrency = ToUInt16(Globals.appSettings.MAX_CONCURRENT_SYNC_TASKS, MIN_CONCURRENCY);
    ushort transcriptionConcurrency = ToUInt16(Globals.appSettings.MAX_CONCURRENT_TRANSCRIPTIONS, MIN_CONCURRENCY);

    // Create and start consuming from all queues. If concurrency >= 1 the queues are purged.

    // --- Upstream sync ---
    _logger.LogInformation($"Creating DownloadPlaylistInfoTask & DownloadMediaTask consumers. Concurrency={syncConcurrency} ");
    _serviceProvider.GetService<DownloadPlaylistInfoTask>().Consume(syncConcurrency);
    _serviceProvider.GetService<DownloadMediaTask>().Consume(syncConcurrency);

    // --- Transcription (GenerateVTTFileTask is no longer consumed) ---
    _logger.LogInformation($"Creating TranscriptionTask consumers. Concurrency={transcriptionConcurrency} ");
    _serviceProvider.GetService<LocalTranscriptionTask>().Consume(transcriptionConcurrency);

    // --- Video processing ---
    _logger.LogInformation($"Creating ProcessVideoTask consumer. Concurrency={videoConcurrency} ");
    _serviceProvider.GetService<ProcessVideoTask>().Consume(videoConcurrency);

    // --- Descriptions: one job at a time ---
    _serviceProvider.GetService<DescribeVideoTask>().Consume(NO_CONCURRENCY);
    _serviceProvider.GetService<DescribeImageTask>().Consume(NO_CONCURRENCY);

    // --- Scene detection is now handled by native Python ---
    // See https://github.com/classtranscribe/pyapi
    _serviceProvider.GetService<SceneDetectionTask>().Consume(DISABLED_TASK);

    // --- Tasks that must not run concurrently ---
    _logger.LogInformation("Creating QueueAwakerTask and Box token tasks consumers!");
    _serviceProvider.GetService<QueueAwakerTask>().Consume(NO_CONCURRENCY); // TODO TOREVIEW: NO_CONCURRENCY?
    // UpdateBoxTokenTask does nothing at the moment, so it is not consumed.
    _serviceProvider.GetService<CreateBoxTokenTask>().Consume(NO_CONCURRENCY); // calls _box.CreateAccessTokenAsync(authCode);

    // Elastic Search index should be built after TranscriptionTask.
    _serviceProvider.GetService<BuildElasticIndexTask>().Consume(NO_CONCURRENCY);
    // Outdated Elastic Search indices are removed by this task.
    _serviceProvider.GetService<CleanUpElasticIndexTask>().Consume(NO_CONCURRENCY);

    _serviceProvider.GetService<ExampleTask>().Consume(NO_CONCURRENCY);
    _serviceProvider.GetService<PythonCrawlerTask>().Consume(DISABLED_TASK);

    _logger.LogInformation("createTaskQueues() - Done creating task consumers");
}
/// <summary>
/// Last-chance handler for unhandled exceptions in the current AppDomain: writes the failure
/// to the console and, when the logger is available, to the log.
/// </summary>
/// <param name="sender">The source of the unhandled exception event; may be null.</param>
/// <param name="args">Event data carrying the thrown object.</param>
static void ExceptionHandler(object sender, UnhandledExceptionEventArgs args)
{
    // ExceptionObject is typed as object, so a hard cast to Exception could itself throw
    // inside this handler and hide the original failure. Match on the type instead.
    object thrown = args.ExceptionObject;
    if (thrown is Exception e)
    {
        Console.WriteLine($"Unhandled Exception Caught {e.Message}\n{e}\nSender:{sender ?? "null"}");
        // The logger is null if the failure happens before SetupServices() has created it.
        _logger?.LogError(e, "Unhandled Exception Caught");
    }
    else
    {
        // Not an Exception instance: report whatever was thrown.
        Console.WriteLine($"Unhandled Exception Caught {thrown ?? "null"}\nSender:{sender ?? "null"}");
        _logger?.LogError("Unhandled Exception Caught {0}", thrown ?? "null");
    }
}
/// <summary>
/// Parses an optional setting string as an unsigned 16-bit integer.
/// </summary>
/// <param name="val">The text to parse; may be null, empty or whitespace when the setting is unset.</param>
/// <param name="defaultVal">The value returned when <paramref name="val"/> is null, empty or whitespace.</param>
/// <returns>The parsed value, or <paramref name="defaultVal"/> when no value was supplied.</returns>
/// <exception cref="FormatException"><paramref name="val"/> is not a valid integer.</exception>
/// <exception cref="OverflowException"><paramref name="val"/> is outside the range of a ushort.</exception>
private static ushort ToUInt16(String val, ushort defaultVal)
{
    // A blank setting is treated the same as a missing one instead of failing to parse.
    if (string.IsNullOrWhiteSpace(val))
    {
        return defaultVal;
    }

    // Convert.ToUInt16(String, int base) is not the droid you are looking for.
    // Parse with the invariant culture so the result does not depend on the host locale.
    // Malformed values still throw, so a misconfiguration is surfaced rather than hidden.
    return Convert.ToUInt16(val, System.Globalization.CultureInfo.InvariantCulture);
}
}
}