"""
Basic tools for setting up a function to be applied over a raster processing chain.
The :class:`~hubdc.applier.Applier` class is the main point of entry in this module.
See :doc:`ApplierExamples` for more information.
"""
from __future__ import print_function
import sys
from multiprocessing import Pool
from timeit import default_timer as now
import numpy
from osgeo import gdal
from osgeo.gdal_array import NumericTypeCodeToGDALTypeCode
from hubdc.model import Open, OpenLayer, CreateFromArray
from hubdc import const
from hubdc.writer import Writer, WriterProcess, QueueMock
from hubdc.model import PixelGrid
from hubdc.progressbar import CUIProgressBar, SilentProgressBar, ProgressBar
DEFAULT_INPUT_RESAMPLEALG = gdal.GRA_NearestNeighbour
DEFAULT_INPUT_ERRORTHRESHOLD = 0.
DEFAULT_INPUT_WARPMEMORYLIMIT = 100*2**20
DEFAULT_INPUT_MULTITHREAD = False
class Applier(object):
    """
    Main point of entry for applying a user defined operator blockwise over a raster
    processing chain. See :doc:`ApplierExamples` for more information.
    """

    def __init__(self, controls=None):
        """
        :param controls: an :class:`~hubdc.applier.ApplierControls` object;
                         a default one is created if set to ``None``
        """
        self.inputs = dict()
        self.vectors = dict()
        self.outputs = dict()
        self.controls = controls if controls is not None else ApplierControls()
        self.grid = None

    def setVector(self, name, filename, layer=0):
        """
        Define a new input vector layer named ``name``, that is located at ``filename``.

        :param name: name of the vector layer
        :param filename: filename of the vector layer
        :param layer: specify the layer to be used from the vector datasource
        """
        self.vectors[name] = ApplierVector(filename=filename, layer=layer)

    def setOutput(self, name, filename, format=None, creationOptions=None):
        """
        Define a new output raster named ``name``, that will be created at ``filename``.

        :param name: name of the raster
        :param filename: filename of the raster
        :param format: see `GDAL Raster Formats <http://www.gdal.org/formats_list.html>`_
        :param creationOptions: see the `Creation Options` section for a specific `GDAL Raster Format`.
            Predefined default creation options are used if set to ``None``, see
            :meth:`hubdc.applier.ApplierOutput.getDefaultCreationOptions` for details.
        :type creationOptions: list of strings
        """
        self.outputs[name] = ApplierOutput(filename=filename, format=format, creationOptions=creationOptions)

    def setOutputList(self, name, filenames, format=None, creationOptions=None):
        """
        Define a new list of output rasters named ``name``, that are located at the ``filenames``.
        For each filename a new output raster named ``(name, i)`` is added using
        :meth:`hubdc.applier.Applier.setOutput`.
        """
        for i, filename in enumerate(filenames):
            self.setOutput(name=(name, i), filename=filename, format=format, creationOptions=creationOptions)

    def apply(self, operator, description=None, *ufuncArgs, **ufuncKwargs):
        """
        Applies the ``operator`` blockwise over a raster processing chain and returns a list of results, one for each block.

        The ``operator`` must be a subclass of :class:`~hubdc.applier.ApplierOperator` and needs to implement the
        :meth:`~hubdc.applier.ApplierOperator.ufunc` method to specify the image processing.

        For example::

            class MyOperator(ApplierOperator):
                def ufunc(self):
                    # process the data

            applier.apply(operator=MyOperator)

        or::

            def my_ufunc(operator):
                # process the data

            applier.apply(operator=my_ufunc)

        :param operator: applier operator
        :type operator: :class:`~hubdc.applier.ApplierOperator` or function
        :param description: short description that is displayed on the progress bar
        :param ufuncArgs: additional arguments that will be passed to the operators ufunc() method.
        :param ufuncKwargs: additional keyword arguments that will be passed to the operators ufunc() method.
        :return: list of results, one for each processed block
        """
        import inspect
        if inspect.isclass(operator):
            self.ufuncClass = operator
            self.ufuncFunction = None
        elif callable(operator):
            self.ufuncClass = ApplierOperator
            self.ufuncFunction = operator
        else:
            raise ValueError('operator must be a class or callable')
        self.ufuncArgs = ufuncArgs
        self.ufuncKwargs = ufuncKwargs

        runT0 = now()
        if description is None:
            description = operator.__name__

        self._runCreateGrid()
        self.controls.progressBar.setLabelText('start {} [{}x{}]'.format(description, self.grid.xSize, self.grid.ySize))
        self._runInitWriters()
        self._runInitPool()
        results = self._runProcessSubgrids()
        self._runClose()
        self.controls.progressBar.setLabelText('100%')

        s = now() - runT0
        m = s / 60
        h = m / 60
        self.controls.progressBar.setLabelText(
            'done {description} in {s} sec | {m} min | {h} hours'.format(
                description=description, s=int(s), m=round(m, 2), h=round(h, 2)))
        sys.stdout.flush()
        return results

    def _runCreateGrid(self):
        # use the explicit reference grid if one was set, otherwise derive it from the inputs
        if self.controls.referenceGrid is not None:
            self.grid = self.controls.referenceGrid
        else:
            self.grid = self.controls._makeAutoGrid(inputs=self.inputs)

    def _runInitWriters(self):
        # start the background writer processes (if multiwriting is enabled) and
        # build the filename -> queue mapping used by the operator to emit data
        self.writers = list()
        self.queues = list()
        self.queueMock = QueueMock()
        if self.controls._multiwriting:
            for _ in range(max(1, self.controls.nwriter)):
                writer = WriterProcess()
                writer.start()
                self.writers.append(writer)
                self.queues.append(writer.queue)
        self.queueByFilename = self._getQueueByFilenameDict()

    def _runInitPool(self):
        if self.controls._multiprocessing:
            # writer processes aren't picklable; detach them before passing self to the Pool initializer
            writers, self.writers = self.writers, None
            self.pool = Pool(processes=self.controls.nworker,
                             initializer=_pickableWorkerInitialize, initargs=(self,))
            self.writers = writers  # put writers back
        else:
            _Worker.initialize(applier=self)

    def _runProcessSubgrids(self):
        subgrids = self.grid.subgrids(windowxsize=self.controls.windowxsize,
                                      windowysize=self.controls.windowysize)
        if self.controls._multiprocessing:
            applyResults = list()
        else:
            results = list()
        self.controls.progressBar.setTotalSteps(len(subgrids))
        for i, subgrid in enumerate(subgrids):
            kwargs = {'i': i,
                      'n': len(subgrids),
                      'subgrid': subgrid}
            if self.controls._multiprocessing:
                applyResults.append(self.pool.apply_async(func=_pickableWorkerProcessSubgrid, kwds=kwargs))
            else:
                results.append(_Worker.processSubgrid(**kwargs))
        if self.controls._multiprocessing:
            # collect the asynchronous results in submission order
            results = [applyResult.get() for applyResult in applyResults]
        return results

    def _getQueueByFilenameDict(self):
        # assign each output file to the currently least filled writer queue,
        # or to the mock queue when writing synchronously
        def lessFilledQueue():
            lfq = self.queues[0]
            for q in self.queues:
                if lfq.qsize() > q.qsize():
                    lfq = q
            return lfq

        queueByFilename = dict()
        for output in self.outputs.values():
            if self.controls._multiwriting:
                queueByFilename[output.filename] = lessFilledQueue()
            else:
                queueByFilename[output.filename] = self.queueMock
        return queueByFilename

    def _runClose(self):
        if self.controls._multiprocessing:
            self.pool.close()
            self.pool.join()
        if self.controls._multiwriting:
            for writer in self.writers:
                writer.queue.put([Writer.CLOSE_DATASETS, self.controls.createEnviHeader])
                writer.queue.put([Writer.CLOSE_WRITER, None])
                writer.join()
        else:
            self.queueMock.put([Writer.CLOSE_DATASETS, self.controls.createEnviHeader])
