Source code for IQDMPDF.utilities

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# utilities.py
"""Common functions for IQDM-PDF"""
#
# Copyright (c) 2021 Dan Cutright
# This file is part of IQDM-PDF, released under a MIT license.
#    See the file LICENSE included with this distribution

import argparse
import platform
from multiprocessing import Pool
from os import listdir, sep, stat, walk
from os.path import getctime, isfile, join, normpath, splitext

from tqdm import tqdm


def are_all_strings_in_text(text, list_of_strings):
    """Report whether every string in list_of_strings occurs in text

    Parameters
    ----------
    text : str
        output from IQDMPDF.pdf_reader.convert_pdf_to_text
    list_of_strings : list of str
        a list of strings used to identify document type

    Returns
    -------
    bool
        True if each entry of list_of_strings is a substring of text
        (an empty list therefore yields True), False otherwise
    """
    # all() stops at the first missing string, like the early return
    # of an explicit loop would.
    return all(needle in text for needle in list_of_strings)
def get_sorted_indices(some_list, reverse=False):
    """Return the indices that would put some_list in sorted order

    Parameters
    ----------
    some_list : list
        Any list compatible with sorted()
    reverse : bool
        Reverse sort if True

    Returns
    -------
    list of int
        Positions into some_list, ordered by the value found at each
        position
    """
    # Pair each value with its original position, sort on the value
    # only, then keep just the positions.
    indexed_values = list(enumerate(some_list))
    indexed_values.sort(key=lambda pair: pair[1], reverse=reverse)
    return [position for position, _ in indexed_values]
def is_in_tol(value, expected_value, tolerance):
    """Check whether value lies strictly inside expected_value +/- tolerance

    Parameters
    ----------
    value : int, float
        Value of interest
    expected_value : int, float
        Expected value
    tolerance : int, float
        Allowed deviation from expected_value

    Returns
    -------
    bool
        True if value is within expected_value +/- tolerance, exclusive
        (a value exactly on either bound is rejected)
    """
    upper_bound = expected_value + tolerance
    lower_bound = expected_value - tolerance
    # Strict inequalities on both sides: the bounds themselves are excluded.
    return upper_bound > value and value > lower_bound
def bbox_to_pos(bbox, mode):
    """Reduce a bounding box to a single x-y position

    Parameters
    ----------
    bbox : list
        Bounding box from pdf_reader layout object, which is a list of
        four floats [x0, y0, x1, y1]
    mode : str
        Options are combinations of top/center/bottom and
        right/center/left, e.g., 'top-right', 'center-left'. 'center' is
        assumed to be 'center-center'

    Returns
    -------
    list
        [x, y] position selected from the bounding box
    """
    if mode == "center":
        mode = "center-center"

    # The mode string is ordered vertical-horizontal ("top-right").
    mode_parts = mode.split("-")
    vertical = mode_parts[0]
    horizontal = mode_parts[1]

    x_choices = {
        "left": bbox[0],
        "right": bbox[2],
        "center": (bbox[0] + bbox[2]) / 2.0,
    }
    y_choices = {
        "bottom": bbox[1],
        "top": bbox[3],
        "center": (bbox[1] + bbox[3]) / 2.0,
    }

    # The result is ordered x then y, the reverse of the mode string.
    x_pos = x_choices[horizontal]
    y_pos = y_choices[vertical]
    return [x_pos, y_pos]
def get_files(init_dir, search_sub_dir=True, extension=None):
    """Collect paths of all files in a directory

    Parameters
    ----------
    init_dir : str
        Initial directory to begin scanning
    search_sub_dir : bool
        Recursively search through sub-directories if True
    extension : str, optional
        Collect file paths with only this extension (e.g., '.pdf')

    Returns
    -------
    list
        List of file paths. Directories are never included, regardless
        of search_sub_dir.
    """
    files = []
    if search_sub_dir:
        # walk() already separates file names from directory names.
        for dir_name, _, file_list in walk(init_dir):
            append_files(files, dir_name, file_list, extension)
    else:
        # listdir() returns every entry, sub-directories included, so
        # keep only regular files to match the recursive branch.
        file_list = [
            entry
            for entry in listdir(init_dir)
            if isfile(join(init_dir, entry))
        ]
        append_files(files, init_dir, file_list, extension)
    return files
def append_files(files, dir_name, files_to_append, extension=None):
    """Helper function for get_files

    Parameters
    ----------
    files : list
        Accumulate file paths into this list (modified in place)
    dir_name : str
        The base path of the files in files_to_append
    files_to_append : list
        A list of file names to accumulate
    extension : str, optional
        Collect file paths with only this extension (e.g., '.pdf').
        The comparison is case-insensitive. If None, every name in
        files_to_append is collected.
    """
    # Lower-case the requested extension once so it is compared on the
    # same footing as the lower-cased file suffix; otherwise a value
    # such as '.PDF' could never match.
    wanted = None if extension is None else extension.lower()
    files.extend(
        join(dir_name, file_name)
        for file_name in files_to_append
        if wanted is None or splitext(file_name)[1].lower() == wanted
    )
