""" Contains the classes that manages Las PointRecords
Las PointRecords are represented using Numpy's structured arrays,
The PointRecord classes provide a few extra things to manage these arrays
in the context of Las point data
"""
import logging
from typing import NoReturn
import numpy as np
from . import dims
from .dims import ScaledArrayView
from .. import errors
from ..point import PointFormat
logger = logging.getLogger(__name__)
[docs]def scale_dimension(array_dim, scale, offset):
return (array_dim * scale) + offset
[docs]def unscale_dimension(array_dim, scale, offset):
return np.round((np.array(array_dim) - offset) / scale)
[docs]def raise_not_enough_bytes_error(
expected_bytes_len, missing_bytes_len, point_data_buffer_len, points_dtype
) -> NoReturn:
raise errors.PylasError(
"The file does not contain enough bytes to store the expected number of points\n"
"expected {} bytes, read {} bytes ({} bytes missing == {} points) and it cannot be corrected\n"
"{} (bytes) / {} (point_size) = {} (points)".format(
expected_bytes_len,
point_data_buffer_len,
missing_bytes_len,
missing_bytes_len / points_dtype.itemsize,
point_data_buffer_len,
points_dtype.itemsize,
point_data_buffer_len / points_dtype.itemsize,
)
)
[docs]class PackedPointRecord:
"""
In the PackedPointRecord, fields that are a combinations of many sub-fields (fields stored on less than a byte)
are still packed together and are only de-packed and re-packed when accessed.
This uses of less memory than if the sub-fields were unpacked
>>> #return number is a sub-field
>>> from pylas import PointFormat
>>> packed_point_record = PackedPointRecord.zeros(PointFormat(0), 10)
>>> return_number = packed_point_record['return_number']
>>> return_number
<SubFieldView([0 0 0 0 0 0 0 0 0 0])>
>>> return_number[:] = 1
>>> np.alltrue(packed_point_record['return_number'] == 1)
True
"""
def __init__(self, data: np.ndarray, point_format: PointFormat):
self.array = data
self.point_format = point_format
self.sub_fields_dict = dims.get_sub_fields_dict(point_format.id)
@property
def point_size(self):
"""Returns the point size in bytes taken by each points of the record
Returns
-------
int
The point size in byte
"""
return self.array.dtype.itemsize
[docs] @classmethod
def zeros(cls, point_format, point_count):
"""Creates a new point record with all dimensions initialized to zero
Parameters
----------
point_format: PointFormat
The point format id the point record should have
point_count : int
The number of point the point record should have
Returns
-------
PackedPointRecord
"""
data = np.zeros(point_count, point_format.dtype())
return cls(data, point_format)
[docs] @classmethod
def empty(cls, point_format):
"""Creates an empty point record.
Parameters
----------
point_format: pylas.PointFormat
The point format id the point record should have
Returns
-------
PackedPointRecord
"""
return cls.zeros(point_format, point_count=0)
[docs] @classmethod
def from_point_record(
cls, other_point_record: "PackedPointRecord", new_point_format: PointFormat
) -> "PackedPointRecord":
"""Construct a new PackedPointRecord from an existing one with the ability to change
to point format while doing so
"""
array = np.zeros_like(other_point_record.array, dtype=new_point_format.dtype())
new_record = cls(array, new_point_format)
new_record.copy_fields_from(other_point_record)
return new_record
[docs] @classmethod
def from_buffer(cls, buffer, point_format, count, offset=0):
points_dtype = point_format.dtype()
data = np.frombuffer(buffer, dtype=points_dtype, offset=offset, count=count)
return cls(data, point_format)
[docs] def copy_fields_from(self, other_record: "PackedPointRecord") -> None:
"""Tries to copy the values of the current dimensions from other_record"""
for dim_name in self.point_format.dimension_names:
try:
self[dim_name] = np.array(other_record[dim_name])
except ValueError:
pass
[docs] def memoryview(self) -> memoryview:
return memoryview(self.array)
[docs] def resize(self, new_size: int) -> None:
size_diff = new_size - len(self.array)
if size_diff > 0:
self.array = np.append(
self.array, np.zeros(size_diff, dtype=self.array.dtype)
)
elif size_diff < 0:
self.array = self._array[:new_size].copy()
def _append_zeros_if_too_small(self, value):
"""Appends zeros to the points stored if the value we are trying to
fit is bigger
"""
size_diff = len(value) - len(self.array)
if size_diff > 0:
self.resize(size_diff)
def __eq__(self, other):
return self.point_format == other.point_format and np.all(
self.array == other.array
)
def __len__(self):
return self.array.shape[0]
def __getitem__(self, item):
"""Gives access to the underlying numpy array
Unpack the dimension if item is the name a sub-field
"""
if isinstance(item, (int, slice, np.ndarray)):
return PackedPointRecord(self.array[item], self.point_format)
# 1) Is it a sub field ?
try:
composed_dim, sub_field = self.sub_fields_dict[item]
return dims.SubFieldView(self.array[composed_dim], sub_field.mask)
except KeyError:
pass
# 2) Is it a Scaled Extra Byte Dimension ?
try:
dim_info = self.point_format.dimension_by_name(item)
if dim_info.is_standard is False:
if dim_info.scales is not None or dim_info.offsets is not None:
scale = (
np.ones(dim_info.num_elements, np.float64)
if dim_info.scales is None
else dim_info.scales[: dim_info.num_elements]
)
offset = (
np.zeros(dim_info.num_elements, np.float64)
if dim_info.offsets is None
else dim_info.offsets[: dim_info.num_elements]
)
return ScaledArrayView(self.array[item], scale, offset)
except ValueError:
pass
return self.array[item]
def __setitem__(self, key, value):
"""Sets elements in the array"""
self._append_zeros_if_too_small(value)
if isinstance(key, str):
self[key][:] = value
else:
self.array[key] = value
def __getattr__(self, item):
try:
return self[item]
except ValueError:
raise AttributeError("{} is not a valid dimension".format(item)) from None
def __repr__(self):
return "<{}(fmt: {}, len: {}, point size: {})>".format(
self.__class__.__name__,
self.point_format,
len(self),
self.point_format.size,
)
[docs]def apply_new_scaling(record, scales: np.ndarray, offsets: np.ndarray) -> None:
record["X"] = unscale_dimension(np.asarray(record.x), scales[0], offsets[0])
record["Y"] = unscale_dimension(np.asarray(record.y), scales[1], offsets[1])
record["Z"] = unscale_dimension(np.asarray(record.x), scales[2], offsets[2])
[docs]class ScaleAwarePointRecord(PackedPointRecord):
def __init__(self, array, point_format, scales, offsets):
super().__init__(array, point_format)
self.scales = scales
self.offsets = offsets
[docs] def change_scaling(self, scales=None, offsets=None) -> None:
if scales is not None:
self.scales = scales
if offsets is not None:
self.offsets = offsets
apply_new_scaling(self, scales, offsets)
self.scales = scales
self.offsets = offsets
def __getitem__(self, item):
if isinstance(item, (slice, np.ndarray)):
return ScaleAwarePointRecord(
self.array[item], self.point_format, self.scales, self.offsets
)
if item == "x":
return ScaledArrayView(self.array["X"], self.scales[0], self.offsets[0])
elif item == "y":
return ScaledArrayView(self.array["Y"], self.scales[1], self.offsets[1])
elif item == "z":
return ScaledArrayView(self.array["Z"], self.scales[2], self.offsets[2])
else:
return super().__getitem__(item)