Source code for smqtk_descriptors.impls.descriptor_generator.caffe1

from io import BytesIO
import itertools
import logging
from typing import Any, Dict, Hashable, Iterable, Mapping, Optional, Set, Tuple, Type, TypeVar

import numpy

from smqtk_core.configuration import (
    from_config_dict,
    to_config_dict,
    make_default_config
)
from smqtk_core.dict import merge_dict
from smqtk_dataprovider import DataElement
from smqtk_descriptors import DescriptorGenerator
from smqtk_descriptors.utils import parallel_map


LOG = logging.getLogger(__name__)


try:
    import PIL.Image  # type: ignore
    import PIL.ImageFile  # type: ignore
except ImportError as ex:
    LOG.warning(f"Failed to import PIL module: {ex}")
    PIL = None  # type: ignore

try:
    import caffe  # type: ignore
except ImportError as ex:
    LOG.warning(f"Failed to import caffe module: {ex}")
    caffe = None  # type: ignore


__all__ = [
    "CaffeDescriptorGenerator",
]
T = TypeVar("T", bound="CaffeDescriptorGenerator")


[docs]class CaffeDescriptorGenerator (DescriptorGenerator): """ Compute images against a Caffe model, extracting a layer as the content descriptor. :param network_prototxt: Data element containing the text file defining the network layout. :param network_model: Data element containing the trained ``.caffemodel`` file to use. :param image_mean: Optional data element containing the image mean ``.binaryproto`` or ``.npy`` file. :param return_layer: The label of the layer we take data from to compose output descriptor vector. :param batch_size: The maximum number of images to process in one feed forward of the network. This is especially important for GPUs since they can only process a batch that will fit in the GPU memory space. :param use_gpu: If Caffe should try to use the GPU :param gpu_device_id: Integer ID of the GPU device to use. Only used if ``use_gpu`` is True. :param network_is_bgr: If the network is expecting BGR format pixels. For example, the BVLC default caffenet does (thus the default is True). :param data_layer: String label of the network's data layer. We assume its 'data' by default. :param load_truncated_images: If we should be lenient and force loading of truncated image bytes. This is False by default. :param pixel_rescale: Re-scale image pixel values before being transformed by caffe (before mean subtraction, etc) into the given tuple ``(min, max)`` range. By default, images are loaded in the ``[0, 255]`` range. Refer to the image mean being used for desired input pixel scale. :param input_scale: Optional floating-point scalar value to scale values of caffe network input data AFTER mean subtraction. This value is directly multiplied against the pixel values. :param threads: Optional specific number of threads to use for data loading and pre-processing. If this is None or 0, we introspect the current system thread capacity and use that. :raises AssertionError: Optionally provided image mean protobuf consisted of more than one image, or its shape was neither 1 nor 3 channels. """
[docs] @classmethod def is_usable(cls) -> bool: valid = caffe is not None and PIL is not None if not valid: LOG.debug("Caffe python module cannot be imported") return valid
[docs] @classmethod def get_default_config(cls) -> Dict[str, Any]: default = super(CaffeDescriptorGenerator, cls).get_default_config() data_elem_impl_set = DataElement.get_impls() # Need to make copies of dict so changes to one does not effect others. default['network_prototxt'] = \ make_default_config(data_elem_impl_set) default['network_model'] = make_default_config(data_elem_impl_set) default['image_mean'] = make_default_config(data_elem_impl_set) return default
[docs] @classmethod def from_config( cls: Type[T], config_dict: Dict, merge_default: bool = True ) -> T: if merge_default: config_dict = merge_dict(cls.get_default_config(), config_dict) data_elem_impl_set = DataElement.get_impls() # Translate prototext and model sub-configs into DataElement instances. config_dict['network_prototxt'] = \ from_config_dict(config_dict['network_prototxt'], data_elem_impl_set) config_dict['network_model'] = \ from_config_dict(config_dict['network_model'], data_elem_impl_set) # Translate optionally provided image mean sub-config into a # DataElement instance. May have been provided as ``None`` or a # configuration dictionary with type ``None`. # None, dict[type=None], dict[type=str] if config_dict['image_mean'] is None \ or config_dict['image_mean'].get('type', None) is None: config_dict['image_mean'] = None else: config_dict['image_mean'] = \ from_config_dict(config_dict['image_mean'], data_elem_impl_set) return super(CaffeDescriptorGenerator, cls).from_config(config_dict, merge_default=False)
def __init__( self, network_prototxt: DataElement, network_model: DataElement, image_mean: Optional[DataElement] = None, return_layer: str = 'fc7', batch_size: int = 1, use_gpu: bool = False, gpu_device_id: int = 0, network_is_bgr: bool = True, data_layer: str = 'data', load_truncated_images: bool = False, pixel_rescale: Optional[Tuple[float, float]] = None, input_scale: Optional[float] = None, threads: Optional[int] = None ): """ Create a Caffe CNN descriptor generator """ super(CaffeDescriptorGenerator, self).__init__() self.network_prototxt = network_prototxt self.network_model = network_model self.image_mean = image_mean self.return_layer = str(return_layer) self.batch_size = int(batch_size) self.use_gpu = bool(use_gpu) self.gpu_device_id = int(gpu_device_id) self.network_is_bgr = bool(network_is_bgr) self.data_layer = str(data_layer) self.load_truncated_images = bool(load_truncated_images) self.pixel_rescale = pixel_rescale self.input_scale = input_scale self.threads = threads assert self.batch_size > 0, \ "Batch size must be greater than 0 (got %d)" \ % self.batch_size assert self.gpu_device_id >= 0, \ "GPU Device ID must be greater than 0 (got %d)" \ % self. gpu_device_id # Network setup variables self.network: Optional[caffe.Net] = None self.net_data_shape = () self.transformer: Optional[caffe.io.Transformer] = None self._setup_network() def __getstate__(self) -> Dict[str, Any]: return self.get_config() def __setstate__(self, state: Mapping[str, Any]) -> None: # This ``__dict__.update`` works because configuration parameters # exactly match up with instance attributes currently. self.__dict__.update(state) # Translate nested Configurable instance configurations into actual # object instances. self.network_prototxt = from_config_dict( state["network_prototxt"], DataElement.get_impls() ) # noinspection PyTypeChecker self.network_model = from_config_dict( state["network_model"], DataElement.get_impls() ) state_image_mean = state["image_mean"] if state_image_mean is not None: # noinspection PyTypeChecker self.image_mean = from_config_dict( state_image_mean, DataElement.get_impls() ) self._setup_network() def _set_caffe_mode(self) -> None: """ Set the appropriate Caffe mode on the current thread/process. """ if self.use_gpu: LOG.debug("Using GPU") caffe.set_device(self.gpu_device_id) caffe.set_mode_gpu() else: LOG.debug("using CPU") caffe.set_mode_cpu() def _setup_network(self) -> None: """ Initialize Caffe and the network :raises AssertionError: Optionally provided image mean protobuf consisted of more than one image, or its shape was neither 1 or 3 channels. """ self._set_caffe_mode() # Questions: # - ``caffe.TEST`` indicates phase of either TRAIN or TEST LOG.debug("Initializing network") LOG.debug("Loading Caffe network from network/model configs") self.network = caffe.Net( self.network_prototxt.write_temp().encode(), caffe.TEST, weights=self.network_model.write_temp().encode() ) self.network_prototxt.clean_temp() self.network_model.clean_temp() # Assuming the network has a 'data' layer and notion of data shape self.net_data_shape = self.network.blobs[self.data_layer].data.shape LOG.debug("Network data shape: %s", self.net_data_shape) # Crating input data transformer LOG.debug("Initializing data transformer") self.transformer = caffe.io.Transformer( {self.data_layer: self.network.blobs[self.data_layer].data.shape} ) LOG.debug("Initializing data transformer -> %s", self.transformer.inputs) if self.image_mean is not None: LOG.debug("Loading image mean (reducing to single pixel mean)") image_mean_bytes = self.image_mean.get_bytes() try: # noinspection PyTypeChecker a = numpy.load(BytesIO(image_mean_bytes), allow_pickle=True) LOG.info("Loaded image mean from numpy bytes") except IOError: LOG.debug("Image mean file not a numpy array, assuming URI to " "protobuf binary.") # noinspection PyUnresolvedReferences blob = caffe.proto.caffe_pb2.BlobProto() blob.ParseFromString(image_mean_bytes) a = numpy.array(caffe.io.blobproto_to_array(blob)) assert a.shape[0] == 1, \ "Input image mean blob protobuf consisted of more than " \ "one image. Not sure how to handle this yet." a = a.reshape(a.shape[1:]) LOG.info("Loaded image mean from protobuf bytes") assert a.shape[0] in [1, 3], \ "Currently asserting that we either get 1 or 3 channel " \ "images. Got a %d channel image." % a[0] # TODO: Instead of always using pixel mean, try to use image-mean # if given. Might have to rescale if image/data layer shape # is different. a_mean = a.mean(1).mean(1) LOG.debug("Initializing data transformer -- mean") self.transformer.set_mean(self.data_layer, a_mean) LOG.debug("Initializing data transformer -- transpose") self.transformer.set_transpose(self.data_layer, (2, 0, 1)) if self.network_is_bgr: LOG.debug("Initializing data transformer -- channel swap") self.transformer.set_channel_swap(self.data_layer, (2, 1, 0)) if self.input_scale: LOG.debug("Initializing data transformer -- input scale") self.transformer.set_input_scale(self.data_layer, self.input_scale)
[docs] def get_config(self) -> Dict[str, Any]: """ Return a JSON-compliant dictionary that could be passed to this class's ``from_config`` method to produce an instance with identical configuration. In the common case, this involves naming the keys of the dictionary based on the initialization argument names as if it were to be passed to the constructor via dictionary expansion. :return: JSON type compliant configuration dictionary. """ image_mean_config: Optional[Dict] if self.image_mean is not None: image_mean_config = to_config_dict(self.image_mean) else: image_mean_config = None return { "network_prototxt": to_config_dict(self.network_prototxt), "network_model": to_config_dict(self.network_model), "image_mean": image_mean_config, "return_layer": self.return_layer, "batch_size": self.batch_size, "use_gpu": self.use_gpu, "gpu_device_id": self.gpu_device_id, "network_is_bgr": self.network_is_bgr, "data_layer": self.data_layer, "load_truncated_images": self.load_truncated_images, "pixel_rescale": self.pixel_rescale, "input_scale": self.input_scale, "threads": self.threads, }
[docs] def valid_content_types(self) -> Set[str]: """ :return: A set valid MIME type content types that this descriptor can handle. """ return { 'image/tiff', 'image/png', 'image/jpeg', 'image/bmp', }
def _generate_arrays(self, data_iter: Iterable[DataElement]) -> Iterable[numpy.ndarray]: """ Inner template method that defines the generation of descriptor vectors for a given iterable of data elements. Pre-conditions: - Data elements input to this method have been validated to be of at least one of this class's reported ``valid_content_types``. :param collections.abc.Iterable[DataElement] data_iter: Iterable of data element instances to be described. :raises RuntimeError: Descriptor extraction failure of some kind. :return: Iterable of numpy arrays in parallel association with the input data elements. :rtype: collections.abc.Iterable[numpy.ndarray] """ assert self.network is not None, ( "A network should be initialized by now." ) self._set_caffe_mode() log_debug = LOG.debug # Start parallel operation to pre-process imagery before aggregating # for network execution. # TODO: update ``buffer_factor`` param to account for batch size? img_array_iter = \ parallel_map(_process_load_img_array, zip( data_iter, itertools.repeat(self.transformer), itertools.repeat(self.data_layer), itertools.repeat(self.load_truncated_images), itertools.repeat(self.pixel_rescale), ), ordered=True, cores=self.threads) # Aggregate and process batches of input data elements #: :type: list[numpy.ndarray] batch_img_arrays = \ list(itertools.islice(img_array_iter, self.batch_size)) batch_i = 0 while len(batch_img_arrays) > 0: cur_batch_size = len(batch_img_arrays) log_debug("Batch {} - size {}".format(batch_i, cur_batch_size)) log_debug("Updating network data layer shape ({} images)" .format(cur_batch_size)) self.network.blobs[self.data_layer].reshape( cur_batch_size, *self.net_data_shape[1:4] ) log_debug("Loading image matrices into network layer '{:s}'" .format(self.data_layer)) self.network.blobs[self.data_layer].data[...] = batch_img_arrays log_debug("Moving network forward") self.network.forward() descriptor_list = self.network.blobs[self.return_layer].data log_debug("extracting return layer '{:s}' into vectors" .format(self.return_layer)) for v in descriptor_list: if v.ndim > 1: # In case caffe generates multidimensional array # like (rows, 1, 1) log_debug("- Raveling output array of shape {}" .format(v.shape)) yield numpy.ravel(v) else: yield v # Slice out the next batch #: :type: list[(collections.abc.Hashable, numpy.ndarray)] batch_img_arrays = \ list(itertools.islice(img_array_iter, self.batch_size)) batch_i += 1
def _process_load_img_array( input_tuple: Tuple[DataElement, "caffe.io.Transformer", str, bool, Optional[Tuple[float, float]]] ) -> Tuple[Hashable, numpy.ndarray]: """ Helper function for multiprocessing image data loading Expected input argument tuple contents (in tuple order): * data_element: DataElement providing bytes * transformer: Caffe Transformer instance for pre-processing. * data_layer: String label of the network's data layer * load_truncated_images: Boolean of whether loading truncated images is allowed (See PIL.ImageFile.LOAD_TRUNCATED_IMAGES attribute). * pixel_rescale: Pair of floating point values to recale image values into, i.e. [0, 255] (the default). :param input_tuple: Tuple of input arguments as we expect to be called by a multiprocessing map function. See above for content details. :return: Input DataElement UUID and Pre-processed numpy array. """ # data_element: DataElement providing bytes # transformer: Caffe Transformer instance for pre-processing. # data_layer: String label of the data layer (data_element, transformer, data_layer, load_truncated_images, pixel_rescale) = input_tuple PIL.ImageFile.LOAD_TRUNCATED_IMAGES = load_truncated_images img: PIL.Image.Image try: img = PIL.Image.open(BytesIO(data_element.get_bytes())) except Exception as ex_: logging.getLogger(__name__).error( "Failed opening image from data element {}. Exception ({}): {}" .format(data_element, type(ex_), str(ex_)) ) raise if img.mode != "RGB": img = img.convert("RGB") logging.getLogger(__name__).debug("Image: {}".format(img)) # Caffe natively uses float types (32-bit) try: # This can fail if the image is truncated and we're not allowing the # loading of those images img_a = numpy.asarray(img, numpy.float32) except Exception as ex_: logging.getLogger(__name__).error( "Failed array-ifying data element {}. Image may be truncated. " "Exception ({}): {}" .format(data_element, type(ex_), str(ex_)) ) raise assert img_a.ndim == 3, \ "Loaded invalid RGB image with shape {:s}".format(img_a.shape) if pixel_rescale: pmin, pmax = min(pixel_rescale), max(pixel_rescale) r = pmax - pmin img_a = (img_a / (255. / r)) + pmin img_at = transformer.preprocess(data_layer, img_a) return img_at