class _Worker(object):
    """
    Holds the per-process state (input/output/vector bookkeeping and the operator
    instance) used while processing subgrids. For internal use only.
    """

    queues = list()
    inputDatasets = dict()
    inputOptions = dict()
    outputFilenames = dict()
    outputOptions = dict()
    applier = None
    operator = None

    def __init__(self):
        # all state lives on the class; instances are never created
        raise Exception('singleton class')

    @classmethod
    def initialize(cls, applier):
        """
        Set GDAL configuration for this (worker) process and build the operator.
        Called once per worker process (or once in the main process when not
        multiprocessing).
        """
        gdal.SetCacheMax(applier.controls.cacheMax)
        gdal.SetConfigOption('GDAL_SWATH_SIZE', str(applier.controls.swathSize))
        gdal.SetConfigOption('GDAL_DISABLE_READDIR_ON_OPEN', str(applier.controls.disableReadDirOnOpen))
        gdal.SetConfigOption('GDAL_MAX_DATASET_POOL_SIZE', str(applier.controls.maxDatasetPoolSize))

        assert isinstance(applier, Applier)
        cls.applier = applier
        cls.inputDatasets = dict()
        cls.inputOptions = dict()
        cls.inputFilenames = dict()
        cls.outputFilenames = dict()
        cls.outputOptions = dict()
        cls.vectorDatasets = dict()
        cls.vectorFilenames = dict()
        cls.vectorLayers = dict()

        # prepare datasets of current main grid without opening them yet
        # (datasets are opened lazily on first access in the operator)
        for key, applierInput in applier.inputs.items():
            assert isinstance(applierInput, ApplierInput)
            cls.inputDatasets[key] = None
            cls.inputFilenames[key] = applierInput.filename
            cls.inputOptions[key] = applierInput.options

        for key, applierOutput in applier.outputs.items():
            assert isinstance(applierOutput, ApplierOutput)
            cls.outputFilenames[key] = applierOutput.filename
            cls.outputOptions[key] = applierOutput.options

        for key, applierVector in applier.vectors.items():
            assert isinstance(applierVector, ApplierVector)
            cls.vectorDatasets[key] = None
            cls.vectorFilenames[key] = applierVector.filename
            cls.vectorLayers[key] = applierVector.layer

        # create the operator instance shared by all subgrid calls in this process
        cls.operator = applier.ufuncClass(maingrid=applier.grid,
                                          inputDatasets=cls.inputDatasets, inputFilenames=cls.inputFilenames,
                                          inputOptions=cls.inputOptions,
                                          outputFilenames=cls.outputFilenames, outputOptions=cls.outputOptions,
                                          vectorDatasets=cls.vectorDatasets, vectorFilenames=cls.vectorFilenames,
                                          vectorLayers=cls.vectorLayers,
                                          queueByFilename=applier.queueByFilename,
                                          progressBar=applier.controls.progressBar,
                                          ufuncFunction=applier.ufuncFunction, ufuncArgs=applier.ufuncArgs,
                                          ufuncKwargs=applier.ufuncKwargs)

    @classmethod
    def processSubgrid(cls, i, n, subgrid):
        """Run the operator on block ``i`` of ``n`` and return its result."""
        cls.operator.progressBar.setProgress(i)
        return cls.operator._apply(subgrid=subgrid, iblock=i, nblock=n)
def _pickableWorkerProcessSubgrid(**kwargs):
    """Module-level trampoline so that ``multiprocessing`` can pickle the call."""
    return _Worker.processSubgrid(**kwargs)
def _pickableWorkerInitialize(*args):
    """Module-level trampoline used as the ``multiprocessing.Pool`` initializer."""
    return _Worker.initialize(*args)
class ApplierOutput(object):
    """
    Data structure for storing output specifications defined by :meth:`hubdc.applier.Applier.setOutput`.
    For internal use only.
    """

    # format used when the caller does not specify one
    DEFAULT_FORMAT = 'ENVI'

    @staticmethod
    def getDefaultCreationOptions(format):
        """
        Returns the predefined default creation options for the given GDAL ``format``
        (case-insensitive); an empty list for formats without predefined defaults.
        """
        if format.lower() == 'ENVI'.lower():
            return ['INTERLEAVE=BSQ']
        elif format.lower() == 'GTiff'.lower():
            return ['COMPRESS=LZW', 'INTERLEAVE=BAND', 'TILED=YES',
                    'BLOCKXSIZE=256', 'BLOCKYSIZE=256',
                    'SPARSE_OK=TRUE', 'BIGTIFF=YES']
        return []

    def __init__(self, filename, format=None, creationOptions=None):
        """
        :param filename: filename of the output raster
        :param format: GDAL raster format name; defaults to :attr:`DEFAULT_FORMAT`
        :param creationOptions: list of GDAL creation option strings; format-specific
                                defaults are used if set to ``None``
        """
        if format is None:
            format = self.DEFAULT_FORMAT
        if creationOptions is None:
            creationOptions = self.getDefaultCreationOptions(format)
        self.filename = filename
        self.options = {'format': format, 'creationOptions': creationOptions}
