#
# Project: SILX
# https://github.com/silx-kit/silx
#
# Copyright (C) 2012-2023 European Synchrotron Radiation Facility, Grenoble, France
#
# Principal author: Jérôme Kieffer (Jerome.Kieffer@ESRF.eu)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# .
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# .
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
"""A module for performing basic statistical analysis (min, max, mean, std) on
large data where numpy is not very efficient.
"""
__author__ = "Jerome Kieffer"
__license__ = "MIT"
__date__ = "19/05/2021"
__copyright__ = "2012-2019, ESRF, Grenoble"
__contact__ = "jerome.kieffer@esrf.fr"
import logging
import numpy
from collections import namedtuple
from math import sqrt
from .common import pyopencl
from .processing import EventDescription, OpenclProcessing, BufferDescription
from .utils import concatenate_cl_kernel
# This module is useless without pyopencl: fail fast at import time.
if pyopencl:
    mf = pyopencl.mem_flags
    from pyopencl.reduction import ReductionKernel

    try:
        # `cltypes` is only present in recent pyopencl releases.
        from pyopencl import cltypes
    except ImportError:
        # Older pyopencl: fetch float8 from the vector-type registry instead.
        v = pyopencl.array.vec()
        float8 = v.float8
    else:
        float8 = cltypes.float8
else:
    raise ImportError("pyopencl is not installed")
# Module-level logger, named after the module per the logging convention.
logger = logging.getLogger(__name__)

# Container for the results: raw reductions (min/max/cnt/sum) plus the
# derived mean, variance and standard deviation.
StatResults = namedtuple("StatResults", "min max cnt sum mean var std")

