Skip to content

Commit

Permalink
BlobStore support multipath. (pingcap#3938)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaqizho authored Mar 29, 2022
1 parent 5b00cc6 commit e49339b
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 201 deletions.
52 changes: 50 additions & 2 deletions dbms/src/Storages/Page/V3/BlobFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,12 @@ extern const char exception_before_page_file_write_sync[];
namespace PS::V3
{
BlobFile::BlobFile(String path_,
FileProviderPtr file_provider_)
: file_provider{file_provider_}
BlobFileId blob_id_,
FileProviderPtr file_provider_,
PSDiskDelegatorPtr delegator_)
: blob_id(blob_id_)
, file_provider{std::move(file_provider_)}
, delegator(std::move(delegator_))
, path(path_)
{
// TODO: support encryption file
Expand All @@ -36,6 +40,22 @@ BlobFile::BlobFile(String path_,
getEncryptionPath(),
false,
/*create_new_encryption_info_*/ false);

Poco::File file_in_disk(getPath());
file_size = file_in_disk.getSize();
{
std::lock_guard<std::mutex> lock(file_size_lock);

// If file_size is 0, we still need insert it.
PageFileIdAndLevel id_lvl{blob_id, 0};
if (!delegator->fileExist(id_lvl))
{
delegator->addPageFileUsedSize(id_lvl,
file_size,
path,
/*need_insert_location*/ true);
}
}
}

void BlobFile::read(char * buffer, size_t offset, size_t size, const ReadLimiterPtr & read_limiter)
Expand Down Expand Up @@ -77,11 +97,37 @@ void BlobFile::write(char * buffer, size_t offset, size_t size, const WriteLimit
PageUtil::writeFile(wrfile, offset, buffer, size, write_limiter, false);
#endif
PageUtil::syncFile(wrfile);

UInt64 expand_size = 0;
{
std::lock_guard<std::mutex> lock(file_size_lock);
if ((offset + size) > file_size)
{
expand_size = offset + size - file_size;
file_size = offset + size;
}
}

if (expand_size != 0)
{
delegator->addPageFileUsedSize(std::make_pair(blob_id, 0),
expand_size,
path,
false);
}
}

void BlobFile::truncate(size_t size)
{
PageUtil::ftruncateFile(wrfile, size);
Int64 shrink_size = 0;
{
std::lock_guard<std::mutex> lock(file_size_lock);
assert(size <= file_size);
shrink_size = file_size - size;
file_size = size;
}
delegator->freePageFileUsedSize(std::make_pair(blob_id, 0), shrink_size, path);
}

void BlobFile::remove()
Expand All @@ -95,6 +141,8 @@ void BlobFile::remove()
{
file_provider->deleteRegularFile(getPath(), getEncryptionPath());
}

delegator->removePageFile(std::make_pair(blob_id, 0), file_size, false, false);
}

BlobFile::~BlobFile()
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/Page/V3/BlobFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
#include <Storages/Page/Page.h>
#include <Storages/Page/PageDefines.h>
#include <Storages/Page/WriteBatch.h>
#include <Storages/PathPool.h>

namespace DB::PS::V3
{
class BlobFile
{
public:
BlobFile(String path_,
FileProviderPtr file_provider_);
BlobFileId blob_id_,
FileProviderPtr file_provider_,
PSDiskDelegatorPtr delegator_);

~BlobFile();

Expand All @@ -54,10 +57,16 @@ class BlobFile
void remove();

private:
const BlobFileId blob_id;

FileProviderPtr file_provider;
PSDiskDelegatorPtr delegator;
String path;

WriteReadableFilePtr wrfile;

std::mutex file_size_lock;
BlobFileOffset file_size;
};
using BlobFilePtr = std::shared_ptr<BlobFile>;

Expand Down
Loading

0 comments on commit e49339b

Please sign in to comment.