From fda0b3a37a821c6d609d343522bcbbeb43dc8da3 Mon Sep 17 00:00:00 2001 From: Maimur Hasan Date: Mon, 20 Nov 2023 12:35:12 +0530 Subject: [PATCH 1/4] Enhancement: Selenium Grid Capability Signed-off-by: Maimur Hasan --- core/pom.xml | 2 + .../protocol/selenium/SeleniumGridImpl.java | 118 +++++++++++ .../selenium/SeleniumGridProtocol.java | 194 ++++++++++++++++++ .../archetype-resources/crawler-conf.yaml | 3 + .../archetype-resources/crawler-conf.yaml | 3 + 5 files changed, 320 insertions(+) create mode 100644 core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridImpl.java create mode 100644 core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridProtocol.java diff --git a/core/pom.xml b/core/pom.xml index 6559e1013..7061ba5b1 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -34,6 +34,7 @@ 3.1.8 0.3.7 4.2.0 + 2.8.7 @@ -51,6 +52,7 @@ org.apache.maven.plugins maven-jar-plugin + 3.2.2 attach-test diff --git a/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridImpl.java b/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridImpl.java new file mode 100644 index 000000000..641c71dd3 --- /dev/null +++ b/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridImpl.java @@ -0,0 +1,118 @@ +/** + * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the + * License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.digitalpebble.stormcrawler.protocol.selenium; + +import com.digitalpebble.stormcrawler.util.ConfUtils; +import java.net.URL; +import java.time.Duration; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import org.apache.storm.Config; +import org.openqa.selenium.Dimension; +import org.openqa.selenium.WebDriver; +import org.openqa.selenium.remote.DesiredCapabilities; +import org.openqa.selenium.remote.RemoteWebDriver; + +public class SeleniumGridImpl extends SeleniumGridProtocol { + + private TimerTask timerTask; + private Timer updateQueue; + private int noOfWorkers = 0; + private String gridAddress; + private final DesiredCapabilities capabilities = new DesiredCapabilities(); + + @Override + public void configure(Config conf) { + super.configure(conf); + noOfWorkers = ConfUtils.getInt(conf, "topology.workers", 2); + gridAddress = super.gridAddress; + capabilities.setBrowserName( + ConfUtils.getString(conf, "selenium.capabilities.browserName", "chrome")); + capabilities.setCapability("newSessionWaitTimeout", 600); + capabilities.setCapability("browserTimeout", 600); + updateQueue = new Timer(); + updateQueueOfBrowsers(); + } + + protected void updateQueueOfBrowsers() { + timerTask = + new TimerTask() { + @Override + public void run() { + try { + List list = getAllNodesList(); + LOG.info("Blocking Queue size: " + driversQueue.size()); + while (getSessionsCount(list) > 0) { + int size = list.size(); + // Check if the queue size is more than the actual + // no of browsers allowed per worker + // means crawler services are idle because all drivers are in queue + if (driversQueue.size() >= (size / noOfWorkers)) 
{ + SeleniumGridProtocol.Holder holder = driversQueue.take(); + long totalTime = System.currentTimeMillis() - holder.getTime(); + // clear the queue if the total time spent in the queue is more + // than 4.5 minutes + // As after 5 minutes the driver would be idle and will throw + // exception + // while we try to use that driver for fetching + if (totalTime > 1000 * 4.5 * 60) { + driversQueue.clear(); + } + } + // so that browsers get equally divided among workers + if (driversQueue.size() <= size / noOfWorkers) { + RemoteWebDriver driver = getDriverFromNode(); + if (driver != null) { + driversQueue.put( + new SeleniumGridProtocol.Holder( + driver, System.currentTimeMillis())); + LOG.info( + "Placed driver in blocking queue: " + + driversQueue.size()); + } + } + list = getAllNodesList(); + } + } catch (Exception e) { + LOG.error( + "Exception while running task for adding driver to the queue", + e); + } + } + }; + // update the queue every 5 minutes + updateQueue.schedule(timerTask, 0 * 60 * 1000, 5 * 60 * 1000); + } + + protected RemoteWebDriver getDriverFromNode() { + int sessionCount = 0; + RemoteWebDriver driver = null; + + try { + LOG.debug("Adding new driver from " + gridAddress); + driver = new RemoteWebDriver(new URL(gridAddress), capabilities); + WebDriver.Timeouts touts = driver.manage().timeouts(); + WebDriver.Window window = driver.manage().window(); + touts.implicitlyWait(Duration.ofSeconds(6)); + touts.pageLoadTimeout(Duration.ofSeconds(60)); + touts.scriptTimeout(Duration.ofSeconds(30)); + window.setSize(new Dimension(1980, 1280)); + LOG.debug("Inside getDriverFromGrid to set web drivers" + driver.hashCode()); + } catch (Exception e) { + } + return driver; + } +} diff --git a/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridProtocol.java b/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridProtocol.java new file mode 100644 index 000000000..3185c7aaf --- /dev/null +++ 
b/core/src/main/java/com/digitalpebble/stormcrawler/protocol/selenium/SeleniumGridProtocol.java @@ -0,0 +1,194 @@ +/** + * Licensed to DigitalPebble Ltd under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. + * DigitalPebble licenses this file to You under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. You may obtain a copy of the + * License at + * + *

http://www.apache.org/licenses/LICENSE-2.0 + * + *

Unless required by applicable law or agreed to in writing, software distributed under the + * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.digitalpebble.stormcrawler.protocol.selenium; + +import com.digitalpebble.stormcrawler.Metadata; +import com.digitalpebble.stormcrawler.protocol.AbstractHttpProtocol; +import com.digitalpebble.stormcrawler.protocol.HttpHeaders; +import com.digitalpebble.stormcrawler.protocol.ProtocolResponse; +import com.digitalpebble.stormcrawler.util.ConfUtils; +import com.google.gson.Gson; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.net.URL; +import java.util.List; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import org.apache.storm.Config; +import org.openqa.selenium.remote.DesiredCapabilities; +import org.openqa.selenium.remote.RemoteWebDriver; +import org.slf4j.LoggerFactory; + +public abstract class SeleniumGridProtocol extends AbstractHttpProtocol { + protected static final org.slf4j.Logger LOG = + LoggerFactory.getLogger(SeleniumGridProtocol.class); + protected static LinkedBlockingQueue driversQueue; + private NavigationFilters filters; + private final DesiredCapabilities capabilities = new DesiredCapabilities(); + + protected String gridAddress; + + @Override + public void configure(Config conf) { + super.configure(conf); + driversQueue = new LinkedBlockingQueue<>(); + filters = NavigationFilters.fromConf(conf); + gridAddress = + ConfUtils.getString(conf, "selenium.grid.address", "http://localhost:4444/wd/hub"); + } + + protected synchronized List> getAllNodesList() throws IOException { + Map valueMap = null; + boolean ready = false; + while (!ready) { + Map map = getStatusStream(); + valueMap = (Map) map.get("value"); + ready = 
(boolean) valueMap.get("ready"); + if (!ready) { + LOG.warn("Selenium Grid is not ready yet"); + } + } + LOG.info("Grid Is Ready to Serve"); + return (List>) valueMap.get("nodes"); + } + + private Map getStatusStream() throws IOException { + Gson gson = new Gson(); + URL url = new URL(gridAddress + "/status"); + InputStream stream = url.openStream(); + Reader reader = new InputStreamReader(stream); + Map map = gson.fromJson(reader, Map.class); + stream.close(); + return map; + } + + protected int getSessionsCount(List> nodes) { + int availableSessions = 0; + for (Map node : nodes) { + List> slots = (List>) node.get("slots"); + for (Map slot : slots) { + if (slot.get("session") == null) { + availableSessions++; + } + } + } + return availableSessions; + } + + public class Holder { + public RemoteWebDriver driver; + public Long time; + + public void setDriver(RemoteWebDriver driver) { + this.driver = driver; + } + + public void setTime(Long time) { + this.time = time; + } + + public RemoteWebDriver getDriver() { + return this.driver; + } + + public Long getTime() { + return this.time; + } + + public Holder(RemoteWebDriver driver, Long time) { + this.driver = driver; + this.time = time; + } + } + + public ProtocolResponse getProtocolOutput(String url, Metadata metadata) throws Exception { + RemoteWebDriver driver; + while ((driver = getDriver()) == null) {} + try { + // This will block for the page load and any + // associated AJAX requests + driver.get(url); + + String u = driver.getCurrentUrl(); + + // call the filters + ProtocolResponse response = filters.filter(driver, metadata); + if (response != null) { + return response; + } + + // if the URL is different then we must have hit a redirection + if (!u.equalsIgnoreCase(url)) { + byte[] content = new byte[] {}; + Metadata m = new Metadata(); + m.addValue(HttpHeaders.LOCATION, u); + return new ProtocolResponse(content, 307, m); + } + + // if no filters got triggered + byte[] content = 
driver.getPageSource().getBytes(); + return new ProtocolResponse(content, 200, new Metadata()); + + } catch (Exception e) { + if (e.getMessage() != null) { + if ((e.getMessage().contains("ERR_NAME_NOT_RESOLVED") + || e.getMessage().contains("ERR_CONNECTION_REFUSED") + || e.getMessage().contains("ERR_CONNECTION_CLOSED") + || e.getMessage().contains("ERR_SSL_PROTOCOL_ERROR") + || e.getMessage().contains("ERR_CONNECTION_RESET") + || e.getMessage().contains("ERR_SSL_VERSION_OR_CIPHER_MISMATCH") + || e.getMessage().contains("ERR_ADDRESS_UNREACHABLE"))) { + LOG.info( + "Exception is of webpage related hence continuing with the driver and adding it back to queue"); + } else { + LOG.error( + "Exception wile doing operation via driver url {} with driver hashcode {}" + + "with excepiton {}", + url, + driver.hashCode(), + e); + closeConnectionGracefully(driver); + driver = null; + } + } + throw new Exception(e); + } finally { + // finished with this driver - return it to the queue + if (driver != null) driversQueue.put(new Holder(driver, System.currentTimeMillis())); + } + } + + private final RemoteWebDriver getDriver() { + try { + return driversQueue.take().getDriver(); + } catch (Exception e) { + return null; + } + } + + private void closeConnectionGracefully(RemoteWebDriver driver) { + try { + LOG.info("Before disposing driver : {}", driver.hashCode()); + + if (driver.getSessionId() != null) { + if (driver.getSessionId().toString() != null) driver.quit(); + } + } catch (Exception e) { + LOG.info("Error while closing driver", e); + } + } +} diff --git a/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml b/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml index 4045dbe26..10c7295ad 100644 --- a/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml +++ b/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml @@ -46,6 +46,9 @@ 
config: http.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" https.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" + # Please define this for selenium grid configuration + #selenium.grid.address: http://localhost:4444/wd/hub + # The maximum number of bytes for returned HTTP response bodies. # The fetched page will be trimmed to 65KB in this case # Set -1 to disable the limit. diff --git a/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml b/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml index 16a357130..e9f4937ba 100644 --- a/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml +++ b/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml @@ -46,6 +46,9 @@ config: http.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" https.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" + # Please define this for selenium grid configuration + #selenium.grid.address: http://localhost:4444/wd/hub + # The maximum number of bytes for returned HTTP response bodies. # The fetched page will be trimmed to 65KB in this case # Set -1 to disable the limit. 
From c3bf9164e55c6ad8506bd52d411133176d09723b Mon Sep 17 00:00:00 2001 From: Maimur Hasan Date: Mon, 20 Nov 2023 12:56:26 +0530 Subject: [PATCH 2/4] Enhancement: Selenium Grid Capability Signed-off-by: Maimur Hasan --- external/urlfrontier/pom.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/external/urlfrontier/pom.xml b/external/urlfrontier/pom.xml index 1f46de6aa..dc6ee9f96 100644 --- a/external/urlfrontier/pom.xml +++ b/external/urlfrontier/pom.xml @@ -24,6 +24,7 @@ maven-surefire-plugin + 2.22.2 default-test From 0ba03f02d4cce84cd1e572cb3d74e15d2b52438b Mon Sep 17 00:00:00 2001 From: Maimur Hasan Date: Mon, 20 Nov 2023 13:11:47 +0530 Subject: [PATCH 3/4] Enhancement: Selenium Grid Capability, missed dependency Signed-off-by: Maimur Hasan --- core/pom.xml | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/pom.xml b/core/pom.xml index 7061ba5b1..e9be0138c 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -263,7 +263,11 @@ ${selenium.version} test - + + com.google.code.gson + gson + ${gson.version} + From e46d9732d016c2d0a459594d0315c581edb9d4d0 Mon Sep 17 00:00:00 2001 From: Maimur Hasan Date: Mon, 20 Nov 2023 23:23:21 +0530 Subject: [PATCH 4/4] Enhancement: Removing config from opensearch Signed-off-by: Maimur Hasan --- .../src/main/resources/archetype-resources/crawler-conf.yaml | 2 +- .../src/main/resources/archetype-resources/crawler-conf.yaml | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml b/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml index 10c7295ad..7d3c6090c 100644 --- a/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml +++ b/external/elasticsearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml @@ -47,7 +47,7 @@ config: https.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" # 
Please define this for selenium grid configuration - #selenium.grid.address: http://localhost:4444/wd/hub + selenium.grid.address: http://localhost:4444 # The maximum number of bytes for returned HTTP response bodies. # The fetched page will be trimmed to 65KB in this case diff --git a/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml b/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml index e9f4937ba..16a357130 100644 --- a/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml +++ b/external/opensearch/archetype/src/main/resources/archetype-resources/crawler-conf.yaml @@ -46,9 +46,6 @@ config: http.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" https.protocol.implementation: "com.digitalpebble.stormcrawler.protocol.okhttp.HttpProtocol" - # Please define this for selenium grid configuration - #selenium.grid.address: http://localhost:4444/wd/hub - # The maximum number of bytes for returned HTTP response bodies. # The fetched page will be trimmed to 65KB in this case # Set -1 to disable the limit.