
# -*- coding: utf-8 -*-
"""Developer convenience functions for ibs (detections).

TODO: need to split up into sub modules:
    consistency_checks
    feasibility_fixes
    move the export stuff to dbio

    then there are also convenience functions that need to be ordered at least
    within this file
"""
import logging
from os.path import exists, expanduser, join, abspath
import numpy as np
import vtool as vt
import utool as ut
import cv2
import wbia.constants as const
from wbia.control import controller_inject
from wbia.other.detectfuncs import (
    general_parse_gt,
    general_get_imageset_gids,
    localizer_parse_pred,
    _resize,
    general_overlap,
)
import random

# Inject utool functions
(print, rrr, profile) = ut.inject2(__name__, '[other.detectcore]')
logger = logging.getLogger('wbia')


CLASS_INJECT_KEY, register_ibs_method = controller_inject.make_ibs_register_decorator(
    __name__
)


def nms(dets, scores, thresh, use_cpu=True):
    # Interface into Faster R-CNN's Python native NMS algorithm by Girshick et al.
    from wbia.algo.detect.nms.py_cpu_nms import py_cpu_nms

    return py_cpu_nms(dets, scores, thresh)
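

# A minimal usage sketch for nms() (the values are hypothetical, not part of
# the original module): dets holds one (x1, y1, x2, y2) box per row and the
# return value is assumed to be the indices kept after suppression at thresh,
# mirroring the py_cpu_nms convention.
def _nms_usage_example():
    dets = np.array(
        [
            [10.0, 10.0, 50.0, 50.0],  # box A
            [12.0, 12.0, 52.0, 52.0],  # box B, heavily overlaps A
            [100.0, 100.0, 150.0, 150.0],  # box C, disjoint from A and B
        ]
    )
    scores = np.array([0.9, 0.8, 0.7])
    # With a 0.5 IoU threshold, box B should be suppressed in favor of the
    # higher-scoring box A, while box C survives.
    keep = nms(dets, scores, thresh=0.5)
    return keep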


@register_ibs_method
def export_to_pascal(ibs, *args, **kwargs):
    """Alias for export_to_xml"""
    return ibs.export_to_xml(*args, **kwargs)


@register_ibs_method
def export_to_xml(
    ibs,
    species_list,
    species_mapping=None,
    offset='auto',
    enforce_viewpoint=False,
    target_size=900,
    purge=False,
    use_maximum_linear_dimension=True,
    use_existing_train_test=True,
    include_parts=False,
    gid_list=None,
    output_path=None,
    allow_empty_images=False,
    min_annot_size=5,
    **kwargs,
):
    """Create PASCAL VOC training XML data for training detection models."""
    import random
    from datetime import date

    from wbia.detecttools.pypascalmarkup import PascalVOC_Markup_Annotation

    logger.info('Received species_mapping = %r' % (species_mapping,))
    if species_list is None:
        species_list = sorted(set(species_mapping.values()))
    logger.info('Using species_list = %r' % (species_list,))

    def _add_annotation(
        annotation,
        bbox,
        theta,
        species_name,
        viewpoint,
        interest,
        decrease,
        width,
        height,
        part_type=None,
    ):
        # Transformation matrix
        R = vt.rotation_around_bbox_mat3x3(theta, bbox)
        # Get vertices of the annotation polygon
        verts = vt.verts_from_bbox(bbox, close=True)
        # Rotate and transform vertices
        xyz_pts = vt.add_homogenous_coordinate(np.array(verts).T)
        trans_pts = vt.remove_homogenous_coordinate(R.dot(xyz_pts))
        new_verts = np.round(trans_pts).astype(np.int64).T.tolist()
        x_points = [pt[0] for pt in new_verts]
        y_points = [pt[1] for pt in new_verts]
        xmin = int(min(x_points) * decrease)
        xmax = int(max(x_points) * decrease)
        ymin = int(min(y_points) * decrease)
        ymax = int(max(y_points) * decrease)
        # Bounds check
        xmin = max(xmin, 0)
        ymin = max(ymin, 0)
        xmax = min(xmax, width - 1)
        ymax = min(ymax, height - 1)
        # Get info
        info = {}
        w_ = xmax - xmin
        h_ = ymax - ymin
        if w_ < min_annot_size:
            return
        if h_ < min_annot_size:
            return
        if viewpoint != -1 and viewpoint is not None:
            info['pose'] = viewpoint
        if interest is not None:
            info['interest'] = '1' if interest else '0'
        if part_type is not None:
            species_name = '%s+%s' % (species_name, part_type)
        area = w_ * h_
        logger.info('\t\tAdding %r with area %0.04f pixels^2' % (species_name, area))
        annotation.add_object(species_name, (xmax, xmin, ymax, ymin), **info)

    current_year = int(date.today().year)
    information = {'database_name': ibs.get_dbname()}
    import datetime

    now = datetime.datetime.now()
    folder = 'VOC%d' % (now.year,)

    if output_path is None:
        output_path = ibs.get_cachedir()

    datadir = join(output_path, 'VOCdevkit', folder)
    imagedir = join(datadir, 'JPEGImages')
    annotdir = join(datadir, 'Annotations')
    setsdir = join(datadir, 'ImageSets')
    mainsetsdir = join(setsdir, 'Main')

    if purge:
        ut.delete(datadir)

    ut.ensuredir(datadir)
    ut.ensuredir(imagedir)
    ut.ensuredir(annotdir)
    ut.ensuredir(setsdir)
    ut.ensuredir(mainsetsdir)

    # Get all gids and process them
    if gid_list is None:
        gid_list = sorted(ibs.get_valid_gids())

    sets_dict = {
        'test': [],
        'train': [],
        'trainval': [],
        'val': [],
    }
    index = 1 if offset == 'auto' else offset

    # Make a preliminary train / test split as imagesets or use the existing ones
    if not use_existing_train_test:
        ibs.imageset_train_test_split(**kwargs)

    train_gid_set = set(general_get_imageset_gids(ibs, 'TRAIN_SET', **kwargs))
    test_gid_set = set(general_get_imageset_gids(ibs, 'TEST_SET', **kwargs))

    logger.info('Exporting %d images' % (len(gid_list),))
    for gid in gid_list:
        aid_list = ibs.get_image_aids(gid)
        image_uri = ibs.get_image_uris(gid)
        image_path = ibs.get_image_paths(gid)
        if len(aid_list) > 0 or allow_empty_images:
            fulldir = image_path.split('/')
            filename = fulldir.pop()
            extension = filename.split('.')[-1]  # NOQA
            out_name = '%d_%06d' % (current_year, index)
            out_img = '%s.jpg' % (out_name,)

            _image = ibs.get_images(gid)
            height, width, channels = _image.shape

            condition = width > height if use_maximum_linear_dimension else width < height
            if condition:
                ratio = height / width
                decrease = target_size / width
                width = target_size
                height = int(target_size * ratio)
            else:
                ratio = width / height
                decrease = target_size / height
                height = target_size
                width = int(target_size * ratio)

            dst_img = join(imagedir, out_img)
            _image = vt.resize(_image, (width, height))
            vt.imwrite(dst_img, _image)

            annotation = PascalVOC_Markup_Annotation(
                dst_img, folder, out_img, source=image_uri, **information
            )
            bbox_list = ibs.get_annot_bboxes(aid_list)
            theta_list = ibs.get_annot_thetas(aid_list)
            species_name_list = ibs.get_annot_species_texts(aid_list)
            viewpoint_list = ibs.get_annot_viewpoints(aid_list)
            interest_list = ibs.get_annot_interest(aid_list)
            part_rowids_list = ibs.get_annot_part_rowids(aid_list)
            zipped = zip(
                bbox_list,
                theta_list,
                species_name_list,
                viewpoint_list,
                interest_list,
                part_rowids_list,
            )
            for (
                bbox,
                theta,
                species_name,
                viewpoint,
                interest,
                part_rowid_list,
            ) in zipped:
                if species_mapping is not None:
                    species_name = species_mapping.get(species_name, species_name)
                if species_name is not None and species_name not in species_list:
                    continue
                _add_annotation(
                    annotation,
                    bbox,
                    theta,
                    species_name,
                    viewpoint,
                    interest,
                    decrease,
                    width,
                    height,
                )
                if include_parts and len(part_rowid_list) > 0:
                    part_bbox_list = ibs.get_part_bboxes(part_rowid_list)
                    part_theta_list = ibs.get_part_thetas(part_rowid_list)
                    part_type_list = ibs.get_part_types(part_rowid_list)
                    part_zipped = zip(part_bbox_list, part_theta_list, part_type_list)
                    for part_bbox, part_theta, part_type in part_zipped:
                        part_viewpoint = viewpoint
                        part_interest = None
                        _add_annotation(
                            annotation,
                            part_bbox,
                            part_theta,
                            species_name,
                            part_viewpoint,
                            part_interest,
                            decrease,
                            width,
                            height,
                            part_type=part_type,
                        )

            out_filename = '%s.xml' % (out_name,)
            dst_annot = join(annotdir, out_filename)

            if gid in test_gid_set:
                sets_dict['test'].append(out_name)
            elif gid in train_gid_set:
                state = random.uniform(0.0, 1.0)
                if state <= 0.75:
                    sets_dict['train'].append(out_name)
                    sets_dict['trainval'].append(out_name)
                else:
                    sets_dict['val'].append(out_name)
                    sets_dict['trainval'].append(out_name)
            else:
                raise AssertionError(
                    'All gids must be either in the TRAIN_SET or TEST_SET imagesets'
                )

            # Write XML
            logger.info(
                'Copying:\n%r\n%r\n%r\n\n' % (image_path, dst_img, (width, height))
            )
            with open(dst_annot, 'w') as xml_data:
                xml_data.write(annotation.xml())
            # Advance the index to the next unused filename
            while exists(dst_annot):
                index += 1
                if offset != 'auto':
                    break
                out_filename = '%d_%06d.xml' % (current_year, index)
                dst_annot = join(annotdir, out_filename)
        else:
            logger.info('Skipping:\n%r\n\n' % (image_path,))

    for key in sets_dict.keys():
        manifest_filename = '%s.txt' % (key,)
        manifest_filepath = join(mainsetsdir, manifest_filename)
        with open(manifest_filepath, 'w') as file_:
            sets_dict[key].append('')
            content = sets_dict[key]
            content = '\n'.join(content)
            file_.write(content)

    logger.info('...completed')
    return datadir
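

# A hedged usage sketch for export_to_xml (the database name 'testdb1' and
# the species label are illustrative assumptions): it exports every image
# that has annotations of the requested species into a PASCAL VOC directory
# tree under the cache dir.
def _export_to_xml_example():
    import wbia

    ibs = wbia.opendb('testdb1')  # hypothetical database
    datadir = ibs.export_to_xml(
        species_list=['zebra_plains'],  # assumed species label
        target_size=900,
        purge=True,  # wipe any previous VOCdevkit export first
    )
    # datadir now points at .../VOCdevkit/VOC<year> with JPEGImages/,
    # Annotations/, and ImageSets/Main/{train,val,test,trainval}.txt
    return datadir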


@register_ibs_method
def export_to_coco(
    ibs,
    species_list,
    species_mapping={},
    viewpoint_mapping={},
    target_size=2400,
    use_maximum_linear_dimension=True,
    use_existing_train_test=True,
    include_parts=False,
    gid_list=None,
    include_reviews=False,
    require_image_reviewed=False,
    require_named=False,
    output_images=True,
    use_global_train_set=False,
    **kwargs,
):
    """Create a COCO dataset for training detection models."""
    import datetime
    import json
    import random
    from datetime import date

    logger.info('Received species_mapping = %r' % (species_mapping,))
    logger.info('Received viewpoint_mapping = %r' % (viewpoint_mapping,))

    if species_list is None:
        species_list = sorted(set(species_mapping.values()))

    logger.info('Using species_list = %r' % (species_list,))

    current_year = int(date.today().year)
    datadir = abspath(join(ibs.get_cachedir(), 'coco'))
    annotdir = join(datadir, 'annotations')
    imagedir = join(datadir, 'images')
    image_dir_dict = {
        'train': join(imagedir, 'train%s' % (current_year,)),
        'val': join(imagedir, 'val%s' % (current_year,)),
        'test': join(imagedir, 'test%s' % (current_year,)),
    }

    ut.delete(datadir)
    ut.ensuredir(datadir)
    ut.ensuredir(annotdir)
    ut.ensuredir(imagedir)
    for dataset in image_dir_dict:
        ut.ensuredir(image_dir_dict[dataset])

    info = {
        'description': 'Wild Me %s Dataset' % (ibs.dbname,),
        # 'url': 'http://www.greatgrevysrally.com',
        'url': 'http://www.wildme.org',
        'version': '1.0',
        'year': current_year,
        'contributor': 'Wild Me <dev@wildme.org>',
        'date_created': datetime.datetime.utcnow().isoformat(' '),
        'name': ibs.get_db_name(),
        'uuid': str(ibs.get_db_init_uuid()),
    }

    licenses = [
        {
            'url': 'http://creativecommons.org/licenses/by-nc-nd/2.0/',
            'id': 3,
            'name': 'Attribution-NonCommercial-NoDerivs License',
        },
    ]

    assert len(species_list) == len(
        set(species_list)
    ), 'Cannot have duplicate species in species_list'
    category_dict = {}
    categories = []
    for index, species in enumerate(sorted(species_list)):
        species = species_mapping.get(species, species)
        categories.append({'id': index, 'name': species, 'supercategory': 'animal'})
        category_dict[species] = index

    def _add_annotation_or_part(
        image_index,
        annot_index,
        annot_uuid,
        bbox,
        theta,
        species_name,
        viewpoint,
        interest,
        annot_name,
        decrease,
        width,
        height,
        individuals,
        part_index=None,
        part_uuid=None,
    ):
        is_part = part_index is not None

        R = vt.rotation_around_bbox_mat3x3(theta, bbox)
        verts = vt.verts_from_bbox(bbox, close=True)
        xyz_pts = vt.add_homogenous_coordinate(np.array(verts).T)
        trans_pts = vt.remove_homogenous_coordinate(R.dot(xyz_pts))
        new_verts = np.round(trans_pts).astype(np.int64).T.tolist()

        x_points = [int(np.around(pt[0] * decrease)) for pt in new_verts]
        y_points = [int(np.around(pt[1] * decrease)) for pt in new_verts]
        segmentation = ut.flatten(list(zip(x_points, y_points)))

        xmin = max(min(x_points), 0)
        ymin = max(min(y_points), 0)
        xmax = min(max(x_points), width - 1)
        ymax = min(max(y_points), height - 1)

        w = xmax - xmin
        h = ymax - ymin
        area = w * h

        xtl_, ytl_, w_, h_ = bbox
        xtl_ *= decrease
        ytl_ *= decrease
        w_ *= decrease
        h_ *= decrease

        annot_part = {
            'bbox': [xtl_, ytl_, w_, h_],
            'theta': theta,
            'viewpoint': viewpoint,
            'segmentation': [segmentation],
            'segmentation_bbox': [xmin, ymin, w, h],
            'area': area,
            'iscrowd': 0,
            'id': part_index if is_part else annot_index,
            'image_id': image_index,
            'category_id': category_dict[species_name],
            'uuid': str(part_uuid if is_part else annot_uuid),
            'individual_ids': individuals,
        }
        if is_part:
            annot_part['annot_id'] = annot_index
        else:
            annot_part['isinterest'] = int(interest)
            annot_part['name'] = annot_name

        return annot_part, area

    output_dict = {}
    for dataset in ['train', 'val', 'test']:
        output_dict[dataset] = {
            'info': info,
            'licenses': licenses,
            'categories': categories,
            'images': [],
            'annotations': [],
            'parts': [],
        }

    # Get all gids and process them
    if gid_list is None:
        if require_named:
            aid_list = ibs.get_valid_aids()
            species_list_ = ibs.get_annot_species(aid_list)
            flag_list = [
                species_mapping.get(species_, species_) in species_list
                for species_ in species_list_
            ]
            aid_list = ut.compress(aid_list, flag_list)
            nid_list = ibs.get_annot_nids(aid_list)
            flag_list = [nid >= 0 for nid in nid_list]
            aid_list = ut.compress(aid_list, flag_list)
            gid_list = list(set(ibs.get_annot_gids(aid_list)))
        else:
            gid_list = ibs.get_valid_gids()

        if require_image_reviewed:
            image_reviewed_list = ibs.get_image_reviewed(gid_list)
            gid_list = ut.compress(gid_list, image_reviewed_list)

        gid_list = sorted(list(set(gid_list)))

    # Make a preliminary train / test split as imagesets or use the existing ones
    if not use_existing_train_test:
        ibs.imageset_train_test_split(**kwargs)

    train_gid_set = set(general_get_imageset_gids(ibs, 'TRAIN_SET', **kwargs))
    test_gid_set = set(general_get_imageset_gids(ibs, 'TEST_SET', **kwargs))

    image_index = 1
    annot_index = 1
    part_index = 1

    aid_dict = {}

    logger.info('Exporting %d images' % (len(gid_list),))
    for gid in gid_list:
        if use_global_train_set:
            dataset = 'train'
        else:
            if gid in test_gid_set:
                dataset = 'test'
            elif gid in train_gid_set:
                state = random.uniform(0.0, 1.0)
                if state <= 0.75:
                    dataset = 'train'
                else:
                    dataset = 'val'
            else:
                # raise AssertionError('All gids must be either in the TRAIN_SET or TEST_SET imagesets')
                logger.info('GID = %r was not in the TRAIN_SET or TEST_SET' % (gid,))
                dataset = 'test'

        width, height = ibs.get_image_sizes(gid)

        if target_size is None:
            decrease = 1.0
        else:
            condition = width > height if use_maximum_linear_dimension else width < height
            if condition:
                ratio = height / width
                decrease = target_size / width
                width = target_size
                height = int(target_size * ratio)
            else:
                ratio = width / height
                decrease = target_size / height
                height = target_size
                width = int(target_size * ratio)

        image_path = ibs.get_image_paths(gid)
        image_filename = '%012d.jpg' % (image_index,)
        image_filepath = join(image_dir_dict[dataset], image_filename)

        if output_images:
            _image = ibs.get_images(gid)
            _image = vt.resize(_image, (width, height))
            vt.imwrite(image_filepath, _image)

        image_gps = ibs.get_image_gps(gid)
        if image_gps is None or len(image_gps) != 2 or None in image_gps:
            image_gps_lat, image_gps_lon = None, None
        else:
            image_gps_lat, image_gps_lon = image_gps
            image_gps_lat = '%03.06f' % (image_gps_lat,)
            image_gps_lon = '%03.06f' % (image_gps_lon,)

        output_dict[dataset]['images'].append(
            {
                'license': 3,
                'file_name': image_filename,
                # 'file_name': basename(ibs.get_image_uris_original(gid)),
                'photographer': ibs.get_image_notes(gid),
                'coco_url': None,
                'height': height,
                'width': width,
                'date_captured': ibs.get_image_datetime_str(gid).replace('/', '-'),
                'gps_lat_captured': image_gps_lat,
                'gps_lon_captured': image_gps_lon,
                'flickr_url': None,
                'id': image_index,
                'uuid': str(ibs.get_image_uuids(gid)),
            }
        )

        logger.info(
            'Copying:\n%r\n%r\n%r\n\n' % (image_path, image_filepath, (width, height))
        )

        aid_list = ibs.get_image_aids(gid)
        bbox_list = ibs.get_annot_bboxes(aid_list)
        theta_list = ibs.get_annot_thetas(aid_list)
        species_name_list = ibs.get_annot_species_texts(aid_list)
        viewpoint_list = ibs.get_annot_viewpoints(aid_list)
        interest_list = ibs.get_annot_interest(aid_list)
        annot_uuid_list = ibs.get_annot_uuids(aid_list)
        annot_name_list = ibs.get_annot_name_texts(aid_list)
        part_rowids_list = ibs.get_annot_part_rowids(aid_list)
        nid_list = ibs.get_annot_nids(aid_list)

        zipped = zip(
            aid_list,
            bbox_list,
            theta_list,
            species_name_list,
            viewpoint_list,
            interest_list,
            annot_uuid_list,
            annot_name_list,
            part_rowids_list,
            nid_list,
        )
        for (
            aid,
            bbox,
            theta,
            species_name,
            viewpoint,
            interest,
            annot_uuid,
            annot_name,
            part_rowid_list,
            nid,
        ) in zipped:
            species_name = species_mapping.get(species_name, species_name)
            if species_name is None:
                continue
            if species_name not in species_list:
                continue
            if require_named and nid < 0:
                continue

            viewpoint = viewpoint_mapping.get(species_name, {}).get(viewpoint, viewpoint)
            # if viewpoint is None:
            #     continue

            individuals = ibs.get_name_aids(ibs.get_annot_nids(aid))

            # Transformation matrix
            annot, area = _add_annotation_or_part(
                image_index,
                annot_index,
                annot_uuid,
                bbox,
                theta,
                species_name,
                viewpoint,
                interest,
                annot_name,
                decrease,
                width,
                height,
                individuals,
            )
            logger.info(
                '\t\tAdding annot %r with area %0.04f pixels^2' % (species_name, area)
            )

            if include_reviews:
                reviews = ibs.get_review_rowids_from_single([aid])[0]
                user_list = ibs.get_review_identity(reviews)
                aid_tuple_list = ibs.get_review_aid_tuple(reviews)
                decision_list = ibs.get_review_decision_str(reviews)

                ids = []
                decisions = []
                zipped = zip(user_list, aid_tuple_list, decision_list)
                for user, aid_tuple, decision in zipped:
                    if 'user:web' not in user:
                        continue
                    match = list(set(aid_tuple) - set([aid]))
                    assert len(match) == 1
                    ids.append(match[0])
                    decisions.append(decision.lower())
                annot['review_ids'] = list(zip(ids, decisions))

            output_dict[dataset]['annotations'].append(annot)

            if include_parts and len(part_rowid_list) > 0:
                part_uuid_list = ibs.get_part_uuids(part_rowid_list)
                part_bbox_list = ibs.get_part_bboxes(part_rowid_list)
                part_theta_list = ibs.get_part_thetas(part_rowid_list)
                part_type_list = ibs.get_part_types(part_rowid_list)

                part_zipped = zip(
                    part_uuid_list, part_bbox_list, part_theta_list, part_type_list
                )
                for part_uuid, part_bbox, part_theta, part_type in part_zipped:
                    part_species_name = '%s+%s' % (species_name, part_type)
                    part_species_name = species_mapping.get(
                        part_species_name, part_species_name
                    )
                    if part_species_name is None:
                        continue
                    if part_species_name not in species_list:
                        continue

                    part, area = _add_annotation_or_part(
                        image_index,
                        annot_index,
                        annot_uuid,
                        part_bbox,
                        part_theta,
                        part_species_name,
                        viewpoint,
                        interest,
                        annot_name,
                        decrease,
                        width,
                        height,
                        individuals,
                        part_index=part_index,
                        part_uuid=part_uuid,
                    )
                    logger.info(
                        '\t\tAdding part %r with area %0.04f pixels^2'
                        % (part_species_name, area)
                    )
                    output_dict[dataset]['parts'].append(part)

                    part_index += 1

            aid_dict[aid] = annot_index
            annot_index += 1

        image_index += 1

    for dataset in output_dict:
        annots = output_dict[dataset]['annotations']
        for index in range(len(annots)):
            annot = annots[index]

            # Map internal aids to external annot index
            individual_ids = annot['individual_ids']
            individual_ids_ = []
            for individual_id in individual_ids:
                if individual_id not in aid_dict:
                    continue
                individual_id_ = aid_dict[individual_id]
                individual_ids_.append(individual_id_)
            annot['individual_ids'] = individual_ids_

            # Map reviews
            if include_reviews:
                review_ids = annot['review_ids']
                review_ids_ = []
                for review in review_ids:
                    review_id, review_decision = review
                    if review_id not in aid_dict:
                        continue
                    review_id_ = aid_dict[review_id]
                    review_ = (
                        review_id_,
                        review_decision,
                    )
                    review_ids_.append(review_)
                annot['review_ids'] = review_ids_

            # Store
            output_dict[dataset]['annotations'][index] = annot

    for dataset in output_dict:
        json_filename = 'instances_%s%s.json' % (dataset, current_year)
        json_filepath = join(annotdir, json_filename)

        with open(json_filepath, 'w') as json_file:
            json.dump(output_dict[dataset], json_file)

    logger.info('...completed')
    return datadir
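

# A hedged usage sketch for export_to_coco, mirroring the XML example above;
# the database name, species names, and mapping are illustrative assumptions.
def _export_to_coco_example():
    import wbia

    ibs = wbia.opendb('testdb1')  # hypothetical database
    datadir = ibs.export_to_coco(
        species_list=['zebra_plains'],
        species_mapping={'zebra': 'zebra_plains'},  # fold synonyms together
        include_parts=True,  # also emit a 'parts' section per dataset split
        require_image_reviewed=False,
    )
    # datadir contains images/{train,val,test}<year>/ plus
    # annotations/instances_<split><year>.json in COCO layout.
    return datadir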


@register_ibs_method
def imageset_train_test_split(
    ibs, train_split=0.8, is_tile=False, gid_list=None, **kwargs
):
    from random import shuffle

    if gid_list is None:
        gid_list = ibs.get_valid_gids(is_tile=is_tile)

    aids_list = ibs.get_image_aids(gid_list)
    distro_dict = {}
    for gid, aid_list in zip(gid_list, aids_list):
        total = len(aid_list)
        if total not in distro_dict:
            distro_dict[total] = []
        distro_dict[total].append(gid)

    logger.info('Processing train/test imagesets...')
    global_train_list = []
    global_test_list = []
    for distro, gid_list_ in distro_dict.items():
        total = len(gid_list_)
        shuffle(gid_list_)
        split_index = total * (1.0 - train_split) + 1e-9  # weird
        if split_index < 1.0:
            split_index = total / 2
        else:
            split_index = np.around(split_index)
        split_index = int(split_index)
        args = (
            distro,
            total,
            split_index,
        )
        logger.info('\tnum aids distro: %d - total: %d - split_index: %d' % args)
        train_list = gid_list_[split_index:]
        test_list = gid_list_[:split_index]
        args = (
            len(test_list),
            len(train_list),
            len(train_list) / total,
        )
        logger.info('\ttest: %d\n\ttrain: %d\n\tsplit: %0.04f' % args)
        global_train_list.extend(train_list)
        global_test_list.extend(test_list)

    args = (
        len(global_train_list),
        len(global_test_list),
        len(global_train_list) + len(global_test_list),
        len(global_train_list) / len(gid_list),
        train_split,
    )

    train_imgsetid = ibs.add_imagesets('TRAIN_SET')
    test_imgsetid = ibs.add_imagesets('TEST_SET')

    temp_list = ibs.get_imageset_gids(train_imgsetid)
    ibs.unrelate_images_and_imagesets(temp_list, [train_imgsetid] * len(temp_list))
    temp_list = ibs.get_imageset_gids(test_imgsetid)
    ibs.unrelate_images_and_imagesets(temp_list, [test_imgsetid] * len(temp_list))

    ibs.set_image_imgsetids(global_train_list, [train_imgsetid] * len(global_train_list))
    ibs.set_image_imgsetids(global_test_list, [test_imgsetid] * len(global_test_list))

    logger.info('Complete... %d train + %d test = %d (%0.04f %0.04f)' % args)
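

# The split arithmetic above, restated on concrete numbers (a sketch, not
# part of the module): with train_split=0.8 and a bucket of 10 images,
# split_index = 10 * 0.2 = 2, so the first 2 shuffled images land in
# TEST_SET and the remaining 8 in TRAIN_SET. Buckets too small to yield a
# whole test image (split_index < 1) fall back to a half/half split.
def _imageset_train_test_split_example(ibs):
    ibs.imageset_train_test_split(train_split=0.8)
    train_gids = ibs.get_imageset_gids(ibs.add_imagesets('TRAIN_SET'))
    test_gids = ibs.get_imageset_gids(ibs.add_imagesets('TEST_SET'))
    return len(train_gids), len(test_gids)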


@register_ibs_method
def localizer_distributions(ibs, threshold=10, dataset=None):
    # Process distributions of densities
    if dataset is None:
        gid_list = ibs.get_valid_gids()
    else:
        assert dataset in ['TRAIN_SET', 'TEST_SET']
        imageset_id = ibs.get_imageset_imgsetids_from_text(dataset)
        gid_list = list(set(ibs.get_imageset_gids(imageset_id)))
    aids_list = ibs.get_image_aids(gid_list)

    distro_dict = {}
    species_dict = {}
    for gid, aid_list in zip(gid_list, aids_list):
        total = len(aid_list)
        if total >= threshold:
            total = threshold
        # Bucket the count as a string key, e.g. '3' or '10+'
        total = '%s' % (total,) if total < threshold else '%s+' % (total,)
        if total not in distro_dict:
            distro_dict[total] = 0
        distro_dict[total] += 1
        for aid in aid_list:
            species = ibs.get_annot_species_texts(aid)
            viewpoint = ibs.get_annot_viewpoints(aid)
            if species not in species_dict:
                species_dict[species] = {}
            if viewpoint not in species_dict[species]:
                species_dict[species][viewpoint] = 0
            species_dict[species][viewpoint] += 1

    logger.info('Annotation density distribution (annotations per image)')
    for distro in sorted(distro_dict.keys()):
        logger.info('{:>6} annot(s): {:>5}'.format(distro, distro_dict[distro]))
    logger.info('')

    for species in sorted(species_dict.keys()):
        logger.info('Species viewpoint distribution: %r' % (species,))
        viewpoint_dict = species_dict[species]
        total = 0
        for viewpoint in const.VIEWTEXT_TO_VIEWPOINT_RADIANS:
            count = viewpoint_dict.get(viewpoint, 0)
            logger.info('{:>15}: {:>5}'.format(viewpoint, count))
            total += count
        logger.info('TOTAL: %d\n' % (total,))
    # visualize_distributions(distro_dict, threshold=threshold)
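

# A short usage sketch (the imageset choice is hypothetical): print density
# and viewpoint distributions for the images in TEST_SET, bucketing any image
# with 10 or more annotations into the '10+' bin.
def _localizer_distributions_example(ibs):
    ibs.localizer_distributions(threshold=10, dataset='TEST_SET')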


def visualize_distributions(distro_dict, threshold=10):
    import matplotlib.pyplot as plt

    key_list = sorted(distro_dict.keys())
    threshold_str = '%d+' % (threshold,)
    label_list = [threshold_str if key == threshold else str(key) for key in key_list]
    size_list = [distro_dict[key] for key in key_list]
    color_list = ['yellowgreen', 'gold', 'lightskyblue', 'lightcoral']
    explode = [0.0] + [0.0] * (len(size_list) - 1)

    plt.pie(
        size_list,
        explode=explode,
        labels=label_list,
        colors=color_list,
        autopct='%1.1f%%',
        shadow=True,
        startangle=90,
    )
    plt.axis('equal')
    plt.show()
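

# A toy call to visualize_distributions (the counts are assumptions): keys
# are annotations-per-image buckets, values are how many images fall in each;
# the key equal to threshold is rendered with a '+' suffix.
def _visualize_distributions_example():
    distro_dict = {1: 40, 2: 25, 3: 10, 10: 5}  # 10 is rendered as '10+'
    visualize_distributions(distro_dict, threshold=10)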


@register_ibs_method
def visualize_pascal_voc_dataset(
    ibs, dataset_path, num_examples=30, randomize=False, write=True, write_path=None
):
    r"""Visualize the PASCAL VOC dataset.

    Args:
        ibs (IBEISController):
        dataset_path (str): the dataset path in the PASCAL VOC format
        num_examples (int, optional): the number of examples to draw
        randomize (bool, optional): whether to randomize the visualization
        write (bool, optional): whether to write the files to disk or display them
        write_path (str, optional): the output path; defaults to the Desktop

    CommandLine:
        python -m wbia.other.detectcore --test-visualize_pascal_voc_dataset

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.detectcore import *  # NOQA
        >>> import wbia  # NOQA
        >>> ibs = wbia.opendb('testdb1')
        >>> dataset_path = '/Users/jason.parham/Downloads/wilddog_data/data/VOCdevkit/VOC2020/'
        >>> # dataset_path = '/Users/jason.parham/Downloads/LearningData/'
        >>> # dataset_path = '/Users/jason.parham/Downloads/VOCdevkit/VOC2018/'
        >>> ibs.visualize_pascal_voc_dataset(dataset_path, randomize=True)
    """
    import random

    from wbia.detecttools.wbiadata import IBEIS_Data

    num_examples = int(num_examples)
    assert num_examples > 0

    dataset = IBEIS_Data(dataset_path)
    dataset.print_distribution()

    image_list = sorted(dataset.images)
    num_examples = min(num_examples, len(image_list))

    if randomize:
        random.shuffle(image_list)

    if write_path is None:
        write_path = abspath(expanduser(join('~', 'Desktop')))

    for image in image_list[:num_examples]:
        if write:
            write_filepath = join(write_path, image.filename)
            image = image.show(display=False)
            cv2.imwrite(write_filepath, image)
        else:
            image.show()


@register_ibs_method
def classifier_visualize_training_localizations(
    ibs,
    classifier_weight_filepath,
    species_list=['zebra'],
    scheme=2,
    output_path=None,
    values=None,
    **kwargs,
):
    def _draw(image_dict, list_, color):
        # Annotates the images in image_dict in place; returns None
        for _ in list_:
            vals = _['gid'], _['xbr'], _['ybr'], _['xtl'], _['ytl']
            gid, xbr, ybr, xtl, ytl = vals
            height, width = image_dict[gid].shape[:2]
            xbr = int(xbr * width)
            ybr = int(ybr * height)
            xtl = int(xtl * width)
            ytl = int(ytl * height)
            image = image_dict[gid]
            cv2.rectangle(image, (xtl, ytl), (xbr, ybr), color, 4)

    def _write_chips(chip_list, output_path_fmt_str):
        # NOTE: the chip_list argument is immediately rebuilt; chips are
        # derived from the enclosing list_ / image_dict via late-bound
        # closure, and output_path_fmt_str is currently unused
        interpolation = cv2.INTER_LANCZOS4
        warpkw = dict(interpolation=interpolation)
        chip_list = []
        for _ in list_:
            vals = _['gid'], _['xbr'], _['ybr'], _['xtl'], _['ytl']
            gid, xbr, ybr, xtl, ytl = vals
            height, width = image_dict[gid].shape[:2]
            xbr = int(xbr * width)
            ybr = int(ybr * height)
            xtl = int(xtl * width)
            ytl = int(ytl * height)
            image = image_dict[gid]
            # Get chips
            chip = image[ytl:ybr, xtl:xbr, :]
            chip = cv2.resize(chip, (192, 192), **warpkw)
            chip_list.append(chip)
        return chip_list

    # Get output path
    if output_path is None:
        output_path = abspath(expanduser(join('~', 'Desktop', 'output-bootstrap')))
    ut.delete(output_path)
    ut.ensuredir(output_path)

    if values is None:
        # Load data
        logger.info('Loading pre-trained features for filtered localizations')
        train_gid_list = general_get_imageset_gids(ibs, 'TRAIN_SET', **kwargs)
        train_gid_list = train_gid_list[:10]

        config = {
            'algo': '_COMBINED',
            'species_set': set(species_list),
            'features': True,
            'feature2_algo': 'resnet',
            'classify': True,
            'classifier_algo': 'svm',
            'classifier_weight_filepath': classifier_weight_filepath,
            'nms': True,
            'nms_thresh': 0.50,
            # 'thresh': True,
            # 'index_thresh': 0.25,
        }

        logger.info('\tGather Ground-Truth')
        gt_dict = general_parse_gt(ibs, test_gid_list=train_gid_list, **config)

        logger.info('\tGather Predictions')
        pred_dict = localizer_parse_pred(ibs, test_gid_list=train_gid_list, **config)

        logger.info('Mine proposals')
        reviewed_gid_dict = {}
        values = _bootstrap_mine(
            ibs, gt_dict, pred_dict, scheme, reviewed_gid_dict, **kwargs
        )

    mined_gid_list, mined_gt_list, mined_pos_list, mined_neg_list = values

    logger.info('Prepare images')
    # Get images and a dictionary based on their gids
    image_list = ibs.get_images(mined_gid_list)
    image_dict = {gid: image for gid, image in zip(mined_gid_list, image_list)}

    # Draw positives
    list_ = mined_pos_list
    color = (0, 255, 0)
    chip_list = _draw(image_dict, list_, color)
    pos_path = join(output_path, 'positives')
    ut.ensuredir(pos_path)
    _write_chips(chip_list, join(pos_path, 'chips_pos_%05d.png'))

    # Draw negatives
    list_ = mined_neg_list
    color = (0, 0, 255)
    chip_list = _draw(image_dict, list_, color)
    neg_path = join(output_path, 'negatives')
    ut.ensuredir(neg_path)
    _write_chips(chip_list, join(neg_path, 'chips_neg_%05d.png'))

    # Draw ground-truth boxes
    list_ = mined_gt_list
    color = (255, 0, 0)
    _draw(image_dict, list_, color)

    logger.info('Write images to %r' % (output_path,))
    # Write images to disk
    for gid in image_dict:
        output_filename = 'localizations_gid_%d.png' % (gid,)
        output_filepath = join(output_path, output_filename)
        cv2.imwrite(output_filepath, image_dict[gid])


@register_ibs_method
def redownload_detection_models(ibs):
    r"""Re-download detection models.

    Args:
        ibs (IBEISController):

    CommandLine:
        python -c "from wbia.algo.detect import grabmodels; grabmodels.redownload_models()"
        python -c "import utool, wbia.algo; utool.view_directory(wbia.algo.detect.grabmodels._expand_modeldir())"

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.detectcore import *  # NOQA
        >>> import wbia  # NOQA
        >>> ibs = wbia.opendb('testdb1')
        >>> result = redownload_detection_models(ibs)
        >>> print(result)
    """
    logger.info('[other.detectcore] redownload_detection_models')
    from wbia.algo.detect import grabmodels

    modeldir = ibs.get_detect_modeldir()
    grabmodels.redownload_models(modeldir=modeldir)


@register_ibs_method
def view_model_dir(ibs):
    logger.info('[other.detectcore] view_model_dir')
    modeldir = ibs.get_detect_modeldir()
    ut.view_directory(modeldir)
    # grabmodels.redownload_models(modeldir=modeldir)


def _bootstrap_mine(
    ibs,
    gt_dict,
    pred_dict,
    scheme,
    reviewed_gid_dict,
    min_overlap=0.75,
    max_overlap=0.25,
):
    import random

    ##################################################################################
    # Step 7.5: gather SVM training data from overlap images
    #           note that this step randomly subsamples new negatives, so
    #           each SVM in the ensemble is given a different set of negatives

    mined_gid_list = []
    mined_gt_list = []
    mined_pos_list = []
    mined_neg_list = []
    for image_uuid in gt_dict:
        # logger.info('--- Processing user interaction for image %r' % (image_uuid, ))
        # Get the gt and prediction list
        gt_list = gt_dict[image_uuid]
        pred_list = pred_dict[image_uuid]

        # If never seen this image before, pick a new selection of GT bboxes
        image_gid = ibs.get_image_gids_from_uuid(image_uuid)
        if image_gid not in reviewed_gid_dict:
            # Simulate the user selecting the gt bounding box(es)
            index_list = list(range(len(gt_list)))
            if scheme == 1:
                # Pick a random bbox
                index_list_ = [random.choice(index_list)]
            elif scheme == 2:
                # Pick all gt boxes
                index_list_ = index_list[:]
            else:
                raise ValueError
            reviewed_gid_dict[image_gid] = index_list_

        # Filter based on picked bboxes for gt
        picked_index_list = reviewed_gid_dict[image_gid]
        gt_list_ = [gt_list[picked_index] for picked_index in picked_index_list]

        # Calculate overlap
        overlap = general_overlap(gt_list_, pred_list)
        num_gt, num_pred = overlap.shape

        if num_gt == 0 or num_pred == 0:
            continue
        else:
            pos_idx_list = np.where(overlap >= min_overlap)[1]
            neg_idx_list = np.where(overlap <= max_overlap)[1]

            num_pos = len(pos_idx_list)
            num_neg = len(neg_idx_list)

            # Randomly sample negative chips to get new candidates
            # Most of the time (like almost always will happen)
            if num_neg > num_pos:
                np.random.shuffle(neg_idx_list)
                neg_idx_list = neg_idx_list[:num_pos]

            mined_gid_list.append(image_gid)
            mined_gt_list += gt_list_
            mined_pos_list += [pred_list[idx] for idx in pos_idx_list]
            mined_neg_list += [pred_list[idx] for idx in neg_idx_list]

    args = (
        len(mined_pos_list),
        len(mined_neg_list),
        len(mined_gid_list),
    )
    logger.info('Mined %d positive, %d negative from %d images' % args)

    return mined_gid_list, mined_gt_list, mined_pos_list, mined_neg_list
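

# A self-contained sketch of the mining rule used above (plain IoU stands in
# for general_overlap, whose exact box format lives in detectfuncs; the boxes
# below are hypothetical): any prediction with IoU >= min_overlap against a
# kept GT box is mined as a positive, and any with IoU <= max_overlap as a
# negative candidate; anything in between is ignored.
def _bootstrap_mine_rule_example(min_overlap=0.75, max_overlap=0.25):
    def iou(a, b):
        # Boxes as (x1, y1, x2, y2)
        ix1, iy1 = max(a[0], b[0]), max(a[1], b[1])
        ix2, iy2 = min(a[2], b[2]), min(a[3], b[3])
        iw, ih = max(ix2 - ix1, 0), max(iy2 - iy1, 0)
        inter = iw * ih
        area_a = (a[2] - a[0]) * (a[3] - a[1])
        area_b = (b[2] - b[0]) * (b[3] - b[1])
        return inter / (area_a + area_b - inter)

    gt = (0, 0, 100, 100)
    preds = [(5, 5, 100, 100), (90, 90, 200, 200), (25, 25, 125, 125)]
    pos = [p for p in preds if iou(gt, p) >= min_overlap]  # first box only
    neg = [p for p in preds if iou(gt, p) <= max_overlap]  # second box only
    # The third box (IoU ~0.39) falls in the ambiguous band and is ignored.
    return pos, neg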


@register_ibs_method
def visualize_ground_truth(ibs, config, **kwargs):
    return visualize_bounding_boxes(ibs, config, 'ground_truth', **kwargs)


@register_ibs_method
def visualize_predictions(ibs, config, **kwargs):
    return visualize_bounding_boxes(ibs, config, 'prediction', **kwargs)


def visualize_bounding_boxes(
    ibs,
    config,
    version,
    gid_list=None,
    randomize=False,
    num_images=10,
    t_width=500,
    output_path=None,
):
    if gid_list is None:
        gid_list = general_get_imageset_gids(ibs, 'TEST_SET', **config)
    else:
        num_images = None

    if randomize:
        random.shuffle(gid_list)

    if num_images not in [-1, None]:
        num_images = min(num_images, len(gid_list))
        gid_list = gid_list[:num_images]

    uuid_list = ibs.get_image_uuids(gid_list)

    assert version is not None
    version = version.lower()
    if version == 'prediction':
        logger.info('\tGather Predictions')
        val_dict = localizer_parse_pred(ibs, test_gid_list=gid_list, **config)
    elif version == 'ground_truth':
        logger.info('\tGather Ground-Truth')
        val_dict = general_parse_gt(ibs, test_gid_list=gid_list, **config)
    else:
        raise ValueError('version must be "prediction" or "ground_truth"')

    if output_path is None:
        output_path = abspath(expanduser(join('~', 'Desktop', 'bboxes')))
    ut.ensuredir(output_path)

    filepath_dict = {}
    for gid, image_uuid in zip(gid_list, uuid_list):
        image = ibs.get_images(gid)
        image = _resize(image, t_width=t_width)
        h, w, c = image.shape

        val_list = val_dict[image_uuid]

        for val in val_list:
            xbr = int(np.around(val['xbr'] * w))
            ybr = int(np.around(val['ybr'] * h))
            xtl = int(np.around(val['xtl'] * w))
            ytl = int(np.around(val['ytl'] * h))
            cv2.rectangle(image, (xtl, ytl), (xbr, ybr), (0, 140, 255), 4)

        write_filename = 'bboxes_%d_%s.png' % (gid, version)
        write_filepath = join(output_path, write_filename)
        logger.info(write_filepath)
        cv2.imwrite(write_filepath, image)

        filepath_dict[gid] = write_filepath

    return filepath_dict
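

# A hedged usage sketch: draw ground-truth boxes for a handful of TEST_SET
# images and collect the written file paths. The config key is an assumption
# consistent with how species_set is passed to general_parse_gt above.
def _visualize_bounding_boxes_example(ibs):
    config = {'species_set': {'zebra_plains'}}  # hypothetical filter
    filepath_dict = ibs.visualize_ground_truth(config, num_images=5)
    return filepath_dict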