Python(八):第七章: 文件操作

第七章: 文件操作

Python 提供了强大而灵活的文件操作接口,从基础的读写能力到高级的路径操作和目录管理。本章将由浅入深地介绍 Python 文件操作的全面知识。

7.1 文件打开模式

文件操作的第一步是打开文件,Python 提供了多种打开模式以满足不同需求。

模式描述文件不存在时文件存在时常见应用场景
r只读模式报错从头读取读取配置文件、日志文件
w只写模式创建新文件清空内容生成报告、写入日志
a追加模式创建新文件在末尾追加日志记录、数据收集
r+读写模式报错可读可写需要同时读写的场景
w+读写模式创建新文件清空内容需要先写后读的场景
a+追加读写创建新文件追加且可读数据分析、日志分析
b二进制模式与其他模式组合处理二进制图片、视频、压缩文件
t文本模式(默认)与其他模式组合处理文本文本文件处理

实用提示:选择合适的文件模式可以避免数据丢失。例如,使用 w 模式时要格外小心,因为它会清空现有文件。当不确定时,使用 a 模式追加内容会更安全。

7.2 基本文件操作

7.2.1 文件读取操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 使用 with 语句自动处理文件关闭(推荐方式)
with open('example.txt', mode='r', encoding='utf-8') as file:
# 方法1:一次性读取整个文件
content = file.read() # 读取全部内容到内存
print(content)

# 注意:read()后文件指针已经到达文件末尾
# 需要重新打开文件或使用seek(0)重置指针
file.seek(0) # 重置文件指针到文件开头

# 方法2:读取一行
line = file.readline() # 读取第一行(包含换行符)
print(line)

# 方法3:读取所有行到列表
file.seek(0) # 重置文件指针
lines = file.readlines() # 返回包含所有行的列表
print(lines) # ['第一行\n', '第二行\n', ...]

# 方法4:逐行读取(内存效率最高,推荐用于大文件)
file.seek(0) # 重置文件指针
for line in file: # 文件对象本身是可迭代的
# line包含换行符,通常需要移除
print(line.strip()) # 去除行首行尾的空白字符

7.2.2 文件写入操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 写入文件(覆盖模式)
with open('example.txt', mode='w', encoding='utf-8') as file:
# 方法1:写入字符串
file.write('第一行内容\n') # 注意需手动添加换行符
file.write('第二行内容\n')

# 方法2:写入多行
lines = ['第三行内容\n', '第四行内容\n']
file.writelines(lines) # 注意writelines不会自动添加换行符

# 追加内容到文件
with open('example.txt', mode='a', encoding='utf-8') as file:
file.write('追加的内容\n')

# 读写模式示例
with open('example.txt', mode='r+', encoding='utf-8') as file:
content = file.read(10) # 读取前10个字符
file.write('插入的内容') # 在当前位置(第10个字符后)写入

7.2.3 多文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 同时操作多个文件(例如:文件复制)
with open('source.txt', 'r', encoding='utf-8') as source, \
open('destination.txt', 'w', encoding='utf-8') as destination:

# 按块读取和写入(适用于大文件)
chunk_size = 4096 # 4KB 的块大小
while True:
chunk = source.read(chunk_size)
if not chunk: # 到达文件末尾
break
destination.write(chunk)

# 或者简单地复制所有内容
# source.seek(0) # 先重置到文件开头
# destination.write(source.read())

# 或者逐行复制(适合文本处理)
# source.seek(0)
# for line in source:
# # 可以在此添加行处理逻辑
# destination.write(line)

7.2.4 文件修改示例

在实际应用中,我们经常需要读取、修改并写回文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 示例:为Markdown文件的所有标题增加一个#符号
file_name = 'document.md'
output_file = 'document_modified.md'

# 读取并修改
with open(file_name, 'r', encoding='utf-8') as input_file:
lines = input_file.readlines()

# 处理每一行
for i in range(len(lines)):
# 如果行以#开头(表示标题),则增加一个#
if lines[i].strip().startswith('#'):
lines[i] = '#' + lines[i]

# 将修改后的内容写入新文件
with open(output_file, 'w', encoding='utf-8') as output_file:
output_file.writelines(lines)

print(f"文件已修改并保存为 {output_file}")

最佳实践:对于文件修改操作,始终先写入临时文件,然后在确认写入成功后才替换原文件,这样可以防止文件损坏。

7.3 文件指针控制

文件指针(或文件位置)决定了读写操作的起始位置。掌握指针控制对于高级文件操作至关重要。

7.3.1 seek() 和 tell() 函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
with open('example.txt', 'w+', encoding='utf-8') as file:
# 写入一些测试内容
file.write("Hello World! 你好世界!") # 英文部分 "Hello World " 占用 12 个字符,中文部分 "你好世界!" 占用 12 个字符,剩余的空格+符号占据了 4 个字符 总共 28 个字符

# tell() 获取当前文件指针位置
position = file.tell()
print(f"写入内容后的位置: {position}") # 写入内容后的位置: 28

# 将指针移回文件开头
file.seek(0)
print(f"回到文件开头位置: {file.tell()}") # 回到文件开头位置: 0

# 读取全部内容
content = file.read()
print(f"文件全部内容: {content}") # 文件全部内容: Hello World! 你好世界!
print(f"读取后的位置: {file.tell()}") # 读取后的位置: 28

# 再次回到文件开头
file.seek(0)

