from collections import defaultdict from dataclasses import dataclass from typing import Any, Dict, Optional, Tuple, Union import torch import torch.nn.functional as F import torch.utils.checkpoint from torch import nn from torch.nn import CrossEntropyLoss from torch.utils._pytree import tree_map from transformers import PretrainedConfig from transformers.activations import ACT2FN from transformers.modeling_outputs import ( BaseModelOutputWithPast, CausalLMOutputWithPast, ) from transformers.generation import GenerationMixin from transformers.modeling_utils import PreTrainedModel from transformers.utils import ModelOutput, logging from transformers.utils.import_utils import is_causal_conv1d_available if is_causal_conv1d_available(): from causal_conv1d import causal_conv1d_fn, causal_conv1d_update else: causal_conv1d_update, causal_conv1d_fn = None, None logger = logging.get_logger(__name__) TTT_STANDARD_CONFIGS = { "125m": { "hidden_size": 768, "intermediate_size": 2048, "num_hidden_layers": 12, "num_attention_heads": 12, }, "350m": { "hidden_size": 1024, "intermediate_size": 2736, "num_hidden_layers": 24, "num_attention_heads": 16, }, "760m": { "hidden_size": 1536, "intermediate_size": 4096, "num_hidden_layers": 24, "num_attention_heads": 16, }, "1b": { "hidden_size": 2048, "intermediate_size": 5504, "num_hidden_layers": 24, "num_attention_heads": 32, }, } class TTTConfig(PretrainedConfig): r""" This is the configuration class to store the configuration of a [`TTTModel`]. It is used to instantiate an TTT model according to the specified arguments, defining the model architecture. Instantiating a configuration with the defaults will yield a similar configuration to that of the TTT-1B. Configuration objects inherit from [`PretrainedConfig`] and can be used to control the model outputs. Read the documentation from [`PretrainedConfig`] for more information. Args: vocab_size (`int`, *optional*, defaults to 32000): Vocabulary size of the LLaMA model. Defines the number of different tokens that can be represented by the `inputs_ids` passed when calling [`LlamaModel`] hidden_size (`int`, *optional*, defaults to 4096): Dimension of the hidden representations. intermediate_size (`int`, *optional*, defaults to 11008): Dimension of the MLP representations. num_hidden_layers (`int`, *optional*, defaults to 32): Number of hidden layers in the Transformer decoder. num_attention_heads (`int`, *optional*, defaults to 32): Number of attention heads for each attention layer in the Transformer decoder. num_key_value_heads (`int`, *optional*): This is the number of key_value heads that should be used to implement Grouped Query Attention. If `num_key_value_heads=num_attention_heads`, the model will use Multi Head Attention (MHA), if `num_key_value_heads=1 the model will use Multi Query Attention (MQA) otherwise GQA is used. When converting a multi-head checkpoint to a GQA checkpoint, each group key and value head should be constructed by meanpooling all the original heads within that group. For more details checkout [this paper](https://arxiv.org/pdf/2305.13245.pdf). If it is not specified, will default to `num_attention_heads`. hidden_act (`str` or `function`, *optional*, defaults to `"silu"`): The non-linear activation function (function or string) in the decoder. max_position_embeddings (`int`, *optional*, defaults to 2048): The maximum sequence length that this model might ever be used with. Llama 1 supports up to 2048 tokens, Llama 2 up to 4096, CodeLlama up to 16384. initializer_range (`float`, *optional*, defaults to 0.02): The standard deviation of the truncated_normal_initializer for initializing all weight matrices. rms_norm_eps (`float`, *optional*, defaults to 1e-06): The epsilon used by the rms normalization layers. use_cache (`bool`, *optional*, defaults to `True`): Whether or not the model should return the last key/values attentions (not used by all models). Only relevant if `config.is_decoder=True`. pad_token_id (`int`, *optional*): Padding token id. bos_token_id (`int`, *optional*, defaults to 1): Beginning of stream token id. eos_token_id (`int`, *optional*, defaults to 2): End of stream token id. pretraining_tp (`int`, *optional*, defaults to 1): Experimental feature. Tensor parallelism rank used during pretraining. Please refer to [this document](https://huggingface.co/docs/transformers/main/perf_train_gpu_many#tensor-parallelism) to understand more about it. This value is necessary to ensure exact reproducibility of the pretraining results. Please refer to [this issue](https://github.com/pytorch/pytorch/issues/76232). tie_word_embeddings (`bool`, *optional*, defaults to `False`): Whether to tie weight embeddings rope_theta (`float`, *optional*, defaults to 10000.0): The base period of the RoPE embeddings. rope_scaling (`Dict`, *optional*): Dictionary containing the scaling configuration for the RoPE embeddings. Currently supports two scaling strategies: linear and dynamic. Their scaling factor must be a float greater than 1. The expected format is `{"type": strategy name, "factor": scaling factor}`. When using this flag, don't update `max_position_embeddings` to the expected new maximum. See the following thread for more information on how these scaling strategies behave: https://www.reddit.com/r/LocalLLaMA/comments/14mrgpr/dynamically_scaled_rope_further_increases/. This is an experimental feature, subject to breaking API changes in future versions. use_gate (`bool`, *optional*, defaults to `False`): whether use gating in Mamba backbone share_qk (`bool`, *optional*, defaults to `False`): whether share Q/K projection matrix ttt_layer_type (`str`, *optional*, defaults to `"linear"`): ttt block type, "linear" or "mlp", stands for TTT-Linear and TTT-MLP ttt_base_lr (`float`, *optional*, defaults to 1.0): base learning rate for TTT learner pre_conv (`bool`, *optional*, defaults to `False`): whether use conv before TTT conv_kernel (`int`, *optional*, defaults to 4): kernel size of the conv layer scan_checkpoint_group_size (`int`, *optional*, defaults to 0): gradient checkpoint group size on seq dimension, 0 means no checkpointing. In JAX implementation, we set it 4, which means we group 4 mini-batches together in 1 gradient checkpointg to save memory. ```python >>> from . import TTTModel, TTTConfig >>> # Initializing a TTT ttt-1b style configuration >>> configuration = TTTConfig() >>> # Initializing a model from the ttt-1b style configuration >>> model = TTTModel(configuration) >>> # Accessing the model configuration >>> configuration = model.config ```""" model_type = "ttt" def __init__( self, vocab_size=151936, # Expanded vocab for MAC hidden_size=2048, intermediate_size=5504, # For memory layers num_hidden_layers=24, # Memory layers by default num_attention_heads=32, hidden_act="silu", max_position_embeddings=262144, # Long context support initializer_range=0.02, rms_norm_eps=1e-6, use_cache=True, pad_token_id=None, bos_token_id=151643, eos_token_id=151645, pretraining_tp=1, tie_word_embeddings=True, rope_theta=5000000.0, # For long context use_gate=True, share_qk=True, ttt_layer_type="linear", ttt_base_lr=1.0, mini_batch_size=16, pre_conv=True, conv_kernel=4, scan_checkpoint_group_size=0, # MAC-specific parameters model_variant="mac", # "mac", "standard", "memory_only", "core_only" num_memory_layers=None, # If None, uses num_hidden_layers num_core_layers=36, # Core processing layers core_intermediate_size=9728, # MLP size for core layers num_key_value_heads=8, # For GQA in core layers head_dim=None, # Auto-calculate if None attention_bias=False, attention_dropout=0.0, rope_scaling=None, sliding_window=None, fixed_memory_size=64, # Persistent memory tokens core_hidden_size=None, # If None, uses hidden_size (for same-size configs) **kwargs, ): self.vocab_size = vocab_size self.max_position_embeddings = max_position_embeddings self.hidden_size = hidden_size self.intermediate_size = intermediate_size self.num_hidden_layers = num_hidden_layers self.num_attention_heads = num_attention_heads self.hidden_act = hidden_act self.initializer_range = initializer_range self.rms_norm_eps = rms_norm_eps self.pretraining_tp = pretraining_tp self.use_cache = use_cache self.rope_theta = rope_theta self.use_gate = use_gate self.share_qk = share_qk self.ttt_layer_type = ttt_layer_type self.ttt_base_lr = ttt_base_lr self.mini_batch_size = mini_batch_size self.pre_conv = pre_conv self.conv_kernel = conv_kernel self.scan_checkpoint_group_size = scan_checkpoint_group_size # MAC-specific attributes self.model_variant = model_variant self.num_memory_layers = num_memory_layers if num_memory_layers is not None else num_hidden_layers self.num_core_layers = num_core_layers self.core_intermediate_size = core_intermediate_size self.num_key_value_heads = num_key_value_heads self.head_dim = head_dim if head_dim is not None else (hidden_size // num_attention_heads) self.attention_bias = attention_bias self.attention_dropout = attention_dropout self.rope_scaling = rope_scaling self.sliding_window = sliding_window self.fixed_memory_size = fixed_memory_size # Core hidden size - if None, same as memory hidden size self.core_hidden_size = core_hidden_size if core_hidden_size is not None else hidden_size super().__init__( pad_token_id=pad_token_id, bos_token_id=bos_token_id, eos_token_id=eos_token_id, tie_word_embeddings=tie_word_embeddings, **kwargs, ) ######################## ### Backbone Modules ### ######################## def rotate_half(x): """Rotates half the hidden dims of the input.""" x1 = x[..., : x.shape[-1] // 2] x2 = x[..., x.shape[-1] // 2 :] return torch.cat((-x2, x1), dim=-1) def permute_qk(q, k): # NOTE: EasyLM and transformers use different method to compute rotary emebdding # we manually reorder the dim here to match our JAX implementation # which may not be optimal for speed # reference: https://github.com/young-geng/EasyLM/blob/981a2ed9630f44258a94b6f44dff2b7bd203ae8d/EasyLM/models/llama/convert_hf_to_easylm.py#L33 bsz, num_head, seq_len, head_dim = q.shape q = q.reshape(bsz, num_head, seq_len, head_dim // 2, 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) k = k.reshape(bsz, num_head, seq_len, head_dim // 2, 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) return q, k def undo_permute_qk(q, k): # NOTE: EasyLM and transformers use different method to compute rotary emebdding # we manually undo the reorder the dim here to match our JAX implementation # which may not be optimal for speed # reference: https://github.com/young-geng/EasyLM/blob/981a2ed9630f44258a94b6f44dff2b7bd203ae8d/EasyLM/models/llama/convert_hf_to_easylm.py#L33 bsz, num_head, seq_len, head_dim = q.shape q = q.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) k = k.reshape(bsz, num_head, seq_len, 2, head_dim // 2).transpose(3, 4).reshape(bsz, num_head, seq_len, head_dim) return q, k def apply_rotary_pos_emb(q, k, cos, sin, position_ids=None, unsqueeze_dim=1): """Applies Rotary Position Embedding to the query and key tensors. Args: q (`torch.Tensor`): The query tensor. k (`torch.Tensor`): The key tensor. cos (`torch.Tensor`): The cosine part of the rotary embedding. sin (`torch.Tensor`): The sine part of the rotary embedding. position_ids (`torch.Tensor`, *optional*): Deprecated and unused. unsqueeze_dim (`int`, *optional*, defaults to 1): The 'unsqueeze_dim' argument specifies the dimension along which to unsqueeze cos[position_ids] and sin[position_ids] so that they can be properly broadcasted to the dimensions of q and k. For example, note that cos[position_ids] and sin[position_ids] have the shape [batch_size, seq_len, head_dim]. Then, if q and k have the shape [batch_size, heads, seq_len, head_dim], then setting unsqueeze_dim=1 makes cos[position_ids] and sin[position_ids] broadcastable to the shapes of q and k. Similarly, if q and k have the shape [batch_size, seq_len, heads, head_dim], then set unsqueeze_dim=2. Returns: `tuple(torch.Tensor)` comprising of the query and key tensors rotated using the Rotary Position Embedding. """ cos = cos.unsqueeze(unsqueeze_dim) sin = sin.unsqueeze(unsqueeze_dim) q_embed = (q * cos) + (rotate_half(q) * sin) k_embed = (k * cos) + (rotate_half(k) * sin) return q_embed, k_embed class RMSNorm(nn.Module): def __init__(self, hidden_size, eps=1e-6): super().__init__() self.weight = nn.Parameter(torch.ones(hidden_size)) self.variance_epsilon = eps def forward(self, hidden_states): input_dtype = hidden_states.dtype hidden_states = hidden_states.to(torch.float32) variance = hidden_states.pow(2).mean(-1, keepdim=True) hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon) return self.weight * hidden_states.to(input_dtype) class SwiGluMLP(nn.Module): def __init__(self, config): super().__init__() self.config = config self.hidden_size = config.hidden_size self.intermediate_size = config.intermediate_size self.gate_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.up_proj = nn.Linear(self.hidden_size, self.intermediate_size, bias=False) self.down_proj = nn.Linear(self.intermediate_size, self.hidden_size, bias=False) self.act_fn = ACT2FN[config.hidden_act] def forward(self, x): if self.config.pretraining_tp > 1: slice = self.intermediate_size // self.config.pretraining_tp gate_proj_slices = self.gate_proj.weight.split(slice, dim=0) up_proj_slices = self.up_proj.weight.split(slice, dim=0) down_proj_slices = self.down_proj.weight.split(slice, dim=1) gate_proj = torch.cat( [F.linear(x, gate_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1, ) up_proj = torch.cat( [F.linear(x, up_proj_slices[i]) for i in range(self.config.pretraining_tp)], dim=-1, ) intermediate_states = (self.act_fn(gate_proj) * up_proj).split(slice, dim=2) down_proj = [ F.linear(intermediate_states[i], down_proj_slices[i]) for i in range(self.config.pretraining_tp) ] down_proj = sum(down_proj) else: down_proj = self.down_proj(self.act_fn(self.gate_proj(x)) * self.up_proj(x)) return down_proj class RotaryEmbedding(nn.Module): def __init__( self, dim, max_position_embeddings=16, base=10000, device=None, scaling_factor=1.0, ): super().__init__() self.scaling_factor = scaling_factor self.dim = dim self.max_position_embeddings = max_position_embeddings self.base = base inv_freq = 1.0 / (self.base ** (torch.arange(0, self.dim, 2, dtype=torch.int64).float().to(device) / self.dim)) self.register_buffer("inv_freq", inv_freq, persistent=False) @torch.no_grad() def forward(self, x, position_ids): # x: [bs, num_attention_heads, seq_len, head_size] inv_freq_expanded = self.inv_freq[None, :, None].float().expand(position_ids.shape[0], -1, 1) position_ids_expanded = position_ids[:, None, :].float() # Force float32 since bfloat16 loses precision on long contexts # See https://github.com/huggingface/transformers/pull/29285 device_type = x.device.type device_type = device_type if isinstance(device_type, str) and device_type != "mps" else "cpu" with torch.autocast(device_type=device_type, enabled=False): freqs = (inv_freq_expanded.float() @ position_ids_expanded.float()).transpose(1, 2) emb = torch.cat((freqs, freqs), dim=-1) cos = emb.cos() sin = emb.sin() return cos.to(dtype=x.dtype), sin.to(dtype=x.dtype) class Conv(nn.Module): def __init__(self, config, layer_idx): super().__init__() self.config = config self.layer_idx = layer_idx self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.conv = nn.Conv1d( config.hidden_size, config.hidden_size, bias=True, kernel_size=config.conv_kernel, groups=config.hidden_size, padding=config.conv_kernel - 1, ) def __call__(self, hidden_states, cache_params=None): seq_len = hidden_states.shape[1] hidden_states = self.norm(hidden_states) # [B, C, L] hidden_states = hidden_states.transpose(1, 2) if causal_conv1d_fn is None: if cache_params is not None: if cache_params.seqlen_offset > 0: conv_state = cache_params.conv_states_dic["pre_conv"][self.layer_idx] conv_state = torch.roll(conv_state, shifts=-1, dims=-1) conv_state[:, :, -1] = hidden_states[:, :, 0] cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state) hidden_states = torch.sum(conv_state * self.conv.weight[:, 0, :], dim=-1) hidden_states += self.conv.bias hidden_states = hidden_states.unsqueeze(-1) else: conv_state = nn.functional.pad( hidden_states, (self.config.conv_kernel - hidden_states.shape[-1], 0), ) cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_state) hidden_states = self.conv(hidden_states)[..., :seq_len] else: hidden_states = self.conv(hidden_states)[..., :seq_len] else: conv_weights = self.conv.weight.view(self.conv.weight.size(0), self.conv.weight.size(2)) if cache_params is not None and cache_params.seqlen_offset > 0: hidden_states = causal_conv1d_update( hidden_states.squeeze(-1), cache_params.conv_states_dic["pre_conv"][self.layer_idx], conv_weights, self.conv.bias, None, ) hidden_states = hidden_states.unsqueeze(-1) else: if cache_params is not None: conv_states = nn.functional.pad( hidden_states, (self.config.conv_kernel - hidden_states.shape[-1], 0), ) cache_params.conv_states_dic["pre_conv"][self.layer_idx].copy_(conv_states) hidden_states = causal_conv1d_fn(hidden_states, conv_weights, self.conv.bias, activation=None) # [B, L, C] hidden_states = hidden_states.transpose(1, 2) return hidden_states ######################### ### TTT Layer Modules ### ######################### def scan(f, init, xs, out, checkpoint_group=0): """Minic jax.lax.scan function.""" carry = init if isinstance(xs, dict): num_items = len(next(iter(xs.values()))) else: num_items = len(xs[0]) def scan_fn(carry, i_start, i_end): for i in range(i_start, i_end): if isinstance(xs, dict): x = {key: tensor[i] for key, tensor in xs.items()} else: x = [x[i] for x in xs] carry, y = f(carry, x) out[i] = y return carry if checkpoint_group > 0: ckpt_every_n = num_items // checkpoint_group for k in range(0, num_items, ckpt_every_n): carry = torch.utils.checkpoint.checkpoint( scan_fn, carry, k, min(k + ckpt_every_n, num_items), use_reentrant=False ) else: carry = scan_fn(carry, 0, num_items) return carry, out def ln_fwd(x, gamma, beta, eps=1e-6): "Batch forward for LayerNorm." # Mean and variance computation mu = x.mean(dim=-1, keepdim=True) var = x.var(dim=-1, keepdim=True, unbiased=False) # Normalization std = torch.sqrt(var + eps) x_hat = (x - mu) / std # Scale and shift y = gamma * x_hat + beta return y def ln_fused_l2_bwd(x, l2_target, gamma, beta, eps=1e-6): "Batch backward for LayerNorm fused with L2 loss." D = x.shape[-1] # Mean and variance computation mu = x.mean(dim=-1, keepdim=True) var = x.var(dim=-1, keepdim=True, unbiased=False) # Normalization std = torch.sqrt(var + eps) x_hat = (x - mu) / std # Scale and shift y = gamma * x_hat + beta grad_output = y - l2_target grad_x_hat = grad_output * gamma z = ( (1.0 / D) * ( D * grad_x_hat - grad_x_hat.sum(dim=-1, keepdim=True) - x_hat * (grad_x_hat * x_hat).sum(dim=-1, keepdim=True) ) / std ) return z # Modified from https://github.com/NVIDIA/Megatron-LM/blob/e33c8f78a35765d5aa37475a144da60e8a2349d1/megatron/core/fusions/fused_bias_gelu.py#L26 def gelu_bwd(x): tanh_out = torch.tanh(0.79788456 * x * (1 + 0.044715 * x * x)) ff = 0.5 * x * ((1 - tanh_out * tanh_out) * (0.79788456 + 0.1070322243 * x * x)) + 0.5 * (1 + tanh_out) return ff class TTTCache: """ TTTCache is a data structure that holds the last hidden states and gradients for the TTT layer. Arguments: model: TTTModel batch_size: int Attributes: seqlen_offset: int mini_batch_size: int params_dict: Dict[str, Dict[int, torch.Tensor]] *_states, *_grad -> # layer_idx -> [batch_size, ...] conv_states_dic: Dict[str, Dict[int, torch.Tensor]] *_states -> # layer_idx -> [batch_size, ...] """ def __init__(self, model, batch_size: int): config = model.config self.seqlen_offset = 0 self.mini_batch_size = config.mini_batch_size self.ttt_params_dict = defaultdict(dict) if "linear" in config.ttt_layer_type: self.ttt_param_names = ["W1", "b1"] elif "mlp" in config.ttt_layer_type: self.ttt_param_names = ["W1", "b1", "W2", "b2"] else: raise ValueError(f"TTT Layer Type {config.ttt_layer_type} not supported yet") self.conv_states_dic = defaultdict(dict) logger.info(f"Creating cache of size: {batch_size}") # Get TTT layers based on model variant if hasattr(model, 'memories') and len(model.memories) > 0: # MAC model: use memories ttt_layers = model.memories num_ttt_layers = len(ttt_layers) elif hasattr(model, 'layers') and len(model.layers) > 0: # Standard model: use layers directly ttt_layers = model.layers num_ttt_layers = config.num_hidden_layers else: # No TTT layers return for layer_idx in range(num_ttt_layers): # Get the actual TTT layer (unwrap if it's a TTTPilotMemoryLayer) ttt_layer = ttt_layers[layer_idx] if hasattr(ttt_layer, 'layer'): # TTTPilotMemoryLayer wrapper ttt_layer = ttt_layer.layer # Now access seq_modeling_block seq_block = ttt_layer.seq_modeling_block for name in self.ttt_param_names: weight = getattr(seq_block, name) tiled_weight = torch.tile(weight.unsqueeze(0), (batch_size,) + (1,) * weight.dim()).to(model.device) self.ttt_params_dict[f"{name}_states"][layer_idx] = tiled_weight # for decoding, we need to store the gradients as well self.ttt_params_dict[f"{name}_grad"][layer_idx] = torch.zeros_like(tiled_weight) if config.pre_conv: self.conv_states_dic["pre_conv"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) if config.share_qk: self.conv_states_dic["ttt_conv_q"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) self.conv_states_dic["ttt_conv_k"][layer_idx] = torch.zeros( batch_size, config.hidden_size, config.conv_kernel, device=model.device, ) def update(self, py_tree, layer_idx, seq_len): if seq_len % self.mini_batch_size == 0: # copy last mini-batch states, clear gradients for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"]) self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_() elif seq_len < self.mini_batch_size: if seq_len != 1 and self.seqlen_offset > 0 and self.seqlen_offset % self.mini_batch_size != 0: raise ValueError("fractional update not supported yet.") if (seq_len + self.seqlen_offset) % self.mini_batch_size == 0: # copy last mini-batch states, clear gradients for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_states"][layer_idx].copy_(py_tree[f"{name}_states"]) self.ttt_params_dict[f"{name}_grad"][layer_idx].zero_() else: # copy gradients for the next update for name in self.ttt_param_names: self.ttt_params_dict[f"{name}_grad"][layer_idx].copy_(py_tree[f"{name}_grad"]) else: raise ValueError(f"seq_len {seq_len} is a partial update not supported yet") def ttt_params_to_dict(self, layer_idx): return {name: self.ttt_params_dict[name][layer_idx] for name in self.ttt_params_dict} class TTTBase(nn.Module): def __init__(self, config: TTTConfig, layer_idx: Optional[int] = None): super().__init__() self.config = config self.layer_idx = layer_idx if layer_idx is None: logger.warning_once( f"Instantiating {self.__class__.__name__} without passing a `layer_idx` is not recommended and will " "lead to errors during the forward call if caching is used. Please make sure to provide a `layer_idx` " "when creating this class." ) self.width = config.hidden_size self.hidden_size = config.hidden_size self.num_heads = config.num_attention_heads self.head_dim = self.width // self.num_heads self.mini_batch_size = config.mini_batch_size # token_idx is a scale factor that scale the summation in Eqn. 4 token_idx = 1.0 / torch.arange(1, self.mini_batch_size + 1) self.register_buffer("token_idx", token_idx, persistent=False) # make the scale factor learnable self.learnable_token_idx = nn.Parameter(torch.zeros((self.mini_batch_size,))) self.share_qk = config.share_qk self.conv_kernel = config.conv_kernel self._init_qkvo_proj() self._init_rope() # Learnable eta in Sec. 2.7 self._init_ttt_lr_gate() self._init_ttt_ln() # use gating as in Mamba backbone self.use_gate = config.use_gate if self.use_gate: self.g_proj = nn.Linear(self.width, self.width, bias=False) self.post_norm = nn.LayerNorm(self.width, eps=1e-6) def _init_qkvo_proj(self): self.q_proj = nn.Linear(self.width, self.num_heads * self.head_dim, bias=False) # we share Q/K projection when using Mamba backbone if not self.share_qk: self.k_proj = nn.Linear(self.width, self.num_heads * self.head_dim, bias=False) self.v_proj = nn.Linear(self.width, self.num_heads * self.head_dim, bias=False) self.o_proj = nn.Linear(self.width, self.num_heads * self.head_dim, bias=False) # after share Q/K projection, we use different conv layers for Q and K if self.share_qk: self.conv_q = nn.Conv1d( self.hidden_size, self.hidden_size, bias=True, kernel_size=self.conv_kernel, groups=self.hidden_size, padding=self.conv_kernel - 1, ) self.conv_k = nn.Conv1d( self.hidden_size, self.hidden_size, bias=True, kernel_size=self.conv_kernel, groups=self.hidden_size, padding=self.conv_kernel - 1, ) def _init_rope(self): self.rope_theta = self.config.rope_theta self.rotary_emb = RotaryEmbedding( self.head_dim, max_position_embeddings=self.mini_batch_size, base=self.rope_theta, ) def _init_ttt_lr_gate(self): # [width, 1] linear_weight_data = nn.Linear(self.width, 1, bias=True).weight.data # prepending head dim -> [num_heads, width, 1] self.learnable_ttt_lr_weight = nn.Parameter( torch.stack( [torch.normal(0, 0.02, size=linear_weight_data.shape) for _ in range(self.num_heads)], dim=0, ) ) linear_bias_data = nn.Linear(self.width, 1, bias=True).bias.data # init bias to 0 following original JAX impl. # [num_heads, 1] self.learnable_ttt_lr_bias = nn.Parameter( torch.stack( [torch.zeros_like(linear_bias_data) for _ in range(self.num_heads)], dim=0, ) ) def _init_ttt_ln(self): ln_weight_data = nn.LayerNorm(self.head_dim).weight.data # prepending head dim -> [num_heads, width] self.ttt_norm_weight = nn.Parameter(torch.tile(ln_weight_data.unsqueeze(0), (self.num_heads, 1))) ln_bias_data = nn.LayerNorm(self.head_dim).bias.data self.ttt_norm_bias = nn.Parameter(torch.tile(ln_bias_data.unsqueeze(0), (self.num_heads, 1))) def get_qkv_projections(self, hidden_states, cache_params: Optional[TTTCache] = None): if self.share_qk: xq, XV = self.q_proj(hidden_states), self.v_proj(hidden_states) seq_len = xq.shape[1] xq = xq.transpose(1, 2) if causal_conv1d_fn is None: if cache_params is not None: if cache_params.seqlen_offset > 0: conv_q_state = cache_params.conv_states_dic["ttt_conv_q"][self.layer_idx] conv_q_state = torch.roll(conv_q_state, shifts=-1, dims=-1) conv_q_state[:, :, -1] = xq[:, :, 0] cache_params.conv_states_dic["ttt_conv_q"][self.layer_idx].copy_(conv_q_state) XQ = torch.sum(conv_q_state * self.conv_q.weight[:, 0, :], dim=-1) XQ += self.conv_q.bias XQ = XQ.unsqueeze(-1) conv_k_state = cache_params.conv_states_dic["ttt_conv_k"][self.layer_idx] conv_k_state = torch.roll(conv_k_state, shifts=-1, dims=-1) conv_k_state[:, :, -1] = xq[:, :, 0] cache_params.conv_states_dic["ttt_conv_k"][self.layer_idx].copy_(conv_k_state) XK = torch.sum(conv_k_state * self.conv_k.weight[:, 0, :], dim=-1) XK += self.conv_k.bias XK = XK.unsqueeze(-1) else: conv_q_state = nn.functional.pad(xq, (self.config.conv_kernel - xq.shape[-1], 0)) cache_params.conv_states_dic["ttt_conv_q"][self.layer_idx].copy_(conv_q_state) XQ = self.conv_q(xq)[..., :seq_len] conv_k_state = nn.functional.pad(xq, (self.config.conv_kernel - xq.shape[-1], 0)) cache_params.conv_states_dic["ttt_conv_k"][self.layer_idx].copy_(conv_k_state) XK = self.conv_k(xq)[..., :seq_len] else: XQ = self.conv_q(xq)[..., :seq_len] XK = self.conv_k(xq)[..., :seq_len] else: conv_q_weights = self.conv_q.weight.view(self.conv_q.weight.size(0), self.conv_q.weight.size(2)) conv_k_weights = self.conv_k.weight.view(self.conv_k.weight.size(0), self.conv_k.weight.size(2)) if cache_params is not None and cache_params.seqlen_offset > 0: XQ = causal_conv1d_update( xq.squeeze(-1), cache_params.conv_states_dic["ttt_conv_q"][self.layer_idx], conv_q_weights, self.conv_q.bias, None, ) XQ = XQ.unsqueeze(-1) XK = causal_conv1d_update( xq.squeeze(-1), cache_params.conv_states_dic["ttt_conv_k"][self.layer_idx], conv_k_weights, self.conv_k.bias, None, ) XK = XK.unsqueeze(-1) else: if cache_params is not None: conv_q_states = nn.functional.pad(xq, (self.config.conv_kernel - xq.shape[-1], 0)) cache_params.conv_states_dic["ttt_conv_q"][self.layer_idx].copy_(conv_q_states) conv_k_states = nn.functional.pad(xq, (self.config.conv_kernel - xq.shape[-1], 0)) cache_params.conv_states_dic["ttt_conv_k"][self.layer_idx].copy_(conv_k_states) XQ = causal_conv1d_fn(xq, conv_q_weights, self.conv_q.bias, activation=None) XK = causal_conv1d_fn(xq, conv_k_weights, self.conv_k.bias, activation=None) XQ = XQ.transpose(1, 2) XK = XK.transpose(1, 2) else: XQ, XK, XV = ( self.q_proj(hidden_states), self.k_proj(hidden_states), self.v_proj(hidden_states), ) return XQ, XK, XV def _split_heads(self, hidden_states): return hidden_states.reshape(hidden_states.shape[:2] + (self.num_heads, self.head_dim)) def get_eta(self, X, mini_batch_step_offset, mini_batch_size): # [B, num_heads, num_mini_batch, mini_batch_size, 1] ttt_lr = torch.einsum("bnkc,hdc->bhnkd", X, self.learnable_ttt_lr_weight) + self.learnable_ttt_lr_bias.reshape( 1, -1, 1, 1, 1 ) ttt_lr = F.sigmoid(ttt_lr) # [B, num_heads, num_mini_batch, 1, mini_batch_size] ttt_lr = ttt_lr.permute(0, 1, 2, 4, 3) ttt_lr_eta = self.config.ttt_base_lr * ttt_lr / self.head_dim # [B, L] token_idx = self.token_idx + self.learnable_token_idx token_idx = token_idx[mini_batch_step_offset : mini_batch_step_offset + mini_batch_size] # token idx should be greast than 0 token_idx = torch.clamp_min(token_idx, 0.0) # NOTE: token_eta is a scale factor that applies to each token in the mini-batch # [B, num_heads, num_mini_batch, mini_batch_size, 1] token_eta = torch.broadcast_to( token_idx.reshape(1, 1, 1, mini_batch_size, 1), (X.shape[0], self.num_heads, X.shape[1], mini_batch_size, 1), ) return token_eta, ttt_lr_eta def apply_gate(self, hidden_states, ttt_output): y = self.g_proj(hidden_states) # use 'tanh' approximation for matching JAX impl. y = F.gelu(y, approximate="tanh") output = y * ttt_output return output def get_ttt_inputs(self, inputs, mini_batch_size, cache_params): XQ = inputs["XQ"] XK = inputs["XK"] XV = inputs["XV"] X = inputs["X"] B, L, C = X.shape num_mini_batch = L // mini_batch_size # [B ,num_mini_batch, mini_batch_size, C] X = X.reshape(B, num_mini_batch, mini_batch_size, self.width) XQ = XQ.reshape(B, self.num_heads, L // mini_batch_size, mini_batch_size, self.head_dim) XK = XK.reshape(B, self.num_heads, L // mini_batch_size, mini_batch_size, self.head_dim) XV = XV.reshape(B, self.num_heads, L // mini_batch_size, mini_batch_size, self.head_dim) if cache_params is not None: mini_batch_step_offset = cache_params.seqlen_offset % self.mini_batch_size else: mini_batch_step_offset = 0 token_eta, ttt_lr_eta = self.get_eta(X, mini_batch_step_offset, mini_batch_size) eta = token_eta * ttt_lr_eta # decouple token_coeff and ilr_coeff for decoding inputs = { "XQ": XQ, "XK": XK, "XV": XV, "eta": eta, "token_eta": token_eta, "ttt_lr_eta": ttt_lr_eta, } return inputs def ttt( self, inputs, mini_batch_size, last_mini_batch_params_dict, cache_params: Optional[TTTCache] = None, ): raise NotImplementedError("ttt method must be implemented in TTTBase subclasses.") def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, cache_params: Optional[TTTCache] = None, ): B, L = hidden_states.shape[:2] reminder_len = L % self.mini_batch_size num_mini_batch = L // self.mini_batch_size last_mini_batch_params_dict = None XQ, XK, XV = self.get_qkv_projections(hidden_states, cache_params=cache_params) # [B, L, C] -> [B, L, num_heads, head_dim] -> [B, num_heads, L, head_dim] XQ = XQ.reshape(B, L, self.num_heads, self.head_dim).transpose(1, 2) XK = XK.reshape(B, L, self.num_heads, self.head_dim).transpose(1, 2) XV = XV.reshape(B, L, self.num_heads, self.head_dim).transpose(1, 2) cos, sin = self.rotary_emb(XV, position_ids % self.mini_batch_size) # permute_qk and undo_permute_qk is just for aligning pytorch with jax pre-training XQ, XK = permute_qk(XQ, XK) XQ, XK = apply_rotary_pos_emb(XQ, XK, cos, sin) XQ, XK = undo_permute_qk(XQ, XK) output_hidden_states = [] # when input sequence length is not a multiple of mini_batch_size # we need to compute them seperately, when computing the reminder, # we will need the last_mini_batch_params_dict to continue TTT learning if num_mini_batch > 0: inputs = { "XQ": XQ[:, :, : num_mini_batch * self.mini_batch_size], "XK": XK[:, :, : num_mini_batch * self.mini_batch_size], "XV": XV[:, :, : num_mini_batch * self.mini_batch_size], "X": hidden_states[:, : num_mini_batch * self.mini_batch_size], } output_mod, last_mini_batch_params_dict = self.ttt( self.get_ttt_inputs(inputs, self.mini_batch_size, cache_params), mini_batch_size=self.mini_batch_size, last_mini_batch_params_dict=last_mini_batch_params_dict, cache_params=cache_params, ) output_hidden_states.append(output_mod) if reminder_len > 0: inputs = { "XQ": XQ[:, :, -reminder_len:], "XK": XK[:, :, -reminder_len:], "XV": XV[:, :, -reminder_len:], "X": hidden_states[:, -reminder_len:], } output_reminder, _ = self.ttt( self.get_ttt_inputs(inputs, reminder_len, cache_params), mini_batch_size=reminder_len, last_mini_batch_params_dict=last_mini_batch_params_dict, cache_params=cache_params, ) output_hidden_states.append(output_reminder) output_hidden_states = torch.cat(output_hidden_states, dim=1) output_hidden_states = self.post_norm(output_hidden_states) if self.use_gate: output_hidden_states = self.apply_gate(hidden_states, output_hidden_states) output_hidden_states = self.o_proj(output_hidden_states) return output_hidden_states class TTTLinear(TTTBase): def __init__(self, config: TTTConfig, layer_idx: Optional[int] = None): super().__init__(config, layer_idx) # TTT model initialization for TTT-Linear self.W1 = nn.Parameter(torch.normal(0, 0.02, size=(self.num_heads, self.head_dim, self.head_dim))) self.b1 = nn.Parameter(torch.zeros(self.num_heads, 1, self.head_dim)) def ttt( self, inputs, mini_batch_size, last_mini_batch_params_dict, cache_params: Optional[TTTCache] = None, ): if mini_batch_size is None: mini_batch_size = self.mini_batch_size # in this case, we are decoding if last_mini_batch_params_dict is None and cache_params is not None: last_mini_batch_params_dict = cache_params.ttt_params_to_dict(self.layer_idx) # [B, num_heads, num_mini_batch, mini_batch_size, head_dim] B = inputs["XV"].shape[0] num_mini_batch = inputs["XV"].shape[2] L = inputs["XV"].shape[2] * inputs["XV"].shape[3] device = inputs["XV"].device dtype = inputs["XV"].dtype # NOTE: # for prefilling, we will always use dual form for faster computation # we need to use primal form if mini_batch_size is not a multiple of self.mini_batch_size # since we need store the gradient for the next mini-batch computation use_dual_form = cache_params is None or mini_batch_size % self.mini_batch_size == 0 def compute_mini_batch(params_dict, inputs): # [B, nh, f, f], nh=num_heads, f=head_dim W1_init = params_dict["W1_states"] # [B, nh, 1, f] b1_init = params_dict["b1_states"] # [B,nh,K,f], K=mini_batch_size XQ_mini_batch = inputs["XQ"] XV_mini_batch = inputs["XV"] XK_mini_batch = inputs["XK"] # [B, nh, K, 1] eta_mini_batch = inputs["eta"] token_eta_mini_batch = inputs["token_eta"] ttt_lr_eta_mini_batch = inputs["ttt_lr_eta"] X1 = XK_mini_batch # [B,nh,K,f] @ [B,nh,f,f] -> [B,nh,K,f] Z1 = X1 @ W1_init + b1_init reconstruction_target = XV_mini_batch - XK_mini_batch ln_weight = self.ttt_norm_weight.reshape(self.num_heads, 1, self.head_dim) ln_bias = self.ttt_norm_bias.reshape(self.num_heads, 1, self.head_dim) # [B,nh,K,f] grad_l_wrt_Z1 = ln_fused_l2_bwd(Z1, reconstruction_target, ln_weight, ln_bias) if use_dual_form: # [B,nh,K,K] Attn1 = torch.tril(XQ_mini_batch @ X1.transpose(-2, -1)) # [B,nh,1,f] - [B,nh,K,K] @ [B,nh,K,f] -> [B,nh,K,f] b1_bar = b1_init - torch.tril(eta_mini_batch) @ grad_l_wrt_Z1 # [B,nh,K,f] @ [B,nh,f,f] - ([B,nh,K,1] * [B,nh,K,K]) @ [B,nh,K,f] + [B,nh,K,f] Z1_bar = XQ_mini_batch @ W1_init - (eta_mini_batch * Attn1) @ grad_l_wrt_Z1 + b1_bar last_eta_mini_batch = eta_mini_batch[:, :, -1, :, None] # [B,nh,f,f] - [B,nh,f,K] @ [B,nh,K,f] W1_last = W1_init - (last_eta_mini_batch * X1).transpose(-1, -2) @ grad_l_wrt_Z1 # [B,nh,1,f] b1_last = b1_init - torch.sum(last_eta_mini_batch * grad_l_wrt_Z1, dim=-2, keepdim=True) grad_W1_last = torch.zeros_like(W1_last) grad_b1_last = torch.zeros_like(b1_last) else: ttt_lr_eta_mini_batch = torch.broadcast_to( ttt_lr_eta_mini_batch, ( *ttt_lr_eta_mini_batch.shape[:2], mini_batch_size, mini_batch_size, ), ) # [B, nh, K, f, f] grad_W1 = torch.einsum("bhki,bhkj->bhkij", X1, grad_l_wrt_Z1) grad_W1 = torch.einsum("bhnk,bhkij->bhnij", torch.tril(ttt_lr_eta_mini_batch), grad_W1) grad_W1 = grad_W1 + params_dict["W1_grad"].unsqueeze(2) # [B, nh, K, f] grad_b1 = torch.einsum("bhnk,bhki->bhni", torch.tril(ttt_lr_eta_mini_batch), grad_l_wrt_Z1) grad_b1 = grad_b1 + params_dict["b1_grad"] W1_bar = W1_init.unsqueeze(2) - grad_W1 * token_eta_mini_batch.unsqueeze(-1) b1_bar = b1_init - grad_b1 * token_eta_mini_batch # [B, nh, K, 1, f] @ [B, nh, K, f, f] Z1_bar = (XQ_mini_batch.unsqueeze(3) @ W1_bar).squeeze(3) + b1_bar W1_last = W1_bar[:, :, -1] b1_last = b1_bar[:, :, -1:] grad_W1_last = grad_W1[:, :, -1] grad_b1_last = grad_b1[:, :, -1:] Z1_bar = ln_fwd(Z1_bar, ln_weight, ln_bias) XQW_mini_batch = XQ_mini_batch + Z1_bar last_param_dict = { "W1_states": W1_last, "b1_states": b1_last, "W1_grad": grad_W1_last, "b1_grad": grad_b1_last, } return last_param_dict, XQW_mini_batch if last_mini_batch_params_dict is not None: init_params_dict = last_mini_batch_params_dict else: init_params_dict = { "W1_states": torch.tile(self.W1.unsqueeze(0), dims=(B, 1, 1, 1)), "b1_states": torch.tile(self.b1.unsqueeze(0), dims=(B, 1, 1, 1)), } init_params_dict.update(W1_grad=torch.zeros_like(init_params_dict["W1_states"])) init_params_dict.update(b1_grad=torch.zeros_like(init_params_dict["b1_states"])) # [B,num_heads, num_mini_batch, mini_batch_size, f] -> [num_mini_batch, B, num_heads, mini_batch_size, f] inputs = tree_map(lambda x: x.permute(2, 0, 1, 3, 4), inputs) # allocate output tensor XQW_batch = torch.empty( (num_mini_batch, B, self.num_heads, mini_batch_size, self.head_dim), device=device, dtype=dtype, ) # XQW_batch: [num_mini_batch, B, num_heads, mini_batch_size, head_dim] batch_params_dict, XQW_batch = scan( compute_mini_batch, init_params_dict, inputs, XQW_batch, self.config.scan_checkpoint_group_size if self.training else 0, ) # [B, num_heads, L, C] if cache_params is not None: cache_params.update(batch_params_dict, self.layer_idx, L) # [num_mini_batch, B, num_heads, mini_batch_size, head_dim] -> [B, num_mini_batch, mini_batch_size, num_heads, head_dim] XQW_batch = XQW_batch.permute(1, 0, 3, 2, 4) # [B, L, C] XQW_batch = XQW_batch.reshape(B, L, self.width) return XQW_batch, batch_params_dict class TTTMLP(TTTBase): def __init__(self, config: TTTConfig, layer_idx: Optional[int] = None): super().__init__(config, layer_idx) # TTT model initialization for TTT-MLP self.W1 = nn.Parameter(torch.normal(0, 0.02, size=(self.num_heads, self.head_dim, 4 * self.head_dim))) self.b1 = nn.Parameter(torch.zeros(self.num_heads, 1, 4 * self.head_dim)) self.W2 = nn.Parameter(torch.normal(0, 0.02, size=(self.num_heads, 4 * self.head_dim, self.head_dim))) self.b2 = nn.Parameter(torch.zeros(self.num_heads, 1, self.head_dim)) def ttt( self, inputs, mini_batch_size, last_mini_batch_params_dict, cache_params: Optional[TTTCache] = None, ): if mini_batch_size is None: mini_batch_size = self.mini_batch_size # in this case, we are decoding if last_mini_batch_params_dict is None and cache_params is not None: last_mini_batch_params_dict = cache_params.ttt_params_to_dict(self.layer_idx) # [B, num_heads, num_mini_batch, mini_batch_size, head_dim] B = inputs["XV"].shape[0] num_mini_batch = inputs["XV"].shape[2] L = inputs["XV"].shape[2] * inputs["XV"].shape[3] device = inputs["XV"].device dtype = inputs["XV"].dtype # NOTE: # for prefilling, we will always use dual form for faster computation # we need to use primal form if mini_batch_size is not a multiple of self.mini_batch_size # since we need store the gradient for the next mini-batch computation use_dual_form = cache_params is None or mini_batch_size % self.mini_batch_size == 0 def compute_mini_batch(params_dict, inputs): # [B, nh, f, 4f] W1_init = params_dict["W1_states"] # [B, nh, 1, 4f] b1_init = params_dict["b1_states"] # [B, nh, 4f, f] W2_init = params_dict["W2_states"] # [B, nh, 1, f] b2_init = params_dict["b2_states"] # [B,nh,K,f] XQ_mini_batch = inputs["XQ"] XV_mini_batch = inputs["XV"] XK_mini_batch = inputs["XK"] # [B,nh,K,1] eta_mini_batch = inputs["eta"] token_eta_mini_batch = inputs["token_eta"] ttt_lr_eta_mini_batch = inputs["ttt_lr_eta"] X1 = XK_mini_batch # [B,nh,K,f] @ [B,nh,f,4f] -> [B,nh,K,4f] Z1 = X1 @ W1_init + b1_init X2 = F.gelu(Z1, approximate="tanh") # [B,nh,K,4f] @ [B,nh,4f,f] -> [B,nh,K,f] Z2 = X2 @ W2_init + b2_init reconstruction_target = XV_mini_batch - XK_mini_batch ln_weight = self.ttt_norm_weight.reshape(self.num_heads, 1, self.head_dim) ln_bias = self.ttt_norm_bias.reshape(self.num_heads, 1, self.head_dim) # [B, nh, K, f] grad_l_wrt_Z2 = ln_fused_l2_bwd(Z2, reconstruction_target, ln_weight, ln_bias) # [B, nh, K, 4f] grad_l_wrt_Z1 = grad_l_wrt_Z2 @ W2_init.transpose(-2, -1) * gelu_bwd(Z1) if use_dual_form: Attn1 = torch.tril(XQ_mini_batch @ X1.transpose(-2, -1)) # [B,nh,K,K] # [B,nh,1,f] - [B,nh,K,K] @ [B,nh,K,4f] -> [B,nh,K,4f] b1_bar = b1_init - torch.tril(eta_mini_batch) @ grad_l_wrt_Z1 # [B,nh,K,f] @ [B,nh,f,4f] - ([B,nh,K,1] * [B,nh,K,K]) @ [B,nh,K,4f] + [B,nh,K,4f] Z1_bar = XQ_mini_batch @ W1_init - (eta_mini_batch * Attn1) @ grad_l_wrt_Z1 + b1_bar X2_bar = F.gelu(Z1_bar, approximate="tanh") # [B,nh,K,K] Attn2 = torch.tril(X2_bar @ X2.transpose(-2, -1)) # [B,nh,1,f] - [B,nh,K,1] * [B,nh,K,f] -> [B,nh,K,f] b2_bar = b2_init - torch.tril(eta_mini_batch) @ grad_l_wrt_Z2 # [B,nh,K,f] @ [1,nh,4f,f] - ([B,nh,K,1] * [B,nh,K,K]) @ [B,nh,K,f] + [B,nh,K,f] Z2_bar = X2_bar @ W2_init - (eta_mini_batch * Attn2) @ grad_l_wrt_Z2 + b2_bar last_eta_mini_batch = eta_mini_batch[:, :, -1, :, None] # [B,nh,f,4f] - [B,nh,f,K] @ [B,nh,K,4f] W1_last = W1_init - (last_eta_mini_batch * X1).transpose(-1, -2) @ grad_l_wrt_Z1 # [B,nh,1,4f] b1_last = b1_init - torch.sum(last_eta_mini_batch * grad_l_wrt_Z1, dim=-2, keepdim=True) # [B,nh,4f,f] - [B,nh,4f,K] @ [B,nh,K,f] W2_last = W2_init - (last_eta_mini_batch * X2).transpose(-1, -2) @ grad_l_wrt_Z2 # [B,nh,1,f] b2_last = b2_init - torch.sum(last_eta_mini_batch * grad_l_wrt_Z2, dim=-2, keepdim=True) grad_W1_last = torch.zeros_like(W1_last) grad_b1_last = torch.zeros_like(b1_last) grad_W2_last = torch.zeros_like(W2_last) grad_b2_last = torch.zeros_like(b2_last) else: ttt_lr_eta_mini_batch = torch.broadcast_to( ttt_lr_eta_mini_batch, ( *ttt_lr_eta_mini_batch.shape[:2], mini_batch_size, mini_batch_size, ), ) # [B, nh, K, 4f, f] grad_W2 = torch.einsum("bhki,bhkj->bhkij", X2, grad_l_wrt_Z2) grad_W2 = torch.einsum("bhnk,bhkij->bhnij", torch.tril(ttt_lr_eta_mini_batch), grad_W2) grad_W2 = grad_W2 + params_dict["W2_grad"].unsqueeze(2) # [B, nh, K, f] grad_b2 = torch.einsum("bhnk,bhki->bhni", torch.tril(ttt_lr_eta_mini_batch), grad_l_wrt_Z2) grad_b2 = grad_b2 + params_dict["b2_grad"] # [B, nh, K, f, 4f] grad_W1 = torch.einsum("bhki,bhkj->bhkij", X1, grad_l_wrt_Z1) grad_W1 = torch.einsum("bhnk,bhkij->bhnij", torch.tril(ttt_lr_eta_mini_batch), grad_W1) grad_W1 = grad_W1 + params_dict["W1_grad"].unsqueeze(2) # [B, nh, K, 4f] grad_b1 = torch.einsum("bhnk,bhki->bhni", torch.tril(ttt_lr_eta_mini_batch), grad_l_wrt_Z1) grad_b1 = grad_b1 + params_dict["b1_grad"] W1_bar = W1_init.unsqueeze(2) - grad_W1 * token_eta_mini_batch.unsqueeze(-1) b1_bar = b1_init - grad_b1 * token_eta_mini_batch W2_bar = W2_init.unsqueeze(2) - grad_W2 * token_eta_mini_batch.unsqueeze(-1) b2_bar = b2_init - grad_b2 * token_eta_mini_batch # [B, nh, K, 1, f] @ [B, nh, K, f, 4f] -> [B, nh, K, 4f] Z1_bar = (XQ_mini_batch.unsqueeze(3) @ W1_bar).squeeze(3) + b1_bar X2_bar = F.gelu(Z1_bar, approximate="tanh") Z2_bar = (X2_bar.unsqueeze(3) @ W2_bar).squeeze(3) + b2_bar W1_last = W1_bar[:, :, -1] b1_last = b1_bar[:, :, -1:] W2_last = W2_bar[:, :, -1] b2_last = b2_bar[:, :, -1:] grad_W1_last = grad_W1[:, :, -1] grad_b1_last = grad_b1[:, :, -1:] grad_W2_last = grad_W2[:, :, -1] grad_b2_last = grad_b2[:, :, -1:] Z2_bar = ln_fwd(Z2_bar, ln_weight, ln_bias) XQW_mini_batch = XQ_mini_batch + Z2_bar last_param_dict = { "W1_states": W1_last, "b1_states": b1_last, "W2_states": W2_last, "b2_states": b2_last, "W1_grad": grad_W1_last, "b1_grad": grad_b1_last, "W2_grad": grad_W2_last, "b2_grad": grad_b2_last, } return last_param_dict, XQW_mini_batch if last_mini_batch_params_dict is not None: init_params_dict = last_mini_batch_params_dict else: init_params_dict = { "W1_states": torch.tile(self.W1.unsqueeze(0), dims=(B, 1, 1, 1)), "b1_states": torch.tile(self.b1.unsqueeze(0), dims=(B, 1, 1, 1)), "W2_states": torch.tile(self.W2.unsqueeze(0), dims=(B, 1, 1, 1)), "b2_states": torch.tile(self.b2.unsqueeze(0), dims=(B, 1, 1, 1)), } init_params_dict.update(W1_grad=torch.zeros_like(init_params_dict["W1_states"])) init_params_dict.update(b1_grad=torch.zeros_like(init_params_dict["b1_states"])) init_params_dict.update(W2_grad=torch.zeros_like(init_params_dict["W2_states"])) init_params_dict.update(b2_grad=torch.zeros_like(init_params_dict["b2_states"])) inputs = tree_map(lambda x: x.permute(2, 0, 1, 3, 4), inputs) # [B,nh,NC,CS,f] -> [NC,B,nh,CS,f] # allocate output tensor XQW_batch = torch.empty( (num_mini_batch, B, self.num_heads, mini_batch_size, self.head_dim), device=device, dtype=dtype, ) # XQW_batch: [num_mini_batch, B, num_heads, mini_batch_size, head_dim] batch_params_dict, XQW_batch = scan( compute_mini_batch, init_params_dict, inputs, XQW_batch, self.config.scan_checkpoint_group_size if self.training else 0, ) # [B, num_heads, L, C] if cache_params is not None: cache_params.update(batch_params_dict, self.layer_idx, L) # [num_mini_batch, B, num_heads, mini_batch_size, head_dim] -> [B, num_mini_batch, mini_batch_size, num_heads, head_dim] XQW_batch = XQW_batch.permute(1, 0, 3, 2, 4) # [B, L, C] XQW_batch = XQW_batch.reshape(B, L, self.width) return XQW_batch, batch_params_dict ################################ ### E2E Architecture Modules ### ################################ class Block(nn.Module): def __init__(self, config: TTTConfig, layer_idx: int): super().__init__() self.hidden_size = config.hidden_size self.pre_conv = config.pre_conv if config.ttt_layer_type == "linear": ttt_layer = TTTLinear elif config.ttt_layer_type == "mlp": ttt_layer = TTTMLP else: raise ValueError(f"Invalid ttt_layer_type: {config.ttt_layer_type}") self.seq_modeling_block = ttt_layer(config=config, layer_idx=layer_idx) self.mlp = SwiGluMLP(config) if self.pre_conv: self.conv = Conv(config, layer_idx) self.seq_norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.ffn_norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.layer_idx = layer_idx def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, cache_params: Optional[TTTCache] = None, ): if self.pre_conv: residual = hidden_states hidden_states = self.conv(hidden_states, cache_params=cache_params) hidden_states = residual + hidden_states residual = hidden_states hidden_states = self.seq_norm(hidden_states) # TTT Layer hidden_states = self.seq_modeling_block( hidden_states=hidden_states, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, ) hidden_states = residual + hidden_states # Feed-Forward-Network residual = hidden_states hidden_states = self.ffn_norm(hidden_states) hidden_states = self.mlp(hidden_states) hidden_states = residual + hidden_states return hidden_states class TTTPreTrainedModel(PreTrainedModel): config_class = TTTConfig base_model_prefix = "model" supports_gradient_checkpointing = True _no_split_modules = ["Block"] def _init_weights(self, module): std = self.config.initializer_range if isinstance(module, nn.Linear): module.weight.data.normal_(mean=0.0, std=std) if module.bias is not None: module.bias.data.zero_() elif isinstance(module, nn.Embedding): module.weight.data.normal_(mean=0.0, std=std) if module.padding_idx is not None: module.weight.data[module.padding_idx].zero_() @dataclass class TTTOutput(ModelOutput): """ Class for the TTT model outputs. Args: last_hidden_state (`torch.FloatTensor` of shape `(batch_size, sequence_length, hidden_size)`): Sequence of hidden-states at the output of the last layer of the model. cache_params (`TTTCache`): The state of the model at the last time step. Can be used in a forward method with the next `input_ids` to avoid providing the old `input_ids`. """ last_hidden_state: Optional[torch.FloatTensor] = None cache_params: Optional[TTTCache] = None hidden_states: Optional[Tuple[torch.FloatTensor]] = None @dataclass class TTTCausalLMOutput(ModelOutput): """ Base class for causal language model (or autoregressive) outputs. Args: loss (`torch.FloatTensor` of shape `(1,)`, *optional*, returned when `labels` is provided): Language modeling loss (for next-token prediction). logits (`torch.FloatTensor` of shape `(batch_size, sequence_length, config.vocab_size)`): Prediction scores of the language modeling head (scores for each vocabulary token before SoftMax). cache_params (`TTTCache`): The state of the model at the last time step. Can be used in a forward method with the next `input_ids` to avoid providing the old `input_ids`. """ loss: Optional[torch.FloatTensor] = None logits: Optional[torch.FloatTensor] = None cache_params: Optional[TTTCache] = None hidden_states: Optional[Tuple[torch.FloatTensor]] = None class TTTModel(TTTPreTrainedModel): """ Decoder consisting of *config.num_hidden_layers* layers. Each layer is a [`Block`] Args: config: TTTConfig """ def __init__(self, config: TTTConfig): super().__init__(config) self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx) self.layers = nn.ModuleList([Block(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]) self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.gradient_checkpointing = False # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self): return self.embed_tokens def set_input_embeddings(self, value): self.embed_tokens = value def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, cache_params: Optional[TTTCache] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, use_cache: Optional[bool] = None, ) -> Union[Tuple, BaseModelOutputWithPast]: output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) use_cache = use_cache if use_cache is not None else self.config.use_cache return_dict = return_dict if return_dict is not None else self.config.use_return_dict if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError( "You cannot specify both input_ids and inputs_embeds at the same time, and must specify either one" ) if self.gradient_checkpointing and self.training and use_cache: logger.warning_once( "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`." ) use_cache = False if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) if cache_params is None and use_cache: cache_params = TTTCache(self, inputs_embeds.size(0)) seqlen_offset = 0 if cache_params is not None: seqlen_offset = cache_params.seqlen_offset position_ids = torch.arange( seqlen_offset, seqlen_offset + inputs_embeds.shape[1], dtype=torch.long, device=inputs_embeds.device, ).unsqueeze(0) hidden_states = inputs_embeds if attention_mask is None: attention_mask = torch.ones_like(input_ids) # decoder layers all_hidden_states = () if output_hidden_states else None for decoder_layer in self.layers: if self.gradient_checkpointing and self.training: hidden_states = self._gradient_checkpointing_func( decoder_layer.__call__, hidden_states, attention_mask, position_ids, cache_params, ) else: hidden_states = decoder_layer( hidden_states, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, ) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if use_cache: cache_params.seqlen_offset += inputs_embeds.shape[1] hidden_states = self.norm(hidden_states) # add hidden states from the last decoder layer if output_hidden_states: all_hidden_states += (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, cache_params, all_hidden_states] if v is not None) return TTTOutput( last_hidden_state=hidden_states, cache_params=cache_params if use_cache else None, hidden_states=all_hidden_states, ) class TTTForCausalLM(TTTPreTrainedModel, GenerationMixin): _tied_weights_keys = ["lm_head.weight"] def __init__(self, config): super().__init__(config) self.model = TTTModel(config) self.vocab_size = config.vocab_size self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) # Initialize weights and apply final processing self.post_init() def get_input_embeddings(self): return self.model.embed_tokens def set_input_embeddings(self, value): self.model.embed_tokens = value def get_output_embeddings(self): return self.lm_head def set_output_embeddings(self, new_embeddings): self.lm_head = new_embeddings def set_decoder(self, decoder): self.model = decoder def get_decoder(self): return self.model def _update_model_kwargs_for_generation( self, outputs: ModelOutput, model_kwargs: Dict[str, Any], **kwargs ) -> Dict[str, Any]: model_kwargs["cache_params"] = outputs.get("cache_params", None) # update attention mask if "attention_mask" in model_kwargs: attention_mask = model_kwargs["attention_mask"] model_kwargs["attention_mask"] = torch.cat( [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1, ) return model_kwargs def prepare_inputs_for_generation( self, input_ids, attention_mask=None, cache_params: Optional[TTTCache] = None, inputs_embeds=None, **kwargs, ): # only last token for inputs_ids if the state is passed along. if cache_params is not None: input_ids = input_ids[:, -1].unsqueeze(-1) attention_mask = attention_mask[:, -1].unsqueeze(-1) if attention_mask is not None else None if inputs_embeds is not None and cache_params is None: model_inputs = {"inputs_embeds": inputs_embeds} else: model_inputs = {"input_ids": input_ids} model_inputs.update( { "cache_params": cache_params, "use_cache": kwargs.get("use_cache"), "attention_mask": attention_mask, } ) return model_inputs def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, cache_params: Optional[TTTCache] = None, labels: Optional[torch.LongTensor] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, use_cache: Optional[bool] = None, *, output_attentions: Optional[bool] = None, ) -> Union[Tuple, CausalLMOutputWithPast]: """ Args: labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. """ output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict assert not output_attentions, "output_attentions is not available in TTTForCausalLM" # decoder outputs consists of (dec_features, layer_state, dec_hidden, dec_attn) outputs = self.model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, inputs_embeds=inputs_embeds, output_hidden_states=output_hidden_states, return_dict=return_dict, use_cache=use_cache, ) hidden_states = outputs[0] if self.config.pretraining_tp > 1: lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.config.pretraining_tp, dim=0) logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.config.pretraining_tp)] logits = torch.cat(logits, dim=-1) else: logits = self.lm_head(hidden_states) logits = logits.float() loss = None if labels is not None: # Shift so that tokens < n predict n shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten the tokens loss_fct = CrossEntropyLoss() shift_logits = shift_logits.view(-1, self.config.vocab_size) shift_labels = shift_labels.view(-1) # Enable model parallelism shift_labels = shift_labels.to(shift_logits.device) loss = loss_fct(shift_logits, shift_labels) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return TTTCausalLMOutput( loss=loss, logits=logits, cache_params=outputs.cache_params, hidden_states=outputs.hidden_states, ) # TTTPilot Core Layer Components for MAC Architecture # These components handle transformer-based core processing with GQA class TTTPilotRetriever(nn.Module): """ TTTPilot Retriever - Read-only memory layer with Q-projection only. Used in MAC architecture for parallel memory retrieval: - Only computes queries (no K/V projections needed for retrieval) - No TTT adaptation (read-only mode) - Shares Q-projection weights with full memory layers - Enables efficient parallel processing of segments Args: config (TTTConfig): Model configuration layer_idx (int): Layer index for weight tying """ def __init__(self, config: TTTConfig, layer_idx: int): super().__init__() self.config = config self.layer_idx = layer_idx self.hidden_size = config.hidden_size self.num_heads = config.num_attention_heads self.head_dim = config.hidden_size // config.num_attention_heads self.width = config.hidden_size # Q-projection only (for memory retrieval) # This will be tied to the full memory layer's Q projection self.q_proj = nn.Linear(self.hidden_size, self.width, bias=False) # RoPE for positional encoding self.rotary_emb = RotaryEmbedding( self.head_dim, max_position_embeddings=config.max_position_embeddings, base=config.rope_theta, ) # Norm for retrieval queries self.query_norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) def forward( self, hidden_states: torch.Tensor, position_ids: Optional[torch.LongTensor] = None, ) -> torch.Tensor: """ Read-only forward pass - generates memory queries. Args: hidden_states: [batch, seq_len, hidden_size] position_ids: [batch, seq_len] Returns: memory_queries: [batch, seq_len, hidden_size] """ B, L, _ = hidden_states.shape # Normalize input hidden_states = self.query_norm(hidden_states) # Generate queries only (no K/V) queries = self.q_proj(hidden_states) # Reshape for multi-head queries = queries.reshape(B, L, self.num_heads, self.head_dim).transpose(1, 2) # Apply RoPE if position_ids is None: position_ids = torch.arange(L, dtype=torch.long, device=hidden_states.device).unsqueeze(0) cos, sin = self.rotary_emb(queries, position_ids) queries, _ = apply_rotary_pos_emb(queries, queries, cos, sin, position_ids) # Reshape back: [B, num_heads, L, head_dim] -> [B, L, hidden_size] memory_queries = queries.transpose(1, 2).reshape(B, L, self.width) return memory_queries def repeat_kv(hidden_states: torch.Tensor, n_rep: int) -> torch.Tensor: """ Repeat key/value heads for Grouped Query Attention. This is the equivalent of torch.repeat_interleave(x, dim=1, repeats=n_rep). The hidden states go from (batch, num_key_value_heads, seqlen, head_dim) to (batch, num_attention_heads, seqlen, head_dim) """ batch, num_key_value_heads, slen, head_dim = hidden_states.shape if n_rep == 1: return hidden_states hidden_states = hidden_states[:, :, None, :, :].expand(batch, num_key_value_heads, n_rep, slen, head_dim) return hidden_states.reshape(batch, num_key_value_heads * n_rep, slen, head_dim) class TTTPilotCoreAttention(nn.Module): """ Multi-headed attention with Grouped Query Attention (GQA) for TTTPilot Core layers. Supports efficient inference with fewer KV heads than query heads. """ def __init__(self, config: TTTConfig, layer_idx: Optional[int] = None): super().__init__() self.config = config self.layer_idx = layer_idx self.hidden_size = config.core_hidden_size # Use core hidden size # For GQA: Q heads are determined by hidden_size / head_dim self.head_dim = config.head_dim self.num_heads = self.hidden_size // self.head_dim # This gives us Q heads self.num_key_value_heads = config.num_key_value_heads self.num_key_value_groups = self.num_heads // self.num_key_value_heads self.max_position_embeddings = config.max_position_embeddings self.rope_theta = config.rope_theta self.is_causal = True self.attention_dropout = config.attention_dropout if (self.head_dim * self.num_heads) != self.hidden_size: raise ValueError( f"core_hidden_size must be divisible by head_dim (got `core_hidden_size`: {self.hidden_size}" f" and `head_dim`: {self.head_dim}, num_heads calculated: {self.num_heads})." ) self.q_proj = nn.Linear(self.hidden_size, self.num_heads * self.head_dim, bias=config.attention_bias) self.k_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias) self.v_proj = nn.Linear(self.hidden_size, self.num_key_value_heads * self.head_dim, bias=config.attention_bias) self.o_proj = nn.Linear(self.num_heads * self.head_dim, self.hidden_size, bias=config.attention_bias) self.rotary_emb = RotaryEmbedding( self.head_dim, max_position_embeddings=self.max_position_embeddings, base=self.rope_theta, ) def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_value: Optional[TTTCache] = None, output_attentions: bool = False, use_cache: bool = False, ) -> Tuple[torch.Tensor, Optional[torch.Tensor], Optional[TTTCache]]: bsz, q_len, _ = hidden_states.size() query_states = self.q_proj(hidden_states) key_states = self.k_proj(hidden_states) value_states = self.v_proj(hidden_states) query_states = query_states.view(bsz, q_len, self.num_heads, self.head_dim).transpose(1, 2) key_states = key_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) value_states = value_states.view(bsz, q_len, self.num_key_value_heads, self.head_dim).transpose(1, 2) # RoPE cos, sin = self.rotary_emb(value_states, position_ids) query_states, key_states = apply_rotary_pos_emb(query_states, key_states, cos, sin, position_ids) # Cache handling if past_key_value is not None: # Retrieve cached K, V from previous forward passes if hasattr(past_key_value, 'core_cache') and self.layer_idx in past_key_value.core_cache: cached_kv = past_key_value.core_cache[self.layer_idx] key_states = torch.cat([cached_kv[0], key_states], dim=2) value_states = torch.cat([cached_kv[1], value_states], dim=2) # Store current K, V if use_cache: if not hasattr(past_key_value, 'core_cache'): past_key_value.core_cache = {} past_key_value.core_cache[self.layer_idx] = (key_states, value_states) # Repeat k/v for GQA key_states = repeat_kv(key_states, self.num_key_value_groups) value_states = repeat_kv(value_states, self.num_key_value_groups) # Attention attn_weights = torch.matmul(query_states, key_states.transpose(2, 3)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32)) if attention_mask is not None: attn_weights = attn_weights + attention_mask # Upcast to fp32 for stability attn_weights = F.softmax(attn_weights, dim=-1, dtype=torch.float32).to(query_states.dtype) attn_weights = F.dropout(attn_weights, p=self.attention_dropout, training=self.training) attn_output = torch.matmul(attn_weights, value_states) attn_output = attn_output.transpose(1, 2).contiguous() attn_output = attn_output.reshape(bsz, q_len, self.num_heads * self.head_dim) attn_output = self.o_proj(attn_output) if not output_attentions: attn_weights = None return attn_output, attn_weights, past_key_value class TTTPilotCoreLayer(nn.Module): """ TTTPilot Core transformer decoder layer. Standard transformer architecture with GQA attention and SwiGLU MLP. """ def __init__(self, config: TTTConfig, layer_idx: int): super().__init__() self.hidden_size = config.core_hidden_size # Use core hidden size self.layer_idx = layer_idx self.self_attn = TTTPilotCoreAttention(config, layer_idx=layer_idx) # Create a temporary config for MLP with core_intermediate_size # SwiGluMLP reads config.intermediate_size and config.hidden_size directly core_mlp_config = type(config)(**config.to_dict()) core_mlp_config.intermediate_size = config.core_intermediate_size core_mlp_config.hidden_size = config.core_hidden_size # Override hidden size for MLP self.mlp = SwiGluMLP(core_mlp_config) self.input_layernorm = RMSNorm(config.core_hidden_size, eps=config.rms_norm_eps) self.post_attention_layernorm = RMSNorm(config.core_hidden_size, eps=config.rms_norm_eps) def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, past_key_value: Optional[TTTCache] = None, output_attentions: Optional[bool] = False, use_cache: Optional[bool] = False, ) -> Tuple[torch.FloatTensor, Optional[Tuple[torch.FloatTensor, torch.FloatTensor]]]: """ Args: hidden_states (torch.FloatTensor): [batch_size, seq_len, hidden_size] attention_mask (torch.Tensor, optional): [batch_size, 1, seq_len, seq_len] position_ids (torch.LongTensor, optional): [batch_size, seq_len] past_key_value (TTTCache, optional): cached key/value states output_attentions (bool, optional): return attention weights use_cache (bool, optional): use KV caching """ residual = hidden_states hidden_states = self.input_layernorm(hidden_states) # Self Attention hidden_states, self_attn_weights, present_key_value = self.self_attn( hidden_states=hidden_states, attention_mask=attention_mask, position_ids=position_ids, past_key_value=past_key_value, output_attentions=output_attentions, use_cache=use_cache, ) hidden_states = residual + hidden_states # MLP residual = hidden_states hidden_states = self.post_attention_layernorm(hidden_states) hidden_states = self.mlp(hidden_states) hidden_states = residual + hidden_states outputs = (hidden_states,) if output_attentions: outputs += (self_attn_weights,) if use_cache: outputs += (present_key_value,) return outputs class TTTPilotMemoryLayer(nn.Module): """ TTTPilot Memory layer - wraps the existing Block with TTT adaptation. This is an alias for clarity in MAC architecture. """ def __init__(self, config: TTTConfig, layer_idx: int): super().__init__() self.layer = Block(config, layer_idx) def forward( self, hidden_states: torch.Tensor, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, cache_params: Optional[TTTCache] = None, ): return self.layer(hidden_states, attention_mask, position_ids, cache_params) ############################ ### MAC Model Classes ### ############################ class TTTPilotMACModel(TTTPreTrainedModel): """ TTTPilot MAC (Memory-Augmented-Core) Model. Combines: - Memory layers: TTT-based adaptive layers with test-time training - Core layers: Standard transformer layers with GQA - Fixed persistent memory: Global context storage Architecture flow: 1. Input embeddings 2. Fixed persistent memory (trainable parameter) 3. Memory layers (TTT adaptation) 4. Core layers (transformer processing) 5. Final norm + LM head """ def __init__(self, config: TTTConfig): super().__init__(config) self.padding_idx = config.pad_token_id self.vocab_size = config.vocab_size self.config = config self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx) # Fixed persistent memory if config.fixed_memory_size > 0: self.fixed_memories = nn.Parameter(torch.zeros(config.fixed_memory_size, config.hidden_size)) else: self.fixed_memories = None # Memory layers (TTT-based) if config.model_variant in ["mac", "memory_only"]: self.memories = nn.ModuleList([ TTTPilotMemoryLayer(config, layer_idx) for layer_idx in range(config.num_memory_layers) ]) # Retrievers (read-only memory with Q-only projection) # Weight tying: retriever.q_proj shares weights with memory.q_proj self.retrievers = nn.ModuleList([ TTTPilotRetriever(config, layer_idx) for layer_idx in range(config.num_memory_layers) ]) # Tie Q-projection weights between retriever and memory for layer_idx in range(config.num_memory_layers): # Get the actual TTT layer from inside TTTPilotMemoryLayer memory_ttt_layer = self.memories[layer_idx].layer.seq_modeling_block if hasattr(memory_ttt_layer, 'q_proj'): # Share the Q projection self.retrievers[layer_idx].q_proj = memory_ttt_layer.q_proj else: self.memories = nn.ModuleList() self.retrievers = nn.ModuleList() # Core layers (Transformer-based) if config.model_variant in ["mac", "core_only"]: self.cores = nn.ModuleList([ TTTPilotCoreLayer(config, layer_idx) for layer_idx in range(config.num_core_layers) ]) else: self.cores = nn.ModuleList() # For standard variant, use original layers if config.model_variant == "standard": self.layers = nn.ModuleList([ Block(config, layer_idx) for layer_idx in range(config.num_hidden_layers) ]) else: self.layers = nn.ModuleList() # Projection layers for dimension matching between memory and core # Only created if hidden sizes differ self.needs_projection = (config.hidden_size != config.core_hidden_size) if self.needs_projection: # Memory (hidden_size) → Core (core_hidden_size) self.memory_to_core_proj = nn.Linear(config.hidden_size, config.core_hidden_size, bias=False) # Core (core_hidden_size) → Memory (hidden_size) self.core_to_memory_proj = nn.Linear(config.core_hidden_size, config.hidden_size, bias=False) # Initialize projection layers to preserve information self._init_projection_layers() logger.info(f"Projection layers created: memory ({config.hidden_size}) ↔ core ({config.core_hidden_size})") else: self.memory_to_core_proj = None self.core_to_memory_proj = None logger.info(f"No projection needed: memory and core both use hidden_size={config.hidden_size}") self.norm = RMSNorm(config.hidden_size, eps=config.rms_norm_eps) self.gradient_checkpointing = False self.post_init() def get_input_embeddings(self): return self.embed_tokens def set_input_embeddings(self, value): self.embed_tokens = value def _init_projection_layers(self): """ Initialize projection layers with near-identity strategy. Strategy: - For expansion (small → large): Identity for shared dimensions, small random init for new dimensions - For reduction (large → small): Truncated identity This preserves information flow between pretrained modules. """ if not self.needs_projection: return mem_dim = self.config.hidden_size core_dim = self.config.core_hidden_size # Memory → Core projection (2048 → 2560) if mem_dim < core_dim: # Expansion: identity for existing dims, small random for new dims with torch.no_grad(): # First mem_dim dimensions are identity self.memory_to_core_proj.weight[:mem_dim, :] = torch.eye(mem_dim) # Remaining dimensions: small random (to be learned) nn.init.normal_(self.memory_to_core_proj.weight[mem_dim:, :], mean=0.0, std=0.02) else: # Reduction: truncated identity with torch.no_grad(): self.memory_to_core_proj.weight[:, :core_dim] = torch.eye(core_dim) if mem_dim > core_dim: nn.init.xavier_uniform_(self.memory_to_core_proj.weight[:, core_dim:]) self.memory_to_core_proj.weight[:, core_dim:] *= 0.1 # Core → Memory projection (2560 → 2048) if core_dim < mem_dim: # Expansion: identity + zeros with torch.no_grad(): self.core_to_memory_proj.weight[:core_dim, :] = torch.eye(core_dim) self.core_to_memory_proj.weight[core_dim:, :] = 0.0 else: # Reduction: truncated identity with torch.no_grad(): self.core_to_memory_proj.weight[:, :mem_dim] = torch.eye(mem_dim) if core_dim > mem_dim: nn.init.xavier_uniform_(self.core_to_memory_proj.weight[:, mem_dim:]) self.core_to_memory_proj.weight[:, mem_dim:] *= 0.1 logger.info(f"Projection layers initialized with near-identity strategy") def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, cache_params: Optional[TTTCache] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, use_cache: Optional[bool] = None, ) -> Union[Tuple, TTTOutput]: output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) use_cache = use_cache if use_cache is not None else self.config.use_cache return_dict = return_dict if return_dict is not None else self.config.use_return_dict if (input_ids is None) ^ (inputs_embeds is not None): raise ValueError( "You cannot specify both input_ids and inputs_embeds at the same time, and must specify either one" ) if self.gradient_checkpointing and self.training and use_cache: logger.warning_once( "`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`." ) use_cache = False if inputs_embeds is None: inputs_embeds = self.embed_tokens(input_ids) if cache_params is None and use_cache: cache_params = TTTCache(self, inputs_embeds.size(0)) seqlen_offset = 0 if cache_params is not None: seqlen_offset = cache_params.seqlen_offset if position_ids is None: position_ids = torch.arange( seqlen_offset, seqlen_offset + inputs_embeds.shape[1], dtype=torch.long, device=inputs_embeds.device, ).unsqueeze(0) hidden_states = inputs_embeds if attention_mask is None: attention_mask = torch.ones_like(input_ids if input_ids is not None else inputs_embeds[:,:,0]) # Collect all hidden states if requested all_hidden_states = () if output_hidden_states else None # Process through layers based on variant if self.config.model_variant == "standard": # Standard TTT model for decoder_layer in self.layers: if self.gradient_checkpointing and self.training: hidden_states = self._gradient_checkpointing_func( decoder_layer.__call__, hidden_states, attention_mask, position_ids, cache_params, ) else: hidden_states = decoder_layer( hidden_states, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, ) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) elif self.config.model_variant in ["mac", "memory_only", "core_only"]: # MAC model with memory and/or core layers # Process through memory layers for memory_layer in self.memories: if self.gradient_checkpointing and self.training: hidden_states = self._gradient_checkpointing_func( memory_layer.__call__, hidden_states, attention_mask, position_ids, cache_params, ) else: hidden_states = memory_layer( hidden_states, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, ) if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) # Project memory output to core input if dimensions differ if self.needs_projection and len(self.cores) > 0: hidden_states = self.memory_to_core_proj(hidden_states) # Process through core layers for core_layer in self.cores: if self.gradient_checkpointing and self.training: layer_outputs = self._gradient_checkpointing_func( core_layer.__call__, hidden_states, attention_mask, position_ids, cache_params, False, # output_attentions use_cache, ) else: layer_outputs = core_layer( hidden_states, attention_mask=attention_mask, position_ids=position_ids, past_key_value=cache_params, output_attentions=False, use_cache=use_cache, ) hidden_states = layer_outputs[0] if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) # Project core output back to memory dimensions if needed if self.needs_projection and len(self.cores) > 0: hidden_states = self.core_to_memory_proj(hidden_states) if use_cache and cache_params is not None: cache_params.seqlen_offset += inputs_embeds.shape[1] hidden_states = self.norm(hidden_states) # Add final hidden state if output_hidden_states: all_hidden_states = all_hidden_states + (hidden_states,) if not return_dict: return tuple(v for v in [hidden_states, cache_params, all_hidden_states] if v is not None) return TTTOutput( last_hidden_state=hidden_states, cache_params=cache_params if use_cache else None, hidden_states=all_hidden_states, ) class TTTPilotMACForCausalLM(TTTPreTrainedModel, GenerationMixin): """ TTTPilot MAC Model for Causal Language Modeling. Extends the MAC model with a language modeling head. """ _tied_weights_keys = ["lm_head.weight"] def __init__(self, config): super().__init__(config) # Use MAC model instead of standard TTTModel self.model = TTTPilotMACModel(config) self.vocab_size = config.vocab_size self.lm_head = nn.Linear(config.hidden_size, config.vocab_size, bias=False) self.post_init() def get_input_embeddings(self): return self.model.embed_tokens def set_input_embeddings(self, value): self.model.embed_tokens = value def get_output_embeddings(self): return self.lm_head def set_output_embeddings(self, new_embeddings): self.lm_head = new_embeddings def set_decoder(self, decoder): self.model = decoder def get_decoder(self): return self.model def _update_model_kwargs_for_generation( self, outputs: ModelOutput, model_kwargs: Dict[str, Any], **kwargs ) -> Dict[str, Any]: model_kwargs["cache_params"] = outputs.get("cache_params", None) # Update attention mask if "attention_mask" in model_kwargs: attention_mask = model_kwargs["attention_mask"] model_kwargs["attention_mask"] = torch.cat( [attention_mask, attention_mask.new_ones((attention_mask.shape[0], 1))], dim=-1, ) return model_kwargs def prepare_inputs_for_generation( self, input_ids, attention_mask=None, cache_params: Optional[TTTCache] = None, inputs_embeds=None, **kwargs, ): # Only last token for inputs_ids if cache is passed if cache_params is not None: input_ids = input_ids[:, -1].unsqueeze(-1) attention_mask = attention_mask[:, -1].unsqueeze(-1) if attention_mask is not None else None if inputs_embeds is not None and cache_params is None: model_inputs = {"inputs_embeds": inputs_embeds} else: model_inputs = {"input_ids": input_ids} model_inputs.update( { "cache_params": cache_params, "use_cache": kwargs.get("use_cache"), "attention_mask": attention_mask, } ) return model_inputs def forward( self, input_ids: torch.LongTensor = None, attention_mask: Optional[torch.Tensor] = None, position_ids: Optional[torch.LongTensor] = None, inputs_embeds: Optional[torch.FloatTensor] = None, cache_params: Optional[TTTCache] = None, labels: Optional[torch.LongTensor] = None, output_hidden_states: Optional[bool] = None, return_dict: Optional[bool] = None, use_cache: Optional[bool] = None, *, output_attentions: Optional[bool] = None, ) -> Union[Tuple, TTTCausalLMOutput]: """ Args: labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*): Labels for computing the masked language modeling loss. Indices should either be in `[0, ..., config.vocab_size]` or -100 (see `input_ids` docstring). Tokens with indices set to `-100` are ignored (masked), the loss is only computed for the tokens with labels in `[0, ..., config.vocab_size]`. """ output_hidden_states = ( output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states ) return_dict = return_dict if return_dict is not None else self.config.use_return_dict # Forward through MAC model outputs = self.model( input_ids=input_ids, attention_mask=attention_mask, position_ids=position_ids, cache_params=cache_params, inputs_embeds=inputs_embeds, output_hidden_states=output_hidden_states, return_dict=return_dict, use_cache=use_cache, ) hidden_states = outputs[0] # LM head if self.config.pretraining_tp > 1: lm_head_slices = self.lm_head.weight.split(self.vocab_size // self.config.pretraining_tp, dim=0) logits = [F.linear(hidden_states, lm_head_slices[i]) for i in range(self.config.pretraining_tp)] logits = torch.cat(logits, dim=-1) else: logits = self.lm_head(hidden_states) logits = logits.float() loss = None if labels is not None: # Shift for next token prediction shift_logits = logits[..., :-1, :].contiguous() shift_labels = labels[..., 1:].contiguous() # Flatten loss_fct = CrossEntropyLoss() shift_logits = shift_logits.view(-1, self.config.vocab_size) shift_labels = shift_labels.view(-1) # Enable model parallelism shift_labels = shift_labels.to(shift_logits.device) loss = loss_fct(shift_logits, shift_labels) if not return_dict: output = (logits,) + outputs[1:] return (loss,) + output if loss is not None else output return TTTCausalLMOutput( loss=loss, logits=logits, cache_params=outputs.cache_params, hidden_states=outputs.hidden_states, )