# Source code for duplicity.backends.imapbackend

# -*- Mode:Python; indent-tabs-mode:nil; tab-width:4; encoding:utf-8 -*-
# Copyright 2002 Ben Escoto <>
# Copyright 2007 Kenneth Loafman <>
# Copyright 2008 Ian Barton <>
# This file is part of duplicity.
# Duplicity is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.
# Duplicity is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with duplicity; if not, write to the Free Software Foundation,
# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

from future import standard_library
from builtins import input

import email
import email.encoders
import email.mime.multipart
import getpass
import imaplib
import os
import re
import socket
import sys
import time

from email.parser import Parser

# email.policy is only used by this module when running on Python 3, so a
# failed import must not prevent the module from loading elsewhere.
try:
    from email.policy import default  # pylint: disable=import-error
except ImportError:
    pass

# TODO: should probably change use of socket.sslerror instead of doing this
# The retry handlers in this file catch socket.sslerror; on Python 3 that
# name is provided here as an alias of ssl.SSLError.
if sys.version_info.major >= 3:
    import ssl
    socket.sslerror = ssl.SSLError

from duplicity import config
from duplicity import log
from duplicity.errors import *  # pylint: disable=unused-wildcard-import
import duplicity.backend

class ImapBackend(duplicity.backend.Backend):
    """Backend that keeps every remote file as one mail in an IMAP mailbox.

    The remote directory (URL path without its leading slash) is written to
    the ``From`` header and the remote file name to the ``Subject`` header,
    so several archives can share one mailbox and still be told apart.
    """

    def __init__(self, parsed_url):
        """Collect the credentials and open the first connection.

        The user name comes from the URL or, failing that, from an
        interactive prompt.  The password comes from the URL, then from the
        ``IMAP_PASSWORD`` environment variable, then from a hidden prompt.
        """
        duplicity.backend.Backend.__init__(self, parsed_url)

        log.Debug(u"I'm %s (scheme %s) connecting to %s as %s" %
                  (self.__class__.__name__, parsed_url.scheme, parsed_url.hostname, parsed_url.username))

        # Store url for reconnection on error
        self.url = parsed_url

        # Set the username
        if parsed_url.username is None:
            # The typed text is used verbatim.  It must never go through
            # eval(): that executes whatever was typed and raises NameError
            # for an ordinary account name.
            username = input(u'Enter account userid: ')
        else:
            username = parsed_url.username

        # Set the password
        if parsed_url.password:
            password = parsed_url.password
        elif u'IMAP_PASSWORD' in os.environ:
            password = os.environ.get(u'IMAP_PASSWORD')
        else:
            password = getpass.getpass(u"Enter account password: ")

        self.username = username
        self.password = password
        self.resetConnection()

    def resetConnection(self):
        """Drop any existing connection, then connect and log in again.

        The server is taken from the ``IMAP_SERVER`` environment variable
        when set, otherwise from the URL host name.  ``imap`` connects to
        port 143, ``imaps`` to port 993 over SSL.

        Raises BackendException for any other URL scheme.
        """
        parsed_url = self.url
        imap_server = os.environ.get(u'IMAP_SERVER', parsed_url.hostname)

        # Try to close the connection cleanly.  This is deliberately
        # best-effort: on the very first call self.conn does not exist yet,
        # and a broken connection may refuse to close.
        try:
            self.conn.close()  # pylint:disable=access-member-before-definition
        except Exception:
            pass

        if parsed_url.scheme == u"imap":
            cl, port = imaplib.IMAP4, 143
        elif parsed_url.scheme == u"imaps":
            cl, port = imaplib.IMAP4_SSL, 993
        else:
            raise BackendException(u"Unsupported IMAP scheme: %s" % parsed_url.scheme)
        self.conn = cl(imap_server, port)

        log.Debug(u"Type of imap class: %s" % (cl.__name__))
        self.remote_dir = re.sub(r'^/', r'', parsed_url.path, count=1)

        # Login, optionally with the host name appended to the user name
        if config.imap_full_address:
            login_name = self.username + u"@" + parsed_url.hostname
        else:
            login_name = self.username
        self.conn.login(login_name, self.password)
        log.Info(u"IMAP connected")

    def prepareBody(self, f, rname):
        """Return the mail text that carries the content of *f*.

        *f* is a file object opened for binary reading; it is read from its
        current position to the end.  *rname* is the remote file name as
        bytes and becomes the subject of the mail.
        """
        mp = email.mime.multipart.MIMEMultipart()

        # I am going to use the remote_dir as the From address so that
        # multiple archives can be stored in an IMAP account and can be
        # accessed separately
        mp[u"From"] = self.remote_dir
        mp[u"Subject"] = rname.decode()

        a = email.mime.multipart.MIMEBase(u"application", u"binary")
        a.set_payload(f.read())

        email.encoders.encode_base64(a)

        mp.attach(a)

        return mp.as_string()

    def _retry_on_connection_error(self, operation, failure_message):
        """Call *operation* until it succeeds and return its result.

        On a connection error *failure_message* is logged, the call pauses
        for 30 seconds, and the connection is re-established (itself retried
        every 30 seconds) before *operation* is tried again.  Every failure
        uses up one attempt; ``config.timeout`` is the number of attempts,
        with 0 meaning 2880 attempts, i.e. one day of 30 second pauses.

        Raises BackendException once no attempts are left.  Exceptions other
        than connection errors are not handled here and propagate at once.
        """
        attempts_left = config.timeout
        if attempts_left == 0:
            # Allow a total timeout of 1 day
            attempts_left = 2880
        while attempts_left > 0:
            try:
                return operation()
            except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                attempts_left -= 1
                log.Info(failure_message)
                time.sleep(30)
                while attempts_left > 0:
                    try:
                        self.resetConnection()
                        break
                    except (imaplib.IMAP4.abort, socket.error, socket.sslerror):
                        attempts_left -= 1
                        log.Info(u"Error reconnecting, retrying in 30s ")
                        time.sleep(30)
        raise BackendException(u"IMAP operation failed: retries exhausted")

    def _put(self, source_path, remote_filename):
        """Store the content of *source_path* as a mail named *remote_filename*.

        Raises BackendException when the connection cannot be kept up long
        enough, and Exception when the server refuses the message.
        """
        # Build the message once, before any network traffic: the file can
        # only be read to its end a single time, so rebuilding the body on
        # a retry would upload an empty attachment.
        f = source_path.open("rb")
        try:
            body = self.prepareBody(f, remote_filename).encode()
        finally:
            f.close()

        def upload():
            # If we don't select the IMAP folder before
            # append, the message goes into the INBOX.
            self.conn.select(config.imap_mailbox)
            # imapf turns a non-OK reply into an exception so that a
            # rejected upload is not reported as stored.
            self.imapf(self.conn.append, config.imap_mailbox, None, None, body)

        self._retry_on_connection_error(
            upload, u"Error saving '%s', retrying in 30s " % remote_filename)

        log.Info(u"IMAP mail with '%s' subject stored" % remote_filename)

    def _get(self, remote_filename, local_path):
        """Fetch the mail named *remote_filename* and write it to *local_path*.

        Raises Exception when the server reports an error or no mail has
        that subject, and BackendException when the connection cannot be
        kept up long enough.
        """
        def download():
            # Search returns an error if you haven't selected an IMAP folder.
            self.conn.select(config.imap_mailbox)
            (result, flist) = self.conn.search(None, u'Subject', remote_filename)
            if result != u"OK":
                raise Exception(flist[0])

            # check if there is any result (the server answers with an
            # empty bytes value when nothing matched)
            if not flist[0]:
                raise Exception(u"no mail with subject %s" % remote_filename)

            (result, flist) = self.conn.fetch(flist[0], u"(RFC822)")
            if result != u"OK":
                raise Exception(flist[0])
            rawbody = flist[0][1]

            message = Parser().parsestr(rawbody.decode())

            # The first part of the multipart mail is the attached file
            attachment = message.get_payload(0)
            return attachment.get_payload(decode=True)

        body = self._retry_on_connection_error(
            download, u"Error loading '%s', retrying in 30s " % remote_filename)

        tfile = local_path.open("wb")
        try:
            tfile.write(body)
        finally:
            tfile.close()
        local_path.setdata()
        log.Info(u"IMAP mail with '%s' subject fetched" % remote_filename)

    def _list(self):
        """Return the subjects of all mails whose From header is remote_dir.

        Raises BackendException when the mailbox cannot be selected and
        Exception when the search or the header fetch fails.
        """
        ret = []
        (result, flist) = self.conn.select(config.imap_mailbox)
        if result != u"OK":
            raise BackendException(flist[0])

        # Going to find all the archives which have remote_dir in the From
        # address

        # Search returns an error if you haven't selected an IMAP folder.
        (result, flist) = self.conn.search(None, u'FROM', self.remote_dir)
        if result != u"OK":
            raise Exception(flist[0])
        if flist[0] == b'':
            return ret
        nums = flist[0].strip().split(b" ")
        # One fetch for the whole id range; mails inside the range that
        # belong to another archive are filtered out by the From check below.
        msg_range = b"%s:%s" % (nums[0], nums[-1])
        (result, flist) = self.conn.fetch(msg_range, u"(BODY[HEADER])")
        if result != u"OK":
            raise Exception(flist[0])

        # remote_dir is literal text, not a pattern: escape it so characters
        # such as "+" or "." in a folder name only match themselves.
        from_pattern = re.compile(u"^" + re.escape(self.remote_dir) + u"$")

        for msg in flist:
            # Entries of length one carry no header data
            if (len(msg) == 1):
                continue
            if sys.version_info.major >= 3:
                headers = Parser(policy=default).parsestr(msg[1].decode(u"unicode-escape"))  # noqa  # pylint: disable=unsubscriptable-object
            else:
                headers = Parser().parsestr(msg[1].decode(u"unicode-escape"))  # pylint: disable=unsubscriptable-object
            subj = headers[u"subject"]
            header_from = headers[u"from"]

            # Catch messages with empty headers which cause an exception.
            if header_from is not None and from_pattern.match(header_from):
                ret.append(subj)
                log.Info(u"IMAP flist: %s %s" % (subj, header_from))
        return ret

    def imapf(self, fun, *args):
        """Call the imaplib method *fun* and return the data of its reply.

        Raises Exception with the first data item when the reply status is
        not ``OK``.
        """
        (ret, flist) = fun(*args)
        if ret != u"OK":
            raise Exception(flist[0])
        return flist

    def delete_single_mail(self, i):
        """Set the deleted flag on message *i*; expunge() removes it."""
        self.imapf(self.conn.store, i, u"+FLAGS", u'\\DELETED')

    def expunge(self):
        """Ask the server to expunge the selected mailbox."""
        self.imapf(self.conn.expunge)

    def _delete_list(self, filename_list):
        """Delete the mails whose subject matches the given file names.

        For each name the first message found by the subject search is
        flagged as deleted; the mailbox is expunged once at the end.
        """
        for filename in filename_list:
            # File names may arrive as bytes; the search criterion is built
            # as text, so decode first instead of embedding a b'...' repr.
            subject = filename.decode() if isinstance(filename, bytes) else filename
            flist = self.imapf(self.conn.search, None, u"(SUBJECT %s)" % subject)
            ids = (flist[0] or b"").split()
            if ids:
                self.delete_single_mail(ids[0])
                log.Notice(u"marked %s to be deleted" % filename)
        self.expunge()
        log.Notice(u"IMAP expunged %s files" % len(filename_list))

    def _close(self):
        """Close the selected mailbox and log out of the server."""
        try:
            self.conn.close()
        finally:
            # Log out even when closing the mailbox failed
            self.conn.logout()
# Make ImapBackend the handler for plain and SSL IMAP URLs, and record both
# schemes in duplicity.backend.uses_netloc.
_IMAP_SCHEMES = [u"imap", u"imaps"]
for _imap_scheme in _IMAP_SCHEMES:
    duplicity.backend.register_backend(_imap_scheme, ImapBackend)
duplicity.backend.uses_netloc.extend(_IMAP_SCHEMES)