# 读取前5个字符
print(f"前5个字符: {file.read(5)}") # 前5个字符: Hello
print(f"读取5个字符后的位置: {file.tell()}") # 读取5个字符后的位置: 5

# 回到文件开头
file.seek(0)

# 一次性定位到第13个字符位置(从文件开头算起)
file.seek(13)
print(f"直接定位到第10个位置: {file.tell()}")
print(f"从第13个位置开始读取的内容: {file.read()}") # 从第13个位置开始读取的内容: 你好世界!

注意:在文本模式下,由于字符编码原因,seek() 可能无法精确定位到任意字节位置。对于需要精确控制的场景,应使用二进制模式 (‘rb’, ‘wb’ 等)。

7.3.2 实际应用场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
# 场景:在大日志文件中读取最后100行
def tail(file_path, n=10):
"""读取文件最后n行,类似于Unix的tail命令"""
with open(file_path, 'rb') as f:
# 移动到文件末尾
f.seek(0, 2)
# 文件总大小
total_size = f.tell()

# 初始化变量
block_size = 1024
block = -1
lines = []

# 从文件末尾向前读取
while len(lines) < n and -block * block_size < total_size:
# 移动到倒数第block个块
position = max(0, total_size + block * block_size)
f.seek(position)

# 读取数据块
data = f.read(min(block_size, total_size - position))

# 处理可能被截断的行
if position > 0:
# 丢弃第一行不完整数据
data = data.split(b'\n', 1)[1] if b'\n' in data else b''

# 计算行数
lines_in_block = data.split(b'\n')

# 合并行
lines = lines_in_block + lines
block -= 1

# 返回最后n行
return [line.decode('utf-8') for line in lines[-n:]]

# 使用示例
last_lines = tail('large_log_file.txt', 100)
for line in last_lines:
print(line)

7.4 缓冲区管理

理解缓冲区对于优化文件操作性能至关重要,特别是在处理大量小块写入时。

7.4.1 缓冲设置与刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 设置不同的缓冲策略
# buffering=0: 无缓冲 (仅在二进制模式可用)
# buffering=1: 行缓冲 (仅在文本模式可用)
# buffering>1: 指定缓冲区大小(字节)
# buffering=-1: 使用默认缓冲区大小

# 无缓冲 (每次写入都直接写入磁盘)
with open('binary_file.bin', 'wb', buffering=0) as f:
f.write(b'Data will be written immediately')

# 行缓冲 (遇到换行符时刷新)
with open('log.txt', 'w', buffering=1) as f:
f.write('This line will be buffered\n') # 遇到\n会刷新
f.write('Until a newline is encountered') # 保留在缓冲区

# 指定缓冲区大小
with open('large_file.txt', 'w', buffering=4096) as f:
# 4KB缓冲区,适合频繁小写入
for i in range(10000):
f.write(f'Line {i}\n') # 数据会积累到4KB才写入磁盘

# 手动刷新缓冲区
with open('important.txt', 'w') as f:
f.write('Critical data')
f.flush() # 立即刷新缓冲区,确保数据写入磁盘
os.fsync(f.fileno()) # 进一步确保数据写入物理存储设备

7.4.2 缓冲区触发条件

缓冲区会在以下条件下自动刷新:

  1. 缓冲区满时
  2. 文件关闭时(如 with 块结束)
  3. 调用 flush() 方法时
  4. 程序正常退出时
  5. 行缓冲模式下遇到换行符时

性能提示:对于大量小写操作,使用适当的缓冲区大小可以显著提高性能。但对于关键数据,应及时调用 flush() 确保数据安全。

7.5 文件路径操作

有效管理文件路径是文件操作的基础,Python 提供了两种主要方式:传统的 os.path 模块和现代的 pathlib 库。

7.5.1 使用 os.path 模块

方法/属性描述
os.getcwd()获取当前工作目录
os.chdir(path)改变当前工作目录
os.mkdir(name)创建目录
os.makedirs(path)递归创建多级目录
os.rmdir(name)删除空目录
os.remove(path)删除文件
os.listdir(path)列出指定目录的文件和目录
os.rename(src, dst)重命名文件或目录
os.system(command)执行系统命令
os.environ环境变量字典
os.path.exists(path)检查路径是否存在
os.path.isfile(path)检查路径是否为文件
os.path.isdir(path)检查路径是否为目录
os.path.join(path1, path2)连接路径
os.path.split(path)根据最后一个路径分隔符分割路径为(目录, 文件名)
os.path.splitext(path)根据最后一个点号分割路径为(文件名,拓展名)
os.path.dirname(path)获取路径的目录部分
os.path.basename(path)获取路径的文件名部分
os.path.getsize(path)获取文件大小(字节)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os.path
import datetime

# 获取当前脚本所在目录
current_dir = os.path.dirname(os.path.abspath(__file__))

# 在当前目录下创建data.txt
data_path = os.path.join(current_dir, "data.txt")

# 首先创建一个测试文件
with open(data_path, 'w', encoding='utf-8') as file:
file.write("这是测试内容!\nHello World!")

# 文件路径处理演示
print(f"=== 文件路径信息 ===")
directory = os.path.dirname(data_path)
filename = os.path.basename(data_path)
name, extension = os.path.splitext(filename)
print(f"目录: {directory}")
print(f"文件名: {filename}")
print(f"纯名称: {name}")
print(f"扩展名: {extension}")

