From 602f26174fa76ba8e56b2c15c3e2045b0399e6ae Mon Sep 17 00:00:00 2001 From: dtb <2011xuesong@gmail.com> Date: Tue, 26 Sep 2023 09:58:49 +0800 Subject: [PATCH] Fix primary database not filter --- .../impl/StaticDatabaseMappingService.java | 91 +++++++++++-------- 1 file changed, 55 insertions(+), 36 deletions(-) diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/StaticDatabaseMappingService.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/StaticDatabaseMappingService.java index 75b1eae3a..60ae89d2a 100644 --- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/StaticDatabaseMappingService.java +++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/mapping/service/impl/StaticDatabaseMappingService.java @@ -85,6 +85,11 @@ public StaticDatabaseMappingService( QueryMapping queryMapping) { this.metaStoreMappingFactory = metaStoreMappingFactory; this.queryMapping = queryMapping; + mappingsByMetaStoreName = Collections.synchronizedMap(new LinkedHashMap<>()); + mappingsByDatabaseName = Collections.synchronizedMap(new LinkedHashMap<>()); + databaseMappingToDatabaseList = new ConcurrentHashMap<>(); + databaseToTableAllowList = new ConcurrentHashMap<>(); + primaryDatabasesCache = CacheBuilder .newBuilder() .expireAfterAccess(1, TimeUnit.MINUTES) @@ -94,17 +99,14 @@ public StaticDatabaseMappingService( @Override public List load(String key) throws Exception { if (primaryDatabaseMapping != null) { - return primaryDatabaseMapping.getClient().get_all_databases(); + return new StaticDatabaseMappingPanopticOperationHandler() + .getPrimaryAllDatabases(); } else { return Lists.newArrayList(); } } }); - mappingsByMetaStoreName = Collections.synchronizedMap(new LinkedHashMap<>()); - mappingsByDatabaseName = Collections.synchronizedMap(new LinkedHashMap<>()); - databaseMappingToDatabaseList = new ConcurrentHashMap<>(); - databaseToTableAllowList = new ConcurrentHashMap<>(); for (AbstractMetaStore federatedMetaStore : initialMetastores) { add(federatedMetaStore); } @@ -363,47 +365,64 @@ private boolean databaseAndTableAllowed(String database, String table, DatabaseM @Override public PanopticOperationHandler getPanopticOperationHandler() { - return new PanopticOperationHandler() { + return new StaticDatabaseMappingPanopticOperationHandler(); + } - @Override - public List getTableMeta(String db_patterns, String tbl_patterns, List tbl_types) { + class StaticDatabaseMappingPanopticOperationHandler extends PanopticOperationHandler { - BiFunction filter = (tableMeta, mapping) -> - databaseAndTableAllowed(tableMeta.getDbName(), tableMeta.getTableName(), mapping); + @Override + public List getTableMeta(String db_patterns, String tbl_patterns, + List tbl_types) { - Map mappingsForPattern = new LinkedHashMap<>(); - for (DatabaseMapping mapping : getAvailableDatabaseMappings()) { - mappingsForPattern.put(mapping, db_patterns); - } - return super.getTableMeta(tbl_patterns, tbl_types, mappingsForPattern, filter); + BiFunction filter = (tableMeta, mapping) -> + databaseAndTableAllowed(tableMeta.getDbName(), tableMeta.getTableName(), mapping); + + Map mappingsForPattern = new LinkedHashMap<>(); + for (DatabaseMapping mapping : getAvailableDatabaseMappings()) { + mappingsForPattern.put(mapping, db_patterns); } + return super.getTableMeta(tbl_patterns, tbl_types, mappingsForPattern, filter); + } - @Override - public List getAllDatabases(String pattern) { - BiFunction filter = (database, mapping) -> mappingsByDatabaseName - .containsKey(database); + @Override + public List getAllDatabases(String pattern) { + BiFunction filter = getFilter(); - BiFunction filter1 = (database, mapping) -> filter.apply(database, mapping) - && databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()).contains(database); + Map mappingsForPattern = new LinkedHashMap<>(); + for (DatabaseMapping mapping : getAllDatabaseMappings()) { + mappingsForPattern.put(mapping, pattern); + } - Map mappingsForPattern = new LinkedHashMap<>(); - for (DatabaseMapping mapping : getAllDatabaseMappings()) { - mappingsForPattern.put(mapping, pattern); - } + return super.getAllDatabases(mappingsForPattern, filter); + } - return super.getAllDatabases(mappingsForPattern, filter1); - } + private BiFunction getFilter() { + BiFunction filter = + (database, mapping) -> mappingsByDatabaseName.containsKey(database); - @Override - public List getAllDatabases() { - return new ArrayList<>(mappingsByDatabaseName.keySet()); - } + return (database, mapping) -> filter.apply(database, mapping) + && databaseMappingToDatabaseList.get(mapping.getMetastoreMappingName()) + .contains(database); + } - @Override - protected PanopticOperationExecutor getPanopticOperationExecutor() { - return new PanopticConcurrentOperationExecutor(); - } - }; + @Override + public List getAllDatabases() { + return new ArrayList<>(mappingsByDatabaseName.keySet()); + } + + public List getPrimaryAllDatabases() { + BiFunction filter = getFilter(); + + Map mappingsForPattern = new LinkedHashMap<>(); + mappingsForPattern.put(primaryDatabaseMapping, "*"); + + return super.getAllDatabases(mappingsForPattern, filter); + } + + @Override + protected PanopticOperationExecutor getPanopticOperationExecutor() { + return new PanopticConcurrentOperationExecutor(); + } } @Override