# Copyright (c) Together
# This software is distributed under the terms of the Apache License, Version 2.0
# Author: Michael Poli
# Note: MP and PP utilities are removed for ease of use and editing.

import torch
import torch.nn as nn
import torch.nn.functional as F

from .utils import print_rank_0, column_split
from .cache import InferenceParams, RecurrentInferenceParams
from .engine import HyenaInferenceEngine
from .layers import (
    RMSNorm,
    ParallelGatedMLP,
    VocabParallelEmbedding,
)

try:
    from flash_attn.modules.mha import MHA
except ImportError:
    "flash_attn not installed"


class AttentionBlock(nn.Module):
    def __init__(self, config, layer_idx) -> None:
        super().__init__()
        self.config = config
        self.pre_norm, self.post_norm = RMSNorm(config), RMSNorm(config)
        self.layer_idx = layer_idx
        self.proj_groups = config.get("proj_groups", 1)
        dtype = config.get("attn_block_dtype", torch.bfloat16)
        mlp_dtype = config.get("mlp_dtype", torch.bfloat16)
        self.num_attention_heads = config.num_attention_heads
        self.hidden_size_per_attention_head = config.hidden_size // config.num_attention_heads

        self.counter = 0
        self.inner_mha_cls = MHA(
            embed_dim=config.hidden_size,
            num_heads=config.num_attention_heads,
            num_heads_kv=config.num_attention_heads // self.proj_groups,
            rotary_emb_dim=config.hidden_size // config.num_attention_heads,
            qkv_proj_bias=config.get("qkv_proj_bias", True),
            rotary_emb_base=config.get("rotary_emb_base", 10000),
            causal=True,
            layer_idx=layer_idx,
            out_proj_bias=config.get("mha_out_proj_bias", True),
            use_flash_attn=self.config.use_flash_attn,
        ).to(dtype=dtype)

        if self.config.get("smeared_gqa", False):
            self.inner_mha_cls.num_heads_kv = self.inner_mha_cls.num_heads
        self.inner_mha_cls.rotary_emb.register_buffer(
            "inv_freq", self.inner_mha_cls.rotary_emb.inv_freq
        )

        self.mlp = ParallelGatedMLP(config).to(dtype=mlp_dtype)

    def forward(self, u, inference_params=None, padding_mask=None, *args, **kwargs):
        if (
            type(padding_mask) == torch.Tensor
        ):  # workaround for masking bug in FA. This works because Wqkv does not have bias
            # and attention scores will be also automatically zeroed.
            u = u * padding_mask[..., None]

        u = (
            self.inner_mha_cls(
                self.pre_norm(u),
                inference_params=inference_params,
            )
            + u
        )
        if type(padding_mask) == torch.Tensor:  # guard against bias
            u = u * padding_mask[..., None]
        u = self.mlp(self.post_norm(u)) + u
        return u, None