class ApplierVector(object):
    """
    Data structure for storing input specifications defined by :meth:`hubdc.applier.Applier.setVector`.
    For internal use only.
    """

    def __init__(self, filename, layer=0):
        """
        :param filename: filename of the vector datasource
        :param layer: layer name or index to use from the datasource
        """
        self.filename = filename
        self.layer = layer
[docs]class ApplierOperator(object):
"""
This is the baseclass for an user defined applier operator.
For details on user defined operators see :meth:`hubdc.applier.Applier.apply`
"""
def __init__(self, maingrid, inputDatasets, inputFilenames, inputOptions,
outputFilenames, outputOptions,
vectorDatasets, vectorFilenames, vectorLayers,
progressBar, queueByFilename, ufuncArgs, ufuncKwargs, ufuncFunction=None):
assert isinstance(maingrid, PixelGrid)
self._grid = None
self._maingrid = maingrid
self._inputDatasets = inputDatasets
self._inputFilenames = inputFilenames
self._inputOptions = inputOptions
self._outputFilenames = outputFilenames
self._outputOptions = outputOptions
self._vectorDatasets = vectorDatasets
self._vectorFilenames = vectorFilenames
self._vectorLayers = vectorLayers
self._queueByFilename = queueByFilename
self._progressBar = progressBar
self._ufuncArgs = ufuncArgs
self._ufuncKwargs = ufuncKwargs
self._ufuncFunction = ufuncFunction
self._iblock = 0
self._nblock = 0
@property
def grid(self):
"""
Returns the :class:`~hubdc.applier.PixelGrid` object of the currently processed block.
"""
assert isinstance(self._grid, PixelGrid)
return self._grid
@property
def progressBar(self):
"""
Returns the progress bar.
"""
assert isinstance(self._progressBar, ProgressBar)
return self._progressBar
[docs] def isFirstBlock(self):
"""
Returns wether or not the current block is the first one.
"""
return self._iblock == 0
[docs] def isLastBlock(self):
"""
Returns wether or not the current block is the last one.
"""
return self._iblock == self._nblock - 1
[docs] def getSample(self, array, mask):
"""
Returns a data sample taken from ``array`` at locations given by ``mask``.
:param array: data array [n, y, x] to be sampled from
:param mask: boolean mask array [1, y, x] indicating the locations to be sampled
:return: data sample [n, masked]
"""
assert isinstance(array, numpy.ndarray) and array.ndim == 3
assert isinstance(mask, numpy.ndarray) and mask.ndim == 3 and mask.shape[0] == 1
sample = array[:, mask[0]]
assert sample.ndim == 2
return sample
[docs] def setSample(self, sample, array, mask):
"""
Sets a data sample given by ``sample`` to ``array`` at locations given by ``mask``.
:param sample: data sample [n, masked]
:param array: data array [n, y, x] to be updated
:param mask: boolean mask array [1, y, x] indicating the locations to be updated
"""
assert isinstance(array, numpy.ndarray) and array.ndim == 3
assert isinstance(mask, numpy.ndarray) and mask.ndim == 3 and mask.shape[0] == 1
array[:, mask[0]] = sample
[docs] def applySampleFunction(self, inarray, outarray, mask, ufunc):
"""
Shortcut for :meth:`hubdc.applier.Applier.getSample` -> ufunc() -> :meth:`hubdc.applier.Applier.setSample` pipeline.
Ufunc must accept and return a valid data sample [n, masked].
"""
if numpy.any(mask):
sample = self.getSample(array=inarray, mask=mask)
result = ufunc(sample)
self.setSample(sample=result, array=outarray, mask=mask)
[docs] def getFull(self, value, bands=1, dtype=None, overlap=0):
return numpy.full(shape=(bands, self.grid.ySize+2*overlap, self.grid.xSize+2*overlap),
fill_value=value, dtype=dtype)
[docs] def getArray(self, name, indicies=None, overlap=0, dtype=None, scale=None):
"""
Returns the input raster image data of the current block in form of a 3-d numpy array.
The ``name`` identifier must match the identifier used with :meth:`hubdc.applier.Applier.setInput`.
:param name: input raster name
:param indicies: subset image bands by a list of indicies
:param overlap: the amount of margin (number of pixels) added to the image data block in each direction, so that the blocks will overlap; this is important for spatial operators like filters.
:param dtype: convert image data to given numpy data type (this is done before the data is scaled)
:param scale: scale image data by given scale factor (this is done after type conversion)
"""
if indicies is None:
array = self._getImage(name=name, overlap=overlap, dtype=dtype, scale=scale)
elif isinstance(indicies, (list, tuple)):
array = self._getBandSubset(name=name, overlap=overlap, indicies=indicies, dtype=dtype)
else:
raise ValueError('wrong value for indicies or wavelength')
assert isinstance(array, numpy.ndarray)
assert array.ndim == 3
return array
[docs] def getMaskArray(self, name, noData=None, ufunc=None, array=None, overlap=0):
"""
Returns a boolean data/noData mask for an input raster image in form of a 3-d numpy array.
Pixels that are equal to the image no data value are set to False, all other pixels are set to True.
In case of a multiband image, the final pixel mask value is False if it was False in all of the bands.
The ``name`` identifier must match the identifier used with :meth:`hubdc.applier.Applier.setInput`.
:param name: input raster name
:param noData: default no data value to use if the image has no no data value specified
:param ufunc: user function to mask out further pixels,
e.g. ``ufunc = lambda array: array > 0.5`` will additionally mask out all pixels that have values larger than 0.5
:param array: pass in the data array directly if it was already loaded
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
:param ufunc: see :meth:`~hubdc.applier.ApplierOperator.getArray`
:return:
:rtype:
"""
# read array data if needed
noData = self.getNoDataValue(name, default=noData)
if (noData is not None) or (ufunc is not None):
if array is None:
array = self.getArray(name, overlap=overlap)
# create noData mask
if noData is None:
valid = self.getFull(value=True, bands=1)
else:
valid = numpy.any(array != noData, axis=0, keepdims=True)
# update noData mask with ufunc mask
if ufunc is not None:
numpy.logical_and(valid, numpy.any(ufunc(array), axis=0, keepdims=True), out=valid)
return valid
[docs] def getWavebandArray(self, name, wavelengths, linear=False, overlap=0, dtype=None, scale=None):
"""
Returns an image band subset like :meth:`~hubdc.applier.ApplierOperator.getArray` with specified ``indicies``,
but instead of specifying the bands directly, specify a list of target wavelength.
:param name: input raster name
:param wavelengths: list of target wavelengths specified in nanometers
:param linear: if set to True, linearly interpolated wavebands are returned instead of nearest neighbour wavebands.
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
:param dtype: see :meth:`~hubdc.applier.ApplierOperator.getArray`
:param scale: see :meth:`~hubdc.applier.ApplierOperator.getArray`
"""
if not linear:
indicies = [index for index, distance in [self.findWavelength(name=name, wavelength=wavelength) for wavelength in wavelengths]]
array = self.getArray(name=name, overlap=overlap, indicies=indicies, dtype=dtype, scale=scale)
else:
tmp = [self.findWavelengthNeighbours(name=name, wavelength=wavelength) for wavelength in wavelengths]
leftIndicies, leftWeights, rightIndicies, rightWeights = zip(*tmp)
leftArray = lambda : self.getArray(name=name, overlap=overlap, indicies=leftIndicies, dtype=dtype, scale=scale)
rightArray = lambda : self.getArray(name=name, overlap=overlap, indicies=rightIndicies, dtype=dtype, scale=scale)
array = leftArray() * numpy.array(leftWeights).reshape((-1,1,1))
array += rightArray() * numpy.array(rightWeights).reshape((-1,1,1))
assert array.ndim == 3
return array
[docs] def getDerivedArray(self, name, ufunc, resampleAlg=gdal.GRA_Average, overlap=0):
"""
Returns input raster image data like :meth:`~hubdc.applier.ApplierOperator.getArray`,
but instead of returning the data directly, a user defined function is applied to it.
Note that the user function is applied before resampling takes place.
"""
return self._getDerivedImage(name=name, ufunc=ufunc, resampleAlg=resampleAlg, overlap=overlap)
[docs] def getCategoricalFractionArray(self, name, ids, index=0, overlap=0):
"""
Returns input raster band data like :meth:`~hubdc.applier.ApplierOperator.getArray`,
but instead of returning the data directly, aggregated pixel fractions for the given categories (i.e. ``ids``)
are returned.
:param name: categorical input raster image
:param ids: categories to be considered
:param index: band index for specifying which image band to be used
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
"""
ufunc = lambda array: numpy.stack(numpy.float32(array[index] == id) for id in ids)
return self.getDerivedArray(name=name, ufunc=ufunc, overlap=overlap, resampleAlg=gdal.GRA_Average)
[docs] def getCategoricalArray(self, name, ids, noData=0, minOverallCoverage=0., minWinnerCoverage=0., index=0, overlap=0):
"""
Returns input raster band data like :meth:`~hubdc.applier.ApplierOperator.getArray`,
but instead of returning the data directly, the category array of maximum aggregated pixel fraction for the given categories (i.e. ``ids``)
is returned.
:param name: categorical input raster image
:param ids: categories to be considered
:param noData: no data value for masked pixels
:param minOverallCoverage: mask out pixels where the overall coverage (regarding to the given categories ``ids``) is less than this threshold
:param minWinnerCoverage: mask out pixels where the winner category coverage is less than this threshold
:param index: band index for specifying which image band to be used
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
"""
fractions = self.getCategoricalFractionArray(name=name, ids=ids, index=index, overlap=overlap)
categories = numpy.array(ids)[numpy.argmax(fractions, axis=0)[None]]
if noData is not None:
invalid = False
if minOverallCoverage > 0:
invalid += numpy.sum(fractions, axis=0, keepdims=True) < minOverallCoverage
if minWinnerCoverage > 0:
invalid += numpy.max(fractions, axis=0, keepdims=True) < minWinnerCoverage
categories[invalid] = noData
assert categories.ndim == 3
return categories
[docs] def getProbabilityArray(self, name, overlap=0):
"""
Like :meth:`~hubdc.applier.ApplierOperator.getCategoricalFractionArray`, but all category related information is
implicitly taken from the class definition metadata.
:param name: classification input raster image
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
"""
classes, classNames, classLookup = self.getMetadataClassDefinition(name=name)
ids = range(1, classes+1)
return self.getCategoricalFractionArray(name=name, ids=ids, index=0, overlap=overlap)
[docs] def getClassificationArray(self, name, minOverallCoverage=0., minWinnerCoverage=0., overlap=0):
"""
Like :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`, but all category related information is
implicitly taken from the class definition metadata.
:param name: classification input raster image
:param minOverallCoverage: see :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`
:param minWinnerCoverage: see :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`
:param overlap: see :meth:`~hubdc.applier.ApplierOperator.getArray`
"""
classes, classNames, classLookup = self.getMetadataClassDefinition(name=name)
ids = range(1, classes)
return self.getCategoricalArray(name=name, ids=ids, noData=0, minOverallCoverage=minOverallCoverage, minWinnerCoverage=minWinnerCoverage, index=0, overlap=overlap)
[docs] def getOutputListSubnames(self, name):
"""
Returns an iterator over the subnames of a output raster list.
Such subnames can be used with single output raster methods like :meth:`~hubdc.applier.ApplierOperator.setArray`
or :meth:`~hubdc.applier.ApplierOperator.setMetadataItem`.
"""
for subname in self._getSubnames(name, self._outputFilenames):
yield subname
[docs] def getOutputListLen(self, name):
"""
Returns the lenth of an output raster list.
"""
return len(list(self.getOutputListSubnames(name)))
def _getSubnames(self, name, subnames):
i = 0
if (name, 0) not in subnames:
raise KeyError('{name} is not an image list'.format(name=name))
while True:
if (name, i) in subnames:
yield (name, i)
i += 1
else:
break
[docs] def findWavelengthNeighbours(self, name, wavelength):
"""
Returns the image band indices and inverse distance weigths of the bands that are located around the target wavelength specified in nanometers.
The sum of ``leftWeight`` and ``rightWeight`` is 1.
The wavelength information is taken from the ENVI metadata domain.
An exception is thrown if the ``wavelength`` and ``wavelength units`` metadata items are not correctly specified.
:return: leftIndex, leftWeight, rightIndex, rightWeight
"""
wavelengths = self.getMetadataWavelengths(name)
leftWavelengths = wavelengths.copy()
leftWavelengths[wavelengths > wavelength] = numpy.nan
rightWavelengths = wavelengths.copy()
rightWavelengths[wavelengths < wavelength] = numpy.nan
if numpy.all(numpy.isnan(leftWavelengths)):
leftIndex = None
leftDistance = numpy.inf
else:
leftIndex = numpy.nanargmin((leftWavelengths - wavelength)**2)
leftDistance = abs(wavelengths[leftIndex] - wavelength)
if numpy.all(numpy.isnan(rightWavelengths)):
rightIndex = None
rightDistance = numpy.inf
else:
rightIndex = numpy.nanargmin((rightWavelengths - wavelength)**2)
rightDistance = abs(wavelengths[rightIndex] - wavelength)
if leftIndex is None:
leftIndex = rightIndex
leftWeight = 1.
rightWeight = 0.
else:
if (leftDistance+rightDistance) != 0.:
leftWeight = 1. - (leftDistance) / (leftDistance+rightDistance)
else:
leftWeight = 0.5
rightWeight = 1. - leftWeight
if rightIndex is None:
rightIndex = leftIndex
return leftIndex, leftWeight, rightIndex, rightWeight
[docs] def findWavelength(self, name, wavelength):
"""
Returns the image band index for the waveband that is nearest to the target wavelength specified in nanometers,
and also returns the absolute distance (in nanometers).
The wavelength information is taken from the ENVI metadata domain.
An exception is thrown if the ``wavelength`` and ``wavelength units`` metadata items are not correctly specified.
:return: index, distance
"""
wavelengths = self.getMetadataWavelengths(name)
distances = numpy.abs(wavelengths - wavelength)
index = numpy.argmin(distances)
return index, distances[index]
def _getInputDataset(self, name):
if name not in self._inputDatasets:
raise Exception('{name} is not a single image input'.format(name=name))
if self._inputDatasets[name] is None:
self._inputDatasets[name] = Open(filename=self._inputFilenames[name])
return self._inputDatasets[name]
def _getInputOptions(self, name):
return self._inputOptions[name]
def _getImage(self, name, overlap, dtype, scale):
dataset = self._getInputDataset(name)
options = self._getInputOptions(name)
grid = self.grid.pixelBuffer(buffer=overlap)
if self.grid.equalProjection(dataset.pixelGrid):
datasetResampled = dataset.translate(dstPixelGrid=grid, dstName='', format='MEM',
resampleAlg=options['resampleAlg'],
noData=options['noData'])
else:
datasetResampled = dataset.warp(dstPixelGrid=grid, dstName='', format='MEM',
resampleAlg=options['resampleAlg'],
errorThreshold=options['errorThreshold'],
warpMemoryLimit=options['warpMemoryLimit'],
multithread=options['multithread'],
srcNodata=options['noData'])
array = datasetResampled.readAsArray(dtype=dtype, scale=scale)
datasetResampled.close()
return array
def _getDerivedImage(self, name, ufunc, resampleAlg, overlap):
dataset = self._getInputDataset(name)
options = self._getInputOptions(name).copy()
options['resampleAlg'] = resampleAlg
grid = self.grid.pixelBuffer(buffer=overlap)
# create tmp dataset
gridInSourceProjection = grid.reproject(dataset.pixelGrid)
tmpDataset = dataset.translate(dstPixelGrid=gridInSourceProjection, dstName='', format='MEM',
noData=options['noData'])
tmpArray = tmpDataset.readAsArray()
derivedArray = ufunc(tmpArray)
derivedDataset = CreateFromArray(pixelGrid=gridInSourceProjection, array=derivedArray,
dstName='', format='MEM', creationOptions=[])
tmpname = '_tmp_'
self._inputDatasets[tmpname] = derivedDataset
self._inputOptions[tmpname] = options
array = self.getArray(name=tmpname, overlap=overlap)
return array
def _getBandSubset(self, name, indicies, overlap, dtype):
dataset = self._getInputDataset(name)
options = self._getInputOptions(name)
bandList = [i + 1 for i in indicies]
grid = self.grid.pixelBuffer(buffer=overlap)
if self.grid.equalProjection(dataset.pixelGrid):
datasetResampled = dataset.translate(dstPixelGrid=grid, dstName='', format='MEM',
bandList=bandList,
resampleAlg=options['resampleAlg'],
noData=options['noData'])
else:
selfGridReprojected = self.grid.reproject(dataset.pixelGrid)
selfGridReprojectedWithBuffer = selfGridReprojected.pixelBuffer(buffer=1+overlap)
datasetClipped = dataset.translate(dstPixelGrid=selfGridReprojectedWithBuffer, dstName='', format='MEM',
bandList=bandList,
noData=options['noData'])
datasetResampled = datasetClipped.warp(dstPixelGrid=grid, dstName='', format='MEM',
resampleAlg=options['resampleAlg'],
errorThreshold=options['errorThreshold'],
warpMemoryLimit=options['warpMemoryLimit'],
multithread=options['multithread'],
srcNodata=options['noData'])
datasetClipped.close()
array = datasetResampled.readAsArray(dtype=dtype)
datasetResampled.close()
return array
def _getVectorDataset(self, name):
if name not in self._vectorDatasets:
raise Exception('{name} is not a vector input'.format(name=name))
if self._vectorDatasets[name] is None:
self._vectorDatasets[name] = OpenLayer(filename=self._vectorFilenames[name], layerNameOrIndex=self._vectorLayers[name])
return self._vectorDatasets[name]
[docs] def getVectorArray(self, name, initValue=0, burnValue=1, burnAttribute=None, allTouched=False, filterSQL=None, overlap=0, dtype=numpy.float32, scale=None):
"""
Returns the vector rasterization of the current block in form of a 3-d numpy array.
The ``name`` identifier must match the identifier used with :meth:`hubdc.applier.Applier.setVector`.
:param name: vector name
:param initValue: value to pre-initialize the output array
:param burnValue: value to burn into the output array for all objects; exclusive with ``burnAttribute``
:param burnAttribute: identifies an attribute field on the features to be used for a burn-in value; exclusive with ``burnValue``
:param allTouched: whether to enable that all pixels touched by lines or polygons will be updated, not just those on the line render path, or whose center point is within the polygon
:param filterSQL: set an SQL WHERE clause which will be used to filter vector features
:param overlap: the amount of margin (number of pixels) added to the image data block in each direction, so that the blocks will overlap; this is important for spatial operators like filters.
:param dtype: convert output array to given numpy data type (this is done before the data is scaled)
:param scale: scale output array by given scale factor (this is done after type conversion)
"""
layer = self._getVectorDataset(name)
grid = self.grid.pixelBuffer(buffer=overlap)
dataset = layer.rasterize(dstPixelGrid=grid, eType=NumericTypeCodeToGDALTypeCode(dtype),
initValue=initValue, burnValue=burnValue, burnAttribute=burnAttribute, allTouched=allTouched,
filter=filterSQL, dstName='', format='MEM', creationOptions=[])
array = dataset.readAsArray(scale=scale)
return array
[docs] def getVectorCategoricalFractionArray(self, name, ids, oversampling=10, xRes=None, yRes=None, initValue=0, burnValue=1, burnAttribute=None, allTouched=False, filterSQL=None,
overlap=0):
"""
Returns input vector rasterization data like :meth:`~hubdc.applier.ApplierOperator.getVectorArray`,
but instead of returning the data directly, rasterization is performed at a specified resolution ``xRes`` and ``yRes``,
aggregated pixel fractions for the given categories (i.e. ``ids``) are returned.
:param name: vector name
:param ids: list of categry ids to use
:param oversampling: set the rasterization resolution to a multiple (i.e. the oversampling factor) of the reference grid resolution
:param xRes: set xRes rasterization resolution explicitely
:param yRes: set yRes rasterization resolution explicitely
For all other arguments see :meth:`~hubdc.applier.ApplierOperator.getVector`
"""
layer = self._getVectorDataset(name)
grid = self.grid.pixelBuffer(buffer=overlap)
# rasterize vector layer at fine resolution
if xRes is None or yRes is None:
xRes = grid.xRes / float(oversampling)
yRes = grid.yRes / float(oversampling)
gridOversampled = grid.newResolution(xRes=xRes, yRes=yRes)
tmpDataset = layer.rasterize(dstPixelGrid=gridOversampled, eType=NumericTypeCodeToGDALTypeCode(numpy.float32),
initValue=initValue, burnValue=burnValue, burnAttribute=burnAttribute, allTouched=allTouched,
filter=filterSQL, dstName='', format='MEM', creationOptions=[])
tmpArrayCatecories = tmpDataset.readAsArray()
tmpArrayFractions = numpy.stack(numpy.float32(tmpArrayCatecories[0] == id) for id in ids)
# store as tmp input raster and aggregate fractions to target grid
derivedDataset = CreateFromArray(pixelGrid=gridOversampled, array=tmpArrayFractions,
dstName='', format='MEM', creationOptions=[])
tmpname = '_tmp_'
self._inputDatasets[tmpname] = derivedDataset
self._inputOptions[tmpname] = {'resampleAlg' : gdal.GRA_Average, 'noData' : None}
array = self.getArray(name=tmpname, overlap=overlap)
return array
[docs] def getVectorCategoricalArray(self, name, ids, noData, minOverallCoverage=0., minWinnerCoverage=0.,
oversampling=10, xRes=None, yRes=None, burnValue=1, burnAttribute=None, allTouched=False, filterSQL=None,
overlap=0):
"""
Returns input raster band data like :meth:`~hubdc.applier.ApplierOperator.getVectorArray`,
but instead of returning the data directly, the category array of maximum aggregated pixel fraction for the given categories (i.e. ``ids``)
is returned.
:param name: vector name
:param ids:
:param noData: see :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`
:param minOverallCoverage: see :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`
:param minWinnerCoverage: see :meth:`~hubdc.applier.ApplierOperator.getCategoricalArray`
For all other arguments see :meth:`~hubdc.applier.ApplierOperator.getVector`
and :meth:`~hubdc.applier.ApplierOperator.getVectorCategoricalFractionArray`.
"""
fractions = self.getVectorCategoricalFractionArray(name=name, ids=ids, oversampling=oversampling, xRes=xRes, yRes=yRes,
initValue=noData, burnValue=burnValue, burnAttribute=burnAttribute, allTouched=allTouched, filterSQL=filterSQL,
overlap=overlap)
categories = numpy.array(ids)[fractions.argmax(axis=0)[None]]
if minOverallCoverage > 0:
invalid = numpy.sum(fractions, axis=0, keepdims=True) < minOverallCoverage
categories[invalid] = noData
if minWinnerCoverage > 0:
invalid = numpy.max(fractions, axis=0, keepdims=True) < minWinnerCoverage
categories[invalid] = noData
assert categories.ndim == 3
return categories
[docs] def getOutputFilename(self, name):
"""
Returns the output image filename
"""
return self._outputFilenames[name]
[docs] def getVectorFilename(self, name):
"""
Returns the vector layer filename.
"""
return self._vectorFilenames[name]
[docs] def setArray(self, name, array, overlap=0, replace=None, scale=None, dtype=None):
"""
Write data to an output raster image.
The ``name`` identifier must match the identifier used with :meth:`hubdc.applier.Applier.setOutput`.
:param name: output raster name
:param array: 3-d or 2-d numpy array to be written
:param overlap: the amount of margin (number of pixels) to be removed from the image data block in each direction;
this is useful when the overlap keyword was also used with :meth:`~hubdc.applier.ApplierOperator.getArray`
:param replace: tuple of ``(sourceValue, targetValue)`` values; replace all occurances of ``sourceValue`` inside the array with ``targetValue``;
this is done after type conversion and scaling)
:param scale: scale array data by given scale factor (this is done after type conversion)
:param dtype: convert array data to given numpy data type (this is done before the data is scaled)
"""
if not isinstance(array, numpy.ndarray):
array = numpy.array(array)
if array.ndim == 2:
array = array[None]
if overlap > 0:
array = array[:, overlap:-overlap, overlap:-overlap]
if replace is not None:
if replace[0] is numpy.nan:
mask = numpy.isnan(array)
else:
mask = numpy.equal(array, replace[0])
if scale is not None:
array *= scale
if dtype is not None:
array = array.astype(dtype)
if replace is not None:
array[mask] = replace[1]
filename = self.getOutputFilename(name)
self._queueByFilename[filename].put((Writer.WRITE_ARRAY, filename, array, self.grid, self._maingrid, self._outputOptions[name]['format'], self._outputOptions[name]['creationOptions']))
[docs] def setNoDataValue(self, name, value):
"""
Set the no data value for an output raster image.
"""
self.setMetadataItem(name=name, key='data ignore value', value=value, domain='ENVI')
filename = self.getOutputFilename(name)
self._queueByFilename[filename].put((Writer.SET_NODATA, filename, value))
[docs] def getNoDataValue(self, name, default=None):
"""
Returns the no data value for an input raster image. If a no data value is not specified, the ``default`` value is returned.
"""
dataset = self._getInputDataset(name)
noDataValue = dataset.getNoDataValue(default=default)
return default
def _apply(self, subgrid, iblock, nblock):
    # Remember which block of the processing grid is currently being handled,
    # then invoke the user-supplied operator function with the stored arguments.
    self._grid = subgrid
    self._iblock = iblock
    self._nblock = nblock
    return self.ufunc(*self._ufuncArgs, **self._ufuncKwargs)
