|
- # Copyright (c) Facebook, Inc. and its affiliates.
- #
- # This source code is licensed under the MIT license found in the
- # LICENSE file in the root directory of this source tree.
-
- import os
- import typing as tp
-
-
- def _safe_readline(fd) -> str:
- pos = fd.tell()
- while True:
- try:
- return fd.readline()
- except UnicodeDecodeError:
- pos -= 1
- fd.seek(pos) # search where this character begins
-
-
def find_offsets(filename: str, num_chunks: int) -> tp.List[int]:
    """
    Given a file and a number of chunks, find byte offsets that split the
    file into chunks made of full lines.

    The file is cut into ``num_chunks`` equal byte ranges; each interior cut
    point is then moved forward to just past the next newline.

    Args:
        filename: path of the file to split.
        num_chunks: number of chunks to produce; must be at least 1.

    Returns:
        A list of ``num_chunks + 1`` byte offsets. The first is 0, the last is
        the file size, and chunk ``i`` spans ``offsets[i]`` to
        ``offsets[i + 1]``. Neighbouring offsets can be equal when the file
        has fewer lines than requested chunks.

    Raises:
        ValueError: if ``num_chunks`` is smaller than 1.
    """
    if num_chunks < 1:
        raise ValueError(f"num_chunks must be a positive integer, got {num_chunks}")

    # Binary mode on purpose: seeking to an arbitrary byte and reading tell()
    # back as a byte offset is only well defined for binary files. It also
    # means a cut point inside a multi-byte character needs no special care,
    # since the partial line is skipped without being decoded.
    # Lines are delimited by b"\n" here.
    with open(filename, "rb") as f:
        size = os.fstat(f.fileno()).st_size
        chunk_size = size // num_chunks
        offsets = [0] * (num_chunks + 1)
        for i in range(1, num_chunks):
            f.seek(chunk_size * i)
            f.readline()  # skip to the end of the line containing the cut point
            offsets[i] = f.tell()
        offsets[-1] = size
    return offsets
-
-
class ChunkLineIterator:
    """
    Iterable over the lines of one chunk of an already-open file.

    Iteration seeks to ``start_offset`` and yields lines until the position
    reported by the file moves past ``end_offset`` or the file is exhausted.
    An ``end_offset`` that is not positive disables the end check.
    """

    def __init__(self, fd, start_offset: int, end_offset: int):
        # fd: open file object, used through seek(), tell() and readline().
        self._fd = fd
        self._start_offset = start_offset
        self._end_offset = end_offset

    def __iter__(self) -> tp.Iterable[str]:
        """Yield the chunk's lines one by one, starting from ``start_offset``."""
        stream = self._fd
        limit = self._end_offset
        stream.seek(self._start_offset)

        # readline() is used throughout rather than next(): next() on a file
        # breaks tell(), which the end-of-chunk check depends on.
        current = _safe_readline(stream)
        while current:
            here = stream.tell()
            # tell() does not always give the byte position in the file and
            # sometimes jumps to a very large number. A normal read is
            # unlikely to take us from the chunk end to more than 2**32 bytes
            # (4 GB) beyond it, so only positions inside that window count as
            # "past the end"; anything larger is treated as a tell() artifact.
            beyond_chunk = limit > 0 and limit < here < limit + 2**32
            if beyond_chunk:
                break
            yield current
            current = stream.readline()
-
-
class Chunker:
    """
    Context manager to read one chunk of a file line by line.

    Entering opens ``path`` as UTF-8 text and returns a ``ChunkLineIterator``
    bounded by ``start_offset`` and ``end_offset``; exiting closes the file.
    """

    def __init__(self, path: str, start_offset: int, end_offset: int):
        self.path = path
        self.start_offset, self.end_offset = start_offset, end_offset

    def __enter__(self) -> ChunkLineIterator:
        """Open the file and return the line iterable for this chunk."""
        handle = open(self.path, "r", encoding="utf-8")
        # Kept on the instance so __exit__ can close it.
        self.fd = handle
        return ChunkLineIterator(handle, self.start_offset, self.end_offset)

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the file opened by ``__enter__``; exceptions are not suppressed."""
        self.fd.close()
|