import logging
import os
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
from configs import settings

# Grab the root logger and strip every handler already attached to it, so
# that records propagated from named loggers are not emitted a second time
# by pre-existing root handlers.
logger = logging.getLogger()
for handler in list(logger.handlers):  # iterate a snapshot; the live list shrinks
    logger.removeHandler(handler)
 
class Logger:
    """Wrapper around a named ``logging.Logger`` with console and rotating-file output.

    The level methods forward to the wrapped logger with ``stacklevel=2`` so
    that ``%(pathname)s`` / ``%(lineno)d`` in the log format point at the code
    that called the wrapper, not at the wrapper itself.
    """

    def __init__(self, name="app", log_file="app.log", level="DEBUG", rotating_type="size", backup_count=5):
        """
        Initialise the logger.

        If the named logger already has handlers it is reused as-is and none
        of the other arguments are applied.

        :param name: name of the underlying ``logging`` logger
        :param log_file: path of the log file; its directory is created if missing
        :param level: log level (name or numeric value) set on the logger
        :param rotating_type: rollover strategy, ``"size"`` (10 MiB per file)
            or ``"time"`` (rotate at midnight)
        :param backup_count: number of rotated files to keep
        :raises ValueError: if ``rotating_type`` is neither ``"size"`` nor ``"time"``
        """
        self.logger = logging.getLogger(name)

        # Avoid attaching duplicate handlers to an already configured logger.
        if self.logger.handlers:
            return

        # Validate before touching the logger: raising after a handler has
        # been attached would leave it half-configured, and the guard above
        # would then skip file setup on every later construction.
        if rotating_type not in ("size", "time"):
            raise ValueError("rotating_type must be 'size' or 'time'")

        self.logger.setLevel(level)

        # Make sure the log directory exists; exist_ok avoids a race when
        # several processes start at the same time.
        log_dir = os.path.dirname(log_file)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # Build the file handler first: if opening the file fails, no handler
        # has been attached yet and the logger stays untouched.
        if rotating_type == "size":
            # Size-based rollover.
            file_handler = RotatingFileHandler(
                log_file, maxBytes=10 * 1024 * 1024, backupCount=backup_count, encoding="utf-8"
            )
        else:
            # Time-based rollover.
            file_handler = TimedRotatingFileHandler(
                log_file, when="midnight", interval=1, backupCount=backup_count, encoding="utf-8"
            )
        file_handler.setFormatter(self._get_formatter())

        # Console output.
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(self._get_formatter())

        self.logger.addHandler(console_handler)
        self.logger.addHandler(file_handler)

    def _get_formatter(self):
        """Return the formatter shared by the console and file handlers."""
        return logging.Formatter('%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s')

    # The level methods accept the same optional ``*args`` / ``**kwargs`` as
    # the standard ``logging.Logger`` methods (lazy %-formatting arguments,
    # ``exc_info``, ``extra`` ...). ``stacklevel`` defaults to 2 so the
    # reported location is the wrapper's caller.

    def debug(self, message, *args, **kwargs):
        """Log ``message`` at DEBUG level."""
        kwargs.setdefault("stacklevel", 2)
        self.logger.debug(message, *args, **kwargs)

    def info(self, message, *args, **kwargs):
        """Log ``message`` at INFO level."""
        kwargs.setdefault("stacklevel", 2)
        self.logger.info(message, *args, **kwargs)

    def warning(self, message, *args, **kwargs):
        """Log ``message`` at WARNING level."""
        kwargs.setdefault("stacklevel", 2)
        self.logger.warning(message, *args, **kwargs)

    def error(self, message, *args, **kwargs):
        """Log ``message`` at ERROR level."""
        kwargs.setdefault("stacklevel", 2)
        self.logger.error(message, *args, **kwargs)

    def critical(self, message, *args, **kwargs):
        """Log ``message`` at CRITICAL level."""
        kwargs.setdefault("stacklevel", 2)
        self.logger.critical(message, *args, **kwargs)

    def exception(self, message, *args, **kwargs):
        """Log ``message`` at ERROR level with the current exception's traceback."""
        # Call error() directly rather than Logger.exception() so there is no
        # extra logging-internal frame between this wrapper and the emit call.
        kwargs.setdefault("exc_info", True)
        kwargs.setdefault("stacklevel", 2)
        self.logger.error(message, *args, **kwargs)

# Select the module-level ``logger`` that the rest of the application imports.
if settings.app_log_file:
    # A log file is configured: use the rotating-file wrapper.
    logger = Logger(name=settings.proc_name, log_file=settings.app_log_file, level=settings.app_log_level)
else:
    # No log file configured: reuse the 'gunicorn.error' logger when it has
    # handlers attached (presumably only when running under gunicorn);
    # otherwise fall back to a default Logger().
    logger = logging.getLogger('gunicorn.error')
    # Fixed: this previously read ``self.logger.handlers``, which raises
    # NameError at module scope because ``self`` does not exist here.
    if not logger.handlers:
        logger = Logger()
 
if __name__ == "__main__":
    # Demo 1: logger whose file rolls over by size, keeping 3 backups.
    size_logger = Logger(
        name="SizeLogger",
        log_file="logs/size_log.log",
        level="DEBUG",
        rotating_type="size",
        backup_count=3,
    )
    size_logger.debug("这是基于文件大小滚动的日志")

    # Demo 2: logger whose file rolls over by time, keeping 7 backups.
    time_logger = Logger(
        name="TimeLogger",
        log_file="logs/time_log.log",
        level="DEBUG",
        rotating_type="time",
        backup_count=7,
    )
    time_logger.debug("这是基于时间滚动的日志")

# Logo
#
# 欢迎加入 MCP 技术社区!与志同道合者携手前行,一同解锁 MCP 技术的无限可能!
#
# 更多推荐