diff --git a/networks/.ipynb_checkpoints/SWIN-checkpoint.py b/networks/.ipynb_checkpoints/SWIN-checkpoint.py new file mode 100644 index 0000000..1957722 --- /dev/null +++ b/networks/.ipynb_checkpoints/SWIN-checkpoint.py @@ -0,0 +1,1110 @@ +import torch +import torch.nn as nn +import torch.nn.functional as F +import numpy as np +import math +import random +from timm.models.layers import DropPath, to_2tuple, trunc_normal_ +import torch.utils.checkpoint as checkpoint +from torch.utils.data import DataLoader + +import timm +# from dataset import START, PAD +### +START = "" +END = "" +PAD = "" +### + +device = torch.device("cuda" if torch.cuda.is_available() else "cpu") + + +class Mlp(nn.Module): + def __init__(self, in_features, hidden_features=None, out_features=None, act_layer=nn.GELU, drop=0.): + super().__init__() + out_features = out_features or in_features + hidden_features = hidden_features or in_features + self.fc1 = nn.Linear(in_features, hidden_features) + self.act = act_layer() + self.fc2 = nn.Linear(hidden_features, out_features) + self.drop = nn.Dropout(drop) + + def forward(self, x): + x = self.fc1(x) + x = self.act(x) + x = self.drop(x) + x = self.fc2(x) + x = self.drop(x) + return x + + +def window_partition(x, window_size): + """ + Args: + x: (B, H, W, C) + window_size (int): window size + Returns: + windows: (num_windows*B, window_size, window_size, C) + """ + B, H, W, C = x.shape + x = x.view(B, H // window_size, window_size, W // window_size, window_size, C) + windows = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(-1, window_size, window_size, C) + return windows + + +def window_reverse(windows, window_size, H, W): + """ + Args: + windows: (num_windows*B, window_size, window_size, C) + window_size (int): Window size + H (int): Height of image + W (int): Width of image + Returns: + x: (B, H, W, C) + """ + B = int(windows.shape[0] / (H * W / window_size / window_size)) + x = windows.view(B, H // window_size, W // window_size, window_size, 
window_size, -1) + x = x.permute(0, 1, 3, 2, 4, 5).contiguous().view(B, H, W, -1) + return x + + +class WindowAttention(nn.Module): + r""" Window based multi-head self attention (W-MSA) module with relative position bias. + It supports both of shifted and non-shifted window. + Args: + dim (int): Number of input channels. + window_size (tuple[int]): The height and width of the window. + num_heads (int): Number of attention heads. + qkv_bias (bool, optional): If True, add a learnable bias to query, key, value. Default: True + qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set + attn_drop (float, optional): Dropout ratio of attention weight. Default: 0.0 + proj_drop (float, optional): Dropout ratio of output. Default: 0.0 + """ + + def __init__(self, dim, window_size, num_heads, qkv_bias=True, qk_scale=None, attn_drop=0., proj_drop=0.): + + super().__init__() + self.dim = dim + self.window_size = window_size # Wh, Ww + self.num_heads = num_heads + head_dim = dim // num_heads + self.scale = qk_scale or head_dim ** -0.5 + + # define a parameter table of relative position bias + self.relative_position_bias_table = nn.Parameter( + torch.zeros((2 * window_size[0] - 1) * (2 * window_size[1] - 1), num_heads)) # 2*Wh-1 * 2*Ww-1, nH + + # get pair-wise relative position index for each token inside the window + coords_h = torch.arange(self.window_size[0]) + coords_w = torch.arange(self.window_size[1]) + coords = torch.stack(torch.meshgrid([coords_h, coords_w])) # 2, Wh, Ww + coords_flatten = torch.flatten(coords, 1) # 2, Wh*Ww + relative_coords = coords_flatten[:, :, None] - coords_flatten[:, None, :] # 2, Wh*Ww, Wh*Ww + relative_coords = relative_coords.permute(1, 2, 0).contiguous() # Wh*Ww, Wh*Ww, 2 + relative_coords[:, :, 0] += self.window_size[0] - 1 # shift to start from 0 + relative_coords[:, :, 1] += self.window_size[1] - 1 + relative_coords[:, :, 0] *= 2 * self.window_size[1] - 1 + relative_position_index = relative_coords.sum(-1) # 
Wh*Ww, Wh*Ww + self.register_buffer("relative_position_index", relative_position_index) + + self.qkv = nn.Linear(dim, dim * 3, bias=qkv_bias) + self.attn_drop = nn.Dropout(attn_drop) + self.proj = nn.Linear(dim, dim) + self.proj_drop = nn.Dropout(proj_drop) + + trunc_normal_(self.relative_position_bias_table, std=.02) + self.softmax = nn.Softmax(dim=-1) + + def forward(self, x, mask=None): + """ + Args: + x: input features with shape of (num_windows*B, N, C) + mask: (0/-inf) mask with shape of (num_windows, Wh*Ww, Wh*Ww) or None + """ + B_, N, C = x.shape + qkv = self.qkv(x).reshape(B_, N, 3, self.num_heads, C // self.num_heads).permute(2, 0, 3, 1, 4) + q, k, v = qkv[0], qkv[1], qkv[2] # make torchscript happy (cannot use tensor as tuple) + + q = q * self.scale + attn = (q @ k.transpose(-2, -1)) + + relative_position_bias = self.relative_position_bias_table[self.relative_position_index.view(-1)].view( + self.window_size[0] * self.window_size[1], self.window_size[0] * self.window_size[1], -1) # Wh*Ww,Wh*Ww,nH + relative_position_bias = relative_position_bias.permute(2, 0, 1).contiguous() # nH, Wh*Ww, Wh*Ww + attn = attn + relative_position_bias.unsqueeze(0) + + if mask is not None: + nW = mask.shape[0] + attn = attn.view(B_ // nW, nW, self.num_heads, N, N) + mask.unsqueeze(1).unsqueeze(0) + attn = attn.view(-1, self.num_heads, N, N) + attn = self.softmax(attn) + else: + attn = self.softmax(attn) + + attn = self.attn_drop(attn) + + x = (attn @ v).transpose(1, 2).reshape(B_, N, C) + x = self.proj(x) + x = self.proj_drop(x) + return x + + def extra_repr(self) -> str: + return f'dim={self.dim}, window_size={self.window_size}, num_heads={self.num_heads}' + + def flops(self, N): + # calculate flops for 1 window with token length of N + flops = 0 + # qkv = self.qkv(x) + flops += N * self.dim * 3 * self.dim + # attn = (q @ k.transpose(-2, -1)) + flops += self.num_heads * N * (self.dim // self.num_heads) * N + # x = (attn @ v) + flops += self.num_heads * N * N * (self.dim // 
self.num_heads) + # x = self.proj(x) + flops += N * self.dim * self.dim + return flops + + +class SwinTransformerBlock(nn.Module): + r""" Swin Transformer Block. + Args: + dim (int): Number of input channels. + input_resolution (tuple[int]): Input resulotion. + num_heads (int): Number of attention heads. + window_size (int): Window size. + shift_size (int): Shift size for SW-MSA. + mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. + qkv_bias (bool, optional): If True, add a learnable bias to query, key, value. Default: True + qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set. + drop (float, optional): Dropout rate. Default: 0.0 + attn_drop (float, optional): Attention dropout rate. Default: 0.0 + drop_path (float, optional): Stochastic depth rate. Default: 0.0 + act_layer (nn.Module, optional): Activation layer. Default: nn.GELU + norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm + """ + + def __init__(self, dim, input_resolution, num_heads, window_size=7, shift_size=0, + mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0., drop_path=0., + act_layer=nn.GELU, norm_layer=nn.LayerNorm): + super().__init__() + self.dim = dim + self.input_resolution = input_resolution + self.num_heads = num_heads + self.window_size = window_size + self.shift_size = shift_size + self.mlp_ratio = mlp_ratio + if min(self.input_resolution) <= self.window_size: + # if window size is larger than input resolution, we don't partition windows + self.shift_size = 0 + self.window_size = min(self.input_resolution) + assert 0 <= self.shift_size < self.window_size, "shift_size must in 0-window_size" + + self.norm1 = norm_layer(dim) + self.attn = WindowAttention( + dim, window_size=to_2tuple(self.window_size), num_heads=num_heads, + qkv_bias=qkv_bias, qk_scale=qk_scale, attn_drop=attn_drop, proj_drop=drop) + + self.drop_path = DropPath(drop_path) if drop_path > 0. 
else nn.Identity() + self.norm2 = norm_layer(dim) + mlp_hidden_dim = int(dim * mlp_ratio) + self.mlp = Mlp(in_features=dim, hidden_features=mlp_hidden_dim, act_layer=act_layer, drop=drop) + + if self.shift_size > 0: + # calculate attention mask for SW-MSA + H, W = self.input_resolution + img_mask = torch.zeros((1, H, W, 1)) # 1 H W 1 + h_slices = (slice(0, -self.window_size), + slice(-self.window_size, -self.shift_size), + slice(-self.shift_size, None)) + w_slices = (slice(0, -self.window_size), + slice(-self.window_size, -self.shift_size), + slice(-self.shift_size, None)) + cnt = 0 + for h in h_slices: + for w in w_slices: + img_mask[:, h, w, :] = cnt + cnt += 1 + + mask_windows = window_partition(img_mask, self.window_size) # nW, window_size, window_size, 1 + mask_windows = mask_windows.view(-1, self.window_size * self.window_size) + attn_mask = mask_windows.unsqueeze(1) - mask_windows.unsqueeze(2) + attn_mask = attn_mask.masked_fill(attn_mask != 0, float(-100.0)).masked_fill(attn_mask == 0, float(0.0)) + else: + attn_mask = None + + self.register_buffer("attn_mask", attn_mask) + + def forward(self, x): + H, W = self.input_resolution + B, L, C = x.shape + assert L == H * W, "input feature has wrong size" + + shortcut = x + x = self.norm1(x) + x = x.view(B, H, W, C) + + # cyclic shift + if self.shift_size > 0: + shifted_x = torch.roll(x, shifts=(-self.shift_size, -self.shift_size), dims=(1, 2)) + else: + shifted_x = x + + # partition windows + x_windows = window_partition(shifted_x, self.window_size) # nW*B, window_size, window_size, C + x_windows = x_windows.view(-1, self.window_size * self.window_size, C) # nW*B, window_size*window_size, C + + # W-MSA/SW-MSA + attn_windows = self.attn(x_windows, mask=self.attn_mask) # nW*B, window_size*window_size, C + + # merge windows + attn_windows = attn_windows.view(-1, self.window_size, self.window_size, C) + shifted_x = window_reverse(attn_windows, self.window_size, H, W) # B H' W' C + + # reverse cyclic shift + if 
self.shift_size > 0: + x = torch.roll(shifted_x, shifts=(self.shift_size, self.shift_size), dims=(1, 2)) + else: + x = shifted_x + x = x.view(B, H * W, C) + + # FFN + x = shortcut + self.drop_path(x) + x = x + self.drop_path(self.mlp(self.norm2(x))) + + return x + + def extra_repr(self) -> str: + return f"dim={self.dim}, input_resolution={self.input_resolution}, num_heads={self.num_heads}, " \ + f"window_size={self.window_size}, shift_size={self.shift_size}, mlp_ratio={self.mlp_ratio}" + + def flops(self): + flops = 0 + H, W = self.input_resolution + # norm1 + flops += self.dim * H * W + # W-MSA/SW-MSA + nW = H * W / self.window_size / self.window_size + flops += nW * self.attn.flops(self.window_size * self.window_size) + # mlp + flops += 2 * H * W * self.dim * self.dim * self.mlp_ratio + # norm2 + flops += self.dim * H * W + return flops + + +class PatchMerging(nn.Module): + r""" Patch Merging Layer. + Args: + input_resolution (tuple[int]): Resolution of input feature. + dim (int): Number of input channels. + norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm + """ + + def __init__(self, input_resolution, dim, norm_layer=nn.LayerNorm): + super().__init__() + self.input_resolution = input_resolution + self.dim = dim + self.reduction = nn.Linear(4 * dim, 2 * dim, bias=False) + self.norm = norm_layer(4 * dim) + + def forward(self, x): + """ + x: B, H*W, C + """ + H, W = self.input_resolution + B, L, C = x.shape + assert L == H * W, "input feature has wrong size" + assert H % 2 == 0 and W % 2 == 0, f"x size ({H}*{W}) are not even." 
+ + x = x.view(B, H, W, C) + + x0 = x[:, 0::2, 0::2, :] # B H/2 W/2 C + x1 = x[:, 1::2, 0::2, :] # B H/2 W/2 C + x2 = x[:, 0::2, 1::2, :] # B H/2 W/2 C + x3 = x[:, 1::2, 1::2, :] # B H/2 W/2 C + x = torch.cat([x0, x1, x2, x3], -1) # B H/2 W/2 4*C + x = x.view(B, -1, 4 * C) # B H/2*W/2 4*C + + x = self.norm(x) + x = self.reduction(x) + + return x + + def extra_repr(self) -> str: + return f"input_resolution={self.input_resolution}, dim={self.dim}" + + def flops(self): + H, W = self.input_resolution + flops = H * W * self.dim + flops += (H // 2) * (W // 2) * 4 * self.dim * 2 * self.dim + return flops + + +class BasicLayer(nn.Module): + """ A basic Swin Transformer layer for one stage. + Args: + dim (int): Number of input channels. + input_resolution (tuple[int]): Input resolution. + depth (int): Number of blocks. + num_heads (int): Number of attention heads. + window_size (int): Local window size. + mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. + qkv_bias (bool, optional): If True, add a learnable bias to query, key, value. Default: True + qk_scale (float | None, optional): Override default qk scale of head_dim ** -0.5 if set. + drop (float, optional): Dropout rate. Default: 0.0 + attn_drop (float, optional): Attention dropout rate. Default: 0.0 + drop_path (float | tuple[float], optional): Stochastic depth rate. Default: 0.0 + norm_layer (nn.Module, optional): Normalization layer. Default: nn.LayerNorm + downsample (nn.Module | None, optional): Downsample layer at the end of the layer. Default: None + use_checkpoint (bool): Whether to use checkpointing to save memory. Default: False. 
+ """ + + def __init__(self, dim, input_resolution, depth, num_heads, window_size, + mlp_ratio=4., qkv_bias=True, qk_scale=None, drop=0., attn_drop=0., + drop_path=0., norm_layer=nn.LayerNorm, downsample=None, use_checkpoint=False): + + super().__init__() + self.dim = dim + self.input_resolution = input_resolution + self.depth = depth + self.use_checkpoint = use_checkpoint + + # build blocks + self.blocks = nn.ModuleList([ + SwinTransformerBlock(dim=dim, input_resolution=input_resolution, + num_heads=num_heads, window_size=window_size, + shift_size=0 if (i % 2 == 0) else window_size // 2, + mlp_ratio=mlp_ratio, + qkv_bias=qkv_bias, qk_scale=qk_scale, + drop=drop, attn_drop=attn_drop, + drop_path=drop_path[i] if isinstance(drop_path, list) else drop_path, + norm_layer=norm_layer) + for i in range(depth)]) + + # patch merging layer + if downsample is not None: + self.downsample = downsample(input_resolution, dim=dim, norm_layer=norm_layer) + else: + self.downsample = None + + def forward(self, x): + for blk in self.blocks: + if self.use_checkpoint: + x = checkpoint.checkpoint(blk, x) + else: + x = blk(x) + if self.downsample is not None: + x = self.downsample(x) + return x + + def extra_repr(self) -> str: + return f"dim={self.dim}, input_resolution={self.input_resolution}, depth={self.depth}" + + def flops(self): + flops = 0 + for blk in self.blocks: + flops += blk.flops() + if self.downsample is not None: + flops += self.downsample.flops() + return flops + + +class PatchEmbed(nn.Module): + r""" Image to Patch Embedding + Args: + img_size (int): Image size. Default: 224. + patch_size (int): Patch token size. Default: 4. + in_chans (int): Number of input image channels. Default: 3. + embed_dim (int): Number of linear projection output channels. Default: 96. + norm_layer (nn.Module, optional): Normalization layer. 
Default: None + """ + + def __init__(self, img_size=224, patch_size=4, in_chans=3, embed_dim=96, norm_layer=None): + super().__init__() + img_size = to_2tuple(img_size) + patch_size = to_2tuple(patch_size) + patches_resolution = [img_size[0] // patch_size[0], img_size[1] // patch_size[1]] + self.img_size = img_size + self.patch_size = patch_size + self.patches_resolution = patches_resolution + self.num_patches = patches_resolution[0] * patches_resolution[1] + + self.in_chans = in_chans + self.embed_dim = embed_dim + + self.proj = nn.Conv2d(in_chans, embed_dim, kernel_size=patch_size, stride=patch_size) + if norm_layer is not None: + self.norm = norm_layer(embed_dim) + else: + self.norm = None + + def forward(self, x): + B, C, H, W = x.shape + # FIXME look at relaxing size constraints + assert H == self.img_size[0] and W == self.img_size[1], \ + f"Input image size ({H}*{W}) doesn't match model ({self.img_size[0]}*{self.img_size[1]})." + x = self.proj(x).flatten(2).transpose(1, 2) # B Ph*Pw C + if self.norm is not None: + x = self.norm(x) + return x + + def flops(self): + Ho, Wo = self.patches_resolution + flops = Ho * Wo * self.embed_dim * self.in_chans * (self.patch_size[0] * self.patch_size[1]) + if self.norm is not None: + flops += Ho * Wo * self.embed_dim + return flops + + +class SwinTransformer(nn.Module): + r""" Swin Transformer + A PyTorch impl of : `Swin Transformer: Hierarchical Vision Transformer using Shifted Windows` - + https://arxiv.org/pdf/2103.14030 + Args: + img_size (int | tuple(int)): Input image size. Default 224 + patch_size (int | tuple(int)): Patch size. Default: 4 + in_chans (int): Number of input image channels. Default: 3 + num_classes (int): Number of classes for classification head. Default: 1000 + embed_dim (int): Patch embedding dimension. Default: 96 + depths (tuple(int)): Depth of each Swin Transformer layer. + num_heads (tuple(int)): Number of attention heads in different layers. + window_size (int): Window size. 
Default: 7 + mlp_ratio (float): Ratio of mlp hidden dim to embedding dim. Default: 4 + qkv_bias (bool): If True, add a learnable bias to query, key, value. Default: True + qk_scale (float): Override default qk scale of head_dim ** -0.5 if set. Default: None + drop_rate (float): Dropout rate. Default: 0 + attn_drop_rate (float): Attention dropout rate. Default: 0 + drop_path_rate (float): Stochastic depth rate. Default: 0.1 + norm_layer (nn.Module): Normalization layer. Default: nn.LayerNorm. + ape (bool): If True, add absolute position embedding to the patch embedding. Default: False + patch_norm (bool): If True, add normalization after patch embedding. Default: True + use_checkpoint (bool): Whether to use checkpointing to save memory. Default: False + """ + + def __init__(self, img_size=224, patch_size=4, in_chans=3, num_classes=1000, + embed_dim=96, depths=[2, 2, 6, 2], num_heads=[3, 6, 12, 24], + window_size=7, mlp_ratio=4., qkv_bias=True, qk_scale=None, + drop_rate=0., attn_drop_rate=0., drop_path_rate=0.1, + norm_layer=nn.LayerNorm, ape=False, patch_norm=True, + use_checkpoint=False, **kwargs): + super().__init__() + self.num_classes = num_classes + self.num_layers = len(depths) + self.embed_dim = embed_dim + self.ape = ape + self.patch_norm = patch_norm + self.num_features = int(embed_dim * 2 ** (self.num_layers - 1)) + self.mlp_ratio = mlp_ratio + + # split image into non-overlapping patches + self.patch_embed = PatchEmbed( + img_size=img_size, patch_size=patch_size, in_chans=in_chans, embed_dim=embed_dim, + norm_layer=norm_layer if self.patch_norm else None) + num_patches = self.patch_embed.num_patches + patches_resolution = self.patch_embed.patches_resolution + self.patches_resolution = patches_resolution + + # absolute position embedding + if self.ape: + self.absolute_pos_embed = nn.Parameter(torch.zeros(1, num_patches, embed_dim)) + trunc_normal_(self.absolute_pos_embed, std=.02) + + self.pos_drop = nn.Dropout(p=drop_rate) + + # stochastic depth + dpr = 
[x.item() for x in torch.linspace(0, drop_path_rate, sum(depths))] # stochastic depth decay rule + + # build layers + self.layers = nn.ModuleList() + for i_layer in range(self.num_layers): + layer = BasicLayer(dim=int(embed_dim * 2 ** i_layer), + input_resolution=(patches_resolution[0] // (2 ** i_layer), + patches_resolution[1] // (2 ** i_layer)), + depth=depths[i_layer], + num_heads=num_heads[i_layer], + window_size=window_size, + mlp_ratio=self.mlp_ratio, + qkv_bias=qkv_bias, qk_scale=qk_scale, + drop=drop_rate, attn_drop=attn_drop_rate, + drop_path=dpr[sum(depths[:i_layer]):sum(depths[:i_layer + 1])], + norm_layer=norm_layer, + downsample=PatchMerging if (i_layer < self.num_layers - 1) else None, + use_checkpoint=use_checkpoint) + self.layers.append(layer) + + self.norm = norm_layer(self.num_features) + self.avgpool = nn.AdaptiveAvgPool1d(1) + self.head = nn.Linear(self.num_features, num_classes) if num_classes > 0 else nn.Identity() + + self.apply(self._init_weights) + + def _init_weights(self, m): + if isinstance(m, nn.Linear): + trunc_normal_(m.weight, std=.02) + if isinstance(m, nn.Linear) and m.bias is not None: + nn.init.constant_(m.bias, 0) + elif isinstance(m, nn.LayerNorm): + nn.init.constant_(m.bias, 0) + nn.init.constant_(m.weight, 1.0) + + @torch.jit.ignore + def no_weight_decay(self): + return {'absolute_pos_embed'} + + @torch.jit.ignore + def no_weight_decay_keywords(self): + return {'relative_position_bias_table'} + + def forward_features(self, x): + x = self.patch_embed(x) + if self.ape: + x = x + self.absolute_pos_embed + x = self.pos_drop(x) + + for layer in self.layers: + x = layer(x) + + x = self.norm(x) # B L C + # x = self.avgpool(x.transpose(1, 2)) # B C 1 + # x = torch.flatten(x, 1) + return x + + def forward(self, x): + x = self.forward_features(x) + # x = self.head(x) + return x + + def flops(self): + flops = 0 + flops += self.patch_embed.flops() + for i, layer in enumerate(self.layers): + flops += layer.flops() + flops += 
self.num_features * self.patches_resolution[0] * self.patches_resolution[1] // (2 ** self.num_layers) + flops += self.num_features * self.num_classes + return flops + +class ScaledDotProductAttention(nn.Module): + def __init__(self, temperature, dropout=0.1): + super(ScaledDotProductAttention, self).__init__() + + self.temperature = temperature + self.dropout = nn.Dropout(p=dropout) + + def forward(self, q, k, v, mask=None): + + attn = torch.matmul(q, k.transpose(2, 3)) / self.temperature + if mask is not None: + attn = attn.masked_fill(mask=mask, value=float("-inf")) + attn = torch.softmax(attn, dim=-1) + attn = self.dropout(attn) + out = torch.matmul(attn, v) + return out, attn + + +class MultiHeadAttention(nn.Module): + def __init__(self, q_channels, k_channels, head_num=8, dropout=0.1): + super(MultiHeadAttention, self).__init__() + + self.q_channels = q_channels + self.k_channels = k_channels + self.head_dim = q_channels // head_num + self.head_num = head_num + + self.q_linear = nn.Linear(q_channels, self.head_num * self.head_dim) + self.k_linear = nn.Linear(k_channels, self.head_num * self.head_dim) + self.v_linear = nn.Linear(k_channels, self.head_num * self.head_dim) + self.attention = ScaledDotProductAttention( + temperature=(self.head_num * self.head_dim) ** 0.5, dropout=dropout + ) + self.out_linear = nn.Linear(self.head_num * self.head_dim, q_channels) + self.dropout = nn.Dropout(p=dropout) + + def forward(self, q, k, v, mask=None): + b, q_len, k_len, v_len = q.size(0), q.size(1), k.size(1), v.size(1) + q = ( + self.q_linear(q) + .view(b, q_len, self.head_num, self.head_dim) + .transpose(1, 2) + ) + k = ( + self.k_linear(k) + .view(b, k_len, self.head_num, self.head_dim) + .transpose(1, 2) + ) + v = ( + self.v_linear(v) + .view(b, v_len, self.head_num, self.head_dim) + .transpose(1, 2) + ) + + if mask is not None: + mask = mask.unsqueeze(1) + + out, attn = self.attention(q, k, v, mask=mask) + out = ( + out.transpose(1, 2) + .contiguous() + .view(b, 
q_len, self.head_num * self.head_dim) + ) + out = self.out_linear(out) + out = self.dropout(out) + + return out + + +class Feedforward(nn.Module): + def __init__(self, filter_size=2048, hidden_dim=512, dropout=0.1): + super(Feedforward, self).__init__() + + self.layers = nn.Sequential( + nn.Linear(hidden_dim, filter_size, True), + nn.ReLU(True), + nn.Dropout(p=dropout), + nn.Linear(filter_size, hidden_dim, True), + nn.ReLU(True), + nn.Dropout(p=dropout), + ) + + def forward(self, input): + return self.layers(input) + + + +class TransformerDecoderLayer(nn.Module): + def __init__(self, input_size, src_size, filter_size, head_num, dropout_rate=0.2): + super(TransformerDecoderLayer, self).__init__() + + self.self_attention_layer = MultiHeadAttention( + q_channels=input_size, + k_channels=input_size, + head_num=head_num, + dropout=dropout_rate, + ) + self.self_attention_norm = nn.LayerNorm(normalized_shape=input_size) + + self.attention_layer = MultiHeadAttention( + q_channels=input_size, + k_channels=src_size, + head_num=head_num, + dropout=dropout_rate, + ) + self.attention_norm = nn.LayerNorm(normalized_shape=input_size) + + self.feedforward_layer = Feedforward( + filter_size=filter_size, hidden_dim=input_size + ) + self.feedforward_norm = nn.LayerNorm(normalized_shape=input_size) + + def forward(self, tgt, tgt_prev, src, tgt_mask): + + if tgt_prev == None: # Train + att = self.self_attention_layer(tgt, tgt, tgt, tgt_mask) + out = self.self_attention_norm(att + tgt) + + att = self.attention_layer(out, src, src) + out = self.attention_norm(att + out) + + ff = self.feedforward_layer(out) + out = self.feedforward_norm(ff + out) + else: + tgt_prev = torch.cat([tgt_prev, tgt], 1) + att = self.self_attention_layer(tgt, tgt_prev, tgt_prev, tgt_mask) + out = self.self_attention_norm(att + tgt) + + att = self.attention_layer(out, src, src) + out = self.attention_norm(att + out) + + ff = self.feedforward_layer(out) + out = self.feedforward_norm(ff + out) + return out + + 
+class PositionEncoder1D(nn.Module): + def __init__(self, in_channels, max_len=500, dropout=0.1): + super(PositionEncoder1D, self).__init__() + + self.position_encoder = self.generate_encoder(in_channels, max_len) + self.position_encoder = self.position_encoder.unsqueeze(0) + self.dropout = nn.Dropout(p=dropout) + + def generate_encoder(self, in_channels, max_len): + pos = torch.arange(max_len).float().unsqueeze(1) + + i = torch.arange(in_channels).float().unsqueeze(0) + angle_rates = 1 / torch.pow(10000, (2 * (i // 2)) / in_channels) + + position_encoder = pos * angle_rates + position_encoder[:, 0::2] = torch.sin(position_encoder[:, 0::2]) + position_encoder[:, 1::2] = torch.cos(position_encoder[:, 1::2]) + + return position_encoder + + def forward(self, x, point=-1): + if point == -1: + out = x + self.position_encoder[:, : x.size(1), :].to(x.get_device()) + out = self.dropout(out) + else: + out = x + self.position_encoder[:, point, :].unsqueeze(1).to(x.get_device()) + return out + + +class TransformerDecoder(nn.Module): + def __init__( + self, + num_classes, + src_dim, + hidden_dim, + filter_dim, + head_num, + dropout_rate, + pad_id, + st_id, + layer_num=1, + checkpoint=None, + ): + super(TransformerDecoder, self).__init__() + + self.embedding = nn.Embedding(num_classes + 1, hidden_dim) + self.hidden_dim = hidden_dim + self.filter_dim = filter_dim + self.num_classes = num_classes + self.layer_num = layer_num + + self.pos_encoder = PositionEncoder1D( + in_channels=hidden_dim, dropout=dropout_rate + ) + + self.attention_layers = nn.ModuleList( + [ + TransformerDecoderLayer( + hidden_dim, src_dim, filter_dim, head_num, dropout_rate + ) + for _ in range(layer_num) + ] + ) + self.generator = nn.Linear(hidden_dim, num_classes) + + self.pad_id = pad_id + self.st_id = st_id + + if checkpoint is not None: + self.load_state_dict(checkpoint) + + def pad_mask(self, text): + pad_mask = text == self.pad_id + pad_mask[:, 0] = False + pad_mask = pad_mask.unsqueeze(1) + + return 
pad_mask + + def order_mask(self, length): + order_mask = torch.triu(torch.ones(length, length), diagonal=1).bool() + order_mask = order_mask.unsqueeze(0).to(device) + return order_mask + + def text_embedding(self, texts): + tgt = self.embedding(texts) + tgt *= math.sqrt(tgt.size(2)) + + return tgt + + def forward( + self, src, text, is_train=True, batch_max_length=50, teacher_forcing_ratio=1.0 + ): + + if is_train and random.random() < teacher_forcing_ratio: + tgt = self.text_embedding(text) + tgt = self.pos_encoder(tgt) + tgt_mask = self.pad_mask(text) | self.order_mask(text.size(1)) + for layer in self.attention_layers: + tgt = layer(tgt, None, src, tgt_mask) + out = self.generator(tgt) + else: + out = [] + num_steps = batch_max_length - 1 + target = torch.LongTensor(src.size(0)).fill_(self.st_id).to(device) # [START] token + features = [None] * self.layer_num + + for t in range(num_steps): + target = target.unsqueeze(1) + tgt = self.text_embedding(target) + tgt = self.pos_encoder(tgt, point=t) + tgt_mask = self.order_mask(t + 1) + tgt_mask = tgt_mask[:, -1].unsqueeze(1) # [1, (l+1)] + for l, layer in enumerate(self.attention_layers): + tgt = layer(tgt, features[l], src, tgt_mask) + features[l] = ( + tgt if features[l] == None else torch.cat([features[l], tgt], 1) + ) + + _out = self.generator(tgt) # [b, 1, c] + target = torch.argmax(_out[:, -1:, :], dim=-1) # [b, 1] + target = target.squeeze() # [b] + out.append(_out) + + out = torch.stack(out, dim=1).to(device) # [b, max length, 1, class length] + out = out.squeeze(2) # [b, max length, class length] + + return out + +class SWIN(nn.Module): + def __init__(self, FLAGS, train_dataset, checkpoint=None): + super(SWIN, self).__init__() + + # self.encoder = SwinTransformer(ape=True) + self.encoder = timm.create_model('swin_base_patch4_window12_384_in22k', pretrained=True) + self.encoder.ape = True + self.decoder = TransformerDecoder( + num_classes=len(train_dataset.id_to_token), + src_dim=FLAGS.SATRN.decoder.src_dim, 
# NOTE(review): the flattened source line that this block replaces also carried
# the tail of the preceding class (remaining decoder kwargs, criterion,
# checkpoint loading and forward). Its header lies outside this chunk, so it is
# not reproduced here and must be restored from the original file.


class SWIN_encoder(nn.Module):
    """Encoder half of the model: a ``SwinTransformer`` with a fixed configuration.

    Args:
        FLAGS: Run configuration. Accepted for signature compatibility; this
            class does not read it.
        train_dataset: Dataset exposing ``token_to_id``; only the ``PAD`` entry
            is read, to configure ``criterion``.
        checkpoint: Optional state dict. When truthy it is loaded with
            ``load_state_dict``.
    """

    def __init__(self, FLAGS, train_dataset, checkpoint=None):
        super().__init__()

        self.encoder = SwinTransformer(
            img_size=384,
            patch_size=4,
            in_chans=3,
            embed_dim=128,
            depths=[2, 2, 18, 2],
            num_heads=[4, 8, 16, 32],
            window_size=12,
            mlp_ratio=4.,
            num_classes=21841,
            drop_path_rate=0.5,
            ape=True,
        )

        # Loss that skips PAD targets. It is stored here but not used by forward().
        self.criterion = nn.CrossEntropyLoss(
            ignore_index=train_dataset.token_to_id[PAD]
        )

        # An empty/None checkpoint is deliberately skipped: loading an empty
        # state dict would fail on missing keys.
        if checkpoint:
            self.load_state_dict(checkpoint)

    def forward(self, input, expected=None, is_train=None, teacher_forcing_ratio=None):
        """Run the encoder on ``input`` and return its output unchanged.

        ``expected``, ``is_train`` and ``teacher_forcing_ratio`` are ignored.
        They exist so the call signature matches the decoder's ``forward`` and
        now default to ``None`` so the encoder can be called with images only.
        """
        return self.encoder(input)


class SWIN_decoder(nn.Module):
    """Decoder half of the model: a ``TransformerDecoder`` plus decoding helpers.

    Besides the teacher-forced ``forward``, the class offers incremental
    decoding (``step_forward`` / ``reset_status``) and ``beam_search``.

    Args:
        FLAGS: Run configuration; ``FLAGS.SATRN.decoder.*`` and
            ``FLAGS.dropout_rate`` are read.
        train_dataset: Dataset exposing ``id_to_token`` and ``token_to_id``.
        checkpoint: Optional state dict. When truthy it is loaded with
            ``load_state_dict``.
    """

    def __init__(self, FLAGS, train_dataset, checkpoint=None):
        super().__init__()

        self.decoder = TransformerDecoder(
            num_classes=len(train_dataset.id_to_token),
            src_dim=FLAGS.SATRN.decoder.src_dim,
            hidden_dim=FLAGS.SATRN.decoder.hidden_dim,
            filter_dim=FLAGS.SATRN.decoder.filter_dim,
            head_num=FLAGS.SATRN.decoder.head_num,
            dropout_rate=FLAGS.dropout_rate,
            pad_id=train_dataset.token_to_id[PAD],
            st_id=train_dataset.token_to_id[START],
            layer_num=FLAGS.SATRN.decoder.layer_num,
        )

        # Loss that skips PAD targets. It is stored here but not used by forward().
        self.criterion = nn.CrossEntropyLoss(
            ignore_index=train_dataset.token_to_id[PAD]
        )

        # An empty/None checkpoint is deliberately skipped (see SWIN_encoder).
        if checkpoint:
            self.load_state_dict(checkpoint)

    def forward(self, input, expected, is_train, teacher_forcing_ratio):
        """Decode ``input`` against the target sequence ``expected``.

        Args:
            input: Source features handed straight to the decoder (no encoder
                is applied here).
            expected: Target token ids. The last position is dropped before
                being passed on, and ``expected.size(1)`` is forwarded as the
                decoding length.
            is_train: Forwarded to the decoder unchanged.
            teacher_forcing_ratio: Forwarded to the decoder unchanged.

        Returns:
            Whatever ``self.decoder`` returns.
        """
        return self.decoder(
            input,
            expected[:, :-1],
            is_train,
            expected.size(1),
            teacher_forcing_ratio,
        )

    def step_forward(self, src, expected, target):
        """Decode a single step, caching per-layer outputs between calls.

        The cache (``self.features``) and step counter (``self.step_idx``) are
        created on first use and reset automatically once
        ``expected.size(1) - 1`` steps have been taken.

        Args:
            src: Source features, passed unchanged to every attention layer.
            expected: Target tensor; only ``expected.size(1)`` is read.
            target: Token ids for the current step, shape ``[b]`` according to
                the original author's comments.

        Returns:
            Output of ``self.decoder.generator`` for this step
            (``[b, 1, c]`` per the original comments).
        """
        # The original read step_idx/features without ever initialising them,
        # so the first call failed unless reset_status() had been called by hand.
        if not hasattr(self, "step_idx") or not hasattr(self, "features"):
            self.reset_status()

        num_step = expected.size(1) - 1

        target = target.unsqueeze(1)  # b, 1
        tgt = self.decoder.text_embedding(target)
        tgt = self.decoder.pos_encoder(tgt, point=self.step_idx)
        tgt_mask = self.decoder.order_mask(self.step_idx + 1)  # 1, t+1, t+1
        tgt_mask = tgt_mask[:, -1].unsqueeze(1)  # keep only the newest row: 1, 1, t+1

        for l, layer in enumerate(self.decoder.attention_layers):
            tgt = layer(tgt, self.features[l], src, tgt_mask)
            # Identity test: `== None` on a tensor is an element-wise comparison.
            if self.features[l] is None:
                self.features[l] = tgt
            else:
                self.features[l] = torch.cat([self.features[l], tgt], 1)

        _out = self.decoder.generator(tgt)  # [b, 1, c]
        self.step_idx += 1
        if self.step_idx == num_step:
            self.reset_status()

        return _out

    def reset_status(self):
        """Clear the incremental-decoding state used by ``step_forward``."""
        self.step_idx = 0
        self.features = [None] * self.decoder.layer_num

    @staticmethod
    def _fit_length(tokens, length, pad_id):
        """Return ``tokens`` right-padded with ``pad_id`` or truncated to ``length``."""
        # A negative multiplier yields an empty list, so only the slice applies
        # when the sequence is already too long.
        return (list(tokens) + [pad_id] * (length - len(tokens)))[:length]

    def beam_search(
        self,
        input: torch.Tensor,
        data_loader: DataLoader,
        topk: int = 1,
        beam_width: int = 5,
        max_sequence: int = 230,
    ):
        """Decode every sample of ``input`` with best-first beam search.

        Args:
            input: Batch to decode. If this module has an ``encoder`` attribute
                it is applied first; otherwise ``input`` is used directly as
                the source features, exactly as ``forward`` does.
            data_loader: Loader whose ``dataset.token_to_id`` maps the special
                tokens ``START`` / ``END`` / ``PAD`` to ids.
            topk: Number of hypotheses to return per sample.
            beam_width: Number of candidates expanded from each node.
            max_sequence: Output length; also bounds the number of expansions
                at ``(max_sequence - 1) * beam_width`` per sample.

        Returns:
            Integer tensor of token ids, padded with the PAD id: shape
            ``[B, max_sequence]`` when ``topk == 1``, otherwise
            ``[B, topk, max_sequence]`` (missing hypotheses are all-PAD rows).
        """
        # Function-scope imports keep this method self-contained.
        import heapq
        from copy import deepcopy

        # Special tokens. The module constants are used instead of bare
        # literals so the lookups stay consistent with __init__.
        token_to_id = data_loader.dataset.token_to_id
        sos_token_id = token_to_id[START]
        eos_token_id = token_to_id[END]
        pad_token_id = token_to_id[PAD]

        # `input.get_device()` returns -1 for CPU tensors, which is not a valid
        # `.to()` target; `.device` works for both CPU and CUDA.
        target_device = input.device
        batch_size = len(input)
        step_budget = (max_sequence - 1) * beam_width

        decoded_batch = []
        with torch.no_grad():
            # SWIN_decoder itself defines no encoder, so the unconditional
            # `self.encoder(input)` of the original raised AttributeError.
            encoder = getattr(self, "encoder", None)
            src = input if encoder is None else encoder(input)  # [B, HxW, C]

            # Generate one sample at a time.
            for data_idx in range(batch_size):
                end_nodes = []
                number_required = topk  # hypotheses to collect before stopping

                # Min-heap of (score, insertion order, node). The insertion
                # counter breaks score ties so nodes are never compared.
                heap = []
                pushed = 0

                current_src = src[data_idx, :, :].unsqueeze(0)  # [B=1, HxW, C]
                start_node = BeamSearchNode(
                    hidden_state=[None] * self.decoder.layer_num,
                    prev_node=None,
                    token_id=torch.LongTensor([sos_token_id]),  # [1]
                    log_prob=0,
                    length=1,  # NOTE: used for the positional encoding
                )
                # Scores are negated so the most probable node pops first.
                heapq.heappush(heap, (-start_node.eval(), pushed, start_node))
                pushed += 1

                num_steps = 0
                # Stop on an exhausted heap as well: the original blocked
                # forever on an empty PriorityQueue.
                while heap and num_steps < step_budget:
                    # Pop the most probable node.
                    score, _, n = heapq.heappop(heap)
                    current_hidden = n.hidden_state
                    current_point = n.len - 1  # position for the positional encoding

                    # A finished hypothesis: END token that is not the root node.
                    if n.token_id.item() == eos_token_id and n.prev_node is not None:
                        end_nodes.append((score, n))
                        if len(end_nodes) >= number_required:
                            break
                        continue

                    step_input = n.token_id.unsqueeze(1)  # [B=1, 1]
                    tgt = self.decoder.text_embedding(
                        texts=step_input.to(target_device)
                    )  # [B=1, 1, HIDDEN]
                    tgt = self.decoder.pos_encoder(x=tgt, point=current_point)
                    tgt_mask = self.decoder.order_mask(length=current_point + 1)
                    tgt_mask = tgt_mask[:, -1].unsqueeze(1)  # [B=1, 1, LEN]

                    # Pass through the attention layers, growing each layer's
                    # cache by one position:
                    # [1, 1, HIDDEN] -> [1, 2, HIDDEN] -> ...
                    for l, layer in enumerate(self.decoder.attention_layers):
                        tgt = layer(
                            tgt=tgt,
                            tgt_prev=current_hidden[l],
                            src=current_src,
                            tgt_mask=tgt_mask,
                        )
                        if current_hidden[l] is None:
                            current_hidden[l] = tgt
                        else:
                            current_hidden[l] = torch.cat(
                                [current_hidden[l], tgt], dim=1
                            )

                    logits = self.decoder.generator(tgt)  # [B=1, 1, VOCAB_SIZE]
                    log_prob_step = F.log_softmax(logits, dim=-1)
                    log_prob, indices = torch.topk(log_prob_step, beam_width)

                    # Queue the `beam_width` best continuations of this node.
                    for new_k in range(beam_width):
                        decoded_t = indices[:, :, new_k].squeeze(0)
                        log_p = log_prob[:, :, new_k].item()
                        child = BeamSearchNode(
                            hidden_state=deepcopy(current_hidden),
                            prev_node=n,
                            token_id=deepcopy(decoded_t),
                            log_prob=n.logp + log_p,
                            length=n.len + 1,
                        )
                        heapq.heappush(heap, (-child.eval(), pushed, child))
                        pushed += 1

                    num_steps += beam_width

                # END never appeared: fall back to the most probable open nodes,
                # taking no more than the heap actually holds.
                if not end_nodes:
                    for _ in range(min(topk, len(heap))):
                        score, _order, node = heapq.heappop(heap)
                        end_nodes.append((score, node))

                utterances = []
                for score, n in sorted(end_nodes, key=lambda item: item[0]):
                    # Walk back from the last node to the root, then reverse.
                    utterance = [n.token_id.item()]
                    while n.prev_node is not None:
                        n = n.prev_node
                        utterance.append(n.token_id.item())
                    utterances.append(utterance[::-1])

                if topk == 1:
                    decoded_batch.append(utterances[0] if utterances else [])
                else:
                    decoded_batch.append(utterances)

        # Convert to a rectangular tensor (the shape id_to_string expects, per
        # the original comment).
        if topk == 1:
            outputs = [
                self._fit_length(sample, max_sequence, pad_token_id)
                for sample in decoded_batch
            ]
        else:
            # The original padded the list of hypotheses with bare ints here,
            # producing a ragged structure that torch.tensor rejects.
            outputs = []
            for sample in decoded_batch:
                rows = [
                    self._fit_length(utterance, max_sequence, pad_token_id)
                    for utterance in sample
                ]
                while len(rows) < topk:
                    rows.append([pad_token_id] * max_sequence)
                outputs.append(rows)

        return torch.tensor(outputs)
class SWIN_encoder(nn.Module): def __init__(self, FLAGS, train_dataset, checkpoint=None): super(SWIN_encoder, self).__init__() - # self.encoder = SwinTransformer(ape=True) - self.encoder = timm.create_model('swin_base_patch4_window12_384_in22k', pretrained=True) - self.encoder.ape = True + self.encoder = SwinTransformer(img_size=384, patch_size=4, in_chans=3, + embed_dim=128, depths=[2, 2, 18, 2], num_heads=[4, 8, 16, 32], + window_size=12, mlp_ratio=4.,num_classes=21841, + drop_path_rate=0.5, ape=True,) self.criterion = ( nn.CrossEntropyLoss(ignore_index=train_dataset.token_to_id[PAD])