Page MenuHomec4science

dropboxfs.py
No OneTemporary

File Metadata

Created
Tue, May 28, 17:15

dropboxfs.py

# -*- coding: utf-8 -*-
##
## This file is part of Invenio.
## Copyright (C) 2013 CERN.
##
## Invenio is free software; you can redistribute it and/or
## modify it under the terms of the GNU General Public License as
## published by the Free Software Foundation; either version 2 of the
## License, or (at your option) any later version.
##
## Invenio is distributed in the hope that it will be useful, but
## WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
## General Public License for more details.
##
## You should have received a copy of the GNU General Public License
## along with Invenio; if not, write to the Free Software Foundation, Inc.,
## 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
"""
Dropbox file system
-------------------
Installation::
pip install dropbox
"""
import os
import time
import datetime
import calendar
from UserDict import UserDict
from fs.base import FS, synchronize, NoDefaultMeta
from fs.path import normpath, abspath, pathsplit, basename, dirname
from fs.errors import DirectoryNotEmptyError, UnsupportedError, \
CreateFailedError, ResourceInvalidError, \
ResourceNotFoundError, \
OperationFailedError, DestinationExistsError, \
RemoteConnectionError
from fs.remote import RemoteFileBuffer
from fs.filelike import SpooledTemporaryFile
from dropbox import rest
from dropbox import client
# Items in cache are considered expired after 5 minutes.
CACHE_TTL = 300
# The format Dropbox uses for times.
TIME_FORMAT = '%a, %d %b %Y %H:%M:%S +0000'
# Max size for spooling to memory before using disk (5M).
MAX_BUFFER = 1024**2*5
class CacheItem(object):
"""Represents a path in the cache. There are two components to a path.
It's individual metadata, and the children contained within it."""
def __init__(self, metadata=None, children=None, timestamp=None):
self.metadata = metadata
self.children = children
if timestamp is None:
timestamp = time.time()
self.timestamp = timestamp
def add_child(self, name, client=None):
if self.children is None:
if client != None:
# This is a fix. When you add a child to a folder that
# was still not listed, that folder gets only one
# child when you list it afterwards. So this fix
# first tries to check are the files/folders inside
# this directory on cloud.
client.children(self.metadata['path'])
else:
self.children = [name]
else:
if name not in self.children:
self.children.append(name)
def del_child(self, name):
if self.children is None:
return
try:
i = self.children.index(name)
except ValueError:
return
self.children.pop(i)
def _get_expired(self):
if self.timestamp <= time.time() - CACHE_TTL:
return True
expired = property(_get_expired)
def renew(self):
self.timestamp = time.time()
class DropboxCache(UserDict):
def __init__(self, client):
self._client = client
UserDict.__init__(self)
def set(self, path, metadata):
self[path] = CacheItem(metadata)
dname, bname = pathsplit(path)
item = self.get(dname)
if item:
item.add_child(bname, self._client)
def pop(self, path, default=None):
value = UserDict.pop(self, path, default)
dname, bname = pathsplit(path)
item = self.get(dname)
if item:
item.del_child(bname)
return value
class DropboxClient(client.DropboxClient):
"""A wrapper around the official DropboxClient. This wrapper performs
caching as well as converting errors to fs exceptions."""
def __init__(self, *args, **kwargs):
super(DropboxClient, self).__init__(*args, **kwargs)
self.cache = DropboxCache(self)
# Below we split the DropboxClient metadata() method into two methods
# metadata() and children(). This allows for more fine-grained fetches
# and caching.
def metadata(self, path):
"Gets metadata for a given path."
item = self.cache.get(path)
if not item or item.metadata is None or item.expired:
try:
metadata = super(DropboxClient, self).metadata(path,
include_deleted=False, list=False)
except rest.ErrorResponse as e:
if e.status == 404:
raise ResourceNotFoundError(path)
raise OperationFailedError(opname='metadata', path=path,
msg=str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
if metadata.get('is_deleted', False):
raise ResourceNotFoundError(path)
item = self.cache[path] = CacheItem(metadata)
# Copy the info so the caller cannot affect our cache.
return dict(item.metadata.items())
def children(self, path):
"Gets children of a given path."
update = False
hash_ = None
item = self.cache.get(path)
if item:
if item.expired:
update = True
if item.metadata and item.children:
hash_ = item.metadata['hash']
else:
if not item.metadata.get('is_dir'):
raise ResourceInvalidError(path)
if not item.children:
update = True
else:
update = True
if update:
try:
metadata = super(DropboxClient, self).metadata(path, hash=hash_,
include_deleted=False, list=True)
children = []
contents = metadata.pop('contents')
for child in contents:
if child.get('is_deleted', False):
continue
children.append(basename(child['path']))
self.cache[child['path']] = CacheItem(child)
item = self.cache[path] = CacheItem(metadata, children)
except rest.ErrorResponse as e:
if not item or e.status != 304:
raise OperationFailedError(opname='metadata', path=path,
msg=str(e) )
# We have an item from cache (perhaps expired), but it's
# hash is still valid (as far as Dropbox is concerned),
# so just renew it and keep using it.
item.renew()
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
return item.children
def file_create_folder(self, path):
"Add newly created directory to cache."
try:
metadata = super(DropboxClient, self).file_create_folder(path)
except rest.ErrorResponse as e:
if e.status == 403:
raise DestinationExistsError(path)
if e.status == 400:
raise OperationFailedError(opname='file_create_folder', msg=str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
self.cache.set(path, metadata)
return metadata['path']
def file_copy(self, src, dst):
try:
metadata = super(DropboxClient, self).file_copy(src, dst)
except rest.ErrorResponse as e:
if e.status == 404:
raise ResourceNotFoundError(src)
if e.status == 403:
raise DestinationExistsError(dst)
if e.status == 503:
raise OperationFailedError(opname='file_copy', msg="User over storage quota")
raise OperationFailedError(opname='file_copy', msg= str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
self.cache.set(dst, metadata)
return metadata['path']
def file_move(self, src, dst):
try:
metadata = super(DropboxClient, self).file_move(src, dst)
except rest.ErrorResponse as e:
if e.status == 404:
raise ResourceNotFoundError(src)
if e.status == 403:
raise DestinationExistsError(dst)
if e.status == 503:
raise OperationFailedError(opname='file_copy', msg="User over storage quota")
raise OperationFailedError(opname='file_copy', msg= str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
self.cache.pop(src, None)
self.cache.set(dst, metadata)
return metadata['path']
def file_delete(self, path):
try:
super(DropboxClient, self).file_delete(path)
except rest.ErrorResponse as e:
if e.status == 404:
raise ResourceNotFoundError(path)
if e.status == 400 and 'must not be empty' in str(e):
raise DirectoryNotEmptyError(path)
raise OperationFailedError(opname='file_copy', msg=str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
self.cache.pop(path, None)
def put_file(self, path, f, overwrite=False):
try:
super(DropboxClient, self).put_file(path, f, overwrite=overwrite)
except rest.ErrorResponse as e:
raise OperationFailedError(opname='file_copy', msg=str(e) )
except TypeError as e:
raise ResourceInvalidError("put_file", path)
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
self.cache.pop(dirname(path), None)
return path
def media(self, path):
try:
info = super(DropboxClient, self).media( path )
return info.get('url', None)
except rest.ErrorResponse as e:
if e.status == 400:
raise UnsupportedError("create a link to a folder")
if e.status == 404:
raise ResourceNotFoundError(path)
raise OperationFailedError(opname='file_copy', msg=str(e) )
except:
raise RemoteConnectionError("Most probable reasons: " + \
"access token has expired " + \
"or user credentials are invalid.")
class DropboxFS(FS):
"""A Dropbox filesystem."""
__name__ = "Dropbox"
_meta = { 'thread_safe' : True,
'virtual' : False,
'read_only' : False,
'unicode_paths' : True,
'case_insensitive_paths' : True,
'network' : True,
'atomic.setcontents' : True,
'atomic.makedir': True,
'atomic.rename': True,
'mime_type': 'virtual/dropbox',
}
def __init__(self, root=None, credentials=None, localtime=False, thread_synchronize=True):
self._root = root
self._credentials = credentials
if root == None:
root = "/"
if self._credentials == None:
if "DROPBOX_ACCESS_TOKEN" not in os.environ:
raise CreateFailedError("DROPBOX_ACCESS_TOKEN is not set in os.environ")
else:
self._credentials['access_token'] = os.environ.get('DROPBOX_ACCESS_TOKEN')
super(DropboxFS, self).__init__(thread_synchronize=thread_synchronize)
self.client = DropboxClient(
oauth2_access_token=self._credentials['access_token'])
self.localtime = localtime
def __repr__(self):
args = (self.__class__.__name__, self._root)
return '<FileSystem: %s - Root Directory: %s>' % args
__str__ = __repr__
def __unicode__(self):
args = (self.__class__.__name__, self._root)
return u'<FileSystem: %s - Root Directory: %s>' % args
def getmeta(self, meta_name, default=NoDefaultMeta):
if meta_name == 'read_only':
return self.read_only
return super(DropboxFS, self).getmeta(meta_name, default)
def is_root(self, path):
"""Checks if the given path is the root folder of this
instance of DropboxFS
@param path: Path to the folder to check
"""
if( path == self._root):
return True
else:
return False
@synchronize
def open(self, path, mode="rb", **kwargs):
""" Open the named file in the given mode.
This method downloads the file contents into a local temporary
file so that it can be worked on efficiently. Any changes
made to the file are only sent back to cloud storage when
the file is flushed or closed.
@param path: Path to the file to be opened
@param mode: In which mode to open the file
@raise ResourceNotFoundError: If given path doesn't exist and
'w' is not in mode
@return: RemoteFileBuffer object
"""
path = abspath(normpath(path))
spooled_file = SpooledTemporaryFile(mode=mode, bufsize=MAX_BUFFER)
if "w" in mode:
# Truncate the file if requested
self.client.put_file(path, "", True)
else:
# Try to write to the spooled file, if path doesn't exist create it if
# 'w' is in mode
try:
spooled_file.write( self.client.get_file(path).read() )
spooled_file.seek(0, 0)
except:
if "w" not in mode:
raise ResourceNotFoundError(path)
else:
self.createfile(path, True)
# This will take care of closing the socket when it's done.
return RemoteFileBuffer(self,path,mode,spooled_file)
@synchronize
def getcontents(self, path, mode="rb", **kwargs):
path = abspath(normpath(path))
return self.open(path, mode).read()
def setcontents(self, path, data, *args, **kwargs):
"""Sets new content to remote file
Method works only with existing files and sets
new content to them.
@param path: Path the file in which to write the new content
@param contents: File contents as a string, or any object with
read and seek methods
@param kwargs: additional parameters like:
encoding: the type of encoding to use if data is text
errors: encoding errors
@param chunk_size: Number of bytes to read in a chunk,
if the implementation has to resort to a read copy loop
@return: Path of the updated file
"""
path = abspath(normpath(path))
self.client.put_file(path, data, overwrite=True)
return path
def desc(self, path):
"""
@return: The title for the given path.
"""
path = abspath(normpath(path))
info = self.getinfo(path)
return info["title"]
def getsyspath(self, path, allow_none=False):
"Returns a path as the Dropbox API specifies."
if allow_none:
return None
return client.format_path(abspath(normpath(path)))
def isdir(self, path):
""" Checks if a the specified path is a folder
@param path: Path to the file/folder to check
"""
info = self.getinfo(path)
return info.get('isdir')
def isfile(self, path):
""" Checks if a the specified path is a file
@param path: Path to the file/folder to check
"""
info = self.getinfo(path)
return not info.get('isdir')
def exists(self, path):
""" Checks if a the specified path exists
@param path: Path to the file/folder to check
"""
try:
self.getinfo(path)
return True
except ResourceNotFoundError:
return False
def listdir(self, path="/", wildcard=None, full=False, absolute=False, dirs_only=False, files_only=False):
""" Lists the the files and directories under a given path.
The directory contents are returned as a list of unicode paths
@param path: path to the folder to list
@type path: string
@param wildcard: Only returns paths that match this wildcard
@type wildcard: string containing a wildcard, or a callable
that accepts a path and returns a boolean
@param full: returns full paths (relative to the root)
@type full: bool
@param absolute: returns absolute paths
(paths beginning with /)
@type absolute: bool
@param dirs_only: if True, only return directories
@type dirs_only: bool
@param files_only: if True, only return files
@type files_only: bool
@return: a list of unicode paths
"""
path = abspath(normpath(path))
children = self.client.children(path)
return self._listdir_helper(path, children, wildcard, full, absolute, dirs_only, files_only)
@synchronize
def getinfo(self, path):
""" Returned information is metadata from cloud service +
a few more fields with standard names for some parts
of the metadata.
@param path: path to the file/folder for which to return
informations
@return: dictionary with informations about the specific file
"""
path = abspath(normpath(path))
metadata = self.client.metadata(path)
return self._metadata_to_info(metadata, localtime=self.localtime)
def copy(self, src, dst, *args, **kwargs):
"""
@param src: Path to the file to be copied
@param dst: Path to the folder in which to copy the file
@return: Path to the copied file
"""
src = abspath(normpath(src))
dst = abspath(normpath(dst))
return self.client.file_copy(src, dst)
def copydir(self, src, dst, *args, **kwargs):
"""
@param src: Path to the folder to be copied
@param dst: Path to the folder in which to copy the folder
@return: Path to the copied folder
"""
src = abspath(normpath(src))
dst = abspath(normpath(dst))
return self.client.file_copy(src, dst)
def move(self, src, dst, chunk_size=16384, *args, **kwargs):
"""
@param src: Path to the file to be moved
@param dst: Path to the folder in which the file will be moved
@param chunk_size: if using chunk upload
@return: Path to the moved file
"""
src = abspath(normpath(src))
dst = abspath(normpath(dst))
return self.client.file_move(src, dst)
def movedir(self, src, dst, *args, **kwargs):
"""
@param src: Path to the folder to be moved
@param dst: Path to the folder in which the folder will be moved
@param chunk_size: if using chunk upload
@return: Path to the moved folder
"""
src = abspath(normpath(src))
dst = abspath(normpath(dst))
return self.client.file_move(src, dst)
def rename(self, src, dst, *args, **kwargs):
"""
@param src: Path to the file to be renamed
@param dst: Full path with the new name
@raise UnsupportedError: If trying to remove the root directory
@return: Path to the renamed file
"""
src = abspath(normpath(src))
dst = abspath(normpath(dst))
return self.client.file_move(src, dst)
def makedir(self, path, recursive=False, allow_recreate=False):
"""
@param path: path to the folder to be created.
If only the new folder is specified
it will be created in the root directory
@param recursive: allows recursive creation of directories
@param allow_recreate: dropbox currently doesn't support
allow_recreate, so if a folder exists it will
@return: Id of the created directory
"""
if not self._checkRecursive(recursive, path):
raise UnsupportedError("recursively create specified folder")
path = abspath(normpath(path))
return self.client.file_create_folder(path)
def createfile(self, path, wipe=False, **kwargs):
"""Creates an empty file.
@param path: path to the new file.
@param wipe: New file with empty content.
@param kwargs: Additional parameters like:
description - a short description of the new file
@attention: Root directory is the current root directory
of this instance of filesystem and not the root of
your Google Drive.
@return: Path to the created file
"""
return self.client.put_file(path, '', overwrite=wipe)
def remove(self, path):
"""
@param path: path to the file to be deleted
@return: None if removal was successful
"""
path = abspath(normpath(path))
if self.is_root(path = path):
raise UnsupportedError("Can't remove the root directory")
if self.isdir(path = path):
raise ResourceInvalidError("Specified path is a directory. " +
"Please use removedir.")
self.client.file_delete(path)
def removedir(self, path, *args, **kwargs):
"""
@param path: path to the file to be deleted
@return: None if removal was successful
"""
path = abspath(normpath(path))
if self.is_root(path = path):
raise UnsupportedError("Can't remove the root directory")
if self.isfile(path = path):
raise ResourceInvalidError("Specified path is a directory. " +
"Please use removedir.")
self.client.file_delete(path)
def getpathurl(self, path):
"""
@param path: path to the file for which to return the url path
@param allow_none: if true, this method can return None if
there is no URL form of the given path
@type allow_none: bool
@return: url that corresponds to the given path, if one exists
"""
path = abspath(normpath(path))
return self.client.media(path)
def about(self):
"""
@return: information about the current user
with whose credentials is the file system instantiated.
"""
info = self.client.account_info()
info['cloud_storage_url'] = "http://www.dropbox.com/"
info['user_name'] = info.pop('display_name')
info['quota'] = 100*(info['quota_info']["normal"]+info['quota_info']["shared"])/ float(info['quota_info']["quota"])
return info
return self.client.account_info()
def _checkRecursive(self, recursive, path):
# Checks if the new folder to be created is compatible with current
# value of recursive
parts = path.split("/")
if( parts < 3 ):
return True
testPath = "/".join(parts[:-1])
if self.exists(testPath):
return True
elif recursive:
return True
else:
return False
def _metadata_to_info(self, metadata, localtime=False):
""" Returns modified metadata
Method adds a few standard names to the metadata:
size - the size of the file/folder
isdir - is something a file or a directory
created_time - the time of the creation
path - path to the object which metadata are we showing
revision - google drive doesn't have a revision parameter
modified - time of the last modification
@return: The full metadata and a few more fields
with standard names.
"""
info = {
'size': metadata.get('bytes', 0),
'isdir': metadata.get('is_dir', False),
'title': metadata['path'].split("/")[-1],
'created_time': None
}
try:
mtime = metadata.get('modified', None)
if mtime:
# Parse date/time from Dropbox as struct_time.
mtime = time.strptime(mtime, TIME_FORMAT)
if localtime:
# Convert time to local timezone in seconds.
mtime = calendar.timegm(mtime)
else:
mtime = time.mktime(mtime)
# Convert to datetime object, store in modified_time
info['modified'] = datetime.datetime.fromtimestamp(mtime)
except KeyError:
pass
info.update(metadata)
return info

Event Timeline