Skip to content

Commit

Permalink
Merge pull request pentaho#9277 from abryant-hv/BACKLOG-40099
Browse files Browse the repository at this point in the history
[BACKLOG-40099] - VFS support for Bowls
  • Loading branch information
niehusa authored Apr 25, 2024
2 parents 931f956 + 88aaadc commit 7852de2
Show file tree
Hide file tree
Showing 37 changed files with 1,317 additions and 498 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2002-2023 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2002-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -23,6 +23,7 @@
package org.pentaho.di.connections;

import org.pentaho.di.connections.utils.EncryptUtils;
import org.pentaho.di.core.bowl.Bowl;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.metastore.api.IMetaStore;
Expand Down Expand Up @@ -94,10 +95,39 @@ public synchronized void reset() {
initialized = false;
}

/**
* This getter should not generally be used because it is limited to the global scope. It would be better to have
* almost all callers use Bowl.getConnectionManager().
*
* This instance may still be used to register ConnectionProviders and Lookup Filters.
*
* @return ConnectionManager
*/
public static ConnectionManager getInstance() {
return instance;
}

/**
* Construct a new instance of a ConnectionManager using the metastore from the supplier.
*
* Instances returned by this will not share in-memory state with any other instannces. If you need the
* ConnectionManager for a Bowl, use Bowl.getConnectionManager() instead.
*
*
* @param bowl
*
* @return ConnectionManager
*/
public static ConnectionManager getInstance( Supplier<IMetaStore> metastoreSupplier ) {
ConnectionManager newManager = new ConnectionManager();
newManager.setMetastoreSupplier( metastoreSupplier );
// share the same set of connection providers and lookup filters. Everyone already registers with the one
// from getInstance()
newManager.connectionProviders = instance.connectionProviders;
newManager.lookupFilters = instance.lookupFilters;
return newManager;
}

/**
* Construct a meta store factory for a specific class using the default meta store supplier
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2019-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2019-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -37,9 +37,31 @@ public interface ConnectionProvider<T extends ConnectionDetails> {

Class<T> getClassType();

List<String> getNames();
/**
* @deprecated use getNames( ConnectionManager )
*/
@Deprecated
default List<String> getNames() {
throw new UnsupportedOperationException( "Deprecated method" );
}

List<T> getConnectionDetails();
/**
* @deprecated use getNames( ConnectionManager )
*/
@Deprecated
default List<T> getConnectionDetails() {
throw new UnsupportedOperationException( "Deprecated method" );
}

// Subclasses should implement this to work with Bowls.
default List<String> getNames( ConnectionManager connectionManager ) {
return getNames();
}

// Subclasses should implement this to work with Bowls.
default List<T> getConnectionDetails( ConnectionManager connectionManager ) {
return getConnectionDetails();
}

boolean test( T connectionDetails ) throws KettleException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2019-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2019-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand Down Expand Up @@ -38,13 +38,25 @@ public abstract class BaseVFSConnectionProvider<T extends VFSConnectionDetails>

private Supplier<ConnectionManager> connectionManagerSupplier = ConnectionManager::getInstance;

