Python(十四):第十三章: 高级数据处理

第十三章: 高级数据处理

Python 提供了多种处理不同类型数据的工具和库,能够轻松处理结构化和非结构化数据。本章将深入探讨 Python 中常用的数据格式处理技术,包括 JSON、CSV、XML 和配置文件等。

13.1 JSON 处理

JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式,易于人阅读和编写,也易于机器解析和生成。Python 通过内置的 json 模块提供了 JSON 的序列化和反序列化功能。

方法描述
json.dump(obj, fp)将 Python 对象 obj 编码为 JSON 格式并写入文件 fp
json.dumps(obj)将 Python 对象 obj 编码为 JSON 格式并返回字符串。
json.load(fp)从文件 fp 读取 JSON 数据并解码为 Python 对象。
json.loads(s)将字符串 s 解码为 Python 对象。

13.1.1 基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import json

# Python对象转JSON
data = {
"name": "张三",
"age": 30,
"is_student": False,
"courses": ["Python", "数据分析", "机器学习"],
"scores": {"Python": 95, "数据分析": 88}
}

# 转换为JSON字符串
json_str = json.dumps(data, ensure_ascii=False, indent=4)
print(json_str)

# 写入JSON文件
with open("data.json", "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=4)

# 从JSON字符串解析
parsed_data = json.loads(json_str)
print(parsed_data["name"]) # 张三

# 从JSON文件读取
with open("data.json", "r", encoding="utf-8") as f:
loaded_data = json.load(f)
print(loaded_data["scores"]["Python"]) # 95

13.1.2 重要参数说明

参数说明用法示例
ensure_ascii是否转义非 ASCII 字符,False 时保留原始字符json.dumps(data, ensure_ascii=False)
indent缩进格式,美化输出json.dumps(data, indent=4)
separators指定分隔符,用于紧凑输出json.dumps(data, separators=(',', ':'))
sort_keys是否按键排序json.dumps(data, sort_keys=True)
default指定序列化函数,处理不可序列化对象json.dumps(obj, default=lambda o: o.__dict__)

13.1.3 自定义对象序列化

Python 的 json 模块默认无法直接序列化自定义类对象,但提供了多种方式解决:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import json

# ========== 方法一:提供default参数 ==========

class Person:
def __init__(self, name, age):
self.name = name
self.age = age


def person_to_dict(person):
"""将Person对象转换为字典"""
return {
"name": person.name,
"age": person.age
}


# 示例:使用default参数序列化自定义对象
person = Person("李四", 25)
json_str = json.dumps(person, default=person_to_dict, ensure_ascii=False)
print(json_str) # {"name": "李四", "age": 25}


# ========== 方法二:通过自定义编码器 ==========

class PersonEncoder(json.JSONEncoder):
"""自定义JSON编码器处理Person类"""
def default(self, obj):
if isinstance(obj, Person):
return {"name": obj.name, "age": obj.age}
return super().default(obj)


# 示例:使用自定义编码器序列化对象
json_str = json.dumps(person, cls=PersonEncoder, ensure_ascii=False)
print(json_str) # {"name": "李四", "age": 25}


# ========== 方法三:添加to_json方法 ==========

class Student:
def __init__(self, name, grade):
self.name = name
self.grade = grade

def __repr__(self):
return f"Student('{self.name}', {self.grade})"

def to_json(self):
"""返回可JSON序列化的字典"""
return {
"name": self.name,
"grade": self.grade
}


# 示例:使用对象的to_json方法序列化
students = [Student("小明", 90), Student("小红", 88)]
json_str = json.dumps([s.to_json() for s in students], ensure_ascii=False)
print(json_str) # [{"name": "小明", "grade": 90}, {"name": "小红", "grade": 88}]

13.1.4 JSON 解码为自定义对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import json
from typing import Dict


class Person:
def __init__(self, name: str, age: int):
self.name = name
self.age = age

def __str__(self):
return f"{self.name}({self.age})"


def dict_to_person(data: Dict) -> Person:
return Person(data["name"], data["age"])


# 使用 json.loads() 的 object_hook 参数将 JSON 字符串直接转换为自定义对象
# object_hook 的用途:
# 1. 自动将 JSON 解析出的字典转换为自定义类的实例
# 2. 在解析 JSON 时进行数据转换和验证
# 3. 简化从 JSON 到对象模型的映射过程
# 4. 避免手动创建对象的繁琐步骤

# 工作原理:
# - json.loads() 首先将 JSON 字符串解析为 Python 字典
# - 然后对每个解析出的字典调用 object_hook 函数
# - object_hook 函数返回的对象将替代原始字典

# 实际应用场景:
# - API 响应数据转换为应用程序对象模型
# - 配置文件解析为配置对象
# - 数据导入时的格式转换

person_data = '{"name": "Alice", "age": 25}'
person = json.loads(person_data, object_hook=dict_to_person)
print(type(person)) # <class '__main__.Person'>
print([person.name, person.age]) # ['Alice', 25]
print(person) # Alice(25)

13.1.5 处理复杂 JSON 数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 处理嵌套结构
nested_json = '''
{
"company": "ABC Corp",
"employees": [
{"name": "张三", "department": "技术", "skills": ["Python", "Java"]},
{"name": "李四", "department": "市场", "skills": ["营销", "策划"]}
],
"locations": {
"headquarters": "北京",
"branches": ["上海", "广州", "深圳"]
}
}
'''

data = json.loads(nested_json)

# 访问嵌套数据
print(data["employees"][0]["name"]) # 张三
print(data["employees"][0]["skills"][0]) # Python
print(data["locations"]["branches"][1]) # 广州

# 修改嵌套数据
data["employees"][0]["skills"].append("C++")
data["locations"]["branches"].append("成都")

# 保存修改后的数据
updated_json = json.dumps(data, ensure_ascii=False, indent=2)
print(updated_json)

13.1.6 性能优化

处理大型 JSON 文件时,可以使用流式解析来提高性能:

1
2
3
4
5
6
7
8
import ijson  # 需安装: pip install ijson

# 流式解析大型JSON文件
with open("large_file.json", "rb") as f:
# 只提取特定字段
for item in ijson.items(f, "items.item"):
print(item["id"], item["name"])
# 处理一项后继续,不必载入整个文件

13.1.7 JSON Schema 验证

验证 JSON 数据是否符合预期格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from jsonschema import validate  # 需安装: pip install jsonschema

# 定义JSON Schema
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "integer", "minimum": 0},
"email": {"type": "string", "format": "email"}
},
"required": ["name", "age"]
}

