# -*- coding: utf-8 -*-
import logging
import os
from os.path import splitext, basename, isabs
import warnings
import vtool.exif as vtexif
import utool as ut
from vtool.exif import ORIENTATION_DICT_INVERSE, ORIENTATION_UNDEFINED, ORIENTATION_000
from wbia.utils import call_houston
EXIF_UNDEFINED = ORIENTATION_DICT_INVERSE[ORIENTATION_UNDEFINED]
EXIF_NORMAL = ORIENTATION_DICT_INVERSE[ORIENTATION_000]
(print, rrr, profile) = ut.inject2(__name__)
logger = logging.getLogger('wbia')
[docs]def parse_exif(pil_img):
"""Image EXIF helper"""
exif_dict = vtexif.get_exif_dict(pil_img)
# TODO: More tags
# (mainly the orientation tag)
lat, lon = vtexif.get_lat_lon(exif_dict)
orient = vtexif.get_orientation(exif_dict, on_error='warn')
time = vtexif.get_unixtime(exif_dict)
return time, lat, lon, orient
[docs]def get_standard_ext(gpath):
"""Returns standardized image extension"""
ext = splitext(gpath)[1].lower()
return '.jpg' if ext == '.jpeg' else ext
[docs]@profile
def parse_imageinfo(gpath):
"""Worker function: gpath must be in UNIX-PATH format!
Args:
gpath (str): image path
Returns:
tuple: param_tup -
if successful returns a tuple of image parameters which are values
for SQL columns on else returns None
CommandLine:
python -m wbia.algo.preproc.preproc_image --exec-parse_imageinfo
Doctest:
>>> from wbia.algo.preproc.preproc_image import * # NOQA
>>> gpath = ut.grab_test_imgpath('patsy.jpg')
>>> param_tup = parse_imageinfo(gpath)
>>> result = ('param_tup = %s' % (str(param_tup),))
>>> print(result)
>>> uuid = param_tup[0]
>>> assert str(uuid) == '16008058-788c-2d48-cd50-f6029f726cbf'
"""
# Try to open the image
from PIL import Image
import tempfile
import requests
import cv2
import urllib
if gpath is None:
return None
urlsplit = urllib.parse.urlsplit
urlquote = urllib.parse.quote
urlunquote = urllib.parse.unquote
gpath = gpath.strip()
url_protos = ['https://', 'http://']
s3_proto = ['s3://']
houston_proto = ['houston+']
valid_protos = s3_proto + url_protos + houston_proto
def isproto(gpath, valid_protos):
return any(gpath.startswith(proto) for proto in valid_protos)
def islocal(gpath):
return not (isabs(gpath) and isproto(gpath, valid_protos))
with warnings.catch_warnings(record=True) as w:
try:
if isproto(gpath, valid_protos):
# suffix = '.%s' % (basename(gpath), )
filename = basename(gpath)
_, ext = splitext(filename)
# base = filename
base = ut.random_nonce(16)
suffix = '.%s%s' % (base, ext)
temp_file, temp_filepath = tempfile.mkstemp(suffix=suffix)
args = (
gpath,
temp_filepath,
)
logger.info(
'[preproc] Caching remote %s file to temporary file %r' % args
)
if isproto(gpath, s3_proto):
s3_dict = ut.s3_str_decode_to_dict(gpath)
ut.grab_s3_contents(temp_filepath, **s3_dict)
if isproto(gpath, url_protos):
# Ensure that the Unicode string is properly encoded for web requests
uri_ = urlunquote(gpath)
uri_ = urlsplit(uri_, allow_fragments=False)
uri_path = urlquote(uri_.path.encode('utf8'))
uri_ = uri_._replace(path=uri_path)
uri_ = uri_.geturl()
try:
response = requests.get(uri_, stream=True, allow_redirects=True)
assert (
response.status_code == 200
), '200 code not received on download'
except Exception:
parts = urlsplit(uri_, allow_fragments=False)
uri_ = uri_[len('%s://' % (parts.scheme,)) :]
hostname = urlquote(parts.hostname.encode('utf8'))
if parts.port:
hostname = f'{hostname}:{parts.port}'
uri_ = '%s://%s%s' % (parts.scheme, hostname, parts.path)
response = requests.get(uri_, stream=True, allow_redirects=True)
assert (
response.status_code == 200
), '200 code not received on download'
# Save
with open(temp_filepath, 'wb') as temp_file_:
for chunk in response.iter_content(1024):
temp_file_.write(chunk)
elif isproto(gpath, houston_proto):
response = call_houston(gpath)
assert (
response.status_code == 200
), f'200 code not received on download: {gpath}'
with open(temp_filepath, 'wb') as temp_file_:
for chunk in response.iter_content(1024):
temp_file_.write(chunk)
gpath_ = temp_filepath
else:
temp_file, temp_filepath = None, None
gpath_ = gpath
except (
AssertionError,
IOError,
requests.HTTPError,
urllib.error.HTTPError,
Image.DecompressionBombError,
) as ex:
# ut.embed()
logger.info('[preproc] IOError: %s' % (str(ex),))
return None
if len(w) > 0:
# for warn in w:
# warnings.showwarning(warn.message, warn.category,
# warn.filename, warn.lineno, warn.file,
# warn.line)
# warnstr = warnings.formatwarning
# logger.info(warnstr)
logger.info('%d warnings issued by %r' % (len(w), gpath))
try:
# Open image with EXIF support to get time, GPS, and the original orientation
pil_img = Image.open(gpath_, 'r')
# Convert 16-bit RGBA images on disk to 8-bit RGB
if pil_img.mode == 'RGBA':
pil_img.load()
canvas = Image.new('RGB', pil_img.size, (255, 255, 255))
canvas.paste(pil_img, mask=pil_img.split()[3]) # 3 is the alpha channel
canvas.save(gpath_)
pil_img.close()
# Reload image
pil_img = Image.open(gpath_, 'r')
time, lat, lon, orient = parse_exif(pil_img) # Read exif tags
pil_img.close()
# OpenCV >= 3.1 supports EXIF tags, which will load correctly
img = cv2.imread(gpath_)
assert img is not None
if orient not in [EXIF_UNDEFINED, EXIF_NORMAL]:
try:
# Sanitize weird behavior and standardize EXIF orientation to 1
cv2.imwrite(gpath_, img)
orient = EXIF_NORMAL
except AssertionError:
return None
except (FileNotFoundError):
return None
# Parse out the data
height, width = img.shape[:2] # Read width, height
# We cannot use pixel data as libjpeg is not deterministic (even for reads!)
image_uuid = ut.get_file_uuid(gpath_) # Read file ]-hash-> guid = gid
# orig_gpath = gpath
orig_gname = basename(gpath)
ext = get_standard_ext(gpath)
notes = ''
# Build parameters tuple
param_tup = (
image_uuid,
gpath,
gpath,
orig_gname,
# orig_gpath,
ext,
width,
height,
time,
lat,
lon,
orient,
notes,
)
if temp_filepath is not None:
os.close(temp_file)
os.unlink(temp_filepath)
# logger.info('[ginfo] %r %r' % (image_uuid, orig_gname))
return param_tup
# def add_images_params_gen(gpath_list):
# """
# generates values for add_images sqlcommands asychronously
# Args:
# gpath_list (list):
# Kwargs:
# ordered, force_serial, chunksize, prog, verbose, quiet, nTasks, freq,
# adjust
# Returns:
# generator: params_gen
# CommandLine:
# python -m wbia.algo.preproc.preproc_image --exec-add_images_params_gen
# Example:
# >>> # ENABLE_DOCTEST
# >>> from wbia.algo.preproc.preproc_image import * # NOQA
# >>> from vtool.tests import grabdata
# >>> gpath_list = grabdata.get_test_gpaths(ndata=3) + ['doesnotexist.jpg']
# >>> params_list = list(add_images_params_gen(gpath_list))
# >>> assert str(params_list[0][0]) == '66ec193a-1619-b3b6-216d-1784b4833b61', 'UUID gen method changed'
# >>> assert str(params_list[0][3]) == 'easy1.JPG', 'orig name is different'
# >>> assert params_list[3] is None
# """
# params_gen = ut.generate2(parse_imageinfo, zip(gpath_list), adjust=True,
# force_serial=True)
# return params_gen
[docs]def on_delete(ibs, featweight_rowid_list, qreq_=None):
logger.info('Warning: Not Implemented')