def ufunc(self, *args, **kwargs):
    """
    Image processing entry point; overwrite this method in a subclass, or pass
    a function to the applier instead.
    See :doc:`ApplierExamples` for more information.
    """
    # Delegate to an externally supplied function when one was given.
    if self._ufuncFunction is not None:
        return self._ufuncFunction(self, *args, **kwargs)
    raise NotImplementedError()
class ApplierControls(object):
    """
    Holds the settings that control how an :class:`Applier` processes its
    inputs: block size, parallelism, reference grid derivation and GDAL
    configuration. Every setter returns ``self`` so calls can be chained.
    """

    def __init__(self):
        # Initialise every option with its documented default.
        # Block/window geometry.
        self.setWindowXSize()
        self.setWindowYSize()
        # Parallel processing and writing.
        self.setNumThreads()
        self.setNumWriter()
        self.setCreateEnviHeader()
        # Reference pixel grid derivation.
        self.setAutoFootprint()
        self.setAutoResolution()
        self.setResolution()
        self.setFootprint()
        self.setProjection()
        self.setReferenceGrid()
        # GDAL configuration options.
        self.setGDALCacheMax()
        self.setGDALSwathSize()
        self.setGDALDisableReadDirOnOpen()
        self.setGDALMaxDatasetPoolSize()
        # Progress reporting.
        self.setProgressBar()