# 验证数据
valid_data = {"name": "张三", "age": 30, "email": "zhangsan@example.com"}
invalid_data = {"name": "李四", "age": -5}

try:
validate(instance=valid_data, schema=schema)
print("有效数据")
except Exception as e:
print(f"验证失败: {e}")

try:
validate(instance=invalid_data, schema=schema)
print("有效数据")
except Exception as e:
print(f"验证失败: {e}") # 会因age小于0而失败

13.2 CSV 处理

CSV (Comma-Separated Values) 是一种常见的表格数据格式。Python 的 csv 模块提供了读写 CSV 文件的功能,适用于处理电子表格和数据库导出数据。

在我们写入中文数据时,尽量将编码更换为 GBK 否则写入 CSV 会导致一些乱码问题

13.2.1 基本读写操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import csv

# 写入CSV文件
data = [
["姓名", "年龄", "城市"],
["张三", 30, "北京"],
["李四", 25, "上海"],
["王五", 28, "广州"]
]

with open("people.csv", "w", newline="", encoding="gbk") as f:
writer = csv.writer(f)
writer.writerows(data) # 一次写入多行

# 逐行写入
with open("people_row.csv", "w", newline="", encoding="gbk") as f:
writer = csv.writer(f)
for row in data:
writer.writerow(row) # 一次写入一行

# 读取CSV文件
with open("people.csv", "r", encoding="gbk") as f:
reader = csv.reader(f)
for row in reader:
print(row)

13.2.2 使用字典处理 CSV 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 使用字典写入CSV
import csv

dict_data = [
{"姓名": "张三", "年龄": 30, "城市": "北京"},
{"姓名": "李四", "年龄": 25, "城市": "上海"},
{"姓名": "王五", "年龄": 28, "城市": "广州"}
]

