Python并发与网络编程实战指南

一、并发与并行编程

1. 多线程编程(threading)

Python的threading模块提供了线程级别的并发能力,但由于GIL(全局解释器锁)的存在,多线程更适合I/O密集型任务。

import threading
import time

def worker(num):
    print(f"Worker {num} started")
    time.sleep(2)  # 模拟I/O操作
    print(f"Worker {num} finished")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()  # 等待所有线程完成

实践建议

  • 使用ThreadPoolExecutor简化线程管理
  • 避免在多线程中共享可变状态
  • 使用LockRLock进行线程同步

2. 多进程编程(multiprocessing)

multiprocessing模块通过创建多个进程绕过GIL限制,适合CPU密集型任务。

from multiprocessing import Process

def cpu_bound_task(n):
    result = 0
    for i in range(n):
        result += i*i
    print(result)

if __name__ == '__main__':
    processes = []
    for i in range(4):
        p = Process(target=cpu_bound_task, args=(10000000,))
        processes.append(p)
        p.start()
    
    for p in processes:
        p.join()

实践建议

  • 进程间通信使用QueuePipe
  • 考虑使用Pool管理进程池
  • 注意进程启动开销比线程大

3. 异步编程(asyncio)

asyncio提供了单线程并发模型,适合高I/O密集型应用。

import asyncio

async def fetch_data(url):
    print(f"开始获取 {url}")
    await asyncio.sleep(2)  # 模拟网络请求
    print(f"完成获取 {url}")
    return f"{url} 的数据"

async def main():
    tasks = [
        fetch_data("https://api.example.com/1"),
        fetch_data("https://api.example.com/2"),
        fetch_data("https://api.example.com/3")
    ]
    results = await asyncio.gather(*tasks)
    print(results)

asyncio.run(main())

并发模型对比

图1

二、网络编程实战

1. 基础socket编程

# 服务端
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('localhost', 65432))
    s.listen()
    conn, addr = s.accept()
    with conn:
        print('Connected by', addr)
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)

# 客户端
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('localhost', 65432))
    s.sendall(b'Hello, world')
    data = s.recv(1024)
    print('Received', repr(data))

2. 使用requests处理HTTP请求

import requests

# GET请求
response = requests.get('https://api.github.com/events')
print(response.status_code)
print(response.json())

# POST请求
payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.text)

实践建议

  • 使用Session对象保持连接
  • 设置合理的超时时间
  • 处理HTTP错误状态码

3. 高级异步HTTP客户端(aiohttp)

import aiohttp
import asyncio

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, 'http://python.org')
        print(html[:200])  # 打印前200个字符

asyncio.run(main())

三、测试与调试技巧

1. 单元测试(unittest/pytest)

# 使用pytest
def add(a, b):
    return a + b

def test_add():
    assert add(2, 3) == 5
    assert add(-1, 1) == 0
    assert add(0, 0) == 0

测试金字塔原则

图2

2. 调试技巧(pdb)

import pdb

def complex_function(a, b):
    pdb.set_trace()  # 设置断点
    result = a * b
    result += a + b
    return result

print(complex_function(3, 4))

常用pdb命令

  • n(ext): 执行下一行
  • s(tep): 进入函数
  • c(ontinue): 继续执行直到下一个断点
  • l(ist): 显示当前代码
  • p(rint): 打印变量值

四、性能优化建议

  1. 选择正确的并发模型

    • I/O密集型:异步 > 多线程 > 多进程
    • CPU密集型:多进程 > 异步
  2. 网络编程优化

    • 使用连接池
    • 启用HTTP持久连接
    • 考虑使用gRPC等高效协议
  3. 测试策略

    • 为并发代码编写确定性测试
    • 使用unittest.mock隔离网络依赖
    • 定期运行性能测试

通过合理运用Python的并发模型和网络编程工具,可以构建出高性能的网络服务和数据处理应用。记住没有放之四海而皆准的方案,根据具体场景选择最适合的技术组合。

添加新评论