def setProgressBar(self, progressBar=None):
    """
    Set the progress display object. Defaults to a
    :class:`~hubdc.progressbar.CUIProgressBar`; pass a
    :class:`~hubdc.progressbar.SilentProgressBar` to suppress output.
    """
    self.progressBar = CUIProgressBar() if progressBar is None else progressBar
    return self
DEFAULT_WINDOWXSIZE = 256

def setWindowXSize(self, windowxsize=DEFAULT_WINDOWXSIZE):
    """
    Set the number of columns per processing block; images are processed in
    windows of ``windowxsize`` columns by ``windowysize`` rows.
    """
    self.windowxsize = windowxsize
    return self
DEFAULT_WINDOWYSIZE = 256

def setWindowYSize(self, windowysize=DEFAULT_WINDOWYSIZE):
    """
    Set the number of rows per processing block; images are processed in
    windows of ``windowxsize`` columns by ``windowysize`` rows.
    """
    self.windowysize = windowysize
    return self
def setWindowFullSize(self):
    """
    Process the full image extent as a single block.
    """
    # A window far larger than any real raster collapses the tiling to one block.
    hugeWindow = 10**20
    self.setWindowXSize(hugeWindow)
    self.setWindowYSize(hugeWindow)
    return self
DEFAULT_NWORKER = None

