Skip to content

Commit

Permalink
Merge pull request #8 from arenadata/5.16.4-sync
Browse files Browse the repository at this point in the history
ADBDEV-2137 5.16.4 sync
  • Loading branch information
deart2k authored Oct 19, 2021
2 parents d7859b5 + adbd3d9 commit ec52c5b
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 52 deletions.
5 changes: 2 additions & 3 deletions .github/workflows/create-release-on-tag.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@ jobs:
tag_name: ${{ github.ref }}
release_name: PXF Version ${{ github.ref }}
body: |
## 5.16.3 (08/20/2021)
## 5.16.4 (08/31/2021)
### Bug Fixes:
- [#682](https://github.com/greenplum-db/pxf/pull/682) fix CURLOPT_RESOLVE optimization
- [#681](https://github.com/greenplum-db/pxf/pull/681) Introduced operation retries when GSS connection failures are encountered
- [#693](https://github.com/greenplum-db/pxf/pull/693) Fixed GSS failure handler retry logic
draft: false
prerelease: false
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 5.16.4 (08/31/2021)

### Bug Fixes:

- [#693](https://github.com/greenplum-db/pxf/pull/693) Fixed GSS failure handler retry logic

## 5.16.3 (08/20/2021)

### Bug Fixes:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public ReadBridge(RequestContext context) {
*/
@Override
public boolean beginIteration() throws Exception {
return failureHandler.execute(accessor.getConfiguration(), "begin iteration", accessor::openForRead, this::beforeRetryCallback);
// using lambda and not a method reference accessor::openForRead as the accessor will be changed by the retry function
return failureHandler.execute(accessor.getConfiguration(), "begin iteration", () -> accessor.openForRead(), this::beforeRetryCallback);
}

protected Deque<Writable> makeOutput(OneRow oneRow) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public WriteBridge(RequestContext context) {

@Override
public boolean beginIteration() throws Exception {
return failureHandler.execute(accessor.getConfiguration(), "begin iteration", accessor::openForWrite, this::beforeRetryCallback);
// using lambda and not a method reference accessor::openForWrite as the accessor will be changed by the retry function
return failureHandler.execute(accessor.getConfiguration(), "begin iteration", () -> accessor.openForWrite(), this::beforeRetryCallback);
}

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,11 @@ public Response getFragmentsStats(@Context final ServletContext servletContext,
}

private List<Fragment> getFragments(RequestContext context) throws Exception {
/* Create a fragmenter instance with API level parameters */
Fragmenter fragmenter = fragmenterFactory.getPlugin(context);
// create a fragmenter holder that will get a fragmenter from the factory based on the request context
FragmenterHolder holder = new FragmenterHolder(context);
// We can't support lambdas here because asm version doesn't support it
List<Fragment> fragments = failureHandler.execute(fragmenter.getConfiguration(), "get fragments", new Callable<List<Fragment>>() {
@Override
public List<Fragment> call() throws Exception {
return fragmenter.getFragments();
}
});
// use the holder that has both an operation to run to fetch fragments and a callback to reset the fragmenter before retries
List<Fragment> fragments = failureHandler.execute(holder.getFragmenter().getConfiguration(), "get fragments", holder, holder);
fragments = AnalyzeUtils.getSampleFragments(fragments, context);

logFragmentStatistics(Level.INFO, context, fragments);
Expand Down Expand Up @@ -237,4 +233,49 @@ private void logFragmentStatistics(Level level, RequestContext context, List<Fra
}
}

/**
* Helper class to hold a reference to a request-specific fragmenter and implements methods to perform the operation
* of getting a list of fragments and an operation to reset the fragmenter.
*/
class FragmenterHolder implements Callable<List<Fragment>>, Runnable {
private Fragmenter fragmenter;
private RequestContext context;

/**
* Creates a new instance of the holder for the specific request. Initializes intself with a fragmenter instance
* obtained from the factory of the parent class based on the provided context.
* @param context request context
*/
public FragmenterHolder(RequestContext context) {
this.context = context; // remember the context, this instance is scoped per request
resetFragmenter(); // create the fragmenter from the factory
}

/**
* Get fragmenter instance associated with this holder.
* @return fragmenter instance
*/
public Fragmenter getFragmenter() {
return fragmenter;
}

/**
* Resets the fragmenter instance associated with this holder by requesting a new instance from the factory.
*/
public void resetFragmenter() {
this.fragmenter = fragmenterFactory.getPlugin(context);
}

/* The operation that can be retried by the failure handler */
@Override
public List<Fragment> call() throws Exception {
return fragmenter.getFragments();
}

/* The callback that is performed before a retry attempt by the failure handler */
@Override
public void run() {
resetFragmenter();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.greenplum.pxf.service.bridge;

import org.apache.hadoop.conf.Configuration;
import org.greenplum.pxf.api.model.Accessor;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.utilities.AccessorFactory;
import org.greenplum.pxf.api.utilities.ResolverFactory;
import org.greenplum.pxf.service.utilities.GSSFailureHandler;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ReadBridgeTest {

private ReadBridge bridge;
private RequestContext context;
private Configuration configuration;
private GSSFailureHandler handler;
@Mock
private AccessorFactory mockAccessorFactory;
@Mock
private ResolverFactory mockResolverFactory;
@Mock
private Accessor mockAccessor1;
@Mock
private Accessor mockAccessor2;
@Mock
private Accessor mockAccessor3;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Before
public void setup() {
handler = GSSFailureHandler.getInstance();
context = new RequestContext();
configuration = new Configuration();
}

@Test
public void testBeginIterationFailureNoRetries() throws Exception {
expectedException.expect(IOException.class);
expectedException.expectMessage("Something Else");

configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context)).thenReturn(mockAccessor1);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForRead()).thenThrow(new IOException("Something Else"));

// constructor will call into mock factories, that's why we do not create ReadBridge in @Before method
bridge = new ReadBridge(context, mockAccessorFactory, mockResolverFactory, handler);
bridge.beginIteration();
verify(mockAccessorFactory).getPlugin(context);
verify(mockAccessor1.getConfiguration());
verify(mockAccessor1).openForRead();
verifyNoMoreInteractions();
}

@Test
public void testBeginIterationGSSFailureRetriedOnce() throws Exception {
configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context)).thenReturn(mockAccessor1).thenReturn(mockAccessor2);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForRead()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor2.openForRead()).thenReturn(true);

bridge = new ReadBridge(context, mockAccessorFactory, mockResolverFactory, handler);
boolean result = bridge.beginIteration();

assertTrue(result);
verify(mockAccessorFactory, times(2)).getPlugin(context);
InOrder inOrder = inOrder(mockAccessor1, mockAccessor2);
inOrder.verify(mockAccessor1).openForRead(); // first attempt on accessor #1
inOrder.verify(mockAccessor2).openForRead(); // second attempt on accessor #2
inOrder.verifyNoMoreInteractions();
}

@Test
public void testBeginIterationGSSFailureRetriedTwice() throws Exception {
configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context))
.thenReturn(mockAccessor1)
.thenReturn(mockAccessor2)
.thenReturn(mockAccessor3);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForRead()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor2.openForRead()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor3.openForRead()).thenReturn(true);

