Skip to content

Commit

Permalink
Merge branch 'main' into move_and_remove_examples
Browse files Browse the repository at this point in the history
  • Loading branch information
radekosmulski authored Jul 2, 2023
2 parents db29aa2 + 86d0a34 commit 139597f
Show file tree
Hide file tree
Showing 22 changed files with 344 additions and 61 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cpu-horovod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ jobs:
if [[ "${{ github.ref }}" != 'refs/heads/main' ]]; then
extra_pytest_markers="and changed"
fi
EXTRA_PYTEST_MARKERS="$extra_pytest_markers" MERLIN_BRANCH="$merlin_branch" COMPARE_BRANCH=${{ github.base_ref }} tox -e py38-horovod-cpu
EXTRA_PYTEST_MARKERS="$extra_pytest_markers" MERLIN_BRANCH="$merlin_branch" COMPARE_BRANCH=${{ github.base_ref }} tox -e horovod-cpu
2 changes: 1 addition & 1 deletion .github/workflows/cpu-nvtabular.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ jobs:
- name: Run tests
run: |
merlin_branch="${{ steps.get-branch-name.outputs.branch }}"
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e py38-nvtabular-cpu
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e nvtabular-cpu
2 changes: 1 addition & 1 deletion .github/workflows/cpu-systems.yml
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,4 @@ jobs:
- name: Run tests
run: |
merlin_branch="${{ steps.get-branch-name.outputs.branch }}"
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e py38-systems-cpu
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e systems-cpu
2 changes: 1 addition & 1 deletion .github/workflows/cpu-t4r.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ jobs:
- name: Run tests
run: |
merlin_branch="${{ steps.get-branch-name.outputs.branch }}"
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e py38-transformers4rec-cpu
MERLIN_BRANCH="$merlin_branch" GIT_COMMIT=$(git rev-parse HEAD) tox -e transformers4rec-cpu
2 changes: 1 addition & 1 deletion .github/workflows/gpu-multi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ jobs:
if [[ "${{ github.ref }}" != 'refs/heads/main' ]]; then
extra_pytest_markers="and changed"
fi
cd ${{ github.workspace }}; EXTRA_PYTEST_MARKERS=$extra_pytest_markers MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e py38-multi-gpu
cd ${{ github.workspace }}; EXTRA_PYTEST_MARKERS=$extra_pytest_markers MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e multi-gpu
4 changes: 2 additions & 2 deletions .github/workflows/gpu.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ jobs:
if [[ "${{ github.ref }}" != 'refs/heads/main' ]]; then
extra_pytest_markers="and changed"
fi
cd ${{ github.workspace }}; PYTEST_MARKERS="unit and not (examples or integration or notebook) $extra_pytest_markers" MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e py38-gpu
cd ${{ github.workspace }}; PYTEST_MARKERS="unit and not (examples or integration or notebook) $extra_pytest_markers" MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e gpu
tests-examples:
runs-on: 1GPU
Expand All @@ -55,4 +55,4 @@ jobs:
if [[ "${{ github.ref }}" != 'refs/heads/main' ]]; then
extra_pytest_markers="and changed"
fi
cd ${{ github.workspace }}; PYTEST_MARKERS="(examples or notebook) $extra_pytest_markers" MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e py38-gpu
cd ${{ github.workspace }}; PYTEST_MARKERS="(examples or notebook) $extra_pytest_markers" MERLIN_BRANCH=$branch COMPARE_BRANCH=${{ github.base_ref }} tox -e gpu
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ You can find more details and information about a low-level API in our overview

### Notebook Examples and Tutorials

