Skip to content

Commit

Permalink
list all files from s3 for prefix
Browse files Browse the repository at this point in the history
  • Loading branch information
diPhantxm authored and reshke committed Oct 31, 2024
1 parent ea69edd commit 2ee34f1
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 163 deletions.
6 changes: 4 additions & 2 deletions include/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ EXTERNC int statRelationSpaceUsage(Relation aorel, int segno, int64 modcount,
size_t *local_commited_bytes,
size_t *external_bytes);

EXTERNC int statRelationSpaceUsagePerExternalChunk(
Relation aorel, int segno, int64 modcount, int64 logicalEof,
EXTERNC int statRelationChunksSpaceUsage(
Relation aorel,
size_t *local_bytes, size_t *local_commited_bytes, yezzeyChunkMeta **list,
size_t *cnt_chunks);

EXTERNC int yezzey_get_block_from_file_path(const char* path);

#endif /* YEZZEY_STORAGE_H */
25 changes: 18 additions & 7 deletions src/storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -468,12 +468,11 @@ int statRelationSpaceUsage(Relation aorel, int segno, int64 modcount,
return 0;
}

int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno,
int64 modcount, int64 logicalEof,
size_t *local_bytes,
size_t *local_commited_bytes,
yezzeyChunkMeta **list,
size_t *cnt_chunks) {
int statRelationChunksSpaceUsage(Relation aorel,
size_t *local_bytes,
size_t *local_commited_bytes,
yezzeyChunkMeta **list,
size_t *cnt_chunks) {
auto rnode = aorel->rd_node;

/* rnode.spcNode == YEZZEYTABLESPACEOID here. we need
Expand All @@ -482,7 +481,7 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno,
auto spcNode = resolveTablespaceOidByName(
YezzeyGetRelationOriginTablespace(NULL, NULL, RelationGetRelid(aorel)));

auto coords = relnodeCoord(spcNode, rnode.dbNode, rnode.relNode, segno);
auto coords = relnodeCoord(spcNode, rnode.dbNode, rnode.relNode, 0);

auto tp = SearchSysCache1(NAMESPACEOID,
ObjectIdGetDatum(aorel->rd_rel->relnamespace));
Expand Down Expand Up @@ -540,3 +539,15 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno,
*local_commited_bytes = 0;
return 0;
}

/*
 * Extract the integer block number encoded in an external-storage file path.
 *
 * The block number is taken to be the '_'-delimited field that follows the
 * sixth underscore of the path and ends at the seventh underscore (or at the
 * end of the string when there is no seventh underscore). The underscore
 * search starts at index 1, so a '_' in the very first position is not
 * counted.
 * NOTE(review): the exact path layout is produced elsewhere; confirm that
 * the block number is always the seventh field.
 *
 * path: NUL-terminated file path; may be NULL.
 *
 * Returns the parsed block number, or 0 when path is NULL, when it contains
 * fewer than six underscores, or when the field does not start with a number
 * (atoi semantics).
 */
int yezzey_get_block_from_file_path(const char* path) {
  if (path == nullptr) {
    /* constructing std::string from a null pointer is undefined behaviour */
    return 0;
  }

  const std::string pathstr = path;

  /* number of underscores that precede the block-number field */
  const int underscores_before_block = 6;

  /*
   * Keep positions as size_type: narrowing npos to int would turn it into -1
   * and silently restart the search from the beginning of the string.
   */
  std::string::size_type pos = 0;
  for (int n = 0; n < underscores_before_block; ++n) {
    pos = pathstr.find('_', pos + 1);
    if (pos == std::string::npos) {
      /* malformed path: not enough fields to contain a block number */
      return 0;
    }
  }

  const std::string::size_type begin = pos + 1;
  const std::string::size_type end = pathstr.find('_', begin);

  /* the block number may be the last field, in which case take the tail */
  const std::string blkno = (end == std::string::npos)
                                ? pathstr.substr(begin)
                                : pathstr.substr(begin, end - begin);
  return atoi(blkno.c_str());
}
60 changes: 22 additions & 38 deletions src/yproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,52 +668,36 @@ int YProxyLister::prepareYproxyConnection() {
}

std::vector<storageChunkMeta> YProxyLister::list_relation_chunks() {
auto order = YezzeyVirtualGetOrder(YezzeyFindAuxIndex(adv_->reloid), adv_->reloid,
adv_->coords_.filenode, adv_->coords_.blkno);
std::set<std::string> xpaths;
for (auto& chunk : order) {
int i = 0;
for (int n = 0; n != 8 && i < chunk.x_path.size(); ++n)
i = chunk.x_path.find('_', i+1);
xpaths.insert(chunk.x_path.substr(0, i));
std::vector<storageChunkMeta> res;
auto ret = prepareYproxyConnection();
if (ret != 0) {
// throw?
return res;
}

std::vector<storageChunkMeta> res;
for (auto& xpath : xpaths) {
auto ret = prepareYproxyConnection();
if (ret != 0) {
// throw?
auto msg = ConstructListRequest(yezzey_block_db_file_path(adv_->nspname, adv_->relname, adv_->coords_, segindx_));
size_t rc = ::write(client_fd_, msg.data(), msg.size());
if (rc <= 0) {
// throw?
return res;
}

std::vector<storageChunkMeta> meta;
while (true) {
auto message = readMessage();
switch (message.type) {
case MessageTypeObjectMeta:
meta = readObjectMetaBody(&message.content);
res.insert(res.end(), meta.begin(), meta.end());
break;
case MessageTypeReadyForQuery:
return res;
}

auto msg = ConstructListRequest(xpath);
size_t rc = ::write(client_fd_, msg.data(), msg.size());
if (rc <= 0) {
default:
// throw?
return res;
}

std::vector<storageChunkMeta> meta;
bool more = true;
while (more) {
auto message = readMessage();
switch (message.type) {
case MessageTypeObjectMeta:
meta = readObjectMetaBody(&message.content);
res.insert(res.end(), meta.begin(), meta.end());
break;
case MessageTypeReadyForQuery:
more = false;
break;

default:
// throw?
return res;
}
}
}

return res;
}

std::vector<std::string> YProxyLister::list_chunk_names() {
Expand Down
152 changes: 36 additions & 116 deletions yezzey.c
Original file line number Diff line number Diff line change
Expand Up @@ -775,13 +775,9 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
Oid reloid;
Relation aorel;
int i;
int segno;
int pseudosegno;
int total_segfiles;
int nvp;
int inat;
int64 modcount;
int64 logicalEof;
FileSegInfo **segfile_array;
AOCSFileSegInfo **segfile_array_cs;
Snapshot appendOnlyMetaDataSnapshot;
Expand Down Expand Up @@ -825,13 +821,10 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
*/
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

size_t total_row = 0;
size_t local_bytes = 0;
size_t external_bytes = 0;
size_t local_commited_bytes = 0;

if (RelationIsAoRows(aorel)) {
/* ao rows relation */
#if IsModernYezzey
segfile_array =
GetAllFileSegInfo(aorel, appendOnlyMetaDataSnapshot, &total_segfiles, &segrelid);
Expand All @@ -840,118 +833,45 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
GetAllFileSegInfo(aorel, appendOnlyMetaDataSnapshot, &total_segfiles);
#endif

for (i = 0; i < total_segfiles; ++i) {

segno = segfile_array[i]->segno;
modcount = segfile_array[i]->modcount;
logicalEof = segfile_array[i]->eof;

elog(yezzey_log_level,
"stat segment no %d, modcount %ld with to logial eof %ld", segno,
modcount, logicalEof);
size_t curr_local_bytes = 0;
size_t curr_external_bytes = 0;
size_t curr_local_commited_bytes = 0;
yezzeyChunkMeta *list;
size_t cnt_chunks;

if (statRelationSpaceUsagePerExternalChunk(
aorel, segno, modcount, logicalEof, &curr_local_bytes,
&curr_local_commited_bytes, &list, &cnt_chunks) < 0) {
elog(ERROR, "failed to stat segment %d usage", segno);
}

local_bytes = curr_local_bytes;
external_bytes = curr_external_bytes;
local_commited_bytes = curr_local_commited_bytes;

chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
(total_row + cnt_chunks));

for (size_t chunk_index = 0; chunk_index < cnt_chunks; ++chunk_index) {
chunkInfo[total_row + chunk_index].reloid = reloid;
chunkInfo[total_row + chunk_index].segindex = GpIdentity.segindex;
chunkInfo[total_row + chunk_index].segfileindex = i;
chunkInfo[total_row + chunk_index].external_storage_filepath =
list[chunk_index].chunkName;
chunkInfo[total_row + chunk_index].local_bytes = local_bytes;
chunkInfo[total_row + chunk_index].local_commited_bytes =
local_commited_bytes;
chunkInfo[total_row + chunk_index].external_bytes =
list[chunk_index].chunkSize;
}
total_row += cnt_chunks;
}

/*
* Build a tuple descriptor for our result type
* The number and type of attributes have to match the definition of the
* view yezzey_offload_relation_status_internal
*/

} else if (RelationIsAoCols(aorel)) {

#if IsGreenplum6
segfile_array_cs = GetAllAOCSFileSegInfo(aorel, appendOnlyMetaDataSnapshot,
&total_segfiles);
#else
segfile_array_cs = GetAllAOCSFileSegInfo(aorel, appendOnlyMetaDataSnapshot,
&total_segfiles, &segrelid);
#endif
size_t curr_local_bytes = 0;
size_t curr_external_bytes = 0;
size_t curr_local_commited_bytes = 0;
yezzeyChunkMeta *list;
size_t cnt_chunks;

for (inat = 0; inat < nvp; ++inat) {
for (i = 0; i < total_segfiles; i++) {
segno = segfile_array_cs[i]->segno;
/* In the AOCS case the actual *segno* differs from segfile_array_cs[i]->segno,
* which is the logical number of the segment. On the physical level, each logical
* segno (segfile_array_cs[i]->segno) is represented by
* AOTupleId_MultiplierSegmentFileNum in storage (1 file per
* attribute)
*/
pseudosegno = (inat * AOTupleId_MultiplierSegmentFileNum) + segno;
modcount = segfile_array_cs[i]->modcount;
logicalEof = segfile_array_cs[i]->vpinfo.entry[inat].eof;

size_t curr_local_bytes = 0;
size_t curr_external_bytes = 0;
size_t curr_local_commited_bytes = 0;
yezzeyChunkMeta *list;
size_t cnt_chunks;

if (statRelationSpaceUsagePerExternalChunk(
aorel, pseudosegno, modcount, logicalEof, &curr_local_bytes,
&curr_local_commited_bytes, &list, &cnt_chunks) < 0) {
elog(ERROR, "failed to stat segment %d usage", segno);
}
if (statRelationChunksSpaceUsage(
aorel, &curr_local_bytes,
&curr_local_commited_bytes, &list, &cnt_chunks) < 0) {
elog(ERROR, "failed to stat relation chunks space usage");
}

local_bytes = curr_local_bytes;
external_bytes = curr_external_bytes;
local_commited_bytes = curr_local_commited_bytes;

chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
(total_row + cnt_chunks));

for (size_t chunk_index = 0; chunk_index < cnt_chunks;
++chunk_index) {
chunkInfo[total_row + chunk_index].reloid = reloid;
chunkInfo[total_row + chunk_index].segindex = GpIdentity.segindex;
chunkInfo[total_row + chunk_index].segfileindex = i;
chunkInfo[total_row + chunk_index].external_storage_filepath =
list[chunk_index].chunkName;
chunkInfo[total_row + chunk_index].local_bytes = local_bytes;
chunkInfo[total_row + chunk_index].local_commited_bytes =
local_commited_bytes;
chunkInfo[total_row + chunk_index].external_bytes =
list[chunk_index].chunkSize;
}
total_row += cnt_chunks;
}
}
} else {
local_bytes = curr_local_bytes;
external_bytes = curr_external_bytes;
local_commited_bytes = curr_local_commited_bytes;

elog(ERROR, "yezzey: wrong relation storage type: not AO/AOCS relation");
chunkInfo = realloc(chunkInfo, sizeof(yezzeyChunkMetaInfo) *
(cnt_chunks));

for (size_t chunk_index = 0; chunk_index < cnt_chunks; ++chunk_index) {
chunkInfo[chunk_index].reloid = reloid;
chunkInfo[chunk_index].segindex = GpIdentity.segindex;
chunkInfo[chunk_index].segfileindex = yezzey_get_block_from_file_path(
list[chunk_index].chunkName);
chunkInfo[chunk_index].external_storage_filepath =
list[chunk_index].chunkName;
chunkInfo[chunk_index].local_bytes = local_bytes;
chunkInfo[chunk_index].local_commited_bytes =
local_commited_bytes;
chunkInfo[chunk_index].external_bytes =
list[chunk_index].chunkSize;
}

/*
* Build a tuple descriptor for our result type
* The number and type of attributes have to match the definition of the
* view yezzey_offload_relation_status_internal
*/

#if IsGreenplum6
funcctx->tuple_desc = CreateTemplateTupleDesc(
NUM_USED_OFFLOAD_PER_SEGMENT_STATUS_STRUCT, false);
Expand Down Expand Up @@ -985,8 +905,8 @@ Datum yezzey_relation_describe_external_storage_structure_internal(
attinmeta = TupleDescGetAttInMetadata(funcctx->tuple_desc);
funcctx->attinmeta = attinmeta;

if (total_row > 0) {
funcctx->max_calls = total_row;
if (cnt_chunks > 0) {
funcctx->max_calls = cnt_chunks;
funcctx->user_fctx = chunkInfo;
/* funcctx->user_fctx */
} else {
Expand Down

0 comments on commit 2ee34f1

Please sign in to comment.