Source code for core.pipeline

#! /usr/bin/env python
'''
a library of core functions which define the image processing/learning pipeline
'''
from __future__ import division, with_statement, print_function
from itertools import *
import os
import shutil
import re
from copy import copy
import multiprocessing
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from pandas.io.pytables import HDFStore
from sklearn.preprocessing import StandardScaler
from sklearn.cross_validation import train_test_split
import PIL.Image
import cv2
from core.image_scanner import ImageScanner
from core.utils import *
# ------------------------------------------------------------------------------

# info utils
def get_info(source, spec=['name', 'extension'], sep=None, ignore=['\.DS_Store']):
    '''
    creates a descriptive DataFrame based upon files contained within a source directory

    Args:
        source (str): fullpath to directory of files
        spec opt(list): naming specification of files
            default: ['name', 'extension']
        sep opt(str): regular expression with which to separate filename components
            recommended value: '\.'
            default: None (split name from extension)
        ignore opt(list): list of regex patterns used for ignoring files
            default: ['\.DS_Store']

    Returns:
        DataFrame: an info DataFrame
    '''
    spec = copy(spec)
    images = [source]
    if os.path.isdir(source):
        images = os.listdir(source)
    for regex in ignore:
        images = filter(lambda x: not re.search(regex, x), images)

    data = []
    for image in sorted(images):
        datum = []
        if sep:
            datum.extend(re.split(sep, image))
        else:
            temp = os.path.splitext(image)
            datum.append(temp[0])
            datum.append(temp[1].strip('.'))
        datum.append(os.path.join(source, image))
        data.append(datum)

    spec.append('source')
    return DataFrame(data, columns=spec)

def info_split(info, test_size=0.2):
    '''
    splits an info object by rows into train and test objects

    Args:
        info (DataFrame): info object to split
        test_size opt(float): percentage of info indices that will be allocated to the test object

    Returns:
        2 DataFrames: train, test
    '''
    def _info_split(info, test_size=0.2):
        train_x, test_x, train_y, test_y = train_test_split(info, info.label, test_size=test_size)
        return DataFrame(train_x, columns=info.columns), DataFrame(test_x, columns=info.columns)

    train = []
    test = []
    for name in info.label.unique():
        x, y = _info_split(info[info.label == name], test_size=test_size)
        train.append(x)
        test.append(y)
    return pd.concat(train, axis=0), pd.concat(test, axis=0)

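# Usage sketch: assuming a hypothetical directory of images named like
# 'oak.0001.png', get_info can build the info table and info_split can then
# partition it per label. The path and spec below are illustrative
# assumptions, not part of this module.
#
#     info = get_info('/data/wood_samples',
#                     spec=['label', 'id', 'extension'],
#                     sep='\.')
#     train_info, test_info = info_split(info, test_size=0.2)
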
# ------------------------------------------------------------------------------
def process_data(info, features=['r', 'g', 'b', 'h', 's', 'v', 'fft_std', 'fft_max']):
    '''
    processes images listed in a given info object into usable data

    Args:
        info (DataFrame): info object containing 'source', 'label' and 'params' columns
        features opt(list): list of features to include in the output data
            default: ['r', 'g', 'b', 'h', 's', 'v', 'fft_std', 'fft_max']

    Returns:
        DataFrame: processed image data
    '''
    # create data from info
    data = info.copy()
    data.reset_index(drop=True, inplace=True)
    data = data[['source', 'label', 'params']]
    err = data.source.tolist()
    data.source = data.source.apply(lambda x: PIL.Image.open(x))
    data = data.apply(lambda x: generate_samples(x['source'], x['label'], x['params']), axis=1)

    # create new expanded dataframe
    data = list(chain(*data.tolist()))
    data = DataFrame(data, columns=['x', 'y', 'params'])
    data['bgr'] = data.x.apply(pil_to_opencv)
    del data['x']

    # create feature lists
    rgb = filter(lambda x: x in list('rgb'), features)
    hsv = filter(lambda x: x in list('hsv'), features)
    fft = filter(lambda x: x in ['fft_std', 'fft_max'], features)

    # rgb distributions
    if rgb:
        temp = data[['bgr', 'params']].apply(lambda x: (x['bgr'], x['params']), axis=1)
        for chan in rgb:
            chan_data = temp.apply(lambda x: get_channel_histogram(x[0], chan, **x[1]))
            # data[chan] = chan_data.apply(lambda x: x.tolist())
            create_histogram_stats(data, chan_data, chan)

    # hsv distributions
    if hsv:
        try:
            data['hsv'] = data.bgr.apply(lambda x: cv2.cvtColor(x, cv2.COLOR_BGR2HSV))
        except:
            print(err)
            raise SyntaxError
        temp = data[['hsv', 'params']].apply(lambda x: (x['hsv'], x['params']), axis=1)
        for chan in hsv:
            chan_data = temp.apply(lambda x: get_channel_histogram(x[0], chan, **x[1]))
            # data[chan] = chan_data.apply(lambda x: x.tolist())
            create_histogram_stats(data, chan_data, chan)
        del data['hsv']

    # grain frequency
    if fft:
        data['gray'] = data.bgr.apply(lambda x: cv2.cvtColor(x, cv2.COLOR_BGR2GRAY))
        data.gray = data.gray.apply(lambda x: np.fft.hfft(x).astype(float))
        data.gray = data.gray.apply(lambda x: np.histogram(x.ravel(), bins=256)[0])
        if 'fft_std' in fft:
            data['fft_std'] = data.gray.apply(lambda x: x.std())
        if 'fft_max' in fft:
            data['fft_max'] = data.gray.apply(lambda x: x.max())
        del data['gray']

    del data['bgr']
    del data['params']

    # shuffle data to destroy serial correlations
    index = data.index.tolist()
    np.random.shuffle(index)
    data = data.ix[index]
    data.reset_index(drop=True, inplace=True)

    # normalize features
    cols = data.drop('y', axis=1).columns.tolist()
    ss = StandardScaler()
    data[cols] = ss.fit_transform(data[cols])
    return data

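# Usage sketch: process_data expects 'source', 'label' and 'params' columns,
# where each 'params' dict is forwarded to the sampling and histogram helpers
# imported from core.utils. The params value below is a placeholder
# assumption, not a documented schema.
#
#     info = info.copy()
#     info['params'] = [{'bins': 16}] * info.shape[0]
#     data = process_data(info, features=['r', 'g', 'b', 'fft_std'])
#     x, y = data.drop('y', axis=1), data.y
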
# multiprocessing
def _multi_get_data(args):
    '''
    private function used for multiprocessing
    '''
    return process_data(args[0], features=args[1])

def _batch_get_data(info, multiprocess=True, processes=24,
                    features=['r', 'g', 'b', 'h', 's', 'v', 'fft_std', 'fft_max']):
    '''
    private function used for batch processing
    '''
    if not multiprocess:
        return process_data(info, features=features)

    pool = multiprocessing.Pool(processes=processes)
    iterable = [(row.to_frame().T, features) for i, row in info.iterrows()]
    data = pool.map(_multi_get_data, iterable)
    pool.terminate()
    # data = map(_multi_get_data, iterable) # for debugging
    data = pd.concat(data, axis=0)
    return data

def get_data(info, hdf_path=None, multiprocess=True, processes=24,
             features=['r', 'g', 'b', 'h', 's', 'v', 'fft_std', 'fft_max']):
    '''
    generates machine-learning-ready data from an info object

    Args:
        info (DataFrame): info object containing 'source', 'label' and 'params' columns
        hdf_path opt(str): fullpath of the file with which to store generated data
            default: None
        multiprocess opt(bool): use multiprocessing
            default: True
        processes opt(int): number of processes to employ for multiprocessing
            default: 24
        features opt(list): list of features to include in the output data
            default: ['r', 'g', 'b', 'h', 's', 'v', 'fft_std', 'fft_max']

    Returns:
        DataFrame: machine-learning-ready data
    '''
    if not multiprocess:
        return process_data(info, features=features)

    # irregular index screws up index iterations
    info = info.copy()
    info.reset_index(drop=True, inplace=True)

    kwargs = {
        'features': features,
        'multiprocess': multiprocess,
        'processes': processes
    }

    # clear out batch files left over from a previous run
    batch_path = os.path.join('/var/tmp', 'hdf_batch')
    if os.path.exists(batch_path):
        shutil.rmtree(batch_path)
    os.mkdir(batch_path)

    n = info.shape[0]
    indices = list(range(0, n, processes))
    indices.append(n)
    indices = zip(indices, indices[1:])
    for i, (start, stop) in enumerate(indices):
        # half-open slice so that boundary rows are not duplicated across batches
        batch = info.iloc[start:stop]
        data = _batch_get_data(batch, **kwargs)
        filename = 'data.' + str(i).zfill(4) + '.hdf.batch'
        fullpath = os.path.join(batch_path, filename)
        hdf = HDFStore(fullpath)
        hdf['data'] = data
        hdf.close()
        print('indices {:>5} - {:<5} written to {:<5}'.format(start, stop, fullpath))

    batch = filter(lambda x: '.hdf.batch' in x, os.listdir(batch_path))
    batch = [os.path.join(batch_path, x) for x in batch]
    data = [pd.read_hdf(x, 'data') for x in batch]
    data = pd.concat(data, axis=0, ignore_index=True)

    # shuffle data to destroy serial correlations
    index = data.index.tolist()
    np.random.shuffle(index)
    data = data.ix[index]
    data.reset_index(drop=True, inplace=True)

    if hdf_path:
        hdf = HDFStore(hdf_path)
        hdf['data'] = data
        hdf.close()
    return data

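# Usage sketch: get_data stages per-batch results under /var/tmp/hdf_batch and,
# when hdf_path is given, also archives the concatenated result. The output
# path below is illustrative.
#
#     data = get_data(info, hdf_path='/var/tmp/wood_data.hdf',
#                     multiprocess=True, processes=8)
#     train_x = data.drop('y', axis=1)
#     train_y = data.y
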
# ------------------------------------------------------------------------------
def compile_predictions(pred):
    '''
    groups predictions made on patches of an image into a set of labels and confidences

    Args:
        pred (array-like): output from call to [some sklearn model].predict

    Returns:
        DataFrame: compiled predictions
    '''
    data = DataFrame()
    data['yhat'] = pred
    data['confidence'] = 1.0
    data = data.groupby('yhat').agg(lambda x: x.sum() / data.shape[0])
    data.sort('confidence', ascending=False, inplace=True)
    data['label'] = data.index
    data.reset_index(drop=True, inplace=True)
    return data

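# Usage sketch: given per-patch predictions from an sklearn-style classifier
# (clf below is assumed to be already fitted), compile_predictions reduces
# them to per-label confidences for the whole image.
#
#     pred = clf.predict(patch_features)
#     summary = compile_predictions(pred)
#     best_label = summary.label[0]
#     best_confidence = summary.confidence[0]
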
# ------------------------------------------------------------------------------
def archive_data(train_info, test_info, hdf_path, cross_val=True, multiprocess=True, processes=24,
                 features=['r', 'g', 'b', 'h', 's', 'v', 'fft_max', 'fft_std']):
    '''
    convenience function for archiving train, validation and test data

    Args:
        train_info (DataFrame): info object to use for training
        test_info (DataFrame): info object to use for testing
        hdf_path (str): fullpath of file with which to store data
        cross_val opt(bool): use cross validation
            default: True
        multiprocess opt(bool): use multiprocessing
            default: True
        processes opt(int): number of processes to employ for multiprocessing
            default: 24
        features opt(list): list of features to include in the output data
            default: ['r', 'g', 'b', 'h', 's', 'v', 'fft_max', 'fft_std']

    Returns:
        train_x, valid_x, test_x, train_y, valid_y, test_y: DataFrames
        train_x, test_x, train_y, test_y if cross_val=False
    '''
    kwargs = {
        'features': features,
        'multiprocess': multiprocess,
        'processes': processes
    }

    hdf = HDFStore(hdf_path)
    os.mkdir(hdf_path)

    batch = os.path.join(hdf_path, '.train')
    os.mkdir(batch)
    train = get_data(train_info, hdf_path=batch, **kwargs)
    train_x = train.drop('y', axis=1)
    train_y = train.y
    if cross_val:
        train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y, test_size=0.2)
        hdf['train_x'] = DataFrame(train_x)
        hdf['valid_x'] = DataFrame(valid_x)
        hdf['train_y'] = Series(train_y)
        hdf['valid_y'] = Series(valid_y)
    else:
        hdf['train_x'] = train_x
        hdf['train_y'] = train_y

    batch = os.path.join(hdf_path, '.test')
    os.mkdir(batch)
    test = get_data(test_info, hdf_path=batch, **kwargs)
    test_x = test.drop('y', axis=1)
    test_y = test.y
    hdf['test_x'] = test_x
    hdf['test_y'] = test_y
    hdf.close()

    if cross_val:
        return train_x, valid_x, test_x, train_y, valid_y, test_y
    return train_x, test_x, train_y, test_y

def read_archive(hdf_path, items=['train_x', 'valid_x', 'test_x', 'train_y', 'valid_y', 'test_y']):
    '''
    convenience function for retrieving data from an hdf archive

    Args:
        hdf_path (str): fullpath of the file in which the data is stored
        items opt(list): items to be retrieved
            default: ['train_x', 'valid_x', 'test_x', 'train_y', 'valid_y', 'test_y']

    Returns:
        list: requested items, in the order given
    '''
    hdf = HDFStore(hdf_path)
    # materialize the items before the store is closed
    output = list(map(lambda x: hdf[x], items))
    hdf.close()
    return output

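# Usage sketch: read_archive restores whichever splits were previously written
# to an archive (for example by archive_data). The path and item list below
# are illustrative.
#
#     train_x, train_y, test_x, test_y = read_archive(
#         '/var/tmp/wood_archive.hdf',
#         items=['train_x', 'train_y', 'test_x', 'test_y'])
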
# ------------------------------------------------------------------------------

__all__ = [
    'get_info',
    'info_split',
    'process_data',
    'get_data',
    'compile_predictions',
    'archive_data',
    'read_archive'
]

def main():
    pass

if __name__ == '__main__':
    help(main)