Source code for pylas.lasappender

import io
import math
from typing import Union, Iterable, BinaryIO, Optional

import numpy as np

from .compression import LazBackend
from .errors import PylasError
from .header import LasHeader
from .laswriter import UncompressedPointWriter
from .point.record import PackedPointRecord
from .vlrs.vlrlist import VLRList

try:
    import lazrs
except ModuleNotFoundError:
    pass


[docs]class LasAppender: """Allows to append points to and existing LAS/LAZ file. Appending to LAZ is only supported by the lazrs backend """ def __init__( self, dest: BinaryIO, laz_backend: Optional[Union[LazBackend, Iterable[LazBackend]]] = None, closefd: bool = True, ) -> None: if not dest.seekable(): raise TypeError("Expected the 'dest' to be a seekable file object") header = LasHeader.read_from(dest) if laz_backend is None: laz_backend = LazBackend.detect_available() self.dest = dest self.header = header if not header.are_points_compressed: self.points_writer = UncompressedPointWriter(self.dest) self.dest.seek( (self.header.point_count * self.header.point_format.size) + self.header.offset_to_point_data, io.SEEK_SET, ) else: self.points_writer = self._create_laz_backend(laz_backend) if header.version.minor >= 4 and header.number_of_evlrs > 0: assert ( self.dest.tell() <= self.header.start_of_first_evlr ), "The position is past the start of evlrs" pos = self.dest.tell() self.dest.seek(self.header.start_of_first_evlr, io.SEEK_SET) self.evlrs: Optional[VLRList] = VLRList.read_from( self.dest, self.header.number_of_evlrs, extended=True ) dest.seek(self.header.start_of_first_evlr, io.SEEK_SET) self.dest.seek(pos, io.SEEK_SET) else: self.evlrs: Optional[VLRList] = None self.closefd = closefd
[docs] def append_points(self, points: PackedPointRecord) -> None: """Append the points to the file, the points must have the same point format as the points already contained within the file. :param points: The points to append """ if points.point_format != self.header.point_format: raise PylasError("Point formats do not match") self.points_writer.write_points(points) self.header.update(points)
def close(self) -> None: self.points_writer.done() self._write_evlrs() self._write_updated_header() if self.closefd: self.dest.close() def _write_evlrs(self) -> None: if ( self.header.version.minor >= 4 and self.evlrs is not None and len(self.evlrs) > 0 ): self.header.number_of_evlr = len(self.evlrs) self.header.start_of_first_evlr = self.dest.tell() self.evlrs.write_to(self.dest, as_extended=True) def _write_updated_header(self) -> None: pos = self.dest.tell() self.dest.seek(0, io.SEEK_SET) # we don't want to rewrite the vlrs # as written vlrs may not have to exact # same number of bytes (e.g. if we remove # extra spurious null bytes) self.header.write_to(self.dest, write_vlrs=False) self.dest.seek(pos, io.SEEK_SET) def _create_laz_backend( self, laz_backend: Union[LazBackend, Iterable[LazBackend]] = ( LazBackend.LazrsParallel, LazBackend.Lazrs, ), ) -> "LazrsAppender": try: laz_backend = iter(laz_backend) except TypeError: laz_backend = (laz_backend,) last_error = None for backend in laz_backend: if backend == LazBackend.Laszip: raise PylasError("Laszip backend does not support appending") elif backend == LazBackend.LazrsParallel: try: return LazrsAppender(self.dest, self.header, parallel=True) except Exception as e: last_error = e elif backend == LazBackend.Lazrs: try: return LazrsAppender(self.dest, self.header, parallel=False) except Exception as e: last_error = e if last_error is not None: raise PylasError(f"Could not initialize a laz backend: {last_error}") else: raise PylasError(f"No valid laz backend selected") def __enter__(self) -> "LasAppender": return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: self.close()
class LazrsAppender: """Appending in LAZ file works by seeking to start of the last chunk of compressed points, decompress it while keeping the points in memory. Then seek back to the start of the last chunk, and recompress the points we just read, so that we have a compressor in the proper state ready to compress new points. """ def __init__(self, dest: BinaryIO, header: LasHeader, parallel: bool) -> None: self.dest = dest self.offset_to_point_data = header.offset_to_point_data laszip_vlr = header.vlrs.get("LasZipVlr")[0] self.dest.seek(header.offset_to_point_data, io.SEEK_SET) decompressor = lazrs.LasZipDecompressor(self.dest, laszip_vlr.record_data) vlr = decompressor.vlr() number_of_complete_chunk = int( math.floor(header.point_count / vlr.chunk_size()) ) self.dest.seek(header.offset_to_point_data, io.SEEK_SET) chunk_table = lazrs.read_chunk_table(self.dest) if chunk_table is None: # The file does not have a chunk table # we cannot seek to the last chunk, so instead, we will # decompress points (which is slower) and build the chunk table # to write it later self.chunk_table = [] start_of_chunk = self.dest.tell() point_buf = bytearray(vlr.chunk_size() * vlr.item_size()) for _ in range(number_of_complete_chunk): decompressor.decompress_many(point_buf) pos = self.dest.tell() self.chunk_table.append(pos - start_of_chunk) start_of_chunk = pos else: self.chunk_table = chunk_table[:-1] idx_first_point_of_last_chunk = number_of_complete_chunk * vlr.chunk_size() decompressor.seek(idx_first_point_of_last_chunk) points_of_last_chunk = bytearray( (header.point_count % vlr.chunk_size()) * vlr.item_size() ) decompressor.decompress_many(points_of_last_chunk) self.dest.seek(header.offset_to_point_data, io.SEEK_SET) if parallel: self.compressor = lazrs.ParLasZipCompressor( self.dest, vlr ) # This overwrites old offset else: self.compressor = lazrs.LasZipCompressor( self.dest, vlr ) # This overwrites the old offset self.dest.seek(sum(self.chunk_table), io.SEEK_CUR) self.compressor.compress_many(points_of_last_chunk) def write_points(self, points: PackedPointRecord) -> None: points_bytes = np.frombuffer(points.array, np.uint8) self.compressor.compress_many(points_bytes) def done(self) -> None: # The chunk table written is at the good position # but it is incomplete (it's missing the chunk_table of # chunks before the one we appended) self.compressor.done() # So we update it self.dest.seek(self.offset_to_point_data, io.SEEK_SET) offset_to_chunk_table = int.from_bytes(self.dest.read(8), "little", signed=True) self.dest.seek(-8, io.SEEK_CUR) chunk_table = self.chunk_table + lazrs.read_chunk_table(self.dest) self.dest.seek(offset_to_chunk_table, io.SEEK_SET) lazrs.write_chunk_table(self.dest, chunk_table)