""" The definitions of the VLR header, the VLR and the KnownVLRs
are in this module.

A KnownVLR is a VLR for which we know how to parse its record_data.
"""
import abc
import ctypes
import logging
import struct
from typing import List, Optional, Any, Tuple, Dict
import numpy as np
from .vlr import BaseVLR, VLR
from ..extradims import (
get_type_for_extra_dim,
get_kind_of_extra_dim,
)
from ..point.dims import DimensionKind
from ..point.format import ExtraBytesParams
from ..utils import encode_to_null_terminated
# Shorthand so that the interface below can write ``@abstractmethod``
# instead of ``@abc.abstractmethod``.
abstractmethod = abc.abstractmethod
# Module-level logger, used by vlr_factory to report VLRs that fail to parse.
logger = logging.getLogger(__name__)
# Separator used to split / join the strings stored in a GeoAsciiParamsVlr.
NULL_BYTE = b"\0"
class IKnownVLR(abc.ABC):
    """Contract that every KnownVLR has to fulfil.

    A KnownVLR is a VLR for which we know how to parse its record_data.
    Implementing this interface allows the right parser to be called
    automatically for the right VLR when reading them.
    """

    @staticmethod
    @abstractmethod
    def official_user_id() -> str:
        """Shall return the official user_id as described in the documentation"""

    @staticmethod
    @abstractmethod
    def official_record_ids() -> Tuple[int, ...]:
        """Shall return the official record_ids for the VLR

        .. note::

            Even if the VLR has only one record_id, the return type
            must be a tuple

        Returns
        -------
        tuple of int
            The record_ids this VLR type can have
        """

    @abstractmethod
    def record_data_bytes(self) -> bytes:
        """Shall return the bytes of the record_data part of the VLR,
        exactly as they should be written in the file.

        Returns
        -------
        bytes
            The bytes of the vlr's record_data
        """

    @abstractmethod
    def parse_record_data(self, record_data: bytes) -> None:
        """Shall parse the given record_data into a user-friendlier structure

        Parameters
        ----------
        record_data: bytes
            The record_data bytes read from the file
        """
class BaseKnownVLR(BaseVLR, IKnownVLR, abc.ABC):
    """Shared implementation for the different kinds of Known VLRs.

    The user_id handed to the base VLR is always the official one of the
    concrete class; the record_id defaults to the first official record_id.
    """

    def __init__(self, record_id=None, description=""):
        """
        Parameters
        ----------
        record_id: int, optional
            The record_id of the vlr. When None, the first value of
            ``official_record_ids()`` is used.
        description: str
            The description forwarded to the base VLR
        """
        user_id = self.official_user_id()
        if record_id is None:
            record_id = self.official_record_ids()[0]
        super().__init__(user_id, record_id, description)

    @classmethod
    def from_raw(cls, raw: VLR):
        """Builds the known vlr from a raw vlr.

        The instance is created with ``cls()`` (no arguments), so the
        record_id of ``raw`` is not copied over; only its description
        and its record_data are used.

        Parameters
        ----------
        raw: VLR
            The vlr with unparsed record_data

        Returns
        -------
        The new instance of ``cls`` with the record_data parsed
        """
        instance = cls()
        # NOTE(review): sets the private attribute directly; presumably this
        # is where BaseVLR stores the description -- confirm in .vlr
        instance._description = raw.description
        instance.parse_record_data(raw.record_data)
        return instance
