Source code for wbia.core_images

# -*- coding: utf-8 -*-
"""IBEIS CORE IMAGE.

Defines the core dependency cache supported by the image analysis API.

Extracts detection results from images and applies additional processing
automatically.


Example:
    python -m wbia.control.IBEISControl --test-show_depc_image_graph --show
    python -m wbia.control.IBEISControl --test-show_depc_image_graph --show --reduced


TODO:

NOTES:
    HOW TO DESIGN INTERACTIVE PLOTS:
        decorate as interactive

        depc.get_property(recompute=True)

        Instead of calling the preproc as a generator and then adding results,
        this calls the preproc and passes in a callback function.
        The preproc spawns the interaction and must call the callback function
        when finished.

        The callback function adds the rowids to the table.

Needed Tables:
    Detections
    QualityClassifier
    ViewpointClassifier

"""
import logging
from wbia import dtool
import utool as ut
import numpy as np
import vtool as vt
import cv2
from wbia.control.controller_inject import register_preprocs
import sys
import wbia.constants as const

(print, rrr, profile) = ut.inject2(__name__, '[core_images]')
logger = logging.getLogger('wbia')


register_preproc = register_preprocs['image']


class ThumbnailConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('draw_annots', True, hideif=True),
        ut.ParamInfo('thumbsize', None, type_=None, hideif=None),
        ut.ParamInfo('ext', '.png', hideif='.png'),
        ut.ParamInfo('force_serial', False, hideif=False),
    ]

@register_preproc(
    tablename='thumbnails',
    parents=['images'],
    colnames=['img', 'width', 'height'],
    coltypes=[('extern', vt.imread, vt.imwrite), int, int],
    configclass=ThumbnailConfig,
    fname='thumbcache',
    rm_extern_on_delete=True,
    chunksize=256,
)
def compute_thumbnails(depc, gid_list, config=None):
    r"""Compute the thumbnail for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (uri, int, int): tup

    CommandLine:
        wbia --tf compute_thumbnails --show --db PZ_MTEST

    Example:
        >>> # ENABLE_DOCTEST
        >>> # xdoctest: +REQUIRES(--weird)
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'testdb1'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:10]
        >>> thumbs = depc.get_property('thumbnails', gid_list, 'img', config={'thumbsize': 221}, recompute=True)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import wbia.plottool as pt
        >>> pt.quit_if_noshow()
        >>> iteract_obj = pt.interact_multi_image.MultiImageInteraction(thumbs, nPerPage=4)
        >>> iteract_obj.start()
        >>> pt.show_if_requested()
    """
    ibs = depc.controller
    draw_annots = config['draw_annots']
    thumbsize = config['thumbsize']
    if thumbsize is None:
        cfg = ibs.cfg.other_cfg
        thumbsize = cfg.thumb_size if draw_annots else cfg.thumb_bare_size
    thumbsize_list = [thumbsize] * len(gid_list)
    gpath_list = ibs.get_image_paths(gid_list)
    orient_list = ibs.get_image_orientation(gid_list)
    aids_list = ibs.get_image_aids(gid_list)
    if draw_annots:
        bboxes_list = ibs.unflat_map(ibs.get_annot_bboxes, aids_list)
        thetas_list = ibs.unflat_map(ibs.get_annot_thetas, aids_list)
        interests_list = ibs.unflat_map(ibs.get_annot_interest, aids_list)
    else:
        bboxes_list = [[] for aids in aids_list]
        thetas_list = [[] for aids in aids_list]
        interests_list = [[] for aids in aids_list]
    # Execute all tasks in parallel
    args_list = list(
        zip(
            thumbsize_list,
            gpath_list,
            orient_list,
            bboxes_list,
            thetas_list,
            interests_list,
        )
    )
    genkw = {
        'ordered': True,
        'chunksize': 256,
        'progkw': {'freq': 50},
        # 'adjust': True,
        'futures_threaded': True,
        'force_serial': ibs.force_serial or config['force_serial'],
    }
    gen = ut.generate2(draw_thumb_helper, args_list, nTasks=len(args_list), **genkw)
    for val in gen:
        yield val

def draw_thumb_helper(thumbsize, gpath, orient, bbox_list, theta_list, interest_list):
    # time consuming
    img = vt.imread(gpath, orient=orient)
    (gh, gw) = img.shape[0:2]
    img_size = (gw, gh)
    if isinstance(thumbsize, int):
        max_dsize = (thumbsize, thumbsize)
        dsize, sx, sy = vt.resized_clamped_thumb_dims(img_size, max_dsize)
    elif isinstance(thumbsize, tuple) and len(thumbsize) == 2:
        th, tw = thumbsize
        dsize, sx, sy = thumbsize, tw / gw, th / gh
    else:
        raise ValueError('Incompatible thumbsize')
    new_verts_list = list(vt.scaled_verts_from_bbox_gen(bbox_list, theta_list, sx, sy))
    # -----------------
    # Actual computation
    thumb = vt.resize(img, dsize)
    orange_bgr = (0, 128, 255)
    blue_bgr = (255, 128, 0)
    color_bgr_list = [blue_bgr if interest else orange_bgr for interest in interest_list]
    for new_verts, color_bgr in zip(new_verts_list, color_bgr_list):
        thumb = vt.draw_verts(thumb, new_verts, color=color_bgr, thickness=2)
    width, height = dsize
    return thumb, width, height

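# A hedged usage sketch for draw_thumb_helper outside the dependency cache;
# the image path and annotation values below are placeholders, not test data.
#
# thumb, w, h = draw_thumb_helper(
#     221,                   # an int thumbsize clamps the maximum side length
#     '/path/to/image.jpg',  # placeholder image path
#     0,                     # EXIF orientation passed to vt.imread
#     [(10, 10, 100, 50)],   # one bbox as (xtl, ytl, width, height)
#     [0.0],                 # theta (radians) per bbox
#     [False],               # interest flag; False draws orange, True blue
# )
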
def load_text(fpath):
    with open(fpath, 'r') as file:
        text = file.read()
    return text

def save_text(fpath, text):
    with open(fpath, 'w') as file:
        file.write(text)

class WebSrcConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('ext', '.txt', hideif='.txt'),
        ut.ParamInfo('force_serial', False, hideif=False),
    ]

@register_preproc(
    tablename='web_src',
    parents=['images'],
    colnames=['src'],
    coltypes=[('extern', load_text, save_text)],
    configclass=WebSrcConfig,
    fname='webcache',
    chunksize=1024,
)
def compute_web_src(depc, gid_list, config=None):
    r"""Compute the web src.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (str): tup

    CommandLine:
        wbia --tf compute_web_src --show --db PZ_MTEST

    Example:
        >>> # ENABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'testdb1'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:10]
        >>> thumbs = depc.get_property('web_src', gid_list, 'src', recompute=True)
        >>> thumb = thumbs[0]
        >>> hash_str = ut.hash_data(thumb)
        >>> assert hash_str in ['yerctlgfqosrhmjpqvkbmnoocagfqsna', 'wcuppmpowkvhfmfcnrxdeedommihexfu'], 'Found %r' % (hash_str, )
    """
    ibs = depc.controller
    gpath_list = ibs.get_image_paths(gid_list)
    orient_list = ibs.get_image_orientation(gid_list)
    args_list = list(zip(gpath_list, orient_list))
    genkw = {
        'ordered': True,
        'futures_threaded': True,
        'force_serial': ibs.force_serial or config['force_serial'],
    }
    gen = ut.generate2(draw_web_src, args_list, nTasks=len(args_list), **genkw)
    for val in gen:
        yield (val,)

def draw_web_src(gpath, orient):
    from wbia.web.routes_ajax import image_src_path

    image_src = image_src_path(gpath, orient)
    return image_src

class ClassifierConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('classifier_algo', 'cnn', valid_values=['cnn', 'svm', 'densenet']),
        ut.ParamInfo('classifier_weight_filepath', None),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='classifier',
    parents=['images'],
    colnames=['score', 'class'],
    coltypes=[float, str],
    configclass=ClassifierConfig,
    fname='detectcache',
    chunksize=32 if const.CONTAINERIZED else 1024,
)
def compute_classifications(depc, gid_list, config=None):
    r"""Extract the classifications for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (float, str): tup

    CommandLine:
        wbia compute_classifications

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:8]
        >>> # depc.delete_property('classifier', gid_list)
        >>> results = depc.get_property('classifier', gid_list, None)
        >>> print(results)
        >>> depc = ibs.depc_image
        >>> config = {'classifier_algo': 'svm'}
        >>> depc.delete_property('classifier', gid_list, config=config)
        >>> results = depc.get_property('classifier', gid_list, None, config=config)
        >>> print(results)
        >>> config = {'classifier_algo': 'svm', 'classifier_weight_filepath': 'localizer-zebra-10'}
        >>> depc.delete_property('classifier', gid_list, config=config)
        >>> results = depc.get_property('classifier', gid_list, None, config=config)
        >>> print(results)
    """
    logger.info('[ibs] Process Image Classifications')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller
    depc = ibs.depc_image
    if config['classifier_algo'] in ['cnn']:
        config_ = {
            'draw_annots': False,
            'thumbsize': (192, 192),
        }
        thumbnail_list = depc.get_property('thumbnails', gid_list, 'img', config=config_)
        result_list = ibs.generate_thumbnail_class_list(thumbnail_list, **config)
    elif config['classifier_algo'] in ['svm']:
        from wbia.algo.detect.svm import classify

        config_ = {'algo': 'resnet'}
        vector_list = depc.get_property('features', gid_list, 'vector', config=config_)
        classifier_weight_filepath = config['classifier_weight_filepath']
        result_list = classify(vector_list, weight_filepath=classifier_weight_filepath)
    elif config['classifier_algo'] in ['densenet']:
        from wbia.algo.detect import densenet

        config_ = {
            'draw_annots': False,
            'thumbsize': (densenet.INPUT_SIZE, densenet.INPUT_SIZE),
        }
        thumbpath_list = ibs.depc_image.get(
            'thumbnails', gid_list, 'img', config=config_, read_extern=False, ensure=True
        )
        result_list = densenet.test(thumbpath_list, ibs=ibs, gid_list=gid_list, **config)
    else:
        raise ValueError(
            'specified classifier algo is not supported in config = %r' % (config,)
        )
    # yield classifications
    for result in result_list:
        yield result

class Classifier2Config(dtool.Config):
    _param_info_list = [
        ut.ParamInfo(
            'classifier_two_algo', 'cnn', valid_values=['cnn', 'rf', 'densenet']
        ),
        ut.ParamInfo('classifier_two_weight_filepath', None),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='classifier_two',
    parents=['images'],
    colnames=['scores', 'classes'],
    coltypes=[dict, list],
    configclass=Classifier2Config,
    fname='detectcache',
    chunksize=32 if const.CONTAINERIZED else 128,
)
def compute_classifications2(depc, gid_list, config=None):
    r"""Extract the multi-class classifications for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (dict, list): tup

    CommandLine:
        wbia compute_classifications2

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:8]
        >>> # depc.delete_property('classifier_two', gid_list)
        >>> results = depc.get_property('classifier_two', gid_list, None)
        >>> print(results)
    """
    logger.info('[ibs] Process Image Classifications2')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller
    depc = ibs.depc_image
    if config['classifier_two_algo'] in ['cnn']:
        config_ = {
            'draw_annots': False,
            'thumbsize': (192, 192),
        }
        # depc.delete_property('thumbnails', gid_list, config=config_)
        thumbnail_list = depc.get_property('thumbnails', gid_list, 'img', config=config_)
        result_list = ibs.generate_thumbnail_class2_list(thumbnail_list, **config)
    elif config['classifier_two_algo'] in ['rf']:
        from wbia.algo.detect.rf import classify

        config_ = {'algo': 'resnet'}
        vector_list = depc.get_property('features', gid_list, 'vector', config=config_)
        classifier_weight_filepath = config['classifier_two_weight_filepath']
        result_list = classify(vector_list, weight_filepath=classifier_weight_filepath)
    elif config['classifier_two_algo'] in ['densenet']:
        from wbia.algo.detect import densenet

        config_ = {
            'draw_annots': False,
            'thumbsize': (densenet.INPUT_SIZE, densenet.INPUT_SIZE),
        }
        thumbpath_list = ibs.depc_image.get(
            'thumbnails', gid_list, 'img', config=config_, read_extern=False, ensure=True
        )
        config_ = {
            'classifier_weight_filepath': config['classifier_two_weight_filepath'],
        }
        result_list = densenet.test(
            thumbpath_list,
            ibs=ibs,
            gid_list=gid_list,
            return_dict=True,
            multiclass=True,
            **config_,
        )
        result_list = list(result_list)
        for index in range(len(result_list)):
            best_score, best_key, scores = result_list[index]
            classes = [best_key]
            result_list[index] = (
                scores,
                classes,
            )
    else:
        raise ValueError(
            'specified classifier_two algo is not supported in config = %r' % (config,)
        )
    # yield classifications
    for result in result_list:
        yield result

class FeatureConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('framework', 'torch', valid_values=['keras', 'torch']),
        ut.ParamInfo(
            'model',
            'vgg16',
            valid_values=['vgg', 'vgg16', 'vgg19', 'resnet', 'inception', 'densenet'],
        ),
        ut.ParamInfo('flatten', True),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='features',
    parents=['images'],
    colnames=['vector'],
    coltypes=[np.ndarray],
    configclass=FeatureConfig,
    fname='featcache',
    chunksize=256,
)
def compute_features(depc, gid_list, config=None):
    r"""Compute features on images using pre-trained models in Keras or PyTorch.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (np.ndarray, ): tup

    CommandLine:
        wbia compute_features
        python -m wbia.core_images compute_features --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> print(depc.get_tablenames())
        >>> gid_list = ibs.get_valid_gids()[:16]
        >>> config = {'model': 'vgg16'}
        >>> depc.delete_property('features', gid_list, config=config)
        >>> features = depc.get_property('features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'model': 'vgg19'}
        >>> depc.delete_property('features', gid_list, config=config)
        >>> features = depc.get_property('features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'model': 'resnet'}
        >>> depc.delete_property('features', gid_list, config=config)
        >>> features = depc.get_property('features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'model': 'inception'}
        >>> depc.delete_property('features', gid_list, config=config)
        >>> features = depc.get_property('features', gid_list, 'vector', config=config)
        >>> print(features)
    """
    logger.info('[ibs] Preprocess Features')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller
    ibs.assert_valid_gids(gid_list)
    ######################################################################################
    if config['framework'] in ['keras']:
        from keras.preprocessing import image as preprocess_image

        thumbnail_config = {
            'draw_annots': False,
            'thumbsize': (500, 500),
        }
        thumbpath_list = depc.get(
            'thumbnails',
            gid_list,
            'img',
            config=thumbnail_config,
            read_extern=False,
            ensure=True,
        )
        target_size = (224, 224)
        if config['model'] in ['vgg', 'vgg16']:
            from keras.applications.vgg16 import VGG16 as MODEL_CLASS
            from keras.applications.vgg16 import preprocess_input
        ######################################################################################
        elif config['model'] in ['vgg19']:
            from keras.applications.vgg19 import VGG19 as MODEL_CLASS
            from keras.applications.vgg19 import preprocess_input
        ######################################################################################
        elif config['model'] in ['resnet']:
            from keras.applications.resnet50 import ResNet50 as MODEL_CLASS  # NOQA
            from keras.applications.resnet50 import preprocess_input
        ######################################################################################
        elif config['model'] in ['inception']:
            from keras.applications.inception_v3 import InceptionV3 as MODEL_CLASS  # NOQA
            from keras.applications.inception_v3 import preprocess_input

            target_size = (299, 299)
        ######################################################################################
        else:
            raise ValueError(
                'specified feature model is not supported in config = %r' % (config,)
            )
        # Build model
        model = MODEL_CLASS(include_top=False)
        thumbpath_iter = ut.ProgIter(thumbpath_list, lbl='forward inference', bs=True)
        for thumbpath in thumbpath_iter:
            image = preprocess_image.load_img(thumbpath, target_size=target_size)
            image_array = preprocess_image.img_to_array(image)
            image_array = np.expand_dims(image_array, axis=0)
            image_array = preprocess_input(image_array)
            features = model.predict(image_array)
            if config['flatten']:
                features = features.flatten()
            yield (features,)
    elif config['framework'] in ['torch']:
        from wbia.algo.detect import densenet

        if config['model'] in ['densenet']:
            config_ = {
                'draw_annots': False,
                'thumbsize': (densenet.INPUT_SIZE, densenet.INPUT_SIZE),
            }
            thumbpath_list = ibs.depc_image.get(
                'thumbnails',
                gid_list,
                'img',
                config=config_,
                read_extern=False,
                ensure=True,
            )
            feature_list = densenet.features(thumbpath_list)
        else:
            raise ValueError(
                'specified feature model is not supported in config = %r' % (config,)
            )
        for feature in feature_list:
            if config['flatten']:
                feature = feature.flatten()
            yield (feature,)
    else:
        raise ValueError(
            'specified feature framework is not supported in config = %r' % (config,)
        )

class LocalizerOriginalConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo(
            'algo',
            'yolo',
            valid_values=[
                'azure',
                'yolo',
                'lightnet',
                'ssd',
                'darknet',
                'rf',
                'fast-rcnn',
                'faster-rcnn',
                'selective-search',
                'selective-search-rcnn',
                '_COMBINED',
            ],
        ),
        ut.ParamInfo('species', None),
        ut.ParamInfo('config_filepath', None),
        ut.ParamInfo('weight_filepath', None),
        ut.ParamInfo('class_filepath', None),
        # True will horizontally flip all images before they are sent to the
        # localizer and will flip the results back
        ut.ParamInfo('flip', False, hideif=False),
        ut.ParamInfo('grid', False),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='localizations_original',
    parents=['images'],
    colnames=['score', 'bboxes', 'thetas', 'confs', 'classes'],
    coltypes=[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray],
    configclass=LocalizerOriginalConfig,
    fname='localizationscache',
    chunksize=8 if const.CONTAINERIZED else 128,
)
def compute_localizations_original(depc, gid_list, config=None):
    r"""Extract the localizations for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (float, np.ndarray, np.ndarray, np.ndarray, np.ndarray): tup

    CommandLine:
        wbia compute_localizations_original
        python -m wbia.core_images compute_localizations_original --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> print(depc.get_tablenames())
        >>> gid_list = ibs.get_valid_gids()[:16]
        >>> config = {'algo': 'azure', 'config_filepath': None}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'darknet', 'config_filepath': 'pretrained-v2-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'darknet', 'config_filepath': 'pretrained-v2-large-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'darknet', 'config_filepath': 'pretrained-tiny-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'darknet', 'config_filepath': 'pretrained-v2-large-coco'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'darknet', 'config_filepath': 'pretrained-tiny-coco'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'yolo'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'lightnet'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'rf'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'selective-search'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'selective-search-rcnn'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'faster-rcnn', 'config_filepath': 'pretrained-vgg-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'faster-rcnn', 'config_filepath': 'pretrained-zf-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'faster-rcnn', 'config_filepath': 'pretrained-vgg-ilsvrc'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'faster-rcnn', 'config_filepath': 'pretrained-zf-ilsvrc'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-300-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-512-pascal'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-300-pascal-plus'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-512-pascal-plus'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-300-coco'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-512-coco'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-300-ilsvrc'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': 'ssd', 'config_filepath': 'pretrained-500-ilsvrc'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'algo': '_COMBINED'}
        >>> depc.delete_property('localizations_original', gid_list, config=config)
        >>> detects = depc.get_property('localizations_original', gid_list, 'bboxes', config=config)
        >>> print(detects)
    """

    def _package_to_numpy(key_list, result_list, score):
        temp = [
            [key[0] if isinstance(key, tuple) else result[key] for key in key_list]
            for result in result_list
        ]
        return (
            score,
            np.array([_[0:4] for _ in temp]),
            np.array([_[4] for _ in temp]),
            np.array([_[5] for _ in temp]),
            np.array([_[6] for _ in temp]),
        )

    def _combined(gid_list, config_dict_list):
        # Combined list of algorithm configs
        colname_list = ['score', 'bboxes', 'thetas', 'confs', 'classes']
        for gid in gid_list:
            accum_list = []
            for colname in colname_list:
                if colname == 'score':
                    accum_value = 0.0
                else:
                    len_list = []
                    temp_list = []
                    for config_dict_ in config_dict_list:
                        temp = depc.get_property(
                            'localizations_original', gid, colname, config=config_dict_
                        )
                        len_list.append(len(temp))
                        temp_list.append(temp)
                    if colname == 'bboxes':
                        accum_value = np.vstack(temp_list)
                    else:
                        accum_value = np.hstack(temp_list)
                    assert len(accum_value) == sum(len_list)
                accum_list.append(accum_value)
            yield tuple(accum_list)

    logger.info('[ibs] Preprocess Localizations')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller
    ibs.assert_valid_gids(gid_list)

    config = dict(config)
    config['sensitivity'] = 0.0

    flip = config.get('flip', False)
    if flip:
        assert (
            config['algo'] == 'lightnet'
        ), 'config "flip" is only supported by the "lightnet" algo'

    # Normal computations
    base_key_list = ['xtl', 'ytl', 'width', 'height', 'theta', 'confidence', 'class']
    # Temporary for all detectors
    base_key_list[4] = (0.0,)  # Theta
    ######################################################################################
    if config['algo'] in ['pydarknet', 'yolo', 'cnn']:
        from wbia.algo.detect import yolo

        logger.info('[ibs] detecting using PyDarknet CNN YOLO v1')
        detect_gen = yolo.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['lightnet']:
        from wbia.algo.detect import lightnet

        logger.info('[ibs] detecting using Lightnet CNN YOLO v2')
        detect_gen = lightnet.detect_gid_list(ibs, gid_list, **config)
    elif config['algo'] in ['azure']:
        from wbia.algo.detect import azure

        logger.info('[ibs] detecting using Azure CustomVision')
        detect_gen = azure.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['rf']:
        from wbia.algo.detect import randomforest

        logger.info('[ibs] detecting using Random Forests')
        assert config['species'] is not None
        base_key_list[6] = (config['species'],)  # class == species
        detect_gen = randomforest.detect_gid_list_with_species(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['selective-search']:
        from wbia.algo.detect import selectivesearch

        logger.info('[ibs] detecting using Selective Search')
        matlab_command = 'selective_search'
        detect_gen = selectivesearch.detect_gid_list(
            ibs, gid_list, matlab_command=matlab_command, **config
        )
    ######################################################################################
    elif config['algo'] in ['selective-search-rcnn']:
        from wbia.algo.detect import selectivesearch

        logger.info('[ibs] detecting using Selective Search (R-CNN)')
        matlab_command = 'selective_search_rcnn'
        detect_gen = selectivesearch.detect_gid_list(
            ibs, gid_list, matlab_command=matlab_command, **config
        )
    ######################################################################################
    # elif config['algo'] in ['fast-rcnn']:
    #     from wbia.algo.detect import fasterrcnn
    #     logger.info('[ibs] detecting using CNN Fast R-CNN')
    #     detect_gen = fasterrcnn.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['faster-rcnn']:
        from wbia.algo.detect import fasterrcnn

        logger.info('[ibs] detecting using CNN Faster R-CNN')
        detect_gen = fasterrcnn.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['darknet']:
        from wbia.algo.detect import darknet

        logger.info('[ibs] detecting using Darknet CNN YOLO')
        detect_gen = darknet.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['ssd']:
        from wbia.algo.detect import ssd

        logger.info('[ibs] detecting using CNN SSD')
        detect_gen = ssd.detect_gid_list(ibs, gid_list, **config)
    ######################################################################################
    elif config['algo'] in ['_COMBINED']:
        # Combined computations
        config_dict_list = [
            # {'algo': 'selective-search', 'config_filepath': None},  # SS1
            {'algo': 'darknet', 'config_filepath': 'pretrained-tiny-pascal'},  # YOLO1
            {'algo': 'darknet', 'config_filepath': 'pretrained-v2-pascal'},  # YOLO2
            {'algo': 'faster-rcnn', 'config_filepath': 'pretrained-zf-pascal'},  # FRCNN1
            {
                'algo': 'faster-rcnn',
                'config_filepath': 'pretrained-vgg-pascal',
            },  # FRCNN2
            {'algo': 'ssd', 'config_filepath': 'pretrained-300-pascal'},  # SSD1
            {'algo': 'ssd', 'config_filepath': 'pretrained-512-pascal'},  # SSD2
            {'algo': 'ssd', 'config_filepath': 'pretrained-300-pascal-plus'},  # SSD3
            {'algo': 'ssd', 'config_filepath': 'pretrained-512-pascal-plus'},  # SSD4
        ]
        detect_gen = _combined(gid_list, config_dict_list)
    else:
        raise ValueError(
            'specified detection algo is not supported in config = %r' % (config,)
        )
    # yield detections
    for detect in detect_gen:
        if len(detect) == 3:
            gid, gpath, result_list = detect
            score = 0.0
            result = _package_to_numpy(base_key_list, result_list, score)
        else:
            result = detect
        yield result

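# Each row yielded by compute_localizations_original is a 5-tuple
# (score, bboxes, thetas, confs, classes); bboxes is an (N, 4) array of
# (xtl, ytl, width, height) rows and the remaining arrays have length N,
# matching _package_to_numpy above. A hedged illustration with made-up values:
#
# score = 0.0
# bboxes = np.array([[10.0, 20.0, 100.0, 50.0]])
# thetas = np.array([0.0])
# confs = np.array([0.95])
# classes = np.array(['zebra'])
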
class LocalizerConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('sensitivity', 0.0),
        ut.ParamInfo('nms', True),
        ut.ParamInfo('nms_thresh', 0.2),
        ut.ParamInfo('nms_aware', None, hideif=None),
        ut.ParamInfo('invalid', True),
        ut.ParamInfo('invalid_margin', 0.25),
        ut.ParamInfo('boundary', True),
    ]

@register_preproc(
    tablename='localizations',
    parents=['localizations_original'],
    colnames=['score', 'bboxes', 'thetas', 'confs', 'classes'],
    coltypes=[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray],
    configclass=LocalizerConfig,
    fname='detectcache',
    chunksize=64 if const.CONTAINERIZED else 256,
)
def compute_localizations(depc, loc_orig_id_list, config=None):
    r"""Extract the localizations for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_orig_id_list (list): list of localizations_original rowids
        config (dict): (default = None)

    Yields:
        (float, np.ndarray, np.ndarray, np.ndarray, np.ndarray): tup

    CommandLine:
        wbia compute_localizations
        python -m wbia.core_images compute_localizations --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> print(depc.get_tablenames())
        >>> gid_list = ibs.get_valid_gids()[:16]
        >>> config = {'algo': 'lightnet', 'nms': True}
        >>> # depc.delete_property('localizations', gid_list, config=config)
        >>> detects = depc.get_property('localizations', gid_list, 'bboxes', config=config)
        >>> print(detects)
        >>> config = {'combined': True}
        >>> # depc.delete_property('localizations', gid_list, config=config)
        >>> detects = depc.get_property('localizations', gid_list, 'bboxes', config=config)
        >>> print(detects)
    """
    logger.info('[ibs] Preprocess Localizations')
    logger.info('config = %r' % (config,))
    VERBOSE = False

    ibs = depc.controller

    zipped = zip(depc.get_native('localizations_original', loc_orig_id_list, None))
    for loc_orig_id, detect in zip(loc_orig_id_list, zipped):
        score, bboxes, thetas, confs, classes = detect[0]

        # Apply Threshold
        if config['sensitivity'] > 0.0:
            count_old = len(bboxes)
            if count_old > 0:
                keep_list = np.array(
                    [
                        index
                        for index, conf in enumerate(confs)
                        if conf >= config['sensitivity']
                    ]
                )
                if len(keep_list) == 0:
                    bboxes = np.array([])
                    thetas = np.array([])
                    confs = np.array([])
                    classes = np.array([])
                else:
                    bboxes = bboxes[keep_list]
                    thetas = thetas[keep_list]
                    confs = confs[keep_list]
                    classes = classes[keep_list]
                count_new = len(bboxes)
                if VERBOSE:
                    logger.info(
                        'Filtered with sensitivity = %0.02f (%d -> %d)'
                        % (config['sensitivity'], count_old, count_new)
                    )

        # Apply NMS
        if config['nms']:
            indices = np.array(list(range(len(bboxes))))
            indices, bboxes, thetas, confs, classes = ibs.nms_boxes(
                indices, bboxes, thetas, confs, classes, verbose=VERBOSE, **config
            )

        # Kill invalid images
        if config['invalid']:
            margin = config['invalid_margin']
            count_old = len(bboxes)
            if count_old > 0:
                gid = depc.get_ancestor_rowids(
                    'localizations_original', [loc_orig_id], 'images'
                )[0]
                w, h = ibs.get_image_sizes(gid)
                keep_list = []
                for (xtl, ytl, width, height) in bboxes:
                    xbr = xtl + width
                    ybr = ytl + height
                    x_margin = w * margin
                    y_margin = h * margin
                    x_min = 0 - x_margin
                    x_max = w + x_margin
                    y_min = 0 - y_margin
                    y_max = h + y_margin
                    keep = True
                    if xtl < x_min or x_max < xtl or xbr < x_min or x_max < xbr:
                        keep = False
                    if ytl < y_min or y_max < ytl or ybr < y_min or y_max < ybr:
                        keep = False
                    keep_list.append(keep)
                if len(keep_list) == 0:
                    bboxes = np.array([])
                    thetas = np.array([])
                    confs = np.array([])
                    classes = np.array([])
                else:
                    bboxes = bboxes[keep_list]
                    thetas = thetas[keep_list]
                    confs = confs[keep_list]
                    classes = classes[keep_list]
                count_new = len(bboxes)
                if VERBOSE:
                    logger.info(
                        'Filtered invalid images (%d -> %d)' % (count_old, count_new)
                    )

        if config['boundary']:
            gid = depc.get_ancestor_rowids(
                'localizations_original', [loc_orig_id], 'images'
            )[0]
            w, h = ibs.get_image_sizes(gid)
            for index, (xtl, ytl, width, height) in enumerate(bboxes):
                xbr = xtl + width
                ybr = ytl + height
                xtl = min(max(0, xtl), w)
                xbr = min(max(0, xbr), w)
                ytl = min(max(0, ytl), h)
                ybr = min(max(0, ybr), h)
                width = xbr - xtl
                height = ybr - ytl
                bboxes[index][0] = xtl
                bboxes[index][1] = ytl
                bboxes[index][2] = width
                bboxes[index][3] = height

        yield (
            score,
            bboxes,
            thetas,
            confs,
            classes,
        )

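# The boundary step above clamps each box to the image frame. A small worked
# example with made-up numbers: an image of size (w, h) = (640, 480) and a
# box (xtl, ytl, width, height) = (-10, 400, 100, 120):
#
# xtl, ytl, width, height = -10, 400, 100, 120
# w, h = 640, 480
# xbr, ybr = xtl + width, ytl + height                 # (90, 520)
# xtl, xbr = min(max(0, xtl), w), min(max(0, xbr), w)  # (0, 90)
# ytl, ybr = min(max(0, ytl), h), min(max(0, ybr), h)  # (400, 480)
# width, height = xbr - xtl, ybr - ytl                 # (90, 80)
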
def get_localization_chips_worker(
    gid, img, bbox_list, theta_list, target_size, axis_aligned=False
):
    target_size_list = [target_size] * len(bbox_list)

    if axis_aligned:
        # Over-write bbox and theta with a friendlier, axis-aligned version
        bbox_list_ = []
        theta_list_ = []
        for bbox, theta in zip(bbox_list, theta_list):
            # Transformation matrix
            R = vt.rotation_around_bbox_mat3x3(theta, bbox)
            # Get vertices of the annotation polygon
            verts = vt.verts_from_bbox(bbox, close=True)
            # Rotate and transform vertices
            xyz_pts = vt.add_homogenous_coordinate(np.array(verts).T)
            trans_pts = vt.remove_homogenous_coordinate(R.dot(xyz_pts))
            new_verts = np.round(trans_pts).astype(int).T.tolist()
            x_points = [pt[0] for pt in new_verts]
            y_points = [pt[1] for pt in new_verts]
            xtl = int(min(x_points))
            xbr = int(max(x_points))
            ytl = int(min(y_points))
            ybr = int(max(y_points))
            bbox_ = (xtl, ytl, xbr - xtl, ybr - ytl)
            theta_ = 0.0
            bbox_list_.append(bbox_)
            theta_list_.append(theta_)
        bbox_list = bbox_list_
        theta_list = theta_list_

    # Build transformation from image to chip
    M_list = [
        vt.get_image_to_chip_transform(bbox, new_size, theta)
        for bbox, theta, new_size in zip(bbox_list, theta_list, target_size_list)
    ]

    # Extract "chips"
    flags = cv2.INTER_LANCZOS4
    borderMode = cv2.BORDER_CONSTANT
    warpkw = dict(flags=flags, borderMode=borderMode)

    def _compute_localization_chip(tup):
        new_size, M = tup
        chip = cv2.warpAffine(img, M[0:2], tuple(new_size), **warpkw)
        # cv2.imshow('', chip)
        # cv2.waitKey()
        msg = 'Chip shape %r does not agree with target size %r' % (
            chip.shape,
            target_size,
        )
        assert chip.shape[0] == new_size[0] and chip.shape[1] == new_size[1], msg
        return chip

    arg_list = list(zip(target_size_list, M_list))
    chip_list = [_compute_localization_chip(tup_) for tup_ in arg_list]
    gid_list = [gid] * len(chip_list)
    return gid_list, chip_list

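# Hedged sketch of the axis_aligned conversion above: the rotated corners of
# the box are recomputed with the same vtool calls, and the replacement box is
# the axis-aligned hull of those corners with theta reset to 0. The numeric
# outcome depends on vtool's rotation convention, so no values are shown.
#
# bbox, theta = (0, 0, 100, 50), np.pi / 2
# R = vt.rotation_around_bbox_mat3x3(theta, bbox)
# verts = vt.verts_from_bbox(bbox, close=True)
# xyz_pts = vt.add_homogenous_coordinate(np.array(verts).T)
# trans_pts = vt.remove_homogenous_coordinate(R.dot(xyz_pts))
# # the replacement bbox spans min..max of trans_pts in x and y, theta_ = 0.0
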
def get_localization_masks_worker(gid, img, bbox_list, theta_list, target_size):
    target_size_list = [target_size] * len(bbox_list)
    verts_list = vt.geometry.scaled_verts_from_bbox_gen(bbox_list, theta_list)

    # Extract "masks"
    interpolation = cv2.INTER_LANCZOS4
    warpkw = dict(interpolation=interpolation)
    fill_pixel_value = (128, 128, 128)  # Grey-scale medium

    def _compute_localization_mask(tup):
        new_size, vert_list = tup
        # Copy the image, mask out the patch
        img_ = np.copy(img)
        vert_list_ = np.array(vert_list, dtype=np.int32)
        cv2.fillConvexPoly(img_, vert_list_, fill_pixel_value)
        # Resize the image
        mask = cv2.resize(img_, tuple(new_size), **warpkw)
        # cv2.imshow('', mask)
        # cv2.waitKey()
        msg = 'Chip shape %r does not agree with target size %r' % (mask.shape, new_size)
        assert mask.shape[0] == new_size[0] and mask.shape[1] == new_size[1], msg
        return mask

    arg_list = list(zip(target_size_list, verts_list))
    mask_list = [_compute_localization_mask(tup_) for tup_ in arg_list]
    gid_list = [gid] * len(mask_list)
    return gid_list, mask_list

def get_localization_chips(ibs, loc_id_list, target_size=(128, 128), axis_aligned=False):
    depc = ibs.depc_image
    gid_list_ = depc.get_ancestor_rowids('localizations', loc_id_list, 'images')
    assert len(gid_list_) == len(loc_id_list)

    # Grab the localizations
    bboxes_list = depc.get_native('localizations', loc_id_list, 'bboxes')
    len_list = [len(bbox_list) for bbox_list in bboxes_list]
    avg = sum(len_list) / len(len_list)
    args = (
        len(loc_id_list),
        min(len_list),
        avg,
        max(len_list),
        sum(len_list),
    )
    logger.info(
        'Extracting %d localization chips (min: %d, avg: %0.02f, max: %d, total: %d)'
        % args
    )
    thetas_list = depc.get_native('localizations', loc_id_list, 'thetas')

    OLD = True
    if OLD:
        gids_list = [
            np.array([gid] * len(bbox_list))
            for gid, bbox_list in zip(gid_list_, bboxes_list)
        ]
        # Flatten all of these lists for efficiency
        bbox_list = ut.flatten(bboxes_list)
        theta_list = ut.flatten(thetas_list)

        if axis_aligned:
            # Over-write bbox and theta with a friendlier, axis-aligned version
            bbox_list_ = []
            theta_list_ = []
            for bbox, theta in zip(bbox_list, theta_list):
                # Transformation matrix
                R = vt.rotation_around_bbox_mat3x3(theta, bbox)
                # Get vertices of the annotation polygon
                verts = vt.verts_from_bbox(bbox, close=True)
                # Rotate and transform vertices
                xyz_pts = vt.add_homogenous_coordinate(np.array(verts).T)
                trans_pts = vt.remove_homogenous_coordinate(R.dot(xyz_pts))
                new_verts = np.round(trans_pts).astype(int).T.tolist()
                x_points = [pt[0] for pt in new_verts]
                y_points = [pt[1] for pt in new_verts]
                xtl = int(min(x_points))
                xbr = int(max(x_points))
                ytl = int(min(y_points))
                ybr = int(max(y_points))
                bbox_ = (xtl, ytl, xbr - xtl, ybr - ytl)
                theta_ = 0.0
                bbox_list_.append(bbox_)
                theta_list_.append(theta_)
            bbox_list = bbox_list_
            theta_list = theta_list_

        gid_list = ut.flatten(gids_list)
        bbox_size_list = ut.take_column(bbox_list, [2, 3])
        newsize_list = [target_size] * len(bbox_list)

        # Checks
        invalid_flags = [w == 0 or h == 0 for (w, h) in bbox_size_list]
        invalid_bboxes = ut.compress(bbox_list, invalid_flags)
        assert len(invalid_bboxes) == 0, 'invalid bboxes=%r' % (invalid_bboxes,)

        # Build transformation from image to chip
        M_list = [
            vt.get_image_to_chip_transform(bbox, new_size, theta)
            for bbox, theta, new_size in zip(bbox_list, theta_list, newsize_list)
        ]

        # Extract "chips"
        flags = cv2.INTER_LANCZOS4
        borderMode = cv2.BORDER_CONSTANT
        warpkw = dict(flags=flags, borderMode=borderMode)

        last_gid = None
        chip_list = []
        arg_list = list(zip(gid_list, newsize_list, M_list))
        for tup in ut.ProgIter(arg_list, lbl='computing localization chips', bs=True):
            gid, new_size, M = tup
            if gid != last_gid:
                img = ibs.get_images(gid)
                last_gid = gid
            chip = cv2.warpAffine(img, M[0:2], tuple(new_size), **warpkw)
            # cv2.imshow('', chip)
            # cv2.waitKey()
            msg = 'Chip shape %r does not agree with target size %r' % (
                chip.shape,
                target_size,
            )
            assert (
                chip.shape[0] == target_size[0] and chip.shape[1] == target_size[1]
            ), msg
            chip_list.append(chip)
    else:
        target_size_list = [target_size] * len(bboxes_list)
        axis_aligned_list = [axis_aligned] * len(bboxes_list)
        img_list = [ibs.get_images(gid) for gid in gid_list_]
        arg_iter = list(
            zip(
                gid_list_,
                img_list,
                bboxes_list,
                thetas_list,
                target_size_list,
                axis_aligned_list,
            )
        )
        result_list = ut.util_parallel.generate2(
            get_localization_chips_worker, arg_iter, ordered=True
        )
        # Compute results
        result_list = list(result_list)
        # Extract results
        gids_list = ut.take_column(result_list, 0)
        chips_list = ut.take_column(result_list, 1)
        # Explicitly garbage collect large list of chips
        result_list = None
        # Flatten results
        gid_list = ut.flatten(gids_list)
        chip_list = ut.flatten(chips_list)

    assert len(gid_list) == len(chip_list)
    return gid_list_, gid_list, chip_list

def get_localization_masks(ibs, loc_id_list, target_size=(128, 128)):
    depc = ibs.depc_image
    gid_list_ = depc.get_ancestor_rowids('localizations', loc_id_list, 'images')
    assert len(gid_list_) == len(loc_id_list)

    # Grab the localizations
    bboxes_list = depc.get_native('localizations', loc_id_list, 'bboxes')
    len_list = [len(bbox_list) for bbox_list in bboxes_list]
    avg = sum(len_list) / len(len_list)
    args = (
        len(loc_id_list),
        min(len_list),
        avg,
        max(len_list),
        sum(len_list),
    )
    logger.info(
        'Extracting %d localization masks (min: %d, avg: %0.02f, max: %d, total: %d)'
        % args
    )
    thetas_list = depc.get_native('localizations', loc_id_list, 'thetas')

    OLD = True
    if OLD:
        gids_list = [
            np.array([gid] * len(bbox_list))
            for gid, bbox_list in zip(gid_list_, bboxes_list)
        ]
        # Flatten all of these lists for efficiency
        bbox_list = ut.flatten(bboxes_list)
        theta_list = ut.flatten(thetas_list)
        verts_list = vt.geometry.scaled_verts_from_bbox_gen(bbox_list, theta_list)
        gid_list = ut.flatten(gids_list)
        bbox_size_list = ut.take_column(bbox_list, [2, 3])
        newsize_list = [target_size] * len(bbox_list)

        # Checks
        invalid_flags = [w == 0 or h == 0 for (w, h) in bbox_size_list]
        invalid_bboxes = ut.compress(bbox_list, invalid_flags)
        assert len(invalid_bboxes) == 0, 'invalid bboxes=%r' % (invalid_bboxes,)

        # Extract "masks"
        interpolation = cv2.INTER_LANCZOS4
        warpkw = dict(interpolation=interpolation)

        last_gid = None
        mask_list = []
        arg_list = list(zip(gid_list, newsize_list, verts_list))
        for tup in ut.ProgIter(arg_list, lbl='computing localization masks', bs=True):
            gid, new_size, vert_list = tup
            if gid != last_gid:
                img = ibs.get_images(gid)
                last_gid = gid
            # Copy the image, mask out the patch
            img_ = np.copy(img)
            vert_list_ = np.array(vert_list, dtype=np.int32)
            cv2.fillConvexPoly(img_, vert_list_, (128, 128, 128))
            # Resize the image
            mask = cv2.resize(img_, tuple(new_size), **warpkw)
            # cv2.imshow('', mask)
            # cv2.waitKey()
            msg = 'Chip shape %r does not agree with target size %r' % (
                mask.shape,
                target_size,
            )
            assert (
                mask.shape[0] == target_size[0] and mask.shape[1] == target_size[1]
            ), msg
            mask_list.append(mask)
    else:
        target_size_list = [target_size] * len(bboxes_list)
        img_list = [ibs.get_images(gid) for gid in gid_list_]
        arg_iter = list(
            zip(gid_list_, img_list, bboxes_list, thetas_list, target_size_list)
        )
        result_list = ut.util_parallel.generate2(
            get_localization_masks_worker, arg_iter, ordered=True
        )
        # Compute results
        result_list = list(result_list)
        # Extract results
        gids_list = ut.take_column(result_list, 0)
        masks_list = ut.take_column(result_list, 1)
        # Explicitly garbage collect large list of masks
        result_list = None
        # Flatten results
        gid_list = ut.flatten(gids_list)
        mask_list = ut.flatten(masks_list)

    assert len(gid_list) == len(mask_list)
    return gid_list_, gid_list, mask_list

def get_localization_aoi2(ibs, loc_id_list, target_size=(192, 192)):
    depc = ibs.depc_image
    gid_list_ = depc.get_ancestor_rowids('localizations', loc_id_list, 'images')
    assert len(gid_list_) == len(loc_id_list)
    # Grab the localizations
    bboxes_list = depc.get_native('localizations', loc_id_list, 'bboxes')
    gids_list = [
        np.array([gid] * len(bbox_list))
        for gid, bbox_list in zip(gid_list_, bboxes_list)
    ]
    # Flatten all of these lists for efficiency
    gid_list = ut.flatten(gids_list)
    bbox_list = ut.flatten(bboxes_list)
    size_list = ibs.get_image_sizes(gid_list)
    config_ = {
        'draw_annots': False,
        'thumbsize': target_size,
    }
    thumbnail_list = depc.get_property('thumbnails', gid_list, 'img', config=config_)
    return gid_list_, gid_list, thumbnail_list, bbox_list, size_list

ChipListImgType = dtool.ExternType(
    ut.partial(ut.load_cPkl, verbose=False),
    ut.partial(ut.save_cPkl, verbose=False),
    extkey='ext',
)

class Chip2Config(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('localization_chip_target_size', (128, 128)),
        ut.ParamInfo('localization_chip_masking', False),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='localizations_chips',
    parents=['localizations'],
    colnames=['chips'],
    coltypes=[ChipListImgType],
    configclass=Chip2Config,
    fname='chipcache4',
    chunksize=32 if const.CONTAINERIZED else 128,
)
def compute_localizations_chips(depc, loc_id_list, config=None):
    r"""Extract the localization chips for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_id_list (list): list of localization rowids
        config (dict): (default = None)

    Yields:
        (list): tup

    CommandLine:
        wbia compute_localizations_chips

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:8]
        >>> config = {'combined': True, 'localization_chip_masking': True}
        >>> # depc.delete_property('localizations_chips', gid_list, config=config)
        >>> results = depc.get_property('localizations_chips', gid_list, None, config=config)
        >>> print(results)
        >>> config = {'combined': True, 'localization_chip_masking': False}
        >>> # depc.delete_property('localizations_chips', gid_list, config=config)
        >>> results = depc.get_property('localizations_chips', gid_list, None, config=config)
        >>> print(results)
    """
    logger.info('[ibs] Process Localization Chips')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller

    masking = config['localization_chip_masking']
    target_size = config['localization_chip_target_size']
    target_size_list = [target_size] * len(loc_id_list)

    gid_list_ = depc.get_ancestor_rowids('localizations', loc_id_list, 'images')
    assert len(gid_list_) == len(loc_id_list)

    # Grab the localizations
    bboxes_list = depc.get_native('localizations', loc_id_list, 'bboxes')
    thetas_list = depc.get_native('localizations', loc_id_list, 'thetas')

    len_list = [len(bbox_list) for bbox_list in bboxes_list]
    avg = sum(len_list) / len(len_list)
    args = (
        len(loc_id_list),
        min(len_list),
        avg,
        max(len_list),
        sum(len_list),
    )

    # Create image iterator
    img_list = (ibs.get_images(gid) for gid in gid_list_)

    if masking:
        logger.info(
            'Extracting %d localization masks (min: %d, avg: %0.02f, max: %d, total: %d)'
            % args
        )
        worker_func = get_localization_masks_worker
    else:
        logger.info(
            'Extracting %d localization chips (min: %d, avg: %0.02f, max: %d, total: %d)'
            % args
        )
        worker_func = get_localization_chips_worker

    arg_iter = zip(gid_list_, img_list, bboxes_list, thetas_list, target_size_list)
    result_list = ut.util_parallel.generate2(
        worker_func, arg_iter, ordered=True, nTasks=len(gid_list_), force_serial=True
    )

    # Return the results
    for gid, chip_list in result_list:
        ret_tuple = (chip_list,)
        yield ret_tuple

class ClassifierLocalizationsConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('classifier_algo', 'cnn', valid_values=['cnn', 'svm']),
        ut.ParamInfo('classifier_weight_filepath', None),
        # True will classify the whole image with each localization masked out;
        # False will classify each localization chip as a whole image.
        ut.ParamInfo('classifier_masking', False, hideif=False),
    ]
    _sub_config_list = [
        ThumbnailConfig,
    ]

@register_preproc(
    tablename='localizations_classifier',
    parents=['localizations'],
    colnames=['score', 'class'],
    coltypes=[np.ndarray, np.ndarray],
    configclass=ClassifierLocalizationsConfig,
    fname='detectcache',
    chunksize=2 if const.CONTAINERIZED else 8,
)
def compute_localizations_classifications(depc, loc_id_list, config=None):
    r"""Extract the classifications for a given input image's localizations.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_id_list (list): list of localization rowids
        config (dict): (default = None)

    Yields:
        (np.ndarray, np.ndarray): tup

    CommandLine:
        wbia compute_localizations_classifications

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:8]
        >>> config = {'algo': 'yolo'}
        >>> # depc.delete_property('localizations_classifier', gid_list, config=config)
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
        >>> config = {'algo': 'yolo', 'classifier_masking': True}
        >>> # depc.delete_property('localizations_classifier', gid_list, config=config)
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
        >>>
        >>> depc = ibs.depc_image
        >>> gid_list = list(set(ibs.get_imageset_gids(ibs.get_imageset_imgsetids_from_text('TEST_SET'))))
        >>> config = {'combined': True, 'classifier_algo': 'svm', 'classifier_weight_filepath': None}
        >>> # depc.delete_property('localizations_classifier', gid_list, config=config)
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
        >>>
        >>> config = {'combined': True, 'classifier_algo': 'svm', 'classifier_weight_filepath': 'localizer-zebra-10'}
        >>> # depc.delete_property('localizations_classifier', gid_list, config=config)
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
        >>>
        >>> config = {'combined': True, 'classifier_algo': 'svm', 'classifier_weight_filepath': 'localizer-zebra-50'}
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
        >>>
        >>> config = {'combined': True, 'classifier_algo': 'svm', 'classifier_weight_filepath': 'localizer-zebra-100'}
        >>> results = depc.get_property('localizations_classifier', gid_list, None, config=config)
        >>> print(results)
    """
    logger.info('[ibs] Process Localization Classifications')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller

    masking = config.get('classifier_masking', False)

    # Get the results from the algorithm
    if config['classifier_algo'] in ['cnn']:
        if masking:
            gid_list_, gid_list, thumbnail_list = get_localization_masks(
                ibs, loc_id_list, target_size=(192, 192)
            )
        else:
            gid_list_, gid_list, thumbnail_list = get_localization_chips(
                ibs, loc_id_list, target_size=(192, 192)
            )
        # Generate thumbnail classifications
        result_list = ibs.generate_thumbnail_class_list(thumbnail_list, **config)
        # Assert the length is the same
        assert len(gid_list) == len(result_list)
        # Release thumbnails
        thumbnail_list = None

        # Group the results
        group_dict = {}
        for gid, result in zip(gid_list, result_list):
            if gid not in group_dict:
                group_dict[gid] = []
            group_dict[gid].append(result)
        assert len(gid_list_) == len(group_dict.keys())

        if masking:
            # We need to perform a difference calculation to see how much the
            # masking caused a deviation from the un-masked image
            config_ = dict(config)
            key_list = ['thumbnail_cfg', 'classifier_masking']
            for key in key_list:
                config_.pop(key)
            class_list_ = depc.get_property(
                'classifier', gid_list_, 'class', config=config_
            )
            score_list_ = depc.get_property(
                'classifier', gid_list_, 'score', config=config_
            )
        else:
            class_list_ = [None] * len(gid_list_)
            score_list_ = [None] * len(gid_list_)

        # Return the results
        for gid, class_, score_ in zip(gid_list_, class_list_, score_list_):
            result_list = group_dict[gid]
            zipped_list = list(zip(*result_list))
            score_list = np.array(zipped_list[0])
            class_list = np.array(zipped_list[1])
            if masking:
                score_ = score_ if class_ == 'positive' else 1.0 - score_
                score_list = score_ - score_list
                class_list = np.array(['positive'] * len(score_list))
            # Return tuple values
            ret_tuple = (
                score_list,
                class_list,
            )
            yield ret_tuple
    elif config['classifier_algo'] in ['svm']:
        from wbia.algo.detect.svm import classify

        # from localizations get gids
        config_ = {
            'combined': True,
            'feature2_algo': 'resnet',
            'feature2_chip_masking': masking,
        }
        gid_list_ = depc.get_ancestor_rowids('localizations', loc_id_list, 'images')
        assert len(gid_list_) == len(loc_id_list)
        # Get features
        vectors_list = depc.get_property(
            'localizations_features', gid_list_, 'vector', config=config_
        )
        vectors_list_ = np.vstack(vectors_list)
        # Get gid_list
        shape_list = [vector_list.shape[0] for vector_list in vectors_list]
        gids_list = [[gid_] * shape for gid_, shape in zip(gid_list_, shape_list)]
        gid_list = ut.flatten(gids_list)
        # Stack vectors and classify
        classifier_weight_filepath = config['classifier_weight_filepath']
        result_list = classify(
            vectors_list_, weight_filepath=classifier_weight_filepath, verbose=True
        )

        # Group the results
        score_dict = {}
        class_dict = {}
        for index, (gid, result) in enumerate(zip(gid_list, result_list)):
            if gid not in score_dict:
                score_dict[gid] = []
            if gid not in class_dict:
                class_dict[gid] = []
            score_, class_ = result
            score_dict[gid].append(score_)
            class_dict[gid].append(class_)
        assert len(gid_list_) == len(score_dict.keys())
        assert len(gid_list_) == len(class_dict.keys())

        if masking:
            # We need to perform a difference calculation to see how much the
            # masking caused a deviation from the un-masked image
            config_ = dict(config)
            key_list = ['thumbnail_cfg', 'classifier_masking']
            for key in key_list:
                config_.pop(key)
            class_list_ = depc.get_property(
                'classifier', gid_list_, 'class', config=config_
            )
            score_list_ = depc.get_property(
                'classifier', gid_list_, 'score', config=config_
            )
        else:
            class_list_ = [None] * len(gid_list_)
            score_list_ = [None] * len(gid_list_)

        # Return the results
        for gid_, class_, score_ in zip(gid_list_, class_list_, score_list_):
            score_list = score_dict[gid_]
            class_list = class_dict[gid_]
            if masking:
                score_ = score_ if class_ == 'positive' else 1.0 - score_
                score_list = score_ - np.array(score_list)
                class_list = np.array(['positive'] * len(score_list))
            ret_tuple = (
                np.array(score_list),
                np.array(class_list),
            )
            yield ret_tuple

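# The masking branches above score each localization by how much masking it
# out shifts the whole-image classifier score. A hedged numeric illustration:
# if the unmasked image scores 0.9 'positive' and the image with one chip
# masked out scores 0.4, that chip's returned score is 0.9 - 0.4 = 0.5.
#
# score_ = 0.9                        # whole-image positive score
# score_list = np.array([0.4, 0.85])  # scores with each chip masked out
# score_list = score_ - score_list    # -> array([0.5 , 0.05])
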
class Feature2Config(dtool.Config):
    _param_info_list = [
        ut.ParamInfo(
            'feature2_algo',
            'vgg16',
            valid_values=['vgg', 'vgg16', 'vgg19', 'resnet', 'inception'],
        ),
        ut.ParamInfo('feature2_chip_masking', False, hideif=False),
        ut.ParamInfo('flatten', True),
    ]
    _sub_config_list = [ThumbnailConfig]

@register_preproc(
    tablename='localizations_features',
    parents=['localizations'],
    colnames=['vector'],
    coltypes=[np.ndarray],
    configclass=Feature2Config,
    fname='featcache',
    chunksize=2 if const.CONTAINERIZED else 4,
)
def compute_localizations_features(depc, loc_id_list, config=None):
    r"""Compute deep feature vectors for localizations using pre-trained Keras models.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_id_list (list): list of localization rowids
        config (dict): (default = None)

    Yields:
        (np.ndarray, ): tup

    CommandLine:
        wbia compute_localizations_features
        python -m wbia.core_images compute_localizations_features --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> print(depc.get_tablenames())
        >>> gid_list = ibs.get_valid_gids()[:16]
        >>> config = {'feature2_algo': 'vgg16', 'combined': True}
        >>> depc.delete_property('localizations_features', gid_list, config=config)
        >>> features = depc.get_property('localizations_features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'feature2_algo': 'vgg19', 'combined': True}
        >>> depc.delete_property('localizations_features', gid_list, config=config)
        >>> features = depc.get_property('localizations_features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'feature2_algo': 'resnet', 'combined': True}
        >>> depc.delete_property('localizations_features', gid_list, config=config)
        >>> features = depc.get_property('localizations_features', gid_list, 'vector', config=config)
        >>> print(features)
        >>> config = {'feature2_algo': 'inception', 'combined': True}
        >>> depc.delete_property('localizations_features', gid_list, config=config)
        >>> features = depc.get_property('localizations_features', gid_list, 'vector', config=config)
        >>> print(features)
    """
    from PIL import Image
    from keras.preprocessing import image as preprocess_image

    logger.info('[ibs] Preprocess Features')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller

    # Select the backbone and its preprocessing function
    target_size = (224, 224)
    if config['feature2_algo'] in ['vgg', 'vgg16']:
        from keras.applications.vgg16 import VGG16 as MODEL_CLASS
        from keras.applications.vgg16 import preprocess_input
    elif config['feature2_algo'] in ['vgg19']:
        from keras.applications.vgg19 import VGG19 as MODEL_CLASS
        from keras.applications.vgg19 import preprocess_input
    elif config['feature2_algo'] in ['resnet']:
        from keras.applications.resnet50 import ResNet50 as MODEL_CLASS  # NOQA
        from keras.applications.resnet50 import preprocess_input
    elif config['feature2_algo'] in ['inception']:
        from keras.applications.inception_v3 import InceptionV3 as MODEL_CLASS  # NOQA
        from keras.applications.inception_v3 import preprocess_input

        target_size = (299, 299)
    else:
        raise ValueError(
            'specified feature algo is not supported in config = %r' % (config,)
        )

    # Load chips (optionally masked to the localization region)
    if config['feature2_chip_masking']:
        gid_list_, gid_list, thumbnail_list = get_localization_masks(
            ibs, loc_id_list, target_size=target_size
        )
    else:
        gid_list_, gid_list, thumbnail_list = get_localization_chips(
            ibs, loc_id_list, target_size=target_size
        )

    # Build model
    model = MODEL_CLASS(include_top=False)

    # Define preprocessing: BGR -> RGB, PIL conversion, and model-specific scaling
    def _preprocess(thumbnail):
        thumbnail = cv2.cvtColor(thumbnail, cv2.COLOR_BGR2RGB)
        image = Image.fromarray(thumbnail)
        # Process PIL image
        image_array = preprocess_image.img_to_array(image)
        image_array = np.expand_dims(image_array, axis=0)
        image_array = preprocess_input(image_array)
        return image_array

    thumbnail_iter = ut.ProgIter(thumbnail_list, lbl='preprocessing chips', bs=True)
    image_array = [_preprocess(thumbnail) for thumbnail in thumbnail_iter]

    # Release thumbnails
    thumbnail_list = None

    inference_iter = ut.ProgIter(image_array, lbl='forward inference', bs=True)
    result_list = [model.predict(image_array_) for image_array_ in inference_iter]

    # Release image_array
    image_array = None

    # Group the per-localization results by image
    group_dict = {}
    for gid, result in zip(gid_list, result_list):
        if gid not in group_dict:
            group_dict[gid] = []
        group_dict[gid].append(result)
    assert len(gid_list_) == len(group_dict.keys())

    # Return the results
    group_iter = ut.ProgIter(gid_list_, lbl='grouping results', bs=True)
    for gid in group_iter:
        result_list = group_dict[gid]
        if config['flatten']:
            result_list = [_.flatten() for _ in result_list]
        result_list = np.vstack(result_list)
        # Return tuple values
        ret_tuple = (result_list,)
        yield ret_tuple
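
# A minimal standalone sketch of the Keras feature-extraction pattern above,
# assuming `chips` is a list of HxWx3 uint8 BGR arrays; the names `chips` and
# `extract_vgg16_features` are hypothetical and not part of this module:
#
#     import cv2
#     import numpy as np
#     from keras.applications.vgg16 import VGG16, preprocess_input
#
#     def extract_vgg16_features(chips):
#         model = VGG16(include_top=False)
#         feature_list = []
#         for chip in chips:
#             rgb = cv2.cvtColor(cv2.resize(chip, (224, 224)), cv2.COLOR_BGR2RGB)
#             batch = np.expand_dims(rgb.astype(np.float32), axis=0)
#             feature_list.append(model.predict(preprocess_input(batch)).flatten())
#         return np.vstack(feature_list)
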
class LabelerConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo(
            'labeler_algo',
            'pipeline',
            valid_values=['azure', 'cnn', 'pipeline', 'densenet'],
        ),
        ut.ParamInfo('labeler_weight_filepath', None),
        ut.ParamInfo('labeler_axis_aligned', False, hideif=False),
    ]
@register_preproc(
    tablename='localizations_labeler',
    parents=['localizations'],
    colnames=['score', 'species', 'viewpoint', 'quality', 'orientation', 'probs'],
    coltypes=[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, list],
    configclass=LabelerConfig,
    fname='detectcache',
    chunksize=32 if const.CONTAINERIZED else 128,
)
def compute_localizations_labels(depc, loc_id_list, config=None):
    r"""Compute labels (score, species, viewpoint, quality, orientation) for localizations.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_id_list (list): list of localization rowids
        config (dict): (default = None)

    Yields:
        (float, str): tup

    CommandLine:
        python -m wbia.core_images --exec-compute_localizations_labels

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:10]
        >>> config = {'labeler_algo': 'densenet', 'labeler_weight_filepath': 'giraffe_v1'}
        >>> # depc.delete_property('localizations_labeler', aid_list)
        >>> results = depc.get_property('localizations_labeler', gid_list, None, config=config)
        >>> print(results)
        >>> config = {'labeler_weight_filepath': 'candidacy'}
        >>> # depc.delete_property('localizations_labeler', aid_list)
        >>> results = depc.get_property('localizations_labeler', gid_list, None, config=config)
        >>> print(results)
    """
    from os.path import join, exists

    logger.info('[ibs] Process Localization Labels')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller

    if config['labeler_algo'] in ['pipeline', 'cnn']:
        gid_list_, gid_list, chip_list = get_localization_chips(
            ibs,
            loc_id_list,
            target_size=(128, 128),
            axis_aligned=config['labeler_axis_aligned'],
        )
        result_list = ibs.generate_chip_label_list(chip_list, **config)
    elif config['labeler_algo'] in ['azure']:
        raise NotImplementedError('Azure is not implemented for images')
    elif config['labeler_algo'] in ['densenet']:
        from wbia.algo.detect import densenet

        target_size = (
            densenet.INPUT_SIZE,
            densenet.INPUT_SIZE,
        )
        gid_list_, gid_list, chip_list = get_localization_chips(
            ibs,
            loc_id_list,
            target_size=target_size,
            axis_aligned=config['labeler_axis_aligned'],
        )
        config = dict(config)
        config['classifier_weight_filepath'] = config['labeler_weight_filepath']

        # Write the chips to a unique temporary cache directory for inference
        nonce = ut.random_nonce()[:16]
        cache_path = join(ibs.cachedir, 'localization_labels_%s' % (nonce,))
        assert not exists(cache_path)
        ut.ensuredir(cache_path)
        chip_filepath_list = []
        for index, chip in enumerate(chip_list):
            chip_filepath = join(cache_path, 'chip_%08d.png' % (index,))
            cv2.imwrite(chip_filepath, chip)
            assert exists(chip_filepath)
            chip_filepath_list.append(chip_filepath)
        result_gen = densenet.test_dict(chip_filepath_list, return_dict=True, **config)
        result_list = list(result_gen)
        ut.delete(cache_path)

    assert len(gid_list) == len(result_list)

    # Release chips
    chip_list = None

    # Group the per-localization results by image
    group_dict = {}
    for gid, result in zip(gid_list, result_list):
        if gid not in group_dict:
            group_dict[gid] = []
        group_dict[gid].append(result)

    # Return the results, empty if an image produced no localizations
    for gid in gid_list_:
        result_list = group_dict.get(gid, None)
        if result_list is None:
            ret_tuple = (
                np.array([]),
                np.array([]),
                np.array([]),
                np.array([]),
                np.array([]),
                [],
            )
        else:
            zipped_list = list(zip(*result_list))
            ret_tuple = (
                np.array(zipped_list[0]),
                np.array(zipped_list[1]),
                np.array(zipped_list[2]),
                np.array(zipped_list[3]),
                np.array(zipped_list[4]),
                list(zipped_list[5]),
            )
        yield ret_tuple
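
# A minimal sketch of the group-then-emit pattern used above, assuming
# `gid_list` repeats a gid once per localization and `gid_list_` holds the
# unique gids in output order; `group_results_by_gid` is a hypothetical helper:
#
#     def group_results_by_gid(gid_list_, gid_list, result_list):
#         group_dict = {}
#         for gid, result in zip(gid_list, result_list):
#             group_dict.setdefault(gid, []).append(result)
#         # One grouped list per image, empty if that image had no results
#         return [group_dict.get(gid, []) for gid in gid_list_]
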
class AoIConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('aoi_two_weight_filepath', None),
    ]
@register_preproc(
    tablename='localizations_aoi_two',
    parents=['localizations'],
    colnames=['score', 'class'],
    coltypes=[np.ndarray, np.ndarray],
    configclass=AoIConfig,
    fname='detectcache',
    chunksize=32 if const.CONTAINERIZED else 128,
)
def compute_localizations_interest(depc, loc_id_list, config=None):
    r"""Compute Annotation-of-Interest (AoI2) scores and classes for localizations.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        loc_id_list (list): list of localization rowids
        config (dict): (default = None)

    Yields:
        (float, str): tup

    CommandLine:
        wbia compute_localizations_labels

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:100]
        >>> depc.delete_property('localizations_aoi_two', gid_list)
        >>> results = depc.get_property('localizations_aoi_two', gid_list, None)
        >>> results = depc.get_property('localizations_aoi_two', gid_list, 'score')
        >>> print(results)
    """
    logger.info('[ibs] Process Localization AoI2s')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller

    values = get_localization_aoi2(ibs, loc_id_list, target_size=(192, 192))
    gid_list_, gid_list, thumbnail_list, bbox_list, size_list = values

    # Get the results from the algorithm (image sizes are re-queried per localization)
    size_list = ibs.get_image_sizes(gid_list)
    result_list = ibs.generate_thumbnail_aoi2_list(
        thumbnail_list, bbox_list, size_list, **config
    )
    assert len(gid_list) == len(result_list)

    # Release chips
    thumbnail_list = None

    # Group the per-localization results by image
    group_dict = {}
    for gid, result in zip(gid_list, result_list):
        if gid not in group_dict:
            group_dict[gid] = []
        group_dict[gid].append(result)
    assert len(gid_list_) == len(group_dict.keys())

    # Return the results
    for gid in gid_list_:
        result_list = group_dict[gid]
        zipped_list = list(zip(*result_list))
        ret_tuple = (
            np.array(zipped_list[0]),
            np.array(zipped_list[1]),
        )
        yield ret_tuple
class DetectorConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('classifier_weight_filepath', 'candidacy'),
        ut.ParamInfo('classifier_sensitivity', 0.10),
        #
        ut.ParamInfo('localizer_algo', 'yolo'),
        ut.ParamInfo('localizer_config_filepath', 'candidacy'),
        ut.ParamInfo('localizer_weight_filepath', 'candidacy'),
        ut.ParamInfo('localizer_grid', False),
        ut.ParamInfo('localizer_sensitivity', 0.10),
        #
        ut.ParamInfo('labeler_weight_filepath', 'candidacy'),
        ut.ParamInfo('labeler_sensitivity', 0.10),
    ]
    _sub_config_list = [
        ThumbnailConfig,
        LocalizerConfig,
    ]
@register_preproc(
    tablename='detections',
    parents=['images'],
    colnames=['score', 'bboxes', 'thetas', 'species', 'viewpoints', 'confs'],
    coltypes=[float, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray],
    configclass=DetectorConfig,
    fname='detectcache',
    chunksize=32 if const.CONTAINERIZED else 256,
)
def compute_detections(depc, gid_list, config=None):
    r"""Extract the detections for a given input image.

    Args:
        depc (wbia.depends_cache.DependencyCache):
        gid_list (list): list of image rowids
        config (dict): (default = None)

    Yields:
        (float, np.ndarray, np.ndarray, np.ndarray, np.ndarray): tup

    CommandLine:
        wbia compute_detections

    Example:
        >>> # SLOW_DOCTEST
        >>> # xdoctest: +SKIP
        >>> from wbia.core_images import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'PZ_MTEST'
        >>> ibs = wbia.opendb(defaultdb=defaultdb)
        >>> # dbdir = '/Users/bluemellophone/Desktop/GGR-IBEIS-TEST/'
        >>> # dbdir = '/media/danger/GGR/GGR-IBEIS-TEST/'
        >>> # ibs = wbia.opendb(dbdir=dbdir)
        >>> depc = ibs.depc_image
        >>> gid_list = ibs.get_valid_gids()[0:2]
        >>> depc.delete_property('detections', gid_list)
        >>> detects = depc.get_property('detections', gid_list, None)
        >>> print(detects)
    """
    logger.info('[ibs] Preprocess Detections')
    logger.info('config = %r' % (config,))
    # Get controller
    ibs = depc.controller
    ibs.assert_valid_gids(gid_list)

    USE_CLASSIFIER = False
    if USE_CLASSIFIER:
        classifier_config = {
            'classifier_weight_filepath': config['classifier_weight_filepath'],
        }
        # Filter the gids by the whole-image (binary) classifier
        prediction_list = depc.get_property(
            'classifier', gid_list, 'class', config=classifier_config
        )
        confidence_list = depc.get_property(
            'classifier', gid_list, 'score', config=classifier_config
        )
        confidence_list = [
            confidence if prediction == 'positive' else 1.0 - confidence
            for prediction, confidence in zip(prediction_list, confidence_list)
        ]
        gid_list_ = [
            gid
            for gid, confidence in zip(gid_list, confidence_list)
            if confidence >= config['classifier_sensitivity']
        ]
    else:
        classifier_config = {
            'classifier_two_weight_filepath': config['classifier_weight_filepath'],
        }
        # Filter the gids by the multi-class classifier
        predictions_list = depc.get_property(
            'classifier_two', gid_list, 'classes', config=classifier_config
        )
        gid_list_ = [
            gid
            for gid, prediction_list in zip(gid_list, predictions_list)
            if len(prediction_list) > 0
        ]

    gid_set_ = set(gid_list_)

    # Get the localizations for the good gids and add formal annotations
    localizer_config = {
        'algo': config['localizer_algo'],
        'config_filepath': config['localizer_config_filepath'],
        'weight_filepath': config['localizer_weight_filepath'],
        'grid': config['localizer_grid'],
    }
    bboxes_list = depc.get_property(
        'localizations', gid_list_, 'bboxes', config=localizer_config
    )
    thetas_list = depc.get_property(
        'localizations', gid_list_, 'thetas', config=localizer_config
    )
    confses_list = depc.get_property(
        'localizations', gid_list_, 'confs', config=localizer_config
    )

    # Get the corrected species and viewpoints
    labeler_config = {
        'labeler_weight_filepath': config['labeler_weight_filepath'],
    }
    # depc.delete_property('localizations_labeler', gid_list_, config=labeler_config)
    specieses_list = depc.get_property(
        'localizations_labeler', gid_list_, 'species', config=labeler_config
    )
    viewpoints_list = depc.get_property(
        'localizations_labeler', gid_list_, 'viewpoint', config=labeler_config
    )
    scores_list = depc.get_property(
        'localizations_labeler', gid_list_, 'score', config=labeler_config
    )

    # Collect the detections, filtering by the localizer and labeler sensitivities
    empty_list = [
        0.0,
        np.array([]),
        np.array([]),
        np.array([]),
        np.array([]),
        np.array([]),
    ]
    detect_dict = {}
    for index, gid in enumerate(gid_list_):
        bbox_list = bboxes_list[index]
        theta_list = thetas_list[index]
        species_list = specieses_list[index]
        viewpoint_list = viewpoints_list[index]
        conf_list = confses_list[index]
        score_list = scores_list[index]
        zipped = list(
            zip(
                bbox_list,
                theta_list,
                species_list,
                viewpoint_list,
                conf_list,
                score_list,
            )
        )
        zipped_ = []
        for bbox, theta, species, viewpoint, conf, score in zipped:
            if (
                conf >= config['localizer_sensitivity']
                and score >= config['labeler_sensitivity']
            ):
                zipped_.append([bbox, theta, species, viewpoint, conf * score])
            else:
                logger.info(
                    'Localizer %0.02f %0.02f' % (conf, config['localizer_sensitivity'])
                )
                logger.info(
                    'Labeler %0.02f %0.02f' % (score, config['labeler_sensitivity'])
                )
        if len(zipped_) == 0:
            detect_list = list(empty_list)
        else:
            detect_list = [0.0] + [np.array(_) for _ in zip(*zipped_)]
        detect_dict[gid] = detect_list

    # Emit a result for every requested gid, empty if the classifier filtered it out
    for gid in gid_list:
        if gid not in gid_set_:
            assert gid not in detect_dict
            result = list(empty_list)
        else:
            assert gid in detect_dict
            result = detect_dict[gid]
        yield tuple(result)
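
# A minimal sketch of the two-threshold filter applied above, assuming each
# candidate carries a localizer confidence `conf` and a labeler score `score`;
# the helper name `filter_detections` is hypothetical:
#
#     def filter_detections(candidates, localizer_sensitivity, labeler_sensitivity):
#         kept = []
#         for bbox, theta, species, viewpoint, conf, score in candidates:
#             if conf >= localizer_sensitivity and score >= labeler_sensitivity:
#                 # The final detection score is the product of the stage scores
#                 kept.append((bbox, theta, species, viewpoint, conf * score))
#         return kept
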
class CameraTrapEXIFConfig(dtool.Config):
    _param_info_list = [
        ut.ParamInfo('bottom', 80),
        ut.ParamInfo('psm', 7),
        ut.ParamInfo('oem', 1),
        ut.ParamInfo('whitelist', '0123456789°CF/:'),
    ]
@register_preproc(
    tablename='cameratrap_exif',
    parents=['images'],
    colnames=['raw'],
    coltypes=[str],
    configclass=CameraTrapEXIFConfig,
    fname='exifcache',
    chunksize=1024,
)
def compute_cameratrap_exif(depc, gid_list, config=None):
    r"""OCR the burned-in camera-trap metadata strip for a list of images."""
    ibs = depc.controller

    gpath_list = ibs.get_image_paths(gid_list)
    orient_list = ibs.get_image_orientation(gid_list)

    # Run the OCR worker over all images in parallel
    arg_iter = list(zip(gpath_list, orient_list))
    kwargs_iter = [config] * len(gid_list)
    raw_list = ut.util_parallel.generate2(
        compute_cameratrap_exif_worker, arg_iter, kwargs_iter
    )
    for raw in raw_list:
        yield (raw,)
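
# A minimal sketch of the parallel-map pattern above, assuming
# ut.util_parallel.generate2(func, args, kwargs) yields one result per
# argument tuple, in order; the paths below are illustrative:
#
#     args = [('/path/a.jpg', 0), ('/path/b.jpg', 0)]
#     kwargs = [{'bottom': 80}] * len(args)
#     worker = compute_cameratrap_exif_worker
#     for raw in ut.util_parallel.generate2(worker, args, kwargs):
#         print(raw)
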
def compute_cameratrap_exif_worker(
    gpath, orient, bottom=80, psm=7, oem=1, whitelist='0123456789°CF/:'
):
    import pytesseract

    img = vt.imread(gpath, orient=orient)

    # Crop to the bottom strip, where camera traps burn in their metadata text
    img = img[-1 * bottom :, :, :]

    # Assemble the tesseract CLI options (tessdata path is platform-dependent)
    config = []
    if sys.platform.startswith('darwin'):
        config += ['--tessdata-dir', '"/opt/local/share/"']
    else:
        config += ['--tessdata-dir', '"/usr/share/tesseract-ocr/"']
    config += [
        '--psm',
        str(psm),
        '--oem',
        str(oem),
        '-c',
        'tessedit_char_whitelist=%s' % (whitelist,),
    ]
    config = ' '.join(config)

    try:
        raw = pytesseract.image_to_string(img, config=config)
    except Exception:
        raw = None

    return raw
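
# For reference, with the defaults above the assembled tesseract config string
# on Linux is:
#
#     --tessdata-dir "/usr/share/tesseract-ocr/" --psm 7 --oem 1 -c tessedit_char_whitelist=0123456789°CF/:
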