From 62a835e8e986500f8bb60d972d1807a2b990057a Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Sat, 7 Jan 2017 14:17:56 +0800 Subject: [PATCH] Allow to disable resource cache --- .../org/pentaho/di/cluster/ServerCache.java | 18 ++++++--- .../org/pentaho/di/www/CarteSingleton.java | 40 ++++++++++++++----- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java index 1ac6762..7ccb152 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/cluster/ServerCache.java @@ -39,10 +39,12 @@ * @author Zhichun Wu */ public final class ServerCache { + public static final boolean RESOURCE_CACHE_DISABLED = "Y".equalsIgnoreCase( + System.getProperty("KETTLE_RESOURCE_CACHE_DISABLED", "N")); public static final int RESOURCE_CACHE_SIZE - = Integer.parseInt(System.getProperty("RESOURCE_CACHE_SIZE", "500")); + = Integer.parseInt(System.getProperty("KETTLE_RESOURCE_CACHE_SIZE", "100")); public static final int RESOURCE_EXPIRATION_MINUTE - = Integer.parseInt(System.getProperty("RESOURCE_EXPIRATION_MINUTE", "780")); + = Integer.parseInt(System.getProperty("KETTLE_RESOURCE_EXPIRATION_MINUTE", "1800")); public static final String PARAM_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER"); // On master node, it's for name -> revision + md5; on slave server, it's name -> md5 @@ -61,9 +63,13 @@ private static void logBasic(SlaveServer server, String message) { } } - public static String buildResourceName(AbstractMeta meta, Map params, SlaveServer server) { + private static String buildResourceName(AbstractMeta meta, Map params, SlaveServer server) { StringBuilder sb = new StringBuilder(); + if (RESOURCE_CACHE_DISABLED) { + return sb.toString(); + } + // in case this is triggered by a Quartz Job String jobId = params == null ? null : params.get(PARAM_ETL_JOB_ID); if (Strings.isNullOrEmpty(jobId)) { @@ -98,7 +104,7 @@ public static String buildResourceName(AbstractMeta meta, Map pa * @return */ public static String getCachedIdentity(String resourceName) { - return resourceCache.getIfPresent(resourceName); + return RESOURCE_CACHE_DISABLED ? null : resourceCache.getIfPresent(resourceName); } public static String getCachedIdentity(AbstractMeta meta, Map params, SlaveServer server) { @@ -146,7 +152,9 @@ public static String getCachedIdentity(AbstractMeta meta, Map pa * @param identity identity */ public static void cacheIdentity(String resourceName, String identity) { - resourceCache.put(resourceName, identity); + if (!RESOURCE_CACHE_DISABLED) { + resourceCache.put(resourceName, identity); + } } public static void cacheIdentity(AbstractMeta meta, Map params, SlaveServer server, String identity) { diff --git a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java index 7a2abb0..a39ebe6 100644 --- a/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java +++ b/pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java @@ -22,6 +22,7 @@ package org.pentaho.di.www; +import org.pentaho.di.cluster.ServerCache; import org.pentaho.di.cluster.SlaveServer; import org.pentaho.di.core.Const; import org.pentaho.di.core.KettleEnvironment; @@ -126,6 +127,15 @@ public static void installPurgeTimer(final SlaveServerConfig config, final LogCh objectTimeout = 24 * 60; // 1440 : default is a one day time-out } + if (!ServerCache.RESOURCE_CACHE_DISABLED && objectTimeout <= ServerCache.RESOURCE_EXPIRATION_MINUTE) { + log.logBasic(new StringBuilder().append("You may want to increase ") + .append(Const.KETTLE_CARTE_OBJECT_TIMEOUT_MINUTES).append(" from ") + .append(objectTimeout).append(" minutes to ") + .append(ServerCache.RESOURCE_EXPIRATION_MINUTE + 1) + .append(" to fully utilize resource cache.").toString()); + } + + // If we need to time out finished or idle objects, we should create a timer // in the background to clean // @@ -146,28 +156,30 @@ public void run() { // for (CarteObjectEntry entry : transformationMap.getTransformationObjects()) { Trans trans = transformationMap.getTransformation(entry); - + Date logDate = trans.getLogDate(); // See if the transformation is finished or stopped. // - if (trans != null && (trans.isFinished() || trans.isStopped()) && trans.getLogDate() != null) { + if (trans != null && (trans.isFinished() || trans.isStopped()) && logDate != null) { // check the last log time // int diffInMinutes = - (int) Math.floor((System.currentTimeMillis() - trans.getLogDate().getTime()) / 60000); + (int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000); if (diffInMinutes >= objectTimeout) { + String logChannelId = trans.getLogChannelId(); + // Let's remove this from the transformation map... // transformationMap.removeTransformation(entry); // Remove the logging information from the log registry & central log store // - LoggingRegistry.getInstance().removeIncludingChildren(trans.getLogChannelId()); - KettleLogStore.discardLines(trans.getLogChannelId(), false); + LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); + KettleLogStore.discardLines(logChannelId, false); // transformationMap.deallocateServerSocketPorts(entry); log.logMinimal("Cleaned up transformation " - + entry.getName() + " with id " + entry.getId() + " from " + trans.getLogDate() + + entry.getName() + " with id " + entry.getId() + " from " + logDate + ", diff=" + diffInMinutes); } } @@ -177,20 +189,28 @@ public void run() { // for (CarteObjectEntry entry : jobMap.getJobObjects()) { Job job = jobMap.getJob(entry); - + Date logDate = job.getLogDate(); // See if the job is finished or stopped. // - if (job != null && (job.isFinished() || job.isStopped()) && job.getLogDate() != null) { + if (job != null && (job.isFinished() || job.isStopped()) && logDate != null) { // check the last log time // int diffInMinutes = - (int) Math.floor((System.currentTimeMillis() - job.getLogDate().getTime()) / 60000); + (int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000); if (diffInMinutes >= objectTimeout) { + String logChannelId = job.getLogChannelId(); + // Let's remove this from the job map... // jobMap.removeJob(entry); + + // Remove the logging information from the log registry & central log store + // + LoggingRegistry.getInstance().removeIncludingChildren(logChannelId); + KettleLogStore.discardLines(logChannelId, false); + log.logMinimal("Cleaned up job " - + entry.getName() + " with id " + entry.getId() + " from " + job.getLogDate()); + + entry.getName() + " with id " + entry.getId() + " from " + logDate); } } }