def get_relative_path(path, relative_base):
    """Trim a path so that it starts at relative_base

    Parameters
    ----------
    path : str
        A path with relative_base as a sub-component
    relative_base : str
        A directory within path

    Returns
    -------
    str or None
        The path with all components prior to the first occurrence of
        relative_base removed, or None if relative_base is not one of
        the components of path
    """
    normalized_path = normpath(path)
    base_name = normpath(relative_base)
    components = normalized_path.split(sep)
    try:
        start = components.index(base_name)
    except ValueError:
        # relative_base does not match any single path component
        return None
    return join(*components[start:])
def create_arg_parser():
    """Create an argument parser

    Returns
    -------
    argparse.ArgumentParser
        Argument parser for command-line use of IQDM-PDF. Parsed
        namespaces expose: ignore_extension, output_dir, output_file,
        print_version, no_recursive_search, init_directory,
        raise_errors and processes.
    """
    cmd_parser = argparse.ArgumentParser(
        description="Command line interface for IQDM-PDF"
    )
    cmd_parser.add_argument(
        "-ie",
        "--ignore-extension",
        dest="ignore_extension",
        help="Script will check all files, not just ones with .pdf extensions",
        default=False,
        action="store_true",
    )
    cmd_parser.add_argument(
        "-od",
        "--output-dir",
        dest="output_dir",
        help="Output stored in local directory by default, specify otherwise here",
        default=None,
    )
    cmd_parser.add_argument(
        "-of",
        "--output-file",
        dest="output_file",
        help="Output will be saved as <report_type>_results_<time-stamp>.csv by default. "
        "Define this tag to customize file name after <report_type>_",
        default=None,
    )
    cmd_parser.add_argument(
        "-ver",
        "--version",
        dest="print_version",
        help="Print the IQDM version",
        default=False,
        action="store_true",
    )
    cmd_parser.add_argument(
        "-nr",
        "--no-recursive-search",
        dest="no_recursive_search",
        help="Include this flag to skip sub-directories",
        default=False,
        action="store_true",
    )
    # Optional positional: may be omitted (e.g. when only --version is used)
    cmd_parser.add_argument(
        "init_directory", nargs="?", help="Initiate scan here"
    )
    cmd_parser.add_argument(
        "-re",
        "--raise-errors",
        dest="raise_errors",
        help="Allow failed file parsing to halt the program",
        default=False,
        action="store_true",
    )
    cmd_parser.add_argument(
        "-n",
        "--processes",
        dest="processes",
        help="Enable multiprocessing, set number of parallel processes",
        # Without type=int a value given on the command line would be a
        # str while the default is an int.
        type=int,
        default=1,
    )
    return cmd_parser
def run_multiprocessing(worker, queue, processes, callback=None):
    """Run worker over every item of queue in a process pool

    Parameters
    ----------
    worker : callable
        single parameter function to be called on each item in queue
    queue : iterable
        A list of arguments for worker (len() is taken of it to size
        the progress bar)
    processes : int
        Number of processes for multiprocessing.Pool
    callback : callable
        Optional call back function on progress update, accepts str rep of
        tqdm object. Final call sent with 'complete'

    Returns
    -------
    list
        List of returns from worker, in the order results are yielded by
        Pool.imap_unordered
    """
    notify = callback is not None
    bar_options = {
        "total": len(queue),
        "bar_format": "{desc:<5.5}{percentage:3.0f}%|{bar:30}{r_bar}",
    }

    results = []
    # The pool is entered first and the progress bar second, so the bar
    # is closed before the pool is torn down.
    with Pool(processes=processes) as pool, tqdm(**bar_options) as pbar:
        for result in pool.imap_unordered(worker, queue):
            results.append(result)
            pbar.update()
            if notify:
                callback(str(pbar))

    if notify:
        callback("complete")
    return results
def is_numeric(val):
    """Check if value is numeric (float or int)

    Parameters
    ----------
    val : any
        Any value

    Returns
    -------
    bool
        True if float(val) succeeds; False if it raises a ValueError
        (e.g. a non-numeric string) or a TypeError (e.g. None or a list)
    """
    try:
        float(val)
    except (TypeError, ValueError):
        # float() raises TypeError, not ValueError, for objects that are
        # neither strings nor numbers.
        return False
    return True
def creation_date(path_to_file):
    """Get a creation time stamp for a file

    Try to get the date that a file was created, falling back to when it
    was last modified if that isn't possible.
    See http://stackoverflow.com/a/39501288/1709587 for explanation.

    Parameters
    ----------
    path_to_file : str
        Path to any file

    Returns
    -------
    float
        Time stamp of file
    """
    if platform.system() == "Windows":
        return getctime(path_to_file)

    file_stats = stat(path_to_file)
    try:
        return file_stats.st_birthtime
    except AttributeError:
        # The stat result has no st_birthtime here; fall back to the
        # last-modified time.
        return file_stats.st_mtime