bridge = new ReadBridge(context, mockAccessorFactory, mockResolverFactory, handler);
boolean result = bridge.beginIteration();

assertTrue(result);
verify(mockAccessorFactory, times(3)).getPlugin(context);
InOrder inOrder = inOrder(mockAccessor1, mockAccessor2, mockAccessor3);
inOrder.verify(mockAccessor1).openForRead(); // first attempt on accessor #1
inOrder.verify(mockAccessor2).openForRead(); // second attempt on accessor #2
inOrder.verify(mockAccessor3).openForRead(); // third attempt on accessor #3
inOrder.verifyNoMoreInteractions();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.greenplum.pxf.service.bridge;

import org.apache.hadoop.conf.Configuration;
import org.greenplum.pxf.api.model.Accessor;
import org.greenplum.pxf.api.model.RequestContext;
import org.greenplum.pxf.api.utilities.AccessorFactory;
import org.greenplum.pxf.api.utilities.ResolverFactory;
import org.greenplum.pxf.service.utilities.GSSFailureHandler;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import java.io.IOException;

import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class WriteBridgeTest {

private WriteBridge bridge;
private RequestContext context;
private Configuration configuration;
private GSSFailureHandler handler;
@Mock
private AccessorFactory mockAccessorFactory;
@Mock
private ResolverFactory mockResolverFactory;
@Mock
private Accessor mockAccessor1;
@Mock
private Accessor mockAccessor2;
@Mock
private Accessor mockAccessor3;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Before
public void setup() {
handler = GSSFailureHandler.getInstance();
context = new RequestContext();
configuration = new Configuration();
}

@Test
public void testBeginIterationFailureNoRetries() throws Exception {
expectedException.expect(IOException.class);
expectedException.expectMessage("Something Else");

configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context)).thenReturn(mockAccessor1);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForWrite()).thenThrow(new IOException("Something Else"));

// constructor will call into mock factories, that's why we do not create WriteBridge in @Before method
bridge = new WriteBridge(context, mockAccessorFactory, mockResolverFactory, handler);
bridge.beginIteration();
verify(mockAccessorFactory).getPlugin(context);
verify(mockAccessor1.getConfiguration());
verify(mockAccessor1).openForWrite();
verifyNoMoreInteractions();
}

@Test
public void testBeginIterationGSSFailureRetriedOnce() throws Exception {
configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context)).thenReturn(mockAccessor1).thenReturn(mockAccessor2);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForWrite()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor2.openForWrite()).thenReturn(true);

bridge = new WriteBridge(context, mockAccessorFactory, mockResolverFactory, handler);
boolean result = bridge.beginIteration();

assertTrue(result);
verify(mockAccessorFactory, times(2)).getPlugin(context);
InOrder inOrder = inOrder(mockAccessor1, mockAccessor2);
inOrder.verify(mockAccessor1).openForWrite(); // first attempt on accessor #1
inOrder.verify(mockAccessor2).openForWrite(); // second attempt on accessor #2
inOrder.verifyNoMoreInteractions();
}

@Test
public void testBeginIterationGSSFailureRetriedTwice() throws Exception {
configuration.set("hadoop.security.authentication", "kerberos");
when(mockAccessorFactory.getPlugin(context))
.thenReturn(mockAccessor1)
.thenReturn(mockAccessor2)
.thenReturn(mockAccessor3);
when(mockAccessor1.getConfiguration()).thenReturn(configuration);
when(mockAccessor1.openForWrite()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor2.openForWrite()).thenThrow(new IOException("GSS initiate failed"));
when(mockAccessor3.openForWrite()).thenReturn(true);

bridge = new WriteBridge(context, mockAccessorFactory, mockResolverFactory, handler);
boolean result = bridge.beginIteration();

assertTrue(result);
verify(mockAccessorFactory, times(3)).getPlugin(context);
InOrder inOrder = inOrder(mockAccessor1, mockAccessor2, mockAccessor3);
inOrder.verify(mockAccessor1).openForWrite(); // first attempt on accessor #1
inOrder.verify(mockAccessor2).openForWrite(); // second attempt on accessor #2
inOrder.verify(mockAccessor3).openForWrite(); // third attempt on accessor #3
inOrder.verifyNoMoreInteractions();
}

}
Loading

0 comments on commit ec52c5b

Please sign in to comment.