diff --git a/utils/logger.py b/utils/logger.py new file mode 100644 index 0000000..5b0c2a4 --- /dev/null +++ b/utils/logger.py @@ -0,0 +1,343 @@ +from __future__ import annotations + +import logging +import logging.handlers +import os +from pathlib import Path +import sys + + +def setup_logging( + *, + app_name: str | None = "script", + log_dir: str | Path = "logs", + console_level: int | None = None, + file_level: int | None = None, + root_level: int | None = None, + calendar: str | None = None, # "gregorian" | "jalali" | None + json_logs: bool | None = None, + enable_file: bool | None = None, # None -> auto based on docker + env overrides + max_bytes: int = 10 * 1024 * 1024, + backup_count: int = 10, + use_rich: bool = False, + include_app_name: bool = True, + include_name: bool = True, + microseconds: int = 3, # 0, 3, or 6 + include_pid: bool = True, + include_thread: bool = True, +) -> None: + """ + Script-friendly logging: + - Console always + - File only if enabled (default: enabled locally, disabled in Docker) + - Optional JSON logs + + Formatter options: + - microseconds: 0 (none), 3 (milliseconds), 6 (microseconds) + - app_name: if None, it's omitted + - include_pid/include_thread toggles + """ + + if use_rich: + try: + from rich.logging import RichHandler + except Exception: + use_rich = False + + if use_rich and not sys.stderr.isatty(): + use_rich = False + + # ---- Environment overrides ---- + env_root = os.getenv("LOG_LEVEL") + env_console = os.getenv("LOG_LEVEL_CONSOLE") + env_file = os.getenv("LOG_LEVEL_FILE") + + if root_level is None: + root_level = _level_from_env(env_root, default=logging.DEBUG) + if console_level is None: + console_level = _level_from_env(env_console, default=logging.INFO) + if file_level is None: + file_level = _level_from_env(env_file, default=logging.DEBUG) + + if json_logs is None: + json_logs = _truthy_env(os.getenv("LOG_JSON"), default=False) + + if enable_file is None: + env_log_to_file = os.getenv("LOG_TO_FILE") + if env_log_to_file is not None: + enable_file = _truthy_env(env_log_to_file, default=True) + else: + enable_file = not _looks_like_docker() + + log_dir = Path(os.getenv("LOG_DIR", str(log_dir))).resolve() + + # ---- Calendar selection ---- + if calendar is None: + calendar = "gregorian" + calendar = calendar.strip().lower() + + if calendar == "jalali": + try: + import jdatetime # noqa: F401 + except Exception: + calendar = "gregorian" + + if calendar not in ("gregorian", "jalali"): + raise ValueError("calendar must be 'gregorian' or 'jalali'") + + # ---- Validate formatter knobs ---- + if microseconds not in (0, 3, 6): + raise ValueError("microseconds must be 0, 3, or 6") + + # ---- Configure root ---- + root = logging.getLogger() + root.setLevel(root_level) + + for h in list(root.handlers): + root.removeHandler(h) + + # ---- Formatter ---- + if json_logs: + formatter: logging.Formatter = _JsonFormatter( + app_name=app_name, + calendar=calendar, + microseconds=microseconds, + include_app_name=include_app_name, + include_name=include_name, + include_pid=include_pid, + include_thread=include_thread, + ) + else: + formatter = _TextFormatter( + app_name=app_name, + calendar=calendar, + microseconds=microseconds, + include_app_name=include_app_name, + include_name=include_name, + include_pid=include_pid, + include_thread=include_thread, + ) + + # ---- Console handler ---- + if use_rich and not json_logs: + console = RichHandler( + level=console_level, + rich_tracebacks=True, + tracebacks_show_locals=True, + show_time=False, # YOU already handle time + show_level=False, # YOU already handle level + show_path=False, # avoid noise + markup=False, + ) + console.setFormatter(formatter) + else: + console = logging.StreamHandler() + console.setLevel(console_level) + console.setFormatter(formatter) + + root.addHandler(console) + + # ---- File handler (rotating) ---- + if enable_file: + log_dir.mkdir(parents=True, exist_ok=True) + log_path = log_dir / f"{app_name or 'app'}.log" + + file_handler = logging.handlers.RotatingFileHandler( + filename=log_path, + maxBytes=max_bytes, + backupCount=backup_count, + encoding="utf-8", + delay=True, + ) + file_handler.setLevel(file_level) + file_handler.setFormatter(formatter) + root.addHandler(file_handler) + + +def _looks_like_docker() -> bool: + if os.path.exists("/.dockerenv"): + return True + try: + cgroup = Path("/proc/1/cgroup") + if cgroup.exists(): + txt = cgroup.read_text(errors="ignore") + if "docker" in txt or "kubepods" in txt or "containerd" in txt: + return True + except Exception: + pass + return False + + +def _truthy_env(val: str | None, *, default: bool) -> bool: + if val is None: + return default + return val.strip().lower() in ("1", "true", "yes", "y", "on") + + +def _level_from_env(val: str | None, *, default: int) -> int: + if not val: + return default + name = val.strip().upper() + return getattr(logging, name, default) + + +def _format_timestamp(*, calendar: str, microseconds: int) -> str: + """ + microseconds: + - 0: YYYY-mm-dd HH:MM:SS + - 3: YYYY-mm-dd HH:MM:SS.mmm + - 6: YYYY-mm-dd HH:MM:SS.ffffff + """ + from datetime import datetime + + if calendar == "gregorian": + now = datetime.now() + base = now.strftime("%Y-%m-%d %H:%M:%S") + if microseconds == 0: + return base + us = now.microsecond # 0..999999 + if microseconds == 3: + return f"{base}.{us // 1000:03d}" + return f"{base}.{us:06d}" + + # Jalali + try: + import jdatetime + except Exception: + # fallback + now = datetime.now() + base = now.strftime("%Y-%m-%d %H:%M:%S") + if microseconds == 0: + return base + " (gregorian-fallback)" + us = now.microsecond + if microseconds == 3: + return f"{base}.{us // 1000:03d} (gregorian-fallback)" + return f"{base}.{us:06d} (gregorian-fallback)" + + jnow = jdatetime.datetime.now() + base = jnow.strftime("%Y-%m-%d %H:%M:%S") + if microseconds == 0: + return base + us = jnow.microsecond + if microseconds == 3: + return f"{base}.{us // 1000:03d}" + return f"{base}.{us:06d}" + + +def _build_context_suffix(record: logging.LogRecord, *, include_pid: bool, include_thread: bool) -> str: + parts: list[str] = [] + if include_pid: + parts.append(str(record.process)) + if include_thread: + parts.append(record.threadName) + if not parts: + return "" + return " [" + ":".join(parts) + "]" + + +class _TextFormatter(logging.Formatter): + def __init__( + self, + *, + app_name: str | None, + calendar: str, + microseconds: int, + include_app_name: bool, + include_name: bool, + include_pid: bool, + include_thread: bool, + ): + super().__init__() + self.app_name = app_name + self.calendar = calendar + self.microseconds = microseconds + self.include_app_name = include_app_name + self.include_name = include_name + self.include_pid = include_pid + self.include_thread = include_thread + + def format(self, record: logging.LogRecord) -> str: + ts = _format_timestamp(calendar=self.calendar, microseconds=self.microseconds) + + # best practice: keep a stable, grep-friendly prefix + prefix_parts = [ts, record.levelname] + + if self.include_app_name: + prefix_parts.append(self.app_name) + + if self.include_name: + prefix_parts.append(record.name) + + ctx = _build_context_suffix(record, include_pid=self.include_pid, include_thread=self.include_thread) + + base = " ".join(prefix_parts) + ctx + " " + record.getMessage() + + if record.exc_info: + base += "\n" + self.formatException(record.exc_info) + + return base + + +class _JsonFormatter(logging.Formatter): + def __init__( + self, + *, + app_name: str | None, + calendar: str, + microseconds: int, + include_app_name: bool, + include_name: bool, + include_pid: bool, + include_thread: bool, + ): + super().__init__() + self.app_name = app_name + self.calendar = calendar + self.microseconds = microseconds + self.include_app_name = include_app_name + self.include_name = include_name + self.include_pid = include_pid + self.include_thread = include_thread + + def format(self, record: logging.LogRecord) -> str: + import json + + payload: dict[str, object] = { + "ts": _format_timestamp(calendar=self.calendar, microseconds=self.microseconds), + "level": record.levelname, + "msg": record.getMessage(), + } + + if self.include_app_name: + payload["app"] = self.app_name + if self.include_name: + payload["logger"] = record.name + if self.include_pid: + payload["process"] = record.process + if self.include_thread: + payload["thread"] = record.threadName + if record.exc_info: + payload["exc"] = self.formatException(record.exc_info) + + return json.dumps(payload, ensure_ascii=False) + + +if __name__ == "__main__": + setup_logging( + calendar="jalali", + include_app_name=False, + include_name=True, + include_pid=False, + include_thread=False, + console_level=logging.DEBUG, + use_rich=True, + ) + logging.getLogger("elasticsearch").setLevel(logging.WARNING) + logging.getLogger("elastic_transport").setLevel(logging.WARNING) + logging.getLogger("urllib3").setLevel(logging.WARNING) + logging.getLogger("tortoise.db_client").setLevel(logging.WARNING) + logging.getLogger("asyncio").setLevel(logging.WARNING) + logging.getLogger("tortoise").setLevel(logging.WARNING) + log = logging.getLogger(__name__) + log.info("script_started") + # time.sleep(2)