Source code for ls_mlkit.diffuser.sde.corrector

import abc
import functools

import torch
from overrides import override
from torch import Tensor

from ...util.decorators import register_class_to_dict
from .base_sde import SDE
from .sde_lib import VESDE, VPSDE, SubVPSDE

_CORRECTORS = {}

register_corrector = functools.partial(register_class_to_dict, global_dict=_CORRECTORS)


[docs] class Corrector(abc.ABC): """The abstract class for a corrector algorithm.""" def __init__(self, sde: SDE, score_fn: object, snr: float, n_steps: int): super().__init__() self.sde = sde self.score_fn = score_fn self.snr = snr self.n_steps = n_steps
[docs] @abc.abstractmethod def update_fn(self, x: Tensor, t: Tensor, mask=None): """One update of the corrector. Args: x: A PyTorch tensor representing the current state t: A PyTorch tensor representing the current time step. Returns: x: A PyTorch tensor of the next state. x_mean: A PyTorch tensor. The next state without random noise. Useful for denoising. """
[docs] @register_corrector(key_name="none") class NoneCorrector(Corrector): """An empty corrector that does nothing.""" def __init__(self, sde, score_fn, snr, n_steps): ...
[docs] def update_fn(self, x, t, mask=None): return x, x
[docs] @register_corrector(key_name="langevin_corrector") class LangevinCorrector(Corrector): def __init__(self, sde: SDE, score_fn: object, snr: float, n_steps: int, n_dim: int = 2): super().__init__(sde, score_fn, snr, n_steps) if not isinstance(sde, VPSDE) and not isinstance(sde, VESDE) and not isinstance(sde, SubVPSDE): raise NotImplementedError(f"SDE class {sde.__class__.__name__} not yet supported.") self.n_dim = n_dim
[docs] @override def update_fn(self, x: Tensor, t: Tensor, mask=None): sde = self.sde score_fn = self.score_fn n_steps = self.n_steps target_snr = self.snr if isinstance(sde, VPSDE) or isinstance(sde, SubVPSDE): timestep = (t * (sde.n_discretization_steps - 1) / sde.T).long() alpha = sde.alphas.to(t.device)[timestep] else: alpha = torch.ones_like(t) for _ in range(n_steps): grad = score_fn(x, t, mask) noise = torch.randn_like(x) grad_norm = torch.norm(grad.reshape(grad.shape[0], -1), dim=-1).mean() noise_norm = torch.norm(noise.reshape(noise.shape[0], -1), dim=-1).mean() step_size = (target_snr * noise_norm / grad_norm) ** 2 * 2 * alpha x_mean = x + step_size.view(step_size.shape[0], *[1 for _ in range(self.n_dim)]) * grad x = x_mean + torch.sqrt(step_size * 2).view(step_size.shape[0], *[1 for _ in range(self.n_dim)]) * noise return x, x_mean