OpenAI API Format Specification and FastAPI Implementation Guide
Contents
- Overview
- API Specification
- Request Format
- Response Format
- Streaming Responses
- Error Handling
- FastAPI Implementation
- Authentication and Security
- Best Practices
- Example Code
- Deployment Configuration
Overview
The OpenAI API is a standardized set of RESTful endpoints for interacting with large language models. This guide explains in detail how to implement a service interface compatible with the OpenAI API format.
Core Features
- Standardized format: follows the official OpenAI API specification
- Streaming responses: supports real-time streamed output
- Multi-model support: adaptable to different language models
- Error handling: a complete error-response mechanism
- Authentication: Bearer Token authentication
API Specification
Base URL Structure
GET /v1/models # list available models
POST /v1/chat/completions # chat completion endpoint
GET /health # health check
HTTP Header Requirements
Content-Type: application/json
Authorization: Bearer YOUR_API_KEY
Accept: application/json
Status Code Conventions
| Status Code | Meaning | Typical Scenario |
|---|---|---|
| 200 | Success | Normal response |
| 400 | Bad Request | Parameter validation failed |
| 401 | Unauthorized | Invalid API key |
| 404 | Not Found | Route does not exist |
| 422 | Unprocessable Entity | Malformed request body |
| 500 | Server Error | Internal processing exception |
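For orientation, here is how one of these status codes surfaces from a FastAPI handler. A minimal sketch: the GET /v1/models/{model_id} route and the KNOWN_MODELS registry are hypothetical additions for illustration, not part of the API defined above.

from fastapi import FastAPI, HTTPException

app = FastAPI()

KNOWN_MODELS = {"gpt-3.5-turbo", "gpt-4"}  # hypothetical model registry

@app.get("/v1/models/{model_id}")
async def get_model(model_id: str):
    # 404 when the requested model is unknown, in the error shape defined later
    if model_id not in KNOWN_MODELS:
        raise HTTPException(
            status_code=404,
            detail={"error": {
                "message": f"Model '{model_id}' not found",
                "type": "invalid_request_error",
                "code": "model_not_found"
            }}
        )
    return {"id": model_id, "object": "model", "owned_by": "openai"}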
Request Format
Chat Completion Request (POST /v1/chat/completions)
Basic Parameters
{
  "model": "gpt-3.5-turbo",
  "messages": [
    {
      "role": "system",
      "content": "You are a helpful assistant."
    },
    {
      "role": "user",
      "content": "Hello, how are you?"
    }
  ],
  "stream": false,
  "temperature": 0.7,
  "max_tokens": 1000,
  "top_p": 1.0,
  "frequency_penalty": 0.0,
  "presence_penalty": 0.0,
  "stop": null,
  "user": "user-123"
}
Parameter Details
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| model | string | ✓ | - | Model name |
| messages | array | ✓ | - | List of conversation messages |
| stream | boolean | ✗ | false | Whether to stream the response |
| temperature | number | ✗ | 1.0 | Randomness control (0.0-2.0) |
| max_tokens | integer | ✗ | null | Maximum number of generated tokens |
| top_p | number | ✗ | 1.0 | Nucleus sampling parameter (0.0-1.0) |
| frequency_penalty | number | ✗ | 0.0 | Frequency penalty (-2.0 to 2.0) |
| presence_penalty | number | ✗ | 0.0 | Presence penalty (-2.0 to 2.0) |
| stop | string/array | ✗ | null | Stop sequence(s) |
| user | string | ✗ | null | End-user identifier |
Message Format
{
  "role": "user|assistant|system|function",
  "content": "message content",
  "name": "optional name",
  "reasoning": "optional reasoning content (for reasoning models)"
}
Role descriptions:
- system: a system prompt that sets the assistant's behavior
- user: a user message
- assistant: the assistant's reply
- function: a function-call result
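For example, a typical multi-turn conversation combines these roles in order (plain Python dicts with illustrative content):

# A typical multi-turn message list: the system prompt comes first,
# followed by alternating user/assistant turns.
messages = [
    {"role": "system", "content": "You are a helpful assistant."},
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "The capital of France is Paris."},
    {"role": "user", "content": "And roughly how many people live there?"},
]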
Response Format
Non-Streaming Response
{
  "id": "chatcmpl-123456789",
  "object": "chat.completion",
  "created": 1677652288,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "Hello! I'm doing well, thank you for asking.",
        "reasoning": "The user is greeting me; I should respond politely"
      },
      "finish_reason": "stop"
    }
  ],
  "usage": {
    "prompt_tokens": 20,
    "completion_tokens": 15,
    "total_tokens": 35
  }
}
Response Field Descriptions
| Field | Type | Description |
|---|---|---|
| id | string | Unique response identifier |
| object | string | Object type |
| created | integer | Creation timestamp (Unix seconds) |
| model | string | Model used |
| choices | array | List of generated choices |
| usage | object | Token usage statistics |
finish_reason Values
| Value | Description |
|---|---|
| stop | Natural end of generation |
| length | Maximum token limit reached |
| content_filter | Content was filtered |
| function_call | The model made a function call |
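When consuming a non-streaming response, it helps to branch on this field. A minimal sketch, where the choice dict follows the response shape shown above:

def extract_content(choice: dict) -> str:
    """Pick the output out of one choice, reacting to finish_reason."""
    reason = choice.get("finish_reason")
    content = choice["message"]["content"]
    if reason == "length":
        # generation was cut off at max_tokens; a follow-up request may be needed
        return content + " [truncated]"
    if reason == "content_filter":
        return "[content withheld by filter]"
    return content  # "stop": the model finished naturally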
Streaming Responses
Server-Sent Events (SSE) Format
Streaming responses use the SSE format; each data chunk is prefixed with data::
data: {"id":"chatcmpl-123","object":"chatpletion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chatpletion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{"content":" there!"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chatpletion.chunk","created":1677652288,"model":"gpt-3.5-turbo","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Stream Chunk Structure
{
  "id": "chatcmpl-123456789",
  "object": "chat.completion.chunk",
  "created": 1677652288,
  "model": "gpt-3.5-turbo",
  "choices": [
    {
      "index": 0,
      "delta": {
        "role": "assistant",
        "content": "Hello",
        "reasoning": "reasoning content (optional)"
      },
      "finish_reason": null
    }
  ]
}
Streaming Response Essentials
- Content-Type: text/event-stream
- Cache control: Cache-Control: no-cache
- Connection keep-alive: Connection: keep-alive
- End marker: send data: [DONE] last
- Error handling: on an exception, send an error message and then end the stream
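These requirements fit naturally into a small helper. A minimal sketch, assuming chunks are serialized with the standard json module:

import json

def sse_event(payload: dict) -> str:
    """Serialize one chunk as an SSE data line (note the trailing blank line)."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"

SSE_DONE = "data: [DONE]\n\n"  # terminal marker expected by OpenAI-style clients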
Error Handling
Error Response Format
{
  "error": {
    "message": "Invalid API key provided",
    "type": "invalid_request_error",
    "param": "api_key",
    "code": "invalid_api_key"
  }
}
Common Error Types
| Error Type | HTTP Status Code | Description |
|---|---|---|
| invalid_request_error | 400 | Malformed request |
| invalid_api_key | 401 | Invalid API key |
| insufficient_quota | 429 | Quota exceeded |
| model_not_found | 404 | Model does not exist |
| server_error | 500 | Internal server error |
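To keep handlers consistent with this table, the mapping can live in one place. A sketch: ERROR_CATALOG and api_error are this guide's illustrative names, not a library API.

from fastapi import HTTPException

# Maps an error code to its HTTP status and OpenAI-style error type.
ERROR_CATALOG = {
    "invalid_api_key": (401, "invalid_request_error"),
    "insufficient_quota": (429, "insufficient_quota"),
    "model_not_found": (404, "invalid_request_error"),
    "internal_error": (500, "server_error"),
}

def api_error(code: str, message: str) -> HTTPException:
    """Build an HTTPException whose detail follows the error format above."""
    status, err_type = ERROR_CATALOG.get(code, (400, "invalid_request_error"))
    return HTTPException(status_code=status, detail={
        "error": {"message": message, "type": err_type, "code": code}
    })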
FastAPI Implementation
Project Structure
project/
├── main.py            # application entry point
├── models.py          # data models
├── api_server.py      # API routes
├── converter.py       # format conversion
├── config.py          # configuration management
└── requirements.txt   # dependencies
Data Model Definitions (models.py)
from typing import List, Optional, Union, Dict, Any
from pydantic import BaseModel, Field
from enum import Enum

class Role(str, Enum):
    """Message role enum"""
    SYSTEM = "system"
    USER = "user"
    ASSISTANT = "assistant"
    FUNCTION = "function"

class Message(BaseModel):
    """Chat message model"""
    role: Role
    content: str
    name: Optional[str] = None
    reasoning: Optional[str] = None

class ChatCompletionRequest(BaseModel):
    """Chat completion request model"""
    model: str
    messages: List[Message]
    temperature: Optional[float] = Field(default=1.0, ge=0.0, le=2.0)
    top_p: Optional[float] = Field(default=1.0, ge=0.0, le=1.0)
    n: Optional[int] = Field(default=1, ge=1)
    stream: Optional[bool] = False
    stop: Optional[Union[str, List[str]]] = None
    max_tokens: Optional[int] = Field(default=None, ge=1)
    presence_penalty: Optional[float] = Field(default=0.0, ge=-2.0, le=2.0)
    frequency_penalty: Optional[float] = Field(default=0.0, ge=-2.0, le=2.0)
    user: Optional[str] = None

class Usage(BaseModel):
    """Token usage statistics"""
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int

class Choice(BaseModel):
    """Choice model"""
    index: int
    message: Message
    finish_reason: Optional[str] = None

class ChatCompletionResponse(BaseModel):
    """Chat completion response model"""
    id: str
    object: str = "chat.completion"
    created: int
    model: str
    choices: List[Choice]
    usage: Usage

class Delta(BaseModel):
    """Streaming response delta"""
    role: Optional[str] = None
    content: Optional[str] = None
    reasoning: Optional[str] = None

class StreamChoice(BaseModel):
    """Streaming choice"""
    index: int
    delta: Delta
    finish_reason: Optional[str] = None

class ChatCompletionStreamResponse(BaseModel):
    """Streaming chat completion response"""
    id: str
    object: str = "chat.completion.chunk"
    created: int
    model: str
    choices: List[StreamChoice]
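The project tree above also lists converter.py for format conversion, which is not shown elsewhere in this guide. A minimal sketch of what it might contain, where the backend reply shape (text, finish_reason, token counts) is an assumption:

# converter.py - a sketch; the backend payload shape is hypothetical.
import time
import uuid

from models import ChatCompletionResponse, Choice, Message, Usage

def to_openai_response(backend_reply: dict, model: str) -> ChatCompletionResponse:
    """Convert a hypothetical backend reply into the OpenAI response shape."""
    prompt_tokens = backend_reply.get("prompt_tokens", 0)
    completion_tokens = backend_reply.get("completion_tokens", 0)
    return ChatCompletionResponse(
        id=f"chatcmpl-{uuid.uuid4().hex[:12]}",
        created=int(time.time()),
        model=model,
        choices=[Choice(
            index=0,
            message=Message(role="assistant", content=backend_reply["text"]),
            finish_reason=backend_reply.get("finish_reason", "stop"),
        )],
        usage=Usage(
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=prompt_tokens + completion_tokens,
        ),
    )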
API Route Implementation (api_server.py)
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
import asyncio
import json
import logging
import time
from typing import AsyncGenerator
from models import (
    ChatCompletionRequest,
    ChatCompletionResponse,
    ChatCompletionStreamResponse,
    StreamChoice,
    Delta,
    Choice,
    Message,
    Usage
)

app = FastAPI(
    title="OpenAI Compatible API",
    description="An OpenAI-format-compatible API service",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def root():
    """Root path"""
    return {
        "message": "OpenAI Compatible API Server",
        "version": "1.0.0",
        "endpoints": {
            "chat": "/v1/chat/completions",
            "models": "/v1/models",
            "health": "/health"
        }
    }

@app.get("/health")
async def health_check():
    """Health check"""
    return {"status": "healthy", "timestamp": int(time.time())}

@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest, http_request: Request):
    """Create a chat completion"""
    # extract the API key
    auth_header = http_request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message": "Invalid API key provided",
                    "type": "invalid_request_error",
                    "code": "invalid_api_key"
                }
            }
        )
    api_key = auth_header[7:]  # strip the "Bearer " prefix
    try:
        if request.stream:
            # streaming response
            return StreamingResponse(
                generate_stream_response(request, api_key),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Content-Type": "text/event-stream"
                }
            )
        else:
            # non-streaming response
            response = await generate_completion(request, api_key)
            return response
    except Exception as e:
        raise HTTPException(
            status_code=500,
            detail={
                "error": {
                    "message": str(e),
                    "type": "server_error",
                    "code": "internal_error"
                }
            }
        )

async def generate_completion(request: ChatCompletionRequest, api_key: str) -> ChatCompletionResponse:
    """Generate a non-streaming completion response"""
    # implement your model-calling logic here;
    # the body below is a simulated example
    response_id = f"chatcmpl-{int(time.time())}"
    return ChatCompletionResponse(
        id=response_id,
        created=int(time.time()),
        model=request.model,
        choices=[
            Choice(
                index=0,
                message=Message(
                    role="assistant",
                    content="This is a sample response"
                ),
                finish_reason="stop"
            )
        ],
        usage=Usage(
            prompt_tokens=10,
            completion_tokens=5,
            total_tokens=15
        )
    )

async def generate_stream_response(request: ChatCompletionRequest, api_key: str) -> AsyncGenerator[str, None]:
    """Generate a streaming response"""
    response_id = f"chatcmpl-{int(time.time())}"
    created = int(time.time())
    try:
        # simulated streaming output
        content_chunks = ["This ", "is ", "a ", "streaming ", "example"]
        for chunk in content_chunks:
            stream_response = ChatCompletionStreamResponse(
                id=response_id,
                created=created,
                model=request.model,
                choices=[
                    StreamChoice(
                        index=0,
                        delta=Delta(content=chunk),
                        finish_reason=None
                    )
                ]
            )
            yield f"data: {stream_response.model_dump_json()}\n\n"
            # simulated latency
            await asyncio.sleep(0.1)
        # send the final chunk, then the end marker
        final_response = ChatCompletionStreamResponse(
            id=response_id,
            created=created,
            model=request.model,
            choices=[
                StreamChoice(
                    index=0,
                    delta=Delta(),
                    finish_reason="stop"
                )
            ]
        )
        yield f"data: {final_response.model_dump_json()}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        # error handling: report the error, then end the stream
        error_response = {
            "error": {
                "message": str(e),
                "type": "server_error",
                "code": "internal_error"
            }
        }
        yield f"data: {json.dumps(error_response)}\n\n"
        yield "data: [DONE]\n\n"

@app.get("/v1/models")
async def list_models():
    """List available models"""
    return {
        "object": "list",
        "data": [
            {
                "id": "gpt-3.5-turbo",
                "object": "model",
                "created": 1677610602,
                "owned_by": "openai"
            },
            {
                "id": "gpt-4",
                "object": "model",
                "created": 1687882411,
                "owned_by": "openai"
            }
        ]
    }
Configuration Management (config.py)
import os
from typing import List
from dotenv import load_dotenv
load_dotenv()
class Config:
    """Application configuration"""
    def __init__(self):
        # server settings
        self.host: str = os.getenv("HOST", "0.0.0.0")
        self.port: int = int(os.getenv("PORT", "8000"))
        self.debug: bool = os.getenv("DEBUG", "false").lower() == "true"
        # API settings
        self.api_prefix: str = os.getenv("API_PREFIX", "/v1")
        self.cors_origins: List[str] = os.getenv("CORS_ORIGINS", "*").split(",")
        # model settings
        self.default_model: str = os.getenv("DEFAULT_MODEL", "gpt-3.5-turbo")
        self.max_tokens: int = int(os.getenv("MAX_TOKENS", "2048"))
        # security settings
        self.api_keys: List[str] = os.getenv("API_KEYS", "").split(",")

    def validate_api_key(self, api_key: str) -> bool:
        """Validate an API key"""
        if not self.api_keys or not self.api_keys[0]:
            return True  # if no API keys are configured, skip validation
        return api_key in self.api_keys
config = Config()
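With this class, behavior is driven entirely by environment variables. An illustrative check (the keys and values below are examples, not shipped defaults):

# Illustrative only: set env vars before constructing Config.
import os

os.environ["API_KEYS"] = "key-one,key-two"
os.environ["PORT"] = "9000"

from config import Config

cfg = Config()
assert cfg.port == 9000
assert cfg.validate_api_key("key-one")
assert not cfg.validate_api_key("bad-key")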
Application Entry Point (main.py)
import uvicorn
from api_server import app
from config import config
if __name__ == "__main__":
    uvicorn.run(
        "api_server:app",
        host=config.host,
        port=config.port,
        reload=config.debug,
        log_level="info" if not config.debug else "debug"
    )
Authentication and Security
Bearer Token Authentication
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
async def verify_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Validate the API key"""
    if not config.validate_api_key(credentials.credentials):
        raise HTTPException(
            status_code=401,
            detail={
                "error": {
                    "message": "Invalid API key provided",
                    "type": "invalid_request_error",
                    "code": "invalid_api_key"
                }
            }
        )
    return credentials.credentials

# used as a dependency in routes
@app.post("/v1/chat/completions")
async def create_chat_completion(
    request: ChatCompletionRequest,
    api_key: str = Depends(verify_api_key)
):
    # handle the request
    pass
Rate Limiting
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
@app.post("/v1/chat/completions")
@limiter.limit("10/minute")
async def create_chat_completion(request: Request, body: ChatCompletionRequest):
    # handle the request; slowapi requires the `request: Request` parameter
    pass
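The limiter above keys on the client IP; in a multi-tenant deployment it may be preferable to key on the API key instead. A sketch, reusing the same header parsing as the auth code earlier:

from fastapi import Request
from slowapi import Limiter

def api_key_or_ip(request: Request) -> str:
    """Rate-limit per API key when one is presented, else per client IP."""
    auth = request.headers.get("Authorization", "")
    if auth.startswith("Bearer "):
        return auth[7:]
    return request.client.host if request.client else "anonymous"

limiter = Limiter(key_func=api_key_or_ip)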
Best Practices
1. Error Handling
# unified error-handling middleware
from fastapi.responses import JSONResponse

@app.middleware("http")
async def error_handling_middleware(request: Request, call_next):
    try:
        response = await call_next(request)
        return response
    except Exception:
        return JSONResponse(
            status_code=500,
            content={
                "error": {
                    "message": "Internal server error",
                    "type": "server_error",
                    "code": "internal_error"
                }
            }
        )
2. Logging
import logging
from datetime import datetime

# configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# request-logging middleware
@app.middleware("http")
async def log_requests(request: Request, call_next):
    start_time = datetime.now()
    # log the request
    logger.info(f"Request: {request.method} {request.url}")
    response = await call_next(request)
    # log the response
    process_time = (datetime.now() - start_time).total_seconds()
    logger.info(f"Response: {response.status_code} - {process_time:.3f}s")
    return response
3. Parameter Validation
# pydantic v2 (as pinned in requirements.txt) uses field_validator
from pydantic import field_validator

class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Message]
    temperature: Optional[float] = 1.0

    @field_validator('temperature')
    @classmethod
    def validate_temperature(cls, v):
        if v is not None and (v < 0.0 or v > 2.0):
            raise ValueError('temperature must be between 0.0 and 2.0')
        return v

    @field_validator('messages')
    @classmethod
    def validate_messages(cls, v):
        if not v:
            raise ValueError('messages cannot be empty')
        return v
4. Performance Optimization
# offload blocking model calls to a thread pool
import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)

async def process_model_request(request_data):
    """Run a blocking model call without stalling the event loop"""
    loop = asyncio.get_event_loop()
    result = await loop.run_in_executor(
        executor,
        sync_model_call,  # your synchronous model-calling function
        request_data
    )
    return result

# connection-pool management
import httpx

class ModelClient:
    def __init__(self):
        self.client = httpx.AsyncClient(
            timeout=30.0,
            limits=httpx.Limits(max_connections=100, max_keepalive_connections=20)
        )

    async def call_model(self, data):
        response = await self.client.post(
            "https://api.model-provider/v1/completions",  # placeholder upstream endpoint
            json=data
        )
        return response.json()
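ModelClient is typically held as a single long-lived instance so the connection pool is actually reused across requests. A usage sketch; the shutdown hook is one common way to close it cleanly:

# One shared client for the application's lifetime.
model_client = ModelClient()

@app.on_event("shutdown")
async def close_model_client():
    # release pooled connections cleanly on shutdown
    await model_client.client.aclose()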
Example Code
Python Client Example
import requests
import json
# non-streaming request
def test_completion():
    url = "http://localhost:8000/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer your-api-key"
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "user", "content": "Hello, how are you?"}
        ],
        "temperature": 0.7,
        "max_tokens": 100
    }
    response = requests.post(url, headers=headers, json=data)
    print(response.json())

# streaming request
def test_stream():
    url = "http://localhost:8000/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer your-api-key"
    }
    data = {
        "model": "gpt-3.5-turbo",
        "messages": [
            {"role": "user", "content": "Tell me a story"}
        ],
        "stream": True
    }
    response = requests.post(url, headers=headers, json=data, stream=True)
    for line in response.iter_lines():
        if line:
            line = line.decode('utf-8')
            if line.startswith('data: '):
                data_content = line[6:]
                if data_content == '[DONE]':
                    break
                try:
                    chunk = json.loads(data_content)
                    content = chunk['choices'][0]['delta'].get('content', '')
                    if content:
                        print(content, end='', flush=True)
                except json.JSONDecodeError:
                    pass
    print()  # newline

if __name__ == "__main__":
    test_completion()
    test_stream()
JavaScript Client Example
// Non-streaming request
async function testCompletion() {
  const response = await fetch('http://localhost:8000/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-api-key'
    },
    body: JSON.stringify({
      model: 'gpt-3.5-turbo',
      messages: [
        { role: 'user', content: 'Hello, how are you?' }
      ],
      temperature: 0.7,
      max_tokens: 100
    })
  });
  const data = await response.json();
  console.log(data);
}

// Streaming request
async function testStream() {
  const response = await fetch('http://localhost:8000/v1/chat/completions', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-api-key'
    },
    body: JSON.stringify({
      model: 'gpt-3.5-turbo',
      messages: [
        { role: 'user', content: 'Tell me a story' }
      ],
      stream: true
    })
  });
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const chunk = decoder.decode(value);
    const lines = chunk.split('\n');
    for (const line of lines) {
      if (line.startsWith('data: ')) {
        const data = line.slice(6);
        if (data === '[DONE]') {
          return;
        }
        try {
          const parsed = JSON.parse(data);
          const content = parsed.choices[0]?.delta?.content;
          if (content) {
            process.stdout.write(content);  // Node.js; use console.log in a browser
          }
        } catch (e) {
          // ignore parse errors
        }
      }
    }
  }
}
Deployment Configuration
Docker Deployment
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "api_server:app", "--host", "0.0.0.0", "--port", "8000"]
docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DEBUG=false
      - API_KEYS=your-secret-key-1,your-secret-key-2
      - CORS_ORIGINS=https://yourdomain
    volumes:
      - ./logs:/app/logs
    restart: unless-stopped
requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-dotenv==1.0.0
httpx==0.25.2
slowapi==0.1.9
Summary
This guide has covered the OpenAI API format specification and how to implement it with FastAPI. By following these specifications and best practices, you can build a fully OpenAI-compatible service that supports:
- ✅ Standardized request/response formats
- ✅ Streaming and non-streaming responses
- ✅ Robust error handling
- ✅ Bearer Token authentication
- ✅ Rate limiting and security controls
- ✅ High-performance asynchronous processing
- ✅ Straightforward deployment and scaling
Such an implementation slots seamlessly into the existing OpenAI ecosystem and gives users a consistent API experience.