Skip to content

Commit

Permalink
Make Archive folder to be under timeline folder as was requested
Browse files Browse the repository at this point in the history
  • Loading branch information
Balaji Varadarajan authored and Balaji Varadarajan committed Nov 22, 2024
1 parent 01d4811 commit 8364058
Show file tree
Hide file tree
Showing 36 changed files with 93 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public String showCommits(
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
StoragePath basePath = metaClient.getBasePath();
StoragePath archivePath =
new StoragePath(metaClient.getArchivePath() + "/.commits_.archive*");
new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
List<StoragePathInfo> pathInfoList =
HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allCommits = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public String exportInstants(
throws Exception {

final StoragePath basePath = HoodieCLI.getTableMetaClient().getBasePath();
final StoragePath archivePath = new StoragePath(HoodieCLI.getTableMetaClient().getArchivePath());
final StoragePath archivePath = HoodieCLI.getTableMetaClient().getArchivePath();
final Set<String> actionSet = new HashSet<String>(Arrays.asList(filter.split(",")));
int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
int numCopied = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ public void removeCorruptedPendingCleanAction() {
CleanerUtils.getCleanerPlan(client, instant);
} catch (AvroRuntimeException e) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
TimelineUtils.deleteInstantFile(client.getStorage(), client.getTimelinePath(),
TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
instant, client.getInstantFileNameGenerator());
} catch (IOException ioe) {
if (ioe.getMessage().contains("Not an Avro data file")) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
TimelineUtils.deleteInstantFile(client.getStorage(), client.getTimelinePath(),
TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
instant, client.getInstantFileNameGenerator());
} else {
throw new HoodieIOException(ioe.getMessage(), ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ public String showActive(
HoodieTableMetaClient mtMetaClient = getMetadataTableMetaClient(metaClient);
return printTimelineInfoWithMetadataTable(
metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getTimelinePath()),
getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getTimelinePath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
}
return printTimelineInfo(
metaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getTimelinePath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -115,7 +115,7 @@ public String showIncomplete(
try {
return printTimelineInfo(
metaClient.getActiveTimeline().filterInflightsAndRequested(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getTimelinePath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -137,7 +137,7 @@ public String metadataShowActive(
try {
return printTimelineInfo(
metaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getTimelinePath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -159,7 +159,7 @@ public String metadataShowIncomplete(
try {
return printTimelineInfo(
metaClient.getActiveTimeline().filterInflightsAndRequested(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getTimelinePath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
} catch (IOException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testCreateWithSpecifiedValues() {
assertTrue(ShellEvaluationResultUtil.isSuccess(result));
assertEquals("Metadata for table " + tableName + " loaded", result.toString());
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath());
assertEquals(new StoragePath(metaPath, "archive"), client.getArchivePath());
assertEquals(tablePath, client.getBasePath().toString());
assertEquals(metaPath, client.getMetaPath().toString());
assertEquals(HoodieTableVersion.SIX, client.getTableConfig().getTableVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
if (!dryRun) {
// Overwrite compaction request with empty compaction operations
HoodieInstant inflight = metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant);
StoragePath inflightPath = new StoragePath(metaClient.getTimelinePath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
if (metaClient.getStorage().exists(inflightPath)) {
// We need to rollback data-files because of this inflight compaction before unscheduling
throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
Expand All @@ -122,7 +122,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
// TODO: Add a rollback instant but for compaction
HoodieInstant instant = metaClient.createNewInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant);
boolean deleted = metaClient.getStorage().deleteFile(
new StoragePath(metaClient.getTimelinePath(), instantFileNameGenerator.getFileName(instant)));
new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(instant)));
ValidationUtils.checkArgument(deleted, "Unable to delete compaction instant.");
}
return new ArrayList<>();
Expand Down Expand Up @@ -159,7 +159,7 @@ public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, b
HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
HoodieInstant inflight =
metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionOperationWithInstant.getLeft());
StoragePath inflightPath = new StoragePath(metaClient.getTimelinePath(), instantFileNameGenerator.getFileName(inflight));
StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(inflight));
if (metaClient.getStorage().exists(inflightPath)) {
// revert if in inflight state
metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
List<HoodieInstant> instants = new ArrayList<>();
try {
// We need to move all the instants - not just completed ones.
instants = metaClient.scanHoodieInstantsFromFileSystem(metaClient.getTimelinePath(),
instants = metaClient.scanHoodieInstantsFromFileSystem(metaClient.getActiveTimelinePath(),
ActiveTimelineV2.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
} catch (IOException ioe) {
LOG.error("Failed to get instants from filesystem", ioe);
Expand All @@ -125,10 +125,10 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
context.map(instants, instant -> {
String fileName = instantFileNameGenerator.getFileName(instant);
// Rename the metadata file name from the ${instant_time}_${completion_time}.action[.state] format in version 1.x to the ${instant_time}.action[.state] format in version 0.x.
StoragePath fromPath = new StoragePath(TIMELINE_LAYOUT_V2.getTimelinePathProvider().getTimelinePath(
StoragePath fromPath = new StoragePath(TIMELINE_LAYOUT_V2.getTimelinePathProvider().getActiveTimelinePath(
metaClient.getTableConfig(), metaClient.getBasePath()), fileName);
long modificationTime = instant.isCompleted() ? convertCompletionTimeToEpoch(instant) : -1;
StoragePath toPath = new StoragePath(TIMELINE_LAYOUT_V1.getTimelinePathProvider().getTimelinePath(
StoragePath toPath = new StoragePath(TIMELINE_LAYOUT_V1.getTimelinePathProvider().getActiveTimelinePath(
metaClient.getTableConfig(), metaClient.getBasePath()), fileName.replaceAll(UNDERSCORE + "\\d+", ""));
boolean success = true;
if (fileName.contains(UNDERSCORE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable tabl
try {
if (!metaClient.getStorage().exists(new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName))) {
FileIOUtils.copy(metaClient.getStorage(),
new StoragePath(metaClient.getTimelinePath(), fileName),
new StoragePath(metaClient.getActiveTimelinePath(), fileName),
new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
}
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private void prepareTimeline(String tablePath, HoodieTableMetaClient metaClient)
// reconcile the active timeline
instants.subList(0, 3 * 6).forEach(
instant -> TimelineUtils.deleteInstantFile(metaClient.getStorage(),
metaClient.getTimelinePath(), instant, INSTANT_FILE_NAME_GENERATOR));
metaClient.getActiveTimelinePath(), instant, INSTANT_FILE_NAME_GENERATOR));
ValidationUtils.checkState(
metaClient.reloadActiveTimeline().filterCompletedInstants().countInstants() == 4,
"should archive 6 instants with 4 as active");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void prepareLegacyArchivedTimeline(HoodieTableMetaClient metaClient) thr
private HoodieLogFormat.Writer openWriter(HoodieTableMetaClient metaClient) {
try {
return HoodieLogFormat.newWriterBuilder()
.onParentPath(new StoragePath(metaClient.getArchivePath()))
.onParentPath(metaClient.getArchivePath())
.withFileId("commits").withFileExtension(HoodieArchivedLogFile.ARCHIVE_EXTENSION)
.withStorage(metaClient.getStorage()).withInstantTime("").build();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute() {
HoodieInstant dropPartitionsInstant =
instantGenerator.createNewInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
if (!table.getStorage().exists(new StoragePath(
table.getMetaClient().getTimelinePath(), instantFileNameGenerator.getFileName(dropPartitionsInstant)))) {
table.getMetaClient().getActiveTimelinePath(), instantFileNameGenerator.getFileName(dropPartitionsInstant)))) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
.setOperationType(WriteOperationType.DELETE_PARTITION.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inpu
HoodieInstant inflightInstant = instantGenerator.createNewInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);
try {
if (!metaClient.getStorage().exists(
new StoragePath(metaClient.getTimelinePath(), instantFileNameGenerator.getFileName(inflightInstant)))) {
new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(inflightInstant)))) {
throw new HoodieCommitException("Failed to commit " + instantTime + " unable to save inflight metadata ", e);
}
} catch (IOException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
HoodieInstant dropPartitionsInstant =
instantGenerator.createNewInstant(REQUESTED, REPLACE_COMMIT_ACTION, instantTime);
if (!table.getStorage().exists(
new StoragePath(table.getMetaClient().getTimelinePath(),
new StoragePath(table.getMetaClient().getActiveTimelinePath(),
instantFileNameGenerator.getFileName(dropPartitionsInstant)))) {
HoodieRequestedReplaceMetadata requestedReplaceMetadata =
HoodieRequestedReplaceMetadata.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ public void testTagLocationAndDuplicateUpdate(IndexType indexType, boolean popul
// recomputed. This includes the state transitions. We need to delete the inflight instance so that subsequent
// upsert will not run into conflicts.
metaClient.getStorage().deleteDirectory(
new StoragePath(metaClient.getTimelinePath(), newCommitTime + ".inflight"));
new StoragePath(metaClient.getActiveTimelinePath(), newCommitTime + ".inflight"));

writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ public void testTagLocationAndDuplicateUpdate() throws Exception {
// recomputed. This includes the state transitions. We need to delete the inflight instance so that subsequent
// upsert will not run into conflicts.
metaClient.getStorage().deleteDirectory(
new StoragePath(metaClient.getTimelinePath(), "001.inflight"));
new StoragePath(metaClient.getActiveTimelinePath(), "001.inflight"));

writeClient.upsert(writeRecords, newCommitTime);
assertNoWriteErrors(writeStatues.collect());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testRollbackForInflightCompaction() throws Exception {
// reads compaction plan from aux path which is untouched). TO test for regression, we simply get file status
// and look at the file size
StoragePathInfo pathInfo = metaClient.getStorage()
.getPathInfo(new StoragePath(metaClient.getTimelinePath(),
.getPathInfo(new StoragePath(metaClient.getActiveTimelinePath(),
INSTANT_FILE_NAME_GENERATOR.getFileName(pendingCompactionInstant)));
assertTrue(pathInfo.getLength() > 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void testRollbackBackup() throws Exception {
HoodieTable table =
this.getHoodieTable(metaClient, getConfigBuilder().withRollbackBackupEnabled(true).build());
HoodieInstant needRollBackInstant = HoodieTestUtils.getCompleteInstant(
metaClient.getStorage(), metaClient.getTimelinePath(),
metaClient.getStorage(), metaClient.getActiveTimelinePath(),
"002", HoodieTimeline.COMMIT_ACTION);

// Create the rollback plan and perform the rollback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) thro
for (HoodieInstant.State state : Arrays.asList(HoodieInstant.State.REQUESTED, HoodieInstant.State.INFLIGHT)) {
HoodieInstant toCopy = INSTANT_GENERATOR.createNewInstant(state, HoodieTimeline.DELTA_COMMIT_ACTION, lastCommitTime);
File file = Files.createTempFile(tempFolder, null, null).toFile();
fs().copyToLocalFile(new Path(metaClient.getTimelinePath().toString(), INSTANT_FILE_NAME_GENERATOR.getFileName(toCopy)),
fs().copyToLocalFile(new Path(metaClient.getActiveTimelinePath().toString(), INSTANT_FILE_NAME_GENERATOR.getFileName(toCopy)),
new Path(file.getAbsolutePath()));
fileNameMap.put(file.getAbsolutePath(), INSTANT_FILE_NAME_GENERATOR.getFileName(toCopy));
}
Expand All @@ -820,7 +820,7 @@ void testInsertsGeneratedIntoLogFilesRollback(boolean rollbackUsingMarkers) thro
for (Map.Entry<String, String> entry : fileNameMap.entrySet()) {
try {
fs().copyFromLocalFile(new Path(entry.getKey()),
new Path(metaClient.getTimelinePath().toString(), entry.getValue()));
new Path(metaClient.getActiveTimelinePath().toString(), entry.getValue()));
} catch (IOException e) {
throw new HoodieIOException("Error copying state from local disk.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ public class HoodieTableMetaClient implements Serializable {
private HoodieTableType tableType;
private TimelineLayoutVersion timelineLayoutVersion;
private TimelineLayout timelineLayout;
private StoragePath timelinePath;
private StoragePath activeTimelinePath;
private StoragePath archivedTimelinePath;
protected HoodieTableConfig tableConfig;
protected HoodieActiveTimeline activeTimeline;
private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build();
Expand Down Expand Up @@ -182,7 +183,8 @@ protected HoodieTableMetaClient(HoodieStorage storage, String basePath, boolean
}
this.timelineLayoutVersion = layoutVersion.orElseGet(() -> tableConfig.getTimelineLayoutVersion().get());
this.timelineLayout = TimelineLayout.fromVersion(timelineLayoutVersion);
this.timelinePath = timelineLayout.getTimelinePathProvider().getTimelinePath(tableConfig, this.basePath);
this.activeTimelinePath = timelineLayout.getTimelinePathProvider().getActiveTimelinePath(tableConfig, this.basePath);
this.archivedTimelinePath = timelineLayout.getTimelinePathProvider().getArchiveTimelinePath(tableConfig, this.basePath);
this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
LOG.info("Finished Loading Table of type " + tableType + "(version=" + timelineLayoutVersion + ") from " + basePath);
if (loadActiveTimelineOnLoad) {
Expand Down Expand Up @@ -309,8 +311,8 @@ public StoragePath getMetaPath() {
return metaPath;
}

public StoragePath getTimelinePath() {
return timelinePath;
public StoragePath getActiveTimelinePath() {
return activeTimelinePath;
}

/**
Expand Down Expand Up @@ -375,9 +377,8 @@ public String getBootstrapIndexByFileIdFolderNameFolderPath() {
/**
* @return path where archived timeline is stored
*/
public String getArchivePath() {
String archiveFolder = tableConfig.getArchivelogFolder();
return getMetaPath() + StoragePath.SEPARATOR + archiveFolder;
public StoragePath getArchivePath() {
return archivedTimelinePath;
}

/**
Expand Down Expand Up @@ -477,7 +478,8 @@ public synchronized void reloadTableConfig() {
private void reloadTimelineLayout() {
this.timelineLayoutVersion = tableConfig.getTimelineLayoutVersion().get();
this.timelineLayout = TimelineLayout.fromVersion(timelineLayoutVersion);
this.timelinePath = timelineLayout.getTimelinePathProvider().getTimelinePath(tableConfig, basePath);
this.activeTimelinePath = timelineLayout.getTimelinePathProvider().getActiveTimelinePath(tableConfig, basePath);
this.archivedTimelinePath = timelineLayout.getTimelinePathProvider().getArchiveTimelinePath(tableConfig, basePath);
}

/**
Expand Down
Loading

0 comments on commit 8364058

Please sign in to comment.