Python的异步编程能力在现代高并发应用中发挥着越来越重要的作用。从基础的协程到复杂的异步架构设计,掌握异步编程已成为Python高级开发的必备技能。本文将深入解析Asyncio的核心原理,并提供生产级别的异步编程解决方案。
![图片[1]-Python异步编程深度实战:从Asyncio到高性能并发架构设计](https://blogimg.vcvcc.cc/2025/11/20251119121909516-1024x768.png?imageView2/0/format/webp/q/75)
一、Asyncio核心原理深度解析
(1) 事件循环与协程调度机制
"""
Asyncio事件循环与协程调度深度解析
"""
import asyncio
import time
from typing import Any, Coroutine, List, Dict
import inspect
from collections import deque
class AdvancedEventLoop:
    """Minimal cooperative event loop illustrating asyncio-style scheduling.

    Tasks are stepped from a FIFO ready queue.  A stepped coroutine may:

    * yield ``None`` (e.g. ``await asyncio.sleep(0)``) and be queued again;
    * yield an ``asyncio.Future`` and be parked until that future is done;
    * yield any other object exposing ``__await__``, which is run as a child
      task whose outcome is handed back to the parent.

    The loop has no timers and no I/O polling.  Parked futures are polled
    between steps, and the loop exits as soon as nothing is ready to run.
    """

    def __init__(self):
        self._ready = deque()  # tasks that can be stepped immediately
        self._scheduled = []  # reserved for timers; nothing in this class fills it
        self._running = False
        self._current_task = None
        self._task_count = 0
        self._callbacks = []
        self._waiting = []  # (future, task) pairs parked on an unfinished future
        self._resume = {}  # task -> (value, exception) delivered on its next step

    def create_task(self, coro: Coroutine) -> 'AdvancedTask':
        """Wrap ``coro`` in an ``AdvancedTask`` and queue it for execution."""
        task = AdvancedTask(coro, self, f"Task-{self._task_count}")
        self._task_count += 1
        self._ready.append(task)
        return task

    def run_until_complete(self, coro: Coroutine) -> Any:
        """Run the loop until it drains or is stopped, then return ``coro``'s result.

        Raises:
            RuntimeError: if the loop ran out of work (or ``stop()`` was
                called) before the task for ``coro`` finished.
            BaseException: whatever ``coro`` itself raised.
        """
        task = self.create_task(coro)
        self._running = True
        try:
            while self._running and not self.is_empty():
                self._run_once()
        finally:
            self._running = False
        if not task.done:
            # Returning None here would be indistinguishable from a real result.
            raise RuntimeError("Event loop stopped before the task completed")
        return task.result()

    def _run_once(self) -> None:
        """Step every task that is ready right now exactly once."""
        self._wake_finished_waiters()
        # Snapshot the queue length so tasks re-queued during this pass run in
        # the next pass; this lets stop() take effect between passes.
        for _ in range(len(self._ready)):
            task = self._ready.popleft()
            self._current_task = task
            try:
                self._step(task)
            finally:
                self._current_task = None
        self._wake_finished_waiters()

    def _step(self, task: 'AdvancedTask') -> None:
        """Advance ``task`` to its next suspension point and act on what it yields."""
        value, error = self._resume.pop(task, (None, None))
        try:
            if error is not None:
                yielded = task._coro.throw(error)
            else:
                yielded = task._coro.send(value)
        except StopIteration as stop:
            task.set_result(stop.value)
            return
        except (Exception, asyncio.CancelledError) as exc:
            task.set_exception(exc)
            return

        if isinstance(yielded, asyncio.Future):
            # Park the task; it is re-queued once the future reports done().
            self._waiting.append((yielded, task))
        elif yielded is None:
            # Bare yield: the coroutine only gave up its turn.
            self._ready.append(task)
        else:
            try:
                self._handle_special_await(yielded, task)
            except TypeError as exc:
                task.set_exception(exc)

    def _wake_finished_waiters(self) -> None:
        """Re-queue every parked task whose future has finished."""
        finished = [future for future, _ in self._waiting if future.done()]
        for future in finished:
            self._future_done_callback(future)

    def _future_done_callback(self, future: asyncio.Future) -> None:
        """Move all tasks parked on ``future`` back to the ready queue.

        The task is resumed with ``send(None)``; ``Future.__await__`` then
        returns the future's result or raises its exception (including
        cancellation) inside the coroutine.
        """
        parked, self._waiting = self._waiting, []
        for pending, task in parked:
            if pending is not future:
                self._waiting.append((pending, task))
            elif not task.done:  # ``done`` is a property, not a method
                self._ready.append(task)

    def _handle_special_await(self, result: Any, task: 'AdvancedTask') -> None:
        """Drive a yielded non-Future awaitable as a child task of ``task``.

        Raises:
            TypeError: if ``result`` does not expose ``__await__``.
        """
        if not hasattr(result, '__await__'):
            raise TypeError(f"Unsupported awaitable type: {type(result)}")

        def resume_parent(child: 'AdvancedTask') -> None:
            # Hand the child's outcome to the parent on its next step.
            self._resume[task] = (child._result, child._exception)
            self._ready.append(task)

        self.create_task(result.__await__()).add_done_callback(resume_parent)

    def is_empty(self) -> bool:
        """Return True when no task is ready and nothing is scheduled."""
        return len(self._ready) == 0 and len(self._scheduled) == 0

    def stop(self) -> None:
        """Ask ``run_until_complete`` to return after the current pass."""
        self._running = False


class AdvancedTask:
    """Bookkeeping wrapper for a coroutine driven by ``AdvancedEventLoop``."""

    def __init__(self, coro: Coroutine, loop: AdvancedEventLoop, name: str):
        self._coro = coro
        self._loop = loop
        self._name = name
        self._result = None
        self._exception = None
        self._done = False
        self._callbacks = []

    def __await__(self):
        # Delegates to the wrapped coroutine's own iterator.
        return self._coro.__await__()

    def set_result(self, result: Any) -> None:
        """Mark the task finished with ``result`` and fire the done callbacks."""
        self._result = result
        self._done = True
        self._run_callbacks()

    def set_exception(self, exception: Exception) -> None:
        """Mark the task finished with ``exception`` and fire the done callbacks."""
        self._exception = exception
        self._done = True
        self._run_callbacks()

    def result(self) -> Any:
        """Return the stored result, re-raising the stored exception if any.

        Returns ``None`` while the task has not finished; check ``done`` first.
        """
        if self._exception is not None:
            raise self._exception
        return self._result

    def _run_callbacks(self) -> None:
        """Invoke each done callback, reporting (not propagating) their errors."""
        for callback in self._callbacks:
            try:
                callback(self)
            except Exception as e:
                print(f"Callback error: {e}")

    @property
    def done(self) -> bool:
        """True once a result or an exception has been recorded."""
        return self._done

    def add_done_callback(self, callback) -> None:
        """Register ``callback(task)``; it runs immediately if already done."""
        if self._done:
            callback(self)
        else:
            self._callbacks.append(callback)
# Coroutine monitoring decorator
def monitor_coroutine(func):
    """Wrap a coroutine function so each call prints start, finish and timing.

    The wrapper prints a start line, awaits ``func`` and then prints either a
    success line or a failure line with the elapsed time.  Exceptions raised
    by ``func`` are re-raised unchanged after being reported.

    Args:
        func: the coroutine function to monitor.

    Returns:
        A coroutine function with the same call signature and metadata.
    """
    # Imported locally: functools is not among the imports preceding this function.
    import functools

    task_name = func.__name__

    @functools.wraps(func)  # keep __name__/__doc__ of the wrapped coroutine
    async def wrapper(*args, **kwargs):
        # perf_counter is monotonic, so wall-clock adjustments cannot skew the timing.
        start_time = time.perf_counter()
        print(f"🚀 开始执行协程: {task_name}")
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            execution_time = time.perf_counter() - start_time
            print(f"❌ 协程失败: {task_name}, 耗时: {execution_time:.4f}s, 错误: {e}")
            raise
        execution_time = time.perf_counter() - start_time
        print(f"✅ 协程完成: {task_name}, 耗时: {execution_time:.4f}s")
        return result

    return wrapper
# Exercises the custom event loop and compares it with standard asyncio
async def test_eventloop_implementation():
    """Run the same coroutine on ``AdvancedEventLoop`` and on asyncio, printing both outcomes."""
    print("=== Asyncio原理深度测试 ===")
    custom_loop = AdvancedEventLoop()

    @monitor_coroutine
    async def simple_coroutine(name: str, delay: float):
        """Sleep for ``delay`` seconds and return ``"<name>_result"``."""
        print(f"协程 {name} 开始执行")
        await asyncio.sleep(delay)
        print(f"协程 {name} 执行完成")
        return f"{name}_result"

    @monitor_coroutine
    async def complex_coroutine():
        """Create three ``simple_coroutine`` calls and await them one after another."""
        pending = [
            simple_coroutine(label, pause)
            for label, pause in (("task1", 0.1), ("task2", 0.2), ("task3", 0.3))
        ]
        # The coroutines are awaited in order, so their delays add up.
        return [await item for item in pending]

    # First run: drive the coroutine with the custom loop.
    try:
        custom_result = custom_loop.run_until_complete(complex_coroutine())
        print(f"自定义事件循环执行结果: {custom_result}")
    except Exception as e:
        print(f"自定义事件循环执行失败: {e}")

    # Second run: let the surrounding asyncio loop drive a fresh coroutine.
    print("\n=== 标准Asyncio对比测试 ===")
    standard_result = await complex_coroutine()
    print(f"标准Asyncio执行结果: {standard_result}")
# Script entry point: run the event loop comparison
if __name__ == "__main__":
    asyncio.run(test_eventloop_implementation())
(2) 协程性能分析与优化
"""
协程性能分析与优化工具
"""
import asyncio
import time
import cProfile
import pstats
from contextlib import contextmanager
from typing import Dict, List, Any, Callable
import functools
class CoroutineProfiler:
    """Wraps a ``cProfile.Profile`` and prints a report for a profiled section."""

    def __init__(self):
        self.profiler = cProfile.Profile()
        self.stats = None  # replaced by a pstats.Stats when a profiled section ends

    @contextmanager
    def profile_coroutine(self, name: str = "coroutine"):
        """Profile the body of the ``with`` block and print the 20 top entries.

        Args:
            name: label shown in the report header.
        """
        started_at = time.time()
        self.profiler.enable()
        try:
            yield
        finally:
            self.profiler.disable()
            elapsed = time.time() - started_at
            # Build the report, ordered by cumulative time.
            self.stats = report = pstats.Stats(self.profiler)
            report.sort_stats('cumulative')
            print(f"\n📊 协程性能分析报告: {name}")
            print(f"⏱️ 总执行时间: {elapsed:.4f}s")
            print("=" * 50)
            report.print_stats(20)

    def save_report(self, filename: str) -> None:
        """Dump the collected statistics to ``filename``; no-op before any profiling."""
        if not self.stats:
            return
        self.stats.dump_stats(filename)
def async_profiler(func: Callable) -> Callable:
    """Decorator that profiles every call of a coroutine function.

    Each call creates its own ``CoroutineProfiler`` and runs the coroutine
    inside ``profile_coroutine``, labelled with the function's name.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        session = CoroutineProfiler()
        with session.profile_coroutine(func.__name__):
            outcome = await func(*args, **kwargs)
        return outcome

    return wrapper
class AsyncPerformanceOptimizer:
    """Collection of helpers for bounding and guarding asynchronous work."""

    @staticmethod
    async def batch_processing(
        items: List[Any],
        processor: Callable,
        batch_size: int = 100,
        max_concurrency: int = 10
    ) -> List[Any]:
        """Process ``items`` in batches with a cap on simultaneous calls.

        Args:
            items: inputs, processed in order.
            processor: coroutine function called once per item.
            batch_size: number of items gathered together per batch.
            max_concurrency: maximum number of ``processor`` calls running
                at the same time.

        Returns:
            One entry per item, in input order.  A call that raised is
            represented by its exception object instead of a result.

        Raises:
            ValueError: if ``batch_size`` or ``max_concurrency`` is below 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        if max_concurrency < 1:
            raise ValueError("max_concurrency must be at least 1")

        semaphore = asyncio.Semaphore(max_concurrency)

        async def process_item(item: Any) -> Any:
            # The semaphore guards each call; guarding a whole batch would let
            # every item of that batch run at once.
            async with semaphore:
                return await processor(item)

        results: List[Any] = []
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            results.extend(
                await asyncio.gather(
                    *(process_item(item) for item in batch),
                    return_exceptions=True
                )
            )
            # Brief pause between batches so other tasks get a turn.
            if start + batch_size < len(items):
                await asyncio.sleep(0.001)
        return results

    @staticmethod
    def create_connection_pool(
        factory: Callable,
        max_size: int = 20,
        min_size: int = 5
    ) -> 'ConnectionPool':
        """Build a ``ConnectionPool`` around ``factory`` with the given size limits."""
        return ConnectionPool(factory, max_size, min_size)

    @staticmethod
    async def with_timeout(
        coro: Coroutine,
        timeout: float,
        fallback_value: Any = None
    ) -> Any:
        """Await ``coro`` for at most ``timeout`` seconds.

        Returns:
            The coroutine's result, or ``fallback_value`` when it timed out.
        """
        try:
            return await asyncio.wait_for(coro, timeout)
        except asyncio.TimeoutError:
            print(f"⏰ 协程执行超时: {timeout}s")
            return fallback_value
class ConnectionPool:
    """Asynchronous pool that lazily grows up to ``max_size`` connections.

    ``factory`` is an async callable returning a new connection.  Idle
    connections live in a bounded queue; connections handed out are tracked
    in ``_in_use`` (so they must be hashable).
    """

    def __init__(self, factory: Callable, max_size: int, min_size: int):
        self.factory = factory
        self.max_size = max_size
        self.min_size = min_size
        self._pool = asyncio.Queue(max_size)
        self._in_use = set()
        self._current_size = 0  # connections created or currently being created

    async def init_pool(self) -> None:
        """Pre-create ``min_size`` connections (capped at ``max_size``)."""
        # Filling past the queue bound would block forever on put().
        target = self.min_size if self.max_size <= 0 else min(self.min_size, self.max_size)
        for _ in range(target):
            connection = await self.factory()
            await self._pool.put(connection)
            self._current_size += 1

    async def acquire(self) -> Any:
        """Return an idle connection, creating one if capacity allows, else wait."""
        if self._pool.empty() and self._current_size < self.max_size:
            # Reserve the slot before awaiting: otherwise every concurrent
            # caller passes the capacity check while the factory is running.
            self._current_size += 1
            try:
                connection = await self.factory()
            except BaseException:
                self._current_size -= 1
                raise
        else:
            connection = await self._pool.get()
        self._in_use.add(connection)
        return connection

    async def release(self, connection: Any) -> None:
        """Return ``connection`` to the pool.

        Raises:
            KeyError: if ``connection`` is not currently checked out.
        """
        self._in_use.remove(connection)
        await self._pool.put(connection)

    async def close(self) -> None:
        """Close every idle connection; checked-out connections are left alone."""
        import inspect  # local import keeps this class self-contained

        while not self._pool.empty():
            connection = self._pool.get_nowait()
            self._current_size -= 1
            closer = getattr(connection, 'close', None)
            if closer is None:
                continue
            outcome = closer()
            # Support both ``async def close`` and plain ``def close``.
            if inspect.isawaitable(outcome):
                await outcome
# Hands-on performance optimisation example
@async_profiler
async def performance_optimization_demo():
    """Demonstrate batch processing, connection pooling and timeouts in turn."""
    import itertools  # local import: source of unique mock connection ids

    print("=== 异步性能优化实战 ===")

    # Simulated I/O-bound work item
    async def mock_processor(item: int) -> int:
        await asyncio.sleep(0.01)
        return item * 2

    test_items = list(range(1000))

    # 1. Batch processing
    print("1. 批量处理优化测试...")
    optimizer = AsyncPerformanceOptimizer()
    results = await optimizer.batch_processing(
        test_items,
        mock_processor,
        batch_size=50,
        max_concurrency=5
    )
    print(f"批量处理完成,结果数量: {len(results)}")

    # 2. Connection pool
    print("\n2. 连接池优化测试...")

    class MockConnection:
        """Stand-in connection whose query just echoes its id and the input."""

        def __init__(self, conn_id: int):
            self.id = conn_id
            self.is_closed = False

        async def query(self, data: str) -> str:
            await asyncio.sleep(0.001)
            return f"Result_{self.id}_{data}"

        async def close(self):
            self.is_closed = True

    # id(time.time()) is the address of a temporary float and is frequently
    # reused, so a counter is used to give every connection a distinct id.
    connection_ids = itertools.count(1)

    async def create_connection() -> MockConnection:
        await asyncio.sleep(0.01)  # simulated connection set-up cost
        return MockConnection(next(connection_ids))

    pool = AsyncPerformanceOptimizer.create_connection_pool(
        create_connection,
        max_size=5,
        min_size=2
    )
    await pool.init_pool()

    async def execute_with_pool(query: str) -> str:
        connection = await pool.acquire()
        try:
            return await connection.query(query)
        finally:
            await pool.release(connection)

    # Run the queries concurrently through the pool
    queries = [f"query_{i}" for i in range(20)]
    query_results = await asyncio.gather(*(execute_with_pool(query) for query in queries))
    print(f"连接池查询完成,结果数量: {len(query_results)}")

    await pool.close()

    # 3. Timeout control
    print("\n3. 超时控制优化测试...")

    async def slow_operation() -> str:
        await asyncio.sleep(2)  # deliberately longer than the timeout below
        return "slow_result"

    result = await AsyncPerformanceOptimizer.with_timeout(
        slow_operation(),
        timeout=1.0,
        fallback_value="timeout_fallback"
    )
    print(f"超时控制结果: {result}")
# Script entry point: run the performance optimisation demo
if __name__ == "__main__":
    asyncio.run(performance_optimization_demo())
二、高性能异步Web框架实战
(1) 基于Asyncio的完整Web框架
"""
高性能异步Web框架实现
"""
import asyncio
import json
import inspect
from typing import Dict, List, Any, Callable, Optional, Tuple
from urllib.parse import parse_qs
from dataclasses import dataclass
from enum import Enum
class HTTPMethod(Enum):
    """HTTP request methods understood by the framework; values are the wire names."""

    GET = "GET"
    POST = "POST"
    PUT = "PUT"
    DELETE = "DELETE"
    PATCH = "PATCH"
    OPTIONS = "OPTIONS"
@dataclass
class Request:
    """Parsed HTTP request: method, path, headers, raw body and query parameters."""

    method: HTTPMethod
    path: str
    headers: Dict[str, str]
    body: bytes
    query_params: Dict[str, List[str]]

    @property
    def json(self) -> Any:
        """Body decoded as UTF-8 JSON, or ``None`` when it cannot be decoded or parsed."""
        try:
            decoded = self.body.decode('utf-8')
            return json.loads(decoded)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None

    @property
    def text(self) -> str:
        """Body decoded as UTF-8 text."""
        raw = self.body
        return raw.decode('utf-8')
@dataclass
class Response:
    """HTTP response: status code, headers and encoded body.

    The ``json``/``text``/``html`` helpers encode the payload as UTF-8, set a
    matching ``Content-Type`` and return ``self`` so calls can be chained.
    """

    status: int = 200
    headers: Optional[Dict[str, str]] = None  # None is replaced by a fresh dict per instance
    body: bytes = b""

    def __post_init__(self):
        if self.headers is None:
            self.headers = {}

    def json(self, data: Any) -> 'Response':
        """Serialise ``data`` as the JSON body."""
        self.body = json.dumps(data).encode('utf-8')
        self.headers['Content-Type'] = 'application/json'
        return self

    def text(self, text: str) -> 'Response':
        """Use ``text`` as a plain-text body."""
        self.body = text.encode('utf-8')
        # Declare the charset: without it clients may decode non-ASCII text wrongly.
        self.headers['Content-Type'] = 'text/plain; charset=utf-8'
        return self

    def html(self, html: str) -> 'Response':
        """Use ``html`` as an HTML body."""
        self.body = html.encode('utf-8')
        # Declare the charset: without it clients may decode non-ASCII text wrongly.
        self.headers['Content-Type'] = 'text/html; charset=utf-8'
        return self
class Router:
    """Exact-path router with a middleware chain.

    Handlers are looked up by ``(method, path)`` with no pattern matching.
    Middlewares are awaited in registration order before the handler; the
    first one that returns a ``Response`` short-circuits the request.
    """

    def __init__(self):
        self.routes: Dict[HTTPMethod, Dict[str, Callable]] = {
            method: {} for method in HTTPMethod
        }
        self.middlewares: List[Callable] = []

    def add_route(self, method: HTTPMethod, path: str, handler: Callable) -> None:
        """Register ``handler`` for ``method`` and the exact ``path``."""
        self.routes[method][path] = handler

    def get(self, path: str) -> Callable:
        """Decorator registering a GET handler for ``path``."""
        def decorator(handler: Callable) -> Callable:
            self.add_route(HTTPMethod.GET, path, handler)
            return handler
        return decorator

    def post(self, path: str) -> Callable:
        """Decorator registering a POST handler for ``path``."""
        def decorator(handler: Callable) -> Callable:
            self.add_route(HTTPMethod.POST, path, handler)
            return handler
        return decorator

    def add_middleware(self, middleware: Callable) -> None:
        """Append an async middleware to the chain."""
        self.middlewares.append(middleware)

    async def handle_request(self, request: Request) -> Response:
        """Run middlewares, dispatch to the handler and normalise its result.

        Returns:
            The middleware's or handler's ``Response``; a JSON response for
            dict/list results; a text response for anything else; 404 when no
            handler matches; 500 when the handler raises.
        """
        for middleware in self.middlewares:
            result = await middleware(request)
            if isinstance(result, Response):
                return result

        handler = self._find_handler(request.method, request.path)
        if not handler:
            return Response(404, body=b"Not Found")

        try:
            result = handler(request)
            # Check the returned object rather than the handler: partials and
            # plain wrappers around coroutine functions also return awaitables.
            if inspect.isawaitable(result):
                result = await result

            if isinstance(result, Response):
                return result
            if isinstance(result, (dict, list)):
                return Response().json(result)
            if isinstance(result, str):
                return Response().text(result)
            return Response().text(str(result))
        except Exception as e:
            print(f"请求处理错误: {e}")
            return Response(500, body=b"Internal Server Error")

    def _find_handler(self, method: HTTPMethod, path: str) -> Optional[Callable]:
        """Return the handler registered for ``(method, path)``, or ``None``."""
        return self.routes[method].get(path)
class AsyncWebServer:
    """Small HTTP/1.1 server on top of asyncio streams.

    Each connection carries exactly one request: the request is parsed,
    dispatched through ``self.router`` and the connection is closed after the
    response has been written.
    """

    def __init__(self, host: str = 'localhost', port: int = 8000):
        self.host = host
        self.port = port
        self.router = Router()
        self._server = None

    def route(self, method: HTTPMethod, path: str) -> Callable:
        """Decorator registering a handler for ``method`` and ``path``."""
        def decorator(handler: Callable) -> Callable:
            self.router.add_route(method, path, handler)
            return handler
        return decorator

    def middleware(self, middleware_func: Callable) -> Callable:
        """Register ``middleware_func`` on the router and return it unchanged."""
        self.router.add_middleware(middleware_func)
        return middleware_func

    async def start(self) -> None:
        """Bind the listening socket and serve until cancelled."""
        self._server = await asyncio.start_server(
            self._handle_connection,
            self.host,
            self.port
        )
        print(f"🚀 服务器启动在 http://{self.host}:{self.port}")
        async with self._server:
            await self._server.serve_forever()

    async def _handle_connection(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
        """Serve one request on the connection and always close the writer."""
        try:
            response = await self._respond_to(reader)
            if response is not None:
                await self._send_response(writer, response)
                await writer.drain()
        except OSError as e:
            # The peer went away mid-write; there is nobody left to answer.
            print(f"连接处理错误: {e}")
        finally:
            writer.close()
            try:
                await writer.wait_closed()
            except OSError:
                pass  # the transport was already broken; closing is best-effort

    async def _respond_to(self, reader: asyncio.StreamReader) -> Optional[Response]:
        """Parse the request and build its response without touching the writer.

        Returns:
            ``None`` when the client sent nothing, a 400 response for a
            malformed request, a 500 response when routing raised, otherwise
            the router's response.
        """
        try:
            request = await self._parse_request(reader)
        except (ValueError, asyncio.IncompleteReadError) as e:
            print(f"连接处理错误: {e}")
            return Response(400, body=b"Bad Request")
        if not request:
            return None
        try:
            return await self.router.handle_request(request)
        except Exception as e:
            print(f"连接处理错误: {e}")
            return Response(500, body=b"Internal Server Error")

    async def _parse_request(self, reader: asyncio.StreamReader) -> Optional[Request]:
        """Read one request from ``reader``.

        Returns:
            The parsed ``Request``, or ``None`` if the stream was already at EOF.

        Raises:
            ValueError: malformed request line or header, unknown method,
                undecodable bytes or a non-numeric Content-Length.
            asyncio.IncompleteReadError: the body was shorter than announced.
        """
        request_line = await reader.readline()
        if not request_line:
            return None

        parts = request_line.decode().split()
        if len(parts) != 3:
            raise ValueError(f"Malformed request line: {request_line!r}")
        method_str, target, _version = parts
        method = HTTPMethod(method_str)

        clean_path, _, query_string = target.partition('?')
        query_params = parse_qs(query_string)

        headers = {}
        while True:
            header_line = await reader.readline()
            if header_line in (b'\r\n', b'\n', b''):
                break
            key, separator, value = header_line.decode().partition(':')
            if not separator:
                raise ValueError(f"Malformed header line: {header_line!r}")
            headers[key.strip()] = value.strip()

        # Header names are case-insensitive, so do not rely on exact spelling.
        content_length = 0
        for key, value in headers.items():
            if key.lower() == 'content-length':
                content_length = int(value)
        # read(n) may return fewer bytes than requested; readexactly waits for all.
        body = await reader.readexactly(content_length) if content_length > 0 else b''
        return Request(method, clean_path, headers, body, query_params)

    async def _send_response(self, writer: asyncio.StreamWriter, response: Response) -> None:
        """Serialise ``response`` into ``writer`` (the caller drains and closes)."""
        from http import HTTPStatus  # local import: only needed for reason phrases

        try:
            reason = HTTPStatus(response.status).phrase
        except ValueError:
            reason = "Unknown"  # non-standard status code
        writer.write(f"HTTP/1.1 {response.status} {reason}\r\n".encode())

        headers = response.headers.copy()
        headers['Content-Length'] = str(len(response.body))
        # The connection is closed after every response, so announce it.
        headers.setdefault('Connection', 'close')
        for key, value in headers.items():
            writer.write(f"{key}: {value}\r\n".encode())

        # Blank line separating the head from the body
        writer.write(b"\r\n")
        if response.body:
            writer.write(response.body)
# Middleware implementations
async def logging_middleware(request: Request) -> Optional[Response]:
    """Print the method, path and header count of every request, then pass it on."""
    header_count = len(request.headers)
    print(f"📝 {request.method.value} {request.path} - Headers: {header_count}")
    # Returning None lets the router continue with the next stage.
    return None
async def auth_middleware(request: Request) -> Optional[Response]:
    """Reject requests without an ``Authorization`` value, except for ``/login``.

    Only the presence of a non-empty token (after removing ``'Bearer '``) is
    checked; the token itself is not validated.
    """
    authorization = request.headers.get('Authorization', '')
    token = authorization.replace('Bearer ', '')
    if token or request.path == '/login':
        return None
    return Response(401, body=b"Unauthorized")
# Web framework usage example
async def web_framework_demo():
    """Configure a demo server with two middlewares and four routes, then serve forever."""
    app = AsyncWebServer(host='0.0.0.0', port=8080)

    # Middlewares run in registration order
    app.middleware(logging_middleware)
    app.middleware(auth_middleware)

    @app.route(HTTPMethod.GET, '/')
    async def home(request: Request) -> Response:
        return Response().html("""
<html>
<head><title>Async Web Framework</title></head>
<body>
<h1>欢迎使用异步Web框架</h1>
<p>这是一个基于Asyncio的高性能Web框架</p>
</body>
</html>
""")

    @app.route(HTTPMethod.GET, '/api/health')
    async def health_check(request: Request) -> Dict[str, Any]:
        return {
            "status": "healthy",
            # Handlers run inside the loop, so the running loop is available.
            "timestamp": asyncio.get_running_loop().time()
        }

    @app.route(HTTPMethod.POST, '/api/users')
    async def create_user(request: Request) -> Any:
        user_data = request.json
        # request.json is None for an unparsable body and may be a non-object
        # JSON value; answer 400 instead of failing on ``.get``.
        if not isinstance(user_data, dict):
            return Response(400).json({"error": "request body must be a JSON object"})
        return {
            "id": 1,
            "name": user_data.get('name'),
            "created": True
        }

    @app.route(HTTPMethod.GET, '/api/users/{user_id}')
    async def get_user(request: Request) -> Dict[str, Any]:
        # NOTE: the router matches paths literally, so this only answers the
        # literal path '/api/users/{user_id}' until path parameters are supported.
        return {
            "id": 1,
            "name": "示例用户"
        }

    await app.start()
# Script entry point: run the web framework demo
if __name__ == "__main__":
    print("=== 异步Web框架演示 ===")
    asyncio.run(web_framework_demo())
三、异步数据库操作与ORM
(1) 高性能异步数据库客户端
"""
异步数据库操作与ORM实现
"""
import asyncio
import aiosqlite
from typing import List, Dict, Any, Optional, Type, TypeVar
from dataclasses import dataclass, field
from datetime import datetime
import json
# Type variable standing for a model class in the ORM type hints
T = TypeVar('T')
class AsyncDatabase:
    """Async facade over one aiosqlite connection, returning rows as dicts."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self._connection_pool = None  # set to the aiosqlite connection by connect()

    async def connect(self) -> None:
        """Open the connection and switch on foreign-key enforcement."""
        self._connection_pool = await aiosqlite.connect(self.connection_string)
        enable_fk = "PRAGMA foreign_keys = ON"
        await self._connection_pool.execute(enable_fk)

    async def disconnect(self) -> None:
        """Close the connection if one was opened."""
        connection = self._connection_pool
        if connection:
            await connection.close()

    async def execute(self, query: str, params: tuple = ()) -> int:
        """Run a statement, commit, and return the cursor's ``rowcount``."""
        connection = self._connection_pool
        async with connection.execute(query, params) as cursor:
            await connection.commit()
            return cursor.rowcount

    async def fetch_one(self, query: str, params: tuple = ()) -> Optional[Dict[str, Any]]:
        """Return the first row as a column-name -> value dict, or ``None``."""
        async with self._connection_pool.execute(query, params) as cursor:
            row = await cursor.fetchone()
            if not row:
                return None
            names = [col[0] for col in cursor.description]
            return dict(zip(names, row))

    async def fetch_all(self, query: str, params: tuple = ()) -> List[Dict[str, Any]]:
        """Return every row as a column-name -> value dict."""
        async with self._connection_pool.execute(query, params) as cursor:
            rows = await cursor.fetchall()
            names = [col[0] for col in cursor.description]
            records = []
            for row in rows:
                records.append(dict(zip(names, row)))
            return records

    async def execute_many(self, query: str, params_list: List[tuple]) -> int:
        """Run a statement once per parameter tuple, commit, and return ``rowcount``."""
        connection = self._connection_pool
        async with connection.executemany(query, params_list) as cursor:
            await connection.commit()
            return cursor.rowcount
class Field:
    """Column declaration: SQL type, constraint flags and an optional default."""

    def __init__(
        self,
        field_type: str,
        primary_key: bool = False,
        auto_increment: bool = False,
        nullable: bool = True,
        unique: bool = False,
        default: Any = None
    ):
        # Plain value holder: every argument is stored under its own name.
        self.field_type, self.primary_key = field_type, primary_key
        self.auto_increment, self.nullable = auto_increment, nullable
        self.unique, self.default = unique, default
def model(cls: Type[T]) -> Type[T]:
    """Class decorator that attaches ORM metadata to a model class.

    For every annotated attribute the decorator records a ``Field``:

    * the ``Field`` assigned as the attribute's class-level value, if any
      (``name: str = Field('TEXT', ...)``);
    * otherwise the annotation itself when it is a ``Field`` instance;
    * otherwise a ``Field`` whose SQL type is inferred from the annotation
      (unknown annotations map to ``'TEXT'``).

    Sets ``cls._fields`` (name -> ``Field``) and ``cls._table_name``
    (``__tablename__`` if defined, else the lower-cased class name).
    Classes without annotations are returned untouched.
    """
    if not hasattr(cls, '__annotations__'):
        return cls

    inferred_types = {
        int: 'INTEGER',
        str: 'TEXT',
        float: 'REAL',
        bool: 'INTEGER',
        datetime: 'TEXT'
    }

    fields = {}
    for attr_name, annotation in cls.__annotations__.items():
        # The Field is the attribute's value; the annotation is only the
        # Python type, so inspecting the annotation alone loses the metadata.
        declared = getattr(cls, attr_name, None)
        if isinstance(declared, Field):
            fields[attr_name] = declared
        elif isinstance(annotation, Field):
            fields[attr_name] = annotation
        else:
            fields[attr_name] = Field(inferred_types.get(annotation, 'TEXT'))

    cls._fields = fields
    cls._table_name = getattr(cls, '__tablename__', cls.__name__.lower())
    return cls
class QuerySet:
    """Chainable builder for ``SELECT`` queries against a model's table.

    Values are always bound as parameters.  Column names cannot be bound, so
    they are validated as plain (optionally dotted) identifiers before being
    placed in the SQL text.
    """

    def __init__(self, model_class: "Type[T]", db: "AsyncDatabase"):
        self.model_class = model_class
        self.db = db
        self._where_conditions = []
        self._params = []
        self._limit_value = None
        self._offset_value = None
        self._order_by = []

    @staticmethod
    def _check_identifier(name: str) -> str:
        """Return ``name`` if it is a safe column identifier, else raise ValueError."""
        parts = name.split(".") if isinstance(name, str) else []
        if not parts or not all(part.isidentifier() for part in parts):
            raise ValueError(f"Invalid column name: {name!r}")
        return name

    def filter(self, **conditions) -> 'QuerySet':
        """Add ``column = value`` conditions, combined with AND.

        Raises:
            ValueError: if a keyword is not a valid column identifier.
        """
        for field, value in conditions.items():
            self._where_conditions.append(f"{self._check_identifier(field)} = ?")
            self._params.append(value)
        return self

    def limit(self, count: int) -> 'QuerySet':
        """Cap the number of returned rows (``None`` removes the cap)."""
        self._limit_value = None if count is None else int(count)
        return self

    def offset(self, count: int) -> 'QuerySet':
        """Skip the first ``count`` rows (``None`` removes the offset)."""
        self._offset_value = None if count is None else int(count)
        return self

    def order_by(self, *fields: str) -> 'QuerySet':
        """Order by the given columns; each may end in ``ASC`` or ``DESC``.

        Raises:
            ValueError: if an entry is not ``column`` or ``column ASC|DESC``.
        """
        for spec in fields:
            tokens = spec.split() if isinstance(spec, str) else []
            if len(tokens) == 2 and tokens[1].upper() in ("ASC", "DESC"):
                self._check_identifier(tokens[0])
            elif len(tokens) == 1:
                self._check_identifier(tokens[0])
            else:
                raise ValueError(f"Invalid ORDER BY term: {spec!r}")
            self._order_by.append(spec)
        return self

    def _where_clause(self) -> str:
        """Return the WHERE clause (with leading space) or an empty string."""
        if not self._where_conditions:
            return ""
        return " WHERE " + " AND ".join(self._where_conditions)

    async def all(self) -> "List[T]":
        """Run the query and return one model instance per row."""
        query = f"SELECT * FROM {self.model_class._table_name}" + self._where_clause()
        params = list(self._params)

        if self._order_by:
            query += " ORDER BY " + ", ".join(self._order_by)

        if self._limit_value is not None or self._offset_value is not None:
            # SQLite requires LIMIT before OFFSET; a negative LIMIT means "no limit".
            query += " LIMIT ?"
            params.append(-1 if self._limit_value is None else self._limit_value)
            if self._offset_value is not None:
                query += " OFFSET ?"
                params.append(self._offset_value)

        rows = await self.db.fetch_all(query, tuple(params))
        return [self.model_class(**row) for row in rows]

    async def first(self) -> "Optional[T]":
        """Return the first matching instance, or ``None`` when there is none."""
        previous_limit = self._limit_value
        self._limit_value = 1
        try:
            results = await self.all()
        finally:
            # Restore the caller's limit so the query set can be reused.
            self._limit_value = previous_limit
        return results[0] if results else None

    async def count(self) -> int:
        """Return the number of rows matching the filters (limit/offset are ignored)."""
        query = f"SELECT COUNT(*) as count FROM {self.model_class._table_name}"
        query += self._where_clause()
        result = await self.db.fetch_one(query, tuple(self._params))
        return result['count'] if result else 0
@model
class User:
    """User model mapped to the ``users`` table."""

    __tablename__ = 'users'
    id: int = Field('INTEGER', primary_key=True, auto_increment=True)
    username: str = Field('TEXT', unique=True, nullable=False)
    email: str = Field('TEXT', unique=True, nullable=False)
    password_hash: str = Field('TEXT', nullable=False)
    created_at: datetime = Field('TEXT', default=datetime.utcnow)
    is_active: bool = Field('INTEGER', default=True)

    def __init__(self, **kwargs):
        """Set every model field from ``kwargs``, falling back to the field default.

        An explicitly passed value (including ``None``) always wins over the
        default; callable defaults are called to produce the value.
        """
        for field_name in self._fields:
            if field_name in kwargs:
                value = kwargs[field_name]
            else:
                value = self._default_for(field_name)
            setattr(self, field_name, value)

    @classmethod
    def _default_for(cls, field_name: str) -> Any:
        """Return the default declared for ``field_name`` (``None`` if there is none)."""
        # Prefer the Field assigned on the class; fall back to the metadata
        # collected by the decorator.
        declared = cls.__dict__.get(field_name)
        if not isinstance(declared, Field):
            declared = cls._fields.get(field_name)
        default = getattr(declared, 'default', None)
        return default() if callable(default) else default

    @staticmethod
    def _to_db(value: Any) -> Any:
        """Convert a Python value to something SQLite stores natively."""
        # Store datetimes as text explicitly instead of relying on the
        # sqlite3 module's default datetime adapter.
        if isinstance(value, datetime):
            return value.isoformat(sep=' ')
        return value

    async def save(self, db: AsyncDatabase) -> None:
        """Insert the record when ``id`` is ``None``, otherwise update it by id."""
        columns = [name for name in self._fields if name != 'id']
        values = [self._to_db(getattr(self, name)) for name in columns]

        if self.id is None:
            placeholders = ', '.join('?' for _ in columns)
            field_names = ', '.join(columns)
            query = f"INSERT INTO {self._table_name} ({field_names}) VALUES ({placeholders})"
            await db.execute(query, tuple(values))
            # Read back the id SQLite generated for the new row.
            result = await db.fetch_one("SELECT last_insert_rowid() as id")
            self.id = result['id']
        else:
            assignments = ', '.join(f"{name} = ?" for name in columns)
            query = f"UPDATE {self._table_name} SET {assignments} WHERE id = ?"
            await db.execute(query, tuple(values + [self.id]))

    async def delete(self, db: AsyncDatabase) -> None:
        """Delete the record by id; does nothing for an unsaved instance."""
        if self.id is not None:
            query = f"DELETE FROM {self._table_name} WHERE id = ?"
            await db.execute(query, (self.id,))
class AsyncORM:
    """Entry point tying model classes to an ``AsyncDatabase``."""

    # Schema for the users table
    _USER_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
is_active INTEGER DEFAULT 1
)
"""

    # Schema for the posts table; author_id references users.id
    _POST_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS posts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
title TEXT NOT NULL,
content TEXT NOT NULL,
author_id INTEGER NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (author_id) REFERENCES users (id)
)
"""

    def __init__(self, db: AsyncDatabase):
        self.db = db

    def query(self, model_class: Type[T]) -> QuerySet:
        """Start a new ``QuerySet`` for ``model_class`` on this ORM's database."""
        return QuerySet(model_class, self.db)

    async def create_tables(self) -> None:
        """Create the users table and then the posts table if they do not exist."""
        for statement in (self._USER_TABLE_SQL, self._POST_TABLE_SQL):
            await self.db.execute(statement)
# Asynchronous database operations demo
async def database_operations_demo():
    """Walk through create, query, update, bulk insert, filtering and paging on an in-memory database."""
    print("=== 异步数据库操作演示 ===")

    # In-memory SQLite database plus schema
    db = AsyncDatabase(":memory:")
    await db.connect()
    orm = AsyncORM(db)
    await orm.create_tables()

    try:
        # 1. Create a user
        alice = User(
            username="alice",
            email="alice@example.com",
            password_hash="hashed_password_123"
        )
        await alice.save(db)
        print(f"✅ 用户创建成功: {alice.username} (ID: {alice.id})")

        # 2. Look the user up again
        queried_user = await orm.query(User).filter(username="alice").first()
        print(f"✅ 用户查询成功: {queried_user.username}")

        # 3. Update the user
        queried_user.email = "alice_updated@example.com"
        await queried_user.save(db)
        print("✅ 用户更新成功")

        # 4. Insert several users, one save per record
        users_data = [
            ("bob", "bob@example.com", "hash_bob"),
            ("charlie", "charlie@example.com", "hash_charlie"),
            ("david", "david@example.com", "hash_david")
        ]
        for record in users_data:
            username, email, password_hash = record
            await User(username=username, email=email, password_hash=password_hash).save(db)
        print("✅ 批量用户创建成功")

        # 5. Filtered, ordered query and a count with the same filter
        active_query = orm.query(User).filter(is_active=True)
        all_users = await active_query.order_by("username").all()
        print(f"✅ 活跃用户数量: {len(all_users)}")
        user_count = await orm.query(User).filter(is_active=True).count()
        print(f"✅ 活跃用户统计: {user_count}")

        # 6. Paging: skip one row, take two
        paginated_users = await orm.query(User).order_by("created_at").limit(2).offset(1).all()
        print(f"✅ 分页查询结果: {[user.username for user in paginated_users]}")
    finally:
        await db.disconnect()
# Script entry point: run the database demo
if __name__ == "__main__":
    asyncio.run(database_operations_demo())
四、生产环境异步架构设计
(1) 完整的异步微服务架构
"""
生产级异步微服务架构
"""
import asyncio
import aiohttp
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json
import logging
from contextlib import asynccontextmanager
# Configure logging: INFO level on the root logger, plus a module-level logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ServiceConfig:
    """Address and call settings for one service instance."""

    # Identity and network location
    name: str
    host: str
    port: int
    # Call settings with their defaults
    health_check_path: str = "/health"
    timeout: int = 30
    retry_count: int = 3
class ServiceDiscovery:
    """Client for a Consul agent's HTTP API: registers this service and lists services."""

    def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
        self.consul_host = consul_host
        self.consul_port = consul_port
        self.services: Dict[str, List[ServiceConfig]] = {}
        self._session: Optional[aiohttp.ClientSession] = None
        # Next instance index per service name, used for round-robin selection
        self._next_index: Dict[str, int] = {}

    async def init(self) -> None:
        """Open the HTTP session, register this service and load the service list."""
        self._session = aiohttp.ClientSession()
        await self._register_service()
        await self.refresh_services()

    async def close(self) -> None:
        """Close the HTTP session if it was opened."""
        if self._session:
            await self._session.close()

    async def _register_service(self) -> None:
        """Register the (hard-coded) local service with the agent; failures are only logged."""
        registration_data = {
            "Name": "async-web-service",
            "Address": "localhost",
            "Port": 8080,
            "Check": {
                "HTTP": "http://localhost:8080/health",
                "Interval": "10s"
            }
        }
        try:
            async with self._session.put(
                f"http://{self.consul_host}:{self.consul_port}/v1/agent/service/register",
                json=registration_data
            ) as response:
                if response.status == 200:
                    logger.info("✅ 服务注册成功")
                else:
                    logger.warning(f"⚠️ 服务注册失败: {response.status}")
        except Exception as e:
            logger.error(f"❌ 服务注册错误: {e}")

    async def refresh_services(self) -> None:
        """Reload ``self.services`` from the agent; failures are only logged."""
        try:
            async with self._session.get(
                f"http://{self.consul_host}:{self.consul_port}/v1/agent/services"
            ) as response:
                if response.status == 200:
                    services_data = await response.json()
                    self._parse_services(services_data)
                    logger.info(f"✅ 服务发现刷新成功,发现 {len(self.services)} 个服务")
                else:
                    logger.warning(f"⚠️ 服务发现刷新失败: {response.status}")
        except Exception as e:
            logger.error(f"❌ 服务发现错误: {e}")

    def _parse_services(self, services_data: Dict[str, Any]) -> None:
        """Rebuild ``self.services`` from the agent payload, grouped by service name."""
        self.services.clear()
        for service_info in services_data.values():
            service_name = service_info['Service']
            service_config = ServiceConfig(
                name=service_name,
                host=service_info['Address'],
                port=service_info['Port']
            )
            self.services.setdefault(service_name, []).append(service_config)

    def get_service(self, service_name: str) -> Optional[ServiceConfig]:
        """Return the next instance of ``service_name`` in round-robin order.

        Returns:
            A ``ServiceConfig``, or ``None`` when the service is unknown.
        """
        service_list = self.services.get(service_name)
        if not service_list:
            return None
        # The modulo keeps the index valid when the list shrank after a refresh.
        index = self._next_index.get(service_name, 0) % len(service_list)
        self._next_index[service_name] = (index + 1) % len(service_list)
        return service_list[index]
class CircuitBreaker:
    """Circuit breaker with CLOSED, OPEN and HALF_OPEN states.

    After ``failure_threshold`` consecutive failures the breaker opens and
    rejects calls until more than ``reset_timeout`` seconds have passed since
    the last failure; the next call is then let through as a trial.
    """

    def __init__(self, failure_threshold: int = 5, reset_timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # one of CLOSED, OPEN, HALF_OPEN

    async def execute(self, coro_func, *args, **kwargs) -> Any:
        """Call ``coro_func(*args, **kwargs)`` under the breaker's protection.

        Raises:
            CircuitBreakerError: the breaker is open and not yet due for a retry.
            Exception: whatever the protected call raised (it is counted first).
        """
        if self.state == "OPEN":
            if not self._should_try_reset():
                raise CircuitBreakerError("断路器开启")
            self.state = "HALF_OPEN"

        try:
            outcome = await coro_func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return outcome

    def _on_success(self) -> None:
        """Reset the failure bookkeeping and close the breaker."""
        self.failure_count, self.last_failure_time = 0, None
        self.state = "CLOSED"

    def _on_failure(self) -> None:
        """Count a failure, remember when it happened and open at the threshold."""
        self.failure_count += 1
        self.last_failure_time = asyncio.get_event_loop().time()
        if self.failure_count < self.failure_threshold:
            return
        self.state = "OPEN"

    def _should_try_reset(self) -> bool:
        """Return True when no failure is recorded or the reset timeout has elapsed."""
        if self.last_failure_time is None:
            return True
        elapsed = asyncio.get_event_loop().time() - self.last_failure_time
        return elapsed > self.reset_timeout
class CircuitBreakerError(Exception):
    """Raised by ``CircuitBreaker.execute`` when the breaker is open and rejects the call."""
class AsyncHttpClient:
    """HTTP client that resolves services by name and guards each with a circuit breaker."""

    def __init__(self, service_discovery: ServiceDiscovery):
        self.service_discovery = service_discovery
        self.session = None  # created by init()
        self.circuit_breakers: Dict[str, CircuitBreaker] = {}

    async def init(self) -> None:
        """Create the underlying aiohttp session."""
        self.session = aiohttp.ClientSession()

    async def close(self) -> None:
        """Close the aiohttp session if it was created."""
        session = self.session
        if session:
            await session.close()

    async def request(
        self,
        service_name: str,
        method: str,
        path: str,
        **kwargs
    ) -> Any:
        """Send a request to ``service_name`` through that service's circuit breaker.

        Raises:
            ServiceNotFoundError: service discovery has no entry for the name.
        """
        target = self.service_discovery.get_service(service_name)
        if not target:
            raise ServiceNotFoundError(f"服务未找到: {service_name}")

        # One breaker per service name, created on first use
        breaker = self.circuit_breakers.get(service_name)
        if breaker is None:
            breaker = CircuitBreaker()
            self.circuit_breakers[service_name] = breaker

        return await breaker.execute(self._make_request, target, method, path, **kwargs)

    async def _make_request(
        self,
        service_config: ServiceConfig,
        method: str,
        path: str,
        **kwargs
    ) -> Any:
        """Perform the HTTP call and decode the response.

        Returns:
            Parsed JSON when the Content-Type mentions ``application/json``,
            otherwise the response text.

        Raises:
            HTTPError: the response status is 400 or above.
        """
        url = f"http://{service_config.host}:{service_config.port}{path}"
        request_timeout = aiohttp.ClientTimeout(total=service_config.timeout)
        async with self.session.request(
            method, url, timeout=request_timeout, **kwargs
        ) as response:
            if response.status >= 400:
                raise HTTPError(f"HTTP错误: {response.status}")
            content_type = response.headers.get('Content-Type', '')
            if 'application/json' not in content_type:
                return await response.text()
            return await response.json()
class ServiceNotFoundError(Exception):
    """Raised when service discovery has no entry for the requested service name."""
class HTTPError(Exception):
    """Raised when a service answers with an HTTP status of 400 or above."""
# Asynchronous microservice architecture demo
async def microservice_architecture_demo():
    """Demonstrate service discovery, a guarded service call and the circuit breaker."""
    print("=== 异步微服务架构演示 ===")

    # Set up service discovery and the HTTP client built on it
    service_discovery = ServiceDiscovery()
    await service_discovery.init()
    http_client = AsyncHttpClient(service_discovery)
    await http_client.init()

    try:
        logger.info("开始服务间调用测试...")

        # 1. Call another service by name
        try:
            response = await http_client.request(
                "user-service",
                "GET",
                "/api/users/1"
            )
            logger.info(f"✅ 服务调用成功: {response}")
        except ServiceNotFoundError:
            logger.warning("用户服务未注册,这是预期的演示行为")
        except HTTPError as e:
            logger.warning(f"HTTP错误: {e}")

        # 2. Circuit breaker
        logger.info("测试断路器功能...")

        class MockFailingService:
            """Service stub whose first three calls raise and later calls succeed."""

            call_count = 0

            async def call(self):
                self.call_count += 1
                if self.call_count <= 3:
                    raise Exception("模拟服务失败")
                return "服务调用成功"

        failing_service = MockFailingService()
        circuit_breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10)

        # Three attempts: with a threshold of 2 the breaker opens after the
        # second failure, so the third attempt is rejected by the breaker.
        for attempt in range(1, 4):
            try:
                await circuit_breaker.execute(failing_service.call)
            except Exception as e:
                logger.info(f"调用 {attempt}: {e}")

        # One more attempt while the breaker is still open
        try:
            await circuit_breaker.execute(failing_service.call)
        except CircuitBreakerError as e:
            logger.info(f"断路器生效: {e}")

        # 3. List what service discovery currently knows
        logger.info("当前发现的服务:")
        for service_name, configs in service_discovery.services.items():
            for config in configs:
                logger.info(f" - {service_name}: {config.host}:{config.port}")
    finally:
        await http_client.close()
        await service_discovery.close()
# Script entry point: run the microservice architecture demo
if __name__ == "__main__":
    asyncio.run(microservice_architecture_demo())
总结
Python异步编程已经从边缘技术发展成为构建高性能应用的核心能力。通过深入理解Asyncio原理、掌握协程调度机制、优化异步性能,可以构建出能够处理海量并发的高性能应用。
核心架构要点:
- 深度原理理解:事件循环、协程调度、Future机制
- 性能优化策略:连接池、批量处理、断路器模式
- Web框架设计:异步路由、中间件、请求处理流水线
- 数据库操作:异步ORM、连接管理、事务处理
- 微服务架构:服务发现、负载均衡、容错处理
【进阶方向】
探索Uvloop的性能优化、异步机器学习的集成、分布式异步任务队列,以及异步编程在数据科学和AI领域的应用,进一步拓展异步编程的能力边界。
© 版权声明
THE END














暂无评论内容