class ClassificationLookupVlr(BaseKnownVLR):
    """This vlr maps class numbers to short descriptions / names

    >>> lookup = ClassificationLookupVlr()
    >>> lookup[0] = "never_classified"
    >>> lookup[2] = "ground"
    >>> lookup[0]
    'never_classified'
    """

    # One lookup entry: the class id (unsigned byte) followed by
    # a description stored on exactly 15 bytes.
    _lookup_struct = struct.Struct("<B15s")
    # Byte values kept when reading a description: printable ASCII.
    # Everything else (NUL padding, control bytes, non-ASCII) is dropped.
    _description_bytes = frozenset(range(0x20, 0x7F))

    def __init__(self):
        super().__init__(description="Classification Lookup")
        self.lookups: Dict[int, str] = {}

    def parse_record_data(self, record_data: bytes) -> None:
        """Reads the (class id, description) entries of the record_data
        and stores them in ``self.lookups``.

        Entries are added to the existing lookups, an entry with
        an already known class id replaces the previous description.

        Only printable ASCII bytes of each description are kept, so that
        the padding is removed while punctuation (e.g. ``-`` or ``_``)
        survives a write / read round trip.

        Parameters
        ----------
        record_data: bytes
            The record_data bytes read from the file

        Raises
        ------
        struct.error
            If the length of record_data is not a multiple of the
            size of one entry (16 bytes)
        """
        allowed = self._description_bytes
        for class_id, desc in self._lookup_struct.iter_unpack(record_data):
            # Iterating over bytes yields ints, bytes() packs them back
            kept = bytes(byte for byte in desc if byte in allowed)
            self.lookups[class_id] = kept.decode("ascii")

    def record_data_bytes(self) -> bytes:
        """Returns the lookups packed as they should be written in the file.

        Returns
        -------
        bytes
            One 16 bytes entry per lookup, in the order of ``self.lookups``

        Raises
        ------
        ValueError
            If a description is longer than 15 bytes once encoded
        UnicodeEncodeError
            If a description cannot be encoded to ASCII
        """
        entries = []
        for class_id, description in self.lookups.items():
            description_bytes = description.encode("ascii")
            if len(description_bytes) > 15:
                raise ValueError(
                    "description ({}) is too long ({} bytes), it must not exceed 15 bytes when encoded".format(
                        description, len(description_bytes)
                    )
                )
            # The struct pads descriptions shorter than 15 bytes with NUL bytes
            entries.append(self._lookup_struct.pack(class_id, description_bytes))
        return b"".join(entries)

    def __getitem__(self, class_id: int) -> str:
        """Returns the description registered for the class_id
        (KeyError if there is none).
        """
        return self.lookups[class_id]

    def __setitem__(self, class_id: int, description: str):
        """Registers the description of a class_id.

        The length of the description is not checked here, it is
        only checked by ``record_data_bytes``.

        Raises
        ------
        ValueError
            If the class_id is not in the range [0, 255]
        """
        if class_id not in range(256):
            raise ValueError("Class id {} is not in range [0, 255]".format(class_id))
        self.lookups[class_id] = description

    @staticmethod
    def official_user_id() -> str:
        return "LASF_Spec"

    @staticmethod
    def official_record_ids() -> Tuple[int, ...]:
        return (0,)
class LasZipVlr(BaseKnownVLR):
    """Contains the information needed by laszip (or any other laz backend)
    to compress the point records.

    The record_data is kept as the raw bytes given at construction,
    this class does not interpret it.
    """

    def __init__(self, data: bytes) -> None:
        """
        Parameters
        ----------
        data: bytes
            The raw record_data of the vlr
        """
        super().__init__(description="http://laszip.org")
        self.record_data = data

    def parse_record_data(self, record_data: bytes) -> None:
        """Does nothing: only laz backends know how to parse this data."""
        return None

    def record_data_bytes(self) -> bytes:
        """Returns the raw record_data, unchanged."""
        return self.record_data

    @staticmethod
    def official_user_id() -> str:
        return "laszip encoded"

    @staticmethod
    def official_record_ids() -> Tuple[int, ...]:
        return (22204,)

    @classmethod
    def from_raw(cls, raw_vlr):
        """Builds the vlr from a raw vlr.

        Overridden because the constructor of this class needs the
        record_data, only the record_data of ``raw_vlr`` is used.
        """
        data = raw_vlr.record_data
        return cls(data)
