Allow for appending to existing hsds "files" using hsload #86

Merged: 7 commits, Jun 16, 2020
Changes from all commits
23 changes: 15 additions & 8 deletions h5pyd/_apps/hsload.py
@@ -98,6 +98,7 @@ def usage():
print(" -e | --endpoint <domain> :: The HDF Server endpoint, e.g. http://hsdshdflab.hdfgroup.org")
print(" -u | --user <username> :: User name credential")
print(" -p | --password <password> :: Password credential")
print(" -a | --append <mode> :: Flag to append to an existing HDF Server domain")
print(" -c | --conf <file.cnf> :: A credential and config file")
print(" -z[n] :: apply compression filter to any non-compressed datasets, n: [0-9]")
print(" --cnf-eg :: Print a config file and then exit")
@@ -151,6 +152,7 @@ def main():
logfname=None
ipvfam=None
s3 = None # s3fs instance
mode = 'w'

src_files = []
argn = 1
@@ -163,8 +165,10 @@
sys.stderr.write("options must precede source files")
usage()
sys.exit(-1)

if len(sys.argv) > argn + 1:
val = sys.argv[argn+1]
val = sys.argv[argn + 1]

if arg in ("-v", "--verbose"):
verbose = True
argn += 1
@@ -216,6 +220,9 @@ def main():
elif arg in ("-p", "--password"):
cfg["hs_password"] = val
argn += 2
elif arg in ("-a", "--append"):
mode = 'a'
argn += 1
elif arg == '--cnf-eg':
print_config_example()
sys.exit(0)
@@ -273,21 +280,21 @@ def main():
if h5py.version.hdf5_version_tuple[0] != 1 or h5py.version.hdf5_version_tuple[1] != 10 or h5py.version.hdf5_version_tuple[2] < 6:
sys.stderr.write("link option requires hdf5 lib version 1.10.6 or higher")
sys.exit(1)


try:

for src_file in src_files:
# check if this is a non local file, if it is remote (http, etc...) stage it first then insert it into hsds
src_file_chk = urlparse(src_file)
logging.debug(src_file_chk)

if src_file_chk.scheme == 'http' or src_file_chk.scheme == 'https' or src_file_chk.scheme == 'ftp':
src_file = stage_file(src_file, netfam=ipvfam)
if src_file == None:
if src_file is None:
continue
istmp = True
logging.info('temp source data: '+str(src_file))
logging.info('temp source data: ' + str(src_file))
else:
istmp = False

@@ -302,7 +309,7 @@ def main():
if not S3FS_IMPORT:
sys.stderr.write("Install S3FS package to load s3 files")
sys.exit(1)

if not s3:
s3 = s3fs.S3FileSystem()
try:
@@ -313,7 +320,7 @@ def main():
else:
if dataload == "link":
if op.isabs(src_file):
sys.stderr.write("source file must s3path (for HSDS using S3 storage) or relative path from server root directory (for HSDS using posix storage)")
sys.stderr.write("source file must s3path (for HSDS using S3 storage) or relative path from server root directory (for HSDS using posix storage)")
sys.exit(1)
s3path = src_file
else:
@@ -331,7 +338,7 @@ def main():
endpoint = cfg["hs_endpoint"]
bucket = cfg["hs_bucket"]

fout = h5pyd.File(tgt, 'x', endpoint=endpoint, username=username, password=password, bucket=bucket)
fout = h5pyd.File(tgt, mode, endpoint=endpoint, username=username, password=password, bucket=bucket)
except IOError as ioe:
if ioe.errno == 404:
logging.error("Domain: {} not found".format(tgt))
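For context on the mode change above: h5pyd.File follows h5py's open-mode semantics, so 'x' (the previous hard-coded value) fails if the target domain already exists, while 'a' opens an existing domain read/write and creates it when absent. A minimal sketch of the difference, assuming a reachable HSDS endpoint and a placeholder domain path:

import h5pyd

DOMAIN = "/home/myuser/example.h5"          # placeholder target domain
ENDPOINT = "http://hsdshdflab.hdfgroup.org" # example endpoint from the usage text

# 'x' (exclusive create) raises an error once the domain exists,
# which is why re-running hsload against the same target used to fail.
try:
    f = h5pyd.File(DOMAIN, "x", endpoint=ENDPOINT)
    f.close()
except IOError as e:
    print("exclusive create failed: {}".format(e))

# 'a' opens the domain for read/write, creating it if needed,
# so a later hsload --append run can add more objects to it.
f = h5pyd.File(DOMAIN, "a", endpoint=ENDPOINT)
f.close()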
114 changes: 51 additions & 63 deletions h5pyd/_apps/utillib.py
@@ -261,6 +261,7 @@ def copy_array(src_arr, ctx):

#----------------------------------------------------------------------------------
def copy_attribute(desobj, name, srcobj, ctx):

msg = "creating attribute {} in {}".format(name, srcobj.name)
logging.debug(msg)
if ctx["verbose"]:
@@ -436,9 +437,8 @@ def create_dataset(dobj, ctx):
except (IOError, TypeError, KeyError) as e:
msg = "ERROR: failed to create dataset: {}".format(str(e))
logging.error(msg)
print(msg)
return

return dset
# create_dataset

#----------------------------------------------------------------------------------
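Note that create_dataset now returns the newly created dataset (or None when creation failed) instead of printing the error and returning nothing, so callers can decide what to copy next. A rough caller-side sketch, assuming obj and ctx are the source object and context dict used throughout utillib.py and that the module's helpers are in scope:

import logging

dset = create_dataset(obj, ctx)   # returns the target dataset, or None on failure
if dset is not None:
    # copy attributes only when the dataset was actually created
    for attr_name in obj.attrs:
        copy_attribute(dset, attr_name, obj, ctx)
else:
    logging.warning("dataset {} was not created; skipping its attributes".format(obj.name))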
@@ -537,8 +537,8 @@ def create_links(gsrc, gdes, ctx):
gdes[title] = des_obj
else:
# TBD - in hdf5 1.10 it seems that two references to the same object
# can return different id's. This will cause HDF5 files with
# multilinks to not load correctly
msg = "could not find map item to src id: {}".format(src_obj_id_hash)
logging.warn(msg)
if ctx["verbose"]:
@@ -582,6 +582,8 @@ def create_group(gobj, ctx):
logging.debug("adding group id {} to {} in srcid_desobj_map".format(gobj.id.id, grp))
srcid_desobj_map[gobj.id.__hash__()] = grp

return grp

# create_group

#----------------------------------------------------------------------------------
@@ -627,84 +629,70 @@ def load_file(fin, fout, verbose=False, dataload="ingest", s3path=None, deflate=
ctx["s3path"] = s3path
ctx["srcid_desobj_map"] = {}


# create any root attributes
for ga in fin.attrs:
copy_attribute(fout, ga, fin, ctx)

def object_create_helper(name, obj):
class_name = obj.__class__.__name__
# create root soft/external links
create_links(fin, fout, ctx)

if class_name in ("Dataset", "Table"):
create_dataset(obj, ctx)
elif class_name == "Group":
create_group(obj, ctx)
elif class_name == "Datatype":
create_datatype(obj, ctx)
def object_helper(name, obj):
fout = ctx['fout']
if name in fout:
logging.warning('{} already exists and will be skipped'.format(name))
else:
logging.error("no handler for object class: {}".format(type(obj)))

def object_link_helper(name, obj):
class_name = obj.__class__.__name__
logging.debug("object_link_helper for object: {}".format(obj.name))
if class_name == "Group":
# create any soft/external links
fout = ctx["fout"]
grp = fout[name]
create_links(obj, grp, ctx)

def object_copy_helper(name, obj):
class_name = obj.__class__.__name__
logging.debug("object_copy_helper for object: {}".format(obj.name))

if class_name in ("Dataset", "Table"):
if obj.dtype.metadata and 'vlen' in obj.dtype.metadata:
is_vlen = True
else:
is_vlen = False
if ctx["dataload"] == "link" and not is_vlen:
logging.info("skip datacopy for link reference")
class_name = obj.__class__.__name__
if class_name in ("Dataset", "Table"):
dset = create_dataset(obj, ctx)

if dset is not None:
for da in obj.attrs:
copy_attribute(dset, da, obj, ctx)

if dataload == "ingest":
logging.debug("object_copy_helper for object: {}".format(obj.name))
if ctx["dataload"] == "link":
logging.info("skip datacopy for link reference")
else:
logging.debug("calling write_dataset for dataset: {}".format(obj.name))
tgt = fout[obj.name]
write_dataset(obj, tgt, ctx)

elif class_name == "Group":
grp = create_group(obj, ctx)

if grp is not None:
for ga in obj.attrs:
copy_attribute(grp, ga, obj, ctx)

# create any soft/external links
logging.debug("object_link_helper for object: {}".format(obj.name))
fout = ctx["fout"]
grp = fout[name]
create_links(obj, grp, ctx)
elif class_name == "Datatype":
create_datatype(obj, ctx)
else:
logging.debug("calling write_dataset for dataset: {}".format(obj.name))
tgt = fout[obj.name]
write_dataset(obj, tgt, ctx)
elif class_name == "Group":
logging.debug("skip copy for group: {}".format(obj.name))
elif class_name == "Datatype":
logging.debug("skip copy for datatype: {}".format(obj.name))
else:
logging.error("no handler for object class: {}".format(type(obj)))

def object_attribute_helper(name, obj):
tgt = fout[obj.name]
for ga in obj.attrs:
copy_attribute(tgt, ga, obj, ctx)
logger.error("no handler for object class: {}"
.format(type(obj)))

# build a rough map of the file using the internal function above
logging.info("creating target objects")
fin.visititems(object_create_helper)

# copy over any attributes
logging.info("creating target attributes")
fin.visititems(object_attribute_helper)

# create soft/external links (and hardlinks not already created)
create_links(fin, fout, ctx) # create root soft/external links
fin.visititems(object_link_helper)

if dataload:
logging.info("creating target objects and attributes")
if dataload == "ingest":
# copy dataset data
logging.info("copying dataset data")
fin.visititems(object_copy_helper)
else:
logging.info("skipping dataset data copy (dataload is None)")

fin.visititems(object_helper)

# Fully flush the h5py handle.
fout.close()

# close up the source domain, see reason(s) for this below
fin.close()
msg="load_file complete"
msg = "load_file complete"
logging.info(msg)
if verbose:
print(msg)
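The separate create/attribute/link/copy visitor passes are collapsed here into a single object_helper that skips any object already present in the target, which is what makes appending to an existing domain safe. A simplified, self-contained sketch of that visit-and-skip pattern using plain h5py files and placeholder file names (the real hsload path also copies attributes, named datatypes and links through the module's helpers):

import h5py

def append_objects(src_path, dst_path):
    # open the destination in append mode so whatever is already there is kept
    with h5py.File(src_path, "r") as fin, h5py.File(dst_path, "a") as fout:
        def object_helper(name, obj):
            if name in fout:
                print("{} already exists and will be skipped".format(name))
            elif isinstance(obj, h5py.Group):
                fout.create_group(name)
            elif isinstance(obj, h5py.Dataset):
                fout.create_dataset(name, data=obj[()])
        # visititems walks every group and dataset below the root group
        fin.visititems(object_helper)

# example (hypothetical files): append_objects("new_data.h5", "existing_copy.h5")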