-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathtaskqueued
313 lines (246 loc) · 10.3 KB
/
taskqueued
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
#!/usr/bin/perl
# cpanel - bin/taskqueued Copyright(c) 2010 cPanel, Inc.
# All rights Reserved.
# [email protected] http://cpanel.net
use strict;
use warnings;
use cPanel::TaskQueue ();
use cPanel::TaskQueue::Scheduler ();
use cPanel::TaskQueue::PluginManager ();
use Getopt::Long;
use Unix::PID 0.21;
use POSIX ();
use File::Spec ();
main() unless caller;
sub main {
my $queue_dir = '.';
my $qname = 'main';
my $sname;
my $plugindirs = [];
my $namespaces = [];
my $plugins = [];
my $logfile;
handle_config_file( 'taskqueue.cfg' );
GetOptions(
'dir=s', \$queue_dir,
'qname=s', \$qname,
'sname=s', \$sname,
'plugindir=s@', \$plugindirs,
'namespace=s@', \$namespaces,
'plugin=s@', \$plugins,
'logfile=s', \$logfile,
);
$namespaces = [ 'cPanel::TaskProcessors', ] unless @{$namespaces};
die "Missing plugin directories.\n" unless @{$plugindirs};
# Make directories absolute.
$plugindirs = [ map { File::Spec->rel2abs( $_ ) } @{$plugindirs} ];
$queue_dir = File::Spec->rel2abs( $queue_dir );
unshift @INC, @{$plugindirs} if @{$plugindirs};
$sname = $qname unless defined $sname;
# Default wait time is a tradeoff:
# - smaller number is more responsive at picking up new tasks
# - larger number uses less CPU while waiting for something to happen.
my $wait_time = 30;
my $is_running = 1;
my $pidobj = Unix::PID->new({ use_open3 => 0 });
my $pid_file = $queue_dir . '/taskqueued.pid';
$SIG{'HUP'} = sub { $is_running = 0; };
$SIG{'CHLD'} = \&reaper;
# Daemon startup
print "==> TaskQueue Processing Daemon started\n";
exit if ( my $pid = fork() ); # Background myself.
die "Failed to fork TaskQueue Processing Daemon\n" unless defined $pid;
# In order to remove the race condition, we need to test again after the fork.
# This use of Unix::PID removes the race condition when creating the pidfile.
if ( !$pidobj->pid_file_no_unlink( $pid_file ) ) {
print "TaskQueue Processing Daemon already running, exiting.\n";
exit 0;
}
# Finish launching daemon.
chdir( '/');
POSIX::setsid();
$logfile ||= "$queue_dir/${qname}_error.log";
open( STDOUT, '>>', $logfile ) or die "Unable to open log file '$logfile': $!\n";
open( STDERR, '>>', $logfile ) or die "Unable to open log file '$logfile': $!\n";
load_plugins( $plugindirs, $namespaces, $plugins );
my $queue = cPanel::TaskQueue->new({
name => $qname,
state_dir => $queue_dir,
max_running => 1,
});
my $sched = cPanel::TaskQueue::Scheduler->new({
name => $qname,
state_dir => $queue_dir,
});
my $retry_count = 0;
# Processing loop
# Each pass thru the loop will process one task (if any are available)
# - if any scheduled tasks are ready to go into the queue, we do that first
while ( $is_running ) {
eval {
$sched->process_ready_tasks( $queue );
if ( $queue->has_work_to_do() ) {
$queue->process_next_task();
}
else {
my $wait = $sched->seconds_until_next_task();
next if defined $wait and 0 == $wait;
$wait = $wait_time if !$wait || $wait > $wait_time;
sleep $wait;
}
$retry_count = 0;
};
if ( $@ ) {
warn "Exception detected: $@";
if ( $retry_count++ > 5 ) {
warn "Maximum exception count reached. Shutting down taskqueued.";
last;
}
# pause to allow situation to recover
sleep 15;
}
}
unlink $pid_file if $$ == $pidobj->get_pid_from_pidfile( $pid_file ) || !$pidobj->is_pidfile_running( $pid_file );
exit 0;
}
# -----------------------------------------
# subs
sub reaper {
my $thedead;
while ( ( $thedead = waitpid( -1, 1 ) ) > 0 ) {
if ( $? & 127 ) {
warn "Child [$thedead]: exited with signal " . ( $? & 127 ) . "\n";
}
else {
warn "Child [$thedead]: exited normally.\n";
}
}
$SIG{'CHLD'} = \&reaper;
}
sub load_plugins {
my ($plugindirs, $namespaces, $plugins) = @_;
if ( @{$plugins} ) {
foreach my $modname ( @{$plugins} ) {
cPanel::TaskQueue::PluginManager::load_plugin_by_name( $modname );
}
}
else {
cPanel::TaskQueue::PluginManager::load_all_plugins(
directories => $plugindirs,
namespaces => $namespaces,
);
}
}
sub handle_config_file {
my ($config) = @_;
if ( $ARGV[0] =~ /^\@(.*)$/ ) {
$config = $1;
shift @ARGV;
}
unshift @ARGV, config_file( $config ) if -e $config;
}
sub config_file {
my ($file) = @_;
open my $fh, '<', $file or die "Unable to open config file '$file': $!";
local $/;
my $cfg = <$fh>;
close $fh;
$cfg =~ s/#.*\n/\n/g;
return split( ' ', $cfg );
}
__END__
=head1 NAME
taskqueued - Simple daemon for processing a C<cPanel::TaskQueue>.
=head1 SYNOPSIS
taskqueued
In order to make use of the C<cPanel::TaskQueue>, you need some way to process
the tasks in the queue. Although this process is not very difficult, there are
a few subtleties to getting the processing correct.
The C<taskqueued> program provides a simple version of this processing program.
Unlike a proper production-quality processing daemon, this program does not
support any form of logging, and only minor error recovery. It also does not
support any easy way of shutting down or restarting the process while it is
running.
=head1 DESCRIPTION
Although the C<taskqueued> program is not a production-quality queue processing
daemon, it does provides an example of every step needed to do correct handling
of the C<cPanel::TaskQueue>. This script provides a reasonable starting point
for a production processing daemon.
=head1 CONFIGURATION
The C<taskqueueq> program is configured through command line options or a
configuration file. If the first parameter to the program starts with an C<@>,
the rest of the argument is treated as a configuration file name. If no
configuration is passed on the command line, the program defaults to reading
C<taskqueue.cfg> in the current directory.
The configuration file just contains the command line options, one per line.
Blank lines are ignored. Everything after the C<#> character is treated as
a comment and discarded.
The command line options that configure the program are
=over 4
=item --dir={queuedir}
This required parameter specifies a directory in which we can find the
C<TaskQueue>'s state files. The files to be accessed should be readable by the
current user for the program to work.
=item --qname={queue name}
This optional parameter specifies the name associated with the C<TaskQueue>
object. It is used to create the name of the C<TaskQueue> state file. If not
supplied, a default value of C<main> is used.
=item --sname={scheduler name}
This optional parameter specifies the name associated with the
C<TaskQueue::Scheduler> object. It is used to create the name of the
C<TaskQueue::Scheduler> state file. If not supplied, the specified queue name
is used.
=item --plugindir={directory}
This required parameter may be specified multiple times to specify one or more
directories to search for plugins. This directory name should not contain the
namespace.
For example, if we are looking for the plugin C<TaskProcessor::NewCommands> in
the namespace C<TaskProcessor>, and the plugin file is located at
C</usr/local/lib/taskplugins/TaskProcessor/NewCommands.pm>. The plugindir
would be C</usr/local/lib/taskplugins> and the namespace would be C<TaskProcessor>.
These directories are also added to the Perl include directory list to allow
loading any plugins we find.
=item --namespace={ns}
This optional parameter may be supplied multiple times to specify namespaces to
search for plugins. If none are supplied, the default C<cPanel::TaskProcessors>
is used.
=item --plugin={modulename}
This optional parameter may be specified multiple times to specify the particular
plugins to load. If this parameter is supplied, the plugin directories are not
searched for plugins. Instead, only the specified plugins are loaded.
=item --logfile={filename}
This option parameter specifies the file to use for logging error messages. If
not specified, the default logfile is written in the queue directory with the
name C<{qname}_error.log>.
=back
=head1 DEPENDENCIES
In addition to the normal dependencies of the L<cPanel::TaskQueue> module, this
script requires L<Unix::PID> to run.
=head1 INCOMPATIBILITIES
None reported.
=head1 BUGS AND LIMITATIONS
No outstanding bugs.
=head1 LICENCE AND COPYRIGHT
Copyright (c) 2010, cPanel, Inc. All rights reserved.
This module is free software; you can redistribute it and/or
modify it under the same terms as Perl itself. See L<perlartistic>.
=head1 DISCLAIMER OF WARRANTY
BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH
YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
NECESSARY SERVICING, REPAIR, OR CORRECTION.
IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE
LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL,
OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE
THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
SUCH DAMAGES.