diff --git a/core/src/main/java/org/pentaho/di/core/logging/LoggingBuffer.java b/core/src/main/java/org/pentaho/di/core/logging/LoggingBuffer.java index 166e80a6ab42..db9631b51d19 100644 --- a/core/src/main/java/org/pentaho/di/core/logging/LoggingBuffer.java +++ b/core/src/main/java/org/pentaho/di/core/logging/LoggingBuffer.java @@ -22,21 +22,19 @@ package org.pentaho.di.core.logging; -import com.google.common.annotations.VisibleForTesting; -import org.pentaho.di.core.Const; -import org.pentaho.di.core.util.Utils; - -import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.LinkedBlockingDeque; import java.util.stream.Collectors; import java.util.stream.Stream; +import org.pentaho.di.core.Const; +import org.pentaho.di.core.util.Utils; + +import com.google.common.annotations.VisibleForTesting; + /** * This class keeps the last N lines in a buffer * @@ -45,8 +43,7 @@ public class LoggingBuffer { private String name; - private List buffer; - private ReadWriteLock lock = new ReentrantReadWriteLock(); + private LinkedBlockingDeque buffer; private int bufferSize; @@ -58,28 +55,24 @@ public class LoggingBuffer { public LoggingBuffer( int bufferSize ) { this.bufferSize = bufferSize; - // The buffer overflow protection allows it to be overflowed for 1 item within a single thread. - // Considering a possible high contention, let's set it's max overflow size to be 10%. - // Anyway, even an overflow goes higher than 10%, it wouldn't cost us too much. - buffer = new ArrayList<>( (int) ( bufferSize * 1.1 ) ); + buffer = new LinkedBlockingDeque(); layout = new KettleLogLayout( true ); - eventListeners = new CopyOnWriteArrayList<>(); + eventListeners = new CopyOnWriteArrayList(); } /** * @return the number (sequence, 1..N) of the last log line. If no records are present in the buffer, 0 is returned. */ - public int getLastBufferLineNr() { - lock.readLock().lock(); - try { - if ( buffer.size() > 0 ) { - return buffer.get( buffer.size() - 1 ).getNr(); - } else { - return 0; - } - } finally { - lock.readLock().unlock(); - } + public int getLastBufferLineNr() { + //Based on example from https://www.baeldung.com/java-stream-last-element + + // Stream stream = buffer.stream(); + // BufferLine line = stream.reduce((first, second) -> second) + // .orElse(null); + // return line != null ? line.getNr() : 0; + + BufferLine line = buffer.peekLast(); + return line != null ? line.getNr() : 0; } /** @@ -89,21 +82,28 @@ public int getLastBufferLineNr() { * @param to * @return */ - public List getLogBufferFromTo( List channelId, boolean includeGeneral, int from, +public List getLogBufferFromTo( List channelId, boolean includeGeneral, int from, int to ) { - lock.readLock().lock(); - try { - Stream bufferStream = buffer.stream().filter( line -> line.getNr() > from && line.getNr() <= to ); + //OPTION 1 + //This is to FIX the Halting Bug when async concurrent access to the object. NEW Object + Stream bufferStream=new ConcurrentLinkedQueue(buffer).stream(); + + //OPTION 2 + //Stream bufferStream=buffer.stream(); + + //Improvement, doing all the EVALUATIONS in one iteration, to avoid iterate multiple times + if ( !Utils.isEmpty( channelId ) ) { bufferStream = bufferStream.filter( line -> { - String logChannelId = getLogChId( line ); - return includeGeneral ? isGeneral( logChannelId ) || channelId.contains( logChannelId ) : channelId.contains( logChannelId ); - } ); + String logChannelId = getLogChId( line ); + boolean condition1 = includeGeneral ? channelId.contains( logChannelId ) || isGeneral( logChannelId ) : channelId.contains( logChannelId ); + boolean condition2 = line.getNr() > from && line.getNr() <= to; + return condition1 && condition2; + } ); + } else { + bufferStream = bufferStream.filter( line -> line.getNr() > from && line.getNr() <= to ); } - return bufferStream.map( BufferLine::getEvent ).collect( Collectors.toList() ); - } finally { - lock.readLock().unlock(); - } + return bufferStream.map( BufferLine::getEvent ).collect( Collectors.toList()); } /** @@ -154,14 +154,9 @@ public void close() { public void doAppend( KettleLoggingEvent event ) { if ( event.getMessage() instanceof LogMessage ) { - lock.writeLock().lock(); - try { - buffer.add( new BufferLine( event ) ); - while ( bufferSize > 0 && buffer.size() > bufferSize ) { - buffer.remove( 0 ); - } - } finally { - lock.writeLock().unlock(); + buffer.add( new BufferLine( event ) ); + while ( bufferSize > 0 && buffer.size() > bufferSize ) { + buffer.poll(); } } } @@ -187,12 +182,7 @@ public boolean requiresLayout() { } public void clear() { - lock.writeLock().lock(); - try { - buffer.clear(); - } finally { - lock.writeLock().unlock(); - } + buffer.clear(); } /** @@ -222,12 +212,7 @@ public int getNrLines() { * @param id the id of the logging channel to remove */ public void removeChannelFromBuffer( String id ) { - lock.writeLock().lock(); - try { - buffer.removeIf( line -> id.equals( getLogChId( line ) ) ); - } finally { - lock.writeLock().unlock(); - } + buffer.removeIf( line -> id.equals( getLogChId( line ) ) ); } public int size() { @@ -235,12 +220,7 @@ public int size() { } public void removeGeneralMessages() { - lock.writeLock().lock(); - try { - buffer.removeIf( line -> isGeneral( getLogChId( line ) ) ); - } finally { - lock.writeLock().unlock(); - } + buffer.removeIf( line -> isGeneral( getLogChId( line ) ) ); } /** @@ -262,17 +242,12 @@ public Iterator getBufferIterator() { @Deprecated public String dump() { StringBuilder buf = new StringBuilder( 50000 ); - lock.readLock().lock(); - try { - buffer.forEach( line -> { - LogMessage message = (LogMessage) line.getEvent().getMessage(); - buf.append( message.getLogChannelId() ).append( "\t" ) - .append( message.getSubject() ).append( "\n" ); - } ); - return buf.toString(); - } finally { - lock.readLock().unlock(); - } + buffer.forEach( line -> { + LogMessage message = (LogMessage) line.getEvent().getMessage(); + buf.append( message.getLogChannelId() ).append( "\t" ) + .append( message.getSubject() ).append( "\n" ); + } ); + return buf.toString(); } /** @@ -282,12 +257,7 @@ public String dump() { */ @Deprecated public void removeBufferLines( List linesToRemove ) { - lock.writeLock().lock(); - try { - buffer.removeAll( linesToRemove ); - } finally { - lock.writeLock().unlock(); - } + buffer.removeAll( linesToRemove ); } /** @@ -297,35 +267,12 @@ public void removeBufferLines( List linesToRemove ) { */ @Deprecated public List getBufferLinesBefore( long minTimeBoundary ) { - lock.readLock().lock(); - try { - return buffer.stream().filter( line -> line.getEvent().timeStamp < minTimeBoundary ) - .collect( Collectors.toList() ); - } finally { - lock.readLock().unlock(); - } + return buffer.stream().filter( line -> line.getEvent().timeStamp < minTimeBoundary ) + .collect( Collectors.toList() ); } public void removeBufferLinesBefore( long minTimeBoundary ) { - // Using HashSet even though BufferLine does not implement hashcode and equals, - // we just need to remove the exact objects we have found and put in the set. - Set linesToRemove = new HashSet<>(); - lock.writeLock().lock(); - try { - for ( BufferLine bufferLine : buffer ) { - if ( bufferLine.getEvent().timeStamp < minTimeBoundary ) { - linesToRemove.add( bufferLine ); - } else { - break; - } - } - // removeAll should run fast against a HashSet, - // since ArrayList.batchRemove check for each element of a collection given if it is in the ArrayList. - // Thus, removeAll should run in a linear time. - buffer.removeAll( linesToRemove ); - } finally { - lock.writeLock().unlock(); - } + buffer.removeIf( line -> line.getEvent().timeStamp < minTimeBoundary ); } public void addLogggingEvent( KettleLoggingEvent loggingEvent ) { @@ -346,7 +293,7 @@ private boolean isGeneral( String logChannelId ) { return loggingObject != null && LoggingObjectType.GENERAL.equals( loggingObject.getObjectType() ); } - private static String getLogChId( BufferLine bufferLine ) { + private String getLogChId( BufferLine bufferLine ) { return ( (LogMessage) bufferLine.getEvent().getMessage() ).getLogChannelId(); } }