import abc
import io
import logging
from typing import Optional, BinaryIO, Iterable, Union
from . import errors
from .compression import LazBackend
from .header import LasHeader
from .lasdata import LasData
from .point import record
from .vlrs.known import LasZipVlr
from .vlrs.vlrlist import VLRList
try:
import lazrs
except ModuleNotFoundError:
pass
try:
import laszip
except ModuleNotFoundError:
pass
logger = logging.getLogger(__name__)
[docs]class LasReader:
"""The reader class handles LAS and LAZ via one of the supported backend"""
def __init__(
self,
source: BinaryIO,
closefd: bool = True,
laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None,
):
self.closefd = closefd
if laz_backend is None:
laz_backend = LazBackend.detect_available()
self.laz_backend = laz_backend
self.header = LasHeader.read_from(source)
if self.header.are_points_compressed:
if not laz_backend:
raise errors.PylasError(
"No LazBackend selected, cannot decompress data"
)
self.point_source = self._create_laz_backend(source)
if self.point_source is None:
raise errors.PylasError(
"Data is compressed, but no LazBacked could be initialized"
)
else:
self.point_source = UncompressedPointReader(
source, self.header.point_format.size
)
self.points_read = 0
[docs] def read_points(self, n: int) -> Optional[record.ScaleAwarePointRecord]:
"""Read n points from the file
If there are no points left to read, returns None.
Parameters
----------
n: The number of points to read
if n is less than 0, this function will read the remaining points
"""
points_left = self.header.point_count - self.points_read
if points_left <= 0:
return None
if n < 0:
n = points_left
else:
n = min(n, points_left)
r = record.PackedPointRecord.from_buffer(
self.point_source.read_n_points(n), self.header.point_format, n
)
points = record.ScaleAwarePointRecord(
r.array, r.point_format, self.header.scales, self.header.offsets
)
self.points_read += n
return points
[docs] def read(self) -> LasData:
"""Reads all the points not read and returns a LasData object"""
points = self.read_points(-1)
if points is None:
points = record.PackedPointRecord.empty(self.header.point_format)
else:
points = record.PackedPointRecord(points.array, points.point_format)
las_data = LasData(header=self.header, points=points)
if self.header.version.minor >= 4:
if (
self.header.are_points_compressed
and not self.point_source.source.seekable()
):
# We explicitly require seekable stream because we have to seek
# past the chunk table of LAZ file
raise errors.PylasError(
"source must be seekable, to read evlrs form LAZ file"
)
self.point_source.source.seek(self.header.start_of_first_evlr, io.SEEK_SET)
las_data.evlrs = self._read_evlrs(self.point_source.source, seekable=True)
return las_data
[docs] def chunk_iterator(self, points_per_iteration: int) -> "PointChunkIterator":
"""Returns an iterator, that will read points by chunks
of the requested size
:param points_per_iteration: number of points to be read with each iteration
:return:
"""
return PointChunkIterator(self, points_per_iteration)
[docs] def close(self) -> None:
"""closes the file object used by the reader"""
if self.closefd:
self.point_source.close()
def _create_laz_backend(self, source) -> Optional["IPointReader"]:
try:
backends = iter(self.laz_backend)
except TypeError:
backends = (self.laz_backend,)
laszip_vlr = self.header.vlrs.pop(self.header.vlrs.index("LasZipVlr"))
for backend in backends:
try:
if not backend.is_available():
raise errors.PylasError(f"The '{backend}' is not available")
if backend == LazBackend.LazrsParallel:
return LazrsPointReader(source, laszip_vlr, parallel=True)
elif backend == LazBackend.Lazrs:
return LazrsPointReader(source, laszip_vlr, parallel=False)
elif backend == LazBackend.Laszip:
return LaszipPointReader(source, self.header)
else:
raise errors.PylasError("Unknown LazBackend: {}".format(backend))
except errors.LazError as e:
logger.error(e)
def _read_evlrs(self, source, seekable=False) -> Optional[VLRList]:
"""Reads the EVLRs of the file, will fail if the file version
does not support evlrs
"""
if (
self.header.version.minor >= 4
and self.points_read == self.header.point_count
):
if seekable:
source.seek(self.header.start_of_first_evlr)
return VLRList.read_from(source, self.header.number_of_evlrs, extended=True)
else:
return None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class PointChunkIterator:
def __init__(self, reader: LasReader, points_per_iteration: int) -> None:
self.reader = reader
self.points_per_iteration = points_per_iteration
def __next__(self) -> record.ScaleAwarePointRecord:
points = self.reader.read_points(self.points_per_iteration)
if points is None:
raise StopIteration
return points
def __iter__(self) -> "PointChunkIterator":
return self
class IPointReader(abc.ABC):
"""The interface to be implemented by the class that actually reads
points from as LAS/LAZ file so that the LasReader can use it.
It is used to manipulate LAS/LAZ (with different LAZ backends) in the
reader
"""
@abc.abstractmethod
def read_n_points(self, n: int) -> bytearray:
...
@abc.abstractmethod
def close(self) -> None:
...
class UncompressedPointReader(IPointReader):
"""Implementation of IPointReader for the simple uncompressed case"""
def __init__(self, source, point_size) -> None:
self.source = source
self.point_size = point_size
def read_n_points(self, n: int) -> bytearray:
try:
readinto = self.source.readinto
except AttributeError:
data = bytearray(self.source.read(n * self.point_size))
else:
data = bytearray(n * self.point_size)
readinto(data)
return data
def close(self):
self.source.close()
class LaszipPointReader(IPointReader):
"""Implementation for the laszip backend"""
def __init__(self, source: BinaryIO, header: LasHeader) -> None:
self.source = source
self.source.seek(0)
self.unzipper = laszip.LasUnZipper(source)
unzipper_header = self.unzipper.header
assert unzipper_header.point_data_format == header.point_format.id
assert unzipper_header.point_data_record_length == header.point_format.size
self.point_size = header.point_format.size
def read_n_points(self, n: int) -> bytearray:
points_data = bytearray(n * self.point_size)
self.unzipper.decompress_into(points_data)
return points_data
def close(self) -> None:
self.source.close()
class LazrsPointReader(IPointReader):
"""Implementation for the laz-rs backend, supports single-threaded decompression
as well as multi-threaded decompression
"""
def __init__(self, source, laszip_vlr: LasZipVlr, parallel: bool) -> None:
self.source = source
self.vlr = lazrs.LazVlr(laszip_vlr.record_data)
if parallel:
self.decompressor = lazrs.ParLasZipDecompressor(
source, laszip_vlr.record_data
)
else:
self.decompressor = lazrs.LasZipDecompressor(source, laszip_vlr.record_data)
def read_n_points(self, n: int) -> bytearray:
point_bytes = bytearray(n * self.vlr.item_size())
self.decompressor.decompress_many(point_bytes)
return point_bytes
def close(self) -> None:
self.source.close()