with open("people_dict.csv", "w", newline="", encoding="gbk") as f:
fieldnames = ["姓名", "年龄", "城市"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader() # 写入表头
writer.writerows(dict_data) # 写入多行数据

# 使用字典读取CSV
with open("people_dict.csv", "r", encoding="gbk") as f:
reader = csv.DictReader(f)
for row in reader:
print(f"{row['姓名']} ({row['年龄']}岁) 来自 {row['城市']}")

13.2.3 CSV 方言与格式化选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 自定义CSV方言
csv.register_dialect(
'tab_dialect',
delimiter='\t', # 使用制表符作为分隔符
quotechar='"', # 引号字符
escapechar='\\', # 转义字符
doublequote=False, # 不使用双引号转义
quoting=csv.QUOTE_MINIMAL # 最小引用策略
)

# 使用自定义方言
with open("tab_data.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, dialect='tab_dialect')
writer.writerows(data)

# 常见格式化选项
with open("formatted.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(
f,
delimiter=',', # 分隔符
quotechar='"', # 引号字符
quoting=csv.QUOTE_NONNUMERIC, # 为非数值字段添加引号
escapechar='\\', # 转义字符
lineterminator='\n' # 行终止符
)
writer.writerows(data)

13.2.4 处理特殊情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 处理含有引号和逗号的数据
complex_data = [
["产品", "描述", "价格"],
["笔记本", "14\" 高配, i7处理器", 5999.99],
["手机", "5.5\" 屏幕, 双卡双待", 2999.50]
]

with open("complex.csv", "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f, quoting=csv.QUOTE_ALL) # 所有字段加引号
writer.writerows(complex_data)

# 跳过特定行
with open("complex.csv", "r", encoding="utf-8") as f:
reader = csv.reader(f)
next(reader) # 跳过表头
for row in reader:
print(row)

# 处理缺失值
with open("missing.csv", "r", encoding="utf-8") as f:
reader = csv.reader(f)
for row in reader:
# 将空字符串转换为None
processed_row = [None if cell == '' else cell for cell in row]
print(processed_row)

13.2.5 CSV 文件的高级操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 过滤行
with open("people.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
# 筛选年龄大于25的记录
filtered_data = [row for row in reader if int(row["年龄"]) > 25]

# 计算统计值
with open("grades.csv", "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
# 计算平均分
scores = [float(row["分数"]) for row in reader]
avg_score = sum(scores) / len(scores)
print(f"平均分: {avg_score:.2f}")

# 合并多个CSV文件
import glob

def merge_csv_files(file_pattern, output_file):
# 获取所有匹配的文件
all_files = glob.glob(file_pattern)

with open(output_file, "w", newline="", encoding="utf-8") as outfile:
# 假设所有文件结构相同
for i, filename in enumerate(all_files):
with open(filename, "r", encoding="utf-8") as infile:
reader = csv.reader(infile)
if i == 0:
# 第一个文件,保留表头
for row in reader:
csv.writer(outfile).writerow(row)
else:
# 跳过后续文件的表头
next(reader, None)
for row in reader:
csv.writer(outfile).writerow(row)

# 使用示例
# merge_csv_files("data_*.csv", "merged_data.csv")

13.3 XML 处理

XML (eXtensible Markup Language) 是一种用于存储和传输数据的标记语言。Python 提供多种处理 XML 的方法,最常用的是 xml.etree.ElementTree 模块。

13.3.1 创建和写入 XML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import xml.etree.ElementTree as ET

# 创建XML根元素
root = ET.Element("data")

# 添加子元素
items = ET.SubElement(root, "items")

# 添加多个项目
for i in range(1, 4):
item = ET.SubElement(items, "item")
item.set("id", str(i)) # 设置属性
item.text = f"第{i}项" # 设置文本内容

# 添加嵌套元素
detail = ET.SubElement(item, "detail")
detail.text = f"项目{i}的详情"

# 创建用户信息部分
users = ET.SubElement(root, "users")

# 添加用户
user = ET.SubElement(users, "user")
user.set("name", "张三")
ET.SubElement(user, "age").text = "30"
ET.SubElement(user, "city").text = "北京"

user2 = ET.SubElement(users, "user")
user2.set("name", "李四")
ET.SubElement(user2, "age").text = "25"
ET.SubElement(user2, "city").text = "上海"

# 生成XML字符串
xml_str = ET.tostring(root, encoding="utf-8").decode("utf-8")
print(xml_str)

# 写入XML文件
tree = ET.ElementTree(root)
tree.write("data.xml", encoding="utf-8", xml_declaration=True)

13.3.2 解析和读取 XML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
# 从文件解析XML
tree = ET.parse("data.xml")
root = tree.getroot()

# 从字符串解析XML
xml_string = '<data><item id="1">测试</item></data>'
root = ET.fromstring(xml_string)

# 获取元素标签和属性
print(f"根元素标签: {root.tag}")

# 遍历子元素
for child in root:
print(f"子元素: {child.tag}, 属性: {child.attrib}")

# 查找特定元素 - find()查找第一个匹配元素
items = root.find("items")
if items is not None:
# 使用findall()查找所有匹配的子元素
for item in items.findall("item"):
print(f"项目ID: {item.get('id')}, 内容: {item.text}")
# 获取嵌套元素
detail = item.find("detail")
if detail is not None:
print(f" 详情: {detail.text}")

# 使用XPath查询
# 查找所有用户名称
users = root.findall(".//user")
for user in users:
print(f"用户: {user.get('name')}")
print(f" 年龄: {user.find('age').text}")
print(f" 城市: {user.find('city').text}")

# 更复杂的XPath查询 - 查找北京的用户
beijing_users = root.findall(".//user[city='北京']")
for user in beijing_users:
print(f"北京用户: {user.get('name')}")

13.3.3 修改 XML

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 修改元素属性
user = root.find(".//user[@name='张三']")
if user is not None:
user.set("status", "active") # 添加新属性

# 修改子元素文本
age_elem = user.find("age")
if age_elem is not None:
age_elem.text = "31" # 修改年龄

# 添加新元素
ET.SubElement(user, "email").text = "zhangsan@example.com"

# 删除元素
users = root.find("users")
if users is not None:
for user in users.findall("user"):
if user.get("name") == "李四":
users.remove(user)
break

# 保存修改
tree.write("updated_data.xml", encoding="utf-8", xml_declaration=True)

13.3.4 命名空间处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 创建带命名空间的XML
root = ET.Element("data", {"xmlns:dt": "http://example.org/datatypes"})

# 添加带命名空间前缀的元素
item = ET.SubElement(root, "dt:item")
item.set("dt:type", "special")
item.text = "带命名空间的元素"

# 生成XML字符串
ns_xml = ET.tostring(root, encoding="utf-8").decode("utf-8")
print(ns_xml)

# 解析带命名空间的XML
ns_root = ET.fromstring(ns_xml)

# 使用带命名空间的XPath查询
namespaces = {"dt": "http://example.org/datatypes"}
ns_items = ns_root.findall(".//dt:item", namespaces)

for item in ns_items:
print(f"找到命名空间元素: {item.text}")
print(f"类型属性: {item.get('{http://example.org/datatypes}type')}")

13.4 配置文件处理

配置文件是应用程序保存设置和首选项的常用方式。Python 提供了多种处理不同格式配置文件的方法。

13.4.1 INI 配置文件处理

INI 文件是一种结构简单的配置文件格式,Python 通过 configparser 模块提供支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import configparser
# configparser是Python标准库中用于处理配置文件的模块
# 它可以读取、写入和修改类似INI格式的配置文件
# 配置文件通常包含节(sections)
# 如:[DEFAULT]
# 和每个节下的键值对(key-value pairs)
# 如:
# language = 中文
# theme = 默认
# auto_save = true
# save_interval = 10


# 创建一个新的配置解析器
config = configparser.ConfigParser()

# 添加默认节和配置项
config["DEFAULT"] = {
"language": "中文",
"theme": "默认",
"auto_save": "true",
"save_interval": "10"
}

# 添加应用设置节
config["应用设置"] = {}
config["应用设置"]["font_size"] = "14"

# 添加用户信息节
config["用户信息"] = {}
user_info = config["用户信息"] # 创建一个引用,方便添加多个配置项
user_info["username"] = "张三"
user_info["email"] = "zhangsan@example.com"
user_info["remember_password"] = "false" # 修改为标准布尔值字符串

# 添加数据库连接节
config["数据库"] = {}
config["数据库"]["host"] = "localhost"
config["数据库"]["port"] = "3306"
config["数据库"]["username"] = "root"
config["数据库"]["password"] = "123456"

# 将配置写入文件
with open("config.ini", "w", encoding="utf-8") as f:
config.write(f)

# 读取配置文件
config = configparser.ConfigParser()
config.read("config.ini", encoding="utf-8")

# 获取所有节名称
print("所有配置节:", config.sections()) # ['应用设置', '用户信息', '数据库']

# 获取节中的所有键
print("用户信息节中的所有键:", list(config["用户信息"].keys()))

# 获取特定配置值
print("用户名:", config["用户信息"]["username"]) # 张三

# 获取默认节中的值
print("默认语言:", config.get("应用设置", "language")) # 使用DEFAULT中的值

# 类型转换方法
font_size = config.getint("应用设置", "font_size")
auto_save = config.getboolean("DEFAULT", "auto_save", fallback=True) # 将"true"转换为True
save_interval = config.getint("DEFAULT", "save_interval")

print(f"字体大小: {font_size}, 类型: {type(font_size)}") # 字体大小: 14, 类型: <class 'int'>
print(f"自动保存: {auto_save}, 类型: {type(auto_save)}") # 自动保存: True, 类型: <class 'bool'>

# 修改配置
config["用户信息"]["username"] = "李四"

# 添加新配置
if "日志设置" not in config:
config["日志设置"] = {}
config["日志设置"]["log_level"] = "INFO"
config["日志设置"]["log_file"] = "app.log"
config["日志设置"]["max_size"] = "10MB"

# 保存修改后的配置
with open("updated_config.ini", "w", encoding="utf-8") as f:
config.write(f)

13.4.2 YAML 配置文件处理

YAML 是一种人类友好的数据序列化格式,需要安装 PyYAML 库。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 需要安装PyYAML: pip install pyyaml
import yaml

# 创建YAML数据
data = {
"server": {
"host": "example.com",
"port": 8080
},
"database": {
"host": "localhost",
"port": 5432,
"username": "admin",
"password": "secret"
},
"logging": {
"level": "INFO",
"file": "/var/log/app.log"
},
"users": [
{"name": "张三", "role": "admin"},
{"name": "李四", "role": "user"}
]
}

# 写入YAML文件
with open("config.yaml", "w", encoding="utf-8") as f:
yaml.dump(data, f, default_flow_style=False, allow_unicode=True)

# 读取YAML文件
with open("config.yaml", "r", encoding="utf-8") as f:
config = yaml.safe_load(f)

# 访问配置
print(f"服务器地址: {config['server']['host']}") # example.com
print(f"第一个用户: {config['users'][0]['name']}") # 张三

# 修改配置
config["server"]["port"] = 9090
config["users"].append({"name": "王五", "role": "user"})

# 保存修改
with open("updated_config.yaml", "w", encoding="utf-8") as f:
yaml.dump(config, f, default_flow_style=False, allow_unicode=True)

13.4.3 使用环境变量作为配置

环境变量是一种灵活的配置方式,尤其适用于容器化应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import os
from dotenv import load_dotenv # 需安装: pip install python-dotenv

# 从.env文件加载环境变量
load_dotenv() # 默认加载当前目录下的.env文件

# 读取环境变量,提供默认值
database_url = os.environ.get("DATABASE_URL", "sqlite:///default.db")
debug_mode = os.environ.get("DEBUG", "False").lower() in ("true", "1", "yes")
port = int(os.environ.get("PORT", "8000"))

print(f"数据库URL: {database_url}")
print(f"调试模式: {debug_mode}")
print(f"端口: {port}")

# 创建.env文件示例
env_content = """
# 数据库设置
DATABASE_URL=postgresql://user:pass@localhost/dbname
# 应用设置
DEBUG=True
PORT=5000
"""

with open(".env.example", "w") as f:
f.write(env_content)

13.4.4 JSON 作为配置文件

JSON 也是一种常用的配置文件格式,尤其适合需要与 Web 应用共享配置的场景。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import json
import os

# 默认配置
default_config = {
"app_name": "MyApp",
"version": "1.0.0",
"debug": False,
"database": {
"host": "localhost",
"port": 5432,
"name": "app_db"
},
"cache": {
"enabled": True,
"ttl": 3600
}
}

# 配置文件路径
config_path = "app_config.json"

# 加载配置
def load_config():
# 如果配置文件存在,则加载它
if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f)
# 否则使用默认配置并创建配置文件
else:
save_config(default_config)
return default_config

# 保存配置
def save_config(config):
with open(config_path, "w", encoding="utf-8") as f:
json.dump(config, f, indent=4, ensure_ascii=False)

# 更新配置
def update_config(key, value):
config = load_config()

# 处理嵌套键 (如 "database.host")
if "." in key:
parts = key.split(".")
current = config
for part in parts[:-1]:
if part not in current:
current[part] = {}
current = current[part]
current[parts[-1]] = value
else:
config[key] = value

save_config(config)
return config

# 使用示例
config = load_config()
print(f"应用名称: {config['app_name']}")
print(f"数据库主机: {config['database']['host']}")

# 更新配置
update_config("database.host", "db.example.com")
update_config("cache.ttl", 7200)

# 重新加载配置
config = load_config()
print(f"更新后的数据库主机: {config['database']['host']}")
print(f"更新后的缓存TTL: {config['cache']['ttl']}")

13.5 正则表达式

正则表达式(通常缩写为 regex 或 regexp)是一种强大的文本处理工具。它使用一种专门的语法来定义 搜索模式 (pattern),然后可以用这个模式在文本中进行查找、匹配、提取或替换操作。正则表达式在各种编程任务中都极为有用,例如:

  • 数据验证: 检查用户输入是否符合特定格式(如邮箱、手机号、日期)。
  • 数据提取: 从大量非结构化文本(如日志文件、网页内容)中精确地抽取所需信息(如 IP 地址、错误代码、特定标签内容)。
  • 文本替换: 对文本进行复杂的查找和替换操作,例如格式化代码、屏蔽敏感信息。
  • 文本分割: 根据复杂的模式分割字符串。

Python 通过内置的 re 模块提供了对正则表达式的全面支持。

核心概念: 正则表达式的核心在于使用 元字符 (metacharacters) 和普通字符组合来定义模式。元字符是具有特殊含义的字符,而普通字符则匹配它们自身。

13.5.1 常用元字符和语法

以下是一些最常用的正则表达式元字符及其含义:

元字符描述示例模式示例匹配
.匹配 除换行符 \n 之外 的任何单个字符 (使用 re.DOTALL 标志可匹配换行符)。a.cabc, a_c, a&c (但不匹配 ac)
^匹配字符串的 开头。在多行模式 (re.MULTILINE) 下,也匹配每行的开头。^HelloHello world (但不匹配 Say Hello)
$匹配字符串的 结尾。在多行模式 (re.MULTILINE) 下,也匹配每行的结尾。world$Hello world (但不匹配 world say)
*匹配前面的元素 零次或多次 (贪婪模式)。go*dgd, god, good, goooood
+匹配前面的元素 一次或多次 (贪婪模式)。go+dgod, good, goooood (但不匹配 gd)
?匹配前面的元素 零次或一次 (贪婪模式)。也用于将贪婪量词变为 非贪婪 (见后文)。colou?rcolor, colour
{n}匹配前面的元素 恰好 n\d{3}123 (但不匹配 121234)
{n,}匹配前面的元素 至少 n (贪婪模式)。\d{2,}12, 123, 12345
{n,m}匹配前面的元素 至少 n 次,但不超过 m (贪婪模式)。\d{2,4}12, 123, 1234 (但不匹配 112345)
[]字符集。匹配方括号中包含的 任意一个 字符。[abc]abc
[^...]否定字符集。匹配 不在 方括号中包含的任何字符。[^0-9]任何非数字字符
\转义符。用于转义元字符,使其匹配其字面含义 (如 \. 匹配句点 .),或用于引入特殊序列 (如 \d)。\$$ 字符本身
``或 (OR) 运算符。匹配 `` 左边或右边的表达式。
()分组。将括号内的表达式视为一个整体,用于应用量词、限制 `` 的范围,或 捕获 匹配的子字符串。(ab)+

踩坑提示:

  • 转义: 当需要匹配元字符本身时(如 .*?),必须在前面加上反斜杠 \ 进行转义。例如,要匹配 IP 地址中的点,应使用 \.
  • 原始字符串 (Raw Strings): 在 Python 中定义正则表达式模式时,强烈建议 使用原始字符串(在字符串前加 r),如 r"\d+"。这可以避免 Python 解释器对反斜杠进行自身的转义,从而简化正则表达式的书写,尤其是包含很多 \ 的模式。

13.5.2 特殊序列 (预定义字符集)

re 模块提供了一些方便的特殊序列来代表常见的字符集:

特殊序列描述等价于示例
\d匹配任何 Unicode 数字 字符 (包括 [0-9] 和其他语言的数字)。[0-9] (ASCII)1, 5
\D匹配任何 非数字 字符。[^0-9] (ASCII)a, _,
\s匹配任何 Unicode 空白 字符 (包括 \t\n\r\f\v 等)。, \t
\S匹配任何 非空白 字符。a, 1, .
\w匹配任何 Unicode 词语 字符 (字母、数字和下划线 _)。[a-zA-Z0-9_] (ASCII)a, B, 5, _
\W匹配任何 非词语 字符。[^a-zA-Z0-9_](ASCII)!, , @
\b匹配 词语边界 (word boundary)。这是一个零宽度断言,匹配词语字符 (\w) 和非词语字符 (\W) 之间,或词语字符和字符串开头/结尾之间的位置。\bword\b
\B匹配 非词语边界\Bword\B

13.5.3 贪婪模式 vs. 非贪婪模式

默认情况下,量词 (*, +, ?, {n,}, {n,m}) 都是 贪婪 (Greedy) 的,它们会尽可能多地匹配字符。

场景: 从 HTML 标签 <b>Bold Text</b> 中提取 <b>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import re

text = "<b>Bold Text</b> Regular Text <b>Another Bold</b>"

# 贪婪模式 (默认)
greedy_pattern = r"<.*>" # . 匹配任何字符,* 匹配零次或多次
match_greedy = re.search(greedy_pattern, text)
if match_greedy:
# * 会一直匹配到字符串的最后一个 >
print(f"贪婪匹配结果: {match_greedy.group(0)}")
# 输出: 贪婪匹配结果: <b>Bold Text</b> Regular Text <b>Another Bold</b>

# 非贪婪模式 (在量词后加 ?)
non_greedy_pattern = r"<.*?>" # *? 匹配零次或多次,但尽可能少地匹配
match_non_greedy = re.search(non_greedy_pattern, text)
if match_non_greedy:
# *? 遇到第一个 > 就停止匹配
print(f"非贪婪匹配结果: {match_non_greedy.group(0)}")
# 输出: 非贪婪匹配结果: <b>

# 查找所有非贪婪匹配
all_matches_non_greedy = re.findall(non_greedy_pattern, text)
print(f"所有非贪婪匹配: {all_matches_non_greedy}")
# 输出: 所有非贪婪匹配: ['<b>', '</b>', '<b>', '</b>']

何时使用非贪婪模式?

当需要匹配从某个开始标记到 最近的 结束标记之间的内容时,通常需要使用非贪婪量词 (*?, +?, ??, {n,}?, {n,m}?)。

13.5.4 分组与捕获

使用圆括号 () 可以将模式的一部分组合起来,形成一个 分组 (Group)。分组有几个重要作用:

  1. 应用量词: 将量词作用于整个分组,如 (abc)+ 匹配 abc, abcabc 等。
  2. 限制 | 范围: 如 gr(a|e)y 匹配 graygrey
  3. 捕获内容: 默认情况下,每个分组会 捕获 (Capture) 其匹配到的子字符串,以便后续引用或提取。

场景: 从 “Name: John Doe, Age: 30” 中提取姓名和年龄。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import re

text = "Name: John Doe, Age: 30; Name: Jane Smith, Age: 25"

# 定义带有捕获组的模式
# 第一个组 (\w+\s+\w+) 捕获姓名
# 第二个组 (\d+) 捕获年龄
pattern_capture = r"Name: (\w+\s+\w+), Age: (\d+)"

# 使用 findall 查找所有匹配项
# findall 返回一个列表,如果模式中有捕获组,列表元素是包含所有捕获组内容的元组
matches = re.findall(pattern_capture, text)
print(f"\n--- 使用 findall 提取分组 ---")
print(matches) # 输出: [('John Doe', '30'), ('Jane Smith', '25')]

# 使用 finditer 获取 Match 对象,可以更灵活地访问分组
print("\n--- 使用 finditer 访问分组 ---")
for match_obj in re.finditer(pattern_capture, text):
# match_obj.group(0) 或 group() 获取整个匹配
print(f"整个匹配: {match_obj.group(0)}")
# match_obj.group(1) 获取第一个捕获组的内容 (姓名)
print(f" 姓名 (组 1): {match_obj.group(1)}")
# match_obj.group(2) 获取第二个捕获组的内容 (年龄)
print(f" 年龄 (组 2): {match_obj.group(2)}")
# match_obj.groups() 获取所有捕获组组成的元组
print(f" 所有分组: {match_obj.groups()}")

# 非捕获组 (?:...)
# 如果只想分组而不捕获内容,可以使用非捕获组
pattern_non_capture = r"Name: (?:\w+\s+\w+), Age: (\d+)" # 第一个组不捕获
matches_nc = re.findall(pattern_non_capture, text)
print(f"\n--- 使用非捕获组的 findall ---")
print(matches_nc) # 输出: ['30', '25'] (只包含捕获组的内容)

反向引用 (Backreferences): 可以在模式内部或替换字符串中使用 \1, \2, … 来引用前面捕获组匹配到的文本。

场景: 查找重复的单词,如 “the the”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
text_repeat = "This is the the test sentence with repeated repeated words."
# \b 确保是完整的单词
# (\w+) 捕获第一个单词
# \s+ 匹配中间的空白
# \1 引用第一个捕获组匹配的内容
pattern_repeat = r"\b(\w+)\s+\1\b"
repeated_words = re.findall(pattern_repeat, text_repeat)
print(f"\n--- 查找重复单词 ---")
print(f"找到的重复单词: {repeated_words}") # 输出: ['the', 'repeated']

# 使用 sub 进行替换
# 将重复的单词替换为单个单词
corrected_text = re.sub(pattern_repeat, r"\1", text_repeat) # 使用 \1 引用捕获组
print(f"修正后的文本: {corrected_text}")
# 输出: This is the test sentence with repeated words.

13.5.5 re 模块核心函数

Python 的 re 模块提供了以下核心函数来执行正则表达式操作:

函数描述返回值主要用途
re.match(p, s, flags=0)从字符串 s开头 尝试匹配模式 p匹配成功返回 Match 对象,失败返回 None验证字符串是否以特定模式开始。
re.search(p, s, flags=0)整个 字符串 s 中搜索模式 p第一个 匹配项。匹配成功返回 Match 对象,失败返回 None在字符串中查找模式是否存在,并获取第一个匹配项的信息。
re.findall(p, s, flags=0)在字符串 s 中查找模式 p所有非重叠 匹配项。返回一个 列表。如果模式无捕获组,列表元素是匹配的字符串;如果有捕获组,列表元素是包含各捕获组内容的元组。提取字符串中所有符合模式的子串或捕获组内容。
re.finditer(p, s, flags=0)findall 类似,但返回一个 迭代器 (iterator),迭代器中的每个元素都是一个 Match 对象。返回一个迭代器,每个元素是 Match 对象。处理大量匹配结果时更 内存高效,因为不需要一次性存储所有结果。可以方便地访问每个匹配的详细信息(如位置)。
re.sub(p, repl, s, count=0, flags=0)在字符串 s 中查找模式 p 的所有匹配项,并用 repl 替换它们。repl 可以是字符串(支持 \g<name>\1 等反向引用)或函数。count 指定最大替换次数。返回替换后的 新字符串执行查找和替换操作。repl 可以是函数,实现更复杂的替换逻辑。
re.split(p, s, maxsplit=0, flags=0)使用模式 p 作为分隔符来 分割 字符串 smaxsplit 指定最大分割次数。返回一个 列表,包含分割后的子字符串。如果模式中有捕获组,捕获的内容也会包含在列表中。根据复杂的模式分割字符串。
re.compile(p, flags=0)编译 正则表达式模式 p 为一个 模式对象 (Pattern Object)返回一个 Pattern 对象。当一个模式需要被 多次 使用时,预先编译可以 提高性能。模式对象拥有与 re 模块函数同名的方法(如 pattern.search(s))。

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import re

text = "The quick brown fox jumps over the lazy dog. Phone: 123-456-7890. Email: test@example.com."

# 1. re.match() - 检查开头
pattern_start = r"The"
match_result = re.match(pattern_start, text)
if match_result:
print(f"match(): 字符串以 '{pattern_start}' 开头。匹配内容: '{match_result.group(0)}'")
else:
print(f"match(): 字符串不以 '{pattern_start}' 开头。")

match_fail = re.match(r"quick", text) # 不从开头匹配,所以失败
print(f"match() 失败示例: {match_fail}") # None

# 2. re.search() - 查找第一个匹配
pattern_word = r"fox"
search_result = re.search(pattern_word, text)
if search_result:
print(f"search(): 找到单词 '{pattern_word}'。 起始位置: {search_result.start()}, 结束位置: {search_result.end()}")
else:
print(f"search(): 未找到单词 '{pattern_word}'。")

# 3. re.findall() - 查找所有匹配
pattern_digits = r"\d+" # 查找所有数字序列
all_digits = re.findall(pattern_digits, text)
print(f"findall(): 找到的所有数字序列: {all_digits}") # ['123', '456', '7890']

pattern_email = r"(\w+)@(\w+\.\w+)" # 查找邮箱并捕获用户名和域名
email_parts = re.findall(pattern_email, text)
print(f"findall() 捕获组: {email_parts}") # [('test', 'example.com')]

# 4. re.finditer() - 迭代查找匹配对象
pattern_words_o = r"\b\w*o\w*\b" # 查找所有包含字母'o'的单词
print("finditer(): 查找包含 'o' 的单词:")
for match in re.finditer(pattern_words_o, text, re.IGNORECASE): # 使用 IGNORECASE 标志
print(f" 找到: '{match.group(0)}' at position {match.span()}")

# 5. re.sub() - 替换
pattern_phone = r"\d{3}-\d{3}-\d{4}"
# 将电话号码替换为 [REDACTED]
censored_text = re.sub(pattern_phone, "[REDACTED]", text)
print(f"sub() 替换电话号码: {censored_text}")

# 使用函数进行替换
def mask_email(match_obj):
username = match_obj.group(1)
domain = match_obj.group(2)
return f"{username[0]}***@{domain}" # 用户名只显示第一个字符

censored_email_text = re.sub(pattern_email, mask_email, text)
print(f"sub() 使用函数替换邮箱: {censored_email_text}")

# 6. re.split() - 分割
pattern_punct = r"[.,:;]\s*" # 按标点符号和后面的空格分割
parts = re.split(pattern_punct, text)
print(f"split(): 按标点分割: {parts}")

# 7. re.compile() - 编译模式
compiled_pattern = re.compile(r"l\w*y", re.IGNORECASE) # 编译查找以l开头y结尾的词
# 多次使用编译后的模式
match1 = compiled_pattern.search(text)
if match1:
print(f"compile() & search(): 找到 '{match1.group(0)}'")
match2 = compiled_pattern.findall("Actually, Lily is lovely.")
print(f"compile() & findall(): 找到 {match2}") # ['Lily', 'lovely']

13.5.6 Match 对象详解

re.match(), re.search()re.finditer() 中的一项成功匹配时,它们会返回一个 Match 对象。这个对象包含了关于匹配结果的详细信息。

Match 对象方法/属性描述示例 (假设 m = re.search(r"(\w+) (\d+)", "Order P123 45"))
m.group(0)m.group()返回整个匹配的字符串。'P123 45'
m.group(n)返回第 n 个捕获组匹配的字符串 (从 1 开始计数)。m.group(1) 返回 'P123', m.group(2) 返回 '45'
m.groups()返回一个包含所有捕获组匹配内容的 元组('P123', '45')
m.groupdict()如果模式中使用了 命名捕获组 (?P<name>...),返回一个包含组名和匹配内容的字典。(需要命名组,如下例)
m.start([group])返回整个匹配或指定 group起始索引 (包含)。m.start() 返回 6, m.start(1) 返回 6, m.start(2) 返回 11
m.end([group])返回整个匹配或指定 group结束索引 (不包含)。m.end() 返回 13, m.end(1) 返回 10, m.end(2) 返回 13
m.span([group])返回一个包含 (start, end) 索引的 元组m.span() 返回 (6, 13), m.span(1) 返回 (6, 10)
m.string传递给 match()search() 的原始字符串。'Order P123 45'
m.re匹配时使用的已编译的模式对象 (Pattern object)。

命名捕获组示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import re

text = "Product ID: ABC-987, Quantity: 50"
# 使用 ?P<name> 定义命名捕获组
pattern_named = r"Product ID: (?P<product_id>[A-Z]+-\d+), Quantity: (?P<quantity>\d+)"

match = re.search(pattern_named, text)
if match:
print("\n--- 使用命名捕获组 ---")
# 通过组名访问捕获的内容
print(f"产品 ID: {match.group('product_id')}") # ABC-987
print(f"数量: {match.group('quantity')}") # 50
# groupdict() 返回包含所有命名组的字典
print(f"捕获字典: {match.groupdict()}") # {'product_id': 'ABC-987', 'quantity': '50'}

13.5.7 正则表达式标志 (Flags)

标志可以修改正则表达式的匹配行为。可以在 re 函数的 flags 参数中指定,或在编译时指定。多个标志可以使用 | (按位或) 组合。

标志简写描述
re.IGNORECASEre.I进行 不区分大小写 的匹配。
re.MULTILINEre.M使 ^$ 匹配 每行的开头和结尾,而不仅仅是整个字符串的开头和结尾。
re.DOTALLre.S使元字符 . 能够匹配 包括换行符 \n 在内 的任何字符。
re.VERBOSEre.X详细模式。允许在模式字符串中添加 空白和注释 以提高可读性,此时模式中的空白会被忽略,# 后到行尾的内容视为注释。
re.ASCIIre.A使 \w, \W, \b, \B, \s, \S 只匹配 ASCII 字符,而不是完整的 Unicode 字符集 (Python 3 默认匹配 Unicode)。
re.UNICODE (默认)re.U使 \w, \W, \b, \B, \s, \S, \d, \D 匹配完整的 Unicode 字符集。这是 Python 3 的默认行为。

示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import re

text_multi = """first line
second line
THIRD line"""

# re.I (忽略大小写)
print(f"\n--- Flags 示例 ---")
print(f"re.I: {re.findall(r'line', text_multi, re.IGNORECASE)}") # ['line', 'line', 'line']

# re.M (多行模式)
print(f"re.M (^): {re.findall(r'^s.*', text_multi, re.MULTILINE | re.IGNORECASE)}") # ['second line']
print(f"re.M ($): {re.findall(r'line$', text_multi, re.MULTILINE | re.IGNORECASE)}") # ['line', 'line', 'line']

# re.S (DOTALL)
text_dot = "Hello\nWorld"
print(f"re.S (.): {re.search(r'Hello.World', text_dot, re.DOTALL)}") # 匹配成功
print(f"No re.S (.): {re.search(r'Hello.World', text_dot)}") # 匹配失败 (None)

# re.X (VERBOSE)
# 一个复杂的邮箱模式,使用 VERBOSE 模式添加注释和空格
pattern_verbose = r"""
^ # 匹配字符串开头
[\w\.\-]+ # 用户名部分 (字母、数字、下划线、点、连字符)
@ # @ 符号
([\w\-]+\.)+ # 域名部分 (允许子域名,如 mail.example.)
[a-zA-Z]{2,7} # 顶级域名 (如 .com, .org)
$ # 匹配字符串结尾
"""
email = "test.user-1@sub.example.com"
match_verbose = re.match(pattern_verbose, email, re.VERBOSE)
print(f"re.X (VERBOSE): {'匹配成功' if match_verbose else '匹配失败'}") # 匹配成功

13.5.8 实际应用场景示例

场景 1: 验证中国大陆手机号 (简单示例)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import re

def is_valid_china_mobile(phone_number: str) -> bool:
"""简单验证中国大陆手机号码 (11位数字,常见号段)"""
# 模式解释:
# ^ 匹配字符串开头
# (?:...) 非捕获组
# 1[3-9] 第一位是1,第二位是3到9
# \d{9} 后面跟9位数字
# $ 匹配字符串结尾
pattern = r"^(?:1[3-9])\d{9}$"
if re.match(pattern, phone_number):
return True
else:
return False

print("\n--- 手机号验证 ---")
print(f"13812345678: {is_valid_china_mobile('13812345678')}") # True
print(f"12012345678: {is_valid_china_mobile('12012345678')}") # False (号段不对)
print(f"1381234567: {is_valid_china_mobile('1381234567')}") # False (位数不够)
print(f"138123456789: {is_valid_china_mobile('138123456789')}")# False (位数太多)

注意: 实际手机号验证可能需要更复杂的规则或查询号段数据库。

场景 2: 从 Apache/Nginx 日志中提取 IP 地址和请求路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import re

log_line = '192.168.1.101 - - [03/May/2025:17:20:01 +0900] "GET /index.html HTTP/1.1" 200 1542 "-" "Mozilla/5.0..."'

# 模式解释:
# ^([\d\.]+) 捕获开头的 IP 地址 (数字和点的组合)
# \s+-\s+-\s+ 匹配中间的 ' - - ' 部分
# \[.*?\] 匹配并忽略方括号内的时间戳 (非贪婪)
# \s+" 匹配时间戳后的空格和双引号
# (GET|POST|PUT|DELETE|HEAD) \s+ 捕获请求方法 (GET, POST 等) 和空格
# ([^\s"]+) 捕获请求路径 (非空格、非双引号的字符)
# \s+HTTP/[\d\.]+" 捕获 HTTP 版本部分
# .* 匹配剩余部分
pattern_log = r'^([\d\.]+) \s+-\s+-\s+ \[.*?\] \s+"(GET|POST|PUT|DELETE|HEAD)\s+([^\s"]+)\s+HTTP/[\d\.]+" .*'

match = re.match(pattern_log, log_line)
if match:
ip_address = match.group(1)
method = match.group(2)
path = match.group(3)
print("\n--- 日志解析 ---")
print(f"IP 地址: {ip_address}") # 192.168.1.101
print(f"请求方法: {method}") # GET
print(f"请求路径: {path}") # /index.html
else:
print("日志格式不匹配")

场景 3: 将 Markdown 样式的链接 [text](url) 转换为 HTML <a> 标签

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import re

markdown_text = "这是一个链接 [Google](https://www.google.com) 和另一个 [Python 官网](http://python.org) 的例子。"

# 模式解释:
# \[ 匹配字面量 '['
# ([^\]]+) 捕获链接文本 (不是 ']' 的任意字符一次或多次)
# \] 匹配字面量 ']'
# \( 匹配字面量 '('
# ([^\)]+) 捕获 URL (不是 ')' 的任意字符一次或多次)
# \) 匹配字面量 ')'
pattern_md_link = r'\[([^\]]+)\]\(([^\)]+)\)'

# 使用 re.sub 和反向引用 \1, \2 进行替换
html_text = re.sub(pattern_md_link, r'<a href="\2">\1</a>', markdown_text)

print("\n--- Markdown 转 HTML 链接 ---")
print(f"原始 Markdown: {markdown_text}")
print(f"转换后 HTML: {html_text}")
# 输出: 这是一个链接 <a href="https://www.google.com">Google</a> 和另一个 <a href="http://python.org">Python 官网</a> 的例子。