print(f"\n=== 路径检查 ===")
exists = os.path.exists(data_path)
is_file = os.path.isfile(data_path)
is_dir = os.path.isdir(data_path)
print(f"文件是否存在: {exists}")
print(f"是否是文件: {is_file}")
print(f"是否是目录: {is_dir}")

print(f"\n=== 文件信息 ===")
try:
size = os.path.getsize(data_path)
mod_time = os.path.getmtime(data_path) # 获取修改时间
create_time = os.path.getctime(data_path) # 获取创建时间
access_time = os.path.getatime(data_path) # 获取访问时间

# 转换时间戳为可读时间
mod_datetime = datetime.datetime.fromtimestamp(mod_time)
create_datetime = datetime.datetime.fromtimestamp(create_time)
access_datetime = datetime.datetime.fromtimestamp(access_time)

print(f"文件大小: {size} 字节")
print(f"创建时间: {create_datetime}")
print(f"修改时间: {mod_datetime}")
print(f"访问时间: {access_datetime}")

print(f"\n=== 文件内容 ===")
with open(data_path, 'r', encoding='utf-8') as file:
content = file.read()
print(f"文件内容:\n{content}")

except OSError as e:
print(f"获取文件信息时发生错误: {e}")

7.5.2 使用 pathlib 模块 (Python 3.4+)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
from pathlib import Path
import datetime

# 获取当前脚本所在目录并创建data.txt的路径
current_file = Path(__file__)
data_path = current_file.parent / "data.txt"

# 创建并写入测试文件
data_path.write_text("这是测试内容!\nHello World!", encoding='utf-8')

print("=== 基本路径信息 ===")
print(f"完整路径: {data_path}")
print(f"父目录: {data_path.parent}")
print(f"文件名: {data_path.name}")
print(f"纯名称: {data_path.stem}")
print(f"扩展名: {data_path.suffix}")
print(f"所有路径组件: {data_path.parts}")

print("\n=== 路径解析 ===")
print(f"绝对路径: {data_path.absolute()}")
print(f"解析路径: {data_path.resolve()}")
try:
print(f"相对路径: {data_path.relative_to(Path.cwd())}")
except ValueError:
print("无法计算相对路径(文件可能在不同驱动器或目录)")

print("\n=== 文件状态检查 ===")
print(f"文件是否存在: {data_path.exists()}")
print(f"是否是文件: {data_path.is_file()}")
print(f"是否是目录: {data_path.is_dir()}")
print(f"是否是符号链接: {data_path.is_symlink()}")

print("\n=== 文件信息 ===")
if data_path.exists() and data_path.is_file():
stat = data_path.stat()
print(f"文件大小: {stat.st_size} 字节")
print(f"创建时间: {datetime.datetime.fromtimestamp(stat.st_ctime)}")
print(f"修改时间: {datetime.datetime.fromtimestamp(stat.st_mtime)}")
print(f"访问时间: {datetime.datetime.fromtimestamp(stat.st_atime)}")

print("\n=== 文件内容 ===")
print(f"文件内容:\n{data_path.read_text(encoding='utf-8')}")

print("\n=== 路径修改示例 ===")
new_name = data_path.with_name("newdata.txt")
new_ext = data_path.with_suffix(".csv")
new_stem = data_path.with_stem("newdata")
print(f"修改整个文件名: {new_name}")
print(f"修改扩展名: {new_ext}")
print(f"仅修改文件名(不含扩展名): {new_stem}")
功能os.path 方式pathlib 方式推荐
路径连接os.path.join(dir, file)Path(dir) / filepathlib 更直观
获取目录os.path.dirname(path)path.parentpathlib 属性访问更清晰
获取文件名os.path.basename(path)path.namepathlib 属性访问更清晰
分离扩展名os.path.splitext(path)[1]path.suffixpathlib 更简洁
检查存在os.path.exists(path)path.exists()两者类似,pathlib 面向对象
获取绝对路径os.path.abspath(path)path.absolute()两者相当

最佳实践:在新项目中优先使用 pathlib,它提供了更现代、更直观的面向对象接口。在维护旧代码时可能需要继续使用 os.path。

7.6 高级文件操作

7.6.1 二进制文件操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# 读取整个二进制文件
def read_binary_file(file_path):
"""读取并返回整个二进制文件的内容"""
with open(file_path, "rb") as f:
return f.read()


# 分块读取大型二进制文件
def process_large_binary_file(file_path, chunk_size=1024 * 1024):
"""分块处理大型二进制文件,避免内存溢出"""
with open(file_path, "rb") as f:
while True:
chunk = f.read(chunk_size) # 每次读取1MB
if not chunk: # 到达文件末尾
break

# 处理当前数据块
process_chunk(chunk) # 假设的处理函数


# 处理数据块的函数
def process_chunk(chunk):
"""处理二进制数据块的函数"""
# 这里可以根据需要实现具体的数据处理逻辑
# 例如:计算校验和、搜索特定字节模式、转换数据格式等
chunk_size = len(chunk)
print(f"处理数据块: {chunk_size} 字节")

# 示例:计算数据块的哈希值
import hashlib
chunk_hash = hashlib.md5(chunk).hexdigest()
print(f"数据块MD5哈希值: {chunk_hash}")

# 示例:检查数据块中的特定字节序列
if b'\x00\x00\xff\xff' in chunk:
print("在数据块中发现特定字节序列")

