Skip to content

Commit

Permalink
Allow to disable resource cache
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jan 7, 2017
1 parent 9f3f71b commit 62a835e
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@
* @author Zhichun Wu
*/
public final class ServerCache {
public static final boolean RESOURCE_CACHE_DISABLED = "Y".equalsIgnoreCase(
System.getProperty("KETTLE_RESOURCE_CACHE_DISABLED", "N"));
public static final int RESOURCE_CACHE_SIZE
= Integer.parseInt(System.getProperty("RESOURCE_CACHE_SIZE", "500"));
= Integer.parseInt(System.getProperty("KETTLE_RESOURCE_CACHE_SIZE", "100"));
public static final int RESOURCE_EXPIRATION_MINUTE
= Integer.parseInt(System.getProperty("RESOURCE_EXPIRATION_MINUTE", "780"));
= Integer.parseInt(System.getProperty("KETTLE_RESOURCE_EXPIRATION_MINUTE", "1800"));
public static final String PARAM_ETL_JOB_ID = System.getProperty("KETTLE_JOB_ID_KEY", "ETL_CALLER");

// On master node, it's for name -> revision + md5; on slave server, it's name -> md5
Expand All @@ -61,9 +63,13 @@ private static void logBasic(SlaveServer server, String message) {
}
}

public static String buildResourceName(AbstractMeta meta, Map<String, String> params, SlaveServer server) {
private static String buildResourceName(AbstractMeta meta, Map<String, String> params, SlaveServer server) {
StringBuilder sb = new StringBuilder();

if (RESOURCE_CACHE_DISABLED) {
return sb.toString();
}

// in case this is triggered by a Quartz Job
String jobId = params == null ? null : params.get(PARAM_ETL_JOB_ID);
if (Strings.isNullOrEmpty(jobId)) {
Expand Down Expand Up @@ -98,7 +104,7 @@ public static String buildResourceName(AbstractMeta meta, Map<String, String> pa
* @return
*/
public static String getCachedIdentity(String resourceName) {
return resourceCache.getIfPresent(resourceName);
return RESOURCE_CACHE_DISABLED ? null : resourceCache.getIfPresent(resourceName);
}

public static String getCachedIdentity(AbstractMeta meta, Map<String, String> params, SlaveServer server) {
Expand Down Expand Up @@ -146,7 +152,9 @@ public static String getCachedIdentity(AbstractMeta meta, Map<String, String> pa
* @param identity identity
*/
public static void cacheIdentity(String resourceName, String identity) {
resourceCache.put(resourceName, identity);
if (!RESOURCE_CACHE_DISABLED) {
resourceCache.put(resourceName, identity);
}
}

public static void cacheIdentity(AbstractMeta meta, Map<String, String> params, SlaveServer server, String identity) {
Expand Down
40 changes: 30 additions & 10 deletions pentaho-kettle/src/main/java/org/pentaho/di/www/CarteSingleton.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

package org.pentaho.di.www;

import org.pentaho.di.cluster.ServerCache;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.KettleEnvironment;
Expand Down Expand Up @@ -126,6 +127,15 @@ public static void installPurgeTimer(final SlaveServerConfig config, final LogCh
objectTimeout = 24 * 60; // 1440 : default is a one day time-out
}

if (!ServerCache.RESOURCE_CACHE_DISABLED && objectTimeout <= ServerCache.RESOURCE_EXPIRATION_MINUTE) {
log.logBasic(new StringBuilder().append("You may want to increase ")
.append(Const.KETTLE_CARTE_OBJECT_TIMEOUT_MINUTES).append(" from ")
.append(objectTimeout).append(" minutes to ")
.append(ServerCache.RESOURCE_EXPIRATION_MINUTE + 1)
.append(" to fully utilize resource cache.").toString());
}


// If we need to time out finished or idle objects, we should create a timer
// in the background to clean
//
Expand All @@ -146,28 +156,30 @@ public void run() {
//
for (CarteObjectEntry entry : transformationMap.getTransformationObjects()) {
Trans trans = transformationMap.getTransformation(entry);

Date logDate = trans.getLogDate();
// See if the transformation is finished or stopped.
//
if (trans != null && (trans.isFinished() || trans.isStopped()) && trans.getLogDate() != null) {
if (trans != null && (trans.isFinished() || trans.isStopped()) && logDate != null) {
// check the last log time
//
int diffInMinutes =
(int) Math.floor((System.currentTimeMillis() - trans.getLogDate().getTime()) / 60000);
(int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000);
if (diffInMinutes >= objectTimeout) {
String logChannelId = trans.getLogChannelId();

// Let's remove this from the transformation map...
//
transformationMap.removeTransformation(entry);

// Remove the logging information from the log registry & central log store
//
LoggingRegistry.getInstance().removeIncludingChildren(trans.getLogChannelId());
KettleLogStore.discardLines(trans.getLogChannelId(), false);
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
KettleLogStore.discardLines(logChannelId, false);

// transformationMap.deallocateServerSocketPorts(entry);

log.logMinimal("Cleaned up transformation "
+ entry.getName() + " with id " + entry.getId() + " from " + trans.getLogDate()
+ entry.getName() + " with id " + entry.getId() + " from " + logDate
+ ", diff=" + diffInMinutes);
}
}
Expand All @@ -177,20 +189,28 @@ public void run() {
//
for (CarteObjectEntry entry : jobMap.getJobObjects()) {
Job job = jobMap.getJob(entry);

Date logDate = job.getLogDate();
// See if the job is finished or stopped.
//
if (job != null && (job.isFinished() || job.isStopped()) && job.getLogDate() != null) {
if (job != null && (job.isFinished() || job.isStopped()) && logDate != null) {
// check the last log time
//
int diffInMinutes =
(int) Math.floor((System.currentTimeMillis() - job.getLogDate().getTime()) / 60000);
(int) Math.floor((System.currentTimeMillis() - logDate.getTime()) / 60000);
if (diffInMinutes >= objectTimeout) {
String logChannelId = job.getLogChannelId();

// Let's remove this from the job map...
//
jobMap.removeJob(entry);

// Remove the logging information from the log registry & central log store
//
LoggingRegistry.getInstance().removeIncludingChildren(logChannelId);
KettleLogStore.discardLines(logChannelId, false);

log.logMinimal("Cleaned up job "
+ entry.getName() + " with id " + entry.getId() + " from " + job.getLogDate());
+ entry.getName() + " with id " + entry.getId() + " from " + logDate);
}
}
}
Expand Down

0 comments on commit 62a835e

Please sign in to comment.