def setNumThreads(self, nworker=DEFAULT_NWORKER):
    """
    Set the number of pool workers used for multiprocessing; ``None`` disables
    multiprocessing entirely (recommended for debugging).
    """
    self.nworker = nworker
    return self
DEFAULT_NWRITER = None

def setNumWriter(self, nwriter=DEFAULT_NWRITER):
    """
    Set the number of writer processes; ``None`` disables multiwriting
    (recommended for debugging).
    """
    self.nwriter = nwriter
    return self
# Defaults shared by the setters of this class; the ENVI-header and footprint
# defaults belong to setCreateEnviHeader/setAutoFootprint respectively.
DEFAULT_CREATEENVIHEADER = True
DEFAULT_FOOTPRINTTYPE = const.FOOTPRINT_UNION
DEFAULT_RESOLUTIONTYPE = const.RESOLUTION_MINIMUM

def setAutoResolution(self, resolutionType=DEFAULT_RESOLUTIONTYPE):
    """
    Derive the reference pixel grid resolution from the input files.
    Valid options are 'minimum' (default), 'maximum' and 'average'.
    """
    self.resolutionType = resolutionType
    return self
def setResolution(self, xRes=None, yRes=None):
    """
    Explicitly set the x/y resolution of the reference pixel grid;
    ``None`` means the value is derived from the inputs.
    """
    self.xRes = xRes
    self.yRes = yRes
    return self