@Override public List<String> getNames() {
@Override
public List<String> getNames() {
return connectionManagerSupplier.get().getNamesByType( getClass() );
}

@Override
public List<T> getConnectionDetails() {
return getConnectionDetails( connectionManagerSupplier.get() );
}

@Override
public List<String> getNames( ConnectionManager connectionManager ) {
return connectionManager.getNamesByType( getClass() );
}

@SuppressWarnings( "unchecked" )
@Override public List<T> getConnectionDetails() {
return (List<T>) connectionManagerSupplier.get().getConnectionDetailsByScheme( getKey() );
@Override
public List<T> getConnectionDetails( ConnectionManager connectionManager ) {
return (List<T>) connectionManager.getConnectionDetailsByScheme( getKey() );
}

@Override public T prepare( T connectionDetails ) throws KettleException {
Expand Down
33 changes: 28 additions & 5 deletions core/src/main/java/org/pentaho/di/connections/vfs/VFSHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Pentaho Data Integration
*
* Copyright (C) 2019-2022 by Hitachi Vantara : http://www.pentaho.com
* Copyright (C) 2019-2024 by Hitachi Vantara : http://www.pentaho.com
*
*******************************************************************************
*
Expand All @@ -24,24 +24,47 @@

import org.apache.commons.vfs2.FileSystemOptions;
import org.pentaho.di.connections.ConnectionManager;
import org.pentaho.di.core.bowl.Bowl;
import org.pentaho.di.core.bowl.DefaultBowl;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.vfs.configuration.KettleGenericFileSystemConfigBuilder;
import org.pentaho.metastore.api.exceptions.MetaStoreException;

import java.util.function.Supplier;

/**
* Created by bmorrise on 2/13/19.
*/
public class VFSHelper {
public static FileSystemOptions getOpts( String file, String connection, VariableSpace space ){

/**
* @deprecated, use the version with the Bowl
*/
@Deprecated
public static FileSystemOptions getOpts( String file, String connection, VariableSpace space ) {
try {
return getOpts( DefaultBowl.getInstance(), file, connection, space );
} catch ( MetaStoreException ex ) {
// deprecated behavior was to ignore failures and return nulls.
return null;
}
}

public static FileSystemOptions getOpts( Bowl bowl, String file, String connection, VariableSpace space )
throws MetaStoreException {
if ( connection != null ) {
VFSConnectionDetails vfsConnectionDetails =
(VFSConnectionDetails) ConnectionManager.getInstance().getConnectionDetails( file, connection );
(VFSConnectionDetails) bowl.getConnectionManager().getConnectionDetails( file, connection );
VFSConnectionProvider<VFSConnectionDetails> vfsConnectionProvider =
(VFSConnectionProvider<VFSConnectionDetails>) ConnectionManager.getInstance().getConnectionProvider( file );
(VFSConnectionProvider<VFSConnectionDetails>) bowl.getConnectionManager().getConnectionProvider( file );
if ( vfsConnectionDetails != null && vfsConnectionProvider != null ) {
vfsConnectionDetails.setSpace( space );
return vfsConnectionProvider.getOpts( vfsConnectionDetails );
FileSystemOptions opts = vfsConnectionProvider.getOpts( vfsConnectionDetails );

KettleGenericFileSystemConfigBuilder.getInstance().setBowl( opts, bowl );

return opts;
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,20 @@
import org.pentaho.di.connections.ConnectionDetails;
import org.pentaho.di.connections.ConnectionManager;
import org.pentaho.di.connections.vfs.VFSConnectionDetails;
import org.pentaho.di.core.bowl.Bowl;
import org.pentaho.di.core.variables.VariableSpace;
import org.pentaho.di.core.variables.Variables;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.core.vfs.configuration.IKettleFileSystemConfigBuilder;
import org.pentaho.di.core.vfs.configuration.KettleFileSystemConfigBuilderFactory;
import org.pentaho.di.core.vfs.configuration.KettleGenericFileSystemConfigBuilder;

import java.util.Collection;
import java.util.function.Supplier;

public class ConnectionFileSystem extends AbstractFileSystem implements FileSystem {

public static final String CONNECTION = "connection";
public static final String DOMAIN_ROOT = "[\\w]+://";
private Supplier<ConnectionManager> connectionManager = ConnectionManager::getInstance;

public ConnectionFileSystem( FileName rootName, FileSystemOptions fileSystemOptions ) {
super( rootName, null, fileSystemOptions );
Expand Down Expand Up @@ -86,8 +86,9 @@ public static String getUrl( AbstractFileName abstractFileName, ConnectionDetail
protected FileObject createFile( AbstractFileName abstractFileName ) throws Exception {

String connectionName = ( (ConnectionFileName) abstractFileName ).getConnection();
Bowl bowl = KettleGenericFileSystemConfigBuilder.getInstance().getBowl( getFileSystemOptions() );
VFSConnectionDetails connectionDetails =
(VFSConnectionDetails) connectionManager.get().getConnectionDetails( connectionName );
(VFSConnectionDetails) bowl.getConnectionManager().getConnectionDetails( connectionName );
FileSystemOptions opts = super.getFileSystemOptions();
IKettleFileSystemConfigBuilder configBuilder = KettleFileSystemConfigBuilderFactory.getConfigBuilder
( new Variables(), ConnectionFileProvider.SCHEME );
Expand All @@ -103,7 +104,7 @@ protected FileObject createFile( AbstractFileName abstractFileName ) throws Exce
if ( url != null ) {
domain = connectionDetails.getDomain();
varSpace.setVariable( CONNECTION, connectionName );
fileObject = (AbstractFileObject) KettleVFS.getFileObject( url, varSpace );
fileObject = (AbstractFileObject) KettleVFS.getInstance( bowl ).getFileObject( url, varSpace );
}

return new ConnectionFileObject( abstractFileName, this, fileObject, domain );
Expand Down
42 changes: 42 additions & 0 deletions core/src/main/java/org/pentaho/di/core/bowl/BaseBowl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*!
* Copyright 2024 Hitachi Vantara. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.pentaho.di.core.bowl;

import org.pentaho.di.connections.ConnectionManager;
import org.pentaho.metastore.api.exceptions.MetaStoreException;
import org.pentaho.metastore.api.IMetaStore;

public abstract class BaseBowl implements Bowl {

private volatile ConnectionManager connectionManager;

@Override
public ConnectionManager getConnectionManager() throws MetaStoreException {
ConnectionManager result = connectionManager;
if ( result != null ) {
return result;
}
synchronized( this ) {
if ( connectionManager == null ) {
IMetaStore metastore = getMetastore();
connectionManager = ConnectionManager.getInstance( () -> metastore );
}
return connectionManager;
}
}

}
18 changes: 17 additions & 1 deletion core/src/main/java/org/pentaho/di/core/bowl/Bowl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.pentaho.di.core.bowl;

import org.pentaho.di.connections.ConnectionManager;
import org.pentaho.metastore.api.exceptions.MetaStoreException;
import org.pentaho.metastore.api.IMetaStore;

Expand All @@ -24,11 +25,14 @@
* A Bowl is a generic container/context/workspace concept. Different plugin implementations may implement this for
* additional features.
*
* All implementations of Bowl should implement equals() and hashcode()
*
*/
public interface Bowl {

/**
* Gets a Metastore that handles any defaulting required for execution-time handling of metastores, for the Bowl.
* Gets a Read-Only Metastore that handles any defaulting required for execution-time handling of metastores, for the
* Bowl.
*
* @return IMetaStore A metastore for execution with the Bowl. Never null.
*/
Expand All @@ -42,4 +46,16 @@ public interface Bowl {
*/
IMetaStore getExplicitMetastore() throws MetaStoreException;

/**
* Gets a ConnectionManager for this Bowl. Uses a metastore from getMetastore(), so global connections will be
* returned as well. This ConnectionManager is effectively read-only.
*
* Since constructing and initializing ConnectionManagers can be expensive, and ConnectionManager instances don't
* share state, consumers should always use this method instead of ConnectionManager.getInstance()
*
* @return ConnectionManager, never null.
*/
ConnectionManager getConnectionManager() throws MetaStoreException;


}
33 changes: 30 additions & 3 deletions core/src/main/java/org/pentaho/di/core/bowl/DefaultBowl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,25 @@
*/
package org.pentaho.di.core.bowl;

import org.pentaho.di.connections.ConnectionManager;
import org.pentaho.di.metastore.MetaStoreConst;
import org.pentaho.metastore.api.exceptions.MetaStoreException;
import org.pentaho.metastore.api.IMetaStore;

import com.google.common.annotations.VisibleForTesting;
import java.util.function.Supplier;

/**
* The default/global Bowl. A singleton for standard behavior when there is no custom Bowl.
*
*/
public class DefaultBowl implements Bowl {
public class DefaultBowl extends BaseBowl {
private static final DefaultBowl INSTANCE = new DefaultBowl();

// for testing
private Supplier<IMetaStore> metastoreSupplier = MetaStoreConst.getDefaultMetastoreSupplier();
private boolean customSupplier = false;

private DefaultBowl() {
}

Expand All @@ -37,12 +45,31 @@ public static DefaultBowl getInstance() {

@Override
public IMetaStore getExplicitMetastore() throws MetaStoreException {
return MetaStoreConst.getDefaultMetastore();
return metastoreSupplier.get();
}

@Override
public IMetaStore getMetastore() throws MetaStoreException {
return MetaStoreConst.getDefaultMetastore();
return metastoreSupplier.get();
}


@Override
public ConnectionManager getConnectionManager() throws MetaStoreException {
// need to override getConnectionManager so this instance of DefaultBowl shares the same ConnectionManager
// instance with ConnectionManager.getInstance()
if ( customSupplier ) {
return super.getConnectionManager();
} else {
return ConnectionManager.getInstance();
}
}

@VisibleForTesting
public void setMetastoreSupplier( Supplier<IMetaStore> metastoreSupplier ) {
this.metastoreSupplier = metastoreSupplier;
this.customSupplier = true;
}


}
Loading

0 comments on commit 7852de2

Please sign in to comment.