# 返回处理结果(可选)
return {
'size': chunk_size,
'hash': chunk_hash
}


# 读取文件特定部分
def read_file_section(file_path, start, length):
"""读取文件中从start位置开始的length个字节"""
with open(file_path, "rb") as f:
f.seek(start) # 移动文件指针到指定位置
return f.read(length) # 读取指定长度字节


# 检测文件类型
def detect_file_type(file_path):
"""通过文件头部特征识别文件类型"""
# 常见文件格式的魔数(Magic Numbers)
file_signatures = {
b'\x89PNG\r\n\x1a\n': 'PNG image',
b'\xff\xd8\xff': 'JPEG image',
b'GIF87a': 'GIF image (87a)',
b'GIF89a': 'GIF image (89a)',
b'%PDF': 'PDF document',
b'PK\x03\x04': 'ZIP archive',
b'\x50\x4b\x03\x04': 'ZIP archive', # PK..
b'\x1f\x8b\x08': 'GZIP compressed file',
}

with open(file_path, "rb") as f:
# 读取文件前16个字节用于检测
header = f.read(16)

for signature, file_type in file_signatures.items():
if header.startswith(signature):
return file_type

return "未知文件类型"


# 实际演示:处理图像文件
def image_file_demo():
"""演示二进制文件操作的实际应用"""
# 定义两个图形的基础文件路径
png_path = "example.png"
jpg_path = "example.jpg"

# 检测图像类型
print(f"检测文件类型: {png_path}{detect_file_type(png_path)}") # 检测文件类型: example.png 是 JPEG image
print(f"检测文件类型: {jpg_path}{detect_file_type(jpg_path)}") # 检测文件类型: example.jpg 是 JPEG image

# 读取PNG文件头部信息
png_header = read_file_section(png_path, 0, 24)
print(f"\nPNG文件头部字节: {png_header.hex(' ', 4)}") # PNG文件头部字节: ffd8ffe0 00104a46 49460001 01000001 00010000 ffdb0043

# 获取文件大小
png_data = read_binary_file(png_path)
jpg_data = read_binary_file(jpg_path)

# 小技巧:在len函数中使用:, 即可以将数字以千分位分隔符展示
print(f"\n{png_path} 文件大小: {len(png_data):,} 字节") # example.png 文件大小: 7,189 字节
print(f"{jpg_path} 文件大小: {len(jpg_data):,} 字节") # example.jpg 文件大小: 7,189 字节

# 处理大型二进制文件
process_large_binary_file(png_path)
process_large_binary_file(jpg_path)

if __name__ == '__main__':
image_file_demo()

7.6.2 临时文件操作

临时文件对于需要中间处理结果但不想留下永久文件的操作非常有用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import tempfile
import os
import time
import random


# 示例2: 使用临时目录处理多个文件
def process_batch_files(data_items):
"""在临时目录中创建多个文件并进行批处理"""
results = []

with tempfile.TemporaryDirectory(prefix="batch_") as temp_dir:
print(f"创建临时工作区 {temp_dir}")

# 创建多个数据文件
file_paths = []
for i, data in enumerate(data_items):
# 获取到临时目录下的文件路径
file_path = os.path.join(temp_dir, f"data_{i}.txt")
with open(file_path, "w", encoding="utf-8") as f:
f.write(data)
file_paths.append(file_path)

# 处理所有的文件
for file_path in file_paths:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# 模拟处理:计算字符数并添加到结果集中
results.append(f"文件{os.path.basename(file_path)} 包含 {len(content)} 个字符")
# 列出处理的文件
print(f"处理文件为{','.join(os.path.basename(p) for p in file_paths)}")

# 退出with块后,tempfile会自动删除临时目录
# 暂停30秒,等待用户查看结果
print("处理开始,大约需要30秒,请稍候...")
time.sleep(random.randint(10, 30))
print("处理完成")

return results



# 演示临时目录的批处理使用
data_to_process = [
"这是第一个文件的测试内容!",
"这是第二个文件包含更多的信息以及携带数字13123123123132",
"这是第三个文件包含中文,你好,世界!"]

results = process_batch_files(data_to_process)
print("\n处理结果为:")
for r in results:
print(results)

7.7 目录操作

目录操作是文件系统操作的重要组成部分,Python 提供了多种目录操作方法。

7.7.1 基本目录操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
import shutil

# 获取当前工作目录
current_dir = os.getcwd()
print(f"当前工作目录: {current_dir}")

# 更改当前工作目录
os.chdir("../")
print(f"更改后的工作目录: {os.getcwd()}")

# 列出目录内容
entries = os.listdir(".")
print(f"目录内容: {entries}")

# 过滤目录内容
files_only = [f for f in entries if os.path.isfile(f)]
dirs_only = [d for d in entries if os.path.isdir(d)]
print(f"文件: {files_only}")
print(f"目录: {dirs_only}")

# 创建目录
os.chdir(current_dir) # 切换回原目录
os.mkdir("new_dir") # 创建单个目录
os.makedirs("new_dir2/sub_dir/sub_sub_dir",exist_ok=True) # 创建多级目录 (exist_ok=True 忽略已存在的目录)

# 删除目录
os.rmdir("new_dir") # 只能删除空目录
shutil.rmtree("new_dir2") # 删除目录以及所有内容(谨慎使用!!!)

7.7.2 使用 pathlib 进行目录操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from pathlib import Path
import shutil

