From e78ff7440b6f356cb8448e3c3f150594a4cceec6 Mon Sep 17 00:00:00 2001 From: Jeff Epler Date: Tue, 26 Nov 2024 11:00:45 -0600 Subject: [PATCH 1/3] Update for floppsy rev b & add DOS floppy archiver --- adafruit_floppy.py | 199 +++++++++++++++++++++++++++++++++------ examples/dos_archiver.py | 104 ++++++++++++++++++++ 2 files changed, 275 insertions(+), 28 deletions(-) create mode 100644 examples/dos_archiver.py diff --git a/adafruit_floppy.py b/adafruit_floppy.py index e3640e8..2eab064 100644 --- a/adafruit_floppy.py +++ b/adafruit_floppy.py @@ -12,6 +12,7 @@ * Author(s): Jeff Epler """ +import struct import floppyio from digitalio import DigitalInOut, Pull from micropython import const @@ -55,7 +56,7 @@ class Floppy: # pylint: disable=too-many-instance-attributes _track: typing.Optional[int] - def __init__( + def __init__( # pylint: disable=too-many-locals self, *, densitypin: microcontroller.Pin, @@ -72,6 +73,7 @@ def __init__( wrdatapin: typing.Optional[microcontroller.Pin] = None, wrgatepin: typing.Optional[microcontroller.Pin] = None, floppydirectionpin: typing.Optional[microcontroller.Pin] = None, + floppyenablepin: typing.Optional[microcontroller.Pin] = None, ) -> None: self._density = DigitalInOut(densitypin) self._density.pull = Pull.UP @@ -102,6 +104,10 @@ def __init__( if self._floppydirection: self._floppydirection.switch_to_output(True) + self._floppyenable = _optionaldigitalinout(floppyenablepin) + if self._floppyenable: + self._floppyenable.switch_to_output(False) + self._track = None def _do_step(self, direction, count): @@ -156,10 +162,12 @@ def track(self, track: int) -> None: raise ValueError("Invalid seek to negative track number") delta = track - self.track - if delta < 0: - self._do_step(_STEP_OUT, -delta) - elif delta > 0: - self._do_step(_STEP_IN, delta) + if delta: + if delta < 0: + self._do_step(_STEP_OUT, -delta) + elif delta > 0: + self._do_step(_STEP_IN, delta) + _sleep_ms(_STEP_DELAY_MS) self._track = track self._check_inpos() @@ -222,7 +230,8 @@ def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int: class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes """Wrap an MFMFloppy object into a block device suitable for `storage.VfsFat` - The default heads/sectors/tracks setting are for 3.5", 1.44MB floppies. + The default is to autodetect the data rate and the geometry of an inserted + floppy using the floppy's "BIOS paramter block" In the current implementation, the floppy is read-only. @@ -243,30 +252,75 @@ class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, floppy, - heads=2, - sectors=18, - tracks=80, - flux_buffer=None, - t1_nom_ns: float = 1000, + *, + max_sectors=18, + autodetect: bool = True, + heads: int | None = None, + sectors: int | None = None, + tracks: int | None = None, + flux_buffer: circuitpython_typing.WritableBuffer | None = None, + t1_nom_ns: float | None = None, + keep_selected: bool = False, ): self.floppy = floppy - self.heads = heads - self.sectors = sectors - self.tracks = tracks - self.flux_buffer = flux_buffer or bytearray(sectors * 12 * 512) - self.track0side0_cache = memoryview(bytearray(sectors * 512)) - self.track0side0_validity = bytearray(sectors) - self.track_cache = memoryview(bytearray(sectors * 512)) - self.track_validity = bytearray(sectors) + self.flux_buffer = flux_buffer or bytearray(max_sectors * 12 * 512) + self.track0side0_cache = memoryview(bytearray(max_sectors * 512)) + self.track_cache = memoryview(bytearray(max_sectors * 512)) + self._keep_selected = keep_selected + self.cached_track = -1 + self.cached_side = -1 - self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) - self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + if autodetect: + self.autodetect() + else: + self.setformat(heads, sectors, tracks, t1_nom_ns) + + if keep_selected: + self.floppy.selected = True + self.floppy.spin = True + + @property + def keep_selected(self) -> bool: + """Whether to keep the drive selected & spinning between operations + + This can make operations faster by avoiding spin up time""" + return self._keep_selected + + @keep_selected.setter + def keep_selected(self, value: bool): + self.floppy.selected = value + self.floppy.spin = value + + def _select_and_spin(self, value: bool): + if self.keep_selected: + return + self.floppy.selected = value + self.floppy.spin = value + + def on_disk_change(self): + """This function (or autodetect or setformat) must be called after a disk is changed + + Flushes the cached floppy data""" self._track_read(self.track0side0_cache, self.track0side0_validity, 0, 0) self.cached_track = -1 self.cached_side = -1 + def setformat(self, heads, sectors, tracks, t1_nom_ns): + """Set the floppy format details + + This also calls on_disk_change to flush cached floppy data.""" + self.heads = heads + self.sectors = sectors + self.tracks = tracks + self._t1_nom_ns = t1_nom_ns + self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + self.track0side0_validity = bytearray(sectors) + self.track_validity = bytearray(sectors) + self.on_disk_change() + def deinit(self): """Deinitialize this object""" self.floppy.deinit() @@ -311,22 +365,25 @@ def _get_track_data(self, track, side): return self.track_cache, self.track_validity def _track_read(self, track_data, validity, track, side): - self.floppy.selected = True - self.floppy.spin = True + self._select_and_spin(True) self.floppy.track = track self.floppy.side = side self._mfm_readinto(track_data, validity) - self.floppy.spin = False - self.floppy.selected = False + self._select_and_spin(False) self.cached_track = track self.cached_side = side def _mfm_readinto(self, track_data, validity): + n = 0 + exc = None for i in range(5): - self.floppy.flux_readinto(self.flux_buffer) - print("timing bins", self._t2_5_max, self._t3_5_max) + try: + self.floppy.flux_readinto(self.flux_buffer) + except RuntimeError as error: + exc = error + continue n = floppyio.mfm_readinto( - track_data, + track_data[: 512 * self.sectors], self.flux_buffer, self._t2_5_max, self._t3_5_max, @@ -335,3 +392,89 @@ def _mfm_readinto(self, track_data, validity): ) if n == self.sectors: break + if n == 0 and exc is not None: + raise exc + + def _detect_diskformat_from_flux(self): + sector = self.track_cache[:512] + # The first two numbers are HD and DD rates. The next two are the bit + # rates for 300RPM media read in a 360RPM drive. + for t1_nom_ns in [1_000, 2_000, 8_33, 1_667]: + t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + + n = floppyio.mfm_readinto( + sector, + self.flux_buffer, + t2_5_max, + t3_5_max, + ) + + if n == 0: + continue + + if sector[510] != 0x55 or sector[511] != 0xAA: + print("did not find boot signature 55 AA") + print( + "First 16 bytes in sector:", + " ".join("%02x" % c for c in sector[:16]), + ) + print( + "Final 16 bytes in sector:", + " ".join("%02x" % c for c in sector[-16:]), + ) + continue + + n_sectors_track = sector[0x18] + n_heads = sector[0x1A] + if n_heads != 2: + print(f"unsupported head count {n_heads=}") + continue + n_sectors_total = struct.unpack(" Date: Tue, 26 Nov 2024 14:52:59 -0600 Subject: [PATCH 2/3] fix sphinx errors --- adafruit_floppy.py | 4 ++-- docs/api.rst | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/adafruit_floppy.py b/adafruit_floppy.py index 2eab064..9bd85b7 100644 --- a/adafruit_floppy.py +++ b/adafruit_floppy.py @@ -213,7 +213,7 @@ def side(self) -> int: def side(self, head: int) -> None: self._side.value = head == 0 - def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int: + def flux_readinto(self, buf: "circuitpython_typing.WriteableBuffer") -> int: """Read flux transition information into the buffer. The function returns when the buffer has filled, or when the index input @@ -258,7 +258,7 @@ def __init__( # pylint: disable=too-many-arguments heads: int | None = None, sectors: int | None = None, tracks: int | None = None, - flux_buffer: circuitpython_typing.WritableBuffer | None = None, + flux_buffer: circuitpython_typing.WriteableBuffer | None = None, t1_nom_ns: float | None = None, keep_selected: bool = False, ): diff --git a/docs/api.rst b/docs/api.rst index f5e9cae..74a92f1 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -1,3 +1,5 @@ +API Documentation +----------------- .. If you created a package, create one automodule per module in the package. From 4636370b6d0c656c80738762f56c202f85f891ca Mon Sep 17 00:00:00 2001 From: Jeff Epler Date: Wed, 27 Nov 2024 09:27:14 -0600 Subject: [PATCH 3/3] Improve example with better messages & use of onboard buttons --- examples/dos_archiver.py | 86 +++++++++++++++++++++++++++++++++------- 1 file changed, 71 insertions(+), 15 deletions(-) diff --git a/examples/dos_archiver.py b/examples/dos_archiver.py index f931e58..868180d 100644 --- a/examples/dos_archiver.py +++ b/examples/dos_archiver.py @@ -2,12 +2,6 @@ # # SPDX-License-Identifier: Unlicense -import os -import sdcardio -import board -import storage -import adafruit_floppy - """DOS floppy archiver for Adafruit Floppsy Insert an SD card & hook up your floppy drive. @@ -15,6 +9,19 @@ Insert a floppy and press Enter to archive it Do this for as many floppies as you like.""" +import os +import sdcardio +import board +import storage +import usb_cdc +import adafruit_aw9523 +import adafruit_floppy + +i2c = board.I2C() # uses board.SCL and board.SDA +aw = adafruit_aw9523.AW9523(i2c) +aw.directions = 0 +KEY_BITS = 0xF + floppy = adafruit_floppy.Floppy( densitypin=board.DENSITY, indexpin=board.INDEX, @@ -34,11 +41,12 @@ ) _image_counter = 0 +last_filename = None def open_next_image(extension="img"): """Return an opened numbered file on the sdcard, such as "img01234.jpg".""" - global _image_counter # pylint: disable=global-statement + global _image_counter, last_filename # pylint: disable=global-statement try: os.stat("/sd") except OSError as exc: # no SD card! @@ -51,21 +59,63 @@ def open_next_image(extension="img"): except OSError: break print("Writing to", filename) + last_filename = filename return open(filename, "wb") -sdcard = sdcardio.SDCard(board.SPI(), board.SD_CS) -vfs = storage.VfsFat(sdcard) -storage.mount(vfs, "/sd") +def smart_input(prompt): + print(end=prompt) + + console = usb_cdc.console + serial_connected = console.connected + console.flush() + keys = aw.inputs & KEY_BITS + + while True: + new_connected = console.connected + if new_connected and not serial_connected: + print(end="\r") + print(end=prompt) + serial_connected = new_connected + + if n := console.in_waiting: + console.read(n) + break + + new_keys = aw.inputs & KEY_BITS + if ~new_keys & keys: # A bit went to 0 -> a key was pressed + break + keys = new_keys + + print() + + +print("\033[H\033[2JFloppy Archiver") +print("Archive standard DOS floppies to SD card in IMG format") +print() + +try: + sdcard = sdcardio.SDCard(board.SPI(), board.SD_CS) + vfs = storage.VfsFat(sdcard) + storage.mount(vfs, "/sd") + print("Mounted SD card.") +except Exception as e: + print("Failed to mount SD card:") + print(e) + raise SystemExit # pylint: disable=raise-missing-from dev = None blockdata = bytearray(512) baddata = b"BADDATA0" * 64 +assert len(baddata) == len(blockdata) while True: if dev is not None: dev.floppy.keep_selected = False - input("Insert disk and press ENTER") + vfsstat = vfs.statvfs(vfs) + avail = vfsstat[0] * vfsstat[4] / 1024 / 1024 + print(f"/sd: {avail:.1f}MiB available") + smart_input("Insert disk and press any key") try: if dev is None: @@ -77,6 +127,11 @@ def open_next_image(extension="img"): print(e) continue + dev.readblocks(0, blockdata) + label = blockdata[43:54].decode("ascii", "replace").strip() + fstype = blockdata[54:61].decode("ascii", "replace").strip() + print(f"\033[H\033[2JArchiving {label!r} ({fstype!r})") + bad_blocks = good_blocks = 0 total_blocks = dev.count() pertrack = dev.sectors * dev.heads @@ -98,7 +153,8 @@ def open_next_image(extension="img"): if i % pertrack == (pertrack - 1): print() - print( - f"{good_blocks} good + {bad_blocks} bad blocks", - f"out of {total_blocks} ({total_blocks//2}KiB)", - ) + print() + print(f"Archived {label!r} to {last_filename.split('/')[-1]}") + print(f"{good_blocks} good + {bad_blocks} bad blocks") + print(f"out of {total_blocks} ({total_blocks//2}KiB)") + print()