Add neuronx cache registry (#442)
* feat(decoder): store neuron config in cache registry

* feat(cache): add helper to get cached model configurations

* feat(cli): add cache lookup command

* test(generation): only test push_to_hub with one model

* feat(cache): rename registry folder so that it appears on top

* doc(cache): add lookup command

* fix(cli): update cache command help

* fix(style): reformat with latest black version

* Update optimum/neuron/utils/hub_neuronx_cache.py

Co-authored-by: Michael Benayoun <[email protected]>

---------

Co-authored-by: Michael Benayoun <[email protected]>
dacorvo and michaelbenayoun authored Jan 26, 2024
1 parent 7d0dbb5 commit 0f7bf4a
Showing 15 changed files with 169 additions and 52 deletions.
3 changes: 2 additions & 1 deletion docs/source/guides/cache_system.mdx
@@ -154,12 +154,13 @@ The Optimum CLI can be used to perform various cache-related tasks, as described
usage: optimum-cli neuron cache [-h] {create,set,add,list} ...
positional arguments:
{create,set,add,list}
{create,set,add,list,synchronize,lookup}
create Create a model repo on the Hugging Face Hub to store Neuron X compilation files.
set Set the name of the Neuron cache repo to use locally (trainium only).
add Add a model to the cache of your choice (trainium only).
list List models in a cache repo (trainium only).
synchronize Synchronize local compiler cache with the hub cache (inferentia only).
lookup Look up the neuronx compiler hub cache for the specified model id (inferentia only).
optional arguments:
-h, --help show this help message and exit
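For reference, the same lookup can be performed from Python via the helper added in this commit. A minimal sketch (the model id is the one used in the tests; when `cache_repo_id` is omitted or `None`, the default hub cache repo is used):

```python
# Query the hub cache registry for a model's cached configurations,
# the programmatic equivalent of `optimum-cli neuron cache lookup <model_id>`.
from optimum.neuron.utils import get_hub_cached_entries

entries = get_hub_cached_entries(
    "hf-internal-testing/tiny-random-gpt2",  # a hub model id, not a local path
    cache_repo_id=None,  # None falls back to the configured hub cache repo
)
# Each entry is a neuron config dict stored in the registry folder of the cache repo.
for entry in entries:
    print(entry)
```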
6 changes: 3 additions & 3 deletions examples/language-modeling/run_clm.py
@@ -606,9 +606,9 @@ def compute_metrics(eval_preds):
# Data collator will default to DataCollatorWithPadding, so we change it.
data_collator=default_data_collator,
compute_metrics=compute_metrics if training_args.do_eval and not is_torch_tpu_available() else None,
preprocess_logits_for_metrics=preprocess_logits_for_metrics
if training_args.do_eval and not is_torch_tpu_available()
else None,
preprocess_logits_for_metrics=(
preprocess_logits_for_metrics if training_args.do_eval and not is_torch_tpu_available() else None
),
)

# Training
6 changes: 3 additions & 3 deletions examples/language-modeling/run_mlm.py
@@ -631,9 +631,9 @@ def compute_metrics(eval_preds):
tokenizer=tokenizer,
data_collator=data_collator,
compute_metrics=compute_metrics if training_args.do_eval and not is_torch_tpu_available() else None,
preprocess_logits_for_metrics=preprocess_logits_for_metrics
if training_args.do_eval and not is_torch_tpu_available()
else None,
preprocess_logits_for_metrics=(
preprocess_logits_for_metrics if training_args.do_eval and not is_torch_tpu_available() else None
),
)

# Training
36 changes: 31 additions & 5 deletions optimum/commands/neuron/cache.py
@@ -16,7 +16,7 @@

from typing import TYPE_CHECKING

from ...neuron.utils import synchronize_hub_cache
from ...neuron.utils import get_hub_cached_entries, synchronize_hub_cache
from ...neuron.utils.cache_utils import (
CACHE_REPO_NAME,
HF_HOME_CACHE_REPO_FILE,
@@ -218,6 +227,27 @@ def run(self):
synchronize_hub_cache(self.args.repo_id)


class LookupRepoCommand(BaseOptimumCLICommand):
@staticmethod
def parse_args(parser: "ArgumentParser"):
parser.add_argument(
"model_id",
type=str,
help="The model_id to lookup cached versions for.",
)
parser.add_argument("--repo_id", type=str, default=None, help="The name of the repo to use as remote cache.")

def run(self):
entries = get_hub_cached_entries(self.args.model_id, cache_repo_id=self.args.repo_id)
n_entries = len(entries)
output = f"\n*** {n_entries} entrie(s) found in cache for {self.args.model_id} ***\n\n"
for entry in entries:
for key, value in entry.items():
output += f"\n{key}: {value}"
output += "\n"
print(output)


class CustomCacheRepoCommand(BaseOptimumCLICommand):
SUBCOMMANDS = (
CommandInfo(
@@ -227,22 +248,27 @@ class CustomCacheRepoCommand(BaseOptimumCLICommand):
),
CommandInfo(
name="set",
help="Set the name of the Neuron cache repo to use locally.",
help="Set the name of the Neuron cache repo to use locally (trainium only).",
subcommand_class=SetCustomCacheRepoCommand,
),
CommandInfo(
name="add",
help="Add a model to the cache of your choice.",
help="Add a model to the cache of your choice (trainium only).",
subcommand_class=AddToCacheRepoCommand,
),
CommandInfo(
name="list",
help="List models in a cache repo.",
help="List models in a cache repo (trainium only).",
subcommand_class=ListRepoCommand,
),
CommandInfo(
name="synchronize",
help="Synchronize the neuronx compiler cache with a hub cache repo.",
help="Synchronize the neuronx compiler cache with a hub cache repo (inferentia only).",
subcommand_class=SynchronizeRepoCommand,
),
CommandInfo(
name="lookup",
help="Lookup the neuronx compiler hub cache for the specified model id (inferentia only).",
subcommand_class=LookupRepoCommand,
),
)
8 changes: 5 additions & 3 deletions optimum/neuron/distributed/base.py
@@ -397,9 +397,11 @@ def parallelize(
tp_rank = get_tensor_model_parallel_rank()
size_per_rank = parameter.size(partition_dim)
slices = [
None
if idx != partition_dim
else (size_per_rank * tp_rank, size_per_rank * (tp_rank + 1))
(
None
if idx != partition_dim
else (size_per_rank * tp_rank, size_per_rank * (tp_rank + 1))
)
for idx in range(num_dims)
]
else:
8 changes: 5 additions & 3 deletions optimum/neuron/distributed/checkpointing.py
@@ -46,9 +46,11 @@ def consolidate_tensor_parallel_checkpoints(checkpoint_dir: Union[str, Path]) ->
parameter_names = state_dicts[0]["model"].keys()
sharded_metadatas = {
name: [
ParameterMetadata(**state_dict["sharded_metadata"][name])
if name in state_dict["sharded_metadata"]
else ParameterMetadata("tied")
(
ParameterMetadata(**state_dict["sharded_metadata"][name])
if name in state_dict["sharded_metadata"]
else ParameterMetadata("tied")
)
for state_dict in state_dicts
]
for name in parameter_names
7 changes: 5 additions & 2 deletions optimum/neuron/modeling_decoder.py
Expand Up @@ -28,7 +28,7 @@
from ..exporters.neuron.model_configs import * # noqa: F403
from ..exporters.tasks import TasksManager
from ..modeling_base import OptimizedModel
from .utils import hub_neuronx_cache, is_transformers_neuronx_available
from .utils import CacheEntry, hub_neuronx_cache, is_transformers_neuronx_available
from .utils.require_utils import requires_transformers_neuronx
from .utils.version_utils import check_compiler_compatibility, get_neuronxcc_version

@@ -124,7 +124,7 @@ def __init__(
# Compile the Neuron model (if present compiled artifacts will be reloaded instead of compiled)
neuron_cc_flags = os.environ.get("NEURON_CC_FLAGS", "")
os.environ["NEURON_CC_FLAGS"] = neuron_cc_flags + " --model-type=transformer"
with hub_neuronx_cache():
checkpoint_id = neuron_config.get("checkpoint_id", None)
# Only create a cache entry if the model comes from the hub
cache_entry = None if checkpoint_id is None else CacheEntry(neuron_config["checkpoint_id"], neuron_config)
with hub_neuronx_cache(entry=cache_entry):
neuronx_model.to_neuron()
os.environ["NEURON_CC_FLAGS"] = neuron_cc_flags

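As a reading aid for the change above, a minimal hedged sketch of the decision it implements: a registry entry is only created when `neuron_config` carries a `checkpoint_id`, i.e. when the checkpoint came from the hub. The config content below is illustrative, and the context manager itself only runs on a Neuron host.

```python
# Sketch of the cache-entry decision (illustrative values; runs on a Neuron host).
from optimum.neuron.utils import CacheEntry, hub_neuronx_cache

# A hub checkpoint carries its id in the exported neuron config;
# a purely local checkpoint would not, and gets no registry entry.
neuron_config = {"checkpoint_id": "hf-internal-testing/tiny-random-gpt2"}

checkpoint_id = neuron_config.get("checkpoint_id", None)
cache_entry = None if checkpoint_id is None else CacheEntry(checkpoint_id, neuron_config)

# Compilation performed inside the context is recorded in the local cache
# registry, which can later be synchronized with the hub cache.
with hub_neuronx_cache(entry=cache_entry):
    pass  # neuronx_model.to_neuron() happens here in the constructor above
```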
2 changes: 1 addition & 1 deletion optimum/neuron/utils/__init__.py
@@ -24,7 +24,7 @@
ENCODER_NAME,
NEURON_FILE_NAME,
)
from .hub_neuronx_cache import hub_neuronx_cache, synchronize_hub_cache
from .hub_neuronx_cache import CacheEntry, get_hub_cached_entries, hub_neuronx_cache, synchronize_hub_cache
from .import_utils import (
is_accelerate_available,
is_neuron_available,
66 changes: 63 additions & 3 deletions optimum/neuron/utils/hub_neuronx_cache.py
@@ -12,10 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import logging
import os
from contextlib import contextmanager
from typing import Optional
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Any, Dict, Optional

from huggingface_hub import HfApi, get_token

@@ -196,10 +201,25 @@ def _create_hub_compile_cache_proxy(
return CompileCacheHfProxy(cache_repo_id, default_cache, endpoint=endpoint, token=token)


@dataclass
class CacheEntry:
key: str
metadata: Dict[str, Any]


REGISTRY_FOLDER = "0_REGISTRY"


@requires_torch_neuronx
@contextmanager
def hub_neuronx_cache():
"""A context manager to trigger the Hugging Face Hub proxy compiler cache"""
def hub_neuronx_cache(entry: Optional[CacheEntry] = None):
"""A context manager to activate the Hugging Face Hub proxy compiler cache.
Args:
entry (`Optional[CacheEntry]`, defaults to `None`):
An optional dataclass containing metadata associated with the cache session.
Will create a dedicated entry in the cache registry.
"""

def hf_create_compile_cache(cache_url):
try:
@@ -209,8 +229,27 @@ def hf_create_compile_cache(cache_url):
return create_compile_cache(cache_url)

try:
default_cache = create_compile_cache(CacheUrl.get_cache_url())
patch_everywhere("create_compile_cache", hf_create_compile_cache, "libneuronxla")
yield
# The cache session ended without error
if entry is not None:
if isinstance(default_cache, CompileCacheS3):
logger.warning("Skipping cache metadata update on S3 cache.")
else:
# Create cache entry in local cache: it can be later synchronized with the hub cache
registry_path = default_cache.get_cache_dir_with_cache_key(REGISTRY_FOLDER)
entry_path = f"{registry_path}/{entry.key}"
metadata_json = json.dumps(entry.metadata, indent=4)
hash_gen = hashlib.sha512()
hash_gen.update(metadata_json.encode("utf-8"))
metadata_key = str(hash_gen.hexdigest())[:20]
metadata_path = f"{entry_path}/{metadata_key}.json"
if not default_cache.exists(metadata_path):
oldmask = os.umask(000)
Path(entry_path).mkdir(parents=True, exist_ok=True)
os.umask(oldmask)
default_cache.upload_string_to_file(metadata_path, metadata_json)
finally:
patch_everywhere("create_compile_cache", create_compile_cache, "libneuronxla")

Expand All @@ -225,3 +264,24 @@ def synchronize_hub_cache(cache_repo_id: Optional[str] = None):
"""
hub_cache_proxy = _create_hub_compile_cache_proxy(cache_repo_id=cache_repo_id)
hub_cache_proxy.synchronize()


def get_hub_cached_entries(model_id: str, cache_repo_id: Optional[str] = None):
if os.path.isdir(model_id):
raise ValueError("Please pass a hub model_id in the form <user>/<model>.")
if cache_repo_id is None:
cache_repo_id = get_hub_cache()
# Allocate a Hub API with refreshed information (required for tests altering the env)
endpoint = os.getenv("HF_ENDPOINT")
token = get_token()
api = HfApi(endpoint=endpoint, token=token)
repo_files = api.list_repo_files(cache_repo_id)
registry_pattern = REGISTRY_FOLDER + "/" + model_id
model_files = [path for path in repo_files if registry_pattern in path]
model_entries = []
with TemporaryDirectory() as tmpdir:
for model_path in model_files:
local_path = api.hf_hub_download(cache_repo_id, model_path, local_dir=tmpdir)
with open(local_path) as f:
model_entries.append(json.load(f))
return model_entries
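To make the registry layout introduced above concrete: relative to the compiler cache root, an entry is written at `0_REGISTRY/<entry.key>/<metadata_key>.json`, where `metadata_key` is the first 20 hex characters of the SHA-512 digest of the pretty-printed metadata JSON, and `get_hub_cached_entries` later matches repo files containing `0_REGISTRY/<model_id>`. A minimal sketch, with illustrative metadata:

```python
# Sketch of the registry key derivation used by hub_neuronx_cache above.
import hashlib
import json

REGISTRY_FOLDER = "0_REGISTRY"

# For decoder models, the entry key is the hub checkpoint id and the
# metadata is the neuron config; the content below is illustrative.
entry_key = "hf-internal-testing/tiny-random-gpt2"
metadata = {"checkpoint_id": entry_key}

metadata_json = json.dumps(metadata, indent=4)
metadata_key = hashlib.sha512(metadata_json.encode("utf-8")).hexdigest()[:20]

# Path relative to the compile cache root, e.g.
# 0_REGISTRY/hf-internal-testing/tiny-random-gpt2/<20 hex chars>.json
metadata_path = f"{REGISTRY_FOLDER}/{entry_key}/{metadata_key}.json"
print(metadata_path)
```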
34 changes: 27 additions & 7 deletions tests/cache/test_neuronx_cache.py
@@ -25,7 +25,7 @@
from transformers.testing_utils import ENDPOINT_STAGING

from optimum.neuron import NeuronModelForCausalLM
from optimum.neuron.utils import synchronize_hub_cache
from optimum.neuron.utils import get_hub_cached_entries, synchronize_hub_cache
from optimum.neuron.utils.testing_utils import is_inferentia_test, requires_neuronx
from optimum.utils.testing_utils import TOKEN

@@ -84,8 +84,15 @@ def check_decoder_generation(model):
assert sample_output.shape[0] == batch_size


def get_local_cached_files(cache_path):
return glob.glob(f"{cache_path}/**/*/*.*", recursive=True)
def get_local_cached_files(cache_path, extension="*"):
return glob.glob(f"{cache_path}/**/*/*.{extension}", recursive=True)


def check_cache_entry(model, cache_path):
local_files = get_local_cached_files(cache_path, "json")
model_id = model.config.neuron["checkpoint_id"]
model_configurations = [path for path in local_files if model_id in path]
assert len(model_configurations) > 0


def assert_local_and_hub_cache_sync(cache_path, cache_repo_id):
@@ -106,12 +113,18 @@ def local_cache_size(cache_path):
@requires_neuronx
def test_decoder_cache(cache_repos):
cache_path, cache_repo_id = cache_repos
model_id = "hf-internal-testing/tiny-random-gpt2"
# Export the model a first time to populate the local cache
model = export_decoder_model("hf-internal-testing/tiny-random-gpt2")
model = export_decoder_model(model_id)
check_decoder_generation(model)
check_cache_entry(model, cache_path)
# Synchronize the hub cache with the local cache
synchronize_hub_cache(cache_repo_id=cache_repo_id)
assert_local_and_hub_cache_sync(cache_path, cache_repo_id)
# Verify we are able to fetch the cached entry for the model
model_entries = get_hub_cached_entries(model_id, cache_repo_id=cache_repo_id)
assert len(model_entries) == 1
assert model_entries[0] == model.config.neuron
# Clear the local cache
for root, dirs, files in os.walk(cache_path):
for f in files:
@@ -123,7 +136,7 @@ def test_decoder_cache(cache_repos):
model = export_decoder_model("hf-internal-testing/tiny-random-gpt2")
check_decoder_generation(model)
# Verify the local cache directory has not been populated
assert local_cache_size(cache_path) == 0
assert len(get_local_cached_files(cache_path, "neff")) == 0


@is_inferentia_test
@@ -152,12 +165,19 @@
@requires_neuronx
def test_optimum_neuron_cli_cache_synchronize(cache_repos):
cache_path, cache_repo_id = cache_repos
model_id = "hf-internal-testing/tiny-random-gpt2"
# Export a model to populate the local cache
export_decoder_model("hf-internal-testing/tiny-random-gpt2")
export_decoder_model(model_id)
# Synchronize the hub cache with the local cache
command = "optimum-cli neuron cache synchronize".split()
p = subprocess.Popen(command, stdout=subprocess.PIPE)
p.communicate()
assert p.returncode == 0
assert_local_and_hub_cache_sync(cache_path, cache_repo_id)
# Check the model entry in the hub
command = f"optimum-cli neuron cache lookup {model_id}".split()
p = subprocess.Popen(command, stdout=subprocess.PIPE)
stdout, _ = p.communicate()
stdout = stdout.decode("utf-8")
assert p.returncode == 0
assert_local_and_hub_cache_sync(cache_path, cache_repo_id)
assert f"1 entrie(s) found in cache for {model_id}" in stdout
3 changes: 1 addition & 2 deletions tests/distributed/distributed.py
@@ -97,8 +97,7 @@ class DistributedExec(ABC):
exec_timeout: int = TEST_TIMEOUT

@abstractmethod
def run(self):
...
def run(self): ...

def __call__(self, request=None):
self._fixture_kwargs = self._get_fixture_kwargs(request, self.run)
7 changes: 0 additions & 7 deletions tests/generation/conftest.py
@@ -163,13 +163,6 @@ def neuron_seq2seq_greedy_path_with_optional_outputs(export_seq2seq_id):
yield model_path


@pytest.fixture(scope="session")
def neuron_push_decoder_id(export_decoder_id):
model_name = export_decoder_id.split("/")[-1]
repo_id = f"{USER}/{model_name}-neuronx"
return repo_id


@pytest.fixture(scope="module")
def neuron_push_seq2seq_id(export_seq2seq_id):
model_name = export_seq2seq_id.split("/")[-1]