Got Vorbis writing partially working.

(The written metadata gets recognized by TinyTag without problems, but the audio players can't handle it.)
This commit is contained in:
The Wobbler 2025-03-30 15:05:14 +02:00
parent e7e1d121f9
commit de1594cf50
2 changed files with 212 additions and 0 deletions

View file

@ -2,6 +2,7 @@
if False:
from ._id3 import _ID3 # noqa
from ._ogg import _Ogg
def __getattr__(name):
@ -9,3 +10,7 @@ def __getattr__(name):
case "_ID3":
from ._id3 import _ID3
return _ID3
case "_Ogg":
from ._ogg import _Ogg
return _Ogg

207
smalltag/_formats/_ogg.py Normal file
View file

@ -0,0 +1,207 @@
#!/usr/bin/python3
from __future__ import annotations
import os
from struct import pack, unpack
from tinytag import tinytag
from .. import SmallTag
if False:
from collections.abc import Callable, Iterator # noqa
from typing import BinaryIO
class _Ogg(SmallTag, tinytag._Ogg):
def write(self, advanced: dict[str, str | float | list[str]] | None=None, file_obj: BinaryIO = None):
should_close_file = self._set_filehandler_for_writing(file_obj)
old_structure = self._parse_needed_for_vorbis_compose(self._filehandler)
id_header_length, old_header_length, vorbis_setup_header_length, vorbis_stream_serial_number = old_structure
self._filehandler.seek(id_header_length + old_header_length)
vorbis_setup_header = self._filehandler.read(vorbis_setup_header_length)
self._filehandler.seek(id_header_length + old_header_length + vorbis_setup_header_length)
content_after_tags = self._filehandler.read()
new_tag = self._compose_vorbis(advanced)
ogg = (
self._compose_ogg_header(new_tag, vorbis_setup_header, vorbis_stream_serial_number)
+ new_tag
+ vorbis_setup_header
)
self._filehandler.seek(id_header_length)
self._filehandler.write(ogg)
self._filehandler.write(content_after_tags)
self._filehandler.truncate()
self._filehandler.seek(id_header_length + len(ogg))
print(self._filehandler.read(100))
if should_close_file:
self._filehandler.close()
def compose_tag(self, advanced: dict[str, str | float | list[str]] | None=None) -> bytes:
return self._compose_vorbis()
def _compose_vorbis(self, advanced: dict | None=None) -> bytes :
return self._compose_vorbis_comment_header(advanced)
def _compose_vorbis_comment_header(self, advanced: dict | None=None) -> bytes:
header = b"\x03vorbis"
vendor = b"SmallTag"
vendor_length = pack("I", len(vendor))
header += vendor_length + vendor
num_fields, fields = self._compose_vorbis_comments(advanced)
header += pack("I", num_fields) + fields
header += b"\x01" # framing bit
return header
def _compose_ogg_header(
self,
comment_header: bytes,
setup_header: bytes,
vorbis_stream_serial_number: int
) -> bytes:
# see: https://www.xiph.org/vorbis/doc/framing.html
# capture pattern, structure version, header type, granule position
header = b"OggS\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
header += pack(">I", vorbis_stream_serial_number) # stream serial number
header += b"\x01\x00\x00\x00\x00\x00\x00\x00" # page sequence, checksum set to 0
segment_table = b"\xff" * (len(comment_header) // 255) # create segment table
segment_table += pack("B", len(comment_header) % 255)
segment_table += b"\xff" * (len(setup_header) // 255)
segment_table += pack("B", len(setup_header) % 255)
header += pack("B", len(segment_table)) # page segments
header += segment_table # segment table
page = header + comment_header + setup_header
page_checksum = self._crc_checksum(page)
header = bytearray(header)
header[22:26] = pack("I", page_checksum) # set correct checksum
header = bytes(header)
return header
def _compose_vorbis_comments(self, advanced: dict | None=None) -> (int, bytes):
if advanced is None:
tag_dict = self.as_dict()
else:
tag_dict = advanced
self._VORBIS_WRITE_MAPPING = {} # noqa
for field_name, name in self._VORBIS_MAPPING.items():
if field_name not in ['track', 'disc', 'track_total', 'disc_total']:
self._VORBIS_WRITE_MAPPING[name] = field_name
num_fields = 0
fields = b""
for field_name, field_value in tag_dict.items():
if field_name not in self._VORBIS_WRITE_MAPPING:
other_field = "other." + field_name
if other_field in self._VORBIS_WRITE_MAPPING:
field = self._compose_vorbis_field(other_field, field_value)
if not field == b"":
fields += field
num_fields += 1
continue
field = self._compose_vorbis_field(field_name, field_value)
if not field == b"":
fields += field
num_fields += 1
return num_fields, fields
def _compose_vorbis_field(self, field_name: str, field_value: str) -> bytes:
if isinstance(field_value, list):
field_value = field_value[0]
if isinstance(field_value, int):
field_value = str(field_value) # convert int to numerical string
elif not isinstance(field_value, str):
return b""
comment = bytes(self._VORBIS_WRITE_MAPPING[field_name].upper(), "UTF-8") + b"=" + bytes(field_value, "UTF-8")
comment_length = pack("I", len(comment))
field = comment_length + comment
return field
def _parse_needed_for_vorbis_compose(self, filehandler: BinaryIO) -> tuple[int]:
filehandler.seek(0)
sizes = ()
for i_header in range(2):
header_start = filehandler.tell()
capture_pattern = filehandler.read(5)
if not capture_pattern == b"OggS\x00":
raise tinytag.ParseError('Invalid Ogg header')
filehandler.seek(9, os.SEEK_CUR)
vorbis_stream_serial_number = int.from_bytes(filehandler.read(4))
filehandler.seek(8, os.SEEK_CUR)
segments = int.from_bytes(filehandler.read(1))
segments_size = 0
for i in range(segments): # add up the segment sizes
segments_size += int.from_bytes(filehandler.read(1))
if i_header == 0:
filehandler.seek(segments_size, os.SEEK_CUR)
page_size = filehandler.tell() - header_start # add the size of this ogg header
sizes = sizes + (page_size,)
else: # second ogg header also contains the setup header
page_size_comments = filehandler.tell() - header_start
page = filehandler.read(segments_size)
vorbis_headers = page.split(b"\x05vorbis", 1)
page_size_comments += len(vorbis_headers[0])
header_size_setup = len(vorbis_headers[1]) + 7 # add characters removed by split
sizes = sizes + (page_size_comments, header_size_setup)
return sizes + (vorbis_stream_serial_number,)
def _crc_checksum(self, data: bytes) -> int:
generator_polynomial = 0x04c11db7
crc = 0
for byte in data:
byte ^= crc
crc = byte % generator_polynomial
return crc