View the example notebooks in the [documentation](https://nvidia-merlin.github.io/models/stable/examples/README.html) to help you become familiar with Merlin Models.
View the example notebooks in the [documentation](https://nvidia-merlin.github.io/models/stable/examples/) to help you become familiar with Merlin Models.

The same notebooks are available in the `examples` directory from the [Merlin Models](https://github.com/NVIDIA-Merlin/models) GitHub repository.

Expand Down
4 changes: 2 additions & 2 deletions merlin/models/tf/core/tabular.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import abc
import collections
import collections.abc
import copy
from typing import Dict, List, Optional, Sequence, Union, overload

Expand Down Expand Up @@ -600,7 +600,7 @@ def get_config(self):
def select_by_tag(self, tags: Tags) -> Optional["Filter"]:
if isinstance(self.feature_names, Tags):
schema = self.schema.select_by_tag(self.feature_names).select_by_tag(tags)
elif isinstance(self.feature_names, collections.Sequence):
elif isinstance(self.feature_names, collections.abc.Sequence):
schema = self.schema.select_by_name(self.feature_names).select_by_tag(tags)
else:
raise RuntimeError(
Expand Down
4 changes: 2 additions & 2 deletions merlin/models/tf/inputs/embedding.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import collections
import collections.abc
import inspect
from copy import deepcopy
from dataclasses import dataclass
Expand Down Expand Up @@ -268,7 +268,7 @@ def select_by_tag(self, tags: Union[Tags, Sequence[Tags]]) -> Optional["Embeddin
-------
An EmbeddingTable if the tags match. If no features match, it returns None.
"""
if not isinstance(tags, collections.Sequence):
if not isinstance(tags, collections.abc.Sequence):
tags = [tags]

selected_schema = self.schema.select_by_tag(tags)
Expand Down
2 changes: 2 additions & 0 deletions merlin/models/torch/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from merlin.models.torch import schema
from merlin.models.torch.batch import Batch, Sequence
from merlin.models.torch.block import Block, ParallelBlock
from merlin.models.torch.blocks.dlrm import DLRMBlock
from merlin.models.torch.blocks.mlp import MLPBlock
from merlin.models.torch.inputs.embedding import EmbeddingTable, EmbeddingTables
from merlin.models.torch.inputs.select import SelectFeatures, SelectKeys
Expand Down Expand Up @@ -51,4 +52,5 @@
"Concat",
"Stack",
"schema",
"DLRMBlock",
]
141 changes: 141 additions & 0 deletions merlin/models/torch/blocks/dlrm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from typing import Dict, Optional

import torch
from torch import nn

from merlin.models.torch.block import Block
from merlin.models.torch.inputs.embedding import EmbeddingTables
from merlin.models.torch.inputs.tabular import TabularInputBlock
from merlin.models.torch.link import Link
from merlin.models.torch.transforms.agg import MaybeAgg, Stack
from merlin.models.utils.doc_utils import docstring_parameter
from merlin.schema import Schema, Tags

# Shared "References" section for the DLRM docstrings in this module.  It is
# passed to ``docstring_parameter`` as ``dlrm_reference`` and is presumably
# substituted for the ``{dlrm_reference}`` placeholder -- confirm against
# ``merlin.models.utils.doc_utils``.
_DLRM_REF = """
References
----------
.. [1] Naumov, Maxim, et al. "Deep learning recommendation model for
personalization and recommendation systems." arXiv preprint arXiv:1906.00091 (2019).
"""


@docstring_parameter(dlrm_reference=_DLRM_REF)
class DLRMInputBlock(TabularInputBlock):
    """Input block for the DLRM model.

    A route tagged ``Tags.CATEGORICAL`` is added with embedding tables of
    width ``dim`` (using ``seq_combiner="mean"``), and a route tagged
    ``Tags.CONTINUOUS`` is added with ``bottom_block``.

    Parameters
    ----------
    schema : Schema
        The schema to use for selection.
    dim : int
        The dimensionality of the output vectors.
    bottom_block : Block
        Block to pass the continuous features to.
        Note that the output dimensionality of this block must be equal to ``dim``.

    {dlrm_reference}

    Raises
    ------
    ValueError
        If the block has no ``"categorical"`` entry once the routes have been
        added, i.e. no categorical input is provided in the schema.
    """

    def __init__(self, schema: Schema, dim: int, bottom_block: Block):
        super().__init__(schema)

        categorical_tables = EmbeddingTables(dim, seq_combiner="mean")
        self.add_route(Tags.CATEGORICAL, categorical_tables)
        self.add_route(Tags.CONTINUOUS, bottom_block)

        # The categorical branch is mandatory; the continuous one is not checked.
        has_categorical = "categorical" in self
        if not has_categorical:
            raise ValueError("DLRMInputBlock must have a categorical input")


@docstring_parameter(dlrm_reference=_DLRM_REF)
class DLRMInteraction(nn.Module):
    """
    This class defines the forward interaction operation as proposed
    in the DLRM
    `paper <https://arxiv.org/pdf/1906.00091.pdf>`_ [1]_.

    This forward operation performs elementwise multiplication
    followed by a reduction sum (equivalent to a dot product) of all embedding pairs.

    {dlrm_reference}
    """

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Compute the dot product of every distinct pair of input vectors.

        Parameters
        ----------
        inputs : torch.Tensor
            A 3-D tensor (``torch.bmm`` requires 3-D operands); the vectors
            being paired are indexed along dimension 1.

        Returns
        -------
        torch.Tensor
            A 2-D tensor holding, for each element of dimension 0, the entries
            strictly above the diagonal of ``inputs @ inputs^T``.
        """
        if not hasattr(self, "triu_indices"):
            num_vectors = inputs.shape[1]
            # Build the index buffer on the same device as the data.  The
            # buffer is registered lazily, i.e. usually after the module has
            # already been moved, so a CPU default would never follow the
            # module to an accelerator.
            self.register_buffer(
                "triu_indices",
                torch.triu_indices(num_vectors, num_vectors, offset=1, device=inputs.device),
            )

        # Pairwise dot products between all vectors along dimension 1.
        interactions = torch.bmm(inputs, torch.transpose(inputs, 1, 2))

        # The product is symmetric, so keep only the strict upper triangle
        # (offset=1 also drops each vector's product with itself).
        row_idx, col_idx = self.triu_indices[0], self.triu_indices[1]
        interactions_flat = interactions[:, row_idx, col_idx]

        return interactions_flat


class ShortcutConcatContinuous(Link):
    """
    Shortcut link that prepends the continuous input features to the
    intermediate output.

    If the inputs contain no ``"continuous"`` entry, the intermediate output
    is returned unchanged.
    """

    def forward(self, inputs: Dict[str, torch.Tensor]) -> torch.Tensor:
        """Run ``self.output`` on ``inputs`` and concatenate the continuous features.

        Parameters
        ----------
        inputs : Dict[str, torch.Tensor]
            Named input tensors; only the ``"continuous"`` key is read here.

        Returns
        -------
        torch.Tensor
            ``self.output(inputs)``, preceded along ``dim=1`` by
            ``inputs["continuous"]`` when that key is present.
        """
        result = self.output(inputs)

        if "continuous" not in inputs:
            return result

        continuous = inputs["continuous"]
        return torch.cat((continuous, result), dim=1)


@docstring_parameter(dlrm_reference=_DLRM_REF)
class DLRMBlock(Block):
    """Builds the DLRM architecture, as proposed in the following
    `paper <https://arxiv.org/pdf/1906.00091.pdf>`_ [1]_.

    The block is assembled from a ``DLRMInputBlock``, followed by an
    interaction stage (``MaybeAgg(Stack(dim=1))`` then the interaction module)
    linked through ``ShortcutConcatContinuous``, and finally the optional
    ``top_block``.

    Parameters
    ----------
    schema : Schema
        The schema to use for selection.
    dim : int
        The dimensionality of the output vectors.
    bottom_block : Block
        Block to pass the continuous features to.
        Note that the output dimensionality of this block must be equal to ``dim``.
    top_block : Block, optional
        An optional upper-level block of the model.
    interaction : nn.Module, optional
        Interaction module for DLRM.
        If not provided, DLRMInteraction will be used by default.

    {dlrm_reference}

    Raises
    ------
    ValueError
        If no categorical input is provided in the schema.
    """

    def __init__(
        self,
        schema: Schema,
        dim: int,
        bottom_block: Block,
        top_block: Optional[Block] = None,
        interaction: Optional[nn.Module] = None,
    ):
        input_block = DLRMInputBlock(schema, dim, bottom_block)
        super().__init__(input_block)

        stacker = MaybeAgg(Stack(dim=1))
        # The fallback is truthiness-based: any falsy ``interaction`` (not only
        # ``None``) is replaced by the default DLRMInteraction.
        pairwise = interaction if interaction else DLRMInteraction()
        self.append(Block(stacker, pairwise), link=ShortcutConcatContinuous())

        # Likewise truthiness-based: a falsy ``top_block`` is not appended.
        if top_block:
            self.append(top_block)
16 changes: 7 additions & 9 deletions merlin/models/torch/outputs/classification.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,24 +36,22 @@ class BinaryOutput(ModelOutput):
The metrics used for evaluation. Default includes Accuracy, AUROC, Precision, and Recall.
"""

DEFAULT_LOSS_CLS = nn.BCEWithLogitsLoss
DEFAULT_METRICS_CLS = (Accuracy, AUROC, Precision, Recall)

def __init__(
self,
schema: Optional[ColumnSchema] = None,
loss: nn.Module = nn.BCEWithLogitsLoss(),
metrics: Sequence[Metric] = (
Accuracy(task="binary"),
AUROC(task="binary"),
Precision(task="binary"),
Recall(task="binary"),
),
loss: Optional[nn.Module] = None,
metrics: Sequence[Metric] = (),
):
"""Initializes a BinaryOutput object."""
super().__init__(
nn.LazyLinear(1),
nn.Sigmoid(),
schema=schema,
loss=loss,
metrics=metrics,
loss=loss or self.DEFAULT_LOSS_CLS(),
metrics=metrics or [m(task="binary") for m in self.DEFAULT_METRICS_CLS],
)

def setup_schema(self, target: Optional[Union[ColumnSchema, Schema]]):
Expand Down
11 changes: 7 additions & 4 deletions merlin/models/torch/outputs/regression.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,21 @@ class RegressionOutput(ModelOutput):
The metrics used for evaluation. Default is MeanSquaredError.
"""

DEFAULT_LOSS_CLS = nn.MSELoss
DEFAULT_METRICS_CLS = (MeanSquaredError,)

def __init__(
self,
schema: Optional[ColumnSchema] = None,
loss: nn.Module = nn.MSELoss(),
metrics: Sequence[Metric] = (MeanSquaredError(),),
loss: Optional[nn.Module] = None,
metrics: Sequence[Metric] = (),
):
"""Initializes a RegressionOutput object."""
super().__init__(
nn.LazyLinear(1),
schema=schema,
loss=loss,
metrics=metrics,
loss=loss or self.DEFAULT_LOSS_CLS(),
metrics=metrics or [m() for m in self.DEFAULT_METRICS_CLS],
)

def setup_schema(self, target: Optional[Union[ColumnSchema, Schema]]):
Expand Down
3 changes: 3 additions & 0 deletions merlin/models/torch/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ def add_route(
"""

routing_module = schema.select(self.selectable, selection)
if not routing_module:
return self

if module is not None:
schema.setup_schema(module, routing_module.schema)

Expand Down
3 changes: 3 additions & 0 deletions merlin/models/torch/transforms/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def forward(self, inputs: Dict[str, torch.Tensor]) -> torch.Tensor:

if self.align_dims:
max_dims = max(tensor.dim() for tensor in sorted_tensors)
max_dims = max(
max_dims, 2
) # assume first dimension is batch size + at least one feature
_sorted_tensors = []
for tensor in sorted_tensors:
if tensor.dim() < max_dims:
Expand Down
16 changes: 16 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,15 @@
import platform
import warnings
from pathlib import Path
from unittest.mock import patch

import distributed
import psutil
import pytest
from asvdb import BenchmarkInfo, utils

from merlin.core.utils import Distributed
from merlin.dataloader.loader_base import LoaderBase
from merlin.datasets.synthetic import generate_data
from merlin.io import Dataset
from merlin.models.utils import ci_utils
Expand Down Expand Up @@ -145,3 +147,17 @@ def get_benchmark_info():
arch=uname.machine,
ram="%d" % psutil.virtual_memory().total,
)


@pytest.fixture(scope="function", autouse=True)
def cleanup_dataloader():
    """After each test runs, call .stop() on any dataloaders iterated during the test.

    This avoids issues with background threads hanging around and interfering with
    subsequent tests. This happens when a dataloader is partially consumed (not all
    batches are iterated through).
    """
    # Replace LoaderBase.__iter__ with a mock that still delegates to the real
    # method (side_effect) while recording every call made during the test.
    with patch.object(
        LoaderBase, "__iter__", side_effect=LoaderBase.__iter__, autospec=True
    ) as patched:
        # The test body runs here.
        yield
        # With autospec=True the loader instance is recorded as the first
        # positional argument of each call, so args[0] is the loader to stop.
        for call in patched.call_args_list:
            call.args[0].stop()
Loading

0 comments on commit 139597f

Please sign in to comment.