def setProjection(self, projection=None):
    """
    Explicitly set the projection of the reference pixel grid;
    ``None`` means the projection is derived from the inputs.
    """
    self.projection = projection
    return self
def setReferenceGrid(self, grid=None):
    """
    Set the reference pixel grid; pass an instance of
    :py:class:`~hubdc.model.PixelGrid.PixelGrid`, or ``None`` to unset it.
    """
    self.referenceGrid = grid
    return self
def setReferenceGridByImage(self, filename=None):
    """
    Use the pixel grid of the given image as the reference grid;
    ``None`` clears any previously set reference grid.
    """
    grid = None if filename is None else PixelGrid.fromFile(filename)
    self.setReferenceGrid(grid=grid)
    return self
def setReferenceGridByVector(self, filename, xRes, yRes, layerNameOrIndex=0):
    """
    Derive the reference pixel grid (footprint and projection) from a vector
    layer, rasterized at the given x/y resolution.
    """
    vectorLayer = OpenLayer(filename=filename, layerNameOrIndex=layerNameOrIndex)
    self.setReferenceGrid(grid=vectorLayer.makePixelGrid(xRes=xRes, yRes=yRes))
    return self
DEFAULT_GDALCACHEMAX = 100*2**20

def setGDALCacheMax(self, bytes=DEFAULT_GDALCACHEMAX):
    """
    Set the GDAL raster block cache size in bytes; see the `GDAL_CACHEMAX
    Configuration Option <https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_CACHEMAX>`_.
    """
    # NOTE(review): parameter name ``bytes`` shadows the builtin, but renaming
    # it would break callers passing it as a keyword argument.
    self.cacheMax = bytes
    return self