# 创建目录
Path("new_dir").mkdir(exist_ok=True)
Path("parent/child/grandchild").mkdir(parents=True, exist_ok=True) # 创建多层目录 parents=True 递归创建父目录

# 列出目录内容
p = Path(".")
for item in p.iterdir():
if item.is_file():
print(f"文件: {item},大小: {item.stat().st_size} bytes")
elif item.is_dir():
print(f"目录: {item}")

# 过滤特定类型的文件
python_files = list(p.glob("*.py")) # 列出当前目录下所有.py 文件
all_python_files = list(p.rglob("*.py")) # 递归搜索所有子目录

print(f"当前目录下Python文件: {[f.name for f in python_files]}")
print(f"递归搜索所有Python文件: {[f.name for f in all_python_files]}")
print(f"所有Python文件: {len(all_python_files)}")


# 删除目录
Path('new_dir').rmdir() # 只能删除空目录
shutil.rmtree('parent') # 删除目录及其所有内容
操作os/shutil 方法pathlib 方法优势比较
获取当前目录os.getcwd()Path.cwd()pathlib 返回 Path 对象,便于后续操作
切换工作目录os.chdir(path)(无直接等效方法)os 更适合改变全局工作目录
列出目录内容os.listdir(path)Path(path).iterdir()pathlib 直接返回 Path 对象,无需再拼接路径
创建单个目录os.mkdir(path)Path(path).mkdir()功能相同,pathlib 更面向对象
创建多级目录os.makedirs(path, exist_ok=True)Path(path).mkdir(parents=True, exist_ok=True)语义更明确,参数名更具描述性
删除空目录os.rmdir(path)Path(path).rmdir()功能相同,pathlib 更面向对象
递归删除目录shutil.rmtree(path)(无直接等效方法,需循环删除)os/shutil 提供单一高效操作
文件路径拼接os.path.join(dir, file)Path(dir) / filepathlib 使用/运算符更直观简洁
检查路径是否存在os.path.exists(path)Path(path).exists()功能相同,pathlib 更面向对象
检查是否为文件os.path.isfile(path)Path(path).is_file()功能相同,pathlib 更面向对象
检查是否为目录os.path.isdir(path)Path(path).is_dir()功能相同,pathlib 更面向对象

7.7.3 递归遍历目录树

递归遍历是处理目录层次结构的强大工具,在很多需要列举文件目录树的场景都可以采用该思路去打印输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 使用os.walk遍历目录树
import os
#示例输出:
# 学习用/
# data.txt - 0.04 KB
# example.jpg - 7.02 KB
# example.png - 7.02 KB
# example.py - 4.34 KB
# example.txt - 0.03 KB
# main.py - 4.54 KB
# requirements.txt - 0.01 KB
# study.py - 1.40 KB
# 电子商务系统实现笔记.md - 24.60 KB
# (目录大小: 49.00 KB)

def scan_directory(directory):
"""递归扫描目录,显示结构和文件大小"""
total_size = 0
for root, dirs, files in os.walk(directory): # 其中root:当前目录路径,dirs:子目录列表,files:文件列表
# 计算当前的目录深度(用于缩进)
# 计算目录深度:
# 1. root.replace(directory, "") - 将当前路径中的起始目录部分替换为空字符串,得到相对路径
# 2. .count(os.sep) - 统计相对路径中目录分隔符(如'/'或'\')的数量
# 每一个分隔符代表一层目录深度,因此分隔符的数量就等于目录的嵌套层级
level = root.replace(directory, "").count(os.sep)
indent = ' ' * 4 * level
# 打印当前目录
print(f"{indent}{os.path.basename(root)}/")

# 缩进子文件
sub_indent = ' ' * 4 * (level + 1)

# 统计当前目录下的文件大小
dir_size = 0
for file in files:
file_path = os.path.join(root, file)
file_size = os.path.getsize(file_path)
dir_size += file_size
print(f"{sub_indent}{file} - {file_size / 1024:.2f} KB")

# 累加总大小
total_size += dir_size
print(f"{sub_indent}(目录大小: {dir_size / 1024:.2f} KB)")

return total_size / 1024 # 返回总大小(单位:KB)
if __name__ == '__main__':
total_size = scan_directory('D:/python/学习用')
print(f"总大小: {total_size:.2f} KB")

7.7.4 文件复制、移动和删除操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import os
import shutil

# ========== 文件复制操作 ==========
# shutil.copy(src, dst): 复制文件到目标路径,不保留元数据(如文件的创建时间、修改时间等)
# 参数说明:src - 源文件路径,dst - 目标路径(可以是目录或文件名)
# 返回值:目标文件路径
shutil.copy('source.txt', 'destination.txt')

# shutil.copy2(src, dst): 复制文件到目标路径,保留所有元数据(如修改时间、访问时间、权限等)
# 参数说明:同copy函数
# 返回值:目标文件路径
shutil.copy2('source.txt', 'destination.txt')

# ========== 目录复制操作 ==========
# shutil.copytree(src, dst): 递归复制整个目录树,目标目录不能已存在
# 参数说明:src - 源目录,dst - 目标目录(必须不存在)
# 返回值:目标目录路径
shutil.copytree('source_dir', 'destination_dir')

# shutil.copytree 高级用法:使用ignore参数忽略特定文件
# shutil.ignore_patterns(): 创建一个忽略函数,用于过滤不需要复制的文件
# 参数说明:可变参数,接受多个glob风格的模式字符串
shutil.copytree('source_dir', 'destination_dir',
ignore=shutil.ignore_patterns('*.pyc', '*.git'))