class ParallelHyenaFilter(nn.Module):
    def __init__(self, config, layer_idx) -> None:
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        self.hyena_filter_groups = config.get("hyena_filter_groups", self.config.hidden_size)

        self.use_flashfft = config.get("use_flashfft", False)
        self.state_size = config.state_size
        self.hidden_size = config.hidden_size
        self.num_filters = config.num_filters
        self.inference_mode = config.get("inference_mode", True)
        self.counter = 0
        self.column_split_hyena = config.get("column_split_hyena", True)

        assert self.hidden_size % self.num_filters == 0 and self.num_filters <= self.hidden_size

        self.D = nn.Parameter(torch.zeros(self.hidden_size))

        # attention heads are not used except to split post short_filter
        # projections in the same way as the checkpoint
        self.num_attention_heads = config.num_attention_heads
        self.hidden_size_per_attention_head = self.hidden_size // self.num_attention_heads

        # after preprocessing here we can save the new checkpoint
        self.short_filter_length = config.short_filter_length
        self.short_filter_weight = nn.Parameter(
            torch.randn(3 * config.hidden_size, 1, config.short_filter_length)
        )
        self.short_filter_bias = (
            nn.Parameter(torch.randn(3 * config.hidden_size)) if config.short_filter_bias else None
        )

        self.engine = HyenaInferenceEngine(layer_idx=layer_idx)
        self.use_flash_depthwise = config.get("use_flash_depthwise", False)
        self.data_dtype = None

        if self.use_flash_depthwise:
            self.fir_fn = FlashDepthwiseConv1d(
                channels=3 * self.hidden_size,
                kernel_size=self.short_filter_length,
                padding=self.short_filter_length - 1,
                weights=self.short_filter_weight,
                bias=self.short_filter_bias,
                device=None,
                dtype=self.config.get("depthwise_dtype", torch.bfloat16),
            )
        else:
            self.fir_fn = F.conv1d

        self.fftconv_fn = None
        self.long_fir_threshold = config.get("long_fir_threshold", None)
        if self.long_fir_threshold is not None:
            assert (
                self.use_flashfft is False
            ), "long_fir_threshold not compatible with fused flashfft"

        self.num_systems = self.hidden_size // self.hyena_filter_groups
        self.poles = nn.Parameter(torch.randn(self.num_systems, self.state_size, 1, 2))
        self.residues = nn.Parameter(torch.randn(self.num_systems, self.state_size, 1, 2))
        self.h = None

    def forward(self, u, inference_params=None, padding_mask=None, *args, **kwargs):
        if (
            inference_params is not None
            and self.layer_idx in inference_params.fir_state_dict.keys()
        ):
            return self.sequential_forward(u, inference_params)

        else:
            return self.parallel_forward(u, inference_params, padding_mask)

    def parallel_forward(self, u, inference_params=None, padding_mask=None):
        L = u.shape[1]
        z_pre, fir_state = self.engine.parallel_fir(
            self.fir_fn,
            u,
            self.short_filter_weight,
            self.short_filter_bias,
            L,
            fir_length=self.short_filter_length,
            inference_params=inference_params,
            padding_mask=padding_mask,
        )
        if inference_params:
            inference_params.fir_state_dict[self.layer_idx] = fir_state

        if self.h is None:
            h, filter_dtype, poles, residues = self.compute_filter(L, u.device)
        else:
            h = self.h
            filter_dtype = self.h.dtype

        if self.hyena_filter_groups > 1:
            h = h.repeat_interleave(self.hidden_size // self.hyena_filter_groups, 1)

        # if inference_params is not None, we plan to perform generation:
        # prefilling for the IIR portion of the filter is handled by the engine.
        dims = (
            self.hidden_size,
            self.num_attention_heads,
            self.hidden_size_per_attention_head,
            self.state_size,
            self.hyena_filter_groups,
        )
        y = self.engine.parallel_iir(
            z_pre,
            h,
            self.D,
            L,
            t=self.t,
            poles=self.poles,
            dims=dims,
            inference_params=inference_params,
            layer_idx=self.layer_idx,
            prefill_style=self.config.get("prefill_style", "fft"),
            use_flashfft=self.use_flashfft,
            fftconv_fn=self.fftconv_fn,
            column_split_hyena=self.column_split_hyena,
            long_fir_threshold=self.long_fir_threshold,
            padding_mask=padding_mask,
        )

        return y, inference_params

    def sequential_forward(self, u, inference_params):
        if self.data_dtype is None:
            self.data_dtype = u.dtype
        if len(u.shape) > 2:
            u = u[:, -1]

        fir_state, iir_state = (
            inference_params.fir_state_dict[self.layer_idx],
            inference_params.state_dict[self.layer_idx],
        )

        z_pre, fir_state = self.engine.step_fir(
            u, fir_state, weight=self.short_filter_weight, bias=self.short_filter_bias
        )
        x2, x1, v = (
            column_split(z_pre, self.num_attention_heads, self.hidden_size_per_attention_head)
            if self.column_split_hyena
            else z_pre.split([self.hidden_size, self.hidden_size, self.hidden_size], dim=1)
        )

        y, iir_state = self.engine.step_iir(
            x2,
            x1,
            v,
            self.D,
            self.residues,
            self.poles,
            iir_state,
            iir_groups=self.hyena_filter_groups,
        )

        inference_params.fir_state_dict[self.layer_idx] = fir_state
        inference_params.state_dict[self.layer_idx] = iir_state
        y = y.to(dtype=self.data_dtype)
        return y[:, None], inference_params

    def update_time(self, L, device):
        """
        Set [0, 1, ..., L-1] where L is the length of the current batch of inputs.
        If L is greater than the length of the previous batch, then the time vector is
        reinitialized. Otherwise, the time vector is truncated from cache.
        """
        if not hasattr(self, "t"):
            self.t = torch.arange(L, device=device)[None, None]
        elif self.t.shape[-1] < L:
            self.t = torch.arange(L, device=device)[None, None]
        else:
            self.t = self.t[..., :L]

    def compute_filter(self, L, device):
        self.update_time(L, device)
        filter_dtype = torch.float32
        residues, log_poles = (
            torch.view_as_complex(self.residues.to(filter_dtype)),
            torch.view_as_complex(self.poles.to(filter_dtype)).log(),
        )
        h = (residues * (log_poles * self.t).exp()).real.sum(1)[None]
        return h, filter_dtype, log_poles, residues


class ParallelGatedConvBlock(nn.Module):
    def __init__(self, config, layer_idx) -> None:
        super().__init__()
        self.config = config
        self.layer_idx = layer_idx
        dtype = config.get("hyena_block_dtype", torch.float32)
        mlp_dtype = config.get("mlp_dtype", torch.bfloat16)
        self.pre_norm, self.post_norm = RMSNorm(config).to(dtype=dtype), RMSNorm(config).to(
            dtype=dtype
        )
        self.filter = ParallelHyenaFilter(config, layer_idx).to(dtype=dtype)
        self.projections = nn.Linear(config.hidden_size, 3 * config.hidden_size)
        self.out_filter_dense = nn.Linear(config.hidden_size, config.hidden_size).to(dtype)
        self.mlp = ParallelGatedMLP(config).to(dtype=mlp_dtype)

    def forward(self, u, inference_params=None, padding_mask=None, *args, **kwargs):
        z = self.projections(self.pre_norm(u))
        if type(padding_mask) == torch.Tensor:  # guard against bias
            z = z * padding_mask[..., None]

        z, inference_params = self.filter(
            z, inference_params=inference_params, padding_mask=padding_mask
        )

        u = self.out_filter_dense(z) + u
        if type(padding_mask) == torch.Tensor:  # guard against bias
            u = u * padding_mask[..., None]
        u = self.mlp(self.post_norm(u)) + u
        return u, inference_params


def get_block(config, layer_idx, flash_fft=None):
    if layer_idx in config.attn_layer_idxs:
        return AttentionBlock(config, layer_idx)
    elif layer_idx in config.hyena_layer_idxs:
        block = ParallelGatedConvBlock(config, layer_idx)
        if config.get("use_flashfft", "False"):
            block.filter.fftconv_fn = flash_fft
        return block
    else:
        raise NotImplementedError


class StripedHyena(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        self.embedding_layer = VocabParallelEmbedding(config)
        self.norm = RMSNorm(config) if config.get("final_norm", True) else None
        self.unembed = self.emb if config.tie_embeddings else VocabParallelEmbedding(config)
        self.gradient_checkpointing = False
        
        if config.get("use_flashfft", "False"):
            raise NotImplementedError("Please use standalone SH code for other custom kernels")
        else:
            self.flash_fft = None

        self.blocks = nn.ModuleList(
            get_block(config, layer_idx, flash_fft=self.flash_fft)
            for layer_idx in range(config.num_layers)
        )

    def forward(self, x, inference_params_dict=None, padding_mask=None):
        L = x.shape[1]
        x = self.embedding_layer.embed(x)
        if inference_params_dict is not None:
            x, inference_params_dict_out = self.stateful_forward(
                x,
                inference_params_dict=inference_params_dict,
            )
        else:
            x, inference_params_dict_out = self.stateless_forward(x, padding_mask=padding_mask)
        x = self.norm(x)
        x = self.unembed.unembed(x)
        return x, inference_params_dict_out

    def stateful_forward(self, x, inference_params_dict=None):
        for block_idx, block in enumerate(self.blocks):
            block_name = "mha" if block_idx in self.config.attn_layer_idxs else "hyena"
            inference_params = inference_params_dict[block_name]
            x, _ = block(x, inference_params=inference_params)

        return x, inference_params_dict

    def stateless_forward(self, x, padding_mask=None):
        if type(padding_mask) == torch.Tensor:
            x = x * padding_mask[..., None]

        for block_idx, block in enumerate(self.blocks):
            if self.gradient_checkpointing and self.training:
                def create_custom_forward(module):
                    def custom_forward(*inputs):
                        # None for past_key_value
                        return module(*inputs, inference_params=None, padding_mask=padding_mask)

                    return custom_forward

                x, _ = checkpoint(create_custom_forward(block), x, use_reentrant=False)
            else:
                x, _ = block(x, inference_params=None, padding_mask=padding_mask)
        return x, None

    def initialize_inference_params(self):
        print_rank_0("Initializing inference params...")
        inference_params_dict = {
            "mha": InferenceParams(
                max_seqlen=self.config.get("max_seqlen", 8192),
                max_batch_size=self.config.get("max_batch_size", 1),
                seqlen_offset=0,
            ),
            "hyena": RecurrentInferenceParams(
                fir_filter_length=self.config.short_filter_length,
                state_dim=self.config.state_size,
                seqlen_offset=0,
            ),
        }
        return inference_params_dict

    def precompute_filters(self, L, device):
        for block_idx, block in enumerate(self.blocks):
            if type(block) == ParallelGatedConvBlock:
                if type(block.filter) == ParallelHyenaFilter:
                    L = block.filter.long_fir_threshold or L
                    print_rank_0(f"Precomputing filters, L={L}...")

                    filter_dtype = torch.float16 if L >= 2048 else torch.float32

                    block.filter._set_time(L, device)
                    residues, poles = (
                        torch.view_as_complex(block.filter.residues.to(torch.float16)),
                        torch.view_as_complex(block.filter.poles.to(torch.float16)),
                    )

                    block.filter.h = (residues * poles**block.filter.t).real.sum(1)[None]
                    block.filter.h = block.filter.h.to(dtype=filter_dtype)

    def load_poles_residues(self, path):
        "Load different poles and residues for each layer."
        for block_idx, block in enumerate(self.blocks):
            if type(block) == ParallelGatedConvBlock:
                if type(block.filter) == ParallelHyenaFilter:
                    print(f"Loading poles and residues for block {block_idx}")
                    poles = torch.load(path + f"/approx_poles_{block_idx+1}.pt", map_location="cpu")
                    poles = torch.view_as_real(poles)
                    residues = torch.load(
                        path + f"/approx_residues_{block_idx+1}.pt", map_location="cpu"
                    )
                    residues = torch.view_as_real(residues)
                    poles = poles.permute(1, 0, 2).unsqueeze(-2)
                    residues = residues.permute(1, 0, 2).unsqueeze(-2)

                    block.filter.poles = nn.Parameter(poles)
                    block.filter.residues = nn.Parameter(residues)

    def to_bfloat16_except_poles_residues(self):
        """Convert all parameters to bfloat16 except for the poles and residues.

        Particularly important for longer prompts.
        """
        for k, p in self.named_parameters():
            if "poles" not in k and "residues" not in k:
                p.data = p.data.to(torch.bfloat16)