from pydantic import BaseModel
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
import gradio as gr
import os
from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.responses import JSONResponse
import uvicorn
from threading import Thread


load_dotenv()


HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

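
# Hugging Face Hub authentication: Llama.from_pretrained downloads GGUF files
# via huggingface_hub, so logging in up front (a minimal sketch, assuming the
# standard huggingface_hub login flow is sufficient here) lets gated or
# private repos be fetched with the token from the environment.
if HUGGINGFACE_TOKEN:
    from huggingface_hub import login
    login(token=HUGGINGFACE_TOKEN)


# Shared application state: loaded model handles, common special-token names,
# and empty placeholders for per-model metadata.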
global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token'
    },
    'model_metadata': {},
    'max_tokens': 256,
    'tokenizers': {},
    'model_params': {},
    'model_size': {},
    'model_ftype': {},
    'n_ctx_train': {},
    'n_embd': {},
    'n_layer': {},
    'n_head': {},
    'n_head_kv': {},
    'n_rot': {},
    'n_swa': {},
    'n_embd_head_k': {},
    'n_embd_head_v': {},
    'n_gqa': {},
    'n_embd_k_gqa': {},
    'n_embd_v_gqa': {},
    'f_norm_eps': {},
    'f_norm_rms_eps': {},
    'f_clamp_kqv': {},
    'f_max_alibi_bias': {},
    'f_logit_scale': {},
    'n_ff': {},
    'n_expert': {},
    'n_expert_used': {},
    'causal_attn': {},
    'pooling_type': {},
    'rope_type': {},
    'rope_scaling': {},
    'freq_base_train': {},
    'freq_scale_train': {},
    'n_ctx_orig_yarn': {},
    'rope_finetuned': {},
    'ssm_d_conv': {},
    'ssm_d_inner': {},
    'ssm_d_state': {},
    'ssm_dt_rank': {},
    'ssm_dt_b_c_rms': {},
    'vocab_type': {},
    'model_type': {}
}

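# GGUF checkpoints to pull from the Hugging Face Hub; each entry names the
# repo, the file inside it, and the key the loaded model is stored under.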
model_configs = [
    {"repo_id": "Hjgugugjhuhjggg/testing_semifinal-Q2_K-GGUF", "filename": "testing_semifinal-q2_k.gguf", "name": "testing"}
]

class ModelManager:
    def __init__(self):
        self.models = {}

    def load_model(self, model_config):
        # Download the GGUF file from the Hub and load it for CPU-only inference.
        if model_config['name'] not in self.models:
            try:
                self.models[model_config['name']] = Llama.from_pretrained(
                    repo_id=model_config['repo_id'],
                    filename=model_config['filename'],
                    n_threads=8,
                    n_gpu_layers=0
                )
            except Exception as e:
                print(f"Failed to load model '{model_config['name']}': {e}")

    def load_all_models(self):
        # Load every configured model in parallel; the executor's context
        # manager waits for all submissions to finish before exiting.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


class ChatRequest(BaseModel):
    message: str


def normalize_input(input_text):
    return input_text.strip()


def remove_duplicates(text):
    lines = text.split('\n')
    unique_lines = []
    seen_lines = set()
    for line in lines:
        if line not in seen_lines:
            unique_lines.append(line)
            seen_lines.add(line)
    return '\n'.join(unique_lines)


# In-process memoization of model responses, keyed by the call arguments
# (a plain dict stands in for an external cache such as gptcache).
_response_cache = {}


def cache_response(func):
    def wrapper(*args, **kwargs):
        cache_key = f"{args}-{kwargs}"
        if cache_key in _response_cache:
            return _response_cache[cache_key]
        response = func(*args, **kwargs)
        _response_cache[cache_key] = response
        return response
    return wrapper


@cache_response
def generate_model_response(model, inputs):
    try:
        # Run a single completion on one model, capped by the shared token limit.
        response = model(inputs, max_tokens=global_data['max_tokens'])
        return remove_duplicates(response['choices'][0]['text'])
    except Exception as e:
        print(f"Error generating response: {e}")
        return ""


def remove_repetitive_responses(responses):
    unique_responses = {}
    for response in responses:
        if response['model'] not in unique_responses:
            unique_responses[response['model']] = response['response']
    return unique_responses


async def process_message(message):
    inputs = normalize_input(message)
    with ThreadPoolExecutor() as executor:
        # Fan the prompt out to every loaded model. Map each future back to its
        # model name so results stay paired correctly even though as_completed
        # yields futures in completion order, not submission order.
        futures = {
            executor.submit(generate_model_response, model, inputs): model_name
            for model_name, model in global_data['models'].items()
        }
        responses = [
            {'model': futures[future], 'response': future.result()}
            for future in as_completed(futures)
        ]
    unique_responses = remove_repetitive_responses(responses)
    formatted_response = ""
    for model, response in unique_responses.items():
        formatted_response += f"**{model}:**\n{response}\n\n"
    return formatted_response


app = FastAPI()


@app.post("/generate")
async def generate(request: ChatRequest):
    response = await process_message(request.message)
    return JSONResponse(content={"response": response})
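# Example call once the API is running (hypothetical prompt text):
#   curl -X POST http://localhost:7860/generate \
#        -H "Content-Type: application/json" \
#        -d '{"message": "Hello"}'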


def run_uvicorn():
    uvicorn.run(app, host="0.0.0.0", port=7860)


iface = gr.Interface(
    fn=process_message,
    inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."),
    outputs=gr.Markdown(),
    title="Multi-Model LLM API (CPU Optimized)",
    description="Enter a message and get responses from multiple LLMs using CPU."
)


def run_gradio():
    iface.launch(server_port=7861, prevent_thread_lock=True)

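# Serve the FastAPI JSON API (port 7860) and the Gradio UI (port 7861) from
# one process, each in its own thread.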
if __name__ == "__main__":
    Thread(target=run_uvicorn).start()
    Thread(target=run_gradio).start()