class GeoKeyEntryStruct(ctypes.LittleEndianStructure):
    """Binary layout of one key entry of the GeoKeyDirectoryVlr.

    An entry is made of four unsigned 16 bits integers, stored
    in little endian without any padding between them.
    """

    _pack_ = 1
    _fields_ = [
        ("id", ctypes.c_uint16),
        ("tiff_tag_location", ctypes.c_uint16),
        ("count", ctypes.c_uint16),
        ("value_offset", ctypes.c_uint16),
    ]

    @staticmethod
    def size():
        """Returns the size in bytes of one key entry.

        The size is computed from this structure itself, and not from
        the structure of the keys header.
        """
        return ctypes.sizeof(GeoKeyEntryStruct)

    def __repr__(self):
        return "<GeoKey(Id: {}, Location: {}, count: {}, offset: {})>".format(
            self.id, self.tiff_tag_location, self.count, self.value_offset
        )
class GeoKeyDirectoryVlr(BaseKnownVLR):
    """Known vlr made of a keys header (``geo_keys_header``) followed
    by a list of key entries (``geo_keys``).
    """

    def __init__(self):
        super().__init__(description="GeoTIFF GeoKeyDirectoryTag")
        self.geo_keys_header = GeoKeysHeaderStructs()
        self.geo_keys = [GeoKeyEntryStruct()]

    def parse_record_data(self, record_data):
        """Reads the header and then all the key entries that follow it.

        The number of keys is derived from the length of the data;
        when the header disagrees, its ``number_of_keys`` is overwritten
        with the derived value. Trailing bytes that do not form a complete
        entry are ignored.

        Parameters
        ----------
        record_data: bytes
            The record_data bytes read from the file
        """
        # from_buffer requires a writable buffer, hence the bytearray
        buffer = bytearray(record_data)
        header_bytes = buffer[: ctypes.sizeof(GeoKeysHeaderStructs)]
        self.geo_keys_header = GeoKeysHeaderStructs.from_buffer(header_bytes)
        self.geo_keys = []

        keys_buffer = buffer[GeoKeysHeaderStructs.size() :]
        entry_size = GeoKeyEntryStruct.size()
        keys_in_data = len(keys_buffer) // entry_size
        if self.geo_keys_header.number_of_keys != keys_in_data:
            # Trust what is actually in the data rather than the header
            self.geo_keys_header.number_of_keys = keys_in_data

        last_offset = self.geo_keys_header.number_of_keys * entry_size
        for offset in range(0, last_offset, entry_size):
            entry_bytes = keys_buffer[offset : offset + entry_size]
            self.geo_keys.append(GeoKeyEntryStruct.from_buffer(entry_bytes))

    def record_data_bytes(self):
        """Returns the bytes of the header followed by the bytes of each key."""
        header_bytes = bytes(self.geo_keys_header)
        keys_bytes = b"".join(bytes(key) for key in self.geo_keys)
        return header_bytes + keys_bytes

    def __repr__(self):
        return "<{}({} geo_keys)>".format(self.__class__.__name__, len(self.geo_keys))

    @staticmethod
    def official_user_id():
        return "LASF_Projection"

    @staticmethod
    def official_record_ids():
        return (34735,)