# ========== 文件移动操作 ==========
# shutil.move(src, dst): 移动文件或目录到目标路径
# 参数说明:src - 源路径,dst - 目标路径
# 返回值:目标路径
# 用法1:重命名文件
shutil.move('old_name.txt', 'new_name.txt')
# 用法2:移动文件到其他目录
shutil.move('file.txt', 'directory/')

# ========== 文件删除操作 ==========
# os.remove(path): 删除指定路径的文件(不能删除目录)
# 参数说明:path - 要删除的文件路径
# 注意:如果文件不存在会抛出FileNotFoundError异常
os.remove('file.txt')

7.8 文件监控与变更检测

在某些应用场景中,需要监控文件变化并作出响应。

7.8.1 基础文件监控

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import os
import time


def monitor_file(file_path, interval=1, encoding='utf-8'):
"""监控文件变化,并输出新增内容"""
if not os.path.exists(file_path):
print(f"文件 {file_path} 不存在")
return

# 获取初始状态
last_size = os.path.getsize(file_path) # 文件大小
last_modified = os.path.getmtime(file_path) # 最后修改时间戳
print(f"开始监控文件:{file_path}")
print(f"文件大小:{last_size} 最后修改时间:{time.ctime(last_modified)}")
try:
while True:
# 检查文件是否被修改
current_modified = os.path.getmtime(file_path)
current_size = os.path.getsize(file_path)
if current_modified != last_modified:
print(f"文件在{time.ctime(current_modified)}被修改")
# 如果文件增大了,读取新增内容
if current_size > last_size:
# 尝试不同的编码方式读取文件
encodings_to_try = ['utf-8', 'gbk', 'gb2312', 'gb18030']
content_read = False

for enc in encodings_to_try:
try:
with open(file_path, "r", encoding=enc) as f:
f.seek(last_size) # 移动文件指针到上次读取位置
new_content = f.read()
print(f"新增内容:\n{new_content}")
# 更新当前使用的编码
encoding = enc
content_read = True
break
except UnicodeDecodeError:
continue

if not content_read:
# 如果所有编码都失败,尝试以二进制方式读取并显示
try:
with open(file_path, "rb") as f:
f.seek(last_size)
binary_content = f.read()
print(f"无法解码文件内容,显示二进制内容: {binary_content}")
except Exception as e:
print(f"读取文件失败: {e}")

elif current_size < last_size: # 文件缩小了,可能是被清空了
print("文件大小减小了,可能被截断或重写")

# 更新状态
last_modified = current_modified
last_size = current_size

time.sleep(interval) # 休眠一段时间再检查文件
except KeyboardInterrupt:
print("监控已停止")



if __name__ == '__main__':
monitor_file("destination.txt", interval=1)

7.8.2 使用 watchdog 库进行高级文件监控

对于更复杂的文件系统监控需求,Python 的第三方库 watchdog 提供了更强大的功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent, DirDeletedEvent, \
FileDeletedEvent, DirModifiedEvent, FileModifiedEvent, DirMovedEvent, FileMovedEvent
import time
import os


class MyHandler(FileSystemEventHandler):
def on_created(self, event: DirCreatedEvent | FileCreatedEvent) -> None:
if not event.is_directory:
print(f"文件被创建: {event.src_path}")

def on_deleted(self, event: DirDeletedEvent | FileDeletedEvent) -> None:
if not event.is_directory:
print(f"文件被删除: {event.src_path}")

def on_modified(self, event: DirModifiedEvent | FileModifiedEvent) -> None:
if not event.is_directory:
print(f"文件被修改: {event.src_path}")

def on_moved(self, event: DirMovedEvent | FileMovedEvent) -> None:
if not event.is_directory:
print(f"文件被移动: {event.src_path} -> {event.dest_path}")


def watch_directory(path: str) -> None:
# 确保监控的是目录而不是文件
if os.path.isfile(path):
# 如果是文件,则监控其所在的目录
directory = os.path.dirname(path)
if not directory: # 如果是当前目录下的文件
directory = '.'
else:
directory = path

event_handler = MyHandler()
observer = Observer()
observer.schedule(event_handler, directory, recursive=True)
observer.start()
try:
print(f"开始监控目录: {directory}")
print("按Ctrl+C停止监控...")
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()


if __name__ == '__main__':
watch_directory(".") # 监控当前目录,或者指定一个确实存在的目录路径

7.9 实际应用场景示例

7.9.1 文件备份工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import os
import shutil
import datetime
from pathlib import Path
import zipfile

def backup_directory(source_dir, backup_dir=None, zip_backup=True):
"""
创建目录的备份

参数:
source_dir: 要备份的源目录
backup_dir: 备份文件存放目录(默认在源目录的父目录)
zip_backup: 是否创建zip压缩备份

返回:
备份文件的路径
"""
# 确保源目录存在
source_path = Path(source_dir)
if not source_path.exists() or not source_path.is_dir():
raise ValueError(f"源目录 '{source_dir}' 不存在或不是一个目录")

# 设置默认备份目录
if backup_dir is None:
backup_dir = source_path.parent
else:
backup_dir = Path(backup_dir)
if not backup_dir.exists():
backup_dir.mkdir(parents=True)