DEFAULT_GDALSWATHSIZE = 100*2**20

def setGDALSwathSize(self, bytes=DEFAULT_GDALSWATHSIZE):
    """
    Set the GDAL swath buffer size in bytes; see the `GDAL_SWATH_SIZE
    Configuration Option <https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_SWATH_SIZE>`_.
    """
    # NOTE(review): parameter name ``bytes`` shadows the builtin, kept for
    # keyword-argument compatibility.
    self.swathSize = bytes
    return self
DEFAULT_GDALDISABLEREADDIRONOPEN = True

def setGDALDisableReadDirOnOpen(self, disable=DEFAULT_GDALDISABLEREADDIRONOPEN):
    """
    Control directory listing on dataset open; see the
    `GDAL_DISABLE_READDIR_ON_OPEN Configuration Option
    <https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_DISABLE_READDIR_ON_OPEN>`_.
    """
    self.disableReadDirOnOpen = disable
    return self
DEFAULT_GDALMAXDATASETPOOLSIZE = 100

def setGDALMaxDatasetPoolSize(self, nfiles=DEFAULT_GDALMAXDATASETPOOLSIZE):
    """
    Set the maximum number of datasets GDAL keeps open; see the
    `GDAL_MAX_DATASET_POOL_SIZE Configuration Option
    <https://trac.osgeo.org/gdal/wiki/ConfigOptions#GDAL_MAX_DATASET_POOL_SIZE>`_.
    """
    self.maxDatasetPoolSize = nfiles
    return self
@property
def _multiprocessing(self):
    # A configured worker count means the pool-based multiprocessing path is used.
    return self.nworker is not None
@property
def _multiwriting(self):
    # Dedicated writer processes are used when multiprocessing is on,
    # or when an explicit number of writers was requested.
    if self._multiprocessing:
        return True
    return self.nwriter is not None
def _makeAutoGrid(self, inputs):
    # Build the reference grid from the pixel grids of all input rasters:
    # common projection, combined footprint and aggregated resolution.
    inputGrids = [PixelGrid.fromFile(applierInput.filename) for applierInput in inputs.values()]
    projection = self._deriveProjection(inputGrids)
    xMin, xMax, yMin, yMax = self._deriveFootprint(inputGrids, projection)
    xRes, yRes = self._deriveResolution(inputGrids)
    return PixelGrid(projection=projection, xRes=xRes, yRes=yRes,
                     xMin=xMin, xMax=xMax, yMin=yMin, yMax=yMax)
def _deriveProjection(self, grids):
    # Use the explicitly configured projection when available; otherwise all
    # inputs must agree on a common projection, which is then adopted.
    if self.projection is not None:
        return self.projection
    if not grids:
        raise Exception('projection not defined')
    first = grids[0]
    for grid in grids:
        if not grid.equalProjection(first):
            raise Exception('input projections do not match')
    return first.projection
def _deriveFootprint(self, grids, projection):
    # A fully specified footprint wins; otherwise it is derived from the
    # (reprojected) extents of all input grids, either as union or
    # intersection depending on the configured footprint type.
    footprint = (self.xMin, self.xMax, self.yMin, self.yMax)
    if None not in footprint:
        return footprint
    if not grids:
        raise Exception('footprint not defined')
    extents = numpy.array([grid.reprojectExtent(targetProjection=projection) for grid in grids])
    xMins, xMaxs, yMins, yMaxs = extents.T
    if self.footprintType == const.FOOTPRINT_UNION:
        return xMins.min(), xMaxs.max(), yMins.min(), yMaxs.max()
    if self.footprintType == const.FOOTPRINT_INTERSECTION:
        xMin, xMax = xMins.max(), xMaxs.min()
        yMin, yMax = yMins.max(), yMaxs.min()
        if xMax <= xMin or yMax <= yMin:
            raise Exception('input extents do not intersect')
        return xMin, xMax, yMin, yMax
    raise ValueError('unknown footprint type')
def _deriveResolution(self, grids):
    # Explicit resolution settings take precedence; otherwise aggregate the
    # input resolutions according to the configured resolution type.
    if self.xRes is not None and self.yRes is not None:
        return self.xRes, self.yRes
    xResolutions = numpy.array([grid.xRes for grid in grids])
    yResolutions = numpy.array([grid.yRes for grid in grids])
    if self.resolutionType == const.RESOLUTION_MINIMUM:
        return xResolutions.min(), yResolutions.min()
    if self.resolutionType == const.RESOLUTION_MAXIMUM:
        return xResolutions.max(), yResolutions.max()
    if self.resolutionType == const.RESOLUTION_AVERAGE:
        return xResolutions.mean(), yResolutions.mean()
    raise ValueError('unknown resolution type')