# Source code for duplicity.backends.gdrivebackend

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
#
# Copyright 2015 Yigal Asnis
# Copyright 2021 Jindrich Makovicka
#
# This file is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
#
# It is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from builtins import str

import os
import pickle

from duplicity import log
from duplicity import util
from duplicity.errors import BackendException
import duplicity.backend


class GDriveBackend(duplicity.backend.Backend):
    u"""Connect to remote store using Google Drive API V3

    The destination folder is resolved (and created if missing) from the URL
    path at construction time.  Files are then addressed by name inside that
    folder; a name -> Drive file ID cache (``id_cache``) is kept because Drive
    directory listings are not strongly consistent.
    """

    # Page size used when listing the destination folder.
    PAGE_SIZE = 100
    # Files at least this large (in bytes) are uploaded with a resumable upload.
    MIN_RESUMABLE_UPLOAD = 5 * 1024 * 1024

    def __init__(self, parsed_url):
        u"""Authenticate against Google Drive and locate the destination folder.

        :param parsed_url: parsed backend URL.  Its query arguments must contain
            either ``driveID`` (Shared Drive) or ``myDriveFolderID`` (My Drive
            folder); username/hostname form the expected service account email
            or OAuth client ID; the path names the destination folder hierarchy.
        :raises BackendException: if the Google API client libraries are not
            installed, no drive/folder ID is given in the URL, the required
            environment variables are not set, or the credentials do not match
            the ID given in the URL.
        """
        duplicity.backend.Backend.__init__(self, parsed_url)
        try:
            from googleapiclient.discovery import build
            from google.oauth2.service_account import Credentials
        except ImportError as e:
            raise BackendException(u"""\
GDrive backend requires Google API client installation.
Please read the manpage for setup details.
Exception: %s""" % str(e))

        # Note Google has 2 drive methods, `Shared(previously Team) Drives` and `My Drive`
        #   both can be shared but require different addressing
        # For a Google Shared Drives folder
        # ---------------------------------
        # Share Drive ID specified as a query parameter in the backend URL.
        # Example:
        #  gdrive://developer.gserviceaccount.com/target-folder/?driveID=<SHARED DRIVE ID>
        #
        # For a Google My Drive based shared folder
        # -----------------------------------------
        # MyDrive folder ID specified as a query parameter in the backend URL
        #
        # Example
        #  export GOOGLE_SERVICE_ACCOUNT_URL=<serviceaccount-name>@<serviceaccount-name>.iam.gserviceaccount.com
        #  gdrive://${GOOGLE_SERVICE_ACCOUNT_URL}/<target-folder-name/>?myDriveFolderID=<google-myDrive-folder-id>
        #
        # both methods use a Google Services Account
        #  export GOOGLE_SERVICE_JSON_FILE=<serviceaccount-credentials.json>
        #  export GOOGLE_SERVICE_ACCOUNT_URL=<serviceaccount-name>@<serviceaccount-name>.iam.gserviceaccount.com
        #
        # Note that a local http server will be created on port 8080 to receive the redirect from the Google
        # OAuth service. If you are running on a remote server, you will need to port forward 8080 from the machine
        # which will do the web based authentication.
        # (GOOGLE_OAUTH_LOCAL_SERVER_PORT / GOOGLE_OAUTH_LOCAL_SERVER_HOST, when set, are
        # passed to the local server as its port / host.)

        # These dicts are splatted into Drive API calls as keyword arguments;
        # they stay empty when the corresponding option does not apply.
        self.shared_drive_corpora = {}
        self.shared_drive_id = {}
        self.shared_drive_flags_include = {}
        self.shared_drive_flags_support = {}
        self.shared_root_folder_id = None
        if u'driveID' in parsed_url.query_args:
            self.shared_drive_corpora = {u'corpora': u'drive'}
            self.shared_drive_id = {u'driveId': parsed_url.query_args[u'driveID'][0]}
            self.shared_drive_flags_include = {u'includeItemsFromAllDrives': True}
            self.shared_drive_flags_support = {u'supportsAllDrives': True}
        elif u'myDriveFolderID' in parsed_url.query_args:
            self.shared_drive_corpora = {u'corpora': u'user'}
            self.shared_drive_flags_include = {u'includeItemsFromAllDrives': True}
            self.shared_drive_flags_support = {u'supportsAllDrives': True}
            self.shared_root_folder_id = parsed_url.query_args[u'myDriveFolderID'][0]
        else:
            raise BackendException(
                u"gdrive: backend requires a query paramater should either be driveID or myDriveFolderID")

        # The identity the credentials are checked against comes from the URL.
        if parsed_url.username is not None:
            client_id = parsed_url.username + u'@' + parsed_url.hostname
        else:
            client_id = parsed_url.hostname

        if u'GOOGLE_SERVICE_JSON_FILE' in os.environ:
            # Service account authentication.
            credentials = Credentials.from_service_account_file(os.environ[u'GOOGLE_SERVICE_JSON_FILE'])
            if credentials.service_account_email != client_id:
                raise BackendException(
                    u'Service account email in the JSON file (%s) does not match the URL (%s)' %
                    (credentials.service_account_email, client_id))

        elif u'GOOGLE_CLIENT_SECRET_JSON_FILE' in os.environ and u'GOOGLE_CREDENTIALS_FILE' in os.environ:
            # Installed-application OAuth flow with locally cached credentials.
            from google_auth_oauthlib.flow import InstalledAppFlow
            from google.auth.transport.requests import Request

            credentials = None
            if os.path.exists(os.environ[u'GOOGLE_CREDENTIALS_FILE']):
                # SECURITY NOTE(review): unpickling executes arbitrary code if the
                # file has been tampered with.  GOOGLE_CREDENTIALS_FILE must only
                # ever point at a file this user wrote and controls.
                with open(os.environ[u'GOOGLE_CREDENTIALS_FILE'], u'rb') as token:
                    credentials = pickle.load(token)

            # If there are no (valid) credentials available, let the user log in.
            if not credentials or not credentials.valid:
                if credentials and credentials.expired and credentials.refresh_token:
                    credentials.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(
                        os.environ[u'GOOGLE_CLIENT_SECRET_JSON_FILE'],
                        [u'https://www.googleapis.com/auth/drive.file'])
                    if flow.client_config[u'client_id'] != client_id:
                        raise BackendException(
                            u'Client ID in the JSON file (%s) does not match the URL (%s)' %
                            (flow.client_config[u'client_id'], client_id))
                    flow_args = {}
                    if u'GOOGLE_OAUTH_LOCAL_SERVER_PORT' in os.environ:
                        flow_args[u'port'] = int(os.environ[u'GOOGLE_OAUTH_LOCAL_SERVER_PORT'])
                    if u'GOOGLE_OAUTH_LOCAL_SERVER_HOST' in os.environ:
                        flow_args[u'host'] = os.environ[u'GOOGLE_OAUTH_LOCAL_SERVER_HOST']
                    credentials = flow.run_local_server(**flow_args)
                # Save the credentials for the next run
                with open(os.environ[u'GOOGLE_CREDENTIALS_FILE'], u'wb') as token:
                    pickle.dump(credentials, token)

            if credentials.client_id != client_id:
                raise BackendException(
                    u'Client ID in the credentials file (%s) does not match the URL (%s)' %
                    (credentials.client_id, client_id))

        else:
            raise BackendException(
                u'GOOGLE_SERVICE_JSON_FILE or GOOGLE_CLIENT_SECRET_JSON_FILE environment '
                u'variable not set. Please read the manpage to fix.')

        self.drive = build(u'drive', u'v3', credentials=credentials)

        if self.shared_drive_id:
            parent_folder_id = self.shared_drive_id[u'driveId']
        elif self.shared_root_folder_id:
            parent_folder_id = self.shared_root_folder_id
        else:
            parent_folder_id = u'root'

        # Fetch destination folder entry and create hierarchy if required.
        for folder_name in parsed_url.path.split(u'/'):
            if not folder_name:
                # Empty components come from leading/trailing/double slashes.
                continue
            # The folder name is escaped so that quotes or backslashes in it
            # cannot break out of the quoted literal in the query.
            q = (u"name = '%s' and '%s' in parents and "
                 u"mimeType = 'application/vnd.google-apps.folder' and trashed=false"
                 % (self._escape_query_value(folder_name), parent_folder_id))
            results = self.drive.files().list(q=q,
                                              pageSize=1,
                                              fields=u"files(name,id),nextPageToken",
                                              **self._list_args()).execute()
            file_list = results.get(u'files', [])
            if file_list:
                folder = file_list[0]
            else:
                file_metadata = {u'name': folder_name,
                                 u'mimeType': u"application/vnd.google-apps.folder",
                                 u'parents': [parent_folder_id]}
                file_metadata.update(self.shared_drive_id)
                folder = self.drive.files().create(body=file_metadata,
                                                   fields=u'id',
                                                   **self.shared_drive_flags_support).execute()
            parent_folder_id = folder[u'id']

        # ID of the folder all backup files live in.
        self.folder = parent_folder_id
        # Maps decoded file name -> Drive file ID for files seen or uploaded.
        self.id_cache = {}

    @staticmethod
    def _escape_query_value(value):
        u"""Escape *value* for use inside a single-quoted Drive query literal.

        Backslashes are escaped first so that the backslash added in front of
        each single quote is not itself doubled.
        """
        return value.replace(u"\\", u"\\\\").replace(u"'", u"\\'")

    def _list_args(self):
        u"""Return the keyword arguments shared by every ``files().list()`` call.

        Merges the corpora, drive ID, include and support flag dicts set up in
        ``__init__`` into one new dict (their keys do not overlap).
        """
        args = {}
        for part in (self.shared_drive_corpora,
                     self.shared_drive_id,
                     self.shared_drive_flags_include,
                     self.shared_drive_flags_support):
            args.update(part)
        return args

    def file_by_name(self, filename):
        u"""Look up a file in the destination folder by name.

        The ID cache is consulted (and validated against the server) first; on
        a miss the folder is queried by name and the result is cached.

        :param filename: file name; decoded with ``util.fsdecode``.
        :return: the Drive file resource dict, or ``None`` if no such file
            exists.  ``log.FatalError`` is called if more than one file with
            that name is found.
        :raises HttpError: for any HTTP error other than a 404 while
            validating a cached ID.
        """
        from googleapiclient.errors import HttpError

        filename = util.fsdecode(filename)

        if filename in self.id_cache:
            # It might since have been locally moved, renamed or deleted, so we
            # need to validate the entry.
            file_id = self.id_cache[filename]
            try:
                drive_file = self.drive.files().get(fileId=file_id,
                                                    fields=u'id,size,name,parents,trashed',
                                                    **self.shared_drive_flags_support).execute()
                # A response without a 'parents' field is treated like a file
                # that is no longer in our folder rather than raising KeyError.
                if (drive_file[u'name'] == filename and not drive_file[u'trashed'] and
                        self.folder in drive_file.get(u'parents', [])):
                    log.Info(u"GDrive backend: found file '%s' with id %s in ID cache" %
                             (filename, file_id))
                    return drive_file
            except HttpError as error:
                # A 404 occurs if the ID is no longer valid
                if error.resp.status != 404:
                    raise
            # If we get here, the cache entry is invalid
            log.Info(u"GDrive backend: invalidating '%s' (previously ID %s) from ID cache" %
                     (filename, file_id))
            del self.id_cache[filename]

        # Not found in the cache, so use directory listing. This is less
        # reliable because there is no strong consistency.
        q = u"name = '%s' and '%s' in parents and trashed = false" % (
            self._escape_query_value(filename), self.folder)
        # pageSize=2 is enough to tell "unique" from "ambiguous".
        results = self.drive.files().list(q=q,
                                          fields=u'files(name,id,size),nextPageToken',
                                          pageSize=2,
                                          **self._list_args()).execute()
        file_list = results.get(u'files', [])
        if len(file_list) > 1:
            log.FatalError(u"GDrive backend: multiple files called '%s'." % (filename,))
        elif len(file_list) > 0:
            file_id = file_list[0][u'id']
            self.id_cache[filename] = file_list[0][u'id']
            log.Info(u"GDrive backend: found file '%s' with id %s on server, "
                     u"adding to cache" % (filename, file_id))
            return file_list[0]

        log.Info(u"GDrive backend: file '%s' not found in cache or on server" %
                 (filename,))
        return None

    def id_by_name(self, filename):
        u"""Return the Drive ID of *filename*, or ``u''`` if it does not exist."""
        drive_file = self.file_by_name(filename)
        if drive_file is None:
            return u''
        return drive_file[u'id']

    def _put(self, source_path, remote_filename):
        u"""Upload ``source_path.name`` as *remote_filename*.

        An existing remote file of the same name has its content replaced;
        otherwise a new file is created in the destination folder.  The
        resulting file ID is stored in the ID cache.
        """
        from googleapiclient.http import MediaFileUpload

        remote_filename = util.fsdecode(remote_filename)
        drive_file = self.file_by_name(remote_filename)
        if remote_filename.endswith(u'.gpg'):
            mime_type = u'application/pgp-encrypted'
        else:
            mime_type = u'text/plain'

        # Only large files use a resumable upload, and only those are retried
        # by the API client.
        file_size = os.path.getsize(source_path.name)
        resumable = file_size >= self.MIN_RESUMABLE_UPLOAD
        num_retries = 5 if resumable else 0

        media = MediaFileUpload(source_path.name, mimetype=mime_type, resumable=resumable)
        if drive_file is None:
            # No existing file, make a new one
            file_metadata = {u'name': remote_filename,
                             u'parents': [self.folder]}
            file_metadata.update(self.shared_drive_id)
            log.Info(u"GDrive backend: creating new file '%s'" % (remote_filename,))
            drive_file = self.drive.files().create(
                body=file_metadata,
                media_body=media,
                **self.shared_drive_flags_support).execute(num_retries=num_retries)
        else:
            log.Info(u"GDrive backend: replacing existing file '%s' with id '%s'" % (
                remote_filename, drive_file[u'id']))
            drive_file = self.drive.files().update(
                media_body=media,
                fileId=drive_file[u'id'],
                **self.shared_drive_flags_support).execute(num_retries=num_retries)
        self.id_cache[remote_filename] = drive_file[u'id']

    def _get(self, remote_filename, local_path):
        u"""Download *remote_filename* into the file at ``local_path.name``.

        :raises BackendException: if the remote file does not exist.
        """
        from googleapiclient.http import MediaIoBaseDownload

        drive_file = self.file_by_name(remote_filename)
        if drive_file is None:
            # Fail with a meaningful error instead of a TypeError from
            # subscripting None below.
            raise BackendException(
                u"GDrive backend: file '%s' not found" % (util.fsdecode(remote_filename),))
        request = self.drive.files().get_media(fileId=drive_file[u'id'],
                                               **self.shared_drive_flags_support)
        with open(util.fsdecode(local_path.name), u"wb") as fh:
            downloader = MediaIoBaseDownload(fh, request)
            done = False
            while not done:
                _status, done = downloader.next_chunk()

    def _list(self):
        u"""Return the names of all non-trashed files in the destination folder.

        Names held in the ID cache that still validate are included even when
        the server listing does not show them yet.
        """
        page_token = None
        drive_files = []
        while True:
            response = self.drive.files().list(
                q=u"'" + self.folder + u"' in parents and trashed=false",
                pageSize=self.PAGE_SIZE,
                fields=u"files(name,id),nextPageToken",
                pageToken=page_token,
                **self._list_args()).execute()

            drive_files += response.get(u'files', [])

            page_token = response.get(u'nextPageToken', None)
            if page_token is None:
                break

        filenames = set(item[u'name'] for item in drive_files)
        # Check the cache as well. A file might have just been uploaded but
        # not yet appear in the listing.
        # Note: iterate over a snapshot of the keys, because file_by_name will
        # modify the cache if it finds invalid entries.
        for filename in list(self.id_cache.keys()):
            if (filename not in filenames) and (self.file_by_name(filename) is not None):
                filenames.add(filename)
        return list(filenames)

    def _delete(self, filename):
        u"""Delete *filename* from the destination folder; warn if it is missing."""
        file_id = self.id_by_name(filename)
        if file_id == u'':
            log.Warn(u"File '%s' does not exist while trying to delete it" %
                     (util.fsdecode(filename),))
        else:
            self.drive.files().delete(fileId=file_id,
                                      **self.shared_drive_flags_support).execute()
            # The ID is gone now; drop it so the next lookup does not have to
            # discover that through a failing request.
            self.id_cache.pop(util.fsdecode(filename), None)

    def _query(self, filename):
        u"""Return ``{u'size': n}`` for *filename*; *n* is -1 if it does not exist."""
        drive_file = self.file_by_name(filename)
        if drive_file is None:
            size = -1
        else:
            size = int(drive_file[u'size'])
        return {u'size': size}

    def _error_code(self, operation, error):  # pylint: disable=unused-argument
        u"""Map an exception raised by this backend to a ``log.ErrorCode`` value.

        NOTE(review): every ``HttpError`` is reported as ``backend_not_found``
        regardless of its HTTP status -- confirm this is intended.
        """
        from google.auth.exceptions import RefreshError
        from googleapiclient.errors import HttpError

        if isinstance(error, HttpError):
            return log.ErrorCode.backend_not_found
        elif isinstance(error, RefreshError):
            return log.ErrorCode.backend_permission_denied
        return log.ErrorCode.backend_error
# Make the backend available for gdrive:// URLs.
duplicity.backend.register_backend(u'gdrive', GDriveBackend)

# Add the gdrive scheme to the list of schemes that use a network location.
duplicity.backend.uses_netloc.extend([u'gdrive'])