# Neutral element of the OpenCL reduction, as a float8 literal. Lanes are:
#   min      max      cnt   cnt_err  sum   sum_err  var   var_err
zero8 = "(float8)(FLT_MAX, -FLT_MAX, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f, 0.0f)"
class Statistics(OpenclProcessing):
    """A class for doing statistical analysis using OpenCL.

    :param List[int] size: Shape of input data to treat
    :param numpy.dtype dtype: Input data type
    :param numpy.ndarray template: Data template to extract size & dtype
    :param ctx: Actual working context, left to None for automatic
                initialization from device type or platformid/deviceid
    :param str devicetype: Type of device, can be "CPU", "GPU", "ACC" or "ALL"
    :param int platformid: Platform identifier as given by clinfo
    :param int deviceid: Device identifier as given by clinfo
    :param int block_size:
        Preferred workgroup size, may vary depending on the outcome of the
        compilation
    :param bool profile:
        Switch on profiling to be able to profile at the kernel level,
        store profiling elements (makes code slightly slower)
    """

    # Per-element buffer templates; scaled to the actual data size in __init__.
    buffers = [
        BufferDescription("raw", 1, numpy.float32, mf.READ_ONLY),
        BufferDescription("converted", 1, numpy.float32, mf.READ_WRITE),
    ]
    kernel_files = ["preprocess.cl"]
    # Maps integer numpy dtypes to the on-device cast kernels in preprocess.cl.
    mapping = {
        numpy.int8: "s8_to_float",
        numpy.uint8: "u8_to_float",
        numpy.int16: "s16_to_float",
        numpy.uint16: "u16_to_float",
        numpy.uint32: "u32_to_float",
        numpy.int32: "s32_to_float",
    }

    def __init__(
        self,
        size=None,
        dtype=None,
        template=None,
        ctx=None,
        devicetype="all",
        platformid=None,
        deviceid=None,
        block_size=None,
        profile=False,
    ):
        OpenclProcessing.__init__(
            self,
            ctx=ctx,
            devicetype=devicetype,
            platformid=platformid,
            deviceid=deviceid,
            block_size=block_size,
            profile=profile,
        )
        self.size = size
        self.dtype = dtype
        if template is not None:
            # An explicit template overrides size/dtype.
            self.size = template.size
            self.dtype = template.dtype
        # Instantiate the class-level templates at the actual data size.
        self.buffers = [
            BufferDescription(i.name, i.size * self.size, i.dtype, i.flags)
            for i in self.__class__.buffers
        ]
        self.allocate_buffers(use_array=True)
        self.compile_kernels()
        self.set_kernel_arguments()

    def set_kernel_arguments(self):
        """Parametrize all kernel arguments"""
        # Every cast kernel takes the same two buffers: raw input, float output.
        for val in self.mapping.values():
            self.cl_kernel_args[val] = {
                i: self.cl_mem[i] for i in ("raw", "converted")
            }

    def compile_kernels(self):
        """Compile the kernels and build the reduction variants:
        simple fp32, Kahan-compensated fp32 and (when supported) fp64.
        """
        OpenclProcessing.compile_kernels(
            self, self.kernel_files, "-D NIMAGE=%i" % self.size
        )
        compiler_options = self.get_compiler_options(x87_volatile=True)
        src = concatenate_cl_kernel(("doubleword.cl", "statistics.cl"))
        # Compensated (double-word) fp32 arithmetic: the default, accurate
        # even on devices without fp64 support.
        self.reduction_comp = ReductionKernel(
            self.ctx,
            dtype_out=float8,
            neutral=zero8,
            map_expr="map_statistics(data, i)",
            reduce_expr="reduce_statistics(a,b)",
            arguments="__global float *data",
            preamble=src,
            options=compiler_options,
        )
        # Plain single-precision reduction: fastest, least accurate.
        self.reduction_simple = ReductionKernel(
            self.ctx,
            dtype_out=float8,
            neutral=zero8,
            map_expr="map_statistics(data, i)",
            reduce_expr="reduce_statistics_simple(a,b)",
            arguments="__global float *data",
            preamble=src,
            options=compiler_options,
        )
        if "cl_khr_fp64" in self.device.extensions:
            # Native double precision, available only with the fp64 extension.
            self.reduction_double = ReductionKernel(
                self.ctx,
                dtype_out=float8,
                neutral=zero8,
                map_expr="map_statistics(data, i)",
                reduce_expr="reduce_statistics_double(a,b)",
                arguments="__global float *data",
                preamble=src,
                options=compiler_options,
            )
        else:
            logger.info(
                "Device %s does not support double-precision arithmetics, fall-back on compensated one",
                self.device,
            )
            self.reduction_double = self.reduction_comp

    def send_buffer(self, data, dest):
        """
        Send a numpy array to the device, including the cast on the device if
        possible

        :param numpy.ndarray data: numpy array with data
        :param dest: name of the buffer as registered in the class
        :return: list of events generated (copy, and possibly cast)
        """
        logger.info("send data to %s", dest)
        dest_type = numpy.dtype([i.dtype for i in self.buffers if i.name == dest][0])
        events = []
        if (data.dtype == dest_type) or (data.dtype.itemsize > dest_type.itemsize):
            # Same type, or a narrowing conversion: cast on the host, one copy.
            copy_image = pyopencl.enqueue_copy(
                self.queue,
                self.cl_mem[dest].data,
                numpy.ascontiguousarray(data, dest_type),
            )
            events.append(EventDescription("copy H->D %s" % dest, copy_image))
        else:
            # Widening integer conversion: copy raw bytes, cast on the device.
            copy_image = pyopencl.enqueue_copy(
                self.queue, self.cl_mem["raw"].data, numpy.ascontiguousarray(data)
            )
            kernel = getattr(self.program, self.mapping[data.dtype.type])
            cast_to_float = kernel(
                self.queue,
                (self.size,),
                None,
                self.cl_mem["raw"].data,
                self.cl_mem[dest].data,
            )
            events += [
                EventDescription("copy H->D raw", copy_image),
                EventDescription(f"cast to float {dest}", cast_to_float),
            ]
        if self.profile:
            self.events += events
        return events

    def process(self, data, comp=True):
        """Actually calculate the statics on the data

        :param numpy.ndarray data: numpy array with the image
        :param comp: use Kahan compensated arithmetics for the calculation;
            may also be a string among "single"/"fp32"/"float32",
            "double"/"fp64"/"float64" or "comp" to select the precision.
        :return: Statistics named tuple
        :rtype: StatResults
        """
        if data.ndim != 1:
            data = data.ravel()
        size = data.size
        # Message shown when the assertion FAILS, so it must describe the error.
        assert size <= self.size, "data is too large for the allocated buffers"
        events = []
        # Normalize the `comp` flag to one of the reduction-selection strings.
        if comp is True:
            comp = "comp"
        elif comp is False:
            comp = "single"
        else:
            comp = comp.lower()
        with self.sem:
            self.send_buffer(data, "converted")
            if comp in ("single", "fp32", "float32"):
                reduction = self.reduction_simple
            elif comp in ("double", "fp64", "float64"):
                reduction = self.reduction_double
            else:
                reduction = self.reduction_comp
            res_d, evt = reduction(
                self.cl_mem["converted"][: self.size],
                queue=self.queue,
                return_event=True,
            )
            events.append(EventDescription(f"statistical reduction {comp}", evt))
            if self.profile:
                self.events += events
            res_h = res_d.get()
        # Each quantity is stored as a double-word (value + error term):
        # recombine the pairs into plain Python floats.
        min_ = 1.0 * res_h["s0"]
        max_ = 1.0 * res_h["s1"]
        count = 1.0 * res_h["s2"] + res_h["s3"]
        sum_ = 1.0 * res_h["s4"] + res_h["s5"]
        m2 = 1.0 * res_h["s6"] + res_h["s7"]
        # Unbiased sample variance (Bessel's correction).
        var = m2 / (count - 1.0)
        res = StatResults(min_, max_, count, sum_, sum_ / count, var, sqrt(var))
        return res

    __call__ = process