# 创建备份文件名(包含时间戳)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"{source_path.name}_backup_{timestamp}"
backup_path = backup_dir / backup_name

if zip_backup:
# 创建ZIP备份
zip_path = str(backup_path) + '.zip'
print(f"创建ZIP备份: {zip_path}")

with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
# 遍历源目录中的所有文件
for root, _, files in os.walk(source_dir):
for file in files:
file_path = os.path.join(root, file)
# 计算文件在ZIP中的相对路径
rel_path = os.path.relpath(file_path, source_dir)
print(f"添加: {rel_path}")
zipf.write(file_path, rel_path)

return zip_path
else:
# 创建目录备份(复制)
print(f"创建目录备份: {backup_path}")
shutil.copytree(source_path, backup_path)
return str(backup_path)

# 使用示例
# source = "/path/to/important_data"
# backup = backup_directory(source, zip_backup=True)
# print(f"备份已创建: {backup}")

7.9.2 日志分析工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from collections import Counter
import re
from datetime import datetime
import matplotlib.pyplot as plt
from pathlib import Path

def analyze_log_file(log_path, pattern=None):
"""
分析日志文件并生成报告

参数:
log_path: 日志文件路径
pattern: 用于匹配日志行的正则表达式模式(默认为None,表示所有行)

返回:
包含分析结果的字典
"""
log_path = Path(log_path)
if not log_path.exists():
raise FileNotFoundError(f"日志文件不存在: {log_path}")

# 初始化结果
results = {
'total_lines': 0,
'matched_lines': 0,
'errors': 0,
'warnings': 0,
'by_hour': Counter(),
'ip_addresses': Counter(),
'status_codes': Counter(),
'top_urls': Counter()
}

# 编译正则表达式
if pattern:
regex = re.compile(pattern)

# IP地址模式
ip_pattern = re.compile(r'\b(?:\d{1,3}\.){3}\d{1,3}\b')

# HTTP状态码模式
status_pattern = re.compile(r'\s(\d{3})\s')

# 时间戳模式(假设格式为[DD/Mon/YYYY:HH:MM:SS +ZZZZ])
timestamp_pattern = re.compile(r'\[(\d{2}/\w{3}/\d{4}):(\d{2}):\d{2}:\d{2}\s[+\-]\d{4}\]')

# URL模式
url_pattern = re.compile(r'"(?:GET|POST|PUT|DELETE)\s+([^\s"]+)')

# 错误和警告模式
error_pattern = re.compile(r'ERROR|CRITICAL|FATAL', re.IGNORECASE)
warning_pattern = re.compile(r'WARNING|WARN', re.IGNORECASE)

# 读取和分析日志文件
with open(log_path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
results['total_lines'] += 1

# 应用模式匹配过滤(如果提供)
if pattern and not regex.search(line):
continue

results['matched_lines'] += 1

# 提取IP地址
ip_matches = ip_pattern.findall(line)
if ip_matches:
results['ip_addresses'].update([ip_matches[0]])

# 提取HTTP状态码
status_match = status_pattern.search(line)
if status_match:
results['status_codes'].update([status_match.group(1)])

# 提取URL
url_match = url_pattern.search(line)
if url_match:
results['top_urls'].update([url_match.group(1)])

# 提取时间并按小时汇总
time_match = timestamp_pattern.search(line)
if time_match:
date_str, hour = time_match.groups()
results['by_hour'].update([int(hour)])

# 检查错误和警告
if error_pattern.search(line):
results['errors'] += 1
elif warning_pattern.search(line):
results['warnings'] += 1

return results

def generate_log_report(results, output_dir=None):
"""生成日志分析报告(文本和图表)"""
output_dir = Path(output_dir) if output_dir else Path.cwd()
if not output_dir.exists():
output_dir.mkdir(parents=True)

# 创建文本报告
report_path = output_dir / "log_analysis_report.txt"
with open(report_path, 'w', encoding='utf-8') as f:
f.write("=== 日志分析报告 ===\n")
f.write(f"总行数: {results['total_lines']}\n")
f.write(f"匹配行数: {results['matched_lines']}\n")
f.write(f"错误数: {results['errors']}\n")
f.write(f"警告数: {results['warnings']}\n\n")

f.write("=== 按小时分布 ===\n")
for hour in sorted(results['by_hour']):
f.write(f"{hour}时: {results['by_hour'][hour]}行\n")

f.write("\n=== 前10个IP地址 ===\n")
for ip, count in results['ip_addresses'].most_common(10):
f.write(f"{ip}: {count}次\n")

f.write("\n=== HTTP状态码统计 ===\n")
for status, count in results['status_codes'].most_common():
f.write(f"{status}: {count}次\n")

f.write("\n=== 前10个URL ===\n")
for url, count in results['top_urls'].most_common(10):
f.write(f"{url}: {count}次\n")

# 生成图表报告

# 1. 按小时分布图
plt.figure(figsize=(10, 6))
hours = range(24)
counts = [results['by_hour'].get(hour, 0) for hour in hours]
plt.bar(hours, counts)
plt.xlabel('小时')
plt.ylabel('日志条目数')
plt.title('日志按小时分布')
plt.xticks(hours)
plt.grid(True, axis='y', alpha=0.3)
plt.savefig(output_dir / 'hourly_distribution.png')

# 2. HTTP状态码分布饼图
plt.figure(figsize=(8, 8))
status_codes = list(results['status_codes'].keys())
counts = list(results['status_codes'].values())
plt.pie(counts, labels=status_codes, autopct='%1.1f%%', startangle=140)
plt.axis('equal')
plt.title('HTTP状态码分布')
plt.savefig(output_dir / 'status_codes_pie.png')

# 3. 前5个IP地址条形图
plt.figure(figsize=(10, 6))
top_ips = results['ip_addresses'].most_common(5)
ips = [ip for ip, _ in top_ips]
counts = [count for _, count in top_ips]
plt.barh(ips, counts)
plt.xlabel('请求次数')
plt.ylabel('IP地址')
plt.title('前5个IP地址')
plt.grid(True, axis='x', alpha=0.3)
plt.tight_layout()
plt.savefig(output_dir / 'top_ips.png')

return report_path

# 使用示例
# log_file = "access.log"
# results = analyze_log_file(log_file)
# report_path = generate_log_report(results, "reports")
# print(f"报告已生成: {report_path}")

7.9.3 文件同步工具

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import os
import shutil
import hashlib
from pathlib import Path
import time
import logging

# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("file_sync.log"),
logging.StreamHandler()
]
)

