diff --git a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java index 18d7eccff3521..25b63374946c8 100644 --- a/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java +++ b/tiered-storage/file-system/src/main/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemManagedLedgerOffloader.java @@ -105,7 +105,7 @@ private FileSystemManagedLedgerOffloader(OffloadPoliciesImpl conf, OrderedSchedu this.configuration.setClassLoader(FileSystemLedgerOffloaderFactory.class.getClassLoader()); this.driverName = conf.getManagedLedgerOffloadDriver(); - this.storageBasePath = configuration.get("fs.defaultFS"); + this.storageBasePath = configuration.get("hadoop.tmp.dir"); this.scheduler = scheduler; this.fileSystem = FileSystem.get(configuration); this.assignmentScheduler = OrderedScheduler.newSchedulerBuilder() diff --git a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java b/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java deleted file mode 100644 index 14734b3faca99..0000000000000 --- a/tiered-storage/file-system/src/test/java/org/apache/bookkeeper/mledger/offload/filesystem/impl/FileSystemOffloaderLocalFileTest.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.bookkeeper.mledger.offload.filesystem.impl; - -import static org.testng.Assert.assertEquals; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.UUID; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.bookkeeper.client.PulsarMockBookKeeper; -import org.apache.bookkeeper.client.api.DigestType; -import org.apache.bookkeeper.client.api.LedgerEntries; -import org.apache.bookkeeper.client.api.LedgerEntry; -import org.apache.bookkeeper.client.api.ReadHandle; -import org.apache.bookkeeper.common.util.OrderedScheduler; -import org.apache.bookkeeper.mledger.LedgerOffloaderStats; -import org.apache.pulsar.common.naming.TopicName; -import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; -import org.testng.annotations.Test; - -public class FileSystemOffloaderLocalFileTest { - private OrderedScheduler scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("offloader").build(); - private LedgerOffloaderStats offloaderStats = LedgerOffloaderStats.create(true, true, scheduler, 60); - - - private String getResourceFilePath(String name) { - return getClass().getClassLoader().getResource(name).getPath(); - } - - @Test - public void testReadWriteWithLocalFileUsingFileSystemURI() throws Exception { - // prepare the offload policies - final String basePath = "/tmp"; - OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl(); - offloadPolicies.setFileSystemURI("file://" + basePath); - offloadPolicies.setManagedLedgerOffloadDriver("filesystem"); - offloadPolicies.setFileSystemProfilePath(getResourceFilePath("filesystem_offload_core_site.xml")); - - // initialize the offloader with the offload policies - var offloader = FileSystemManagedLedgerOffloader.create(offloadPolicies, scheduler, offloaderStats); - - int numberOfEntries = 100; - - // prepare the data in bookkeeper - BookKeeper bk = new PulsarMockBookKeeper(scheduler); - LedgerHandle lh = bk.createLedger(1,1,1, BookKeeper.DigestType.CRC32, "".getBytes()); - for (int i = 0; i < numberOfEntries; i++) { - byte[] entry = ("foobar"+i).getBytes(); - lh.addEntry(entry); - } - lh.close(); - - ReadHandle read = bk.newOpenLedgerOp() - .withLedgerId(lh.getId()) - .withDigestType(DigestType.CRC32) - .withPassword("".getBytes()).execute().get(); - - final String mlName = TopicName.get("testWriteLocalFIle").getPersistenceNamingEncoding(); - Map offloadDriverMetadata = new HashMap<>(); - offloadDriverMetadata.put("ManagedLedgerName", mlName); - - UUID uuid = UUID.randomUUID(); - offloader.offload(read, uuid, offloadDriverMetadata).get(); - ReadHandle toTest = offloader.readOffloaded(read.getId(), uuid, offloadDriverMetadata).get(); - assertEquals(toTest.getLastAddConfirmed(), read.getLastAddConfirmed()); - LedgerEntries toTestEntries = toTest.read(0, numberOfEntries - 1); - LedgerEntries toWriteEntries = read.read(0,numberOfEntries - 1); - Iterator toTestIter = toTestEntries.iterator(); - Iterator toWriteIter = toWriteEntries.iterator(); - while(toTestIter.hasNext()) { - LedgerEntry toWriteEntry = toWriteIter.next(); - LedgerEntry toTestEntry = toTestIter.next(); - - assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); - assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); - assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); - assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); - } - toTestEntries = toTest.read(1, numberOfEntries - 1); - toWriteEntries = read.read(1,numberOfEntries - 1); - toTestIter = toTestEntries.iterator(); - toWriteIter = toWriteEntries.iterator(); - while(toTestIter.hasNext()) { - LedgerEntry toWriteEntry = toWriteIter.next(); - LedgerEntry toTestEntry = toTestIter.next(); - - assertEquals(toWriteEntry.getLedgerId(), toTestEntry.getLedgerId()); - assertEquals(toWriteEntry.getEntryId(), toTestEntry.getEntryId()); - assertEquals(toWriteEntry.getLength(), toTestEntry.getLength()); - assertEquals(toWriteEntry.getEntryBuffer(), toTestEntry.getEntryBuffer()); - } - - // check the file located in the local file system - Path offloadedFilePath = Paths.get(basePath, mlName); - assertEquals(Files.exists(offloadedFilePath), true); - } -} diff --git a/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml b/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml deleted file mode 100644 index d26cec2cc60f0..0000000000000 --- a/tiered-storage/file-system/src/test/resources/filesystem_offload_core_site.xml +++ /dev/null @@ -1,48 +0,0 @@ - - - - - fs.defaultFS - - - - hadoop.tmp.dir - pulsar - - - io.file.buffer.size - 4096 - - - io.seqfile.compress.blocksize - 1000000 - - - io.seqfile.compression.type - BLOCK - - - io.map.index.interval - 128 - - -