|
- # Copyright (c) Facebook, Inc. and its affiliates.
- #
- # This source code is licensed under the MIT license found in the
- # LICENSE file in the root directory of this source tree.
-
- import argparse
- import contextlib
- import copy
- import importlib
- import logging
- import os
- import sys
- import tempfile
- import warnings
- from itertools import accumulate
- from typing import Callable, Dict, List, Optional
-
- import torch
- import torch.nn.functional as F
- from fairseq.modules.multihead_attention import MultiheadAttention
- from torch import Tensor
-
-
- try:
- from amp_C import multi_tensor_l2norm
-
- multi_tensor_l2norm_available = True
- except ImportError:
- multi_tensor_l2norm_available = False
-
- try:
- import torch_xla.core.xla_model as xm
- except ImportError:
- xm = None
-
-
- logger = logging.getLogger(__name__)
-
-
- MANIFOLD_PATH_SEP = "|"
-
-
- class FileContentsAction(argparse.Action):
- def __init__(self, option_strings, dest, nargs=None, **kwargs):
- if nargs is not None:
- raise ValueError("nargs not allowed")
- super(FileContentsAction, self).__init__(option_strings, dest, **kwargs)
-
- def __call__(self, parser, namespace, values, option_string=None):
- from fairseq.file_io import PathManager
-
- if PathManager.isfile(values):
- with PathManager.open(values) as f:
- argument = f.read().strip()
- else:
- argument = values
- setattr(namespace, self.dest, argument)
-
-
- def split_paths(paths: str, separator=os.pathsep) -> List[str]:
- return (
- paths.split(separator)
- if "://" not in paths
- else paths.split(MANIFOLD_PATH_SEP)
- )
-
-
- def load_ensemble_for_inference(filenames, task, model_arg_overrides=None):
- from fairseq import checkpoint_utils
-
- deprecation_warning(
- "utils.load_ensemble_for_inference is deprecated. "
- "Please use checkpoint_utils.load_model_ensemble instead."
- )
- return checkpoint_utils.load_model_ensemble(
- filenames, arg_overrides=model_arg_overrides, task=task
- )
-
-
- def apply_to_sample(f, sample):
- if hasattr(sample, "__len__") and len(sample) == 0:
- return {}
-
- def _apply(x):
- if torch.is_tensor(x):
- return f(x)
- elif isinstance(x, dict):
- return {key: _apply(value) for key, value in x.items()}
- elif isinstance(x, list):
- return [_apply(x) for x in x]
- elif isinstance(x, tuple):
- return tuple(_apply(x) for x in x)
- elif isinstance(x, set):
- return {_apply(x) for x in x}
- else:
- return x
-
- return _apply(sample)
-
-
- def move_to_cuda(sample, device=None):
- device = device or torch.cuda.current_device()
-
- def _move_to_cuda(tensor):
- # non_blocking is ignored if tensor is not pinned, so we can always set
- # to True (see github.com/PyTorchLightning/pytorch-lightning/issues/620)
- return tensor.to(device=device, non_blocking=True)
-
- return apply_to_sample(_move_to_cuda, sample)
-
-
- def move_to_cpu(sample):
- def _move_to_cpu(tensor):
- # PyTorch has poor support for half tensors (float16) on CPU.
- # Move any such tensors to float32.
- if tensor.dtype in {torch.bfloat16, torch.float16}:
- tensor = tensor.to(dtype=torch.float32)
- return tensor.cpu()
-
- return apply_to_sample(_move_to_cpu, sample)
-
-
- def move_to_tpu(sample):
-
- import torch_xla.core.xla_model as xm
-
- device = xm.xla_device()
-
- def _move_to_tpu(tensor):
- return tensor.to(device)
-
- return apply_to_sample(_move_to_tpu, sample)
-
-
- def get_incremental_state(
- module: MultiheadAttention,
- incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]],
- key: str,
- ) -> Optional[Dict[str, Optional[Tensor]]]:
- """Helper for getting incremental state for an nn.Module."""
- return module.get_incremental_state(incremental_state, key)
-
-
- def set_incremental_state(
- module: MultiheadAttention,
- incremental_state: Optional[Dict[str, Dict[str, Optional[Tensor]]]],
- key: str,
- value: Dict[str, Optional[Tensor]],
- ) -> Optional[Dict[str, Dict[str, Optional[Tensor]]]]:
- """Helper for setting incremental state for an nn.Module."""
- if incremental_state is not None:
- result = module.set_incremental_state(incremental_state, key, value)
- if result is not None:
- incremental_state = result
- return incremental_state
-
-
- def load_align_dict(replace_unk):
- if replace_unk is None:
- align_dict = None
- elif isinstance(replace_unk, str) and len(replace_unk) > 0:
- # Load alignment dictionary for unknown word replacement if it was passed as an argument.
- align_dict = {}
- with open(replace_unk, "r") as f:
- for line in f:
- cols = line.split()
- align_dict[cols[0]] = cols[1]
- else:
- # No alignment dictionary provided but we still want to perform unknown word replacement by copying the
- # original source word.
- align_dict = {}
- return align_dict
-
-
- def print_embed_overlap(embed_dict, vocab_dict):
- embed_keys = set(embed_dict.keys())
- vocab_keys = set(vocab_dict.symbols)
- overlap = len(embed_keys & vocab_keys)
- logger.info("found {}/{} types in embedding file".format(overlap, len(vocab_dict)))
-
-
- def parse_embedding(embed_path):
- """Parse embedding text file into a dictionary of word and embedding tensors.
-
- The first line can have vocabulary size and dimension. The following lines
- should contain word and embedding separated by spaces.
-
- Example:
- 2 5
- the -0.0230 -0.0264 0.0287 0.0171 0.1403
- at -0.0395 -0.1286 0.0275 0.0254 -0.0932
- """
- embed_dict = {}
- with open(embed_path) as f_embed:
- next(f_embed) # skip header
- for line in f_embed:
- pieces = line.rstrip().split(" ")
- embed_dict[pieces[0]] = torch.Tensor(
- [float(weight) for weight in pieces[1:]]
- )
- return embed_dict
-
-
- def load_embedding(embed_dict, vocab, embedding):
- for idx in range(len(vocab)):
- token = vocab[idx]
- if token in embed_dict:
- embedding.weight.data[idx] = embed_dict[token]
- return embedding
-
-
- def replace_unk(hypo_str, src_str, alignment, align_dict, unk):
- from fairseq import tokenizer
-
- # Tokens are strings here
- hypo_tokens = tokenizer.tokenize_line(hypo_str)
- # TODO: Very rare cases where the replacement is '<eos>' should be handled gracefully
- src_tokens = tokenizer.tokenize_line(src_str) + ["<eos>"]
- for i, ht in enumerate(hypo_tokens):
- if ht == unk:
- src_token = src_tokens[alignment[i]]
- # Either take the corresponding value in the aligned dictionary or just copy the original value.
- hypo_tokens[i] = align_dict.get(src_token, src_token)
- return " ".join(hypo_tokens)
-
-
- def post_process_prediction(
- hypo_tokens,
- src_str,
- alignment,
- align_dict,
- tgt_dict,
- remove_bpe=None,
- extra_symbols_to_ignore=None,
- ):
- hypo_str = tgt_dict.string(
- hypo_tokens, remove_bpe, extra_symbols_to_ignore=extra_symbols_to_ignore
- )
- if align_dict is not None:
- hypo_str = replace_unk(
- hypo_str, src_str, alignment, align_dict, tgt_dict.unk_string()
- )
- if align_dict is not None or remove_bpe is not None:
- # Convert back to tokens for evaluating with unk replacement or without BPE
- # Note that the dictionary can be modified inside the method.
- hypo_tokens = tgt_dict.encode_line(hypo_str, add_if_not_exist=True)
- return hypo_tokens, hypo_str, alignment
-
-
- def make_positions(tensor, padding_idx: int, onnx_trace: bool = False):
- """Replace non-padding symbols with their position numbers.
-
- Position numbers begin at padding_idx+1. Padding symbols are ignored.
- """
- # The series of casts and type-conversions here are carefully
- # balanced to both work with ONNX export and XLA. In particular XLA
- # prefers ints, cumsum defaults to output longs, and ONNX doesn't know
- # how to handle the dtype kwarg in cumsum.
- mask = tensor.ne(padding_idx).int()
- return (torch.cumsum(mask, dim=1).type_as(mask) * mask).long() + padding_idx
-
-
- def strip_pad(tensor, pad):
- return tensor[tensor.ne(pad)]
-
-
- def buffered_arange(max):
- if not hasattr(buffered_arange, "buf"):
- buffered_arange.buf = torch.LongTensor()
- if max > buffered_arange.buf.numel():
- buffered_arange.buf.resize_(max)
- torch.arange(max, out=buffered_arange.buf)
- return buffered_arange.buf[:max]
-
-
- def convert_padding_direction(
- src_tokens, padding_idx, right_to_left: bool = False, left_to_right: bool = False
- ):
- assert right_to_left ^ left_to_right
- pad_mask = src_tokens.eq(padding_idx)
- if not pad_mask.any():
- # no padding, return early
- return src_tokens
- if left_to_right and not pad_mask[:, 0].any():
- # already right padded
- return src_tokens
- if right_to_left and not pad_mask[:, -1].any():
- # already left padded
- return src_tokens
- max_len = src_tokens.size(1)
- buffered = torch.empty(0).long()
- if max_len > 0:
- torch.arange(max_len, out=buffered)
- range = buffered.type_as(src_tokens).expand_as(src_tokens)
- num_pads = pad_mask.long().sum(dim=1, keepdim=True)
- if right_to_left:
- index = torch.remainder(range - num_pads, max_len)
- else:
- index = torch.remainder(range + num_pads, max_len)
- return src_tokens.gather(1, index)
-
-
- def item(tensor):
- # tpu-comment: making this a no-op for xla devices.
- if torch.is_tensor(tensor) and tensor.device.type == "xla":
- return tensor.detach()
- if hasattr(tensor, "item"):
- return tensor.item()
- if hasattr(tensor, "__getitem__"):
- return tensor[0]
- return tensor
-
-
- def multi_tensor_total_norm(grads, chunk_size=2048 * 32) -> torch.Tensor:
- per_device_grads = {}
- norms = []
- for grad in grads:
- device = grad.device
- cur_device_grads = per_device_grads.get(device)
- if cur_device_grads is None:
- cur_device_grads = []
- per_device_grads[device] = cur_device_grads
- cur_device_grads.append(grad)
- for device in per_device_grads.keys():
- cur_device_grads = per_device_grads[device]
- if device.type == "cuda":
- # TODO(msb) return has_inf
- has_inf = torch.zeros((1, 1), dtype=torch.int, device=device)
- with torch.cuda.device(device):
- norm = multi_tensor_l2norm(
- chunk_size, has_inf, [cur_device_grads], False
- )
- norms.append(norm[0].to(torch.cuda.current_device()))
- else:
- norms += [torch.norm(g, p=2, dtype=torch.float32) for g in cur_device_grads]
- total_norm = torch.norm(torch.stack(norms))
- return total_norm
-
-
- @torch.no_grad()
- def clip_grad_norm_(params, max_norm, aggregate_norm_fn=None) -> torch.Tensor:
- def grad_exists(p):
- return p is not None and getattr(p, "grad", None) is not None
-
- if isinstance(params, torch.Tensor):
- params = [params]
- params = list(params)
- grads = [
- p.grad.detach() for p in params if grad_exists(p) and not hasattr(p, "expert")
- ]
- expert_grads = [
- p.grad.detach() for p in params if grad_exists(p) and hasattr(p, "expert")
- ]
-
- if len(grads) == 0:
- if len(params) > 0:
- return params[0].new_tensor(0.0)
- else:
- return torch.tensor(0.0)
-
- if len(grads) == 1:
- total_norm = torch.norm(grads[0], p=2, dtype=torch.float32)
- else:
- if multi_tensor_l2norm_available:
- total_norm = multi_tensor_total_norm(grads)
- else:
- if torch.cuda.is_available():
- warnings.warn(
- "amp_C fused kernels unavailable, disabling multi_tensor_l2norm; "
- "you may get better performance by installing NVIDIA's apex library"
- )
- device = torch.cuda.current_device()
- elif grads[0].device.type == "xla":
- device = grads[0].device
- else:
- device = torch.device("cpu")
- total_norm = torch.norm(
- torch.stack(
- [torch.norm(g, p=2, dtype=torch.float32).to(device) for g in grads]
- )
- )
-
- if aggregate_norm_fn is not None:
- total_norm = aggregate_norm_fn(total_norm)
-
- if max_norm > 0:
- max_norm = float(max_norm)
- clip_coef = (max_norm / (total_norm + 1e-6)).clamp_(max=1)
- for g in grads + expert_grads:
- g.mul_(clip_coef)
- return total_norm
-
-
- def fill_with_neg_inf(t):
- """FP16-compatible function that fills a tensor with -inf."""
- return t.float().fill_(float("-inf")).type_as(t)
-
-
- def _match_types(arg1, arg2):
- """Convert the numerical argument to the same type as the other argument"""
-
- def upgrade(arg_number, arg_structure):
- if isinstance(arg_structure, tuple):
- return tuple([arg_number] * len(arg_structure))
- elif isinstance(arg_structure, dict):
- arg = copy.deepcopy(arg_structure)
- for k in arg:
- arg[k] = upgrade(arg_number, arg_structure[k])
- return arg
- else:
- return arg_number
-
- if isinstance(arg1, float) or isinstance(arg1, int):
- return upgrade(arg1, arg2), arg2
- elif isinstance(arg2, float) or isinstance(arg2, int):
- return arg1, upgrade(arg2, arg1)
-
- return arg1, arg2
-
-
- def resolve_max_positions(*args):
- """Resolve max position constraints from multiple sources."""
-
- def map_value_update(d1, d2):
- updated_value = copy.deepcopy(d1)
- for key in d2:
- if key not in updated_value:
- updated_value[key] = d2[key]
- else:
- updated_value[key] = min(d1[key], d2[key])
- return updated_value
-
- def nullsafe_min(l):
- minim = None
- for item in l:
- if minim is None:
- minim = item
- elif item is not None and item < minim:
- minim = item
- return minim
-
- max_positions = None
- for arg in args:
- if max_positions is None:
- max_positions = arg
- elif arg is not None:
- max_positions, arg = _match_types(max_positions, arg)
- if isinstance(arg, float) or isinstance(arg, int):
- max_positions = min(max_positions, arg)
- elif isinstance(arg, dict):
- max_positions = map_value_update(max_positions, arg)
- else:
- max_positions = tuple(map(nullsafe_min, zip(max_positions, arg)))
-
- return max_positions
-
-
- def import_user_module(args):
- module_path = getattr(args, "user_dir", None)
- if module_path is not None:
- module_path = os.path.abspath(args.user_dir)
- if not os.path.exists(module_path) and not os.path.isfile(
- os.path.dirname(module_path)
- ):
- fairseq_rel_path = os.path.join(os.path.dirname(__file__), args.user_dir)
- if os.path.exists(fairseq_rel_path):
- module_path = fairseq_rel_path
- else:
- fairseq_rel_path = os.path.join(
- os.path.dirname(__file__), "..", args.user_dir
- )
- if os.path.exists(fairseq_rel_path):
- module_path = fairseq_rel_path
- else:
- raise FileNotFoundError(module_path)
-
- # ensure that user modules are only imported once
- import_user_module.memo = getattr(import_user_module, "memo", set())
- if module_path not in import_user_module.memo:
- import_user_module.memo.add(module_path)
-
- module_parent, module_name = os.path.split(module_path)
- if module_name not in sys.modules:
- sys.path.insert(0, module_parent)
- importlib.import_module(module_name)
- else:
- raise ImportError(
- "Failed to import --user-dir={} because the corresponding module name "
- "({}) is not globally unique. Please rename the directory to "
- "something unique and try again.".format(module_path, module_name)
- )
-
-
- def softmax(x, dim: int, onnx_trace: bool = False):
- if onnx_trace:
- return F.softmax(x.float(), dim=dim)
- else:
- return F.softmax(x, dim=dim, dtype=torch.float32)
-
-
- def log_softmax(x, dim: int, onnx_trace: bool = False):
- if onnx_trace:
- return F.log_softmax(x.float(), dim=dim)
- else:
- return F.log_softmax(x, dim=dim, dtype=torch.float32)
-
-
- def get_perplexity(loss, round=2, base=2):
- from fairseq.logging.meters import safe_round
-
- if loss is None:
- return 0.0
- try:
- return safe_round(base ** loss, round)
- except OverflowError:
- return float("inf")
-
-
- def deprecation_warning(message, stacklevel=3):
- # don't use DeprecationWarning, since it's ignored by default
- warnings.warn(message, stacklevel=stacklevel)
-
-
- def get_activation_fn(activation: str) -> Callable:
- """Returns the activation function corresponding to `activation`"""
- from fairseq.modules import gelu, gelu_accurate
-
- if activation == "relu":
- return F.relu
- elif activation == "gelu":
- return gelu
- elif activation == "gelu_fast":
- deprecation_warning(
- "--activation-fn=gelu_fast has been renamed to gelu_accurate"
- )
- return gelu_accurate
- elif activation == "gelu_accurate":
- return gelu_accurate
- elif activation == "tanh":
- return torch.tanh
- elif activation == "linear":
- return lambda x: x
- else:
- raise RuntimeError("--activation-fn {} not supported".format(activation))
-
-
- def get_available_activation_fns() -> List:
- return [
- "relu",
- "gelu",
- "gelu_fast", # deprecated
- "gelu_accurate",
- "tanh",
- "linear",
- ]
-
-
- @contextlib.contextmanager
- def model_eval(model):
- is_training = model.training
- model.eval()
- yield
- model.train(is_training)
-
-
- def has_parameters(module):
- try:
- next(module.parameters())
- return True
- except StopIteration:
- return False
-
-
- def get_rng_state():
- state = {"torch_rng_state": torch.get_rng_state()}
- if xm is not None:
- state["xla_rng_state"] = xm.get_rng_state()
- if torch.cuda.is_available():
- state["cuda_rng_state"] = torch.cuda.get_rng_state()
- return state
-
-
- def set_rng_state(state):
- torch.set_rng_state(state["torch_rng_state"])
- if xm is not None:
- xm.set_rng_state(state["xla_rng_state"])
- if torch.cuda.is_available():
- torch.cuda.set_rng_state(state["cuda_rng_state"])
-
-
- class set_torch_seed(object):
- def __init__(self, seed):
- assert isinstance(seed, int)
- self.rng_state = get_rng_state()
-
- torch.manual_seed(seed)
- if xm is not None:
- xm.set_rng_state(seed)
- if torch.cuda.is_available():
- torch.cuda.manual_seed(seed)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *exc):
- set_rng_state(self.rng_state)
-
-
- def parse_alignment(line):
- """
- Parses a single line from the alingment file.
-
- Args:
- line (str): String containing the alignment of the format:
- <src_idx_1>-<tgt_idx_1> <src_idx_2>-<tgt_idx_2> ..
- <src_idx_m>-<tgt_idx_m>. All indices are 0 indexed.
-
- Returns:
- torch.IntTensor: packed alignments of shape (2 * m).
- """
- alignments = line.strip().split()
- parsed_alignment = torch.IntTensor(2 * len(alignments))
- for idx, alignment in enumerate(alignments):
- src_idx, tgt_idx = alignment.split("-")
- parsed_alignment[2 * idx] = int(src_idx)
- parsed_alignment[2 * idx + 1] = int(tgt_idx)
- return parsed_alignment
-
-
- def get_token_to_word_mapping(tokens, exclude_list):
- n = len(tokens)
- word_start = [int(token not in exclude_list) for token in tokens]
- word_idx = list(accumulate(word_start))
- token_to_word = {i: word_idx[i] for i in range(n)}
- return token_to_word
-
-
- def extract_hard_alignment(attn, src_sent, tgt_sent, pad, eos):
- tgt_valid = (
- ((tgt_sent != pad) & (tgt_sent != eos)).nonzero(as_tuple=False).squeeze(dim=-1)
- )
- src_invalid = (
- ((src_sent == pad) | (src_sent == eos)).nonzero(as_tuple=False).squeeze(dim=-1)
- )
- src_token_to_word = get_token_to_word_mapping(src_sent, [eos, pad])
- tgt_token_to_word = get_token_to_word_mapping(tgt_sent, [eos, pad])
- alignment = []
- if len(tgt_valid) != 0 and len(src_invalid) < len(src_sent):
- attn_valid = attn[tgt_valid]
- attn_valid[:, src_invalid] = float("-inf")
- _, src_indices = attn_valid.max(dim=1)
- for tgt_idx, src_idx in zip(tgt_valid, src_indices):
- alignment.append(
- (
- src_token_to_word[src_idx.item()] - 1,
- tgt_token_to_word[tgt_idx.item()] - 1,
- )
- )
- return alignment
-
-
- def extract_soft_alignment(attn, src_sent, tgt_sent, pad, eos):
- tgt_valid = ((tgt_sent != pad)).nonzero(as_tuple=False)
- src_valid = ((src_sent != pad)).nonzero(as_tuple=False).squeeze(dim=-1)
- alignment = []
- if len(tgt_valid) != 0 and len(src_valid) != 0:
- attn_valid = attn[tgt_valid, src_valid]
- alignment = [
- ["{:.6f}".format(p) for p in src_probs.tolist()] for src_probs in attn_valid
- ]
- return alignment
-
-
- def new_arange(x, *size):
- """
- Return a Tensor of `size` filled with a range function on the device of x.
- If size is empty, using the size of the variable x.
- """
- if len(size) == 0:
- size = x.size()
- return torch.arange(size[-1], device=x.device).expand(*size).contiguous()
-
-
- def get_tpu_device():
- return xm.xla_device()
-
-
- def tpu_data_loader(itr):
- import torch_xla.core.xla_model as xm
- import torch_xla.distributed.parallel_loader as pl
- from fairseq.data import iterators
-
- xm.rendezvous("tpu_data_loader") # wait for all workers
- xm.mark_step()
- device = xm.xla_device()
- return iterators.CountingIterator(
- pl.ParallelLoader(itr, [device]).per_device_loader(device),
- start=getattr(itr, "n", 0),
- total=len(itr),
- )
-
-
- def is_xla_tensor(tensor):
- return torch.is_tensor(tensor) and tensor.device.type == "xla"
-
-
- def index_put(tensor, indices, value):
- if is_xla_tensor(tensor):
- for _ in range(indices.dim(), tensor.dim()):
- indices = indices.unsqueeze(-1)
- if indices.size(-1) < tensor.size(-1):
- indices = indices.expand_as(tensor)
- tensor = torch.mul(tensor, ~indices) + torch.mul(value, indices)
- else:
- tensor[indices] = value
- return tensor
-
-
- def xla_device_to_cpu(dat):
- import torch_xla.core.xla_model as xm
-
- return xm._maybe_convert_to_cpu(dat)
-
-
- class CudaEnvironment(object):
- def __init__(self):
- cur_device = torch.cuda.current_device()
- prop = torch.cuda.get_device_properties("cuda:{}".format(cur_device))
- self.name = prop.name
- self.major = prop.major
- self.minor = prop.minor
- self.total_memory_in_GB = prop.total_memory / 1024 / 1024 / 1024
-
- @staticmethod
- def pretty_print_cuda_env_list(cuda_env_list):
- """
- Given a list of CudaEnviorments, pretty print them
- """
- num_workers = len(cuda_env_list)
- center = "CUDA enviroments for all {} workers".format(num_workers)
- banner_len = 40 - len(center) // 2
- first_line = "*" * banner_len + center + "*" * banner_len
- logger.info(first_line)
- for r, env in enumerate(cuda_env_list):
- logger.info(
- "rank {:3d}: ".format(r)
- + "capabilities = {:2d}.{:<2d} ; ".format(env.major, env.minor)
- + "total memory = {:.3f} GB ; ".format(env.total_memory_in_GB)
- + "name = {:40s}".format(env.name)
- )
- logger.info(first_line)
-
-
- def csv_str_list(x):
- return x.split(",")
-
-
- def eval_str_list(x, type=float):
- if x is None:
- return None
- if isinstance(x, str):
- x = eval(x)
- try:
- return list(map(type, x))
- except TypeError:
- return [type(x)]
-
-
- def eval_str_dict(x, type=dict):
- if x is None:
- return None
- if isinstance(x, str):
- x = eval(x)
- return x
-
-
- def eval_bool(x, default=False):
- if x is None:
- return default
- try:
- return bool(eval(x))
- except TypeError:
- return default
-
-
- def reset_logging():
- root = logging.getLogger()
- for handler in root.handlers:
- root.removeHandler(handler)
- root.setLevel(os.environ.get("LOGLEVEL", "INFO").upper())
- handler = logging.StreamHandler(sys.stdout)
- handler.setFormatter(
- logging.Formatter(
- fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
- datefmt="%Y-%m-%d %H:%M:%S",
- )
- )
- root.addHandler(handler)
|