diff --git a/datajoint/__init__.py b/datajoint/__init__.py index 754f22747..9e98ba31b 100644 --- a/datajoint/__init__.py +++ b/datajoint/__init__.py @@ -14,30 +14,27 @@ http://dx.doi.org/10.1101/031658 """ -__author__ = "Dimitri Yatsenko, Edgar Y. Walker, and Fabian Sinz at Baylor College of Medicine" -__date__ = "Nov 15, 2018" +__author__ = "DataJoint Contributors" +__date__ = "February 7, 2019" __all__ = ['__author__', '__version__', - 'config', 'conn', 'kill', 'Table', - 'Connection', 'Heading', 'FreeTable', 'Not', 'schema', + 'config', 'conn', 'Connection', + 'schema', 'create_virtual_module', 'get_schema_names', + 'Table', 'FreeTable', 'Manual', 'Lookup', 'Imported', 'Computed', 'Part', - 'AndList', 'ERD', 'U', 'key', - 'DataJointError', 'DuplicateError', - 'set_password', 'create_virtual_module'] + 'Not', 'AndList', 'U', 'ERD', + 'set_password', 'kill', + 'DataJointError', 'DuplicateError', 'key'] -# ------------- flatten import hierarchy ------------------------- from .version import __version__ from .settings import config from .connection import conn, Connection -from .table import FreeTable, Table +from .schema import Schema as schema +from .schema import create_virtual_module, get_schema_names +from .table import Table, FreeTable from .user_tables import Manual, Lookup, Imported, Computed, Part from .expression import Not, AndList, U -from .heading import Heading -from .schema import Schema as schema -from .schema import create_virtual_module from .erd import ERD from .admin import set_password, kill from .errors import DataJointError, DuplicateError from .fetch import key - - diff --git a/datajoint/attach.py b/datajoint/attach.py new file mode 100644 index 000000000..d137aa3d2 --- /dev/null +++ b/datajoint/attach.py @@ -0,0 +1,28 @@ +""" +functionality for attaching files +""" +from os import path +from itertools import count, chain + + +def load(local_path): + """ make an attachment from a local file """ + with open(local_path, mode='rb') as f: # b is important -> binary + contents = f.read() + return str.encode(path.basename(local_path)) + b'\0' + contents + + +def save(buffer, save_path='.'): + """ save attachment from memory buffer into the save_path """ + p = buffer.find(b'\0') + file_path = path.abspath(path.join(save_path, buffer[:p].decode())) + + if path.isfile(file_path): + # generate a new filename + file, ext = path.splitext(file_path) + file_path = next(f for f in ('%s_%04x%s' % (file, n, ext) for n in count()) + if not path.isfile(f)) + + with open(file_path, mode='wb') as f: + f.write(buffer[p+1:]) + return file_path diff --git a/datajoint/declare.py b/datajoint/declare.py index 5b8424fb9..64b81c09d 100644 --- a/datajoint/declare.py +++ b/datajoint/declare.py @@ -197,7 +197,6 @@ def declare(full_table_name, definition, context): :param definition: DataJoint table definition :param context: dictionary of objects that might be referred to in the table. 
""" - table_name = full_table_name.strip('`').split('.')[1] if len(table_name) > MAX_TABLE_NAME_LENGTH: raise DataJointError( @@ -271,12 +270,13 @@ def compile_attribute(line, in_key, foreign_key_sql): match['default'] = '' match = {k: v.strip() for k, v in match.items()} match['nullable'] = match['default'].lower() == 'null' - accepted_datatype = r'time|date|year|enum|(var)?char|float|real|double|decimal|numeric|' \ - r'(tiny|small|medium|big)?int|bool|' \ - r'(tiny|small|medium|long)?blob|external|attach' + blob_datatype = r'(tiny|small|medium|long)?blob' + accepted_datatype = ( + r'time|date|year|enum|(var)?char|float|real|double|decimal|numeric|' + r'(tiny|small|medium|big)?int|bool|external|attach|' + blob_datatype) if re.match(accepted_datatype, match['type'], re.I) is None: raise DataJointError('DataJoint does not support datatype "{type}"'.format(**match)) - + is_blob = bool(re.match(blob_datatype, match['type'], re.I)) literals = ['CURRENT_TIMESTAMP'] # not to be enclosed in quotes if match['nullable']: if in_key: @@ -285,38 +285,45 @@ def compile_attribute(line, in_key, foreign_key_sql): else: if match['default']: quote = match['default'].upper() not in literals and match['default'][0] not in '"\'' - match['default'] = ('NOT NULL DEFAULT ' + - ('"%s"' if quote else "%s") % match['default']) + match['default'] = 'NOT NULL DEFAULT ' + ('"%s"' if quote else "%s") % match['default'] else: match['default'] = 'NOT NULL' match['comment'] = match['comment'].replace('"', '\\"') # escape double quotes in comment - - is_external = match['type'].startswith('external') - is_attachment = match['type'].startswith('attachment') - if not is_external: - sql = ('`{name}` {type} {default}' + (' COMMENT "{comment}"' if match['comment'] else '')).format(**match) - else: - # process externally stored attribute + is_configurable = match['type'].startswith(('external', 'blob-', 'attach')) + is_external = False + if is_configurable: if in_key: - raise DataJointError('External attributes cannot be primary in:\n%s' % line) + raise DataJointError('Configurable attributes cannot be primary in:\n%s' % line) + match['comment'] = ':{type}:{comment}'.format(**match) # insert configurable type into comment store_name = match['type'].split('-') - if store_name[0] != 'external': - raise DataJointError('External store types must be specified as "external" or "external-"') + if store_name[0] not in ('external', 'blob', 'attach'): + raise DataJointError('Configurable types must be in the form blob- or attach- in:\n%s' % line) store_name = '-'.join(store_name[1:]) - if store_name != '' and not store_name.isidentifier(): + if store_name and not store_name.isidentifier(): raise DataJointError( 'The external store name `{type}` is invalid. Make like a python identifier.'.format(**match)) if len(store_name) > STORE_NAME_LENGTH: raise DataJointError( 'The external store name `{type}` is too long. 
Must be <={max_len} characters.'.format( max_len=STORE_NAME_LENGTH, **match)) - if not match['default'] in ('DEFAULT NULL', 'NOT NULL'): - raise DataJointError('The only acceptable default value for an external field is null in:\n%s' % line) - if match['type'] not in config: - raise DataJointError('The external store `{type}` is not configured.'.format(**match)) + spec = config.get_store_spec(store_name) + is_external = spec['protocol'] in {'s3', 'file'} + if not is_external: + is_blob = re.match(blob_datatype, spec['protocol'], re.I) + if not is_blob: + raise DataJointError('Invalid protocol {protocol} in external store in:\n{line}'.format( + line=line, **spec)) + match['type'] = spec['protocol'] + + if (is_external or is_blob) and match['default'] not in ('DEFAULT NULL', 'NOT NULL'): + raise DataJointError( + 'The default value for a blob or attachment can only be NULL in:\n%s' % line) - # append external configuration name to the end of the comment - sql = '`{name}` {hash_type} {default} COMMENT ":{type}:{comment}"'.format( + if not is_external: + sql = ('`{name}` {type} {default}' + (' COMMENT "{comment}"' if match['comment'] else '')).format(**match) + else: + # add hash field with a dependency on the ~external table + sql = '`{name}` {hash_type} {default} COMMENT "{comment}"'.format( hash_type=HASH_DATA_TYPE, **match) foreign_key_sql.append( "FOREIGN KEY (`{name}`) REFERENCES {{external_table}} (`hash`) " diff --git a/datajoint/erd.py b/datajoint/erd.py index 677ef2267..6e830d26b 100644 --- a/datajoint/erd.py +++ b/datajoint/erd.py @@ -50,8 +50,8 @@ class ERD: """ def __init__(self, *args, **kwargs): - warnings.warn('ERD functionality depends on matplotlib and pygraphviz. Please install both of these ' - 'libraries to enable the ERD feature.') + warnings.warn('ERD functionality depends on matplotlib and pygraphviz. ' + 'Please install both of these libraries to enable the ERD feature.') else: class ERD(nx.DiGraph): """ @@ -228,7 +228,6 @@ def _make_graph(self): return graph def make_dot(self): - import networkx as nx graph = self._make_graph() graph.nodes() diff --git a/datajoint/external.py b/datajoint/external.py index 2978681ac..1449e6443 100644 --- a/datajoint/external.py +++ b/datajoint/external.py @@ -1,15 +1,21 @@ import os -from tqdm import tqdm +import itertools from .settings import config from .errors import DataJointError from .hash import long_hash -from .blob import pack, unpack from .table import Table from .declare import STORE_HASH_LENGTH, HASH_DATA_TYPE -from .s3 import Folder as S3Folder +from . import s3 from .utils import safe_write +def subfold(name, folds): + """ + subfolding for external storage: e.g. subfold('abcdefg', (2, 3)) --> ['ab','cde'] + """ + return (name[:folds[0]].lower(),) + subfold(name[folds[0]:], folds[1:]) if folds else () + + class ExternalTable(Table): """ The table tracking externally stored objects. 
@@ -42,15 +48,15 @@ def definition(self): def table_name(self): return '~external' - def put(self, store, obj): + def put(self, store, blob): """ put an object in external store """ - spec = self._get_store_spec(store) - blob = pack(obj) - blob_hash = long_hash(blob) + store[len('external-'):] + store = ''.join(store.split('-')[1:]) + spec = config.get_store_spec(store) + blob_hash = long_hash(blob) + store if spec['protocol'] == 'file': - folder = os.path.join(spec['location'], self.database) + folder = os.path.join(spec['location'], self.database, *subfold(blob_hash, spec['subfolding'])) full_path = os.path.join(folder, blob_hash) if not os.path.isfile(full_path): try: @@ -59,9 +65,10 @@ def put(self, store, obj): os.makedirs(folder) safe_write(full_path, blob) elif spec['protocol'] == 's3': - S3Folder(database=self.database, **spec).put(blob_hash, blob) + folder = '/'.join(subfold(blob_hash, spec['subfolding'])) + s3.Folder(database=self.database, **spec).put('/'.join((folder, blob_hash)), blob) else: - raise DataJointError('Unknown external storage protocol {protocol} for {store}'.format( + raise DataJointError('Unknown external storage protocol {protocol} in store "-{store}"'.format( store=store, protocol=spec['protocol'])) # insert tracking info @@ -80,12 +87,10 @@ def get(self, blob_hash): """ if blob_hash is None: return None - store = blob_hash[STORE_HASH_LENGTH:] - store = 'external' + ('-' if store else '') + store - - cache_folder = config.get('cache', None) + # attempt to get object from cache blob = None + cache_folder = config.get('cache', None) if cache_folder: try: with open(os.path.join(cache_folder, blob_hash), 'rb') as f: @@ -93,10 +98,13 @@ def get(self, blob_hash): except FileNotFoundError: pass + # attempt to get object from store if blob is None: - spec = self._get_store_spec(store) + store = blob_hash[STORE_HASH_LENGTH:] + spec = config.get_store_spec(store) if spec['protocol'] == 'file': - full_path = os.path.join(spec['location'], self.database, blob_hash) + subfolders = os.path.join(*subfold(blob_hash, spec['subfolding'])) + full_path = os.path.join(spec['location'], self.database, subfolders, blob_hash) try: with open(full_path, 'rb') as f: blob = f.read() @@ -104,7 +112,8 @@ def get(self, blob_hash): raise DataJointError('Lost access to external blob %s.' % full_path) from None elif spec['protocol'] == 's3': try: - blob = S3Folder(database=self.database, **spec).get(blob_hash) + subfolder = '/'.join(subfold(blob_hash, spec['subfolding'])) + blob = s3.Folder(database=self.database, **spec).get('/'.join((subfolder, blob_hash))) except TypeError: raise DataJointError('External store {store} configuration is incomplete.'.format(store=store)) else: @@ -115,7 +124,7 @@ def get(self, blob_hash): os.makedirs(cache_folder) safe_write(os.path.join(cache_folder, blob_hash), blob) - return unpack(blob) + return blob @property def references(self): @@ -156,34 +165,35 @@ def delete_garbage(self): for ref in self.references) or "TRUE") print('Deleted %d items' % self.connection.query("SELECT ROW_COUNT()").fetchone()[0]) - def clean_store(self, store, display_progress=True): + def clean_store(self, store, verbose=True): """ Clean unused data in an external storage repository from unused blobs. This must be performed after delete_garbage during low-usage periods to reduce risks of data loss. 
""" - spec = self._get_store_spec(store) - progress = tqdm if display_progress else lambda x: x + spec = config.get_store_spec(store) + in_use = set(x for x in (self & '`hash` LIKE "%%{store}"'.format(store=store)).fetch('hash')) if spec['protocol'] == 'file': - folder = os.path.join(spec['location'], self.database) - delete_list = set(os.listdir(folder)).difference(self.fetch('hash')) - print('Deleting %d unused items from %s' % (len(delete_list), folder), flush=True) - for f in progress(delete_list): - os.remove(os.path.join(folder, f)) + count = itertools.count() + print('Deleting...') + deleted_folders = set() + for folder, dirs, files in os.walk(os.path.join(spec['location'], self.database), topdown=False): + if dirs and files: + raise DataJointError('Invalid repository with files in non-terminal folder %s' % folder) + dirs = set(d for d in dirs if os.path.join(folder, d) not in deleted_folders) + if not dirs: + files_not_in_use = [f for f in files if f not in in_use] + for f in files_not_in_use: + filename = os.path.join(folder, f) + next(count) + if verbose: + print(filename) + os.remove(filename) + if len(files_not_in_use) == len(files): + os.rmdir(folder) + deleted_folders.add(folder) + print('Deleted %d objects' % next(count)) elif spec['protocol'] == 's3': try: - S3Folder(database=self.database, **spec).clean(self.fetch('hash')) + failed_deletes = s3.Folder(database=self.database, **spec).clean(in_use, verbose=verbose) except TypeError: raise DataJointError('External store {store} configuration is incomplete.'.format(store=store)) - - @staticmethod - def _get_store_spec(store): - try: - spec = config[store] - except KeyError: - raise DataJointError('Storage {store} is requested but not configured'.format(store=store)) from None - if 'protocol' not in spec: - raise DataJointError('Storage {store} config is missing the protocol field'.format(store=store)) - if spec['protocol'] not in {'file', 's3'}: - raise DataJointError( - 'Unknown external storage protocol "{protocol}" in "{store}"'.format(store=store, **spec)) - return spec diff --git a/datajoint/fetch.py b/datajoint/fetch.py index 79f59960f..69ac335f3 100644 --- a/datajoint/fetch.py +++ b/datajoint/fetch.py @@ -4,7 +4,7 @@ import pandas import re import numpy as np -from .blob import unpack +from . import blob, attach from .errors import DataJointError from .settings import config @@ -24,14 +24,34 @@ def is_key(attr): def to_dicts(recarray): """convert record array to a dictionaries""" for rec in recarray: - yield dict(zip(recarray.dtype.names, rec.tolist())) + yield OrderedDict(zip(recarray.dtype.names, rec.tolist())) -def _flatten_attribute_list(primary_key, attr): - for a in attr: - if re.match(r'^\s*KEY\s*(ASC\s*)?$', a): +def _get(connection, attr, data, squeeze, download_path): + """ + :param connection: + :param attr: an attribute from the heading + :param data: literal value fetched from the table + :param squeeze: if True squeeze blobs + :param download_path: for fetches that download data, e.g. 
attachments + :return: unpacked data + """ + if attr.is_external: + data = connection.schemas[attr.database].external_table.get(data) + return (blob.unpack(data, squeeze=squeeze) if attr.is_blob else + attach.save(data, download_path) if attr.is_attachment else data) + + +def _flatten_attribute_list(primary_key, attrs): + """ + :param primary_key: list of attributes in primary key + :param attrs: list of attribute names, which may include "KEY", "KEY DESC" or "KEY ASC" + :return: generator of attributes where "KEY" is replaced with its component attributes + """ + for a in attrs: + if re.match(r'^\s*KEY(\s+[aA][Ss][Cc])?\s*$', a): yield from primary_key - elif re.match(r'^\s*KEY\s*DESC\s*$', a): + elif re.match(r'^\s*KEY\s+[Dd][Ee][Ss][Cc]\s*$', a): yield from (q + ' DESC' for q in primary_key) else: yield a @@ -40,13 +60,14 @@ def _flatten_attribute_list(primary_key, attr): class Fetch: """ A fetch object that handles retrieving elements from the table expression. - :param relation: the table expression to fetch from + :param expression: the table expression to fetch from. """ def __init__(self, expression): self._expression = expression - def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, as_dict=False, squeeze=False): + def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, as_dict=False, + squeeze=False, download_path='.'): """ Fetches the expression results from the database into an np.array or list of dictionaries and unpacks blob attributes. @@ -64,6 +85,7 @@ def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, "frame": output pandas.DataFrame. . :param as_dict: returns a list of dictionaries instead of a record array :param squeeze: if True, remove extra dimensions from arrays + :param download_path: for fetches that download data, e.g. 
attachments :return: the contents of the relation in the form of a structured numpy.array or a dict list """ @@ -97,29 +119,25 @@ def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, 'Consider setting a limit explicitly.') limit = 2 * len(self._expression) + get = partial(_get, self._expression.connection, squeeze=squeeze, download_path=download_path) if not attrs: # fetch all attributes as a numpy.record_array or pandas.DataFrame cur = self._expression.cursor(as_dict=as_dict, limit=limit, offset=offset, order_by=order_by) heading = self._expression.heading if as_dict: - ret = [OrderedDict((name, unpack(d[name], squeeze=squeeze) if heading[name].is_blob else d[name]) - for name in heading.names) - for d in cur] + ret = [OrderedDict((name, get(heading[name], d[name])) for name in heading.names) for d in cur] else: ret = list(cur.fetchall()) ret = np.array(ret, dtype=heading.as_dtype) for name in heading: - if heading[name].is_external: - external_table = self._expression.connection.schemas[heading[name].database].external_table - ret[name] = list(map(external_table.get, ret[name])) - elif heading[name].is_blob: - ret[name] = list(map(partial(unpack, squeeze=squeeze), ret[name])) + ret[name] = list(map(partial(get, heading[name]), ret[name])) if format == "frame": ret = pandas.DataFrame(ret).set_index(heading.primary_key) else: # if list of attributes provided attributes = [a for a in attrs if not is_key(a)] result = self._expression.proj(*attributes).fetch( - offset=offset, limit=limit, order_by=order_by, as_dict=False, squeeze=squeeze) + offset=offset, limit=limit, order_by=order_by, + as_dict=False, squeeze=squeeze, download_path=download_path) return_values = [ list(to_dicts(result[self._expression.primary_key])) if is_key(attribute) else result[attribute] @@ -128,15 +146,6 @@ def __call__(self, *attrs, offset=None, limit=None, order_by=None, format=None, return ret - def keys(self, **kwargs): - """ - DEPRECATED - Iterator that returns primary keys as a sequence of dicts. - """ - warnings.warn('Use of `rel.fetch.keys()` notation is deprecated. ' - 'Please use `rel.fetch("KEY")` or `rel.fetch(dj.key)` for equivalent result', stacklevel=2) - yield from self._expression.proj().fetch(as_dict=True, **kwargs) - class Fetch1: """ @@ -147,7 +156,7 @@ class Fetch1: def __init__(self, relation): self._expression = relation - def __call__(self, *attrs, squeeze=False): + def __call__(self, *attrs, squeeze=False, download_path='.'): """ Fetches the expression results from the database when the expression is known to yield only one entry. @@ -160,6 +169,7 @@ def __call__(self, *attrs, squeeze=False): :params *attrs: attributes to return when expanding into a tuple. If empty, the return result is a dict :param squeeze: When true, remove extra dimensions from arrays in attributes + :param download_path: for fetches that download data, e.g. 
attachments :return: the one tuple in the relation in the form of a dict """ @@ -170,16 +180,12 @@ def __call__(self, *attrs, squeeze=False): ret = cur.fetchone() if not ret or cur.fetchone(): raise DataJointError('fetch1 should only be used for relations with exactly one tuple') - - def get_external(attr, _hash): - return self._expression.connection.schemas[attr.database].external_table.get(_hash) - - ret = OrderedDict((name, get_external(heading[name], ret[name])) if heading[name].is_external - else (name, unpack(ret[name], squeeze=squeeze) if heading[name].is_blob else ret[name]) + ret = OrderedDict((name, _get(self._expression.connection, heading[name], ret[name], + squeeze=squeeze, download_path=download_path)) for name in heading.names) else: # fetch some attributes, return as tuple attributes = [a for a in attrs if not is_key(a)] - result = self._expression.proj(*attributes).fetch(squeeze=squeeze) + result = self._expression.proj(*attributes).fetch(squeeze=squeeze, download_path=download_path) if len(result) != 1: raise DataJointError('fetch1 should only return one tuple. %d tuples were found' % len(result)) return_values = tuple( diff --git a/datajoint/heading.py b/datajoint/heading.py index f56585c40..f16dfb269 100644 --- a/datajoint/heading.py +++ b/datajoint/heading.py @@ -9,8 +9,8 @@ default_attribute_properties = dict( # these default values are set in computed attributes name=None, type='expression', in_key=False, nullable=False, default=None, comment='calculated attribute', - autoincrement=False, numeric=None, string=None, is_blob=False, is_external=False, sql_expression=None, - database=None, dtype=object) + autoincrement=False, numeric=None, string=None, is_blob=False, is_attachment=False, is_external=False, + unsupported=False, sql_expression=None, database=None, dtype=object) class Attribute(namedtuple('_Attribute', default_attribute_properties)): @@ -75,7 +75,7 @@ def blobs(self): @property def non_blobs(self): - return [k for k, v in self.attributes.items() if not v.is_blob] + return [k for k, v in self.attributes.items() if not v.is_blob and not v.is_attachment] @property def expressions(self): @@ -185,21 +185,32 @@ def init_from_database(self, conn, database, table_name): # additional attribute properties for attr in attributes: - # process external attributes - split_comment = attr['comment'].split(':') - attr['is_external'] = len(split_comment) >= 3 and split_comment[1].startswith('external') - if attr['is_external']: - attr['comment'] = ':'.join(split_comment[2:]) - attr['type'] = split_comment[1] - - attr['nullable'] = (attr['nullable'] == 'YES') + # process configurable attributes attr['in_key'] = (attr['in_key'] == 'PRI') + attr['database'] = database + attr['nullable'] = (attr['nullable'] == 'YES') attr['autoincrement'] = bool(re.search(r'auto_increment', attr['Extra'], flags=re.IGNORECASE)) - attr['type'] = re.sub(r'int\(\d+\)', 'int', attr['type'], count=1) # strip size off integers + attr['type'] = re.sub(r'int\(\d+\)', 'int', attr['type'], count=1) # strip size off integers attr['numeric'] = bool(re.match(r'(tiny|small|medium|big)?int|decimal|double|float', attr['type'])) attr['string'] = bool(re.match(r'(var)?char|enum|date|year|time|timestamp', attr['type'])) - attr['is_blob'] = attr['is_external'] or bool(re.match(r'(tiny|medium|long)?blob', attr['type'])) - attr['database'] = database + attr['is_blob'] = bool(re.match(r'(tiny|medium|long)?blob', attr['type'])) + + # recognize configurable fields + configurable_field = re.match( + 
r'^:(?P<type>(blob|external|attach)(-\w*)?):(?P<comment>.*)$', attr['comment']) + if configurable_field is None: + attr['is_external'] = False + attr['is_attachment'] = False + else: + # configurable fields: blob- and attach + if attr['in_key']: + raise DataJointError('Configurable store attributes are not allowed in the primary key') + attr['comment'] = configurable_field.group('comment') + attr['is_external'] = not attr['is_blob'] + attr['type'] = configurable_field.group('type') + attr['is_attachment'] = attr['type'].startswith('attach') + attr['is_blob'] = attr['type'].startswith(('blob', 'external')) + attr['string'] = False if attr['string'] and attr['default'] is not None and attr['default'] not in sql_literals: attr['default'] = '"%s"' % attr['default'] @@ -208,9 +219,8 @@ def init_from_database(self, conn, database, table_name): attr['default'] = 'null' attr['sql_expression'] = None - if not (attr['numeric'] or attr['string'] or attr['is_blob']): - raise DataJointError('Unsupported field type {field} in `{database}`.`{table_name}`'.format( - field=attr['type'], database=database, table_name=table_name)) + attr['unsupported'] = not(attr['numeric'] or attr['string'] or attr['is_blob'] or attr['is_attachment']) + attr.pop('Extra') # fill out dtype. All floats and non-nullable integers are turned into specific dtypes @@ -225,7 +235,7 @@ def init_from_database(self, conn, database, table_name): t = re.sub(r' unsigned$', '', t) # remove unsigned assert (t, is_unsigned) in numeric_types, 'dtype not found for type %s' % t attr['dtype'] = numeric_types[(t, is_unsigned)] - self.attributes = OrderedDict([(q['name'], Attribute(**q)) for q in attributes]) + self.attributes = OrderedDict(((q['name'], Attribute(**q)) for q in attributes)) # Read and tabulate secondary indexes keys = defaultdict(dict) diff --git a/datajoint/s3.py b/datajoint/s3.py index ff56d6459..d591566ec 100644 --- a/datajoint/s3.py +++ b/datajoint/s3.py @@ -30,13 +30,34 @@ def get(self, blob_hash): except minio.error.NoSuchKey: return None - def clean(self, exclude, max_count=None): + def clean(self, exclude, max_count=None, verbose=False): """ Delete all objects except for those in the exclude :param exclude: a list of blob_hashes to skip. 
:param max_count: maximum number of object to delete - :return: generator of objects that failed to delete + :param verbose: If True, print deleted objects + :return: list of objects that failed to delete """ - return self.client.remove_objects(self.bucket, itertools.islice( - (x.object_name for x in self.client.list_objects(self.bucket, self.remote_path + '/') - if x not in exclude), max_count)) + count = itertools.count() + if verbose: + def out(name): + next(count) + print(name) + return name + else: + def out(name): + next(count) + return name + + if verbose: + print('Deleting...') + + names = (out(x.object_name) + for x in self.client.list_objects(self.bucket, self.remote_path + '/', recursive=True) + if x.object_name.split('/')[-1] not in exclude) + + failed_deletes = list( + self.client.remove_objects(self.bucket, itertools.islice(names, max_count))) + + print('Deleted: %i S3 objects' % next(count)) + return failed_deletes diff --git a/datajoint/schema.py b/datajoint/schema.py index 5f50c9d9a..849919151 100644 --- a/datajoint/schema.py +++ b/datajoint/schema.py @@ -228,7 +228,7 @@ def jobs(self): return self._jobs @property - def external_table(self): + def external(self): """ schema.external provides a view of the external hash table for the schema :return: external table @@ -237,6 +237,8 @@ def external_table(self): self._external = ExternalTable(self.connection, self.database) return self._external + external_table = external # for backward compatibility to pre-0.12.0 + def create_virtual_module(module_name, schema_name, create_schema=False, create_tables=False, connection=None): """ @@ -254,3 +256,15 @@ def create_virtual_module(module_name, schema_name, create_schema=False, create_ _schema.spawn_missing_classes(context=module.__dict__) module.__dict__['schema'] = _schema return module + + +def get_schema_names(connection=None): + """ + :param connection: a dj.Connection object + :return: a generator of all accessible schemas on the server + """ + if connection is None: + connection = conn() + for r in connection.query('SHOW SCHEMAS'): + if r[0] not in {'information_schema'}: + yield r[0] \ No newline at end of file diff --git a/datajoint/settings.py b/datajoint/settings.py index 14ac18220..45103525f 100644 --- a/datajoint/settings.py +++ b/datajoint/settings.py @@ -14,6 +14,9 @@ LOCALCONFIG = 'dj_local_conf.json' GLOBALCONFIG = '.datajoint_config.json' +DEFAULT_SUBFOLDING = (2, 2) # subfolding for external storage in filesystem. 
2, 2 means that file abcdef is stored as /ab/cd/abcdef +DEFAULT_STORE_PROTOCOL = 'LONGBLOB' + validators = collections.defaultdict(lambda: lambda value: True) validators['database.port'] = lambda a: isinstance(a, int) @@ -124,6 +127,26 @@ def save_global(self, verbose=False): """ self.save(os.path.expanduser(os.path.join('~', GLOBALCONFIG)), verbose) + def get_store_spec(self, store): + """ + find configuration of blob and attachment stores + """ + # check new style + try: + spec = self['stores']['-' + store] + except KeyError: + # check old style + try: + spec = self['external' + ('-' + store if store else '')] + except KeyError: + raise DataJointError('Storage {store} is requested but not configured'.format(store=store)) + else: + spec['subfolding'] = spec.get('subfolding', ()) # old style external fields + else: + spec['subfolding'] = spec.get('subfolding', DEFAULT_SUBFOLDING) + spec['protocol'] = spec.get('protocol', DEFAULT_STORE_PROTOCOL) + return spec + @contextmanager def __call__(self, **kwargs): """ @@ -173,7 +196,7 @@ def __setitem__(self, key, value): raise DataJointError(u'Validator for {0:s} did not pass'.format(key)) -# ----------- load configuration from file ---------------- +# Load configuration from file config = Config() config_files = (os.path.expanduser(n) for n in (LOCALCONFIG, os.path.join('~', GLOBALCONFIG))) try: @@ -182,7 +205,6 @@ def __setitem__(self, key, value): config.add_history('No config file found, using default settings.') else: config.load(config_file) - del config_file # override login credentials with environment variables mapping = {k: v for k, v in zip( diff --git a/datajoint/table.py b/datajoint/table.py index 4ff6a22ba..3d7cec6de 100644 --- a/datajoint/table.py +++ b/datajoint/table.py @@ -11,7 +11,7 @@ from .settings import config from .declare import declare from .expression import QueryExpression -from .blob import pack +from . import attach, blob from .utils import user_choice from .heading import Heading from .errors import server_error_codes, DataJointError, DuplicateError @@ -170,7 +170,8 @@ def insert(self, rows, replace=False, skip_duplicates=False, ignore_extra_fields # prohibit direct inserts into auto-populated tables if not (allow_direct_insert or getattr(self, '_allow_insert', True)): # _allow_insert is only present in AutoPopulate raise DataJointError( - 'Auto-populate tables can only be inserted into from their make methods during populate calls. (see allow_direct_insert)') + 'Auto-populate tables can only be inserted into from their make methods during populate calls.' \ + ' To override, use the allow_direct_insert argument.') heading = self.heading if inspect.isclass(rows) and issubclass(rows, QueryExpression): # instantiate if a class @@ -213,25 +214,24 @@ def make_placeholder(name, value): For a given attribute `name` with `value`, return its processed value or value placeholder as a string to be included in the query and the value, if any, to be submitted for processing by mysql API. 
- :param name: - :param value: + :param name: name of attribute to be inserted + :param value: value of attribute to be inserted """ if ignore_extra_fields and name not in heading: return None - if heading[name].is_external: - placeholder, value = '%s', self.external_table.put(heading[name].type, value) - elif heading[name].is_blob: - if value is None: - placeholder, value = 'NULL', None - else: - placeholder, value = '%s', pack(value) - elif heading[name].numeric: - if value is None or value == '' or np.isnan(np.float(value)): # nans are turned into NULLs - placeholder, value = 'NULL', None - else: - placeholder, value = '%s', (str(int(value) if isinstance(value, bool) else value)) + attr = heading[name] + if value is None or (attr.numeric and (value == '' or np.isnan(np.float(value)))): + placeholder, value = 'DEFAULT', None else: placeholder = '%s' + if attr.is_blob: + value = blob.pack(value) + value = self.external_table.put(attr.type, value) if attr.is_external else value + elif attr.is_attachment: + value = attach.load(value) + value = self.external_table.put(attr.type, value) if attr.is_external else value + elif attr.numeric: + value = str(int(value) if isinstance(value, bool) else value) return name, placeholder, value def check_fields(fields): @@ -450,8 +450,7 @@ def size_on_disk(self): return ret['Data_length'] + ret['Index_length'] def show_definition(self): - logger.warning('show_definition is deprecated. Use describe instead.') - return self.describe() + raise AttributeError('show_definition is deprecated. Use the describe method instead.') def describe(self, context=None, printout=True): """ @@ -549,7 +548,7 @@ def _update(self, attrname, value=None): attr = self.heading[attrname] if attr.is_blob: - value = pack(value) + value = blob.pack(value) placeholder = '%s' elif attr.numeric: if value is None or np.isnan(np.float(value)): # nans are turned into NULLs diff --git a/datajoint/version.py b/datajoint/version.py index fee46bd8c..92cebb540 100644 --- a/datajoint/version.py +++ b/datajoint/version.py @@ -1 +1 @@ -__version__ = "0.11.1" +__version__ = "0.12.dev" diff --git a/tests/schema_external.py b/tests/schema_external.py index cf9a0cc0c..c06c2b3cf 100644 --- a/tests/schema_external.py +++ b/tests/schema_external.py @@ -11,21 +11,31 @@ schema = dj.schema(PREFIX + '_extern', connection=dj.conn(**CONN_INFO)) -dj.config['external'] = { - 'protocol': 'file', - 'location': 'dj-store/external'} +dj.config['stores'] = { + '-': { + 'protocol': 'file', + 'location': 'dj-store/external', + 'folding': (1, 1) + }, -dj.config['external-raw'] = { - 'protocol': 'file', - 'location': 'dj-store/raw'} + '-b': { + 'protocol': 'longblob' + }, -dj.config['external-compute'] = { - 'protocol': 's3', - 'location': '/datajoint-projects/test', - 'user': 'djtest', - 'token': '2e05709792545ce'} + '-raw': { + 'protocol': 'file', + 'location': 'dj-store/raw'}, -dj.config['cache'] = tempfile.mkdtemp('dj-cache') + '-compute': { + 'protocol': 's3', + 'location': '/datajoint-projects/test', + 'user': 'djtest', + 'token': '2e05709792545ce'} + + +} + +dj.config['cache'] = tempfile.mkdtemp() @schema @@ -33,7 +43,7 @@ class Simple(dj.Manual): definition = """ simple : int --- - item : external-raw + item : blob- """ @@ -64,11 +74,22 @@ class Image(dj.Computed): -> Seed -> Dimension ---- - img : external-raw # objects are stored as specified by dj.config['external-raw'] - neg : external # objects are stored as specified by dj.config['external'] + img : blob-raw # objects are stored as specified by 
dj.config['stores']['-raw'] + neg : blob- # objects are stored as specified by dj.config['stores']['-'] """ def make(self, key): np.random.seed(key['seed']) img = np.random.rand(*(Dimension() & key).fetch1('dimensions')) self.insert1(dict(key, img=img, neg=-img.astype(np.float32))) + + +@schema +class Attach(dj.Manual): + definition = """ + # table for storing attachments + attach : int + ---- + img : attach-raw # attachments are stored as specified by dj.config['stores']['-raw'] + txt : attach-b # attachments are stored as specified by dj.config['stores']['-b'] + """ diff --git a/tests/schema_legacy_external.py b/tests/schema_legacy_external.py new file mode 100644 index 000000000..a6969f6d4 --- /dev/null +++ b/tests/schema_legacy_external.py @@ -0,0 +1,74 @@ +""" +a schema for testing external attributes using legacy syntax pre-version 0.12.0 +""" + +import tempfile +import datajoint as dj + +from . import PREFIX, CONN_INFO +import numpy as np + +schema = dj.schema(PREFIX + '_legacy_extern', connection=dj.conn(**CONN_INFO)) + + +dj.config['external'] = { + 'protocol': 'file', + 'location': 'dj-legacy/external'} + +dj.config['external-raw'] = { + 'protocol': 'file', + 'location': 'dj-legacy/raw'} + +dj.config['external-compute'] = { + 'protocol': 's3', + 'location': '/datajoint-legacy/test', + 'user': 'djtest', + 'token': '2e05709792545ce'} + +dj.config['cache'] = tempfile.mkdtemp('dj-legacy-cache') + + +@schema +class Simple(dj.Manual): + definition = """ + simple : int + --- + item : external-raw + """ + + +@schema +class Seed(dj.Lookup): + definition = """ + seed : int + """ + contents = zip(range(4)) + + +@schema +class Dimension(dj.Lookup): + definition = """ + dim : int + --- + dimensions : blob + """ + contents = ( + [0, [100, 50]], + [1, [3, 4, 8, 6]]) + + +@schema +class Image(dj.Computed): + definition = """ + # table for storing + -> Seed + -> Dimension + ---- + img : external-raw # objects are stored as specified by dj.config['external-raw'] + neg : external # objects are stored as specified by dj.config['external'] + """ + + def make(self, key): + np.random.seed(key['seed']) + img = np.random.rand(*(Dimension() & key).fetch1('dimensions')) + self.insert1(dict(key, img=img, neg=-img.astype(np.float32))) diff --git a/tests/test_attach.py b/tests/test_attach.py new file mode 100644 index 000000000..0288ef334 --- /dev/null +++ b/tests/test_attach.py @@ -0,0 +1,61 @@ +from nose.tools import assert_true, assert_equal, assert_not_equal +import tempfile +import filecmp +from datajoint import attach +import os + +from .schema_external import Attach + + +def test_attach(): + """ + test attaching files and writing attached files + """ + # create a mock file + folder = tempfile.mkdtemp() + attach_file = os.path.join(folder, 'attachment.dat') + data = os.urandom(3000) + with open(attach_file, 'wb') as f: + f.write(data) + # load as an attachment buffer + buffer = attach.load(attach_file) + # save from an attachment buffer + download_file = attach.save(buffer, folder) + assert_true(filecmp.cmp(download_file, attach_file)) + assert_not_equal(os.path.basename(attach_file), os.path.basename(download_file)) + with open(download_file, 'rb') as f: + attachment_data = f.read() + assert_equal(data, attachment_data) + + +def test_attach_attributes(): + """ + test saving files in attachments + """ + # create a mock file + source_folder = tempfile.mkdtemp() + attach1 = os.path.join(source_folder, 'attach1.img') + data1 = os.urandom(100) + with open(attach1, 'wb') as f: + f.write(data1) + attach2 = 
os.path.join(source_folder, 'attach2.txt') + data2 = os.urandom(200) + with open(attach2, 'wb') as f: + f.write(data2) + + Attach().insert1(dict(attach=0, img=attach1, txt=attach2)) + + download_folder = tempfile.mkdtemp() + path1, path2 = Attach.fetch1('img', 'txt', download_path=download_folder) + + assert_not_equal(path1, path2) + assert_equal(os.path.split(path1)[0], download_folder) + with open(path1, 'rb') as f: + check1 = f.read() + with open(path2, 'rb') as f: + check2 = f.read() + assert_equal(data1, check1) + assert_equal(data2, check2) + + + diff --git a/tests/test_erd.py b/tests/test_erd.py index 4617852d8..f952efedf 100644 --- a/tests/test_erd.py +++ b/tests/test_erd.py @@ -34,6 +34,7 @@ def test_dependencies(): @staticmethod def test_erd(): + assert_true(dj.erd.erd_active, 'Failed to import networkx and pydot') erd = dj.ERD(schema, context=namespace) graph = erd._make_graph() assert_true(set(cls.__name__ for cls in (A, B, D, E, L)).issubset(graph.nodes())) diff --git a/tests/test_external.py b/tests/test_external.py index 2c1583e62..b7e75998d 100644 --- a/tests/test_external.py +++ b/tests/test_external.py @@ -2,6 +2,7 @@ from numpy.testing import assert_array_equal from nose.tools import assert_true, assert_equal from datajoint.external import ExternalTable +from datajoint.blob import pack, unpack from . schema_external import schema @@ -11,17 +12,18 @@ def test_external_put(): external storage put and get and remove """ ext = ExternalTable(schema.connection, schema.database) + initial_length = len(ext) input_ = np.random.randn(3, 7, 8) count = 7 extra = 3 for i in range(count): - hash1 = ext.put('external-raw', input_) + hash1 = ext.put('external-raw', pack(input_)) for i in range(extra): - hash2 = ext.put('external-raw', np.random.randn(4, 3, 2)) + hash2 = ext.put('external-raw', pack(np.random.randn(4, 3, 2))) fetched_hashes = ext.fetch('hash') assert_true(all(hash in fetched_hashes for hash in (hash1, hash2))) - assert_equal(len(ext), 1 + extra) + assert_equal(len(ext), initial_length + 1 + extra) - output_ = ext.get(hash1) + output_ = unpack(ext.get(hash1)) assert_array_equal(input_, output_) diff --git a/tests/test_external_class.py b/tests/test_external_class.py index e1f9b9c97..26bb1b0b7 100644 --- a/tests/test_external_class.py +++ b/tests/test_external_class.py @@ -5,7 +5,7 @@ def test_heading(): - heading = modu.Simple().heading + heading = modu.Simple.heading assert_true('item' in heading) assert_true(heading['item'].is_external) @@ -37,14 +37,14 @@ def test_populate(): image = modu.Image() image.populate() remaining, total = image.progress() - image.external_table.clean_store('external-raw') + image.external_table.clean_store('raw') assert_true(total == len(modu.Dimension() * modu.Seed()) and remaining == 0) for img, neg, dimensions in zip(*(image * modu.Dimension()).fetch('img', 'neg', 'dimensions')): assert_list_equal(list(img.shape), list(dimensions)) assert_almost_equal(img, -neg) image.delete() image.external_table.delete_garbage() - image.external_table.clean_store('external-raw') + image.external_table.clean_store('raw') @raises(dj.DataJointError) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 719ac0543..d701afd08 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -81,6 +81,7 @@ def test_head_tail(): query = schema.User * schema.Language n = 5 frame = query.head(n, format='frame') + assert_true(isinstance(frame, pandas.DataFrame)) array = query.head(n, format='array') assert_equal(array.size, n) assert_equal(len(frame), n) @@ 
-118,13 +119,13 @@ def test_iter(self): assert_true(row['name'] == tname and row['language'] == tlang, 'Values are not the same') def test_keys(self): - """test key iterator""" + """test key fetch""" languages = schema.Language.contents languages.sort(key=itemgetter(0), reverse=True) languages.sort(key=itemgetter(1), reverse=False) cur = self.lang.fetch('name', 'language', order_by=('language', 'name DESC')) - cur2 = list(self.lang.fetch.keys(order_by=['language', 'name DESC'])) + cur2 = list(self.lang.fetch("KEY", order_by=['language', 'name DESC'])) for c, c2 in zip(zip(*cur), cur2): assert_true(c == tuple(c2.values()), 'Values are not the same') diff --git a/tests/test_legacy_external.py b/tests/test_legacy_external.py new file mode 100644 index 000000000..d5ed915b9 --- /dev/null +++ b/tests/test_legacy_external.py @@ -0,0 +1,28 @@ +import numpy as np +from numpy.testing import assert_array_equal +from nose.tools import assert_true, assert_equal +from datajoint.external import ExternalTable +from datajoint.blob import pack, unpack + +from . schema_legacy_external import schema + + +def test_external_put(): + """ + external storage put and get and remove + """ + ext = ExternalTable(schema.connection, schema.database) + input_ = np.random.randn(3, 7, 8) + count = 7 + extra = 3 + for i in range(count): + hash1 = ext.put('external-raw', pack(input_)) + for i in range(extra): + hash2 = ext.put('external-raw', pack(np.random.randn(4, 3, 2))) + + fetched_hashes = ext.fetch('hash') + assert_true(all(hash in fetched_hashes for hash in (hash1, hash2))) + assert_equal(len(ext), 1 + extra) + + output_ = unpack(ext.get(hash1)) + assert_array_equal(input_, output_) diff --git a/tests/test_legacy_external_class.py b/tests/test_legacy_external_class.py new file mode 100644 index 000000000..1b5b7ef03 --- /dev/null +++ b/tests/test_legacy_external_class.py @@ -0,0 +1,63 @@ +from nose.tools import assert_true, assert_list_equal, raises +from numpy.testing import assert_almost_equal +import datajoint as dj +from . 
import schema_legacy_external as modu + + +def test_heading(): + heading = modu.Simple().heading + assert_true('item' in heading) + assert_true(heading['item'].is_external) + + +def test_insert_and_fetch(): + original_list = [1, 3, 8] + modu.Simple().insert1(dict(simple=1, item=original_list)) + # test fetch + q = (modu.Simple() & {'simple': 1}).fetch('item')[0] + assert_list_equal(list(q), original_list) + # test fetch1 as a tuple + q = (modu.Simple() & {'simple': 1}).fetch1('item') + assert_list_equal(list(q), original_list) + # test fetch1 as a dict + q = (modu.Simple() & {'simple': 1}).fetch1() + assert_list_equal(list(q['item']), original_list) + # test without cache + previous_cache = dj.config['cache'] + dj.config['cache'] = None + q = (modu.Simple() & {'simple': 1}).fetch1() + assert_list_equal(list(q['item']), original_list) + # test with cache + dj.config['cache'] = previous_cache + q = (modu.Simple() & {'simple': 1}).fetch1() + assert_list_equal(list(q['item']), original_list) + + +def test_populate(): + image = modu.Image() + image.populate() + remaining, total = image.progress() + image.external_table.clean_store('raw') + assert_true(total == len(modu.Dimension() * modu.Seed()) and remaining == 0) + for img, neg, dimensions in zip(*(image * modu.Dimension()).fetch('img', 'neg', 'dimensions')): + assert_list_equal(list(img.shape), list(dimensions)) + assert_almost_equal(img, -neg) + image.delete() + image.external_table.delete_garbage() + image.external_table.clean_store('raw') + + +@raises(dj.DataJointError) +def test_drop(): + image = modu.Image() + image.populate() + image.external_table.drop() + + +@raises(dj.DataJointError) +def test_delete(): + image = modu.Image() + image.external_table.delete() + + +
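
Usage sketch (illustrative only, not part of the patch): the snippet below shows how the new-style configurable stores, the blob-<store>/attach-<store> attribute types, and the fetch download_path argument introduced in this diff fit together, following tests/schema_external.py and tests/test_attach.py. The store locations, the schema name, and the Scan table are assumptions made up for this example.

import os
import tempfile
import datajoint as dj

# new-style store configuration: keys are '-' plus the store name used in blob-/attach- types
dj.config['stores'] = {
    '-':    {'protocol': 'file', 'location': '/tmp/dj-store'},      # default store (assumed path)
    '-raw': {'protocol': 'file', 'location': '/tmp/dj-store-raw'},  # store "raw" (assumed path)
}

schema = dj.schema('example_attach')  # assumed schema name


@schema
class Scan(dj.Manual):  # hypothetical table, for illustration only
    definition = """
    scan : int
    ---
    img  : blob-raw    # externally stored blob in the "raw" store, tracked via the ~external table
    meta : attach-raw  # file attachment kept in the "raw" store
    """


# attachments are inserted as local file paths; datajoint.attach.load packs the file into a buffer
src = os.path.join(tempfile.mkdtemp(), 'meta.txt')
with open(src, 'wb') as f:
    f.write(b'example contents')
Scan.insert1(dict(scan=1, img=[1, 2, 3], meta=src))

# on fetch, attachments are written under download_path and the saved file path is returned
saved_path = (Scan & 'scan=1').fetch1('meta', download_path=tempfile.mkdtemp())
img = (Scan & 'scan=1').fetch1('img')  # blobs are unpacked transparently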