class GeoDoubleParamsVlr(BaseKnownVLR):
    """Known vlr whose record_data is a sequence of doubles.

    The values are kept in ``self.doubles`` as ``ctypes.c_double`` objects.
    """

    def __init__(self):
        super().__init__(description="GeoTIFF GeoDoubleParamsTag")
        self.doubles = []

    def parse_record_data(self, record_data):
        """Reads the doubles contained in the record_data.

        The previous content of ``self.doubles`` is replaced, so parsing
        twice does not duplicate the values. Nothing is modified when
        the record_data is invalid.

        Parameters
        ----------
        record_data: bytes
            The record_data bytes read from the file

        Raises
        ------
        ValueError
            If the length of record_data is not a multiple of the
            size of a double
        """
        sizeof_double = ctypes.sizeof(ctypes.c_double)
        if len(record_data) % sizeof_double != 0:
            raise ValueError(
                "GeoDoubleParams record data length ({}) is not a multiple of sizeof(double) ({})".format(
                    len(record_data), sizeof_double
                )
            )
        # from_buffer_copy accepts read-only bytes and an offset, so no
        # intermediate bytearray or slice is needed
        self.doubles = [
            ctypes.c_double.from_buffer_copy(record_data, offset)
            for offset in range(0, len(record_data), sizeof_double)
        ]

    def record_data_bytes(self):
        """Returns the concatenated bytes of all the doubles."""
        return b"".join(map(bytes, self.doubles))

    def __repr__(self):
        return "<GeoDoubleParamsVlr({})>".format(self.doubles)

    @staticmethod
    def official_user_id():
        return "LASF_Projection"

    @staticmethod
    def official_record_ids():
        return (34736,)
class GeoAsciiParamsVlr(BaseKnownVLR):
    """Known vlr whose record_data is a sequence of ASCII strings
    separated by null bytes. The strings are kept in ``self.strings``.
    """

    def __init__(self):
        super().__init__(description="GeoTIFF GeoAsciiParamsTag")
        self.strings = []

    def parse_record_data(self, record_data):
        """Splits the record_data on null bytes and decodes each part as ASCII.

        A record_data that ends with a null byte produces an empty
        string as the last element.
        """
        chunks = record_data.split(NULL_BYTE)
        self.strings = [chunk.decode("ascii") for chunk in chunks]

    def record_data_bytes(self):
        """Returns the strings encoded as ASCII and joined with null bytes."""
        encoded = (text.encode("ascii") for text in self.strings)
        return NULL_BYTE.join(encoded)

    def __repr__(self):
        return "<GeoAsciiParamsVlr({})>".format(self.strings)

    @staticmethod
    def official_user_id():
        return "LASF_Projection"

    @staticmethod
    def official_record_ids():
        return (34737,)
class WktCoordinateSystemVlr(BaseKnownVLR):
    """Replaces Coordinates Reference System for new las files (point fmt >= 5)

    The WKT is kept as a str in ``self.string``.

    .. note::

        LAS is not using the "ESRI WKT"
    """

    def __init__(self, wkt_string=""):
        """
        Parameters
        ----------
        wkt_string: str
            The WKT string of the coordinate system
        """
        super().__init__(description="OGC Transformation Record")
        self.string = wkt_string

    def _encode_string(self):
        """Returns the string encoded in utf-8 by ``encode_to_null_terminated``."""
        return encode_to_null_terminated(self.string, codec="utf-8")

    def parse_record_data(self, record_data):
        """Decodes the record_data as utf-8 and removes the trailing null
        characters.
        """
        decoded = record_data.decode("utf-8")
        self.string = decoded.rstrip("\0")

    def record_data_bytes(self):
        """Returns the encoded string, as produced by ``_encode_string``."""
        return self._encode_string()

    @staticmethod
    def official_user_id():
        return "LASF_Projection"

    @staticmethod
    def official_record_ids():
        return (2112,)
def vlr_factory(vlr: VLR):
    """Given a vlr, tries to find the KnownVLR class that can parse its data.

    Only the direct subclasses of BaseKnownVLR are considered. The first
    one whose official user_id and record_ids match the vlr is used.

    The input vlr is returned unchanged when no KnownVLR implementation
    matches, or when the matching one fails to parse the vlr (a warning
    is logged in that case).
    """
    wanted_user_id = vlr.user_id
    for candidate in BaseKnownVLR.__subclasses__():
        if candidate.official_user_id() != wanted_user_id:
            continue
        if vlr.record_id not in candidate.official_record_ids():
            continue

        # Best effort: a vlr that cannot be parsed is kept in its raw form
        try:
            return candidate.from_raw(vlr)
        except Exception as err:
            logger.warning(f"Failed to parse {candidate}: {err}")
            return vlr

    return vlr