vllm.lora.layers

Modules:

base
base_linear
column_parallel_linear
fused_moe
logits_processor
replicated_linear
row_parallel_linear
utils
vocab_parallel_embedding

__all__ module-attribute

__all__ = [
    "BaseLayerWithLoRA",
    "VocabParallelEmbeddingWithLoRA",
    "LogitsProcessorWithLoRA",
    "ColumnParallelLinearWithLoRA",
    "ColumnParallelLinearWithShardedLoRA",
    "MergedColumnParallelLinearWithLoRA",
    "MergedColumnParallelLinearWithShardedLoRA",
    "MergedQKVParallelLinearWithLoRA",
    "MergedQKVParallelLinearWithShardedLoRA",
    "QKVParallelLinearWithLoRA",
    "QKVParallelLinearWithShardedLoRA",
    "RowParallelLinearWithLoRA",
    "RowParallelLinearWithShardedLoRA",
    "ReplicatedLinearWithLoRA",
    "LoRAMapping",
    "FusedMoEWithLoRA",
]
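
These are the public names of the package; a minimal import sketch (assuming vLLM is installed):

# Minimal usage sketch: the names listed in __all__ can be imported directly
# from the package.
from vllm.lora.layers import (
    BaseLayerWithLoRA,
    ColumnParallelLinearWithLoRA,
    LoRAMapping,
)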

BaseLayerWithLoRA

Bases: Module

Source code in vllm/lora/layers/base.py
class BaseLayerWithLoRA(nn.Module):
    def slice_lora_a(
        self, lora_a: torch.Tensor | list[torch.Tensor | None]
    ) -> torch.Tensor | list[torch.Tensor | None]:
        """Slice lora a if splitting for tensor parallelism."""
        ...

    def slice_lora_b(
        self, lora_b: torch.Tensor | list[torch.Tensor | None]
    ) -> torch.Tensor | list[torch.Tensor | None]:
        """Slice lora b if splitting with tensor parallelism."""
        ...

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        """Initializes lora matrices."""
        ...

    def reset_lora(self, index: int):
        """Resets the lora weights at index back to 0."""
        ...

    def set_lora(
        self,
        index: int,
        lora_a: torch.Tensor,
        lora_b: torch.Tensor,
        embeddings_tensor: torch.Tensor | None,
    ):
        """Overwrites lora tensors at index."""
        ...

    def set_mapping(
        self,
        punica_wrapper,
    ):
        self.punica_wrapper: PunicaWrapperBase = punica_wrapper

    @classmethod
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        """Returns True if the layer can be replaced by this LoRA layer."""
        raise NotImplementedError
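
A minimal sketch of how a concrete wrapper is expected to fill in this interface. The wrapper below and its plain nn.Linear base layer are hypothetical and only illustrate the contract; real implementations (such as ColumnParallelLinearWithLoRA below) also handle tensor parallelism and device/dtype placement.

import torch
import torch.nn as nn

from vllm.lora.layers import BaseLayerWithLoRA


class ToyLinearWithLoRA(BaseLayerWithLoRA):
    """Hypothetical wrapper around a plain nn.Linear, for illustration only."""

    def __init__(self, base_layer: nn.Linear) -> None:
        super().__init__()
        self.base_layer = base_layer

    def create_lora_weights(self, max_loras, lora_config, model_config=None) -> None:
        # One (rank x in_features) A slot and one (out_features x rank) B slot
        # per adapter index.
        r = lora_config.max_lora_rank
        self.lora_a_stacked = torch.zeros(max_loras, r, self.base_layer.in_features)
        self.lora_b_stacked = torch.zeros(max_loras, self.base_layer.out_features, r)

    def reset_lora(self, index: int):
        self.lora_a_stacked[index] = 0
        self.lora_b_stacked[index] = 0

    def set_lora(self, index, lora_a, lora_b, embeddings_tensor):
        self.reset_lora(index)
        # Copy into the top-left corner so adapters with rank < max_lora_rank fit.
        self.lora_a_stacked[index, : lora_a.shape[0], : lora_a.shape[1]].copy_(lora_a)
        self.lora_b_stacked[index, : lora_b.shape[0], : lora_b.shape[1]].copy_(lora_b)

    @classmethod
    def can_replace_layer(cls, source_layer, lora_config, packed_modules_list, model_config) -> bool:
        return isinstance(source_layer, nn.Linear)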

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool

Returns True if the layer can be replaced by this LoRA layer.

Source code in vllm/lora/layers/base.py
@classmethod
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    """Returns True if the layer can be replaced by this LoRA layer."""
    raise NotImplementedError

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None

Initializes lora matrices.

Source code in vllm/lora/layers/base.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    """Initializes lora matrices."""
    ...

reset_lora

reset_lora(index: int)

Resets the lora weights at index back to 0.

Source code in vllm/lora/layers/base.py
def reset_lora(self, index: int):
    """Resets the lora weights at index back to 0."""
    ...

set_lora

set_lora(
    index: int,
    lora_a: Tensor,
    lora_b: Tensor,
    embeddings_tensor: Tensor | None,
)

Overwrites lora tensors at index.

Source code in vllm/lora/layers/base.py
def set_lora(
    self,
    index: int,
    lora_a: torch.Tensor,
    lora_b: torch.Tensor,
    embeddings_tensor: torch.Tensor | None,
):
    """Overwrites lora tensors at index."""
    ...

set_mapping

set_mapping(punica_wrapper)
Source code in vllm/lora/layers/base.py
def set_mapping(
    self,
    punica_wrapper,
):
    self.punica_wrapper: PunicaWrapperBase = punica_wrapper

slice_lora_a

slice_lora_a(
    lora_a: Tensor | list[Tensor | None],
) -> Tensor | list[Tensor | None]

Slice lora a if splitting for tensor parallelism.

Source code in vllm/lora/layers/base.py
def slice_lora_a(
    self, lora_a: torch.Tensor | list[torch.Tensor | None]
) -> torch.Tensor | list[torch.Tensor | None]:
    """Slice lora a if splitting for tensor parallelism."""
    ...

slice_lora_b

slice_lora_b(
    lora_b: Tensor | list[Tensor | None],
) -> Tensor | list[Tensor | None]

Slice lora b if splitting with tensor parallelism.

Source code in vllm/lora/layers/base.py
def slice_lora_b(
    self, lora_b: torch.Tensor | list[torch.Tensor | None]
) -> torch.Tensor | list[torch.Tensor | None]:
    """Slice lora b if splitting with tensor parallelism."""
    ...

ColumnParallelLinearWithLoRA

Bases: BaseLinearLayerWithLoRA

LoRA on top of ColumnParallelLinear layer. LoRA B is sliced for tensor parallelism. There are two types for the base_layer: 1. ColumnParallelLinear, e.g. dense_h_to_4h in FalconForCausalLM. 2. MergedColumnParallelLinear, e.g. gate_up_proj in Phi3ForCausalLM.

Source code in vllm/lora/layers/column_parallel_linear.py
class ColumnParallelLinearWithLoRA(BaseLinearLayerWithLoRA):
    """
    LoRA on top of ColumnParallelLinear layer.
    LoRA B is sliced for tensor parallelism.
    There are two types for the `base_layer`:
    1. ColumnParallelLinear, e.g.`dense_h_to_4h` in `FalconForCausalLM`.
    2. MergedColumnParallelLinear, e.g.`gate_up_proj` in `Phi3ForCausalLM`.
    """

    def __init__(self, base_layer: ColumnParallelLinear) -> None:
        super().__init__(base_layer)
        # The base_layer type is ColumnParallelLinear or
        # MergedColumnParallelLinear, their weight sharding logic is
        # inconsistent when TP is greater than 1.
        self.is_merged_col_linear = type(base_layer) is MergedColumnParallelLinear
        self.output_size = self.base_layer.output_size_per_partition
        # There is only one LoRA layer
        self.n_slices = 1

    def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
        return lora_a

    def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
        # Applicable to cases where the base_layer is
        # MergedColumnParallelLinear.
        if self.is_merged_col_linear:
            shard_size = self.output_size // 2
            offset = lora_b.shape[0] // 2

            left_weight = lora_b[
                self.tp_rank * shard_size : (self.tp_rank + 1) * shard_size, :
            ]
            right_weight = lora_b[
                offset + self.tp_rank * shard_size : offset
                + (self.tp_rank + 1) * shard_size,
                :,
            ]
            lora_b = torch.cat([left_weight, right_weight], dim=0)
        # Applicable to cases where the base_layer is
        # ColumnParallelLinear.
        else:
            shard_size = self.output_size
            start_idx = self.tp_rank * shard_size
            end_idx = (self.tp_rank + 1) * shard_size
            lora_b = lora_b[start_idx:end_idx, :]
        return lora_b

    def forward(
        self, input_: torch.Tensor
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
        """Forward of ColumnParallelLinear

        Args:
            input_: Tensor whose last dimension is `input_size`.

        Returns:
            - output
            - bias
        """
        bias = self.base_layer.bias if not self.base_layer.skip_bias_add else None

        # Matrix multiply.
        output_parallel = self.apply(input_, bias)
        if self.base_layer.gather_output and self.tp_size > 1:
            # All-gather across the partitions.
            output = tensor_model_parallel_all_gather(output_parallel)
        else:
            output = output_parallel

        if not self.base_layer.return_bias:
            return output

        output_bias = self.base_layer.bias if self.base_layer.skip_bias_add else None
        return output, output_bias

    @classmethod
    @_not_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is ColumnParallelLinear or (
            type(source_layer) is MergedColumnParallelLinear
            and len(packed_modules_list) == 1
        )
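
The shard arithmetic in slice_lora_b can be pictured with small, hypothetical sizes (illustrative only; tp_rank, tp_size, and the shapes are made up for the example):

import torch

# Hypothetical merged gate_up projection: the full lora_b is (2 * intermediate, rank),
# and each tensor-parallel rank keeps its slice of both the gate and the up halves.
full_out, rank, tp_size, tp_rank = 8, 4, 2, 1
lora_b = torch.randn(full_out, rank)

output_size = full_out // tp_size   # output_size_per_partition on this rank
shard_size = output_size // 2       # half of the partition for gate, half for up
offset = lora_b.shape[0] // 2       # start of the "up" block in the full tensor

left = lora_b[tp_rank * shard_size:(tp_rank + 1) * shard_size, :]
right = lora_b[offset + tp_rank * shard_size:offset + (tp_rank + 1) * shard_size, :]
sliced = torch.cat([left, right], dim=0)
assert sliced.shape == (output_size, rank)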

is_merged_col_linear instance-attribute

is_merged_col_linear = (
    type(base_layer) is MergedColumnParallelLinear
)

n_slices instance-attribute

n_slices = 1

output_size instance-attribute

output_size = output_size_per_partition

__init__

__init__(base_layer: ColumnParallelLinear) -> None
Source code in vllm/lora/layers/column_parallel_linear.py
def __init__(self, base_layer: ColumnParallelLinear) -> None:
    super().__init__(base_layer)
    # The base_layer type is ColumnParallelLinear or
    # MergedColumnParallelLinear, their weight sharding logic is
    # inconsistent when TP is greater than 1.
    self.is_merged_col_linear = type(base_layer) is MergedColumnParallelLinear
    self.output_size = self.base_layer.output_size_per_partition
    # There is only one LoRA layer
    self.n_slices = 1

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_not_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is ColumnParallelLinear or (
        type(source_layer) is MergedColumnParallelLinear
        and len(packed_modules_list) == 1
    )

forward

forward(
    input_: Tensor,
) -> Tensor | tuple[Tensor, Tensor | None]

Forward of ColumnParallelLinear

Parameters:

    input_ (Tensor, required): Tensor whose last dimension is input_size.

Returns:

    Tensor | tuple[Tensor, Tensor | None]:
        - output
        - bias
Source code in vllm/lora/layers/column_parallel_linear.py
def forward(
    self, input_: torch.Tensor
) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
    """Forward of ColumnParallelLinear

    Args:
        input_: Tensor whose last dimension is `input_size`.

    Returns:
        - output
        - bias
    """
    bias = self.base_layer.bias if not self.base_layer.skip_bias_add else None

    # Matrix multiply.
    output_parallel = self.apply(input_, bias)
    if self.base_layer.gather_output and self.tp_size > 1:
        # All-gather across the partitions.
        output = tensor_model_parallel_all_gather(output_parallel)
    else:
        output = output_parallel

    if not self.base_layer.return_bias:
        return output

    output_bias = self.base_layer.bias if self.base_layer.skip_bias_add else None
    return output, output_bias

slice_lora_a

slice_lora_a(lora_a: Tensor) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
    return lora_a

slice_lora_b

slice_lora_b(lora_b: Tensor) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
    # Applicable to cases where the base_layer is
    # MergedColumnParallelLinear.
    if self.is_merged_col_linear:
        shard_size = self.output_size // 2
        offset = lora_b.shape[0] // 2

        left_weight = lora_b[
            self.tp_rank * shard_size : (self.tp_rank + 1) * shard_size, :
        ]
        right_weight = lora_b[
            offset + self.tp_rank * shard_size : offset
            + (self.tp_rank + 1) * shard_size,
            :,
        ]
        lora_b = torch.cat([left_weight, right_weight], dim=0)
    # Applicable to cases where the base_layer is
    # ColumnParallelLinear.
    else:
        shard_size = self.output_size
        start_idx = self.tp_rank * shard_size
        end_idx = (self.tp_rank + 1) * shard_size
        lora_b = lora_b[start_idx:end_idx, :]
    return lora_b

ColumnParallelLinearWithShardedLoRA

Bases: ColumnParallelLinearWithLoRA

Differs from ColumnParallelLinearWithLoRA by slicing LoRA A also.

Based on S-LoRA, slicing happens along the rank dim.

Source code in vllm/lora/layers/column_parallel_linear.py
class ColumnParallelLinearWithShardedLoRA(ColumnParallelLinearWithLoRA):
    """
    Differs from ColumnParallelLinearWithLoRA by slicing LoRA A also.

    Based on S-LoRA, slicing happens along the rank dim.
    """

    # For all LoRA layers where the `base_layer` is `ColumnParallelLinear`,
    # their `lora_a` and `lora_b` have different sharding patterns. After
    # completing the `lora_a` GEMM , a gather operation is performed.
    # Therefore, the sharding of `lora_a` only needs to correspond with the
    # gather operation.
    def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
        shard_size = self.lora_a_stacked[0].shape[2]
        start_idx = self.tp_rank * shard_size
        lora_a = lora_a[start_idx : start_idx + shard_size, :]
        return lora_a

    def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
        return _mcp_apply(x, bias, self)

    @classmethod
    @_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # specifying kwargs so they can be easily accessed in decorator
        return super().can_replace_layer(
            source_layer=source_layer,
            lora_config=lora_config,
            packed_modules_list=packed_modules_list,
            model_config=model_config,
            decorate=False,
        )
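
The rank-dimension split performed by slice_lora_a can be illustrated with hypothetical sizes (in the real layer the shard size comes from lora_a_stacked[0].shape[2]):

import torch

# Hypothetical: a rank-8 LoRA A matrix split across tp_size=2 ranks, so each
# rank keeps 4 of the 8 LoRA rows (the rank dimension), as in S-LoRA.
rank, hidden_size, tp_size, tp_rank = 8, 16, 2, 0
lora_a = torch.randn(rank, hidden_size)

shard_size = rank // tp_size        # lora_a_stacked[0].shape[2] in the layer
start_idx = tp_rank * shard_size
lora_a_shard = lora_a[start_idx:start_idx + shard_size, :]
assert lora_a_shard.shape == (shard_size, hidden_size)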

apply

apply(x: Tensor, bias: Tensor | None = None) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
    return _mcp_apply(x, bias, self)

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # specifying kwargs so they can be easily accessed in decorator
    return super().can_replace_layer(
        source_layer=source_layer,
        lora_config=lora_config,
        packed_modules_list=packed_modules_list,
        model_config=model_config,
        decorate=False,
    )

slice_lora_a

slice_lora_a(lora_a: Tensor) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
    shard_size = self.lora_a_stacked[0].shape[2]
    start_idx = self.tp_rank * shard_size
    lora_a = lora_a[start_idx : start_idx + shard_size, :]
    return lora_a

FusedMoEWithLoRA

Bases: BaseLayerWithLoRA

Source code in vllm/lora/layers/fused_moe.py
class FusedMoEWithLoRA(BaseLayerWithLoRA):
    def __init__(self, base_layer: FusedMoE) -> None:
        super().__init__()
        self.base_layer = base_layer
        self.tp_size = get_tensor_model_parallel_world_size()
        self.tp_rank = get_tensor_model_parallel_rank()
        self.device = base_layer.w2_weight.device
        self._inject_lora_into_fused_moe()

    def _inject_lora_into_fused_moe(self):
        moe_state_dict = {}
        top_k = self.base_layer.top_k

        if self.base_layer.quant_config is None:
            quant_config = FUSED_MOE_UNQUANTIZED_CONFIG
        elif not isinstance(self.base_layer.quant_config, Mxfp4Config):
            quant_config = self.base_layer.quant_config
        else:
            quant_config = mxfp4_w4a16_moe_quant_config(
                w1_bias=self.base_layer.w13_bias,
                w2_bias=self.base_layer.w2_bias,
                w1_scale=self.base_layer.w13_weight_scale,
                w2_scale=self.base_layer.w2_weight_scale,
            )

        m_fused_moe_fn = (
            modular_triton_fused_moe(
                quant_config, shared_experts=self.base_layer.shared_experts
            )
            if not quant_config.use_mxfp4_w4a16
            else modular_marlin_fused_moe(
                quant_config, shared_experts=self.base_layer.shared_experts
            )
        )

        def fwd_decorator(layer, func):
            def wrapper(*args, **kwargs):
                moe_state_dict["hidden_states"] = kwargs["hidden_states"]
                moe_state_dict["topk_ids"] = kwargs["topk_ids"]
                moe_state_dict["topk_weights"] = kwargs["topk_weights"]
                moe_state_dict["global_num_experts"] = kwargs["global_num_experts"]
                moe_state_dict["expert_map"] = kwargs["expert_map"]
                moe_state_dict["apply_router_weight_on_input"] = kwargs[
                    "apply_router_weight_on_input"
                ]
                result = func(*args, **kwargs)
                return result

            return wrapper

        def act_decorator(layer, func):
            def wrapper(*args, **kwargs):
                _, output, input = args

                hidden_states = moe_state_dict["hidden_states"]
                topk_weights = moe_state_dict["topk_weights"]
                curr_topk_ids = moe_state_dict["topk_ids"]
                global_num_experts = moe_state_dict["global_num_experts"]
                expert_map = moe_state_dict["expert_map"]

                config_dtype = _get_config_dtype_str(
                    dtype=hidden_states.dtype,
                    use_fp8_w8a8=False,
                    use_int8_w8a16=False,
                    use_int4_w4a16=False,
                )
                CHUNK_SIZE = envs.VLLM_FUSED_MOE_CHUNK_SIZE
                num_tokens = hidden_states.size(0)
                M = min(num_tokens, CHUNK_SIZE)

                get_config_func = functools.partial(
                    try_get_optimal_moe_config,
                    layer.w13_weight.size(),
                    layer.w2_weight.size(),
                    top_k,
                    config_dtype,
                    block_shape=layer.quant_method.moe_quant_config.block_shape,
                )

                (_, _, num_tokens_per_lora, _, _, _) = (
                    self.punica_wrapper.token_mapping_meta.meta_args(
                        hidden_states.size(0)
                    )
                )
                max_loras = self.w1_lora_a_stacked.shape[0]
                config = get_config_func(M)
                (
                    sorted_token_ids_lora,
                    expert_ids_lora,
                    num_tokens_post_padded_lora,
                ) = self.punica_wrapper.moe_lora_align_block_size(
                    curr_topk_ids,
                    num_tokens,
                    config["BLOCK_SIZE_M"],
                    global_num_experts,
                    max_loras,
                    num_tokens_per_lora,
                    self.adapter_enabled,
                    expert_map,
                )

                moe_state_dict["sorted_token_ids_lora"] = sorted_token_ids_lora
                moe_state_dict["expert_ids_lora"] = expert_ids_lora
                moe_state_dict["num_tokens_post_padded_lora"] = (
                    num_tokens_post_padded_lora
                )

                w13_lora_a_stacked = [self.w1_lora_a_stacked, self.w3_lora_a_stacked]
                w13_lora_b_stacked = [self.w1_lora_b_stacked, self.w3_lora_b_stacked]
                max_lora_rank = self.w1_lora_a_stacked.shape[-2]
                expert_ids_lora = expert_ids_lora.view(max_loras, -1)
                sorted_token_ids_lora = sorted_token_ids_lora.view(max_loras, -1)

                self.punica_wrapper.add_lora_fused_moe(
                    input.view(-1, top_k, input.shape[-1]),
                    hidden_states,
                    w13_lora_a_stacked,
                    w13_lora_b_stacked,
                    topk_weights,
                    sorted_token_ids_lora,
                    expert_ids_lora,
                    num_tokens_post_padded_lora,
                    max_lora_rank,
                    top_k,
                    config,
                    self.adapter_enabled,
                )

                result = func(*args, **kwargs)

                moe_state_dict["intermediate_cache2"] = output
                return result

            return wrapper

        def moe_sum_decorator(layer, func):
            def wrapper(*args, **kwargs):
                hidden_states = moe_state_dict["hidden_states"]
                topk_weights = moe_state_dict["topk_weights"]

                config_dtype = _get_config_dtype_str(
                    dtype=hidden_states.dtype,
                    use_fp8_w8a8=False,
                    use_int8_w8a16=False,
                    use_int4_w4a16=False,
                )
                CHUNK_SIZE = envs.VLLM_FUSED_MOE_CHUNK_SIZE
                num_tokens = hidden_states.size(0)
                M = min(num_tokens, CHUNK_SIZE)

                get_config_func = functools.partial(
                    try_get_optimal_moe_config,
                    layer.w13_weight.size(),
                    layer.w2_weight.size(),
                    top_k,
                    config_dtype,
                    block_shape=layer.quant_method.moe_quant_config.block_shape,
                )

                config = get_config_func(M)

                sorted_token_ids_lora = moe_state_dict["sorted_token_ids_lora"]
                expert_ids_lora = moe_state_dict["expert_ids_lora"]
                num_tokens_post_padded_lora = moe_state_dict[
                    "num_tokens_post_padded_lora"
                ]
                max_loras = self.w1_lora_a_stacked.shape[0]
                expert_ids_lora = expert_ids_lora.view(max_loras, -1)
                sorted_token_ids_lora = sorted_token_ids_lora.view(max_loras, -1)
                intermediate_cache2 = moe_state_dict["intermediate_cache2"]
                intermediate_cache3 = args[0]
                max_lora_rank = self.w1_lora_a_stacked.shape[-2]
                self.punica_wrapper.add_lora_fused_moe(
                    intermediate_cache3,
                    intermediate_cache2,
                    [self.w2_lora_a_stacked],
                    [self.w2_lora_b_stacked],
                    topk_weights,
                    sorted_token_ids_lora,
                    expert_ids_lora,
                    num_tokens_post_padded_lora,
                    max_lora_rank,
                    top_k,
                    config,
                    self.adapter_enabled,
                    True,
                )

                result = func(*args, **kwargs)
                return result

            return wrapper

        fused_experts = m_fused_moe_fn.fused_experts

        m_fused_moe_fn.forward = fwd_decorator(self.base_layer, m_fused_moe_fn.forward)
        fused_experts.activation = act_decorator(
            self.base_layer, fused_experts.activation
        )
        fused_experts.moe_sum = moe_sum_decorator(
            self.base_layer, fused_experts.moe_sum
        )

        self.base_layer.quant_method.old_fused_experts = (
            self.base_layer.quant_method.fused_experts
        )
        self.base_layer.quant_method.fused_experts = m_fused_moe_fn

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        """Initializes lora matrices."""

        assert not self.base_layer.use_ep, (
            "EP support for Fused MoE LoRA is not implemented yet."
        )
        self.adapter_enabled = torch.tensor(
            [0] * (max_loras + 1), dtype=torch.int, device=self.device
        )

        self.w1_lora_a_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                lora_config.max_lora_rank,
                self.base_layer.hidden_size,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        self.w1_lora_b_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                self.base_layer.intermediate_size_per_partition,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )

        self.w2_lora_a_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                lora_config.max_lora_rank,
                self.base_layer.intermediate_size_per_partition,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        self.w2_lora_b_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                self.base_layer.hidden_size,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )

        self.w3_lora_a_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                lora_config.max_lora_rank,
                self.base_layer.hidden_size,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        self.w3_lora_b_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.global_num_experts,
                self.base_layer.intermediate_size_per_partition,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )

        # They will be used by 'LoRALayerWeights.create_dummy_lora_weights'
        # to create a dummy LoRA weights.
        self.lora_a_stacked = []
        self.lora_b_stacked = []
        for lora_id in range(max_loras):
            for experts_id in range(self.base_layer.global_num_experts):
                # gate_proj,down_proj,up_proj
                self.lora_a_stacked.append(self.w1_lora_a_stacked[lora_id][experts_id])
                self.lora_a_stacked.append(self.w2_lora_a_stacked[lora_id][experts_id])
                self.lora_a_stacked.append(self.w3_lora_a_stacked[lora_id][experts_id])

                self.lora_b_stacked.append(self.w1_lora_b_stacked[lora_id][experts_id])
                self.lora_b_stacked.append(self.w2_lora_b_stacked[lora_id][experts_id])
                self.lora_b_stacked.append(self.w3_lora_b_stacked[lora_id][experts_id])

    def reset_lora(self, index: int):
        """Resets the lora weights at index back to 0."""
        self.w1_lora_a_stacked[index] = 0
        self.w1_lora_b_stacked[index] = 0
        self.w3_lora_a_stacked[index] = 0
        self.w3_lora_b_stacked[index] = 0
        self.w2_lora_a_stacked[index] = 0
        self.w2_lora_b_stacked[index] = 0
        self.adapter_enabled[index] = 0

    def set_lora(
        self,
        index: int,
        lora_a: torch.Tensor,
        lora_b: torch.Tensor,
        embeddings_tensor: torch.Tensor | None,
        bias: torch.Tensor | None = None,
    ):
        """Overwrites lora tensors at index."""
        self.reset_lora(index)
        self.adapter_enabled[index] = 1
        for eid in range(len(lora_a) // 3):
            w1_lora_a = lora_a[eid * 3]
            w2_lora_a = lora_a[eid * 3 + 1]
            w3_lora_a = lora_a[eid * 3 + 2]
            w1_lora_b = lora_b[eid * 3]
            w2_lora_b = lora_b[eid * 3 + 1]
            w3_lora_b = lora_b[eid * 3 + 2]

            # Handle the case of adding LoRA to only a subset of experts
            if w1_lora_a is None or w2_lora_a is None or w3_lora_a is None:
                continue

            if self.tp_size > 1:
                shard_size = self.base_layer.intermediate_size_per_partition
                start_idx = self.tp_rank * shard_size
                end_idx = (self.tp_rank + 1) * shard_size

                w1_lora_b = w1_lora_b[start_idx:end_idx, :]
                w3_lora_b = w3_lora_b[start_idx:end_idx, :]
                w2_lora_a = w2_lora_a[:, start_idx:end_idx]

            self.w1_lora_a_stacked[
                index, eid, : w1_lora_a.shape[0], : w1_lora_a.shape[1]
            ].copy_(w1_lora_a, non_blocking=True)

            self.w3_lora_a_stacked[
                index, eid, : w3_lora_a.shape[0], : w3_lora_a.shape[1]
            ].copy_(w3_lora_a, non_blocking=True)

            self.w2_lora_b_stacked[
                index, eid, : w2_lora_b.shape[0], : w2_lora_b.shape[1]
            ].copy_(w2_lora_b, non_blocking=True)

            self.w1_lora_b_stacked[
                index, eid, : w1_lora_b.shape[0], : w1_lora_b.shape[1]
            ].copy_(w1_lora_b, non_blocking=True)
            self.w3_lora_b_stacked[
                index, eid, : w3_lora_b.shape[0], : w3_lora_b.shape[1]
            ].copy_(w3_lora_b, non_blocking=True)
            self.w2_lora_a_stacked[
                index, eid, : w2_lora_a.shape[0], : w2_lora_a.shape[1]
            ].copy_(w2_lora_a, non_blocking=True)

    @classmethod
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        """Returns True if the layer can be replaced by this LoRA layer."""
        # return type(source_layer) is FusedMoE
        return isinstance(source_layer, FusedMoE)

    def forward(self, *args, **kwargs):
        return self.base_layer.forward(*args, **kwargs)

    def maybe_all_reduce_tensor_model_parallel(self, *args, **kwargs):
        return self.base_layer.maybe_all_reduce_tensor_model_parallel(*args, **kwargs)

    @property
    def _shared_experts(self):
        return self.base_layer._shared_experts

    @property
    def quant_method(self):
        return self.base_layer.quant_method

    @property
    def is_internal_router(self) -> bool:
        return self.base_layer.is_internal_router
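
The stacked LoRA buffers created by create_lora_weights hold one (A, B) pair per adapter index and per expert; a small shape sketch with made-up sizes:

import torch

# Hypothetical sizes illustrating the layout of the w1 (gate projection) buffers:
# A is (max_loras, num_experts, rank, hidden), B is (max_loras, num_experts, inter, rank).
max_loras, num_experts, rank, hidden, inter = 2, 4, 8, 32, 64
w1_lora_a_stacked = torch.zeros(max_loras, num_experts, rank, hidden)
w1_lora_b_stacked = torch.zeros(max_loras, num_experts, inter, rank)

# set_lora copies one expert's low-rank factors into the matching slot, using the
# top-left corner so adapters with a smaller rank than max_lora_rank still fit.
index, eid = 1, 0
w1_lora_a = torch.randn(rank, hidden)
w1_lora_a_stacked[index, eid, : w1_lora_a.shape[0], : w1_lora_a.shape[1]].copy_(w1_lora_a)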

_shared_experts property

_shared_experts

base_layer instance-attribute

base_layer = base_layer

device instance-attribute

device = device

is_internal_router property

is_internal_router: bool

quant_method property

quant_method

tp_rank instance-attribute

tp_size instance-attribute

__init__

__init__(base_layer: FusedMoE) -> None
Source code in vllm/lora/layers/fused_moe.py
def __init__(self, base_layer: FusedMoE) -> None:
    super().__init__()
    self.base_layer = base_layer
    self.tp_size = get_tensor_model_parallel_world_size()
    self.tp_rank = get_tensor_model_parallel_rank()
    self.device = base_layer.w2_weight.device
    self._inject_lora_into_fused_moe()

_inject_lora_into_fused_moe

_inject_lora_into_fused_moe()
Source code in vllm/lora/layers/fused_moe.py
def _inject_lora_into_fused_moe(self):
    moe_state_dict = {}
    top_k = self.base_layer.top_k

    if self.base_layer.quant_config is None:
        quant_config = FUSED_MOE_UNQUANTIZED_CONFIG
    elif not isinstance(self.base_layer.quant_config, Mxfp4Config):
        quant_config = self.base_layer.quant_config
    else:
        quant_config = mxfp4_w4a16_moe_quant_config(
            w1_bias=self.base_layer.w13_bias,
            w2_bias=self.base_layer.w2_bias,
            w1_scale=self.base_layer.w13_weight_scale,
            w2_scale=self.base_layer.w2_weight_scale,
        )

    m_fused_moe_fn = (
        modular_triton_fused_moe(
            quant_config, shared_experts=self.base_layer.shared_experts
        )
        if not quant_config.use_mxfp4_w4a16
        else modular_marlin_fused_moe(
            quant_config, shared_experts=self.base_layer.shared_experts
        )
    )

    def fwd_decorator(layer, func):
        def wrapper(*args, **kwargs):
            moe_state_dict["hidden_states"] = kwargs["hidden_states"]
            moe_state_dict["topk_ids"] = kwargs["topk_ids"]
            moe_state_dict["topk_weights"] = kwargs["topk_weights"]
            moe_state_dict["global_num_experts"] = kwargs["global_num_experts"]
            moe_state_dict["expert_map"] = kwargs["expert_map"]
            moe_state_dict["apply_router_weight_on_input"] = kwargs[
                "apply_router_weight_on_input"
            ]
            result = func(*args, **kwargs)
            return result

        return wrapper

    def act_decorator(layer, func):
        def wrapper(*args, **kwargs):
            _, output, input = args

            hidden_states = moe_state_dict["hidden_states"]
            topk_weights = moe_state_dict["topk_weights"]
            curr_topk_ids = moe_state_dict["topk_ids"]
            global_num_experts = moe_state_dict["global_num_experts"]
            expert_map = moe_state_dict["expert_map"]

            config_dtype = _get_config_dtype_str(
                dtype=hidden_states.dtype,
                use_fp8_w8a8=False,
                use_int8_w8a16=False,
                use_int4_w4a16=False,
            )
            CHUNK_SIZE = envs.VLLM_FUSED_MOE_CHUNK_SIZE
            num_tokens = hidden_states.size(0)
            M = min(num_tokens, CHUNK_SIZE)

            get_config_func = functools.partial(
                try_get_optimal_moe_config,
                layer.w13_weight.size(),
                layer.w2_weight.size(),
                top_k,
                config_dtype,
                block_shape=layer.quant_method.moe_quant_config.block_shape,
            )

            (_, _, num_tokens_per_lora, _, _, _) = (
                self.punica_wrapper.token_mapping_meta.meta_args(
                    hidden_states.size(0)
                )
            )
            max_loras = self.w1_lora_a_stacked.shape[0]
            config = get_config_func(M)
            (
                sorted_token_ids_lora,
                expert_ids_lora,
                num_tokens_post_padded_lora,
            ) = self.punica_wrapper.moe_lora_align_block_size(
                curr_topk_ids,
                num_tokens,
                config["BLOCK_SIZE_M"],
                global_num_experts,
                max_loras,
                num_tokens_per_lora,
                self.adapter_enabled,
                expert_map,
            )

            moe_state_dict["sorted_token_ids_lora"] = sorted_token_ids_lora
            moe_state_dict["expert_ids_lora"] = expert_ids_lora
            moe_state_dict["num_tokens_post_padded_lora"] = (
                num_tokens_post_padded_lora
            )

            w13_lora_a_stacked = [self.w1_lora_a_stacked, self.w3_lora_a_stacked]
            w13_lora_b_stacked = [self.w1_lora_b_stacked, self.w3_lora_b_stacked]
            max_lora_rank = self.w1_lora_a_stacked.shape[-2]
            expert_ids_lora = expert_ids_lora.view(max_loras, -1)
            sorted_token_ids_lora = sorted_token_ids_lora.view(max_loras, -1)

            self.punica_wrapper.add_lora_fused_moe(
                input.view(-1, top_k, input.shape[-1]),
                hidden_states,
                w13_lora_a_stacked,
                w13_lora_b_stacked,
                topk_weights,
                sorted_token_ids_lora,
                expert_ids_lora,
                num_tokens_post_padded_lora,
                max_lora_rank,
                top_k,
                config,
                self.adapter_enabled,
            )

            result = func(*args, **kwargs)

            moe_state_dict["intermediate_cache2"] = output
            return result

        return wrapper

    def moe_sum_decorator(layer, func):
        def wrapper(*args, **kwargs):
            hidden_states = moe_state_dict["hidden_states"]
            topk_weights = moe_state_dict["topk_weights"]

            config_dtype = _get_config_dtype_str(
                dtype=hidden_states.dtype,
                use_fp8_w8a8=False,
                use_int8_w8a16=False,
                use_int4_w4a16=False,
            )
            CHUNK_SIZE = envs.VLLM_FUSED_MOE_CHUNK_SIZE
            num_tokens = hidden_states.size(0)
            M = min(num_tokens, CHUNK_SIZE)

            get_config_func = functools.partial(
                try_get_optimal_moe_config,
                layer.w13_weight.size(),
                layer.w2_weight.size(),
                top_k,
                config_dtype,
                block_shape=layer.quant_method.moe_quant_config.block_shape,
            )

            config = get_config_func(M)

            sorted_token_ids_lora = moe_state_dict["sorted_token_ids_lora"]
            expert_ids_lora = moe_state_dict["expert_ids_lora"]
            num_tokens_post_padded_lora = moe_state_dict[
                "num_tokens_post_padded_lora"
            ]
            max_loras = self.w1_lora_a_stacked.shape[0]
            expert_ids_lora = expert_ids_lora.view(max_loras, -1)
            sorted_token_ids_lora = sorted_token_ids_lora.view(max_loras, -1)
            intermediate_cache2 = moe_state_dict["intermediate_cache2"]
            intermediate_cache3 = args[0]
            max_lora_rank = self.w1_lora_a_stacked.shape[-2]
            self.punica_wrapper.add_lora_fused_moe(
                intermediate_cache3,
                intermediate_cache2,
                [self.w2_lora_a_stacked],
                [self.w2_lora_b_stacked],
                topk_weights,
                sorted_token_ids_lora,
                expert_ids_lora,
                num_tokens_post_padded_lora,
                max_lora_rank,
                top_k,
                config,
                self.adapter_enabled,
                True,
            )

            result = func(*args, **kwargs)
            return result

        return wrapper

    fused_experts = m_fused_moe_fn.fused_experts

    m_fused_moe_fn.forward = fwd_decorator(self.base_layer, m_fused_moe_fn.forward)
    fused_experts.activation = act_decorator(
        self.base_layer, fused_experts.activation
    )
    fused_experts.moe_sum = moe_sum_decorator(
        self.base_layer, fused_experts.moe_sum
    )

    self.base_layer.quant_method.old_fused_experts = (
        self.base_layer.quant_method.fused_experts
    )
    self.base_layer.quant_method.fused_experts = m_fused_moe_fn

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool

Returns True if the layer can be replaced by this LoRA layer.

Source code in vllm/lora/layers/fused_moe.py
@classmethod
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    """Returns True if the layer can be replaced by this LoRA layer."""
    # return type(source_layer) is FusedMoE
    return isinstance(source_layer, FusedMoE)

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None

Initializes lora matrices.

Source code in vllm/lora/layers/fused_moe.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    """Initializes lora matrices."""

    assert not self.base_layer.use_ep, (
        "EP support for Fused MoE LoRA is not implemented yet."
    )
    self.adapter_enabled = torch.tensor(
        [0] * (max_loras + 1), dtype=torch.int, device=self.device
    )

    self.w1_lora_a_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            lora_config.max_lora_rank,
            self.base_layer.hidden_size,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )
    self.w1_lora_b_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            self.base_layer.intermediate_size_per_partition,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )

    self.w2_lora_a_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            lora_config.max_lora_rank,
            self.base_layer.intermediate_size_per_partition,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )
    self.w2_lora_b_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            self.base_layer.hidden_size,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )

    self.w3_lora_a_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            lora_config.max_lora_rank,
            self.base_layer.hidden_size,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )
    self.w3_lora_b_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.global_num_experts,
            self.base_layer.intermediate_size_per_partition,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )

    # They will be used by 'LoRALayerWeights.create_dummy_lora_weights'
    # to create a dummy LoRA weights.
    self.lora_a_stacked = []
    self.lora_b_stacked = []
    for lora_id in range(max_loras):
        for experts_id in range(self.base_layer.global_num_experts):
            # gate_proj,down_proj,up_proj
            self.lora_a_stacked.append(self.w1_lora_a_stacked[lora_id][experts_id])
            self.lora_a_stacked.append(self.w2_lora_a_stacked[lora_id][experts_id])
            self.lora_a_stacked.append(self.w3_lora_a_stacked[lora_id][experts_id])

            self.lora_b_stacked.append(self.w1_lora_b_stacked[lora_id][experts_id])
            self.lora_b_stacked.append(self.w2_lora_b_stacked[lora_id][experts_id])
            self.lora_b_stacked.append(self.w3_lora_b_stacked[lora_id][experts_id])

forward

forward(*args, **kwargs)
Source code in vllm/lora/layers/fused_moe.py
def forward(self, *args, **kwargs):
    return self.base_layer.forward(*args, **kwargs)

maybe_all_reduce_tensor_model_parallel

maybe_all_reduce_tensor_model_parallel(*args, **kwargs)
Source code in vllm/lora/layers/fused_moe.py
def maybe_all_reduce_tensor_model_parallel(self, *args, **kwargs):
    return self.base_layer.maybe_all_reduce_tensor_model_parallel(*args, **kwargs)

reset_lora

reset_lora(index: int)

Resets the lora weights at index back to 0.

Source code in vllm/lora/layers/fused_moe.py
def reset_lora(self, index: int):
    """Resets the lora weights at index back to 0."""
    self.w1_lora_a_stacked[index] = 0
    self.w1_lora_b_stacked[index] = 0
    self.w3_lora_a_stacked[index] = 0
    self.w3_lora_b_stacked[index] = 0
    self.w2_lora_a_stacked[index] = 0
    self.w2_lora_b_stacked[index] = 0
    self.adapter_enabled[index] = 0

set_lora

set_lora(
    index: int,
    lora_a: Tensor,
    lora_b: Tensor,
    embeddings_tensor: Tensor | None,
    bias: Tensor | None = None,
)

Overwrites lora tensors at index.

Source code in vllm/lora/layers/fused_moe.py
def set_lora(
    self,
    index: int,
    lora_a: torch.Tensor,
    lora_b: torch.Tensor,
    embeddings_tensor: torch.Tensor | None,
    bias: torch.Tensor | None = None,
):
    """Overwrites lora tensors at index."""
    self.reset_lora(index)
    self.adapter_enabled[index] = 1
    for eid in range(len(lora_a) // 3):
        w1_lora_a = lora_a[eid * 3]
        w2_lora_a = lora_a[eid * 3 + 1]
        w3_lora_a = lora_a[eid * 3 + 2]
        w1_lora_b = lora_b[eid * 3]
        w2_lora_b = lora_b[eid * 3 + 1]
        w3_lora_b = lora_b[eid * 3 + 2]

        # Handle the case of adding LoRA to only a subset of experts
        if w1_lora_a is None or w2_lora_a is None or w3_lora_a is None:
            continue

        if self.tp_size > 1:
            shard_size = self.base_layer.intermediate_size_per_partition
            start_idx = self.tp_rank * shard_size
            end_idx = (self.tp_rank + 1) * shard_size

            w1_lora_b = w1_lora_b[start_idx:end_idx, :]
            w3_lora_b = w3_lora_b[start_idx:end_idx, :]
            w2_lora_a = w2_lora_a[:, start_idx:end_idx]

        self.w1_lora_a_stacked[
            index, eid, : w1_lora_a.shape[0], : w1_lora_a.shape[1]
        ].copy_(w1_lora_a, non_blocking=True)

        self.w3_lora_a_stacked[
            index, eid, : w3_lora_a.shape[0], : w3_lora_a.shape[1]
        ].copy_(w3_lora_a, non_blocking=True)

        self.w2_lora_b_stacked[
            index, eid, : w2_lora_b.shape[0], : w2_lora_b.shape[1]
        ].copy_(w2_lora_b, non_blocking=True)

        self.w1_lora_b_stacked[
            index, eid, : w1_lora_b.shape[0], : w1_lora_b.shape[1]
        ].copy_(w1_lora_b, non_blocking=True)
        self.w3_lora_b_stacked[
            index, eid, : w3_lora_b.shape[0], : w3_lora_b.shape[1]
        ].copy_(w3_lora_b, non_blocking=True)
        self.w2_lora_a_stacked[
            index, eid, : w2_lora_a.shape[0], : w2_lora_a.shape[1]
        ].copy_(w2_lora_a, non_blocking=True)

LoRAMapping dataclass

Source code in vllm/lora/layers/utils.py
@dataclass
class LoRAMapping:
    index_mapping: tuple[int, ...]
    prompt_mapping: tuple[int, ...]
    is_prefill: bool = False

    def __post_init__(self):
        self.index_mapping = tuple(self.index_mapping)
        self.prompt_mapping = tuple(self.prompt_mapping)
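
A small construction example with illustrative values; __post_init__ normalizes whatever sequences are passed in into tuples.

from vllm.lora.layers import LoRAMapping

# Lists are accepted and coerced to tuples by __post_init__ (values are illustrative).
mapping = LoRAMapping(index_mapping=[1, 1, 0, 2], prompt_mapping=[1, 2], is_prefill=True)
assert mapping.index_mapping == (1, 1, 0, 2)
assert mapping.prompt_mapping == (1, 2)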

index_mapping instance-attribute

index_mapping: tuple[int, ...]

is_prefill class-attribute instance-attribute

is_prefill: bool = False

prompt_mapping instance-attribute

prompt_mapping: tuple[int, ...]

__init__

__init__(
    index_mapping: tuple[int, ...],
    prompt_mapping: tuple[int, ...],
    is_prefill: bool = False,
) -> None

__post_init__

__post_init__()
Source code in vllm/lora/layers/utils.py
def __post_init__(self):
    self.index_mapping = tuple(self.index_mapping)
    self.prompt_mapping = tuple(self.prompt_mapping)

LogitsProcessorWithLoRA

Bases: BaseLayerWithLoRA

LoRA wrapper for LogitsProcessor, with extra logic to handle the application of the LoRA adapter and added LoRA vocabulary.

Parameters:

    base_layer (LogitsProcessor, required): LogitsProcessor layer.
    hidden_size (int, required): hidden size of the model.
    dtype (dtype, required): data type of the model.
    device (device, required): device of the model.
    sharded_to_full_mapping (list[int] | None, required): index mapping from
        sharded vocab to full vocab received from
        base_layer.get_sharded_to_full_mapping(). If None, no reindexing
        will be done.
Source code in vllm/lora/layers/logits_processor.py
class LogitsProcessorWithLoRA(BaseLayerWithLoRA):
    """
    LoRA wrapper for LogitsProcessor, with extra logic to handle the
    application of the LoRA adapter and added LoRA vocabulary.

    Args:
        base_layer: LogitsProcessor layer
        hidden_size: hidden size of the model
        dtype: data type of the model
        device: device of the model
        sharded_to_full_mapping: index mapping from sharded vocab to full vocab
            received from base_layer.get_sharded_to_full_mapping(). If None,
            no reindexing will be done.
    """

    def __init__(
        self,
        base_layer: LogitsProcessor,
        hidden_size: int,
        dtype: torch.dtype,
        device: torch.device,
        sharded_to_full_mapping: list[int] | None,
    ) -> None:
        super().__init__()
        self.base_layer = base_layer
        self.hidden_size = hidden_size
        self.dtype = dtype
        self.device = device
        self.tp_size = get_tensor_model_parallel_world_size()
        self.tp_rank = get_tensor_model_parallel_rank()
        self.sharded_to_full_mapping = sharded_to_full_mapping

    @property
    def logits_as_input(self):
        return self.base_layer.logits_as_input

    @property
    def vocab_size(self):
        return self.base_layer.vocab_size

    @property
    def scale(self):
        return self.base_layer.scale

    @property
    def soft_cap(self):
        return self.base_layer.soft_cap

    @property
    def use_all_gather(self):
        return self.base_layer.use_all_gather

    @property
    def org_vocab_size(self):
        return self.base_layer.org_vocab_size

    @property
    def include_gpu_probs_tensor(self):
        return self.base_layer.include_gpu_probs_tensor

    @property
    def should_modify_greedy_probs_inplace(self):
        return self.base_layer.should_modify_greedy_probs_inplace

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        # TODO: Verify if this condition can be further relaxed
        if 32000 < self.base_layer.vocab_size > 257024:
            raise ValueError(
                "When using LoRA, vocab size must be 32000 >= vocab_size <= 257024"
            )
        self.lora_a_stacked = torch.zeros(
            (
                max_loras,
                1,
                lora_config.max_lora_rank,
                self.hidden_size,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        self.lora_b_stacked = torch.zeros(
            (
                max_loras,
                1,
                # Pad for kernel compatibility
                math.ceil(
                    self.base_layer.vocab_size / lora_config.lora_vocab_padding_size
                )
                * lora_config.lora_vocab_padding_size,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        self.embeddings_tensors = torch.full(
            (max_loras, lora_config.lora_extra_vocab_size, self.hidden_size),
            fill_value=float("-inf"),
            dtype=self.dtype,
            device=self.device,
        )
        if self.sharded_to_full_mapping is not None:
            self.sharded_to_full_mapping_gpu = torch.tensor(
                self.sharded_to_full_mapping, device=self.device, dtype=torch.long
            )
        else:
            self.sharded_to_full_mapping_gpu = None

    def reset_lora(self, index: int):
        self.lora_a_stacked[index] = 0
        self.lora_b_stacked[index] = 0
        self.embeddings_tensors[index] = float("-inf")

    def set_lora(
        self,
        index: int,
        lora_a: torch.Tensor,
        lora_b: torch.Tensor,
        embeddings_tensor: torch.Tensor | None,
    ):
        self.reset_lora(index)
        self.lora_a_stacked[index, 0, : lora_a.shape[0], : lora_a.shape[1]].copy_(
            lora_a, non_blocking=True
        )
        self.lora_b_stacked[index, 0, : lora_b.shape[0], : lora_b.shape[1]].copy_(
            lora_b, non_blocking=True
        )
        if embeddings_tensor is not None:
            self.embeddings_tensors[
                index,
                : embeddings_tensor.shape[0],
                : embeddings_tensor.shape[1],
            ] = embeddings_tensor

    def _get_logits(
        self,
        hidden_states: torch.Tensor,
        lm_head: VocabParallelEmbedding,
        embedding_bias: torch.Tensor | None = None,
    ) -> torch.Tensor | None:
        # Get the logits for the next tokens.
        logits = lm_head.quant_method.apply(lm_head, hidden_states)
        if embedding_bias is not None:
            logits += embedding_bias

        # Gather logits for TP
        logits = self.base_layer._gather_logits(logits)

        if logits is None:
            return None

        if self.sharded_to_full_mapping_gpu is not None:
            # Reindex full logits tensor to ensure 1:1 mapping between
            # index and token_id
            # Example for:
            #   org_vocab_size = 4
            #   added_vocab_size = 2
            #   pad_to_size = 8
            #   tp_size = 2

            # indices:  [0, 1, 2,  3, 4, 5, 6,  7]
            # token_id: [0, 1, 4, -1, 2, 3, 5, -1]

            # Therefore, the mapping is expected to be:
            # [0, 1, 4, 6, 2, 3, 5, 7] so that when we reindex,
            # we get:
            # indices:  [0, 1, 2, 3, 4, 5,  6,  7]
            # token_id: [0, 1, 2, 3, 4, 5, -1, -1]
            logits = logits[:, self.sharded_to_full_mapping_gpu]

        lora_logits = torch.empty(
            self.embeddings_tensors.shape[0] + 1,
            self.embeddings_tensors.shape[1],
            hidden_states.shape[0],
            dtype=self.embeddings_tensors.dtype,
            device=self.embeddings_tensors.device,
        )
        torch.matmul(self.embeddings_tensors, hidden_states.T, out=lora_logits[:-1])

        neg_inf, pos_inf = current_platform.get_infinity_values(lora_logits.dtype)

        lora_logits[-1] = neg_inf
        lora_logits = lora_logits.mT
        indices_padded = self.punica_wrapper.sampler_indices_padded

        if current_platform.is_tpu() or current_platform.is_xpu():
            indices_padded = indices_padded[: logits.size(0)]

        lora_logits = (
            lora_logits.reshape(
                lora_logits.shape[0] * lora_logits.shape[1],
                lora_logits.shape[2],
            )
            .index_select(0, indices_padded)
            .nan_to_num_(nan=neg_inf, posinf=pos_inf, neginf=neg_inf)
        )

        logits[
            :,
            self.base_layer.org_vocab_size : self.base_layer.org_vocab_size
            + lora_logits.shape[1],
        ] = lora_logits

        lora_output: torch.Tensor | None = self.punica_wrapper.add_lora_logits(
            logits, hidden_states, self.lora_a_stacked, self.lora_b_stacked, 1.0
        )

        if not current_platform.can_update_inplace():
            logits = lora_output

        # Remove paddings in vocab (if any).
        logits = logits[:, : self.base_layer.vocab_size]
        return logits

    def forward(self, *args, **kwargs):
        return type(self.base_layer).forward(self, *args, **kwargs)

    @classmethod
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # Special handling for the LogitsProcessor.
        return False
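
A hedged wiring sketch: the helper below is hypothetical and only shows how the constructor arguments fit together. logits_processor and lm_head are assumed to come from an already built vLLM model, and get_sharded_to_full_mapping() is the method the class docstring above refers to; the dtype and device are assumptions.

import torch

from vllm.lora.layers import LogitsProcessorWithLoRA


def wrap_logits_processor(logits_processor, lm_head, hidden_size: int):
    """Hypothetical helper showing how the constructor arguments fit together."""
    return LogitsProcessorWithLoRA(
        base_layer=logits_processor,
        hidden_size=hidden_size,
        dtype=torch.bfloat16,                 # assumed model dtype
        device=torch.device("cuda"),          # assumed device
        sharded_to_full_mapping=lm_head.get_sharded_to_full_mapping(),
    )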

base_layer instance-attribute

base_layer = base_layer

device instance-attribute

device = device

dtype instance-attribute

dtype = dtype

hidden_size instance-attribute

hidden_size = hidden_size

include_gpu_probs_tensor property

include_gpu_probs_tensor

logits_as_input property

logits_as_input

org_vocab_size property

org_vocab_size

scale property

scale

sharded_to_full_mapping instance-attribute

sharded_to_full_mapping = sharded_to_full_mapping

should_modify_greedy_probs_inplace property

should_modify_greedy_probs_inplace

soft_cap property

soft_cap

tp_rank instance-attribute

tp_size instance-attribute

use_all_gather property

use_all_gather

vocab_size property

vocab_size

__init__

__init__(
    base_layer: LogitsProcessor,
    hidden_size: int,
    dtype: dtype,
    device: device,
    sharded_to_full_mapping: list[int] | None,
) -> None
Source code in vllm/lora/layers/logits_processor.py
def __init__(
    self,
    base_layer: LogitsProcessor,
    hidden_size: int,
    dtype: torch.dtype,
    device: torch.device,
    sharded_to_full_mapping: list[int] | None,
) -> None:
    super().__init__()
    self.base_layer = base_layer
    self.hidden_size = hidden_size
    self.dtype = dtype
    self.device = device
    self.tp_size = get_tensor_model_parallel_world_size()
    self.tp_rank = get_tensor_model_parallel_rank()
    self.sharded_to_full_mapping = sharded_to_full_mapping

_get_logits

_get_logits(
    hidden_states: Tensor,
    lm_head: VocabParallelEmbedding,
    embedding_bias: Tensor | None = None,
) -> Tensor | None
Source code in vllm/lora/layers/logits_processor.py
def _get_logits(
    self,
    hidden_states: torch.Tensor,
    lm_head: VocabParallelEmbedding,
    embedding_bias: torch.Tensor | None = None,
) -> torch.Tensor | None:
    # Get the logits for the next tokens.
    logits = lm_head.quant_method.apply(lm_head, hidden_states)
    if embedding_bias is not None:
        logits += embedding_bias

    # Gather logits for TP
    logits = self.base_layer._gather_logits(logits)

    if logits is None:
        return None

    if self.sharded_to_full_mapping_gpu is not None:
        # Reindex full logits tensor to ensure 1:1 mapping between
        # index and token_id
        # Example for:
        #   org_vocab_size = 4
        #   added_vocab_size = 2
        #   pad_to_size = 8
        #   tp_size = 2

        # indices:  [0, 1, 2,  3, 4, 5, 6,  7]
        # token_id: [0, 1, 4, -1, 2, 3, 5, -1]

        # Therefore, the mapping is expected to be:
        # [0, 1, 4, 5, 2, 6, 3, 7] so that when we reindex,
        # we get:
        # indices:  [0, 1, 2, 3, 4, 5,  6,  7]
        # token_id: [0, 1, 2, 3, 4, 5, -1, -1]
        logits = logits[:, self.sharded_to_full_mapping_gpu]

    lora_logits = torch.empty(
        self.embeddings_tensors.shape[0] + 1,
        self.embeddings_tensors.shape[1],
        hidden_states.shape[0],
        dtype=self.embeddings_tensors.dtype,
        device=self.embeddings_tensors.device,
    )
    torch.matmul(self.embeddings_tensors, hidden_states.T, out=lora_logits[:-1])

    neg_inf, pos_inf = current_platform.get_infinity_values(lora_logits.dtype)

    lora_logits[-1] = neg_inf
    lora_logits = lora_logits.mT
    indices_padded = self.punica_wrapper.sampler_indices_padded

    if current_platform.is_tpu() or current_platform.is_xpu():
        indices_padded = indices_padded[: logits.size(0)]

    lora_logits = (
        lora_logits.reshape(
            lora_logits.shape[0] * lora_logits.shape[1],
            lora_logits.shape[2],
        )
        .index_select(0, indices_padded)
        .nan_to_num_(nan=neg_inf, posinf=pos_inf, neginf=neg_inf)
    )

    logits[
        :,
        self.base_layer.org_vocab_size : self.base_layer.org_vocab_size
        + lora_logits.shape[1],
    ] = lora_logits

    lora_output: torch.Tensor | None = self.punica_wrapper.add_lora_logits(
        logits, hidden_states, self.lora_a_stacked, self.lora_b_stacked, 1.0
    )

    if not current_platform.can_update_inplace():
        logits = lora_output

    # Remove paddings in vocab (if any).
    logits = logits[:, : self.base_layer.vocab_size]
    return logits
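
To make the reindexing concrete, here is a small self-contained sketch. The tensor values are purely illustrative (each gathered column simply encodes the token_id it holds); only the mapping itself follows the example in the comments above.

import torch

# Illustrative values for the example above: org_vocab_size = 4,
# added_vocab_size = 2, pad_to_size = 8, tp_size = 2. Each gathered column
# holds the logit for the token_id noted in the comments:
# token_id: [0, 1, 4, -1, 2, 3, 5, -1]
gathered = torch.tensor([[0.0, 1.0, 4.0, float("-inf"),
                          2.0, 3.0, 5.0, float("-inf")]])
sharded_to_full_mapping_gpu = torch.tensor([0, 1, 4, 5, 2, 6, 3, 7])
reindexed = gathered[:, sharded_to_full_mapping_gpu]
# Columns now follow token_id order [0, 1, 2, 3, 4, 5, -1, -1]:
# tensor([[0., 1., 2., 3., 4., 5., -inf, -inf]])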

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/logits_processor.py
@classmethod
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # Special handling for the LogitsProcessor.
    return False

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None
Source code in vllm/lora/layers/logits_processor.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    # TODO: Verify if this condition can be further relaxed
    if 32000 < self.base_layer.vocab_size > 257024:
        raise ValueError(
            "When using LoRA, vocab size must be 32000 >= vocab_size <= 257024"
        )
    self.lora_a_stacked = torch.zeros(
        (
            max_loras,
            1,
            lora_config.max_lora_rank,
            self.hidden_size,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )
    self.lora_b_stacked = torch.zeros(
        (
            max_loras,
            1,
            # Pad for kernel compatibility
            math.ceil(
                self.base_layer.vocab_size / lora_config.lora_vocab_padding_size
            )
            * lora_config.lora_vocab_padding_size,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.device,
    )
    self.embeddings_tensors = torch.full(
        (max_loras, lora_config.lora_extra_vocab_size, self.hidden_size),
        fill_value=float("-inf"),
        dtype=self.dtype,
        device=self.device,
    )
    if self.sharded_to_full_mapping is not None:
        self.sharded_to_full_mapping_gpu = torch.tensor(
            self.sharded_to_full_mapping, device=self.device, dtype=torch.long
        )
    else:
        self.sharded_to_full_mapping_gpu = None
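
The lora_b_stacked allocation above rounds the vocab dimension up to a multiple of lora_config.lora_vocab_padding_size (the in-code comment calls this "Pad for kernel compatibility"). A quick sketch of that arithmetic, with made-up numbers:

import math

# Hypothetical values, for illustration only.
vocab_size = 32001
lora_vocab_padding_size = 256

padded_rows = math.ceil(vocab_size / lora_vocab_padding_size) * lora_vocab_padding_size
# 32001 rounds up to 32256; a vocab size that is already a multiple of 256
# (e.g. 128256) is left unchanged.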

forward

forward(*args, **kwargs)
Source code in vllm/lora/layers/logits_processor.py
def forward(self, *args, **kwargs):
    return type(self.base_layer).forward(self, *args, **kwargs)

reset_lora

reset_lora(index: int)
Source code in vllm/lora/layers/logits_processor.py
def reset_lora(self, index: int):
    self.lora_a_stacked[index] = 0
    self.lora_b_stacked[index] = 0
    self.embeddings_tensors[index] = float("-inf")

set_lora

set_lora(
    index: int,
    lora_a: Tensor,
    lora_b: Tensor,
    embeddings_tensor: Tensor | None,
)
Source code in vllm/lora/layers/logits_processor.py
def set_lora(
    self,
    index: int,
    lora_a: torch.Tensor,
    lora_b: torch.Tensor,
    embeddings_tensor: torch.Tensor | None,
):
    self.reset_lora(index)
    self.lora_a_stacked[index, 0, : lora_a.shape[0], : lora_a.shape[1]].copy_(
        lora_a, non_blocking=True
    )
    self.lora_b_stacked[index, 0, : lora_b.shape[0], : lora_b.shape[1]].copy_(
        lora_b, non_blocking=True
    )
    if embeddings_tensor is not None:
        self.embeddings_tensors[
            index,
            : embeddings_tensor.shape[0],
            : embeddings_tensor.shape[1],
        ] = embeddings_tensor

MergedColumnParallelLinearWithLoRA

Bases: ColumnParallelLinearWithLoRA

ColumnParallelLinear layer that is composed of 2 sublayers (slices) packed together (e.g. gate_proj + up_proj -> gate_up_proj).

This means we have 2 LoRAs, each applied to one half of the layer.

Both slices must have the same size.
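
As a rough illustration of the per-slice sharding (all sizes below are invented, and the slicing is written with plain indexing rather than the layer's own methods): each of the two LoRA B matrices is cut along its output rows to the shard owned by the local tensor-parallel rank, mirroring slice_lora_b in the source below.

import torch

# Hypothetical setup: LoRA rank 8, tp_size 2, tp_rank 1, and a per-rank shard
# of 64 output rows per slice (so each full slice has 128 rows).
r, tp_size, tp_rank, shard_size = 8, 2, 1, 64

# One full (unsharded) LoRA-B per packed slice, e.g. gate_proj and up_proj.
lora_b = [torch.randn(shard_size * tp_size, r) for _ in range(2)]

# output_ids is (tp_rank, tp_rank) for this layer, so both slices keep the
# same row range.
sliced = [b[shard_size * tp_rank : shard_size * (tp_rank + 1), :] for b in lora_b]
assert all(s.shape == (shard_size, r) for s in sliced)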

Source code in vllm/lora/layers/column_parallel_linear.py
class MergedColumnParallelLinearWithLoRA(ColumnParallelLinearWithLoRA):
    """ColumnParallelLinear layer that is composed of 2 sublayers (slices)
    packed together (e.g. gate_proj + up_proj -> gate_up_proj).

    This means we have 2 LoRAs, each applied to one half of the layer.

    Both slices must have the same size.
    """

    def __init__(
        self, base_layer: MergedColumnParallelLinear | QKVParallelLinear
    ) -> None:
        super().__init__(base_layer)
        # There are two LoRA layers.
        # The output_sizes in MergedColumnParallelLinear are not sharded by tp,
        # so we need to divide them by tp_size to get the correct slice sizes.
        output_sizes = self.base_layer.output_sizes
        self.output_slices = tuple(
            divide(output_size, self.tp_size) for output_size in output_sizes
        )
        self.n_slices = len(self.output_slices)
        self.output_ids = (self.tp_rank,) * self.n_slices

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        """
        The main reason for overriding this function is to enhance code
        maintainability.
        """
        self.lora_config = lora_config

        lora_a_output_size_per_partition = (
            lora_config.max_lora_rank
            if not lora_config.fully_sharded_loras
            else divide(lora_config.max_lora_rank, self.tp_size)
        )

        self.lora_a_stacked = tuple(
            torch.zeros(
                max_loras,
                1,
                lora_a_output_size_per_partition,
                self.input_size,
                dtype=lora_config.lora_dtype,
                device=self.device,
            )
            for _ in range(self.n_slices)
        )
        self.lora_b_stacked = tuple(
            torch.zeros(
                max_loras,
                1,
                output_size,
                lora_config.max_lora_rank,
                dtype=lora_config.lora_dtype,
                device=self.device,
            )
            for output_size in self.output_slices
        )

    def slice_lora_a(
        self, lora_a: list[torch.Tensor | None]
    ) -> list[torch.Tensor | None]:
        return lora_a

    def slice_lora_b(
        self, lora_b: list[torch.Tensor | None]
    ) -> list[torch.Tensor | None]:
        sliced_lora_b = [None] * self.n_slices
        for i, (shard_id, shard_size) in enumerate(
            zip(self.output_ids, self.output_slices)
        ):
            if (lora_b_i := lora_b[i]) is not None:
                sliced_lora_b[i] = lora_b_i[
                    shard_size * shard_id : shard_size * (shard_id + 1), :
                ]
        return sliced_lora_b

    def set_lora(
        self,
        index: int,
        lora_a: torch.Tensor,
        lora_b: torch.Tensor,
        embeddings_tensor: torch.Tensor | None,
    ):
        self.reset_lora(index)

        if self.tp_size > 1:
            lora_a = self.slice_lora_a(lora_a)
            lora_b = self.slice_lora_b(lora_b)

        for i in range(self.n_slices):
            if (lora_a_i := lora_a[i]) is not None:
                self.lora_a_stacked[i][
                    index, 0, : lora_a_i.shape[0], : lora_a_i.shape[1]
                ].copy_(lora_a_i, non_blocking=True)
            if (lora_b_i := lora_b[i]) is not None:
                self.lora_b_stacked[i][
                    index, 0, : lora_b_i.shape[0], : lora_b_i.shape[1]
                ].copy_(lora_b_i, non_blocking=True)

    @classmethod
    @_not_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return (
            type(source_layer) is MergedColumnParallelLinear
            and len(packed_modules_list) == 2
        )

n_slices instance-attribute

n_slices = len(output_slices)

output_ids instance-attribute

output_ids = (tp_rank,) * n_slices

output_slices instance-attribute

output_slices = tuple(
    (divide(output_size, tp_size))
    for output_size in output_sizes
)

__init__

__init__(
    base_layer: MergedColumnParallelLinear
    | QKVParallelLinear,
) -> None
Source code in vllm/lora/layers/column_parallel_linear.py
def __init__(
    self, base_layer: MergedColumnParallelLinear | QKVParallelLinear
) -> None:
    super().__init__(base_layer)
    # There are two LoRA layers.
    # The output_sizes in MergedColumnParallelLinear are not sharded by tp,
    # so we need to divide them by tp_size to get the correct slice sizes.
    output_sizes = self.base_layer.output_sizes
    self.output_slices = tuple(
        divide(output_size, self.tp_size) for output_size in output_sizes
    )
    self.n_slices = len(self.output_slices)
    self.output_ids = (self.tp_rank,) * self.n_slices

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_not_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return (
        type(source_layer) is MergedColumnParallelLinear
        and len(packed_modules_list) == 2
    )

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None

The main reason for overriding this function is to enhance code maintainability.

Source code in vllm/lora/layers/column_parallel_linear.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    """
    The main reason for overriding this function is to enhance code
    maintainability.
    """
    self.lora_config = lora_config

    lora_a_output_size_per_partition = (
        lora_config.max_lora_rank
        if not lora_config.fully_sharded_loras
        else divide(lora_config.max_lora_rank, self.tp_size)
    )

    self.lora_a_stacked = tuple(
        torch.zeros(
            max_loras,
            1,
            lora_a_output_size_per_partition,
            self.input_size,
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        for _ in range(self.n_slices)
    )
    self.lora_b_stacked = tuple(
        torch.zeros(
            max_loras,
            1,
            output_size,
            lora_config.max_lora_rank,
            dtype=lora_config.lora_dtype,
            device=self.device,
        )
        for output_size in self.output_slices
    )

set_lora

set_lora(
    index: int,
    lora_a: Tensor,
    lora_b: Tensor,
    embeddings_tensor: Tensor | None,
)
Source code in vllm/lora/layers/column_parallel_linear.py
def set_lora(
    self,
    index: int,
    lora_a: torch.Tensor,
    lora_b: torch.Tensor,
    embeddings_tensor: torch.Tensor | None,
):
    self.reset_lora(index)

    if self.tp_size > 1:
        lora_a = self.slice_lora_a(lora_a)
        lora_b = self.slice_lora_b(lora_b)

    for i in range(self.n_slices):
        if (lora_a_i := lora_a[i]) is not None:
            self.lora_a_stacked[i][
                index, 0, : lora_a_i.shape[0], : lora_a_i.shape[1]
            ].copy_(lora_a_i, non_blocking=True)
        if (lora_b_i := lora_b[i]) is not None:
            self.lora_b_stacked[i][
                index, 0, : lora_b_i.shape[0], : lora_b_i.shape[1]
            ].copy_(lora_b_i, non_blocking=True)

slice_lora_a

slice_lora_a(
    lora_a: list[Tensor | None],
) -> list[Tensor | None]
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(
    self, lora_a: list[torch.Tensor | None]
) -> list[torch.Tensor | None]:
    return lora_a

slice_lora_b

slice_lora_b(
    lora_b: list[Tensor | None],
) -> list[Tensor | None]
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_b(
    self, lora_b: list[torch.Tensor | None]
) -> list[torch.Tensor | None]:
    sliced_lora_b = [None] * self.n_slices
    for i, (shard_id, shard_size) in enumerate(
        zip(self.output_ids, self.output_slices)
    ):
        if (lora_b_i := lora_b[i]) is not None:
            sliced_lora_b[i] = lora_b_i[
                shard_size * shard_id : shard_size * (shard_id + 1), :
            ]
    return sliced_lora_b

MergedColumnParallelLinearWithShardedLoRA

Bases: MergedColumnParallelLinearWithLoRA

Differs from MergedColumnParallelLinearWithLoRA by slicing the LoRA A's also.

Based on S-LoRA, slicing happens along the rank dim.
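
A minimal sketch of the rank-dimension slicing (sizes are hypothetical; a None entry stands for an absent sublora, which slice_lora_a passes through unchanged):

import torch

# Hypothetical setup: max_lora_rank 16, hidden_size 32, tp_size 4, tp_rank 2,
# so each rank owns 4 of the 16 rank rows of every LoRA-A.
max_rank, hidden, tp_size, tp_rank = 16, 32, 4, 2
rank_shard = max_rank // tp_size
start = tp_rank * rank_shard

# Two subloras (e.g. gate_proj and up_proj); the second happens to be absent.
lora_a = [torch.randn(max_rank, hidden), None]

sliced = [
    a[start : start + rank_shard, :] if a is not None else None
    for a in lora_a
]
assert sliced[0].shape == (rank_shard, hidden) and sliced[1] is None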

Source code in vllm/lora/layers/column_parallel_linear.py
class MergedColumnParallelLinearWithShardedLoRA(MergedColumnParallelLinearWithLoRA):
    """
    Differs from MergedColumnParallelLinearWithLoRA by slicing the
    LoRA A's also.

    Based on S-LoRA, slicing happens along the rank dim.
    """

    def slice_lora_a(
        self, lora_a: list[torch.Tensor | None]
    ) -> list[torch.Tensor | None]:
        # NOTE: lora_a contains 2 subloras, and each sublora could be None.
        output_shard_size = self.lora_a_stacked[0].shape[2]
        output_start_idx = self.tp_rank * output_shard_size
        lora_a = [
            lora_a[0][output_start_idx : output_start_idx + output_shard_size, :]
            if lora_a[0] is not None
            else None,
            lora_a[1][output_start_idx : output_start_idx + output_shard_size, :]
            if lora_a[1] is not None
            else None,
        ]
        return lora_a

    def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
        return _mcp_apply(x, bias, self)

    @classmethod
    @_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # specifying kwargs so they can be easily accessed in decorator
        return super().can_replace_layer(
            source_layer=source_layer,
            lora_config=lora_config,
            packed_modules_list=packed_modules_list,
            model_config=model_config,
            decorate=False,
        )

apply

apply(x: Tensor, bias: Tensor | None = None) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
    return _mcp_apply(x, bias, self)

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # specifying kwargs so they can be easily accessed in decorator
    return super().can_replace_layer(
        source_layer=source_layer,
        lora_config=lora_config,
        packed_modules_list=packed_modules_list,
        model_config=model_config,
        decorate=False,
    )

slice_lora_a

slice_lora_a(
    lora_a: list[Tensor | None],
) -> list[Tensor | None]
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(
    self, lora_a: list[torch.Tensor | None]
) -> list[torch.Tensor | None]:
    # NOTE: lora_a contains 2 subloras, and each sublora could be None.
    output_shard_size = self.lora_a_stacked[0].shape[2]
    output_start_idx = self.tp_rank * output_shard_size
    lora_a = [
        lora_a[0][output_start_idx : output_start_idx + output_shard_size, :]
        if lora_a[0] is not None
        else None,
        lora_a[1][output_start_idx : output_start_idx + output_shard_size, :]
        if lora_a[1] is not None
        else None,
    ]
    return lora_a

MergedQKVParallelLinearWithLoRA

Bases: MergedColumnParallelLinearWithLoRA

MergedColumnParallelLinear layer that is composed of 3 sublayers (slices) packed together in qkv proj fashion (q_proj + k_proj + v_proj -> qkv_proj).

This means we have 3 LoRAs, each applied to one slice of the layer.

Q slice may have different shape than K and V slices (which both have the same shape).
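
To make the unequal Q/K/V slice sizes concrete, here is a sketch with invented attention geometry showing how output_slices and output_ids come out (num_kv_head_replicas is assumed to be 1, so the K/V shard id equals tp_rank):

# Hypothetical geometry: 32 query heads, 8 KV heads, head_size 128, tp_size 4.
tp_size, tp_rank, head_size = 4, 1, 128
num_heads = 32 // tp_size          # query heads per rank
num_kv_heads = 8 // tp_size        # KV heads per rank

q_proj_shard_size = num_heads * head_size        # 1024 rows
kv_proj_shard_size = num_kv_heads * head_size    # 256 rows each for K and V

output_slices = (q_proj_shard_size, kv_proj_shard_size, kv_proj_shard_size)
output_ids = (tp_rank, tp_rank, tp_rank)         # kv_shard_id == tp_rank here
assert sum(output_slices) == (num_heads + 2 * num_kv_heads) * head_size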

Source code in vllm/lora/layers/column_parallel_linear.py
class MergedQKVParallelLinearWithLoRA(MergedColumnParallelLinearWithLoRA):
    """MergedColumnParallelLinear layer that is composed of 3 sublayers (slices)
    packed together in qkv proj fashion
    (q_proj + k_proj + v_proj -> qkv_proj).

    This means we have 3 LoRAs, each applied to one slice of the layer.

    Q slice may have different shape than K and V slices (which both have
    the same shape).
    """

    def __init__(self, base_layer: QKVParallelLinear) -> None:
        super().__init__(base_layer)
        # There are three LoRA layers.
        self.n_slices = len(self.base_layer.output_sizes)

        self.q_proj_shard_size = self.base_layer.num_heads * self.base_layer.head_size
        self.kv_proj_shard_size = (
            self.base_layer.num_kv_heads * self.base_layer.head_size
        )
        self.q_shard_id = self.tp_rank
        self.kv_shard_id = self.tp_rank // self.base_layer.num_kv_head_replicas

        self.output_slices = (
            self.q_proj_shard_size,
            self.kv_proj_shard_size,
            self.kv_proj_shard_size,
        )
        self.output_ids = (
            self.q_shard_id,
            self.kv_shard_id,
            self.kv_shard_id,
        )

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        """
        The main reason for overloading this function is to handle inconsistent
        weight dimensions in qkv lora.
        """
        super().create_lora_weights(max_loras, lora_config, model_config)

    @classmethod
    @_not_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is QKVParallelLinear and len(packed_modules_list) == 3

kv_proj_shard_size instance-attribute

kv_proj_shard_size = num_kv_heads * head_size

kv_shard_id instance-attribute

kv_shard_id = tp_rank // num_kv_head_replicas

n_slices instance-attribute

n_slices = len(output_sizes)

output_ids instance-attribute

output_ids = (q_shard_id, kv_shard_id, kv_shard_id)

output_slices instance-attribute

output_slices = (
    q_proj_shard_size,
    kv_proj_shard_size,
    kv_proj_shard_size,
)

q_proj_shard_size instance-attribute

q_proj_shard_size = num_heads * head_size

q_shard_id instance-attribute

q_shard_id = tp_rank

__init__

__init__(base_layer: QKVParallelLinear) -> None
Source code in vllm/lora/layers/column_parallel_linear.py
def __init__(self, base_layer: QKVParallelLinear) -> None:
    super().__init__(base_layer)
    # There are three LoRA layers.
    self.n_slices = len(self.base_layer.output_sizes)

    self.q_proj_shard_size = self.base_layer.num_heads * self.base_layer.head_size
    self.kv_proj_shard_size = (
        self.base_layer.num_kv_heads * self.base_layer.head_size
    )
    self.q_shard_id = self.tp_rank
    self.kv_shard_id = self.tp_rank // self.base_layer.num_kv_head_replicas

    self.output_slices = (
        self.q_proj_shard_size,
        self.kv_proj_shard_size,
        self.kv_proj_shard_size,
    )
    self.output_ids = (
        self.q_shard_id,
        self.kv_shard_id,
        self.kv_shard_id,
    )

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_not_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is QKVParallelLinear and len(packed_modules_list) == 3

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None

The main reason for overloading this function is to handle inconsistent weight dimensions in qkv lora.

Source code in vllm/lora/layers/column_parallel_linear.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    """
    The main reason for overloading this function is to handle inconsistent
    weight dimensions in qkv lora.
    """
    super().create_lora_weights(max_loras, lora_config, model_config)

MergedQKVParallelLinearWithShardedLoRA

Bases: MergedQKVParallelLinearWithLoRA

Differs from MergedQKVParallelLinearWithLoRA by slicing the LoRA A's also.

Based on S-LoRA, slicing happens along the rank dim.

Source code in vllm/lora/layers/column_parallel_linear.py
class MergedQKVParallelLinearWithShardedLoRA(MergedQKVParallelLinearWithLoRA):
    """
    Differs from MergedQKVParallelLinearWithLoRA by slicing the
    LoRA A's also.

    Based on S-LoRA, slicing happens along the rank dim.
    """

    def slice_lora_a(
        self, lora_a: list[torch.Tensor | None]
    ) -> list[torch.Tensor | None]:
        # NOTE: lora_a contains 3 subloras, and each sublora could be None.
        shard_size = [self.lora_a_stacked[i].shape[2] for i in range(3)]
        start_idx = [self.tp_rank * shard_size[i] for i in range(3)]
        lora_a = [
            lora_a[0][start_idx[0] : start_idx[0] + shard_size[0], :]
            if lora_a[0] is not None
            else None,
            lora_a[1][start_idx[1] : start_idx[1] + shard_size[1], :]
            if lora_a[1] is not None
            else None,
            lora_a[2][start_idx[2] : start_idx[2] + shard_size[2], :]
            if lora_a[2] is not None
            else None,
        ]
        return lora_a

    def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
        return _mcp_apply(x, bias, self)

    @classmethod
    @_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # specifying kwargs so they can be easily accessed in decorator
        return super().can_replace_layer(
            source_layer=source_layer,
            lora_config=lora_config,
            packed_modules_list=packed_modules_list,
            model_config=model_config,
            decorate=False,
        )

apply

apply(x: Tensor, bias: Tensor | None = None) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
    return _mcp_apply(x, bias, self)

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # specifying kwargs so they can be easily accessed in decorator
    return super().can_replace_layer(
        source_layer=source_layer,
        lora_config=lora_config,
        packed_modules_list=packed_modules_list,
        model_config=model_config,
        decorate=False,
    )

slice_lora_a

slice_lora_a(
    lora_a: list[Tensor | None],
) -> list[Tensor | None]
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(
    self, lora_a: list[torch.Tensor | None]
) -> list[torch.Tensor | None]:
    # NOTE: lora_a contains 3 subloras, and each sublora could be None.
    shard_size = [self.lora_a_stacked[i].shape[2] for i in range(3)]
    start_idx = [self.tp_rank * shard_size[i] for i in range(3)]
    lora_a = [
        lora_a[0][start_idx[0] : start_idx[0] + shard_size[0], :]
        if lora_a[0] is not None
        else None,
        lora_a[1][start_idx[1] : start_idx[1] + shard_size[1], :]
        if lora_a[1] is not None
        else None,
        lora_a[2][start_idx[2] : start_idx[2] + shard_size[2], :]
        if lora_a[2] is not None
        else None,
    ]
    return lora_a

QKVParallelLinearWithLoRA

Bases: ColumnParallelLinearWithLoRA

ColumnParallelLinear layer that is specifically designed for qkv_proj. Certain models, such as chatglm3 and baichuan-7b, only contain a single LoRA within their qkv_proj layer.

During inference with Tensor Parallel, the weights of lora_b must be accurately partitioned according to the respective ranks.

Q slice may have different shape than K and V slices (which both have the same shape).
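
Here is a small sketch (invented sizes, num_kv_head_replicas assumed to be 1) of how a single full lora_b covering q|k|v is carved down to the rows this rank owns, using the same offsets as slice_lora_b below:

import torch

# Hypothetical geometry: 8 query heads, 2 KV heads, head_size 16, tp_size 2,
# tp_rank 1, LoRA rank 8.
tp_size, tp_rank, head_size, r = 2, 1, 16, 8
q_total, kv_total = 8 * head_size, 2 * head_size          # unsharded row counts
q_shard, kv_shard = q_total // tp_size, kv_total // tp_size

# One LoRA-B spanning the full q|k|v output rows.
lora_b = torch.randn(q_total + 2 * kv_total, r)

q = lora_b[q_shard * tp_rank : q_shard * (tp_rank + 1)]
k_off = q_total
k = lora_b[k_off + kv_shard * tp_rank : k_off + kv_shard * (tp_rank + 1)]
v_off = k_off + kv_total
v = lora_b[v_off + kv_shard * tp_rank : v_off + kv_shard * (tp_rank + 1)]

local_b = torch.cat([q, k, v], dim=0)
assert local_b.shape == (q_shard + 2 * kv_shard, r)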

Source code in vllm/lora/layers/column_parallel_linear.py
class QKVParallelLinearWithLoRA(ColumnParallelLinearWithLoRA):
    """
    ColumnParallelLinear layer that is specifically designed for
    qkv_proj. Certain models, such as chatglm3 and baichuan-7b,
    only contain a single LoRA within their qkv_proj layer.

    During inference with Tensor Parallel, the weights of lora_b
    must be accurately partitioned according to the respective ranks.

    Q slice may have different shape than K and V slices (which both have
    the same shape).
    """

    def __init__(self, base_layer: QKVParallelLinear) -> None:
        super().__init__(base_layer)
        self.q_proj_total_size = (
            self.base_layer.total_num_heads * self.base_layer.head_size
        )
        self.q_proj_shard_size = self.base_layer.num_heads * self.base_layer.head_size
        self.kv_proj_shard_size = (
            self.base_layer.num_kv_heads * self.base_layer.head_size
        )
        self.kv_proj_total_size = (
            self.base_layer.total_num_kv_heads * self.base_layer.head_size
        )
        # There is only one LoRA layer
        self.n_slices = 1

    def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
        self.q_shard_id = self.tp_rank
        self.kv_shard_id = self.tp_rank // self.base_layer.num_kv_head_replicas
        lora_b_q = lora_b[
            self.q_proj_shard_size * self.q_shard_id : self.q_proj_shard_size
            * (self.q_shard_id + 1),
            :,
        ]
        k_offset = self.q_proj_total_size
        lora_b_k = lora_b[
            k_offset + self.kv_proj_shard_size * self.kv_shard_id : k_offset
            + self.kv_proj_shard_size * (self.kv_shard_id + 1),
            :,
        ]
        v_offset = k_offset + self.kv_proj_total_size
        lora_b_v = lora_b[
            v_offset + self.kv_proj_shard_size * self.kv_shard_id : v_offset
            + self.kv_proj_shard_size * (self.kv_shard_id + 1),
            :,
        ]
        lora_b = torch.cat([lora_b_q, lora_b_k, lora_b_v], dim=0)
        return lora_b

    @classmethod
    @_not_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is QKVParallelLinear and len(packed_modules_list) == 1

kv_proj_shard_size instance-attribute

kv_proj_shard_size = num_kv_heads * head_size

kv_proj_total_size instance-attribute

kv_proj_total_size = total_num_kv_heads * head_size

n_slices instance-attribute

n_slices = 1

q_proj_shard_size instance-attribute

q_proj_shard_size = num_heads * head_size

q_proj_total_size instance-attribute

q_proj_total_size = total_num_heads * head_size

__init__

__init__(base_layer: QKVParallelLinear) -> None
Source code in vllm/lora/layers/column_parallel_linear.py
def __init__(self, base_layer: QKVParallelLinear) -> None:
    super().__init__(base_layer)
    self.q_proj_total_size = (
        self.base_layer.total_num_heads * self.base_layer.head_size
    )
    self.q_proj_shard_size = self.base_layer.num_heads * self.base_layer.head_size
    self.kv_proj_shard_size = (
        self.base_layer.num_kv_heads * self.base_layer.head_size
    )
    self.kv_proj_total_size = (
        self.base_layer.total_num_kv_heads * self.base_layer.head_size
    )
    # There is only one LoRA layer
    self.n_slices = 1

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_not_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is QKVParallelLinear and len(packed_modules_list) == 1

slice_lora_b

slice_lora_b(lora_b: Tensor) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
    self.q_shard_id = self.tp_rank
    self.kv_shard_id = self.tp_rank // self.base_layer.num_kv_head_replicas
    lora_b_q = lora_b[
        self.q_proj_shard_size * self.q_shard_id : self.q_proj_shard_size
        * (self.q_shard_id + 1),
        :,
    ]
    k_offset = self.q_proj_total_size
    lora_b_k = lora_b[
        k_offset + self.kv_proj_shard_size * self.kv_shard_id : k_offset
        + self.kv_proj_shard_size * (self.kv_shard_id + 1),
        :,
    ]
    v_offset = k_offset + self.kv_proj_total_size
    lora_b_v = lora_b[
        v_offset + self.kv_proj_shard_size * self.kv_shard_id : v_offset
        + self.kv_proj_shard_size * (self.kv_shard_id + 1),
        :,
    ]
    lora_b = torch.cat([lora_b_q, lora_b_k, lora_b_v], dim=0)
    return lora_b

QKVParallelLinearWithShardedLoRA

Bases: QKVParallelLinearWithLoRA

Differs from QKVParallelLinearWithLoRA by slicing the LoRA A's also.

Based on S-LoRA, slicing happens along the rank dim.

Source code in vllm/lora/layers/column_parallel_linear.py
class QKVParallelLinearWithShardedLoRA(QKVParallelLinearWithLoRA):
    """
    Differs from QKVParallelLinearWithLoRA by slicing the
    LoRA A's also.

    Based on S-LoRA, slicing happens along the rank dim.
    """

    def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
        shard_size = self.lora_a_stacked[0].shape[2]
        start_idx = self.tp_rank * shard_size
        lora_a = lora_a[start_idx : start_idx + shard_size, :]
        return lora_a

    def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
        return _mcp_apply(x, bias, self)

    @classmethod
    @_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # specifying kwargs so they can be easily accessed in decorator
        return super().can_replace_layer(
            source_layer=source_layer,
            lora_config=lora_config,
            packed_modules_list=packed_modules_list,
            model_config=model_config,
            decorate=False,
        )

apply

apply(x: Tensor, bias: Tensor | None = None) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
    return _mcp_apply(x, bias, self)

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/column_parallel_linear.py
@classmethod
@_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # specifying kwargs so they can be easily accessed in decorator
    return super().can_replace_layer(
        source_layer=source_layer,
        lora_config=lora_config,
        packed_modules_list=packed_modules_list,
        model_config=model_config,
        decorate=False,
    )

slice_lora_a

slice_lora_a(lora_a: Tensor) -> Tensor
Source code in vllm/lora/layers/column_parallel_linear.py
def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
    shard_size = self.lora_a_stacked[0].shape[2]
    start_idx = self.tp_rank * shard_size
    lora_a = lora_a[start_idx : start_idx + shard_size, :]
    return lora_a

ReplicatedLinearWithLoRA

Bases: BaseLinearLayerWithLoRA

Source code in vllm/lora/layers/replicated_linear.py
class ReplicatedLinearWithLoRA(BaseLinearLayerWithLoRA):
    def __init__(self, base_layer: ReplicatedLinear) -> None:
        super().__init__(
            base_layer,
        )
        # To ensure interface compatibility, set to 1 always.
        self.output_size = self.base_layer.output_size
        self.n_slices = 1

    def forward(
        self, input_: torch.Tensor
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
        """Forward of ReplicatedLinearWithLoRA

        Args:
            input_: Tensor whose last dimension is `input_size`.

        Returns:
            - output
            - bias
        """
        bias = self.base_layer.bias if not self.base_layer.skip_bias_add else None

        # Matrix multiply.
        output = self.apply(input_, bias)

        output_bias = self.base_layer.bias if self.base_layer.skip_bias_add else None

        if not self.base_layer.return_bias:
            return output

        return output, output_bias

    # ReplicatedLinear should always be replaced, regardless of the fully
    # sharded LoRAs setting, because it is, by definition, copied per GPU.
    @classmethod
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is ReplicatedLinear

    def slice_lora_a(
        self, lora_a: torch.Tensor | list[torch.Tensor | None]
    ) -> torch.Tensor | list[torch.Tensor | None]:
        """Slice lora a if splitting for tensor parallelism."""
        return lora_a

    def slice_lora_b(
        self, lora_b: torch.Tensor | list[torch.Tensor | None]
    ) -> torch.Tensor | list[torch.Tensor | None]:
        """Slice lora b if splitting with tensor parallelism."""
        return lora_b

n_slices instance-attribute

n_slices = 1

output_size instance-attribute

output_size = output_size

__init__

__init__(base_layer: ReplicatedLinear) -> None
Source code in vllm/lora/layers/replicated_linear.py
def __init__(self, base_layer: ReplicatedLinear) -> None:
    super().__init__(
        base_layer,
    )
    # To ensure interface compatibility, set to 1 always.
    self.output_size = self.base_layer.output_size
    self.n_slices = 1

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/replicated_linear.py
@classmethod
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is ReplicatedLinear

forward

forward(
    input_: Tensor,
) -> Tensor | tuple[Tensor, Tensor | None]

Forward of ReplicatedLinearWithLoRA

Parameters:

    input_ (Tensor, required): Tensor whose last dimension is input_size.

Returns:

    Tensor | tuple[Tensor, Tensor | None]:
        - output
        - bias
Source code in vllm/lora/layers/replicated_linear.py
def forward(
    self, input_: torch.Tensor
) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
    """Forward of ReplicatedLinearWithLoRA

    Args:
        input_: Tensor whose last dimension is `input_size`.

    Returns:
        - output
        - bias
    """
    bias = self.base_layer.bias if not self.base_layer.skip_bias_add else None

    # Matrix multiply.
    output = self.apply(input_, bias)

    output_bias = self.base_layer.bias if self.base_layer.skip_bias_add else None

    if not self.base_layer.return_bias:
        return output

    return output, output_bias

slice_lora_a

slice_lora_a(
    lora_a: Tensor | list[Tensor | None],
) -> Tensor | list[Tensor | None]

Slice lora a if splitting for tensor parallelism.

Source code in vllm/lora/layers/replicated_linear.py
def slice_lora_a(
    self, lora_a: torch.Tensor | list[torch.Tensor | None]
) -> torch.Tensor | list[torch.Tensor | None]:
    """Slice lora a if splitting for tensor parallelism."""
    return lora_a

slice_lora_b

slice_lora_b(
    lora_b: Tensor | list[Tensor | None],
) -> Tensor | list[Tensor | None]

Slice lora b if splitting with tensor parallelism.

Source code in vllm/lora/layers/replicated_linear.py
def slice_lora_b(
    self, lora_b: torch.Tensor | list[torch.Tensor | None]
) -> torch.Tensor | list[torch.Tensor | None]:
    """Slice lora b if splitting with tensor parallelism."""
    return lora_b

RowParallelLinearWithLoRA

Bases: BaseLinearLayerWithLoRA

Source code in vllm/lora/layers/row_parallel_linear.py
class RowParallelLinearWithLoRA(BaseLinearLayerWithLoRA):
    def __init__(self, base_layer: RowParallelLinear) -> None:
        super().__init__(base_layer)

        # reset input_size
        self.input_size = self.base_layer.input_size_per_partition
        self.output_size = self.base_layer.output_size
        # There is only one LoRA layer.
        self.n_slices = 1

    def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
        shard_size = self.input_size
        start_idx = self.tp_rank * shard_size
        end_idx = (self.tp_rank + 1) * shard_size
        lora_a = lora_a[:, start_idx:end_idx]
        return lora_a

    def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
        return lora_b

    def forward(
        self, input_: torch.Tensor
    ) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
        """Forward of RowParallelLinear

        Args:
            input_: tensor whose last dimension is `input_size`. If
                    `input_is_parallel` is set, then the last dimension
                    is `input_size // tp_size`.

        Returns:
            - output
            - bias
        """
        # set up backprop all-reduce.
        if self.base_layer.input_is_parallel:
            input_parallel = input_
        else:
            # TODO: simplify code below
            splitted_input = split_tensor_along_last_dim(
                input_, num_partitions=self.tp_size
            )
            input_parallel = splitted_input[self.tp_rank].contiguous()

        # Matrix multiply.
        output_parallel = self.apply(input_parallel)
        if self.base_layer.reduce_results and self.tp_size > 1:
            output_ = tensor_model_parallel_all_reduce(output_parallel)
        else:
            output_ = output_parallel

        if not self.base_layer.skip_bias_add:
            output = (
                output_ + self.base_layer.bias
                if self.base_layer.bias is not None
                else output_
            )
            output_bias = None
        else:
            output = output_
            output_bias = self.base_layer.bias

        if not self.base_layer.return_bias:
            return output

        return output, output_bias

    @classmethod
    @_not_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is RowParallelLinear

input_size instance-attribute

input_size = input_size_per_partition

n_slices instance-attribute

n_slices = 1

output_size instance-attribute

output_size = output_size

__init__

__init__(base_layer: RowParallelLinear) -> None
Source code in vllm/lora/layers/row_parallel_linear.py
def __init__(self, base_layer: RowParallelLinear) -> None:
    super().__init__(base_layer)

    # reset input_size
    self.input_size = self.base_layer.input_size_per_partition
    self.output_size = self.base_layer.output_size
    # There is only one LoRA layer.
    self.n_slices = 1

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/row_parallel_linear.py
@classmethod
@_not_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is RowParallelLinear

forward

forward(
    input_: Tensor,
) -> Tensor | tuple[Tensor, Tensor | None]

Forward of RowParallelLinear

Parameters:

    input_ (Tensor, required): tensor whose last dimension is input_size. If input_is_parallel is set, then the last dimension is input_size // tp_size.

Returns:

    Tensor | tuple[Tensor, Tensor | None]:
        - output
        - bias
Source code in vllm/lora/layers/row_parallel_linear.py
def forward(
    self, input_: torch.Tensor
) -> torch.Tensor | tuple[torch.Tensor, torch.Tensor | None]:
    """Forward of RowParallelLinear

    Args:
        input_: tensor whose last dimension is `input_size`. If
                `input_is_parallel` is set, then the last dimension
                is `input_size // tp_size`.

    Returns:
        - output
        - bias
    """
    # set up backprop all-reduce.
    if self.base_layer.input_is_parallel:
        input_parallel = input_
    else:
        # TODO: simplify code below
        splitted_input = split_tensor_along_last_dim(
            input_, num_partitions=self.tp_size
        )
        input_parallel = splitted_input[self.tp_rank].contiguous()

    # Matrix multiply.
    output_parallel = self.apply(input_parallel)
    if self.base_layer.reduce_results and self.tp_size > 1:
        output_ = tensor_model_parallel_all_reduce(output_parallel)
    else:
        output_ = output_parallel

    if not self.base_layer.skip_bias_add:
        output = (
            output_ + self.base_layer.bias
            if self.base_layer.bias is not None
            else output_
        )
        output_bias = None
    else:
        output = output_
        output_bias = self.base_layer.bias

    if not self.base_layer.return_bias:
        return output

    return output, output_bias

slice_lora_a

slice_lora_a(lora_a: Tensor) -> Tensor
Source code in vllm/lora/layers/row_parallel_linear.py
def slice_lora_a(self, lora_a: torch.Tensor) -> torch.Tensor:
    shard_size = self.input_size
    start_idx = self.tp_rank * shard_size
    end_idx = (self.tp_rank + 1) * shard_size
    lora_a = lora_a[:, start_idx:end_idx]
    return lora_a

slice_lora_b

slice_lora_b(lora_b: Tensor) -> Tensor
Source code in vllm/lora/layers/row_parallel_linear.py
def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
    return lora_b

RowParallelLinearWithShardedLoRA

Bases: RowParallelLinearWithLoRA

Differs from RowParallelLinearWithLoRA by slicing the LoRA B's also.

Based on S-LoRA, slicing happens along the output dim. This yields a combined partial sum from the row parallel base layer and column partitioned output from the LoRA.
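
The fused scheme can be emulated in a single process, with explicit sums standing in for the collectives. Everything below is a hypothetical sketch (sizes, names, and the loop over "ranks" are illustrative): each rank computes its partial base output, adds its column slice of the LoRA output at its own offset, and the final all_reduce (here a plain sum) recovers the unsharded result.

import torch

torch.manual_seed(0)
tp, n_tok, d_in, d_out, r = 2, 3, 8, 6, 4
x = torch.randn(n_tok, d_in)
W = torch.randn(d_out, d_in)            # base weight of the row-parallel layer
A = torch.randn(r, d_in)                # LoRA A
B = torch.randn(d_out, r)               # LoRA B

reference = x @ W.T + x @ A.T @ B.T     # unsharded result

in_sh, out_sh = d_in // tp, d_out // tp

# The shrink buffer after its all_reduce: sum of per-rank partials x_k @ A_k.T
buf = sum(
    x[:, k * in_sh : (k + 1) * in_sh] @ A[:, k * in_sh : (k + 1) * in_sh].T
    for k in range(tp)
)

partials = []
for k in range(tp):
    # Partial base output from this rank's input/weight shard.
    out_k = x[:, k * in_sh : (k + 1) * in_sh] @ W[:, k * in_sh : (k + 1) * in_sh].T
    # Add the column-partitioned LoRA output at this rank's offset.
    out_k[:, k * out_sh : (k + 1) * out_sh] += buf @ B[k * out_sh : (k + 1) * out_sh].T
    partials.append(out_k)

# The final all_reduce combines the base partial sums and the LoRA column slices.
assert torch.allclose(sum(partials), reference, atol=1e-4)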

Source code in vllm/lora/layers/row_parallel_linear.py
class RowParallelLinearWithShardedLoRA(RowParallelLinearWithLoRA):
    """
    Differs from RowParallelLinearWithLoRA by slicing the
    LoRA B's also.

    Based on S-LoRA, slicing happens along the output dim.
    This yields a combined partial sum from the row parallel base
    layer and column partitioned output from the LoRA.
    """

    def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
        shard_size = self.lora_b_stacked[0].shape[2]
        start_idx = self.tp_rank * shard_size
        end_idx = (self.tp_rank + 1) * shard_size
        lora_b = lora_b[start_idx:end_idx, :]
        return lora_b

    def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
        output = self.base_layer.quant_method.apply(self.base_layer, x)

        x = x.view(-1, x.shape[-1])
        output, out_orig_shape = output.view(-1, output.shape[-1]), output.shape
        buffer = torch.zeros(
            (self.n_slices, x.shape[0], self.lora_a_stacked[0].shape[2]),
            dtype=torch.float32,
            device=x.device,
        )

        shrunk_buffer: torch.Tensor | None = self.punica_wrapper.add_shrink(
            buffer, x, self.lora_a_stacked, 1.0
        )
        if not current_platform.can_update_inplace():
            buffer = shrunk_buffer
        if self.tp_size > 1:
            buffer = tensor_model_parallel_all_reduce(buffer)

        # Following S-LoRA, this allows fusing the all_gather and all_reduce
        # by adding the column-partitioned LoRA output to a slice of the
        # output tensor, which is a partial sum due to row parallelism. All
        # that remains is a standard all_reduce. Users should be aware,
        # though, that the output is not the same as a normal row_parallel
        # output; it must be reduced before being used.
        # NOTE: offsets are based on the rank.
        shard_size = self.lora_b_stacked[0].shape[2]
        offset_start = self.tp_rank * shard_size
        lora_output: torch.Tensor | None = self.punica_wrapper.add_expand(
            output,
            buffer,
            self.lora_b_stacked,
            self.output_slices,
            offset_start=offset_start,
            add_input=True,
        )

        if not current_platform.can_update_inplace():
            output = lora_output

        output = output.view(*out_orig_shape)
        return output

    @classmethod
    @_fully_sharded_can_replace
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        # specifying kwargs so they can be easily accessed in decorator
        return super().can_replace_layer(
            source_layer=source_layer,
            lora_config=lora_config,
            packed_modules_list=packed_modules_list,
            model_config=model_config,
            decorate=False,
        )

apply

apply(x: Tensor, bias: Tensor | None = None) -> Tensor
Source code in vllm/lora/layers/row_parallel_linear.py
def apply(self, x: torch.Tensor, bias: torch.Tensor | None = None) -> torch.Tensor:
    output = self.base_layer.quant_method.apply(self.base_layer, x)

    x = x.view(-1, x.shape[-1])
    output, out_orig_shape = output.view(-1, output.shape[-1]), output.shape
    buffer = torch.zeros(
        (self.n_slices, x.shape[0], self.lora_a_stacked[0].shape[2]),
        dtype=torch.float32,
        device=x.device,
    )

    shrunk_buffer: torch.Tensor | None = self.punica_wrapper.add_shrink(
        buffer, x, self.lora_a_stacked, 1.0
    )
    if not current_platform.can_update_inplace():
        buffer = shrunk_buffer
    if self.tp_size > 1:
        buffer = tensor_model_parallel_all_reduce(buffer)

    # Following S-LoRA, this allows fusing the all_gather and all_reduce
    # by adding the column-partitioned LoRA output to a slice of the
    # output tensor, which is a partial sum due to row parallelism. All
    # that remains is a standard all_reduce. Users should be aware,
    # though, that the output is not the same as a normal row_parallel
    # output; it must be reduced before being used.
    # NOTE: offsets are based on the rank.
    shard_size = self.lora_b_stacked[0].shape[2]
    offset_start = self.tp_rank * shard_size
    lora_output: torch.Tensor | None = self.punica_wrapper.add_expand(
        output,
        buffer,
        self.lora_b_stacked,
        self.output_slices,
        offset_start=offset_start,
        add_input=True,
    )

    if not current_platform.can_update_inplace():
        output = lora_output

    output = output.view(*out_orig_shape)
    return output

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/row_parallel_linear.py
@classmethod
@_fully_sharded_can_replace
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    # specifying kwargs so they can be easily accessed in decorator
    return super().can_replace_layer(
        source_layer=source_layer,
        lora_config=lora_config,
        packed_modules_list=packed_modules_list,
        model_config=model_config,
        decorate=False,
    )

slice_lora_b

slice_lora_b(lora_b: Tensor) -> Tensor
Source code in vllm/lora/layers/row_parallel_linear.py
def slice_lora_b(self, lora_b: torch.Tensor) -> torch.Tensor:
    shard_size = self.lora_b_stacked[0].shape[2]
    start_idx = self.tp_rank * shard_size
    end_idx = (self.tp_rank + 1) * shard_size
    lora_b = lora_b[start_idx:end_idx, :]
    return lora_b

VocabParallelEmbeddingWithLoRA

Bases: BaseLayerWithLoRA

Source code in vllm/lora/layers/vocal_parallel_embedding.py
class VocabParallelEmbeddingWithLoRA(BaseLayerWithLoRA):
    def __init__(self, base_layer: VocabParallelEmbedding) -> None:
        super().__init__()
        self.base_layer = base_layer
        self.embeddings_slice: tuple[int, int] | None
        self.embeddings_weights: torch.Tensor | None

    def create_lora_weights(
        self,
        max_loras: int,
        lora_config: LoRAConfig,
        model_config: PretrainedConfig | None = None,
    ) -> None:
        if self.base_layer.num_added_embeddings_per_partition > 0:
            # We can start adding lora weights
            self.embeddings_weights = self.base_layer.weight.data[
                self.base_layer.num_org_embeddings_per_partition : self.base_layer.num_org_embeddings_per_partition  # noqa: E501
                + self.base_layer.num_added_embeddings_per_partition
            ]
            self.embeddings_slice = (
                self.base_layer.shard_indices.added_vocab_start_index
                - self.base_layer.org_vocab_size,
                self.base_layer.shard_indices.added_vocab_end_index
                - self.base_layer.org_vocab_size,
            )
            self.base_layer.weight.data[
                self.base_layer.num_org_embeddings_per_partition :
            ].fill_(0)
        else:
            self.embeddings_slice = None
            self.embeddings_weights = None

        self.embeddings_tensors = torch.zeros(
            (
                max_loras,
                lora_config.lora_extra_vocab_size,
                self.base_layer.embedding_dim,
            ),
            dtype=self.base_layer.weight.dtype,
            device=self.base_layer.weight.device,
        )
        self.lora_a_stacked = torch.zeros(
            (
                max_loras,
                self.base_layer.org_vocab_size + lora_config.lora_extra_vocab_size,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.base_layer.weight.device,
        )
        self.lora_b_stacked = torch.zeros(
            (
                max_loras,
                1,
                self.base_layer.embedding_dim,
                lora_config.max_lora_rank,
            ),
            dtype=lora_config.lora_dtype,
            device=self.base_layer.weight.device,
        )
        self.lora_a_stacked_2d = self.lora_a_stacked.view(
            self.lora_a_stacked.shape[0] * self.lora_a_stacked.shape[1],
            self.lora_a_stacked.shape[2],
        )

    def reset_lora(self, index: int):
        self.lora_a_stacked[index] = 0
        self.lora_b_stacked[index] = 0
        self.embeddings_tensors[index] = 0

    def set_lora(
        self,
        index: int,
        lora_a: torch.Tensor,
        lora_b: torch.Tensor,
        embeddings_tensor: torch.Tensor | None,
    ):
        self.reset_lora(index)
        # NOTE self.lora_a_stacked is row-major, and lora_a is col-major,
        # so we need transpose here
        self.lora_a_stacked[index, : lora_a.shape[1], : lora_a.shape[0]].copy_(
            lora_a.T, non_blocking=True
        )
        self.lora_b_stacked[index, 0, : lora_b.shape[0], : lora_b.shape[1]].copy_(
            lora_b, non_blocking=True
        )
        if embeddings_tensor is not None:
            self.embeddings_tensors[
                index,
                : embeddings_tensor.shape[0],
                : embeddings_tensor.shape[1],
            ].copy_(embeddings_tensor, non_blocking=True)
            if self.embeddings_slice is not None:
                # TODO(yard1): Optimize this copy, we don't need to copy
                # everything, just the modified part
                embeddings = self.embeddings_tensors.view(
                    self.embeddings_tensors.shape[0] * self.embeddings_tensors.shape[1],
                    self.embeddings_tensors.shape[2],
                )[self.embeddings_slice[0] : self.embeddings_slice[1]]
                assert self.embeddings_weights is not None
                self.embeddings_weights[: embeddings.shape[0]].copy_(embeddings)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        added_tokens_mask = torch.where(x > self.base_layer.org_vocab_size - 1, 1, 0)

        # NB: Don't use torch.narrow here. torch.narrow triggers some
        # Dynamic Shape specialization in torch.compile
        num_tokens = x.shape[0]
        indices_1 = self.punica_wrapper._embeddings_indices[1][:num_tokens]
        indices_0 = self.punica_wrapper._embeddings_indices[0][:num_tokens]

        full_lora_a_embeddings = F.embedding(
            x + indices_1,
            self.lora_a_stacked_2d,
        )
        full_output = self.base_layer.forward(x + (indices_0 * added_tokens_mask))

        full_output_org = full_output
        if full_output.ndim == 3:
            full_output = full_output.view(
                full_output.shape[0] * full_output.shape[1], -1
            )
        if full_lora_a_embeddings.ndim == 3:
            full_lora_a_embeddings = full_lora_a_embeddings.view(
                full_lora_a_embeddings.shape[0] * full_lora_a_embeddings.shape[1],
                -1,
            )

        lora_output: torch.Tensor | None = self.punica_wrapper.add_lora_embedding(
            full_output, full_lora_a_embeddings, self.lora_b_stacked, add_input=True
        )

        if not current_platform.can_update_inplace():
            full_output = lora_output

        return full_output.view_as(full_output_org)

    @classmethod
    def can_replace_layer(
        cls,
        source_layer: nn.Module,
        lora_config: LoRAConfig,
        packed_modules_list: list,
        model_config: PretrainedConfig | None,
    ) -> bool:
        return type(source_layer) is VocabParallelEmbedding

    @property
    def weight(self):
        return self.base_layer.weight

base_layer instance-attribute

base_layer = base_layer

embeddings_slice instance-attribute

embeddings_slice: tuple[int, int] | None

embeddings_weights instance-attribute

embeddings_weights: Tensor | None

weight property

weight

__init__

__init__(base_layer: VocabParallelEmbedding) -> None
Source code in vllm/lora/layers/vocal_parallel_embedding.py
def __init__(self, base_layer: VocabParallelEmbedding) -> None:
    super().__init__()
    self.base_layer = base_layer
    self.embeddings_slice: tuple[int, int] | None
    self.embeddings_weights: torch.Tensor | None

can_replace_layer classmethod

can_replace_layer(
    source_layer: Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool
Source code in vllm/lora/layers/vocal_parallel_embedding.py
@classmethod
def can_replace_layer(
    cls,
    source_layer: nn.Module,
    lora_config: LoRAConfig,
    packed_modules_list: list,
    model_config: PretrainedConfig | None,
) -> bool:
    return type(source_layer) is VocabParallelEmbedding

create_lora_weights

create_lora_weights(
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None
Source code in vllm/lora/layers/vocal_parallel_embedding.py
def create_lora_weights(
    self,
    max_loras: int,
    lora_config: LoRAConfig,
    model_config: PretrainedConfig | None = None,
) -> None:
    if self.base_layer.num_added_embeddings_per_partition > 0:
        # We can start adding lora weights
        self.embeddings_weights = self.base_layer.weight.data[
            self.base_layer.num_org_embeddings_per_partition : self.base_layer.num_org_embeddings_per_partition  # noqa: E501
            + self.base_layer.num_added_embeddings_per_partition
        ]
        self.embeddings_slice = (
            self.base_layer.shard_indices.added_vocab_start_index
            - self.base_layer.org_vocab_size,
            self.base_layer.shard_indices.added_vocab_end_index
            - self.base_layer.org_vocab_size,
        )
        self.base_layer.weight.data[
            self.base_layer.num_org_embeddings_per_partition :
        ].fill_(0)
    else:
        self.embeddings_slice = None
        self.embeddings_weights = None

    self.embeddings_tensors = torch.zeros(
        (
            max_loras,
            lora_config.lora_extra_vocab_size,
            self.base_layer.embedding_dim,
        ),
        dtype=self.base_layer.weight.dtype,
        device=self.base_layer.weight.device,
    )
    self.lora_a_stacked = torch.zeros(
        (
            max_loras,
            self.base_layer.org_vocab_size + lora_config.lora_extra_vocab_size,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.base_layer.weight.device,
    )
    self.lora_b_stacked = torch.zeros(
        (
            max_loras,
            1,
            self.base_layer.embedding_dim,
            lora_config.max_lora_rank,
        ),
        dtype=lora_config.lora_dtype,
        device=self.base_layer.weight.device,
    )
    self.lora_a_stacked_2d = self.lora_a_stacked.view(
        self.lora_a_stacked.shape[0] * self.lora_a_stacked.shape[1],
        self.lora_a_stacked.shape[2],
    )
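
For a concrete feel of the buffers this method allocates, here is an illustrative sketch with toy sizes; the real values come from LoRAConfig and the wrapped VocabParallelEmbedding, and none of the numbers below are defaults:

import torch

# Hypothetical sizes, chosen only to make the shapes visible.
max_loras, extra_vocab, rank = 2, 256, 16
org_vocab_size, embedding_dim = 32000, 4096

embeddings_tensors = torch.zeros(max_loras, extra_vocab, embedding_dim)
lora_a_stacked = torch.zeros(max_loras, org_vocab_size + extra_vocab, rank)
lora_b_stacked = torch.zeros(max_loras, 1, embedding_dim, rank)

# Fusing (adapter slot, vocab row) into one axis gives the 2-D table that
# forward() indexes with a single F.embedding(x + offset, ...) lookup.
lora_a_stacked_2d = lora_a_stacked.view(-1, rank)
assert lora_a_stacked_2d.shape == (max_loras * (org_vocab_size + extra_vocab), rank)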

forward

forward(x: Tensor) -> Tensor
Source code in vllm/lora/layers/vocal_parallel_embedding.py
def forward(self, x: torch.Tensor) -> torch.Tensor:
    added_tokens_mask = torch.where(x > self.base_layer.org_vocab_size - 1, 1, 0)

    # NB: Don't use torch.narrow here. torch.narrow triggers some
    # Dynamic Shape specialization in torch.compile
    num_tokens = x.shape[0]
    indices_1 = self.punica_wrapper._embeddings_indices[1][:num_tokens]
    indices_0 = self.punica_wrapper._embeddings_indices[0][:num_tokens]

    full_lora_a_embeddings = F.embedding(
        x + indices_1,
        self.lora_a_stacked_2d,
    )
    full_output = self.base_layer.forward(x + (indices_0 * added_tokens_mask))

    full_output_org = full_output
    if full_output.ndim == 3:
        full_output = full_output.view(
            full_output.shape[0] * full_output.shape[1], -1
        )
    if full_lora_a_embeddings.ndim == 3:
        full_lora_a_embeddings = full_lora_a_embeddings.view(
            full_lora_a_embeddings.shape[0] * full_lora_a_embeddings.shape[1],
            -1,
        )

    lora_output: torch.Tensor | None = self.punica_wrapper.add_lora_embedding(
        full_output, full_lora_a_embeddings, self.lora_b_stacked, add_input=True
    )

    if not current_platform.can_update_inplace():
        full_output = lora_output

    return full_output.view_as(full_output_org)
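
The forward pass masks token ids beyond the original vocabulary and adds a low-rank correction on top of the base embedding. The following is a rough single-adapter reference in plain PyTorch, not the Punica kernel: it assumes add_lora_embedding effectively computes output += A_rows @ Bᵀ, and it ignores the per-token adapter offsets supplied by the PunicaWrapper. All sizes are made up.

import torch
import torch.nn.functional as F

# Hypothetical sizes; a single adapter, no tensor parallelism.
org_vocab_size, extra_vocab, dim, rank = 10, 2, 8, 4
vocab = org_vocab_size + extra_vocab

base_weight = torch.randn(vocab, dim)   # base table, incl. rows for added tokens
lora_a = torch.randn(vocab, rank)       # per-token rows of A (padded layout)
lora_b = torch.randn(dim, rank)         # B, stored as (embedding_dim, rank)

x = torch.tensor([3, 9, 10, 11])        # ids 10 and 11 are LoRA-added tokens
# In the real forward this mask gates the PunicaWrapper index offset.
added_tokens_mask = torch.where(x > org_vocab_size - 1, 1, 0)   # -> [0, 0, 1, 1]

# Base lookup plus the low-rank term: look up each token's A row, then
# project it through B (assumed semantics of the batched kernel).
base_out = F.embedding(x, base_weight)
lora_out = F.embedding(x, lora_a) @ lora_b.T    # (tokens, rank) @ (rank, dim)
y = base_out + lora_out
assert y.shape == (x.shape[0], dim)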

reset_lora

reset_lora(index: int)
Source code in vllm/lora/layers/vocal_parallel_embedding.py
def reset_lora(self, index: int):
    self.lora_a_stacked[index] = 0
    self.lora_b_stacked[index] = 0
    self.embeddings_tensors[index] = 0

set_lora

set_lora(
    index: int,
    lora_a: Tensor,
    lora_b: Tensor,
    embeddings_tensor: Tensor | None,
)
Source code in vllm/lora/layers/vocal_parallel_embedding.py
def set_lora(
    self,
    index: int,
    lora_a: torch.Tensor,
    lora_b: torch.Tensor,
    embeddings_tensor: torch.Tensor | None,
):
    self.reset_lora(index)
    # NOTE self.lora_a_stacked is row-major, and lora_a is col-major,
    # so we need transpose here
    self.lora_a_stacked[index, : lora_a.shape[1], : lora_a.shape[0]].copy_(
        lora_a.T, non_blocking=True
    )
    self.lora_b_stacked[index, 0, : lora_b.shape[0], : lora_b.shape[1]].copy_(
        lora_b, non_blocking=True
    )
    if embeddings_tensor is not None:
        self.embeddings_tensors[
            index,
            : embeddings_tensor.shape[0],
            : embeddings_tensor.shape[1],
        ].copy_(embeddings_tensor, non_blocking=True)
        if self.embeddings_slice is not None:
            # TODO(yard1): Optimize this copy, we don't need to copy
            # everything, just the modified part
            embeddings = self.embeddings_tensors.view(
                self.embeddings_tensors.shape[0] * self.embeddings_tensors.shape[1],
                self.embeddings_tensors.shape[2],
            )[self.embeddings_slice[0] : self.embeddings_slice[1]]
            assert self.embeddings_weights is not None
            self.embeddings_weights[: embeddings.shape[0]].copy_(embeddings)
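
The copies above write each adapter into a zero-padded slot, transposing lora_a so the stored layout is (vocab rows, rank). A small self-contained sketch of that indexing pattern with toy sizes, not the vLLM buffers:

import torch

# One slot of a lora_a_stacked-style buffer; sizes are made up.
vocab_rows, max_rank = 100, 16
slot = torch.zeros(vocab_rows, max_rank)

lora_a = torch.randn(8, 40)             # incoming A: 8 x 40, copied transposed
slot[: lora_a.shape[1], : lora_a.shape[0]].copy_(lora_a.T)

# Only the top-left (40, 8) corner is written; the rest remains zero padding.
assert torch.equal(slot[:40, :8], lora_a.T)
assert slot[40:, :].abs().sum() == 0 and slot[:, 8:].abs().sum() == 0

Because set_lora calls reset_lora(index) before copying, the unused region of each slot stays zero, which is how adapters with rank below max_lora_rank share the same stacked buffers.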