Python并发编程与网络通信实战指南

一、并发与并行编程

1. 多线程编程(threading)

Python的threading模块提供了线程级别的并发能力,但由于GIL(全局解释器锁)的存在,多线程更适合I/O密集型任务。

import threading
import time

def worker(num):
    print(f"Worker {num} started")
    time.sleep(2)
    print(f"Worker {num} finished")

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

实践建议

  • 使用ThreadPoolExecutor简化线程池管理
  • 避免在多线程中直接共享可变状态,使用Queue进行线程间通信
  • 注意GIL对CPU密集型任务的限制

2. 多进程编程(multiprocessing)

multiprocessing模块通过创建多个Python进程绕过GIL限制,适合CPU密集型任务。

from multiprocessing import Process, Queue

def square(numbers, q):
    for num in numbers:
        q.put(num * num)

if __name__ == '__main__':
    numbers = [1, 2, 3, 4]
    q = Queue()
    p = Process(target=square, args=(numbers, q))
    p.start()
    p.join()
    
    while not q.empty():
        print(q.get())

进程间通信方式对比

图1

实践建议

  • 进程创建开销大,适合长时间运行的任务
  • 使用Pool简化进程池管理
  • 注意Windows和Unix系统在进程创建上的差异

二、网络编程

1. 基础Socket编程

# 服务端
import socket

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.bind(('localhost', 65432))
    s.listen()
    conn, addr = s.accept()
    with conn:
        print('Connected by', addr)
        while True:
            data = conn.recv(1024)
            if not data:
                break
            conn.sendall(data)

# 客户端
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(('localhost', 65432))
    s.sendall(b'Hello, world')
    data = s.recv(1024)
print('Received', repr(data))

2. 高级HTTP请求(requests)

import requests

# GET请求
response = requests.get('https://api.github.com/events')
print(response.json())

# POST请求
payload = {'key1': 'value1', 'key2': 'value2'}
response = requests.post('https://httpbin.org/post', data=payload)
print(response.text)

实践建议

  • 使用Session对象保持连接提高性能
  • 设置合理的超时时间
  • 处理SSL证书验证

3. 异步IO(asyncio)

import asyncio

async def fetch_data():
    print("开始获取数据")
    await asyncio.sleep(2)  # 模拟I/O操作
    print("数据获取完成")
    return {"data": 123}

async def main():
    task1 = asyncio.create_task(fetch_data())
    task2 = asyncio.create_task(fetch_data())
    
    await task1
    await task2

asyncio.run(main())

异步编程模式对比

模式适用场景典型库
回调函数简单异步操作传统socket
Future/Promise链式异步操作concurrent.futures
async/await复杂异步流程asyncio

三、测试与调试

1. 单元测试(unittest)

import unittest

def add(a, b):
    return a + b

class TestAdd(unittest.TestCase):
    def test_add_positive(self):
        self.assertEqual(add(2, 3), 5)
    
    def test_add_negative(self):
        self.assertEqual(add(-1, -1), -2)

if __name__ == '__main__':
    unittest.main()

2. 更灵活的测试框架(pytest)

# test_sample.py
def func(x):
    return x + 1

def test_answer():
    assert func(3) == 4

pytest高级特性

  • 参数化测试
  • fixture依赖注入
  • 插件系统

3. 调试技巧(pdb)

import pdb

def complex_function(a, b):
    pdb.set_trace()  # 设置断点
    result = a * b
    return result

print(complex_function(2, 3))

常用pdb命令

  • n(ext): 执行下一行
  • s(tep): 进入函数调用
  • c(ontinue): 继续执行直到下一个断点
  • l(ist): 显示当前代码上下文
  • p(rint): 打印变量值

四、性能优化建议

  1. 并发选择原则

    • I/O密集型:多线程或异步IO
    • CPU密集型:多进程
    • 混合型:多进程+多线程/协程
  2. 网络编程优化

    • 使用连接池
    • 合理设置超时和重试
    • 考虑使用HTTP/2或gRPC
  3. 测试最佳实践

    • 测试金字塔:单元测试>集成测试>端到端测试
    • 使用mock减少外部依赖
    • 定期运行性能测试

通过合理运用Python的并发编程模型和网络通信能力,可以显著提升应用程序的性能和响应能力。同时,完善的测试体系能够确保代码质量,减少生产环境中的问题。

添加新评论