Source code for wbia.other.dbinfo

# -*- coding: utf-8 -*-
"""
get_dbinfo is probably the only useful function in here
# This is not the cleanest module
"""
# TODO: ADD COPYRIGHT TAG
import logging
from wbia import constants as const
import collections
import functools
import numpy as np
import utool as ut
import matplotlib.pyplot as plt
from os.path import join, abspath, split


print, rrr, profile = ut.inject2(__name__)
logger = logging.getLogger('wbia')





def get_dbinfo(
    ibs,
    verbose=True,
    with_imgsize=True,
    with_bytes=True,
    with_contrib=True,
    with_agesex=True,
    with_header=True,
    with_reviews=True,
    with_ggr=False,
    with_ca=False,
    with_map=False,
    short=False,
    tag='dbinfo',
    aid_list=None,
    aids=None,
    gmt_offset=3.0,
):
    """
    Returns a dictionary of digestible database information.
    Infostr is a string summary of all the stats. Prints infostr in
    addition to returning locals.

    Args:
        ibs (IBEISController):
        verbose (bool):
        with_imgsize (bool):
        with_bytes (bool):

    Returns:
        dict:

    SeeAlso:
        python -m wbia.other.ibsfuncs --exec-get_annot_stats_dict --db PZ_PB_RF_TRAIN --use-hist=True --old=False --per_name_vpedge=False
        python -m wbia.other.ibsfuncs --exec-get_annot_stats_dict --db PZ_PB_RF_TRAIN --all

    CommandLine:
        python -m wbia.other.dbinfo --exec-get_dbinfo:0
        python -m wbia.other.dbinfo --test-get_dbinfo:1
        python -m wbia.other.dbinfo --test-get_dbinfo:0 --db NNP_Master3
        python -m wbia.other.dbinfo --test-get_dbinfo:0 --db PZ_Master1
        python -m wbia.other.dbinfo --test-get_dbinfo:0 --db GZ_ALL
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 --db PZ_ViewPoints
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 --db GZ_Master1
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 --db LF_Bajo_bonito -a default
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 --db DETECT_SEATURTLES -a default --readonly
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 -a ctrl
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 -a default:minqual=ok,require_timestamp=True --dbdir ~/lev/media/danger/LEWA
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 -a default:minqual=ok,require_timestamp=True --dbdir ~/lev/media/danger/LEWA --loadbackup=0
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 -a default: --dbdir ~/lev/media/danger/LEWA
        python -m wbia.other.dbinfo --exec-get_dbinfo:0 -a default: --dbdir ~/lev/media/danger/LEWA --loadbackup=0

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> defaultdb = 'testdb1'
        >>> ibs, aid_list = wbia.testdata_aids(defaultdb, a='default:minqual=ok,view=primary,view_ext1=1')
        >>> kwargs = ut.get_kwdefaults(get_dbinfo)
        >>> kwargs['verbose'] = False
        >>> kwargs['aid_list'] = aid_list
        >>> kwargs = ut.parse_dict_from_argv(kwargs)
        >>> output = get_dbinfo(ibs, **kwargs)
        >>> result = (output['info_str'])
        >>> print(result)
        >>> #ibs = wbia.opendb(defaultdb='testdb1')
        >>> # <HACK FOR FILTERING>
        >>> #from wbia.expt import cfghelpers
        >>> #from wbia.expt import annotation_configs
        >>> #from wbia.init import filter_annots
        >>> #named_defaults_dict = ut.dict_take(annotation_configs.__dict__,
        >>> #                                   annotation_configs.TEST_NAMES)
        >>> #named_qcfg_defaults = dict(zip(annotation_configs.TEST_NAMES,
        >>> #                               ut.get_list_column(named_defaults_dict, 'qcfg')))
        >>> #acfg = cfghelpers.parse_argv_cfg(('--annot-filter', '-a'), named_defaults_dict=named_qcfg_defaults, default=None)[0]
        >>> #aid_list = ibs.get_valid_aids()
        >>> # </HACK FOR FILTERING>

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> verbose = True
        >>> short = True
        >>> #ibs = wbia.opendb(db='GZ_ALL')
        >>> #ibs = wbia.opendb(db='PZ_Master0')
        >>> ibs = wbia.opendb('testdb1')
        >>> assert ibs.get_dbname() == 'testdb1', 'DO NOT DELETE CONTRIBUTORS OF OTHER DBS'
        >>> ibs.delete_contributors(ibs.get_valid_contributor_rowids())
        >>> ibs.delete_empty_nids()
        >>> #ibs = wbia.opendb(db='PZ_MTEST')
        >>> output = get_dbinfo(ibs, with_contrib=False, verbose=False, short=True)
        >>> result = (output['info_str'])
        >>> print(result)
        +============================
        DB Info: testdb1
        DB Notes: None
        DB NumContrib: 0
        ----------
        # Names = 7
        # Names (unassociated) = 0
        # Names (singleton) = 5
        # Names (multiton) = 2
        ----------
        # Annots = 13
        # Annots (unknown) = 4
        # Annots (singleton) = 5
        # Annots (multiton) = 4
        ----------
        # Img = 13
        L============================
    """
    # TODO Database size in bytes
    # TODO: occurrence, contributors, etc...
    if aids is not None:
        aid_list = aids

    # Basic variables
    request_annot_subset = False
    _input_aid_list = aid_list  # NOQA
    if aid_list is None:
        valid_aids = ibs.get_valid_aids()
    else:
        if isinstance(aid_list, str):
            # Hack to get experiment stats on aids
            acfg_name_list = [aid_list]
            logger.info('Specified custom aids via acfgname %s' % (acfg_name_list,))
            from wbia.expt import experiment_helpers

            acfg_list, expanded_aids_list = experiment_helpers.get_annotcfg_list(
                ibs, acfg_name_list
            )
            aid_list = sorted(list(set(ut.flatten(ut.flatten(expanded_aids_list)))))
        if verbose:
            logger.info('Specified %d custom aids' % (len(aid_list)))
        request_annot_subset = True
        valid_aids = aid_list

    def get_dates(ibs, gid_list):
        unixtime_list = ibs.get_image_unixtime2(gid_list)
        unixtime_list = [
            unixtime + (gmt_offset * 60 * 60) for unixtime in unixtime_list
        ]
        datetime_list = [
            ut.unixtime_to_datetimestr(unixtime) if unixtime is not None else 'UNKNOWN'
            for unixtime in unixtime_list
        ]
        date_str_list = [value[:10] for value in datetime_list]
        return date_str_list

    if with_ggr:
        request_annot_subset = True
        valid_gids = list(set(ibs.get_annot_gids(valid_aids)))
        date_str_list = get_dates(ibs, valid_gids)
        flag_list = [
            value in ['2016/01/30', '2016/01/31', '2018/01/27', '2018/01/28']
            for value in date_str_list
        ]
        valid_gids = ut.compress(valid_gids, flag_list)
        ggr_aids = set(ut.flatten(ibs.get_image_aids(valid_gids)))
        valid_aids = sorted(list(set(valid_aids) & ggr_aids))

    valid_nids = list(
        set(ibs.get_annot_nids(valid_aids, distinguish_unknowns=False))
        - {const.UNKNOWN_NAME_ROWID}
    )
    valid_gids = list(set(ibs.get_annot_gids(valid_aids)))

    # valid_rids = ibs._get_all_review_rowids()
    valid_rids = []
    valid_rids += ibs.get_review_rowids_from_aid1(valid_aids)
    valid_rids += ibs.get_review_rowids_from_aid2(valid_aids)
    valid_rids = ut.flatten(valid_rids)
    valid_rids = list(set(valid_rids))
    num_all_total_reviews = len(valid_rids)

    aids_tuple = ibs.get_review_aid_tuple(valid_rids)
    flag_list = []
    for aid_tuple in aids_tuple:
        aid1, aid2 = aid_tuple
        flag = aid1 in valid_aids and aid2 in valid_aids
        flag_list.append(flag)
    valid_rids = ut.compress(valid_rids, flag_list)

    # associated_nids = ibs.get_valid_nids(filter_empty=True)  # nids with at least one annotation
    valid_images = ibs.images(valid_gids)
    valid_annots = ibs.annots(valid_aids)

    # Image info
    if verbose:
        logger.info('Checking Image Info')
    gx2_aids = valid_images.aids
    if request_annot_subset:
        # remove annots not in this subset
        valid_aids_set = set(valid_aids)
        gx2_aids = [
            list(set(aids_).intersection(valid_aids_set)) for aids_ in gx2_aids
        ]

    gx2_nAnnots = np.array(list(map(len, gx2_aids)))
    image_without_annots = len(np.where(gx2_nAnnots == 0)[0])
    gx2_nAnnots_stats = ut.repr4(
        ut.get_stats(gx2_nAnnots, use_median=True), nl=0, precision=2, si=True
    )
    image_reviewed_list = ibs.get_image_reviewed(valid_gids)

    # Name stats
    if verbose:
        logger.info('Checking Name Info')
    nx2_aids = ibs.get_name_aids(valid_nids)
    if request_annot_subset:
        # remove annots not in this subset
        valid_aids_set = set(valid_aids)
        nx2_aids = [
            list(set(aids_).intersection(valid_aids_set)) for aids_ in nx2_aids
        ]
    associated_nids = ut.compress(valid_nids, list(map(len, nx2_aids)))

    ibs.check_name_mapping_consistency(nx2_aids)

    # Occurrence Info
    def compute_annot_occurrence_ids(ibs, aid_list, config):
        from wbia.algo.preproc import preproc_occurrence
        import utool as ut

        gid_list = ibs.get_annot_gids(aid_list)
        gid2_aids = ut.group_items(aid_list, gid_list)
        flat_imgsetids, flat_gids = preproc_occurrence.wbia_compute_occurrences(
            ibs, gid_list, config=config, verbose=False
        )
        occurid2_gids = ut.group_items(flat_gids, flat_imgsetids)
        occurid2_aids = {
            oid: ut.flatten(ut.take(gid2_aids, gids))
            for oid, gids in occurid2_gids.items()
        }
        return occurid2_aids

    nids = ibs.get_annot_nids(valid_aids)
    nid2_annotxs = ut.ddict(set)
    for aid, nid in zip(valid_aids, nids):
        if nid >= 0:
            nid2_annotxs[nid].add(aid)

    occurence_config = {'use_gps': True, 'seconds_thresh': 10 * 60}
    occurid2_aids = compute_annot_occurrence_ids(
        ibs, valid_aids, config=occurence_config
    )
    aid2_occurxs = ut.ddict(set)
    occurid2_aids_named = ut.ddict(set)
    occurid2_nids = ut.ddict(set)
    for occurx, aids in occurid2_aids.items():
        nids = ibs.get_annot_nids(aids)
        for aid, nid in zip(aids, nids):
            if nid >= 0:
                aid2_occurxs[aid].add(occurx)
                occurid2_aids_named[occurx].add(aid)
                occurid2_nids[occurx].add(nid)
    # assert sorted(set(list(map(len, aid2_occurxs.values())))) == [1]

    occur_nids = ibs.unflat_map(ibs.get_annot_nids, occurid2_aids.values())
    occur_unique_nids = [ut.unique(nids) for nids in occur_nids]
    nid2_occurxs = ut.ddict(set)
    for occurx, nids in enumerate(occur_unique_nids):
        for nid in nids:
            if nid >= 0:
                nid2_occurxs[nid].add(occurx)

    name_annot_stats = ut.get_stats(
        list(map(len, nid2_annotxs.values())), use_median=True, use_sum=True
    )
    occurence_annot_stats = ut.get_stats(
        list(map(len, occurid2_aids_named.values())), use_median=True, use_sum=True
    )
    occurence_encounter_stats = ut.get_stats(
        list(map(len, occurid2_nids.values())), use_median=True, use_sum=True
    )
    annot_encounter_stats = ut.get_stats(
        list(map(len, nid2_occurxs.values())), use_median=True, use_sum=True
    )

    if verbose:
        logger.info('Checking Annot Species')
    unknown_annots = valid_annots.compress(ibs.is_aid_unknown(valid_annots))
    species_list = valid_annots.species_texts
    species2_annots = valid_annots.group_items(valid_annots.species_texts)
    species2_nAids = {key: len(val) for key, val in species2_annots.items()}

    if verbose:
        logger.info('Checking Multiton/Singleton Species')
    nx2_nAnnots = np.array(list(map(len, nx2_aids)))
    # Separate singletons / multitons
    multiton_nxs = np.where(nx2_nAnnots > 1)[0]
    singleton_nxs = np.where(nx2_nAnnots == 1)[0]
    unassociated_nxs = np.where(nx2_nAnnots == 0)[0]
    assert len(np.intersect1d(singleton_nxs, multiton_nxs)) == 0, 'intersecting names'
    valid_nxs = np.hstack([multiton_nxs, singleton_nxs])
    num_names_with_gt = len(multiton_nxs)

    # Annot Info
    if verbose:
        logger.info('Checking Annot Info')
    multiton_aids_list = ut.take(nx2_aids, multiton_nxs)
    assert len(set(multiton_nxs)) == len(multiton_nxs)
    if len(multiton_aids_list) == 0:
        # NOTE: np.int is removed in modern numpy; use the builtin int dtype
        multiton_aids = np.array([], dtype=int)
    else:
        multiton_aids = np.hstack(multiton_aids_list)
        assert len(set(multiton_aids)) == len(multiton_aids), 'duplicate annot'
    singleton_aids = ut.take(nx2_aids, singleton_nxs)
    multiton_nid2_nannots = list(map(len, multiton_aids_list))

    # Image size stats
    if with_imgsize:
        if verbose:
            logger.info('Checking ImageSize Info')
        gpath_list = ibs.get_image_paths(valid_gids)

        def wh_print_stats(wh_list):
            if len(wh_list) == 0:
                return '{empty}'
            wh_list = np.asarray(wh_list)
            stat_dict = collections.OrderedDict(
                [
                    ('max', wh_list.max(0)),
                    ('min', wh_list.min(0)),
                    ('mean', wh_list.mean(0)),
                    ('std', wh_list.std(0)),
                ]
            )

            def arr2str(var):
                return '[' + (', '.join(list(map(lambda x: '%.1f' % x, var)))) + ']'

            ret = ',\n    '.join(
                ['%s:%s' % (key, arr2str(val)) for key, val in stat_dict.items()]
            )
            return '{\n    ' + ret + '\n}'

        logger.info('reading image sizes')
        # Image size stats
        img_size_list = ibs.get_image_sizes(valid_gids)
        img_size_stats = wh_print_stats(img_size_list)

        # Chip size stats
        annotation_bbox_list = ibs.get_annot_bboxes(valid_aids)
        annotation_bbox_arr = np.array(annotation_bbox_list)
        if len(annotation_bbox_arr) == 0:
            annotation_size_list = []
        else:
            annotation_size_list = annotation_bbox_arr[:, 2:4]
        chip_size_stats = wh_print_stats(annotation_size_list)
        imgsize_stat_lines = [
            (' # Img in dir = %d' % len(gpath_list)),
            (' Image Size Stats = %s' % (img_size_stats,)),
            (' * Chip Size Stats = %s' % (chip_size_stats,)),
        ]
    else:
        imgsize_stat_lines = []

    if verbose:
        logger.info('Building Stats String')

    multiton_stats = ut.repr3(
        ut.get_stats(multiton_nid2_nannots, use_median=True), nl=0, precision=2, si=True
    )

    # Time stats
    unixtime_list = valid_images.unixtime2
    unixtime_list = [unixtime + (gmt_offset * 60 * 60) for unixtime in unixtime_list]
    # valid_unixtime_list = [time for time in unixtime_list if time != -1]
    # unixtime_statstr = ibs.get_image_time_statstr(valid_gids)
    if ut.get_argflag('--hackshow-unixtime'):
        show_time_distributions(ibs, unixtime_list)
        ut.show_if_requested()
    unixtime_statstr = ut.repr3(ut.get_timestats_dict(unixtime_list, full=True), si=True)

    date_str_list = get_dates(ibs, valid_gids)
    ggr_dates_stats = ut.dict_hist(date_str_list)

    # GPS stats
    gps_list_ = ibs.get_image_gps(valid_gids)
    gpsvalid_list = [gps != (-1, -1) for gps in gps_list_]
    gps_list = ut.compress(gps_list_, gpsvalid_list)

    if with_map:

        def plot_kenya(ibs, ax, gps_list=[], focus=False, focus2=False, margin=0.1):
            import utool as ut
            import pandas as pd
            import geopandas
            import shapely

            if focus2:
                focus = True

            world = geopandas.read_file(
                geopandas.datasets.get_path('naturalearth_lowres')
            )
            africa = world[world.continent == 'Africa']
            kenya = africa[africa.name == 'Kenya']
            cities = geopandas.read_file(
                geopandas.datasets.get_path('naturalearth_cities')
            )
            nairobi = cities[cities.name == 'Nairobi']

            kenya.plot(ax=ax, color='white', edgecolor='black')

            path_dict = ibs.compute_ggr_path_dict()
            meru = path_dict['County Meru']
            for key in path_dict:
                path = path_dict[key]
                polygon = shapely.geometry.Polygon(path.vertices[:, ::-1])
                gdf = geopandas.GeoDataFrame([1], geometry=[polygon], crs=world.crs)
                if key.startswith('County'):
                    if 'Meru' in key:
                        gdf.plot(ax=ax, color=(1, 0, 0, 0.2), edgecolor='red')
                    else:
                        gdf.plot(ax=ax, color='grey', edgecolor='black')
                if focus:
                    if key.startswith('Land Tenure'):
                        gdf.plot(ax=ax, color=(1, 0, 0, 0.0), edgecolor='blue')

            if focus2:
                flag_list = []
                for gps in gps_list:
                    flag = meru.contains_point(gps)
                    flag_list.append(flag)
                gps_list = ut.compress(gps_list, flag_list)

            df = pd.DataFrame(
                {
                    'Latitude': ut.take_column(gps_list, 0),
                    'Longitude': ut.take_column(gps_list, 1),
                }
            )
            gdf = geopandas.GeoDataFrame(
                df, geometry=geopandas.points_from_xy(df.Longitude, df.Latitude)
            )
            gdf.plot(ax=ax, color='red')

            min_lat, min_lon = gdf.min()
            max_lat, max_lon = gdf.max()
            dom_lat = max_lat - min_lat
            dom_lon = max_lon - min_lon
            margin_lat = dom_lat * margin
            margin_lon = dom_lon * margin
            min_lat -= margin_lat
            min_lon -= margin_lon
            max_lat += margin_lat
            max_lon += margin_lon

            polygon = shapely.geometry.Polygon(
                [
                    [min_lon, min_lat],
                    [min_lon, max_lat],
                    [max_lon, max_lat],
                    [max_lon, min_lat],
                ]
            )
            gdf = geopandas.GeoDataFrame([1], geometry=[polygon], crs=world.crs)
            gdf.plot(ax=ax, color=(1, 0, 0, 0.0), edgecolor='blue')

            nairobi.plot(ax=ax, marker='*', color='black', markersize=500)

            ax.grid(False, which='major')
            ax.grid(False, which='minor')
            ax.get_xaxis().set_ticks([])
            ax.get_yaxis().set_ticks([])

            if focus:
                ax.set_autoscalex_on(False)
                ax.set_autoscaley_on(False)
                ax.set_xlim([min_lon, max_lon])
                ax.set_ylim([min_lat, max_lat])

        fig = plt.figure(figsize=(30, 30), dpi=400)
        ax = plt.subplot(131)
        plot_kenya(ibs, ax, gps_list)
        ax = plt.subplot(132)
        plot_kenya(ibs, ax, gps_list, focus=True)
        ax = plt.subplot(133)
        plot_kenya(ibs, ax, gps_list, focus2=True)
        plt.savefig('map.png', bbox_inches='tight')

    def get_annot_age_stats(aid_list):
        annot_age_months_est_min = ibs.get_annot_age_months_est_min(aid_list)
        annot_age_months_est_max = ibs.get_annot_age_months_est_max(aid_list)
        age_dict = ut.ddict((lambda: 0))
        for min_age, max_age in zip(annot_age_months_est_min, annot_age_months_est_max):
            if max_age is None:
                max_age = min_age
            if min_age is None:
                min_age = max_age
            if max_age is None and min_age is None:
                logger.info('Found UNKNOWN Age: %r, %r' % (min_age, max_age))
                age_dict['UNKNOWN'] += 1
            elif (min_age is None or min_age < 12) and max_age < 12:
                age_dict['Infant'] += 1
            elif 12 <= min_age and min_age < 36 and 12 <= max_age and max_age < 36:
                age_dict['Juvenile'] += 1
            elif 36 <= min_age and (max_age is None or 36 <= max_age):
                age_dict['Adult'] += 1
        return age_dict

    def get_annot_sex_stats(aid_list):
        annot_sextext_list = ibs.get_annot_sex_texts(aid_list)
        sextext2_aids = ut.group_items(aid_list, annot_sextext_list)
        sex_keys = list(ibs.const.SEX_TEXT_TO_INT.keys())
        assert set(sex_keys) >= set(annot_sextext_list), 'bad keys: ' + str(
            set(annot_sextext_list) - set(sex_keys)
        )
        sextext2_nAnnots = ut.odict(
            [(key, len(sextext2_aids.get(key, []))) for key in sex_keys]
        )
        # Filter 0's
        sextext2_nAnnots = {
            key: val for key, val in sextext2_nAnnots.items() if val != 0
        }
        return sextext2_nAnnots

    def get_annot_qual_stats(ibs, aid_list):
        annots = ibs.annots(aid_list)
        qualtext2_nAnnots = ut.order_dict_by(
            ut.map_vals(len, annots.group_items(annots.quality_texts)),
            list(ibs.const.QUALITY_TEXT_TO_INT.keys()),
        )
        return qualtext2_nAnnots

    def get_annot_viewpoint_stats(ibs, aid_list):
        annots = ibs.annots(aid_list)
        viewcode2_nAnnots = ut.order_dict_by(
            ut.map_vals(len, annots.group_items(annots.viewpoint_code)),
            list(ibs.const.VIEW.CODE_TO_INT.keys()) + [None],
        )
        return viewcode2_nAnnots

    if verbose:
        logger.info('Checking Other Annot Stats')

    qualtext2_nAnnots = get_annot_qual_stats(ibs, valid_aids)
    viewcode2_nAnnots = get_annot_viewpoint_stats(ibs, valid_aids)
    agetext2_nAnnots = get_annot_age_stats(valid_aids)
    sextext2_nAnnots = get_annot_sex_stats(valid_aids)

    if verbose:
        logger.info('Checking Contrib Stats')

    # Contributor Statistics
    # hack remove colon for image alignment
    def fix_tag_list(tag_list):
        return [None if tag is None else tag.replace(':', ';') for tag in tag_list]

    image_contributor_tags = fix_tag_list(ibs.get_image_contributor_tag(valid_gids))
    annot_contributor_tags = fix_tag_list(
        ibs.get_annot_image_contributor_tag(valid_aids)
    )
    contributor_tag_to_gids = ut.group_items(valid_gids, image_contributor_tags)
    contributor_tag_to_aids = ut.group_items(valid_aids, annot_contributor_tags)
    contributor_tag_to_qualstats = {
        key: get_annot_qual_stats(ibs, aids)
        for key, aids in contributor_tag_to_aids.items()
    }
    contributor_tag_to_viewstats = {
        key: get_annot_viewpoint_stats(ibs, aids)
        for key, aids in contributor_tag_to_aids.items()
    }

    contributor_tag_to_nImages = {
        key: len(val) for key, val in contributor_tag_to_gids.items()
    }
    contributor_tag_to_nAnnots = {
        key: len(val) for key, val in contributor_tag_to_aids.items()
    }

    if verbose:
        logger.info('Summarizing')

    # Summarize stats
    num_names = len(valid_nids)
    num_names_unassociated = len(valid_nids) - len(associated_nids)
    num_names_singleton = len(singleton_nxs)
    num_names_multiton = len(multiton_nxs)

    num_singleton_annots = len(singleton_aids)
    num_multiton_annots = len(multiton_aids)
    num_unknown_annots = len(unknown_annots)
    num_annots = len(valid_aids)

    if with_bytes:
        if verbose:
            logger.info('Checking Disk Space')
        ibsdir_space = ut.byte_str2(ut.get_disk_space(ibs.get_ibsdir()))
        dbdir_space = ut.byte_str2(ut.get_disk_space(ibs.get_dbdir()))
        imgdir_space = ut.byte_str2(ut.get_disk_space(ibs.get_imgdir()))
        cachedir_space = ut.byte_str2(ut.get_disk_space(ibs.get_cachedir()))

    if True:
        if verbose:
            logger.info('Check asserts')
        try:
            bad_aids = np.intersect1d(multiton_aids, unknown_annots)
            _num_names_total_check = (
                num_names_singleton + num_names_unassociated + num_names_multiton
            )
            _num_annots_total_check = (
                num_unknown_annots + num_singleton_annots + num_multiton_annots
            )
            assert len(bad_aids) == 0, 'intersecting multiton aids and unknown aids'
            assert _num_names_total_check == num_names, 'inconsistent num names'
            # if not request_annot_subset:
            #     # dont check this if you have an annot subset
            #     assert _num_annots_total_check == num_annots, 'inconsistent num annots'
        except Exception as ex:
            ut.printex(
                ex,
                keys=[
                    '_num_names_total_check',
                    'num_names',
                    '_num_annots_total_check',
                    'num_annots',
                    'num_names_singleton',
                    'num_names_multiton',
                    'num_unknown_annots',
                    'num_multiton_annots',
                    'num_singleton_annots',
                ],
            )
            raise

    # Get contributor statistics
    contributor_rowids = ibs.get_valid_contributor_rowids()
    num_contributors = len(contributor_rowids)

    if verbose:
        logger.info('Checking Review Info')

    # Get reviewer statistics
    def get_review_decision_stats(ibs, rid_list):
        review_decision_list = ibs.get_review_decision_str(rid_list)
        review_decision_to_rids = ut.group_items(rid_list, review_decision_list)
        review_decision_stats = {
            key: len(val) for key, val in review_decision_to_rids.items()
        }
        return review_decision_stats

    def get_review_identity(rid_list):
        review_identity_list = ibs.get_review_identity(rid_list)
        review_identity_list = [
            value.replace('user:web', 'human:web')
            .replace('web:None', 'web')
            .replace('auto_clf', 'vamp')
            .replace(':', '[')
            + ']'
            for value in review_identity_list
        ]
        return review_identity_list

    def get_review_identity_stats(ibs, rid_list):
        review_identity_list = get_review_identity(rid_list)
        review_identity_to_rids = ut.group_items(rid_list, review_identity_list)
        review_identity_stats = {
            key: len(val) for key, val in review_identity_to_rids.items()
        }
        return review_identity_to_rids, review_identity_stats

    def get_review_participation(
        review_aids_list, value_list, aid2_occurxs, nid2_occurxs
    ):
        annot_review_participation_dict = {}
        encounter_review_participation_dict = {}

        review_aid_list = ut.flatten(review_aids_list)
        review_nid_list = ibs.get_annot_nids(review_aid_list)
        review_aid_nid_dict = dict(zip(review_aid_list, review_nid_list))

        known_aids = set(aid2_occurxs.keys())
        known_encounters = set([])
        for nid, occurxs in nid2_occurxs.items():
            for occurx in occurxs:
                encounter = '%s,%s' % (occurx, nid)
                known_encounters.add(encounter)

        for review_aids, value in list(zip(review_aids_list, value_list)):
            for value_ in [value, 'Any']:
                enc_values_ = [
                    (None, value_),
                    (True, '%s (INTRA)' % (value_)),
                    (False, '%s (INTER)' % (value_)),
                ]
                review_nids = ut.take(review_aid_nid_dict, review_aids)
                review_occurxs = ut.flatten(ut.take(aid2_occurxs, review_aids))
                is_intra = len(set(review_occurxs)) == 1

                if value_ not in annot_review_participation_dict:
                    annot_review_participation_dict[value_] = {
                        '__KNOWN__': known_aids,
                        '__HIT__': set([]),
                    }
                for env_flag_, enc_value_ in enc_values_:
                    if enc_value_ not in encounter_review_participation_dict:
                        encounter_review_participation_dict[enc_value_] = {
                            '__KNOWN__': known_encounters,
                            '__HIT__': set([]),
                        }

                for aid, nid, occurx in zip(review_aids, review_nids, review_occurxs):
                    encounter = '%s,%s' % (occurx, nid)

                    annot_review_participation_dict[value_]['__HIT__'].add(aid)
                    if aid not in annot_review_participation_dict[value_]:
                        annot_review_participation_dict[value_][aid] = 0
                    annot_review_participation_dict[value_][aid] += 1

                    for env_flag_, enc_value_ in enc_values_:
                        if env_flag_ in [None, is_intra]:
                            encounter_review_participation_dict[enc_value_][
                                '__HIT__'
                            ].add(encounter)
                            if (
                                encounter
                                not in encounter_review_participation_dict[enc_value_]
                            ):
                                encounter_review_participation_dict[enc_value_][
                                    encounter
                                ] = 0
                            encounter_review_participation_dict[enc_value_][
                                encounter
                            ] += 1

        for review_participation_dict in [
            annot_review_participation_dict,
            encounter_review_participation_dict,
        ]:
            for value in review_participation_dict:
                known_values = review_participation_dict[value].pop('__KNOWN__')
                hit_values = review_participation_dict[value].pop('__HIT__')
                missed_values = known_values - hit_values
                values = list(review_participation_dict[value].values())
                stats = ut.get_stats(values, use_median=True, use_sum=True)
                stats['known'] = len(known_values)
                stats['hit'] = len(hit_values)
                stats['miss'] = len(missed_values)
                review_participation_dict[value] = stats

        return annot_review_participation_dict, encounter_review_participation_dict

    review_decision_stats = get_review_decision_stats(ibs, valid_rids)
    review_identity_to_rids, review_identity_stats = get_review_identity_stats(
        ibs, valid_rids
    )
    review_identity_to_decision_stats = {
        key: get_review_decision_stats(ibs, aids)
        for key, aids in review_identity_to_rids.items()
    }

    review_aids_list = ibs.get_review_aid_tuple(valid_rids)
    review_decision_list = ibs.get_review_decision_str(valid_rids)
    review_identity_list = get_review_identity(valid_rids)
    (
        review_decision_annot_participation_dict,
        review_decision_encounter_participation_dict,
    ) = get_review_participation(
        review_aids_list, review_decision_list, aid2_occurxs, nid2_occurxs
    )
    (
        review_identity_annot_participation_dict,
        review_identity_encounter_participation_dict,
    ) = get_review_participation(
        review_aids_list, review_identity_list, aid2_occurxs, nid2_occurxs
    )

    review_tags_list = ibs.get_review_tags(valid_rids)
    review_tag_list = [
        review_tag if review_tag is None else '+'.join(sorted(review_tag))
        for review_tag in review_tags_list
    ]
    review_tag_to_rids = ut.group_items(valid_rids, review_tag_list)
    review_tag_stats = {key: len(val) for key, val in review_tag_to_rids.items()}

    if with_ca:
        species_list = ibs.get_annot_species_texts(valid_aids)
        viewpoint_list = ibs.get_annot_viewpoints(valid_aids)
        quality_list = ibs.get_annot_qualities(valid_aids)
        interest_list = ibs.get_annot_interest(valid_aids)
        canonical_list = ibs.get_annot_canonical(valid_aids)

        # ggr_num_relevant = 0
        ggr_num_species = 0
        ggr_num_viewpoints = 0
        ggr_num_qualities = 0
        ggr_num_filter = 0
        ggr_num_aois = 0
        ggr_num_cas = 0

        ggr_num_filter_overlap = 0
        ggr_num_filter_remove = 0
        ggr_num_filter_add = 0
        ggr_num_aoi_overlap = 0
        ggr_num_aoi_remove = 0
        ggr_num_aoi_add = 0

        zipped = list(
            zip(
                valid_aids,
                species_list,
                viewpoint_list,
                quality_list,
                interest_list,
                canonical_list,
            )
        )
        ca_removed_aids = []
        ca_added_aids = []
        for aid, species_, viewpoint_, quality_, interest_, canonical_ in zipped:
            if species_ == 'zebra_grevys+_canonical_':
                continue
            assert None not in [species_, viewpoint_, quality_]
            species_ = species_.lower()
            viewpoint_ = viewpoint_.lower()
            quality_ = int(quality_)
            # if species_ in ['zebra_grevys']:
            #     ggr_num_relevant += 1
            if species_ in ['zebra_grevys']:
                ggr_num_species += 1
                filter_viewpoint_ = 'right' in viewpoint_
                filter_quality_ = quality_ >= 3
                filter_ = filter_viewpoint_ and filter_quality_
                if canonical_:
                    ggr_num_cas += 1
                if filter_viewpoint_:
                    ggr_num_viewpoints += 1
                if filter_quality_:
                    ggr_num_qualities += 1
                if filter_:
                    ggr_num_filter += 1
                    if canonical_:
                        ggr_num_filter_overlap += 1
                    else:
                        ggr_num_filter_remove += 1
                        ca_removed_aids.append(aid)
                else:
                    if canonical_:
                        ggr_num_filter_add += 1
                        ca_added_aids.append(aid)
                if interest_:
                    ggr_num_aois += 1
                    if canonical_:
                        ggr_num_aoi_overlap += 1
                    else:
                        ggr_num_aoi_remove += 1
                else:
                    if canonical_:
                        ggr_num_aoi_add += 1

        print('CA REMOVED: %s' % (ca_removed_aids,))
        print('CA ADDED: %s' % (ca_added_aids,))

        removed_chip_paths = ibs.get_annot_chip_fpath(ca_removed_aids)
        added_chip_paths = ibs.get_annot_chip_fpath(ca_added_aids)

        removed_output_path = abspath(join('.', 'ca_removed'))
        added_output_path = abspath(join('.', 'ca_added'))
        ut.delete(removed_output_path)
        ut.delete(added_output_path)
        ut.ensuredir(removed_output_path)
        ut.ensuredir(added_output_path)

        for removed_chip_path in removed_chip_paths:
            removed_chip_filename = split(removed_chip_path)[1]
            removed_output_filepath = join(removed_output_path, removed_chip_filename)
            ut.copy(removed_chip_path, removed_output_filepath, verbose=False)

        for added_chip_path in added_chip_paths:
            added_chip_filename = split(added_chip_path)[1]
            added_output_filepath = join(added_output_path, added_chip_filename)
            ut.copy(added_chip_path, added_output_filepath, verbose=False)

    #########

    num_tabs = 30

    def align2(str_):
        return ut.align(str_, ':', ' :')

    def align_dict2(dict_):
        # str_ = ut.repr2(dict_, si=True)
        str_ = ut.repr3(dict_, si=True)
        return align2(str_)

    header_block_lines = [('+============================')] + (
        [
            ('+ singleton := names with a single annotation'),
            ('+ multiton := names with multiple annotations'),
            ('--' * num_tabs),
        ]
        if not short and with_header
        else []
    )

    source_block_lines = [
        ('DB Info: ' + ibs.get_dbname()),
        # ('DB Notes: ' + ibs.get_dbnotes()),
        ('DB NumContrib: %d' % num_contributors),
    ]

    bytes_block_lines = (
        [
            ('--' * num_tabs),
            ('DB Bytes: '),
            (' +- dbdir nBytes: ' + dbdir_space),
            (' | +- _ibsdb nBytes: ' + ibsdir_space),
            (' | | +-imgdir nBytes: ' + imgdir_space),
            (' | | +-cachedir nBytes: ' + cachedir_space),
        ]
        if with_bytes
        else []
    )

    name_block_lines = [
        ('--' * num_tabs),
        ('# Names = %d' % num_names),
        ('# Names (unassociated) = %d' % num_names_unassociated),
        ('# Names (singleton) = %d' % num_names_singleton),
        ('# Names (multiton) = %d' % num_names_multiton),
    ]

    subset_str = ' ' if not request_annot_subset else '(SUBSET)'
    annot_block_lines = [
        ('--' * num_tabs),
        ('# Annots %s = %d' % (subset_str, num_annots)),
        ('# Annots (unknown) = %d' % num_unknown_annots),
        ('# Annots (named) = %d' % (num_singleton_annots + num_multiton_annots)),
        ('# Annots (singleton) = %d' % num_singleton_annots),
        ('# Annots (multiton) = %d' % num_multiton_annots),
    ]

    annot_per_basic_block_lines = (
        [
            ('--' * num_tabs),
            # ('# Annots per Name (multiton) = %s' % (align2(multiton_stats),)),
            ('# Annots per Image = %s' % (align2(gx2_nAnnots_stats),)),
            ('# Annots per Species = %s' % (align_dict2(species2_nAids),)),
        ]
        if not short
        else []
    )

    annot_per_qualview_block_lines = [
        None if short else '# Annots per Viewpoint = %s' % align_dict2(viewcode2_nAnnots),
        None if short else '# Annots per Quality = %s' % align_dict2(qualtext2_nAnnots),
    ]

    annot_per_agesex_block_lines = (
        [
            ('# Annots per Age = %s' % align_dict2(agetext2_nAnnots)),
            ('# Annots per Sex = %s' % align_dict2(sextext2_nAnnots)),
        ]
        if not short and with_agesex
        else []
    )

    annot_ggr_census = (
        [
            ('GGR Annots: '),
            # (' +-Relevant: %s' % (ggr_num_relevant,)),
            (" +- Grevy's Species: %s" % (ggr_num_species,)),
            (' | +-AoIs: %s' % (ggr_num_aois,)),
            (' | | +-Right Side: %s' % (ggr_num_viewpoints,)),
            (' | | +-Good Quality: %s' % (ggr_num_qualities,)),
            (' | | +-Filter: %s' % (ggr_num_filter,)),
            (' | +-CAs: %s' % (ggr_num_cas,)),
            (
                ' +-CA & Filter Overlap: %s (CA removed %d, added %d)'
                % (ggr_num_filter_overlap, ggr_num_filter_remove, ggr_num_filter_add)
            ),
            (
                ' +-CA & AOI Overlap: %s (CA removed %d, added %d)'
                % (ggr_num_aoi_overlap, ggr_num_aoi_remove, ggr_num_aoi_add)
            ),
        ]
        if with_ggr
        else []
    )

    from wbia.algo.preproc import occurrence_blackbox

    valid_nids_ = ibs.get_annot_nids(valid_aids)
    valid_gids_ = ibs.get_annot_gids(valid_aids)
    date_str_list_ = get_dates(ibs, valid_gids_)

    name_dates_stats = {}
    for valid_aid, valid_nid, date_str in zip(valid_aids, valid_nids_, date_str_list_):
        if valid_nid < 0:
            continue
        if valid_nid not in name_dates_stats:
            name_dates_stats[valid_nid] = set([])
        name_dates_stats[valid_nid].add(date_str)

    if with_ggr:
        ggr_name_dates_stats = {
            'GGR-16 D1 OR D2': 0,
            'GGR-16 D1 AND D2': 0,
            'GGR-18 D1 OR D2': 0,
            'GGR-18 D1 AND D2': 0,
            'GGR-16 AND GGR-18': 0,
            '1+ Days': 0,
            '2+ Days': 0,
            '3+ Days': 0,
            '4+ Days': 0,
        }
        for date_str in sorted(set(date_str_list_)):
            ggr_name_dates_stats[date_str] = 0

        for nid in name_dates_stats:
            date_strs = name_dates_stats[nid]
            total_days = len(date_strs)
            assert 0 < total_days and total_days <= 4
            for val in range(1, total_days + 1):
                key = '%d+ Days' % (val,)
                ggr_name_dates_stats[key] += 1
            for date_str in date_strs:
                ggr_name_dates_stats[date_str] += 1
            if '2016/01/30' in date_strs or '2016/01/31' in date_strs:
                ggr_name_dates_stats['GGR-16 D1 OR D2'] += 1
                if '2018/01/27' in date_strs or '2018/01/28' in date_strs:
                    ggr_name_dates_stats['GGR-16 AND GGR-18'] += 1
            if '2018/01/27' in date_strs or '2018/01/28' in date_strs:
                ggr_name_dates_stats['GGR-18 D1 OR D2'] += 1
            if '2016/01/30' in date_strs and '2016/01/31' in date_strs:
                ggr_name_dates_stats['GGR-16 D1 AND D2'] += 1
            if '2018/01/27' in date_strs and '2018/01/28' in date_strs:
                ggr_name_dates_stats['GGR-18 D1 AND D2'] += 1

        ggr16_pl_index, ggr16_pl_error = sight_resight_count(
            ggr_name_dates_stats['2016/01/30'],
            ggr_name_dates_stats['2016/01/31'],
            ggr_name_dates_stats['GGR-16 D1 AND D2'],
        )
        ggr_name_dates_stats['GGR-16 PL INDEX'] = '%0.01f +/- %0.01f' % (
            ggr16_pl_index,
            ggr16_pl_error,
        )
        total = ggr_name_dates_stats['GGR-16 D1 OR D2']
        ggr_name_dates_stats['GGR-16 COVERAGE'] = '%0.01f (%0.01f - %0.01f)' % (
            100.0 * total / ggr16_pl_index,
            100.0 * total / (ggr16_pl_index + ggr16_pl_error),
            100.0 * min(1.0, total / (ggr16_pl_index - ggr16_pl_error)),
        )

        ggr18_pl_index, ggr18_pl_error = sight_resight_count(
            ggr_name_dates_stats['2018/01/27'],
            ggr_name_dates_stats['2018/01/28'],
            ggr_name_dates_stats['GGR-18 D1 AND D2'],
        )
        ggr_name_dates_stats['GGR-18 PL INDEX'] = '%0.01f +/- %0.01f' % (
            ggr18_pl_index,
            ggr18_pl_error,
        )
        total = ggr_name_dates_stats['GGR-18 D1 OR D2']
        ggr_name_dates_stats['GGR-18 COVERAGE'] = '%0.01f (%0.01f - %0.01f)' % (
            100.0 * total / ggr18_pl_index,
            100.0 * total / (ggr18_pl_index + ggr18_pl_error),
            100.0 * min(1.0, total / (ggr18_pl_index - ggr18_pl_error)),
        )
    else:
        ggr_name_dates_stats = {}

    occurrence_block_lines = (
        [
            ('--' * num_tabs),
            '# Occurrences = %s' % (len(occurid2_aids),),
            '# Occurrences with Named = %s'
            % (len(set(ut.flatten(aid2_occurxs.values()))),),
            '# +- GPS Filter = %s' % (occurence_config.get('use_gps', False),),
            '# +- GPS Threshold KM/Sec. = %0.04f' % (occurrence_blackbox.KM_PER_SEC,),
            '# +- Time Filter = %s' % (True,),
            '# +- Time Threshold Sec. = %0.1f'
            % (occurence_config.get('seconds_thresh', None),),
            ('# Named Annots per Occurrence = %s' % (align_dict2(occurence_annot_stats),)),
            ('# Encounters per Occurrence = %s' % (align_dict2(occurence_encounter_stats),)),
            '# Encounters = %s' % (len(ut.flatten(nid2_occurxs.values())),),
            ('# Encounters per Name = %s' % (align_dict2(annot_encounter_stats),)),
            '# Annotations with Names = %s'
            % (len(set(ut.flatten(nid2_annotxs.values()))),),
            ('# Annotations per Name = %s' % (align_dict2(name_annot_stats),)),
            # ('# Pair Tag Info (annots) = %s' % (align_dict2(pair_tag_info),)),
        ]
        if not short
        else []
    )

    reviews_block_lines = (
        [
            ('--' * num_tabs),
            ('# All Reviews = %d' % num_all_total_reviews),
            ('# Relevant Reviews = %d' % len(valid_rids)),
            ('# Reviews per Decision = %s' % align_dict2(review_decision_stats)),
            ('# Reviews per Reviewer = %s' % align_dict2(review_identity_stats)),
            ('# Review Breakdown = %s' % align_dict2(review_identity_to_decision_stats)),
            ('# Reviews with Tag = %s' % align_dict2(review_tag_stats)),
            (
                '# Annot Review Participation by Decision = %s'
                % align_dict2(review_decision_annot_participation_dict)
            ),
            (
                '# Encounter Review Participation by Decision = %s'
                % align_dict2(review_decision_encounter_participation_dict)
            ),
            (
                '# Annot Review Participation by Reviewer = %s'
                % align_dict2(review_identity_annot_participation_dict)
            ),
            (
                '# Encounter Review Participation by Reviewer = %s'
                % align_dict2(review_identity_encounter_participation_dict)
            ),
        ]
        if with_reviews
        else []
    )

    img_block_lines = [
        ('--' * num_tabs),
        ('# Img = %d' % len(valid_gids)),
        None if short else ('# Img reviewed = %d' % sum(image_reviewed_list)),
        None if short else ('# Img with gps = %d' % len(gps_list)),
        # ('# Img with timestamp = %d' % len(valid_unixtime_list)),
        None if short else ('Img Time Stats = %s' % (align2(unixtime_statstr),)),
        ('GGR Days = %s' % (align_dict2(ggr_dates_stats),)) if with_ggr else None,
        ('GGR Name Stats = %s' % (align_dict2(ggr_name_dates_stats),))
        if with_ggr
        else None,
    ]

    contributor_block_lines = (
        [
            ('--' * num_tabs),
            ('# Images per contributor = ' + align_dict2(contributor_tag_to_nImages)),
            ('# Annots per contributor = ' + align_dict2(contributor_tag_to_nAnnots)),
            ('# Quality per contributor = ' + align_dict2(contributor_tag_to_qualstats)),
            ('# Viewpoint per contributor = ' + align_dict2(contributor_tag_to_viewstats)),
        ]
        if with_contrib
        else []
    )

    info_str_lines = (
        header_block_lines
        + bytes_block_lines
        + source_block_lines
        + name_block_lines
        + annot_block_lines
        + annot_per_basic_block_lines
        + annot_per_qualview_block_lines
        + annot_per_agesex_block_lines
        + annot_ggr_census
        + occurrence_block_lines
        + reviews_block_lines
        + img_block_lines
        + imgsize_stat_lines
        + contributor_block_lines
        + [('L============================')]
    )
    info_str = '\n'.join(ut.filter_Nones(info_str_lines))
    info_str2 = ut.indent(info_str, '[{tag}] '.format(tag=tag))
    if verbose:
        logger.info(info_str2)
    locals_ = locals()
    return locals_
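

# Hedged usage sketch (illustration only, not part of the original module):
# get_dbinfo() returns its locals() as a dict, so callers typically pull the
# formatted summary out of the 'info_str' key. The 'testdb1' database below
# mirrors the doctests above and is assumed to be available.
#
#   >>> import wbia
#   >>> ibs = wbia.opendb('testdb1')
#   >>> output = get_dbinfo(ibs, verbose=False, short=True)
#   >>> print(output['info_str'])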


def hackshow_names(ibs, aid_list, fnum=None):
    r"""
    Args:
        ibs (IBEISController): wbia controller object
        aid_list (list):

    CommandLine:
        python -m wbia.other.dbinfo --exec-hackshow_names --show
        python -m wbia.other.dbinfo --exec-hackshow_names --show --db PZ_Master1

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> ibs = wbia.opendb(defaultdb='PZ_MTEST')
        >>> aid_list = ibs.get_valid_aids()
        >>> result = hackshow_names(ibs, aid_list)
        >>> print(result)
        >>> ut.show_if_requested()
    """
    import wbia.plottool as pt
    import vtool as vt

    grouped_aids, nid_list = ibs.group_annots_by_name(aid_list)
    grouped_aids = [aids for aids in grouped_aids if len(aids) > 1]
    unixtimes_list = ibs.unflat_map(ibs.get_annot_image_unixtimes_asfloat, grouped_aids)
    yaws_list = ibs.unflat_map(ibs.get_annot_yaws, grouped_aids)
    # markers_list = [[(1, 2, yaw * 360 / (np.pi * 2)) for yaw in yaws] for yaws in yaws_list]

    unixtime_list = ut.flatten(unixtimes_list)
    timemax = np.nanmax(unixtime_list)
    timemin = np.nanmin(unixtime_list)
    timerange = timemax - timemin
    unixtimes_list = [
        ((unixtimes[:] - timemin) / timerange) for unixtimes in unixtimes_list
    ]
    for unixtimes in unixtimes_list:
        num_nan = sum(np.isnan(unixtimes))
        unixtimes[np.isnan(unixtimes)] = np.linspace(-1, -0.5, num_nan)
    # ydata_list = [np.arange(len(aids)) for aids in grouped_aids]
    sortx_list = vt.argsort_groups(unixtimes_list, reverse=False)
    # markers_list = ut.list_ziptake(markers_list, sortx_list)
    yaws_list = ut.list_ziptake(yaws_list, sortx_list)
    ydatas_list = vt.ziptake(unixtimes_list, sortx_list)
    # ydatas_list = sortx_list
    # ydatas_list = vt.argsort_groups(unixtimes_list, reverse=False)

    # Sort by num members
    # ydatas_list = ut.take(ydatas_list, np.argsort(list(map(len, ydatas_list))))
    xdatas_list = [
        np.zeros(len(ydatas)) + count for count, ydatas in enumerate(ydatas_list)
    ]
    # markers = ut.flatten(markers_list)
    # yaws = np.array(ut.flatten(yaws_list))
    y_data = np.array(ut.flatten(ydatas_list))
    x_data = np.array(ut.flatten(xdatas_list))
    fnum = pt.ensure_fnum(fnum)
    pt.figure(fnum=fnum)
    ax = pt.gca()

    # unique_yaws, groupxs = vt.group_indices(yaws)

    ax.scatter(x_data, y_data, color=[1, 0, 0], s=1, marker='.')
    # pt.draw_stems(x_data, y_data, marker=markers, setlims=True, linestyle='')
    pt.dark_background()
    ax = pt.gca()
    ax.set_xlim(min(x_data) - 0.1, max(x_data) + 0.1)
    ax.set_ylim(min(y_data) - 0.1, max(y_data) + 0.1)


def show_image_time_distributions(ibs, gid_list):
    r"""
    Args:
        ibs (IBEISController): wbia controller object
        gid_list (list):

    CommandLine:
        python -m wbia.other.dbinfo show_image_time_distributions --show
        python -m wbia.other.dbinfo show_image_time_distributions --show --db lynx

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> ibs = wbia.opendb(defaultdb='testdb1')
        >>> aids = wbia.testdata_aids(ibs=ibs)
        >>> gid_list = ut.unique_unordered(ibs.get_annot_gids(aids))
        >>> result = show_image_time_distributions(ibs, gid_list)
        >>> print(result)
        >>> ut.show_if_requested()
    """
    unixtime_list = ibs.get_image_unixtime(gid_list)
    # NOTE: np.float is removed in modern numpy; use the builtin float dtype
    unixtime_list = np.array(unixtime_list, dtype=float)
    unixtime_list = ut.list_replace(unixtime_list, -1, float('nan'))
    show_time_distributions(ibs, unixtime_list)


def show_time_distributions(ibs, unixtime_list):
    r"""Plot the distribution of image timestamps for a database."""
    # import vtool as vt
    import wbia.plottool as pt

    unixtime_list = np.array(unixtime_list)
    num_nan = np.isnan(unixtime_list).sum()
    num_total = len(unixtime_list)
    unixtime_list = unixtime_list[~np.isnan(unixtime_list)]

    from wbia.scripts.thesis import TMP_RC
    import matplotlib as mpl

    mpl.rcParams.update(TMP_RC)

    if False:
        from matplotlib import dates as mpldates

        # data_list = list(map(ut.unixtime_to_datetimeobj, unixtime_list))
        n, bins, patches = pt.plt.hist(unixtime_list, 365)
        # n_ = list(map(ut.unixtime_to_datetimeobj, n))
        # bins_ = list(map(ut.unixtime_to_datetimeobj, bins))
        pt.plt.setp(patches, 'facecolor', 'g', 'alpha', 0.75)
        ax = pt.gca()
        # ax.xaxis.set_major_locator(mpldates.YearLocator())
        # hfmt = mpldates.DateFormatter('%y/%m/%d')
        # ax.xaxis.set_major_formatter(hfmt)
        mpldates.num2date(unixtime_list)
        # pt.gcf().autofmt_xdate()
        # y = pt.plt.normpdf(bins, unixtime_list.mean(), unixtime_list.std())
        # ax.set_xticks(bins_)
        # l = pt.plt.plot(bins_, y, 'k--', linewidth=1.5)
    else:
        pt.draw_time_distribution(unixtime_list)
        # pt.draw_histogram()
    ax = pt.gca()
    ax.set_xlabel('Date')
    ax.set_title(
        'Timestamp distribution of %s. #nan=%d/%d'
        % (ibs.get_dbname_alias(), num_nan, num_total)
    )
    pt.gcf().autofmt_xdate()

    icon = ibs.get_database_icon()
    if False and icon is not None:
        # import matplotlib as mpl
        # import vtool as vt
        ax = pt.gca()
        # Overlay a species icon
        # http://matplotlib.org/examples/pylab_examples/demo_annotation_box.html
        # icon = vt.convert_image_list_colorspace([icon], 'RGB', 'BGR')[0]
        # pt.overlay_icon(icon, coords=(0, 1), bbox_alignment=(0, 1))
        pt.overlay_icon(
            icon,
            coords=(0, 1),
            bbox_alignment=(0, 1),
            as_artist=1,
            max_asize=(100, 200),
        )
        # imagebox = mpl.offsetbox.OffsetImage(icon, zoom=1.0)
        # # xy = [ax.get_xlim()[0] + 5, ax.get_ylim()[1]]
        # # ax.set_xlim(1, 100)
        # # ax.set_ylim(0, 100)
        # # x = np.array(ax.get_xlim()).sum() / 2
        # # y = np.array(ax.get_ylim()).sum() / 2
        # # xy = [x, y]
        # # logger.info('xy = %r' % (xy,))
        # # x = np.nanmin(unixtime_list)
        # # xy = [x, y]
        # # logger.info('xy = %r' % (xy,))
        # # ax.get_ylim()[0]]
        # xy = [ax.get_xlim()[0], ax.get_ylim()[1]]
        # ab = mpl.offsetbox.AnnotationBbox(
        #     imagebox, xy, xycoords='data',
        #     xybox=(-0., 0.),
        #     boxcoords="offset points",
        #     box_alignment=(0, 1), pad=0.0)
        # ax.add_artist(ab)

    if ut.get_argflag('--contextadjust'):
        # pt.adjust_subplots(left=.08, bottom=.1, top=.9, wspace=.3, hspace=.1)
        pt.adjust_subplots(use_argv=True)
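

# Illustrative sketch (not part of the original module): the core idea of
# show_time_distributions, a histogram of capture times, using synthetic
# timestamps and plain matplotlib so it runs without a wbia database or
# wbia.plottool. All values below are made up for demonstration.
def _demo_show_time_distribution_sketch():
    from datetime import datetime, timezone

    rng = np.random.default_rng(0)
    unixtimes = rng.uniform(1.45e9, 1.55e9, size=500)  # fake capture times
    unixtimes = unixtimes[~np.isnan(unixtimes)]  # mirror the NaN filtering above
    dates = [datetime.fromtimestamp(t, tz=timezone.utc) for t in unixtimes]
    plt.hist(dates, bins=50, color='g', alpha=0.75)
    plt.gca().set_xlabel('Date')
    plt.gca().set_title('Timestamp distribution (synthetic)')
    plt.gcf().autofmt_xdate()
    plt.show()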


def latex_dbstats(ibs_list, **kwargs):
    r"""
    Args:
        ibs (IBEISController): wbia controller object

    CommandLine:
        python -m wbia.other.dbinfo --exec-latex_dbstats --dblist testdb1
        python -m wbia.other.dbinfo --exec-latex_dbstats --dblist testdb1 --show
        python -m wbia.other.dbinfo --exec-latex_dbstats --dblist PZ_Master0 testdb1 --show
        python -m wbia.other.dbinfo --exec-latex_dbstats --dblist PZ_Master0 PZ_MTEST GZ_ALL --show
        python -m wbia.other.dbinfo --test-latex_dbstats --dblist GZ_ALL NNP_MasterGIRM_core --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> db_list = ut.get_argval('--dblist', type_=list, default=['testdb1'])
        >>> ibs_list = [wbia.opendb(db=db) for db in db_list]
        >>> tabular_str = latex_dbstats(ibs_list)
        >>> tabular_cmd = ut.latex_newcommand(ut.latex_sanitize_command_name('DatabaseInfo'), tabular_str)
        >>> ut.copy_text_to_clipboard(tabular_cmd)
        >>> write_fpath = ut.get_argval('--write', type_=str, default=None)
        >>> if write_fpath is not None:
        >>>     fpath = ut.truepath(write_fpath)
        >>>     text = ut.readfrom(fpath)
        >>>     new_text = ut.replace_between_tags(text, tabular_cmd, '% <DBINFO>', '% </DBINFO>')
        >>>     ut.writeto(fpath, new_text)
        >>> ut.print_code(tabular_cmd, 'latex')
        >>> ut.quit_if_noshow()
        >>> ut.render_latex_text('\\noindent \n' + tabular_str)
    """
    import wbia

    # Parse for aids test data
    aids_list = [wbia.testdata_aids(ibs=ibs) for ibs in ibs_list]

    # dbinfo_list = [get_dbinfo(ibs, with_contrib=False, verbose=False) for ibs in ibs_list]
    dbinfo_list = [
        get_dbinfo(ibs, with_contrib=False, verbose=False, aid_list=aids)
        for ibs, aids in zip(ibs_list, aids_list)
    ]

    # title = db_name + ' database statistics'
    title = 'Database statistics'
    stat_title = '# Annotations per name (multiton)'

    # col_lbls = [
    #     'multiton',
    #     #'singleton',
    #     'total',
    #     'multiton',
    #     'singleton',
    #     'total',
    # ]
    key_to_col_lbls = {
        'num_names_multiton': 'multiton',
        'num_names_singleton': 'singleton',
        'num_names': 'total',
        'num_multiton_annots': 'multiton',
        'num_singleton_annots': 'singleton',
        'num_unknown_annots': 'unknown',
        'num_annots': 'total',
    }
    # Structure of columns / multicolumns
    multi_col_keys = [
        (
            '# Names',
            (
                'num_names_multiton',
                # 'num_names_singleton',
                'num_names',
            ),
        ),
        (
            '# Annots',
            (
                'num_multiton_annots',
                'num_singleton_annots',
                # 'num_unknown_annots',
                'num_annots',
            ),
        ),
    ]
    # multicol_lbls = [('# Names', 3), ('# Annots', 3)]
    multicol_lbls = [(mcolname, len(mcols)) for mcolname, mcols in multi_col_keys]

    # Flatten column labels
    col_keys = ut.flatten(ut.get_list_column(multi_col_keys, 1))
    col_lbls = ut.dict_take(key_to_col_lbls, col_keys)

    row_lbls = []
    row_values = []

    # stat_col_lbls = ['max', 'min', 'mean', 'std', 'nMin', 'nMax']
    stat_col_lbls = ['max', 'min', 'mean', 'std', 'med']
    # stat_row_lbls = ['# Annot per Name (multiton)']
    stat_row_lbls = []
    stat_row_values = []

    SINGLE_TABLE = False
    EXTRA = True
    for ibs, dbinfo_locals in zip(ibs_list, dbinfo_list):
        row_ = ut.dict_take(dbinfo_locals, col_keys)
        dbname = ibs.get_dbname_alias()
        row_lbls.append(dbname)
        multiton_annot_stats = ut.get_stats(
            dbinfo_locals['multiton_nid2_nannots'], use_median=True, nl=1
        )
        stat_rows = ut.dict_take(multiton_annot_stats, stat_col_lbls)
        if SINGLE_TABLE:
            row_.extend(stat_rows)
        else:
            stat_row_lbls.append(dbname)
            stat_row_values.append(stat_rows)

        row_values.append(row_)

    CENTERLINE = False
    AS_TABLE = True
    tablekw = dict(
        astable=AS_TABLE,
        centerline=CENTERLINE,
        FORCE_INT=False,
        precision=2,
        col_sep='',
        multicol_sep='|',
        **kwargs
    )

    if EXTRA:
        extra_keys = [
            # 'species2_nAids',
            'qualtext2_nAnnots',
            'viewcode2_nAnnots',
        ]
        extra_titles = {
            'species2_nAids': 'Annotations per species.',
            'qualtext2_nAnnots': 'Annotations per quality.',
            'viewcode2_nAnnots': 'Annotations per viewpoint.',
        }
        extra_collbls = ut.ddict(list)
        extra_rowvalues = ut.ddict(list)
        extra_tables = ut.ddict(list)

        for ibs, dbinfo_locals in zip(ibs_list, dbinfo_list):
            for key in extra_keys:
                extra_collbls[key] = ut.unique_ordered(
                    extra_collbls[key] + list(dbinfo_locals[key].keys())
                )

        extra_collbls['qualtext2_nAnnots'] = [
            'excellent',
            'good',
            'ok',
            'poor',
            'junk',
            'UNKNOWN',
        ]
        # extra_collbls['viewcode2_nAnnots'] = ['backleft', 'left', 'frontleft', 'front', 'frontright', 'right', 'backright', 'back', None]
        extra_collbls['viewcode2_nAnnots'] = [
            'BL',
            'L',
            'FL',
            'F',
            'FR',
            'R',
            'BR',
            'B',
            None,
        ]

        for ibs, dbinfo_locals in zip(ibs_list, dbinfo_list):
            for key in extra_keys:
                extra_rowvalues[key].append(
                    ut.dict_take(dbinfo_locals[key], extra_collbls[key], 0)
                )

        qualalias = {'UNKNOWN': None}

        extra_collbls['viewcode2_nAnnots'] = [
            ibs.const.YAWALIAS.get(val, val)
            for val in extra_collbls['viewcode2_nAnnots']
        ]
        extra_collbls['qualtext2_nAnnots'] = [
            qualalias.get(val, val) for val in extra_collbls['qualtext2_nAnnots']
        ]

        for key in extra_keys:
            extra_tables[key] = ut.util_latex.make_score_tabular(
                row_lbls,
                extra_collbls[key],
                extra_rowvalues[key],
                title=extra_titles[key],
                col_align='r',
                table_position='[h!]',
                **tablekw
            )

    # tabular_str = util_latex.tabular_join(tabular_body_list)
    if SINGLE_TABLE:
        col_lbls += stat_col_lbls
        multicol_lbls += [(stat_title, len(stat_col_lbls))]
    count_tabular_str = ut.util_latex.make_score_tabular(
        row_lbls,
        col_lbls,
        row_values,
        title=title,
        multicol_lbls=multicol_lbls,
        table_position='[ht!]',
        **tablekw
    )
    # logger.info(row_lbls)

    if SINGLE_TABLE:
        tabular_str = count_tabular_str
    else:
        stat_tabular_str = ut.util_latex.make_score_tabular(
            stat_row_lbls,
            stat_col_lbls,
            stat_row_values,
            title=stat_title,
            col_align='r',
            table_position='[h!]',
            **tablekw
        )

        # Make a table of statistics
        if tablekw['astable']:
            tablesep = '\n%--\n'
        else:
            tablesep = '\\\\\n%--\n'

        if EXTRA:
            tabular_str = tablesep.join(
                [count_tabular_str, stat_tabular_str]
                + ut.dict_take(extra_tables, extra_keys)
            )
        else:
            tabular_str = tablesep.join([count_tabular_str, stat_tabular_str])

    return tabular_str


def get_short_infostr(ibs):
    """Returns printable database information

    Args:
        ibs (IBEISController): wbia controller object

    Returns:
        str: infostr

    CommandLine:
        python -m wbia.other.dbinfo --test-get_short_infostr

    Example:
        >>> # ENABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> import wbia
        >>> ibs = wbia.opendb('testdb1')
        >>> infostr = get_short_infostr(ibs)
        >>> result = str(infostr)
        >>> print(result)
        dbname = 'testdb1'
        num_images = 13
        num_annotations = 13
        num_names = 7
    """
    dbname = ibs.get_dbname()
    # workdir = ut.unixpath(ibs.get_workdir())
    num_images = ibs.get_num_images()
    num_annotations = ibs.get_num_annotations()
    num_names = ibs.get_num_names()
    # workdir = %r
    infostr = ut.codeblock(
        """
    dbname = %s
    num_images = %r
    num_annotations = %r
    num_names = %r
    """
        % (ut.repr2(dbname), num_images, num_annotations, num_names)
    )
    return infostr


def cache_memory_stats(ibs, cid_list, fnum=None):
    logger.info('[dev stats] cache_memory_stats()')
    # kpts_list = ibs.get_annot_kpts(cid_list)
    # desc_list = ibs.get_annot_vecs(cid_list)
    # nFeats_list = map(len, kpts_list)
    gx_list = np.unique(ibs.cx2_gx(cid_list))
    bytes_map = {
        'chip dbytes': [ut.file_bytes(fpath) for fpath in ibs.get_rchip_path(cid_list)],
        'img dbytes': [
            ut.file_bytes(gpath) for gpath in ibs.gx2_gname(gx_list, full=True)
        ],
        # 'flann dbytes': ut.file_bytes(flann_fpath),
    }
    byte_units = {
        'GB': 2 ** 30,
        'MB': 2 ** 20,
        'KB': 2 ** 10,
    }
    tabular_body_list = []
    convert_to = 'KB'
    for key, val in bytes_map.items():
        key2 = key.replace('bytes', convert_to)
        if isinstance(val, list):
            val2 = [bytes_ / byte_units[convert_to] for bytes_ in val]
            tex_str = ut.util_latex.latex_get_stats(key2, val2)
        else:
            val2 = val / byte_units[convert_to]
            tex_str = ut.util_latex.latex_scalar(key2, val2)
        tabular_body_list.append(tex_str)
    tabular = ut.util_latex.tabular_join(tabular_body_list)
    logger.info(tabular)
    ut.util_latex.render(tabular)
    if fnum is None:
        fnum = 0
    return fnum + 1


def sight_resight_count(nvisit1, nvisit2, resight):
    r"""
    Lincoln-Petersen index

    The Lincoln-Petersen index is a method used to estimate the total number
    of individuals in a population given two independent sets of
    observations. The likelihood of a population size is a hypergeometric
    distribution given by assuming a uniform sampling distribution.

    Args:
        nvisit1 (int): the number of individuals seen on visit 1.
        nvisit2 (int): the number of individuals seen on visit 2.
        resight (int): the number of (matched) individuals seen on both visits.

    Returns:
        tuple: (pl_index, pl_error)

    LaTeX:
        \begin{equation}\label{eqn:lpifull}
            L(\poptotal \given \nvisit_1, \nvisit_2, \resight) =
            \frac{
                \binom{\nvisit_1}{\resight}
                \binom{\poptotal - \nvisit_1}{\nvisit_2 - \resight}
            }{
                \binom{\poptotal}{\nvisit_2}
            }
        \end{equation}
        Assuming that $T$ has a uniform prior distribution, the maximum
        likelihood estimation of population size given two visits to a
        location is:
        \begin{equation}\label{eqn:lpi}
            \poptotal \approx
            \frac{\nvisit_1 \nvisit_2}{\resight} \pm 1.96
            \sqrt{\frac{{(\nvisit_1)}^2 (\nvisit_2) (\nvisit_2 - \resight)}{\resight^3}}
        \end{equation}

    References:
        https://en.wikipedia.org/wiki/Mark_and_recapture
        https://en.wikipedia.org/wiki/Talk:Mark_and_recapture#Statistical_treatment
        https://mail.google.com/mail/u/0/#search/lincoln+peterse+n/14c6b50227f5209f
        https://probabilityandstats.wordpress.com/tag/maximum-likelihood-estimate/
        http://math.arizona.edu/~jwatkins/o-mle.pdf

    CommandLine:
        python -m wbia.other.dbinfo sight_resight_count --show

    Example:
        >>> # DISABLE_DOCTEST
        >>> from wbia.other.dbinfo import *  # NOQA
        >>> nvisit1 = 100
        >>> nvisit2 = 20
        >>> resight = 10
        >>> (pl_index, pl_error) = sight_resight_count(nvisit1, nvisit2, resight)
        >>> result = '(pl_index, pl_error) = %s' % ut.repr2((pl_index, pl_error))
        >>> pl_low = max(pl_index - pl_error, 1)
        >>> pl_high = pl_index + pl_error
        >>> print('pl_low = %r' % (pl_low,))
        >>> print('pl_high = %r' % (pl_high,))
        >>> print(result)
        >>> ut.quit_if_noshow()
        >>> import wbia.plottool as pt
        >>> import scipy, scipy.stats
        >>> x = pl_index  # np.array([10, 11, 12])
        >>> k, N, K, n = resight, x, nvisit1, nvisit2
        >>> #k, M, n, N = k, N, k, n  # Wiki to SciPy notation
        >>> #prob = scipy.stats.hypergeom.cdf(k, N, K, n)
        >>> fig = pt.figure(1)
        >>> fig.clf()
        >>> N_range = np.arange(1, pl_high * 2)
        >>> # Something seems to be off
        >>> probs = sight_resight_prob(N_range, nvisit1, nvisit2, resight)
        >>> pl_prob = sight_resight_prob([pl_index], nvisit1, nvisit2, resight)[0]
        >>> pt.plot(N_range, probs, 'b-', label='probability of population size')
        >>> pt.plt.title('nvisit1=%r, nvisit2=%r, resight=%r' % (
        >>>     nvisit1, nvisit2, resight))
        >>> pt.plot(pl_index, pl_prob, 'rx', label='Lincoln-Petersen Estimate')
        >>> pt.plot([pl_low, pl_high], [pl_prob, pl_prob], 'gx-',
        >>>         label='Lincoln-Petersen Error Bar')
        >>> pt.legend()
        >>> ut.show_if_requested()
    """
    import math

    try:
        nvisit1 = float(nvisit1)
        nvisit2 = float(nvisit2)
        resight = float(resight)
        pl_index = int(math.ceil((nvisit1 * nvisit2) / resight))
        pl_error_num = float((nvisit1 ** 2) * nvisit2 * (nvisit2 - resight))
        pl_error_dom = float(resight ** 3)
        pl_error = int(math.ceil(1.96 * math.sqrt(pl_error_num / pl_error_dom)))
    except ZeroDivisionError:
        # pl_index = 'Undefined - Zero recaptured (k = 0)'
        pl_index = 0
        pl_error = 0
    return pl_index, pl_error
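

# The doctest above references sight_resight_prob, which is not shown in this
# listing. Below is a minimal sketch of one plausible implementation, using
# the hypergeometric likelihood described in the docstring; it is an
# assumption, not necessarily the original helper.
def sight_resight_prob(N_range, nvisit1, nvisit2, resight):
    """Hypergeometric likelihood of each candidate population size in N_range."""
    import scipy.stats

    N_range = np.asarray(N_range)
    probs = np.zeros(len(N_range), dtype=float)
    # A population must be at least large enough to contain every distinct
    # individual observed across both visits.
    valid = N_range >= (nvisit1 + nvisit2 - resight)
    # P(resight | N): N total individuals, nvisit1 of them marked on visit 1,
    # and nvisit2 drawn on visit 2 (SciPy's M, n, N parameterization).
    probs[valid] = scipy.stats.hypergeom.pmf(
        resight, N_range[valid], nvisit1, nvisit2
    )
    return probs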