From de1594cf5083524b4a6229cea870cc9352e1f3ea Mon Sep 17 00:00:00 2001 From: The Wobbler Date: Sun, 30 Mar 2025 15:05:14 +0200 Subject: [PATCH] Got Vorbis writing partially working. (The written metadata gets recognized by TinyTag without problems, but the audio players can't handle it.) --- smalltag/_formats/__init__.py | 5 + smalltag/_formats/_ogg.py | 207 ++++++++++++++++++++++++++++++++++ 2 files changed, 212 insertions(+) create mode 100644 smalltag/_formats/_ogg.py diff --git a/smalltag/_formats/__init__.py b/smalltag/_formats/__init__.py index a68faf1..dbb51c0 100644 --- a/smalltag/_formats/__init__.py +++ b/smalltag/_formats/__init__.py @@ -2,6 +2,7 @@ if False: from ._id3 import _ID3 # noqa + from ._ogg import _Ogg def __getattr__(name): @@ -9,3 +10,7 @@ def __getattr__(name): case "_ID3": from ._id3 import _ID3 return _ID3 + + case "_Ogg": + from ._ogg import _Ogg + return _Ogg diff --git a/smalltag/_formats/_ogg.py b/smalltag/_formats/_ogg.py new file mode 100644 index 0000000..20aa72e --- /dev/null +++ b/smalltag/_formats/_ogg.py @@ -0,0 +1,207 @@ +#!/usr/bin/python3 + +from __future__ import annotations + +import os +from struct import pack, unpack +from tinytag import tinytag + +from .. import SmallTag + +if False: + from collections.abc import Callable, Iterator # noqa + from typing import BinaryIO + + +class _Ogg(SmallTag, tinytag._Ogg): + def write(self, advanced: dict[str, str | float | list[str]] | None=None, file_obj: BinaryIO = None): + should_close_file = self._set_filehandler_for_writing(file_obj) + + old_structure = self._parse_needed_for_vorbis_compose(self._filehandler) + + id_header_length, old_header_length, vorbis_setup_header_length, vorbis_stream_serial_number = old_structure + + self._filehandler.seek(id_header_length + old_header_length) + vorbis_setup_header = self._filehandler.read(vorbis_setup_header_length) + + self._filehandler.seek(id_header_length + old_header_length + vorbis_setup_header_length) + content_after_tags = self._filehandler.read() + + new_tag = self._compose_vorbis(advanced) + + ogg = ( + self._compose_ogg_header(new_tag, vorbis_setup_header, vorbis_stream_serial_number) + + new_tag + + vorbis_setup_header + ) + + self._filehandler.seek(id_header_length) + self._filehandler.write(ogg) + + self._filehandler.write(content_after_tags) + self._filehandler.truncate() + + self._filehandler.seek(id_header_length + len(ogg)) + print(self._filehandler.read(100)) + + if should_close_file: + self._filehandler.close() + + def compose_tag(self, advanced: dict[str, str | float | list[str]] | None=None) -> bytes: + return self._compose_vorbis() + + def _compose_vorbis(self, advanced: dict | None=None) -> bytes : + return self._compose_vorbis_comment_header(advanced) + + def _compose_vorbis_comment_header(self, advanced: dict | None=None) -> bytes: + header = b"\x03vorbis" + + vendor = b"SmallTag" + vendor_length = pack("I", len(vendor)) + header += vendor_length + vendor + + num_fields, fields = self._compose_vorbis_comments(advanced) + header += pack("I", num_fields) + fields + + header += b"\x01" # framing bit + + return header + + def _compose_ogg_header( + self, + comment_header: bytes, + setup_header: bytes, + vorbis_stream_serial_number: int + ) -> bytes: + # see: https://www.xiph.org/vorbis/doc/framing.html + + # capture pattern, structure version, header type, granule position + header = b"OggS\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" + + header += pack(">I", vorbis_stream_serial_number) # stream serial number + + header += b"\x01\x00\x00\x00\x00\x00\x00\x00" # page sequence, checksum set to 0 + + segment_table = b"\xff" * (len(comment_header) // 255) # create segment table + segment_table += pack("B", len(comment_header) % 255) + segment_table += b"\xff" * (len(setup_header) // 255) + segment_table += pack("B", len(setup_header) % 255) + + header += pack("B", len(segment_table)) # page segments + header += segment_table # segment table + + page = header + comment_header + setup_header + page_checksum = self._crc_checksum(page) + + header = bytearray(header) + header[22:26] = pack("I", page_checksum) # set correct checksum + header = bytes(header) + + return header + + def _compose_vorbis_comments(self, advanced: dict | None=None) -> (int, bytes): + if advanced is None: + tag_dict = self.as_dict() + + else: + tag_dict = advanced + + self._VORBIS_WRITE_MAPPING = {} # noqa + for field_name, name in self._VORBIS_MAPPING.items(): + if field_name not in ['track', 'disc', 'track_total', 'disc_total']: + self._VORBIS_WRITE_MAPPING[name] = field_name + + num_fields = 0 + fields = b"" + + for field_name, field_value in tag_dict.items(): + if field_name not in self._VORBIS_WRITE_MAPPING: + other_field = "other." + field_name + + if other_field in self._VORBIS_WRITE_MAPPING: + field = self._compose_vorbis_field(other_field, field_value) + + if not field == b"": + fields += field + num_fields += 1 + + continue + + field = self._compose_vorbis_field(field_name, field_value) + + if not field == b"": + fields += field + num_fields += 1 + + return num_fields, fields + + def _compose_vorbis_field(self, field_name: str, field_value: str) -> bytes: + if isinstance(field_value, list): + field_value = field_value[0] + + if isinstance(field_value, int): + field_value = str(field_value) # convert int to numerical string + + elif not isinstance(field_value, str): + return b"" + + comment = bytes(self._VORBIS_WRITE_MAPPING[field_name].upper(), "UTF-8") + b"=" + bytes(field_value, "UTF-8") + + comment_length = pack("I", len(comment)) + + field = comment_length + comment + + return field + + def _parse_needed_for_vorbis_compose(self, filehandler: BinaryIO) -> tuple[int]: + filehandler.seek(0) + + sizes = () + + for i_header in range(2): + header_start = filehandler.tell() + + capture_pattern = filehandler.read(5) + + if not capture_pattern == b"OggS\x00": + raise tinytag.ParseError('Invalid Ogg header') + + filehandler.seek(9, os.SEEK_CUR) + vorbis_stream_serial_number = int.from_bytes(filehandler.read(4)) + + filehandler.seek(8, os.SEEK_CUR) + segments = int.from_bytes(filehandler.read(1)) + + segments_size = 0 + + for i in range(segments): # add up the segment sizes + segments_size += int.from_bytes(filehandler.read(1)) + + if i_header == 0: + filehandler.seek(segments_size, os.SEEK_CUR) + page_size = filehandler.tell() - header_start # add the size of this ogg header + + sizes = sizes + (page_size,) + + else: # second ogg header also contains the setup header + page_size_comments = filehandler.tell() - header_start + page = filehandler.read(segments_size) + + vorbis_headers = page.split(b"\x05vorbis", 1) + page_size_comments += len(vorbis_headers[0]) + header_size_setup = len(vorbis_headers[1]) + 7 # add characters removed by split + + sizes = sizes + (page_size_comments, header_size_setup) + + return sizes + (vorbis_stream_serial_number,) + + def _crc_checksum(self, data: bytes) -> int: + generator_polynomial = 0x04c11db7 + crc = 0 + + for byte in data: + byte ^= crc + crc = byte % generator_polynomial + + return crc +