Source code for duplicity.backend_pool

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2002 Ben Escoto
# Copyright 2007 Kenneth Loafman
#
# This file is part of duplicity.
#
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import concurrent.futures
import logging
import multiprocessing
import multiprocessing.connection
import os
import sys
import time
import traceback
from collections import Counter
from dataclasses import (
    dataclass,
    field,
)
from datetime import (
    datetime,
    timedelta,
)
from typing import (
    List,
    Optional,
    Dict,
    Iterator,
)

from duplicity import (
    backend,
    config,
    file_naming,
    log,
    path,
    util,
)

pool_backend = None


[docs] @dataclass class TrackRecord: # result tracking while executed in the pool track_id: int pid: int trace_back: Optional[List[str]] = field( default_factory=list ) # trace backs can't be pickled, store as string, to get it over into main process result: object = None # must be picklable log_prefix: str = "" start_time: datetime = field(default=datetime.now()) stop_time: datetime = field(default=datetime.min) def __post_init__(self): self.start_time = datetime.now()
[docs] def get_runtime(self) -> timedelta: return self.stop_time - self.start_time
[docs] def track_cmd(track_id, cmd_name: str, *args, **kwargs): """ wraps the pooled function for time tracking and exception handling. Recording the trace back of an exception when still in process pool context to point to the right place. (This function can't be part of the BackendPool, as then the whole class get pickled) """ global pool_backend p = multiprocessing.current_process() trk_rcd = TrackRecord(track_id, p.pid, log_prefix=log.PREFIX) # type: ignore # send cmd/process assignment back to pool for tracking. try: cmd = getattr(pool_backend, cmd_name) trk_rcd.result = cmd(*args, **kwargs) except Exception as e: trk_rcd.result = e trk_rcd.trace_back = traceback.format_tb(e.__traceback__) trk_rcd.stop_time = datetime.now() log.Debug(f"Command done: {trk_rcd} ") return trk_rcd
[docs] class BackendPool: """ uses concurrent.futures.ProcessPoolExecutor to run backend commands in background """
[docs] @dataclass class CmdStatus: function_name: str args: Dict kwargs: List trk_rcd: Optional[TrackRecord] = None done: bool = False
[docs] def __init__(self, url_string, processes=None) -> None: config_str = config.dump_dict(config) self.ppe = concurrent.futures.ProcessPoolExecutor( max_workers=processes, initializer=self._process_init, initargs=(url_string, config_str), mp_context=multiprocessing.get_context(method="spawn"), ) self._command_queue = [] self._track_id = 0 self.all_results: List[TrackRecord] = [] self._all_futures: List[concurrent.futures.Future] = [] self.peak_in_queue = 0 self._command_throttle_stats = [] self._last_results_reported = 0
def __enter__(self): return self.ppe def __exit__(self, type, value, traceback): self.shutdown()
[docs] @staticmethod def _process_init(url_string, config_dump): global pool_backend config.load_dict(config_dump, config) pid = os.getpid() pool_nr = multiprocessing.current_process()._identity[0] # get the multiprocessing logger with stderr stream handler added log.setup() log.PREFIX = f"Pool{pool_nr}: " log.setverbosity(config.verbosity) if config.verbosity == log.DEBUG: logger = multiprocessing.log_to_stderr() log.Info(f"Staring pool process with pid: {pid}") file_naming.prepare_regex() util.start_debugger() backend.import_backends() pool_backend = backend.get_backend(url_string)
[docs] def command(self, func_name, args=(), kwds=None): """ run function in a pool of independent processes. Call function by name. func_name: name of the backend method to execute args: positional arguments for the method kwds: key/value arguments for the method Returns a unique ID for each command, increasing int """ if kwds is None: kwds = {} self._track_id += 1 self._command_queue.append( self.ppe.submit( track_cmd, self._track_id, func_name, *args, ), ) self._collect_finished_cmds() return self._track_id
[docs] def _raise_exception_if_any(self, track_rcrd): """ raise the exception stored in `track_rcrd` if it occurred while running the command """ if isinstance(track_rcrd.result, Exception): exception_str = f"{''.join(track_rcrd.trace_back)}\n{track_rcrd.result}" log.Debug(f"Exception thrown in pool: \n{exception_str}") if hasattr(track_rcrd.result, "code"): log.FatalError( f"Exception {track_rcrd.result.__class__.__name__} in background " f"pool {track_rcrd.log_prefix}. " "For trace back set loglevel to DEBUG and check output for given pool.", code=track_rcrd.result.code, # type: ignore ) else: raise track_rcrd.result
[docs] def _collect_finished_cmds(self): """ store finished results in `all_results` and remove command from queue """ try: finished_commands = concurrent.futures.as_completed(self._command_queue, timeout=0.1) for result in finished_commands: track_rcrd = result.result(timeout=0) self._raise_exception_if_any(track_rcrd) self.all_results.append(track_rcrd) self._all_futures.append(result) self._command_queue.remove(result) self.peak_in_queue = max(self.peak_in_queue, len(self._command_queue)) except concurrent.futures._base.TimeoutError: pass
[docs] def command_throttled(self, func_name, commands_in_buffer=1, args=(), kwds=None): """ block, if queue gets bigger then number of workers + commands_in_buffer. Means this function my block a while to process queue and returns if enough queue items has been processed. func_name: name of the backend method to execute commands_in_buffer = number of commands that are queue for execution args: positional arguments for the method kwds: key/value arguments for the method Returns a unique ID for each command, increasing int """ if kwds is None: kwds = {} # define accepted queue buffer. max_pending_queue_len = self.ppe._max_workers + commands_in_buffer # type: ignore start = datetime.now() if len(self._command_queue) >= max_pending_queue_len: # queue full wait for free slot finished_commands = concurrent.futures.as_completed(self._command_queue) next(finished_commands) # blocking until next finished self._command_throttle_stats.append((datetime.now() - start).total_seconds()) log.Debug(f"Command delayed by {self._command_throttle_stats[-1]:.2f}") # queue command track_id = self.command(func_name, args, kwds) return track_id
[docs] def get_queue_length(self) -> int: return len(self._command_queue)
[docs] def results_since_last_call(self, from_pos=None) -> Iterator[TrackRecord]: """ collect results from commands finished since last run of this method. This method should be called from one receiver only, as it keep track of what was send internally. param: from_pos: use to overwrite the internal pointer `__last_results_reported` return: list of future results """ self._collect_finished_cmds() if from_pos is not None: self._last_results_reported = from_pos end = len(self.all_results) for result in self.all_results[self._last_results_reported : end]: yield result self._last_results_reported = end
[docs] def get_stats(self, last_index=None): vals = [x.get_runtime().total_seconds() for x in self.all_results[:last_index]] count = len(vals) if count > 0: avg_time = sum(vals) / count max_time = max(vals) min_time = min(vals) if len(self._command_throttle_stats) > 0: avg_throttle = sum(self._command_throttle_stats) / len(self._command_throttle_stats) max_throttle = max(self._command_throttle_stats) min_throttle = min(self._command_throttle_stats) else: avg_throttle = max_throttle = min_throttle = -1 else: avg_time = max_time = min_time = avg_throttle = max_throttle = min_throttle = -1 pool_usage = Counter([x.log_prefix for x in self.all_results[:last_index]]) log.Debug( f"count: {count}, avg: {avg_time}, max: {max_time}, min: {min_time}, pool usage: {pool_usage}, " f"peak in queue: {self.peak_in_queue}, avg throttle: {avg_throttle}" ) return { "count": count, "time": {"avg": avg_time, "max": max_time, "min": min_time}, "throttle": {"avg": avg_throttle, "max": max_throttle, "min": min_throttle}, "pool_usage": pool_usage, "peak_in_queue": self.peak_in_queue, }
[docs] def shutdown(self, *args): log.Debug("Process Pool: Start shutdown.") self.ppe.shutdown(*args) log.Debug("Process Pool: Shutdown done.")
# code to run/test the pool independent, not relevant for duplicity if __name__ == "__main__": from duplicity import config, log log.setup() log.add_file("/tmp/tmp.log") config.verbosity = log.INFO log.setverbosity(config.verbosity) backend.import_backends() config.async_concurrency = 4 config.num_retries = 2 url = "file:///tmp/test_direct" bw = backend.get_backend(url) # ^^^^^^^^^^ above commands are only there for mocking a duplicity config start_time = time.time() bpw = BackendPool(url, processes=config.async_concurrency) results: List[TrackRecord] = [] with bpw as pool: # issue tasks into the process pool import pathlib if len(sys.argv) > 1: src = sys.argv[1] else: src = "./" for file in [file for file in pathlib.Path(src).iterdir() if file.is_file()]: source_path = path.Path(file.as_posix()) bpw.command(bw.put_validated.__name__, args=(source_path, source_path.get_filename())) # type: ignore cmd_results = [x for x in bpw.results_since_last_call()] log.Info(f"got: {len(cmd_results)}, cmd left: {bpw.get_queue_length()}, track_id: {bpw._track_id}") results.extend(cmd_results) # wait for tasks to complete suppress_log = False while True: cmd_results = [x for x in bpw.results_since_last_call()] if len(cmd_results) > 0 or not suppress_log: log.Info( f"got: {len(cmd_results)}, {not suppress_log}, " f"precessed {len(results)} cmd left: {bpw.get_queue_length()}, track_id: {bpw._track_id}" ) suppress_log = False if len(cmd_results) == 0: suppress_log = True results.extend(cmd_results) if bpw.get_queue_length() == 0: break bpw.get_stats(last_index=-1) input("Press Enter to continue...") # process pool is closed automatically log.Notice(f"Bytes written: {sum([int(x.result) for x in results])}") # type: ignore log.Notice(f"Time elapsed: {time.time() - start_time}")