在Windows环境下,持续运行的Python程序会产生大量的日志文件。为了方便管理和节省磁盘空间,我们需要实现日志文件的自动分割、定期归档压缩,并安全清理旧日志文件。下面我将详细介绍如何使用Python的logging
模块和schedule
库来实现这一目标。
1. 日志文件按日期分割
Python的logging
模块提供了多种Handler来处理日志输出。其中,TimedRotatingFileHandler
可以按照时间间隔分割日志文件,非常适合按日期分割的需求。
import logging
import logging.handlers
import datetime
import os
# 定义日志文件存放目录
LOG_DIR = 'logs'
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
# 定义日志文件名称
LOG_FILE = os.path.join(LOG_DIR, 'app.log')
# 配置日志记录器
logger = logging.getLogger('my_app')
logger.setLevel(logging.INFO)
# 创建TimedRotatingFileHandler
handler = logging.handlers.TimedRotatingFileHandler(
LOG_FILE,
when='D', # 每天分割
interval=1, # 每天
backupCount=7, # 最多保留7个日志文件
encoding='utf-8'
)
# 设置日志文件后缀名,例如:app.log.2023-10-27
handler.suffix = "%Y-%m-%d"
# 创建日志格式化器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
# 将Handler添加到Logger
logger.addHandler(handler)
# 示例日志记录
logger.info('This is an info message.')
logger.warning('This is a warning message.')
logger.error('This is an error message.')
代码解释:
LOG_DIR
: 定义日志文件存放的目录,如果目录不存在则创建。LOG_FILE
: 定义日志文件的基本名称。TimedRotatingFileHandler
: 该Handler用于按时间分割日志文件。when='D'
: 表示每天分割日志。interval=1
: 表示分割间隔为1天。backupCount=7
: 表示最多保留7个日志文件,超过7个会自动删除。encoding='utf-8'
: 设置编码格式为UTF-8,防止中文乱码。
handler.suffix = "%Y-%m-%d"
: 设置日志文件后缀名,使其包含日期信息。logging.Formatter
: 定义日志信息的格式。logger.addHandler(handler)
: 将Handler添加到Logger,使其生效。
2. 定期归档压缩日志文件
为了节省磁盘空间,我们可以定期将旧的日志文件进行归档压缩。可以使用zipfile
模块来实现压缩功能,并使用schedule
库来定期执行压缩任务。
import zipfile
import schedule
import time
import shutil
# 定义归档目录
ARCHIVE_DIR = 'archive'
if not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)
# 定义需要压缩的文件目录
SOURCE_DIR = LOG_DIR
# 定义压缩包名称
def get_archive_name():
now = datetime.datetime.now()
return os.path.join(ARCHIVE_DIR, f'logs_{now.strftime("%Y-%m-%d")}.zip')
# 压缩指定目录下的文件
def zip_dir(dir_path, zip_path):
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(dir_path):
for file in files:
file_path = os.path.join(root, file)
zipf.write(file_path, os.path.relpath(file_path, dir_path))
# 归档压缩任务
def archive_logs():
archive_name = get_archive_name()
zip_dir(SOURCE_DIR, archive_name)
print(f'Logs archived to {archive_name}')
# 使用 shutil 删除源文件
def delete_source_files():
for filename in os.listdir(SOURCE_DIR):
file_path = os.path.join(SOURCE_DIR, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print('Failed to delete %s. Reason: %s' % (file_path, e))
# 定时任务
schedule.every().day.at('02:00').do(archive_logs)
schedule.every().day.at('03:00').do(delete_source_files)
# 运行定时任务
while True:
schedule.run_pending()
time.sleep(60)
代码解释:
ARCHIVE_DIR
: 定义归档文件存放的目录,如果目录不存在则创建。SOURCE_DIR
: 定义需要压缩的日志文件所在的目录,这里指向LOG_DIR
。get_archive_name()
: 生成压缩包的名称,包含日期信息。zip_dir(dir_path, zip_path)
: 压缩指定目录下的所有文件。archive_logs()
: 执行归档压缩任务,先生成压缩包名称,然后调用zip_dir
进行压缩。schedule.every().day.at('02:00').do(archive_logs)
: 使用schedule
库,设置每天凌晨2点执行archive_logs
任务。while True
: 循环运行定时任务,每隔60秒检查是否有需要执行的任务。delete_source_files()
: 使用shutil
模块安全删除日志文件。避免占用过多磁盘空间。
3. 安全清理旧日志文件
TimedRotatingFileHandler
的backupCount
参数可以自动删除旧的日志文件,但是这种方式只能按照文件数量来删除。如果需要按照时间来删除,可以结合os
模块和datetime
模块来实现。
注意: 在删除文件之前,请务必确认文件是否需要保留,避免误删重要数据。
import time
# 定义日志保留天数
RETENTION_DAYS = 30
# 清理旧日志文件
def cleanup_logs():
cutoff = time.time() - (RETENTION_DAYS * 86400) # 86400秒 = 1天
for filename in os.listdir(LOG_DIR):
file_path = os.path.join(LOG_DIR, filename)
if os.path.isfile(file_path):
timestamp = os.path.getmtime(file_path) # 获取文件最后修改时间
if timestamp < cutoff:
try:
os.remove(file_path)
print(f'Deleted old log file: {file_path}')
except OSError as e:
print(f'Error deleting {file_path}: {e}')
# 定时任务
schedule.every().day.at('04:00').do(cleanup_logs)
代码解释:
RETENTION_DAYS
: 定义日志文件保留的天数。cleanup_logs()
: 清理旧日志文件的函数。- 计算cutoff时间,即需要删除的文件的最后修改时间。
- 遍历日志目录下的所有文件,判断是否为文件,并且最后修改时间是否小于cutoff时间。
- 如果是,则删除该文件。
schedule.every().day.at('04:00').do(cleanup_logs)
: 使用schedule
库,设置每天凌晨4点执行cleanup_logs
任务。
4. 完整示例代码
import logging
import logging.handlers
import datetime
import os
import zipfile
import schedule
import time
import shutil
# --- 配置 --- #
LOG_DIR = 'logs'
ARCHIVE_DIR = 'archive'
RETENTION_DAYS = 30
LOG_FILE = os.path.join(LOG_DIR, 'app.log')
# --- 创建目录 --- #
if not os.path.exists(LOG_DIR):
os.makedirs(LOG_DIR)
if not os.path.exists(ARCHIVE_DIR):
os.makedirs(ARCHIVE_DIR)
# --- 日志配置 --- #
logger = logging.getLogger('my_app')
logger.setLevel(logging.INFO)
handler = logging.handlers.TimedRotatingFileHandler(
LOG_FILE,
when='D',
interval=1,
backupCount=7,
encoding='utf-8'
)
handler.suffix = "%Y-%m-%d"
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
# --- 归档压缩 --- #
def get_archive_name():
now = datetime.datetime.now()
return os.path.join(ARCHIVE_DIR, f'logs_{now.strftime("%Y-%m-%d")}.zip')
def zip_dir(dir_path, zip_path):
with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
for root, _, files in os.walk(dir_path):
for file in files:
file_path = os.path.join(root, file)
zipf.write(file_path, os.path.relpath(file_path, dir_path))
def archive_logs():
archive_name = get_archive_name()
zip_dir(LOG_DIR, archive_name)
print(f'Logs archived to {archive_name}')
def delete_source_files():
for filename in os.listdir(LOG_DIR):
file_path = os.path.join(LOG_DIR, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
print('Failed to delete %s. Reason: %s' % (file_path, e))
# --- 清理旧日志 --- #
def cleanup_logs():
cutoff = time.time() - (RETENTION_DAYS * 86400)
for filename in os.listdir(LOG_DIR):
file_path = os.path.join(LOG_DIR, filename)
if os.path.isfile(file_path):
timestamp = os.path.getmtime(file_path)
if timestamp < cutoff:
try:
os.remove(file_path)
print(f'Deleted old log file: {file_path}')
except OSError as e:
print(f'Error deleting {file_path}: {e}')
# --- 定时任务 --- #
schedule.every().day.at('02:00').do(archive_logs)
schedule.every().day.at('03:00').do(delete_source_files)
schedule.every().day.at('04:00').do(cleanup_logs)
# --- 运行 --- #
if __name__ == '__main__':
# 示例日志记录
logger.info('Application started.')
logger.warning('Something might be wrong.')
logger.error('An error occurred.')
# 运行定时任务
while True:
schedule.run_pending()
time.sleep(60)
5. 注意事项
- 异常处理: 在实际应用中,需要添加更完善的异常处理机制,例如在文件操作时捕获
OSError
异常,并进行相应的处理。 - 日志级别: 根据实际需求设置合适的日志级别,例如
DEBUG
、INFO
、WARNING
、ERROR
、CRITICAL
。 - 配置化: 将配置信息(例如日志目录、保留天数、定时任务时间)放在配置文件中,方便修改和管理。
- 安全性: 确保日志目录的访问权限,防止未经授权的访问和修改。
- 测试: 在生产环境部署之前,务必进行充分的测试,确保日志管理功能的稳定性和可靠性。
通过以上步骤,你就可以在Windows环境下实现Python日志文件的自动化管理,包括按日期分割、定期归档压缩和安全清理旧日志文件。希望这些信息对你有所帮助!