Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-8550] Make Hudi 1.x write timeline to a dedicated timeline folder under .hoodie #12288

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public String showCommits(
HoodieTableMetaClient metaClient = HoodieCLI.getTableMetaClient();
StoragePath basePath = metaClient.getBasePath();
StoragePath archivePath =
new StoragePath(metaClient.getArchivePath() + "/.commits_.archive*");
new StoragePath(metaClient.getArchivePath(), ".commits_.archive*");
List<StoragePathInfo> pathInfoList =
HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf).globEntries(archivePath);
List<Comparable[]> allCommits = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public String exportInstants(
throws Exception {

final StoragePath basePath = HoodieCLI.getTableMetaClient().getBasePath();
final StoragePath archivePath = new StoragePath(HoodieCLI.getTableMetaClient().getArchivePath());
final StoragePath archivePath = HoodieCLI.getTableMetaClient().getArchivePath();
final Set<String> actionSet = new HashSet<String>(Arrays.asList(filter.split(",")));
int numExports = limit == -1 ? Integer.MAX_VALUE : limit;
int numCopied = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,12 @@ public void removeCorruptedPendingCleanAction() {
CleanerUtils.getCleanerPlan(client, instant);
} catch (AvroRuntimeException e) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
TimelineUtils.deleteInstantFile(client.getStorage(), client.getMetaPath(),
TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
instant, client.getInstantFileNameGenerator());
} catch (IOException ioe) {
if (ioe.getMessage().contains("Not an Avro data file")) {
LOG.warn("Corruption found. Trying to remove corrupted clean instant file: " + instant);
TimelineUtils.deleteInstantFile(client.getStorage(), client.getMetaPath(),
TimelineUtils.deleteInstantFile(client.getStorage(), client.getActiveTimelinePath(),
instant, client.getInstantFileNameGenerator());
} else {
throw new HoodieIOException(ioe.getMessage(), ioe);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ public String showActive(
HoodieTableMetaClient mtMetaClient = getMetadataTableMetaClient(metaClient);
return printTimelineInfoWithMetadataTable(
metaClient.getActiveTimeline(), mtMetaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getMetaPath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
getInstantInfoFromTimeline(mtMetaClient, mtMetaClient.getStorage(), mtMetaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
}
return printTimelineInfo(
metaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -115,7 +115,7 @@ public String showIncomplete(
try {
return printTimelineInfo(
metaClient.getActiveTimeline().filterInflightsAndRequested(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, showRollbackInfo);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -137,7 +137,7 @@ public String metadataShowActive(
try {
return printTimelineInfo(
metaClient.getActiveTimeline(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
} catch (IOException e) {
e.printStackTrace();
Expand All @@ -159,7 +159,7 @@ public String metadataShowIncomplete(
try {
return printTimelineInfo(
metaClient.getActiveTimeline().filterInflightsAndRequested(),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getMetaPath()),
getInstantInfoFromTimeline(metaClient, metaClient.getStorage(), metaClient.getActiveTimelinePath()),
limit, sortByField, descending, headerOnly, true, showTimeSeconds, false);
} catch (IOException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void cleanUp() throws IOException {
@Test
public void testAddPartitionMetaWithDryRun() throws IOException {
// create commit instant
Files.createFile(Paths.get(tablePath, ".hoodie", "100.commit"));
Files.createFile(Paths.get(tablePath, ".hoodie/timeline/", "100.commit"));

// create partition path
String partition1 = Paths.get(tablePath, HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testCreateWithSpecifiedValues() {
assertTrue(ShellEvaluationResultUtil.isSuccess(result));
assertEquals("Metadata for table " + tableName + " loaded", result.toString());
HoodieTableMetaClient client = HoodieCLI.getTableMetaClient();
assertEquals(metaPath + StoragePath.SEPARATOR + "archive", client.getArchivePath());
assertEquals(new StoragePath(metaPath, "archive"), client.getArchivePath());
assertEquals(tablePath, client.getBasePath().toString());
assertEquals(metaPath, client.getMetaPath().toString());
assertEquals(HoodieTableVersion.SIX, client.getTableConfig().getTableVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
if (!dryRun) {
// Overwrite compaction request with empty compaction operations
HoodieInstant inflight = metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionInstant);
StoragePath inflightPath = new StoragePath(metaClient.getMetaPath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), metaClient.getInstantFileNameGenerator().getFileName(inflight));
if (metaClient.getStorage().exists(inflightPath)) {
// We need to rollback data-files because of this inflight compaction before unscheduling
throw new IllegalStateException("Please rollback the inflight compaction before unscheduling");
Expand All @@ -122,7 +122,7 @@ public List<RenameOpResult> unscheduleCompactionPlan(String compactionInstant, b
// TODO: Add a rollback instant but for compaction
HoodieInstant instant = metaClient.createNewInstant(State.REQUESTED, COMPACTION_ACTION, compactionInstant);
boolean deleted = metaClient.getStorage().deleteFile(
new StoragePath(metaClient.getMetaPath(), instantFileNameGenerator.getFileName(instant)));
new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(instant)));
ValidationUtils.checkArgument(deleted, "Unable to delete compaction instant.");
}
return new ArrayList<>();
Expand Down Expand Up @@ -159,7 +159,7 @@ public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId fgId, b
HoodieCompactionPlan.newBuilder().setOperations(newOps).setExtraMetadata(plan.getExtraMetadata()).build();
HoodieInstant inflight =
metaClient.createNewInstant(State.INFLIGHT, COMPACTION_ACTION, compactionOperationWithInstant.getLeft());
StoragePath inflightPath = new StoragePath(metaClient.getMetaPath(), instantFileNameGenerator.getFileName(inflight));
StoragePath inflightPath = new StoragePath(metaClient.getActiveTimelinePath(), instantFileNameGenerator.getFileName(inflight));
if (metaClient.getStorage().exists(inflightPath)) {
// revert if in inflight state
metaClient.getActiveTimeline().revertInstantFromInflightToRequested(inflight);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.versioning.v1.ActiveTimelineV1;
import org.apache.hudi.common.table.timeline.versioning.v1.CommitMetadataSerDeV1;
import org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.common.table.timeline.versioning.v2.CommitMetadataSerDeV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
Expand All @@ -47,16 +48,21 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.common.table.timeline.HoodieInstant.UNDERSCORE;
import static org.apache.hudi.common.table.timeline.TimelineLayout.TIMELINE_LAYOUT_V1;
import static org.apache.hudi.common.table.timeline.TimelineLayout.TIMELINE_LAYOUT_V2;
import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS;
import static org.apache.hudi.metadata.MetadataPartitionType.FILES;
Expand All @@ -71,6 +77,28 @@ public class EightToSevenDowngradeHandler implements DowngradeHandler {
private static final Logger LOG = LoggerFactory.getLogger(EightToSevenDowngradeHandler.class);
private static final Set<String> SUPPORTED_METADATA_PARTITION_PATHS = getSupportedMetadataPartitionPaths();

/**
* Extract Epoch time from completion time string
* @param instant : HoodieInstant
* @return
*/
public static long convertCompletionTimeToEpoch(HoodieInstant instant) {
try {
String completionTime = instant.getCompletionTime();
// In Java 8, no direct API to convert to epoch in millis.
// Strip off millis
String completionTimeInSecs = completionTime.substring(0, completionTime.length() - 3);
DateTimeFormatter inputFormatter = DateTimeFormatter.ofPattern("yyyyMMddHHmmss");
ZoneId zoneId = ZoneId.systemDefault();
LocalDateTime ldtInSecs = LocalDateTime.parse(completionTimeInSecs, inputFormatter);
long millis = Long.parseLong(completionTime.substring(completionTime.length() - 3));
return ldtInSecs.atZone(zoneId).toEpochSecond() * 1000 + millis;
} catch (Exception e) {
LOG.warn("Failed to parse completion time string for instant " + instant, e);
return -1;
}
}

@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
Expand All @@ -79,33 +107,48 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);

HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
List<HoodieInstant> instants = metaClient.getActiveTimeline().getInstants();
List<HoodieInstant> instants = new ArrayList<>();
try {
// We need to move all the instants - not just completed ones.
instants = metaClient.scanHoodieInstantsFromFileSystem(metaClient.getActiveTimelinePath(),
ActiveTimelineV2.VALID_EXTENSIONS_IN_ACTIVE_TIMELINE, false);
} catch (IOException ioe) {
LOG.error("Failed to get instants from filesystem", ioe);
throw new HoodieIOException("Failed to get instants from filesystem", ioe);
}

if (!instants.isEmpty()) {
InstantFileNameGenerator instantFileNameGenerator = metaClient.getInstantFileNameGenerator();
CommitMetadataSerDeV2 commitMetadataSerDeV2 = new CommitMetadataSerDeV2();
CommitMetadataSerDeV1 commitMetadataSerDeV1 = new CommitMetadataSerDeV1();
ActiveTimelineV1 activeTimelineV1 = new ActiveTimelineV1(metaClient);
String tmpFilePrefix = "temp_commit_file_for_eight_to_seven_downgrade_";
context.map(instants, instant -> {
String fileName = instantFileNameGenerator.getFileName(instant);
// Rename the metadata file name from the ${instant_time}_${completion_time}.action[.state] format in version 1.x to the ${instant_time}.action[.state] format in version 0.x.
StoragePath fromPath = new StoragePath(TIMELINE_LAYOUT_V2.getTimelinePathProvider().getActiveTimelinePath(
metaClient.getTableConfig(), metaClient.getBasePath()), fileName);
long modificationTime = instant.isCompleted() ? convertCompletionTimeToEpoch(instant) : -1;
StoragePath toPath = new StoragePath(TIMELINE_LAYOUT_V1.getTimelinePathProvider().getActiveTimelinePath(
metaClient.getTableConfig(), metaClient.getBasePath()), fileName.replaceAll(UNDERSCORE + "\\d+", ""));
boolean success = true;
if (fileName.contains(UNDERSCORE)) {
try {
// Rename the metadata file name from the ${instant_time}_${completion_time}.action[.state] format in version 1.x to the ${instant_time}.action[.state] format in version 0.x.
StoragePath fromPath = new StoragePath(metaClient.getMetaPath(), fileName);
StoragePath toPath = new StoragePath(metaClient.getMetaPath(), fileName.replaceAll(UNDERSCORE + "\\d+", ""));
boolean success = true;
if (instant.getAction().equals(HoodieTimeline.COMMIT_ACTION) || instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION)) {
HoodieCommitMetadata commitMetadata =
commitMetadataSerDeV2.deserialize(instant, metaClient.getActiveTimeline().getInstantDetails(instant).get(), HoodieCommitMetadata.class);
Option<byte[]> data = commitMetadataSerDeV1.serialize(commitMetadata);
// Create a temporary file to store the json metadata.
String tmpFileName = tmpFilePrefix + UUID.randomUUID() + ".json";
StoragePath tmpPath = new StoragePath(metaClient.getTempFolderPath(), tmpFileName);
String tmpPathStr = tmpPath.toUri().toString();
activeTimelineV1.createFileInMetaPath(tmpPathStr, data, true);
// Note. this is a 2 step. First we create the V1 commit file and then delete file. If it fails in the middle, rerunning downgrade will be idempotent.
metaClient.getStorage().deleteFile(toPath); // First delete if it was created by previous failed downgrade.
success = metaClient.getStorage().rename(tmpPath, toPath);
String toPathStr = toPath.toUri().toString();
activeTimelineV1.createFileInMetaPath(toPathStr, data, true);
/**
* When we downgrade the table from 1.0 to 0.x, it is important to set the modification
* timestamp of the 0.x completed instant to match the completion time of the
* corresponding 1.x instant. Otherwise, log files in previous file slices could
* be wrongly attributed to latest file slice for 1.0 readers.
* (see HoodieFileGroup.getBaseInstantTime)
*/
if (modificationTime > 0) {
metaClient.getStorage().setModificationTime(toPath, modificationTime);
}
metaClient.getStorage().deleteFile(fromPath);
} else {
success = metaClient.getStorage().rename(fromPath, toPath);
Expand All @@ -119,6 +162,8 @@ public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, HoodieEng
LOG.error("Can not to complete the downgrade from version eight to version seven. The reason for failure is {}", e.getMessage());
throw new HoodieException(e);
}
} else {
success = metaClient.getStorage().rename(fromPath, toPath);
}
return false;
}, instants.size());
Expand Down
Loading
Loading