diff --git a/lib/polipus.rb b/lib/polipus.rb
index a72957e..24b7527 100644
--- a/lib/polipus.rb
+++ b/lib/polipus.rb
@@ -1,4 +1,5 @@
 # encoding: UTF-8
+require 'forwardable'
 require 'redis'
 require 'redis/connection/hiredis'
 require 'redis-queue'
@@ -10,6 +11,8 @@
 require 'polipus/queue_overflow'
 require 'polipus/robotex'
 require 'polipus/signal_handler'
+require 'polipus/worker'
+require 'polipus/polipus_crawler'
 require 'thread'
 require 'logger'
 require 'json'
@@ -18,456 +21,4 @@ module Polipus
   def self.crawler(job_name = 'polipus', urls = [], options = {}, &block)
     PolipusCrawler.crawl(job_name, urls, options, &block)
   end
-
-  class PolipusCrawler
-    OPTS = {
-      # run 4 threads
-      workers: 4,
-      # identify self as Polipus/VERSION
-      user_agent: "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
-      # by default, don't limit the depth of the crawl
-      depth_limit: false,
-      # number of times HTTP redirects will be followed
-      redirect_limit: 5,
-      # storage engine defaults to DevNull
-      storage: nil,
-      # proxy server hostname
-      proxy_host: nil,
-      # proxy server port number
-      proxy_port: false,
-      # HTTP read timeout in seconds
-      read_timeout: 30,
-      # HTTP open connection timeout in seconds
-      open_timeout: 10,
-      # Time to wait for new messages on Redis
-      # After this timeout, current crawling session is marked as terminated
-      queue_timeout: 30,
-      # An URL tracker instance. default is Bloomfilter based on redis
-      url_tracker: nil,
-      # A Redis options {} that will be passed directly to Redis.new
-      redis_options: {},
-      # An instance of logger
-      logger: nil,
-      # A logger level
-      logger_level: nil,
-      # whether the query string should be included in the saved page
-      include_query_string_in_saved_page: true,
-      # Max number of items to keep on redis
-      queue_items_limit: 2_000_000,
-      # The adapter used to store exceed (queue_items_limit) redis items
-      queue_overflow_adapter: nil,
-      # Every x seconds, the main queue is checked for overflowed items
-      queue_overflow_manager_check_time: 60,
-      # If true, each page downloaded will increment a counter on redis
-      stats_enabled: false,
-      # Cookies strategy
-      cookie_jar: nil,
-      # whether or not accept cookies
-      accept_cookies: false,
-      # A set of hosts that should be considered parts of the same domain
-      # Eg It can be used to follow links with and without 'www' domain
-      domain_aliases: [],
-      # Mark a connection as staled after connection_max_hits request
-      connection_max_hits: nil,
-      # Page TTL: mark a page as expired after ttl_page seconds
-      ttl_page: nil,
-      # don't obey the robots exclusion protocol
-      obey_robots_txt: false,
-      # If true, signal handling strategy is enabled.
-      # INT and TERM signal will stop polipus gracefully
-      # Disable it if polipus will run as a part of Resque or DelayedJob-like system
-      enable_signal_handler: true
-    }
-
-    attr_reader :storage
-    attr_reader :job_name
-    attr_reader :logger
-    attr_reader :options
-    attr_reader :crawler_name
-
-    OPTS.keys.each do |key|
-      define_method "#{key}=" do |value|
-        @options[key.to_sym] = value
-      end
-      define_method "#{key}" do
-        @options[key.to_sym]
-      end
-    end
-
-    def initialize(job_name = 'polipus', urls = [], options = {})
-      @job_name = job_name
-      @options = OPTS.merge(options)
-      @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
-      @logger = @options[:logger] ||= Logger.new(nil)
-
-      unless @logger.class.to_s == 'Log4r::Logger'
-        @logger.level = @options[:logger_level] ||= Logger::INFO
-      end
-
-      @storage = @options[:storage] ||= Storage.dev_null
-
-      @http_pool = []
-      @workers_pool = []
-      @queues_pool = []
-
-      @follow_links_like = []
-      @skip_links_like = []
-      @on_page_downloaded = []
-      @on_before_save = []
-      @on_page_error = []
-      @focus_crawl_block = nil
-      @on_crawl_end = []
-      @redis_factory = nil
-
-      @overflow_manager = nil
-      @crawler_name = `hostname`.strip + "-#{@job_name}"
-
-      @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]
-
-      @urls = [urls].flatten.map { |url| URI(url) }
-      @urls.each { |url| url.path = '/' if url.path.empty? }
-      @internal_queue = queue_factory
-      @robots = Polipus::Robotex.new(@options[:user_agent]) if @options[:obey_robots_txt]
-      # Attach signal handling if enabled
-      SignalHandler.enable if @options[:enable_signal_handler]
-      execute_plugin 'on_initialize'
-
-      yield self if block_given?
-    end
-
-    def self.crawl(*args, &block)
-      new(*args, &block).takeover
-    end
-
-    def takeover
-      overflow_items_controller if queue_overflow_adapter
-
-      @urls.each do |u|
-        add_url(u) { |page| page.user_data.p_seeded = true }
-      end
-      return if @internal_queue.empty?
-
-      execute_plugin 'on_crawl_start'
-      @options[:workers].times do |worker_number|
-        @workers_pool << Thread.new do
-          @logger.debug { "Start worker #{worker_number}" }
-          http = @http_pool[worker_number] ||= HTTP.new(@options)
-          queue = @queues_pool[worker_number] ||= queue_factory
-          queue.process(false, @options[:queue_timeout]) do |message|
-
-            next if message.nil?
-
-            execute_plugin 'on_message_received'
-
-            page = Page.from_json message
-
-            unless should_be_visited?(page.url, false)
-              @logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
-              queue.commit
-              next
-            end
-
-            if page_exists? page
-              @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
-              queue.commit
-              next
-            end
-
-            url = page.url.to_s
-            @logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }
-
-            execute_plugin 'on_before_download'
-
-            pages = http.fetch_pages(url, page.referer, page.depth)
-            if pages.count > 1
-              rurls = pages.map { |e| e.url.to_s }.join(' --> ')
-              @logger.info { "Got redirects! #{rurls}" }
-              page = pages.pop
-              page.aliases = pages.map { |e| e.url }
-              if page_exists? page
-                @logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
-                queue.commit
-                next
-              end
-            else
-              page = pages.last
-            end
-
-            execute_plugin 'on_after_download'
-
-            if page.error
-              @logger.warn { "Page #{page.url} has error: #{page.error}" }
-              incr_error
-              @on_page_error.each { |e| e.call(page) }
-            end
-
-            # Execute on_before_save blocks
-            @on_before_save.each { |e| e.call(page) }
-
-            page.storable? && @storage.add(page)
-
-            @logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
-            @logger.info { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }
-
-            incr_pages
-
-            # Execute on_page_downloaded blocks
-            @on_page_downloaded.each { |e| e.call(page) }
-
-            if @options[:depth_limit] == false || @options[:depth_limit] > page.depth
-              links_for(page).each do |url_to_visit|
-                next unless should_be_visited?(url_to_visit)
-                enqueue url_to_visit, page, queue
-              end
-            else
-              @logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
-            end
-
-            @logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
-            @overflow_manager.perform if @overflow_manager && queue.empty?
-            execute_plugin 'on_message_processed'
-
-            if SignalHandler.terminated?
-              @logger.info { 'About to exit! Thanks for using Polipus' }
-              queue.commit
-              break
-            end
-            true
-          end
-        end
-      end
-      @workers_pool.each { |w| w.join }
-      @on_crawl_end.each { |e| e.call(self) }
-      execute_plugin 'on_crawl_end'
-    end
-
-    # A pattern or an array of patterns can be passed as argument
-    # An url will be discarded if it doesn't match patterns
-    def follow_links_like(*patterns)
-      @follow_links_like = @follow_links_like += patterns.uniq.compact
-      self
-    end
-
-    # A pattern or an array of patterns can be passed as argument
-    # An url will be discarded if it matches a pattern
-    def skip_links_like(*patterns)
-      @skip_links_like = @skip_links_like += patterns.uniq.compact
-      self
-    end
-
-    # A block of code will be executed on every page downloaded
-    # The block takes the page as argument
-    def on_page_downloaded(&block)
-      @on_page_downloaded << block
-      self
-    end
-
-    # A block of code will be executed when crawl session is over
-    def on_crawl_end(&block)
-      @on_crawl_end << block
-      self
-    end
-
-    # A block of code will be executed on every page downloaded
-    # before being saved in the registered storage
-    def on_before_save(&block)
-      @on_before_save << block
-      self
-    end
-
-    # A block of code will be executed whether a page contains an error
-    def on_page_error(&block)
-      @on_page_error << block
-      self
-    end
-
-    # A block of code will be executed
-    # on every page downloaded. The code is used to extract urls to visit
-    # see links_for method
-    def focus_crawl(&block)
-      @focus_crawl_block = block
-      self
-    end
-
-    def redis_options
-      @options[:redis_options]
-    end
-
-    def queue_size
-      @internal_queue.size
-    end
-
-    def stats_reset!
-      ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
-    end
-
-    def redis_factory(&block)
-      @redis_factory = block
-      self
-    end
-
-    def url_tracker
-      @url_tracker ||=
-        @options[:url_tracker] ||=
-          UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}",
-                                 redis: redis_factory_adapter,
-                                 driver: 'lua')
-    end
-
-    def redis
-      @redis ||= redis_factory_adapter
-    end
-
-    def add_to_queue(page)
-      if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
-        add_url(page.url, referer: page.referer, depth: page.depth)
-      else
-        add_url(page)
-      end
-    end
-
-    # Enqueue an url, no matter what
-    def add_url(url, params = {})
-      page = Page.new(url, params)
-      yield(page) if block_given?
-      @internal_queue << page.to_json
-    end
-
-    # Request to Polipus to stop its work (gracefully)
-    # cler_queue = true if you want to delete all of the pending urls to visit
-    def stop!(cler_queue = false)
-      SignalHandler.terminate
-      @internal_queue.clear(true) if cler_queue
-    end
-
-    private
-
-    # URLs enqueue policy
-    def should_be_visited?(url, with_tracker = true)
-      case
-      # robots.txt
-      when !allowed_by_robot?(url)
-        false
-      # Check against whitelist pattern matching
-      when !@follow_links_like.empty? && @follow_links_like.none? { |p| url.path =~ p }
-        false
-      # Check against blacklist pattern matching
-      when @skip_links_like.any? { |p| url.path =~ p }
-        false
-      # Page is marked as expired
-      when page_expired?(Page.new(url))
-        true
-      # Check against url tracker
-      when with_tracker && url_tracker.visited?(@options[:include_query_string_in_saved_page] ? url.to_s : url.to_s.gsub(/\?.*$/, ''))
-        false
-      else
-        true
-      end
-    end
-
-    # It extracts URLs from the page
-    def links_for(page)
-      page.domain_aliases = domain_aliases
-      @focus_crawl_block.nil? ? page.links : @focus_crawl_block.call(page)
-    end
-
-    # whether a page is expired or not
-    def page_expired?(page)
-      return false if @options[:ttl_page].nil?
-      stored_page = @storage.get(page)
-      r = stored_page && stored_page.expired?(@options[:ttl_page])
-      @logger.debug { "Page #{page.url} marked as expired" } if r
-      r
-    end
-
-    # whether a page exists or not
-    def page_exists?(page)
-      return false if page.user_data && page.user_data.p_seeded
-      @storage.exists?(page) && !page_expired?(page)
-    end
-
-    #
-    # Returns +true+ if we are obeying robots.txt and the link
-    # is granted access in it. Always returns +true+ when we are
-    # not obeying robots.txt.
-    #
-    def allowed_by_robot?(link)
-      return true if @robots.nil?
-      @options[:obey_robots_txt] ? @robots.allowed?(link) : true
-    end
-
-    # The url is enqueued for a later visit
-    def enqueue(url_to_visit, current_page, queue)
-      page_to_visit = Page.new(url_to_visit.to_s, referer: current_page.url.to_s, depth: current_page.depth + 1)
-      queue << page_to_visit.to_json
-      to_track = @options[:include_query_string_in_saved_page] ? url_to_visit.to_s : url_to_visit.to_s.gsub(/\?.*$/, '')
-      url_tracker.visit to_track
-      @logger.debug { "Added [#{url_to_visit}] to the queue" }
-    end
-
-    # It creates a redis client
-    def redis_factory_adapter
-      if @redis_factory
-        @redis_factory.call(redis_options)
-      else
-        Redis.new(redis_options)
-      end
-    end
-
-    # It creates a new distributed queue
-    def queue_factory
-      Redis::Queue.new("polipus_queue_#{@job_name}", "bp_polipus_queue_#{@job_name}", redis: redis_factory_adapter)
-    end
-
-    # If stats enabled, it increments errors found
-    def incr_error
-      redis.incr "polipus:#{@job_name}:errors" if @options[:stats_enabled]
-    end
-
-    # If stats enabled, it increments pages downloaded
-    def incr_pages
-      redis.incr "polipus:#{@job_name}:pages" if @options[:stats_enabled]
-    end
-
-    # It handles the overflow item policy (if any)
-    def overflow_items_controller
-      @overflow_manager = QueueOverflow::Manager.new(self, queue_factory, @options[:queue_items_limit])
-
-      # In the time, url policy may change so policy is re-evaluated
-      @overflow_manager.url_filter do |page|
-        should_be_visited?(page.url, false)
-      end
-
-      Thread.new do
-
-        redis_lock = redis_factory_adapter
-        op_timeout = @options[:queue_overflow_manager_check_time]
-
-        loop do
-          lock = redis_lock.setnx "polipus_queue_overflow-#{@job_name}.lock", 1
-
-          if lock
-            redis_lock.expire "polipus_queue_overflow-#{@job_name}.lock", op_timeout + 350
-            removed, restored = @overflow_manager.perform
-            @logger.info { "Overflow Manager: items removed=#{removed}, items restored=#{restored}, items stored=#{queue_overflow_adapter.size}" }
-            redis_lock.del "polipus_queue_overflow-#{@job_name}.lock"
-          else
-            @logger.info { 'Lock not acquired' }
-          end
-
-          sleep @options[:queue_overflow_manager_check_time]
-        end
-      end
-    end
-
-    # It invokes a plugin method if any
-    def execute_plugin(method)
-      Polipus::Plugin.plugins.each do |k, p|
-        next unless p.respond_to?(method)
-        @logger.info { "Running plugin method #{method} on #{k}" }
-        ret_val = p.send(method, self)
-        instance_eval(&ret_val) if ret_val.kind_of? Proc
-      end
-    end
-  end
 end
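Note for reviewers: the module-level entry point in lib/polipus.rb is untouched by this extraction, so existing crawl scripts keep working. A minimal usage sketch of that API (the seed URL, link pattern and callback below are illustrative, not part of this change):

    require 'polipus'

    Polipus.crawler('rubygems', 'http://rubygems.org/') do |crawler|
      # Restrict the crawl to gem detail pages (hypothetical pattern)
      crawler.follow_links_like(/\/gems\//)

      # Runs for every page a worker downloads
      crawler.on_page_downloaded do |page|
        puts "Downloaded #{page.url} (#{page.code})"
      end
    end

The block form simply yields the PolipusCrawler instance (see `yield self if block_given?` in initialize) before takeover starts.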
diff --git a/lib/polipus/polipus_crawler.rb b/lib/polipus/polipus_crawler.rb
new file mode 100644
index 0000000..03cac48
--- /dev/null
+++ b/lib/polipus/polipus_crawler.rb
@@ -0,0 +1,383 @@
+module Polipus
+  class PolipusCrawler
+    OPTS = {
+      # run 4 threads
+      workers: 4,
+      # identify self as Polipus/VERSION
+      user_agent: "Polipus - #{Polipus::VERSION} - #{Polipus::HOMEPAGE}",
+      # by default, don't limit the depth of the crawl
+      depth_limit: false,
+      # number of times HTTP redirects will be followed
+      redirect_limit: 5,
+      # storage engine defaults to DevNull
+      storage: nil,
+      # proxy server hostname
+      proxy_host: nil,
+      # proxy server port number
+      proxy_port: false,
+      # HTTP read timeout in seconds
+      read_timeout: 30,
+      # HTTP open connection timeout in seconds
+      open_timeout: 10,
+      # Time to wait for new messages on Redis
+      # After this timeout, current crawling session is marked as terminated
+      queue_timeout: 30,
+      # An URL tracker instance. default is Bloomfilter based on redis
+      url_tracker: nil,
+      # A Redis options {} that will be passed directly to Redis.new
+      redis_options: {},
+      # An instance of logger
+      logger: nil,
+      # A logger level
+      logger_level: nil,
+      # whether the query string should be included in the saved page
+      include_query_string_in_saved_page: true,
+      # Max number of items to keep on redis
+      queue_items_limit: 2_000_000,
+      # The adapter used to store exceed (queue_items_limit) redis items
+      queue_overflow_adapter: nil,
+      # Every x seconds, the main queue is checked for overflowed items
+      queue_overflow_manager_check_time: 60,
+      # If true, each page downloaded will increment a counter on redis
+      stats_enabled: false,
+      # Cookies strategy
+      cookie_jar: nil,
+      # whether or not accept cookies
+      accept_cookies: false,
+      # A set of hosts that should be considered parts of the same domain
+      # Eg It can be used to follow links with and without 'www' domain
+      domain_aliases: [],
+      # Mark a connection as staled after connection_max_hits request
+      connection_max_hits: nil,
+      # Page TTL: mark a page as expired after ttl_page seconds
+      ttl_page: nil,
+      # don't obey the robots exclusion protocol
+      obey_robots_txt: false,
+      # If true, signal handling strategy is enabled.
+      # INT and TERM signal will stop polipus gracefully
+      # Disable it if polipus will run as a part of Resque or DelayedJob-like system
+      enable_signal_handler: true
+    }
+
+    attr_reader :storage
+    attr_reader :job_name
+    attr_reader :logger
+    attr_reader :options
+    attr_reader :crawler_name
+    attr_reader :overflow_manager
+
+    OPTS.keys.each do |key|
+      define_method "#{key}=" do |value|
+        options[key.to_sym] = value
+      end
+      define_method "#{key}" do
+        options[key.to_sym]
+      end
+    end
+
+    def initialize(job_name = 'polipus', urls = [], options = {})
+      @job_name = job_name
+      @options = OPTS.merge(options)
+      @options[:queue_timeout] = 1 if @options[:queue_timeout] <= 0
+      @logger = @options[:logger] ||= Logger.new(nil)
+
+      unless @logger.class.to_s == 'Log4r::Logger'
+        @logger.level = @options[:logger_level] ||= Logger::INFO
+      end
+
+      @storage = @options[:storage] ||= Storage.dev_null
+
+      @follow_links_like = []
+      @skip_links_like = []
+      @on_page_downloaded = []
+      @on_before_save = []
+      @on_page_error = []
+      @focus_crawl_block = nil
+      @on_crawl_end = []
+      @redis_factory = nil
+
+      @overflow_manager = nil
+      @crawler_name = `hostname`.strip + "-#{@job_name}"
+
+      @storage.include_query_string_in_uuid = @options[:include_query_string_in_saved_page]
+
+      @urls = [urls].flatten.map { |url| URI(url) }
+      @urls.each { |url| url.path = '/' if url.path.empty? }
+      @robots = Polipus::Robotex.new(@options[:user_agent]) if @options[:obey_robots_txt]
+      # Attach signal handling if enabled
+      SignalHandler.enable if @options[:enable_signal_handler]
+      execute_plugin 'on_initialize'
+
+      yield self if block_given?
+    end
+
+    def self.crawl(*args, &block)
+      new(*args, &block).takeover
+    end
+
+    def takeover
+      overflow_items_controller if queue_overflow_adapter
+
+      @urls.each do |u|
+        add_url(u) { |page| page.user_data.p_seeded = true }
+      end
+      return if queue.empty?
+
+      execute_plugin 'on_crawl_start'
+      options[:workers]
+        .times
+        .map do |worker_number|
+          Thread.new do
+            @logger.debug { "Start worker #{worker_number}" }
+            Worker.run(self, worker_number)
+          end
+        end
+        .each { |worker| worker.join }
+      @on_crawl_end.each { |e| e.call(self) }
+      execute_plugin 'on_crawl_end'
+    end
+
+    # A pattern or an array of patterns can be passed as argument
+    # An url will be discarded if it doesn't match patterns
+    def follow_links_like(*patterns)
+      @follow_links_like = @follow_links_like += patterns.uniq.compact
+      self
+    end
+
+    # A pattern or an array of patterns can be passed as argument
+    # An url will be discarded if it matches a pattern
+    def skip_links_like(*patterns)
+      @skip_links_like = @skip_links_like += patterns.uniq.compact
+      self
+    end
+
+    # A block of code will be executed on every page downloaded
+    # The block takes the page as argument
+    def on_page_downloaded(&block)
+      @on_page_downloaded << block
+      self
+    end
+
+    # A block of code will be executed when crawl session is over
+    def on_crawl_end(&block)
+      @on_crawl_end << block
+      self
+    end
+
+    # A block of code will be executed on every page downloaded
+    # before being saved in the registered storage
+    def on_before_save(&block)
+      @on_before_save << block
+      self
+    end
+
+    # A block of code will be executed whether a page contains an error
+    def on_page_error(&block)
+      @on_page_error << block
+      self
+    end
+
+    # A block of code will be executed
+    # on every page downloaded. The code is used to extract urls to visit
+    # see links_for method
+    def focus_crawl(&block)
+      @focus_crawl_block = block
+      self
+    end
+
+    def on_page_downloaded_blocks
+      @on_page_downloaded
+    end
+
+    def on_before_save_blocks
+      @on_before_save
+    end
+
+    def on_page_error_blocks
+      @on_page_error
+    end
+
+    def redis_options
+      options[:redis_options]
+    end
+
+    def queue
+      @queue ||= queue_factory
+    end
+
+    def queue_size
+      queue.size
+    end
+
+    def stats_reset!
+      ["polipus:#{@job_name}:errors", "polipus:#{@job_name}:pages"].each { |e| redis.del e }
+    end
+
+    def redis_factory(&block)
+      @redis_factory = block
+      self
+    end
+
+    def url_tracker
+      @url_tracker ||=
+        options[:url_tracker] ||=
+          UrlTracker.bloomfilter(key_name: "polipus_bf_#{job_name}",
+                                 redis: redis_factory_adapter,
+                                 driver: 'lua')
+    end
+
+    def redis
+      @redis ||= redis_factory_adapter
+    end
+
+    def add_to_queue(page)
+      if [:url, :referer, :depth].all? { |method| page.respond_to?(method) }
+        add_url(page.url, referer: page.referer, depth: page.depth)
+      else
+        add_url(page)
+      end
+    end
+
+    # Enqueue an url, no matter what
+    def add_url(url, params = {})
+      page = Page.new(url, params)
+      yield(page) if block_given?
+      queue << page.to_json
+    end
+
+    # Request to Polipus to stop its work (gracefully)
+    # clear_queue = true if you want to delete all of the pending urls to visit
+    def stop!(clear_queue = false)
+      SignalHandler.terminate
+      queue.clear(true) if clear_queue
+    end
+
+    # URLs enqueue policy
+    def should_be_visited?(url, with_tracker = true)
+      case
+      # robots.txt
+      when !allowed_by_robot?(url)
+        false
+      # Check against whitelist pattern matching
+      when !@follow_links_like.empty? && @follow_links_like.none? { |p| url.path =~ p }
+        false
+      # Check against blacklist pattern matching
+      when @skip_links_like.any? { |p| url.path =~ p }
+        false
+      # Page is marked as expired
+      when page_expired?(Page.new(url))
+        true
+      # Check against url tracker
+      when with_tracker && url_tracker.visited?(options[:include_query_string_in_saved_page] ? url.to_s : url.to_s.gsub(/\?.*$/, ''))
+        false
+      else
+        true
+      end
+    end
+
+    # It extracts URLs from the page
+    def links_for(page)
+      page.domain_aliases = domain_aliases
+      @focus_crawl_block.nil? ? page.links : @focus_crawl_block.call(page)
+    end
+
+    # whether a page is expired or not
+    def page_expired?(page)
+      return false if options[:ttl_page].nil?
+      stored_page = @storage.get(page)
+      r = stored_page && stored_page.expired?(options[:ttl_page])
+      @logger.debug { "Page #{page.url} marked as expired" } if r
+      r
+    end
+
+    # whether a page exists or not
+    def page_exists?(page)
+      return false if page.user_data && page.user_data.p_seeded
+      @storage.exists?(page) && !page_expired?(page)
+    end
+
+    #
+    # Returns +true+ if we are obeying robots.txt and the link
+    # is granted access in it. Always returns +true+ when we are
+    # not obeying robots.txt.
+    #
+    def allowed_by_robot?(link)
+      return true if @robots.nil?
+      options[:obey_robots_txt] ? @robots.allowed?(link) : true
+    end
+
+    # The url is enqueued for a later visit
+    def enqueue(url_to_visit, current_page, queue)
+      page_to_visit = Page.new(url_to_visit.to_s, referer: current_page.url.to_s, depth: current_page.depth + 1)
+      queue << page_to_visit.to_json
+      to_track = options[:include_query_string_in_saved_page] ? url_to_visit.to_s : url_to_visit.to_s.gsub(/\?.*$/, '')
+      url_tracker.visit to_track
+      @logger.debug { "Added [#{url_to_visit}] to the queue" }
+    end
+
+    # It creates a redis client
+    def redis_factory_adapter
+      if @redis_factory
+        @redis_factory.call(redis_options)
+      else
+        Redis.new(redis_options)
+      end
+    end
+
+    # It creates a new distributed queue
+    def queue_factory
+      Redis::Queue.new("polipus_queue_#{@job_name}", "bp_polipus_queue_#{@job_name}", redis: redis_factory_adapter)
+    end
+
+    # If stats enabled, it increments errors found
+    def incr_error
+      redis.incr "polipus:#{@job_name}:errors" if options[:stats_enabled]
+    end
+
+    # If stats enabled, it increments pages downloaded
+    def incr_pages
+      redis.incr "polipus:#{@job_name}:pages" if options[:stats_enabled]
+    end
+
+    # It handles the overflow item policy (if any)
+    def overflow_items_controller
+      @overflow_manager = QueueOverflow::Manager.new(self, queue_factory, options[:queue_items_limit])
+
+      # In the time, url policy may change so policy is re-evaluated
+      @overflow_manager.url_filter do |page|
+        should_be_visited?(page.url, false)
+      end
+
+      Thread.new do
+
+        redis_lock = redis_factory_adapter
+        op_timeout = options[:queue_overflow_manager_check_time]
+
+        loop do
+          lock = redis_lock.setnx "polipus_queue_overflow-#{@job_name}.lock", 1
+
+          if lock
+            redis_lock.expire "polipus_queue_overflow-#{@job_name}.lock", op_timeout + 350
+            removed, restored = @overflow_manager.perform
+            @logger.info { "Overflow Manager: items removed=#{removed}, items restored=#{restored}, items stored=#{queue_overflow_adapter.size}" }
+            redis_lock.del "polipus_queue_overflow-#{@job_name}.lock"
+          else
+            @logger.info { 'Lock not acquired' }
+          end
+
+          sleep options[:queue_overflow_manager_check_time]
+        end
+      end
+    end
+
+    # It invokes a plugin method if any
+    def execute_plugin(method)
+      Polipus::Plugin.plugins.each do |k, p|
+        next unless p.respond_to?(method)
+        @logger.info { "Running plugin method #{method} on #{k}" }
+        ret_val = p.send(method, self)
+        instance_eval(&ret_val) if ret_val.kind_of? Proc
+      end
+    end
+  end
+end
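The Worker introduced in the next file relies on Ruby's stdlib Forwardable (hence the new require 'forwardable' at the top of lib/polipus.rb): every helper it needs is forwarded to the crawler instance it was constructed with, so the per-page logic stays free of crawler plumbing. A stripped-down sketch of the same pattern, with made-up class names (Reporter/Job are not part of Polipus):

    require 'forwardable'

    class Reporter
      def info(msg)
        puts "INFO: #{msg}"
      end
    end

    class Job
      extend Forwardable

      # Forward #info to whatever the `reporter` reader returns,
      # just as Worker forwards its helpers to `crawler`.
      def_delegators :reporter, :info

      def reporter
        @reporter ||= Reporter.new
      end
    end

    Job.new.info('delegated to Reporter#info')   # => INFO: delegated to Reporter#info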
diff --git a/lib/polipus/worker.rb b/lib/polipus/worker.rb
new file mode 100644
index 0000000..9920fc6
--- /dev/null
+++ b/lib/polipus/worker.rb
@@ -0,0 +1,148 @@
+module Polipus
+  class Worker
+    extend Forwardable
+
+    attr_reader :crawler, :worker_number
+
+    class << self
+      def run(*args)
+        new(*args).run
+      end
+    end
+
+    def initialize(crawler, worker_number = nil)
+      @crawler = crawler
+      @worker_number = worker_number
+    end
+
+    def_delegators :crawler,
+                   :enqueue,
+                   :execute_plugin,
+                   :incr_error,
+                   :incr_pages,
+                   :links_for,
+                   :logger,
+                   :on_before_save_blocks,
+                   :on_page_downloaded_blocks,
+                   :on_page_error_blocks,
+                   :options,
+                   :overflow_manager,
+                   :queue,
+                   :should_be_visited?,
+                   :storage
+
+    def run
+      queue.process(false, options[:queue_timeout]) do |message|
+        next if message.nil?
+
+        execute_plugin('on_message_received')
+        process(Page.from_json(message))
+        execute_plugin('on_message_processed')
+
+        manage_queue
+
+        if SignalHandler.terminated?
+          logger.info { 'About to exit! Thanks for using Polipus' }
+          queue.commit
+          break
+        end
+        true
+      end
+    end
+
+    def process(page)
+      return if shall_not_be_visited?(page)
+      return if page_exists?(page)
+
+      logger.debug { "[worker ##{worker_number}] Fetching page: [#{page.url}] Referer: #{page.referer} Depth: #{page.depth}" }
+
+      execute_plugin('on_before_download')
+      page = fetch(page)
+      return unless page
+      execute_plugin('on_after_download')
+
+      check_for_error(page)
+      store(page)
+
+      logger.debug { "[worker ##{worker_number}] Fetched page: [#{page.url}] Referrer: [#{page.referer}] Depth: [#{page.depth}] Code: [#{page.code}] Response Time: [#{page.response_time}]" }
+      logger.info { "[worker ##{worker_number}] Page (#{page.url}) downloaded" }
+
+      incr_pages
+
+      on_downloaded(page)
+      collect_links(page)
+    end
+
+    def http
+      @http ||= HTTP.new(options)
+    end
+
+    def shall_not_be_visited?(page)
+      if should_be_visited?(page.url, false)
+        false
+      else
+        logger.info { "[worker ##{worker_number}] Page (#{page.url}) is no more welcome." }
+        true
+      end
+    end
+
+    def page_exists?(page)
+      if crawler.page_exists?(page)
+        logger.info { "[worker ##{worker_number}] Page (#{page.url}) already stored." }
+        true
+      else
+        false
+      end
+    end
+
+    def fetch(page)
+      pages = http.fetch_pages(page.url, page.referer, page.depth)
+      if pages.count > 1
+        rurls = pages.map { |e| e.url.to_s }.join(' --> ')
+        logger.info { "Got redirects! #{rurls}" }
+        page = pages.pop
+        page.aliases = pages.map { |e| e.url }
+
+        page_exists?(page) ? nil : page
+      else
+        pages.last
+      end
+    end
+
+    def check_for_error(page)
+      if page.error
+        logger.warn { "Page #{page.url} has error: #{page.error}" }
+        incr_error
+        on_page_error_blocks.each { |e| e.call(page) }
+      end
+    end
+
+    def store(page)
+      # Execute on_before_save blocks
+      on_before_save_blocks.each { |e| e.call(page) }
+
+      page.storable? && storage.add(page)
+    end
+
+    # Execute on_page_downloaded blocks
+    def on_downloaded(page)
+      on_page_downloaded_blocks.each { |e| e.call(page) }
+    end
+
+    def collect_links(page)
+      if options[:depth_limit] == false || options[:depth_limit] > page.depth
+        links_for(page).each do |url_to_visit|
+          next unless should_be_visited?(url_to_visit)
+          enqueue url_to_visit, page, queue
+        end
+      else
+        logger.info { "[worker ##{worker_number}] Depth limit reached #{page.depth}" }
+      end
+    end
+
+    def manage_queue
+      logger.debug { "[worker ##{worker_number}] Queue size: #{queue.size}" }
+      overflow_manager.perform if overflow_manager && queue.empty?
+    end
+  end
+end
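One piece carried over unchanged into PolipusCrawler#overflow_items_controller is the Redis lock that ensures only one crawler process rebalances the shared queue at a time (SETNX + EXPIRE + DEL). A stripped-down sketch of that coordination pattern, with an illustrative key name and timings:

    require 'redis'

    redis    = Redis.new
    lock_key = 'polipus_queue_overflow-demo.lock'

    loop do
      if redis.setnx(lock_key, 1)      # acquired only if the key does not exist yet
        redis.expire(lock_key, 120)    # safety net: the lock expires even if this process crashes
        begin
          puts 'rebalancing the overflow queue...'
        ensure
          redis.del(lock_key)          # release the lock for the next process
        end
      else
        puts 'another process holds the lock, skipping this round'
      end

      sleep 60
    end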