-
Notifications
You must be signed in to change notification settings - Fork 660
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[copilot][flytedirectory] multipart blob download (#5715)
* add download multipart blob Signed-off-by: wayner0628 <[email protected]> * recursively process subparts Signed-off-by: wayner0628 <[email protected]> * implement GetItems function Signed-off-by: wayner0628 <[email protected]> * add unit testing Signed-off-by: wayner0628 <[email protected]> * Parallelly handle blob items Signed-off-by: wayner0628 <[email protected]> * fix lint error Signed-off-by: wayner0628 <[email protected]> * implement GetItems function Signed-off-by: wayner0628 <[email protected]> * add mutex avoid racing Signed-off-by: wayner0628 <[email protected]> * avoid infinite call Signed-off-by: wayner0628 <[email protected]> * protect critical variables Signed-off-by: wayner0628 <[email protected]> * avoid infinite call Signed-off-by: wayner0628 <[email protected]> * lint Signed-off-by: wayner0628 <[email protected]> * add more unit tests Signed-off-by: wayner0628 <[email protected]> * add more unit tests Signed-off-by: wayner0628 <[email protected]> * fix mock Signed-off-by: wayner0628 <[email protected]> * Accept incoming changes Signed-off-by: wayner0628 <[email protected]> * multipart blob download based on new api Signed-off-by: wayner0628 <[email protected]> * cache store stop listing at end cursor Signed-off-by: wayner0628 <[email protected]> * lint Signed-off-by: wayner0628 <[email protected]> * remove old api mock Signed-off-by: wayner0628 <[email protected]> * remove old api mock Signed-off-by: wayner0628 <[email protected]> * remove old api mock Signed-off-by: wayner0628 <[email protected]> * update mem_store List to return global path Signed-off-by: wayner0628 <[email protected]> * change mkdir perm Signed-off-by: wayner0628 <[email protected]> * add comments and handle more errors Signed-off-by: wayner0628 <[email protected]> * lint Co-authored-by: Han-Ru Chen (Future-Outlier) <[email protected]> Signed-off-by: Wei-Yu Kao <[email protected]> * address race condition and aggregate errors Signed-off-by: wayner0628 <[email protected]> * fix tests Signed-off-by: Future-Outlier <[email protected]> * err msg enhancement Signed-off-by: Future-Outlier <[email protected]> --------- Signed-off-by: wayner0628 <[email protected]> Signed-off-by: Wei-Yu Kao <[email protected]> Signed-off-by: Future-Outlier <[email protected]> Co-authored-by: Han-Ru Chen (Future-Outlier) <[email protected]>
- Loading branch information
1 parent
fef67b8
commit b5f23a6
Showing
4 changed files
with
339 additions
and
38 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package data | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"os" | ||
"path/filepath" | ||
"testing" | ||
|
||
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" | ||
"github.com/flyteorg/flyte/flytestdlib/promutils" | ||
"github.com/flyteorg/flyte/flytestdlib/storage" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestHandleBlobMultipart(t *testing.T) { | ||
t.Run("Successful Query", func(t *testing.T) { | ||
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) | ||
assert.NoError(t, err) | ||
ref := storage.DataReference("s3://container/folder/file1") | ||
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) | ||
ref = storage.DataReference("s3://container/folder/file2") | ||
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) | ||
|
||
d := Downloader{store: s} | ||
|
||
blob := &core.Blob{ | ||
Uri: "s3://container/folder", | ||
Metadata: &core.BlobMetadata{ | ||
Type: &core.BlobType{ | ||
Dimensionality: core.BlobType_MULTIPART, | ||
}, | ||
}, | ||
} | ||
|
||
toPath := "./inputs" | ||
defer func() { | ||
err := os.RemoveAll(toPath) | ||
if err != nil { | ||
t.Errorf("Failed to delete directory: %v", err) | ||
} | ||
}() | ||
|
||
result, err := d.handleBlob(context.Background(), blob, toPath) | ||
assert.NoError(t, err) | ||
assert.Equal(t, toPath, result) | ||
|
||
// Check if files were created and data written | ||
for _, file := range []string{"file1", "file2"} { | ||
if _, err := os.Stat(filepath.Join(toPath, "folder", file)); os.IsNotExist(err) { | ||
t.Errorf("expected file %s to exist", file) | ||
} | ||
} | ||
}) | ||
|
||
t.Run("No Items", func(t *testing.T) { | ||
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) | ||
assert.NoError(t, err) | ||
|
||
d := Downloader{store: s} | ||
|
||
blob := &core.Blob{ | ||
Uri: "s3://container/folder", | ||
Metadata: &core.BlobMetadata{ | ||
Type: &core.BlobType{ | ||
Dimensionality: core.BlobType_MULTIPART, | ||
}, | ||
}, | ||
} | ||
|
||
toPath := "./inputs" | ||
defer func() { | ||
err := os.RemoveAll(toPath) | ||
if err != nil { | ||
t.Errorf("Failed to delete directory: %v", err) | ||
} | ||
}() | ||
|
||
result, err := d.handleBlob(context.Background(), blob, toPath) | ||
assert.Error(t, err) | ||
assert.Nil(t, result) | ||
}) | ||
} | ||
|
||
func TestHandleBlobSinglePart(t *testing.T) { | ||
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) | ||
assert.NoError(t, err) | ||
ref := storage.DataReference("s3://container/file") | ||
s.WriteRaw(context.Background(), ref, 0, storage.Options{}, bytes.NewReader([]byte{})) | ||
|
||
d := Downloader{store: s} | ||
|
||
blob := &core.Blob{ | ||
Uri: "s3://container/file", | ||
Metadata: &core.BlobMetadata{ | ||
Type: &core.BlobType{ | ||
Dimensionality: core.BlobType_SINGLE, | ||
}, | ||
}, | ||
} | ||
|
||
toPath := "./input" | ||
defer func() { | ||
err := os.RemoveAll(toPath) | ||
if err != nil { | ||
t.Errorf("Failed to delete file: %v", err) | ||
} | ||
}() | ||
|
||
result, err := d.handleBlob(context.Background(), blob, toPath) | ||
assert.NoError(t, err) | ||
assert.Equal(t, toPath, result) | ||
|
||
// Check if files were created and data written | ||
if _, err := os.Stat(toPath); os.IsNotExist(err) { | ||
t.Errorf("expected file %s to exist", toPath) | ||
} | ||
} | ||
|
||
func TestHandleBlobHTTP(t *testing.T) { | ||
s, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope()) | ||
assert.NoError(t, err) | ||
d := Downloader{store: s} | ||
|
||
blob := &core.Blob{ | ||
Uri: "https://raw.githubusercontent.com/flyteorg/flyte/master/README.md", | ||
Metadata: &core.BlobMetadata{ | ||
Type: &core.BlobType{ | ||
Dimensionality: core.BlobType_SINGLE, | ||
}, | ||
}, | ||
} | ||
|
||
toPath := "./input" | ||
defer func() { | ||
err := os.RemoveAll(toPath) | ||
if err != nil { | ||
t.Errorf("Failed to delete file: %v", err) | ||
} | ||
}() | ||
|
||
result, err := d.handleBlob(context.Background(), blob, toPath) | ||
assert.NoError(t, err) | ||
assert.Equal(t, toPath, result) | ||
|
||
// Check if files were created and data written | ||
if _, err := os.Stat(toPath); os.IsNotExist(err) { | ||
t.Errorf("expected file %s to exist", toPath) | ||
} | ||
} |
Oops, something went wrong.