def calculate_file_hash(filepath):
"""计算文件的MD5哈希值"""
hash_md5 = hashlib.md5()
with open(filepath, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()

def sync_directories(source_dir, target_dir, delete=False, exclude=None):
"""
同步两个目录的内容

参数:
source_dir: 源目录
target_dir: 目标目录
delete: 是否删除目标目录中源目录没有的文件
exclude: 要排除的文件/目录列表

返回:
操作统计信息
"""
source_dir = Path(source_dir)
target_dir = Path(target_dir)
exclude = exclude or []

# 确保目录存在
if not source_dir.exists():
raise ValueError(f"源目录不存在: {source_dir}")

if not target_dir.exists():
logging.info(f"创建目标目录: {target_dir}")
target_dir.mkdir(parents=True)

# 初始化统计
stats = {
"copied": 0,
"updated": 0,
"deleted": 0,
"skipped": 0
}

# 获取源文件清单
source_files = {}
for root, dirs, files in os.walk(source_dir):
# 从dirs中移除要排除的目录(修改原地)
dirs[:] = [d for d in dirs if d not in exclude]

for file in files:
if file in exclude:
continue

file_path = Path(root) / file
rel_path = file_path.relative_to(source_dir)
source_files[rel_path] = file_path

# 同步文件到目标目录
for rel_path, source_path in source_files.items():
target_path = target_dir / rel_path

# 确保目标目录存在
target_path.parent.mkdir(parents=True, exist_ok=True)

# 检查文件是否需要更新
if not target_path.exists():
logging.info(f"复制新文件: {rel_path}")
shutil.copy2(source_path, target_path)
stats["copied"] += 1
else:
# 比较修改时间和哈希值
source_mtime = os.path.getmtime(source_path)
target_mtime = os.path.getmtime(target_path)

if abs(source_mtime - target_mtime) > 1: # 1秒容差
# 进一步比较内容哈希
source_hash = calculate_file_hash(source_path)
target_hash = calculate_file_hash(target_path)

if source_hash != target_hash:
logging.info(f"更新文件: {rel_path}")
shutil.copy2(source_path, target_path)
stats["updated"] += 1
else:
stats["skipped"] += 1
else:
stats["skipped"] += 1

# 处理需要删除的文件
if delete:
for root, dirs, files in os.walk(target_dir):
for file in files:
file_path = Path(root) / file
rel_path = file_path.relative_to(target_dir)

if rel_path not in source_files and file not in exclude:
logging.info(f"删除多余文件: {rel_path}")
file_path.unlink()
stats["deleted"] += 1

# 删除空目录(从下向上遍历)
for root, dirs, files in os.walk(target_dir, topdown=False):
for dir_name in dirs:
dir_path = Path(root) / dir_name
if not any(dir_path.iterdir()): # 检查目录是否为空
logging.info(f"删除空目录: {dir_path.relative_to(target_dir)}")
dir_path.rmdir()

return stats

# 定期同步功能
def periodic_sync(source_dir, target_dir, interval=3600, delete=False, exclude=None):
"""
定期同步两个目录

参数:
source_dir: 源目录
target_dir: 目标目录
interval: 同步间隔(秒)
delete: 是否删除目标目录中多余文件
exclude: 要排除的文件/目录列表
"""
logging.info(f"启动定期同步: 从 {source_dir}{target_dir}, 间隔 {interval}秒")

try:
while True:
logging.info("开始同步...")
start_time = time.time()

try:
stats = sync_directories(source_dir, target_dir, delete, exclude)
logging.info(f"同步完成: 复制={stats['copied']}, 更新={stats['updated']}, "
f"删除={stats['deleted']}, 跳过={stats['skipped']}")
except Exception as e:
logging.error(f"同步出错: {str(e)}")

# 计算实际等待时间
elapsed = time.time() - start_time
wait_time = max(0, interval - elapsed)

logging.info(f"等待{wait_time:.1f}秒后进行下次同步...")
time.sleep(wait_time)

except KeyboardInterrupt:
logging.info("同步服务已停止")

# 使用示例
# sync_directories("/path/to/source", "/path/to/backup", delete=True, exclude=[".git", "node_modules"])
# 定期同步(每小时)
# periodic_sync("/path/to/source", "/path/to/backup", interval=3600, delete=True)