diff --git a/adafruit_floppy.py b/adafruit_floppy.py index e3640e8..9bd85b7 100644 --- a/adafruit_floppy.py +++ b/adafruit_floppy.py @@ -12,6 +12,7 @@ * Author(s): Jeff Epler """ +import struct import floppyio from digitalio import DigitalInOut, Pull from micropython import const @@ -55,7 +56,7 @@ class Floppy: # pylint: disable=too-many-instance-attributes _track: typing.Optional[int] - def __init__( + def __init__( # pylint: disable=too-many-locals self, *, densitypin: microcontroller.Pin, @@ -72,6 +73,7 @@ def __init__( wrdatapin: typing.Optional[microcontroller.Pin] = None, wrgatepin: typing.Optional[microcontroller.Pin] = None, floppydirectionpin: typing.Optional[microcontroller.Pin] = None, + floppyenablepin: typing.Optional[microcontroller.Pin] = None, ) -> None: self._density = DigitalInOut(densitypin) self._density.pull = Pull.UP @@ -102,6 +104,10 @@ def __init__( if self._floppydirection: self._floppydirection.switch_to_output(True) + self._floppyenable = _optionaldigitalinout(floppyenablepin) + if self._floppyenable: + self._floppyenable.switch_to_output(False) + self._track = None def _do_step(self, direction, count): @@ -156,10 +162,12 @@ def track(self, track: int) -> None: raise ValueError("Invalid seek to negative track number") delta = track - self.track - if delta < 0: - self._do_step(_STEP_OUT, -delta) - elif delta > 0: - self._do_step(_STEP_IN, delta) + if delta: + if delta < 0: + self._do_step(_STEP_OUT, -delta) + elif delta > 0: + self._do_step(_STEP_IN, delta) + _sleep_ms(_STEP_DELAY_MS) self._track = track self._check_inpos() @@ -205,7 +213,7 @@ def side(self) -> int: def side(self, head: int) -> None: self._side.value = head == 0 - def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int: + def flux_readinto(self, buf: "circuitpython_typing.WriteableBuffer") -> int: """Read flux transition information into the buffer. The function returns when the buffer has filled, or when the index input @@ -222,7 +230,8 @@ def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int: class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes """Wrap an MFMFloppy object into a block device suitable for `storage.VfsFat` - The default heads/sectors/tracks setting are for 3.5", 1.44MB floppies. + The default is to autodetect the data rate and the geometry of an inserted + floppy using the floppy's "BIOS paramter block" In the current implementation, the floppy is read-only. @@ -243,30 +252,75 @@ class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes def __init__( # pylint: disable=too-many-arguments self, floppy, - heads=2, - sectors=18, - tracks=80, - flux_buffer=None, - t1_nom_ns: float = 1000, + *, + max_sectors=18, + autodetect: bool = True, + heads: int | None = None, + sectors: int | None = None, + tracks: int | None = None, + flux_buffer: circuitpython_typing.WriteableBuffer | None = None, + t1_nom_ns: float | None = None, + keep_selected: bool = False, ): self.floppy = floppy - self.heads = heads - self.sectors = sectors - self.tracks = tracks - self.flux_buffer = flux_buffer or bytearray(sectors * 12 * 512) - self.track0side0_cache = memoryview(bytearray(sectors * 512)) - self.track0side0_validity = bytearray(sectors) - self.track_cache = memoryview(bytearray(sectors * 512)) - self.track_validity = bytearray(sectors) + self.flux_buffer = flux_buffer or bytearray(max_sectors * 12 * 512) + self.track0side0_cache = memoryview(bytearray(max_sectors * 512)) + self.track_cache = memoryview(bytearray(max_sectors * 512)) + self._keep_selected = keep_selected + self.cached_track = -1 + self.cached_side = -1 - self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) - self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + if autodetect: + self.autodetect() + else: + self.setformat(heads, sectors, tracks, t1_nom_ns) + + if keep_selected: + self.floppy.selected = True + self.floppy.spin = True + + @property + def keep_selected(self) -> bool: + """Whether to keep the drive selected & spinning between operations + + This can make operations faster by avoiding spin up time""" + return self._keep_selected + + @keep_selected.setter + def keep_selected(self, value: bool): + self.floppy.selected = value + self.floppy.spin = value + + def _select_and_spin(self, value: bool): + if self.keep_selected: + return + self.floppy.selected = value + self.floppy.spin = value + + def on_disk_change(self): + """This function (or autodetect or setformat) must be called after a disk is changed + + Flushes the cached floppy data""" self._track_read(self.track0side0_cache, self.track0side0_validity, 0, 0) self.cached_track = -1 self.cached_side = -1 + def setformat(self, heads, sectors, tracks, t1_nom_ns): + """Set the floppy format details + + This also calls on_disk_change to flush cached floppy data.""" + self.heads = heads + self.sectors = sectors + self.tracks = tracks + self._t1_nom_ns = t1_nom_ns + self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + self.track0side0_validity = bytearray(sectors) + self.track_validity = bytearray(sectors) + self.on_disk_change() + def deinit(self): """Deinitialize this object""" self.floppy.deinit() @@ -311,22 +365,25 @@ def _get_track_data(self, track, side): return self.track_cache, self.track_validity def _track_read(self, track_data, validity, track, side): - self.floppy.selected = True - self.floppy.spin = True + self._select_and_spin(True) self.floppy.track = track self.floppy.side = side self._mfm_readinto(track_data, validity) - self.floppy.spin = False - self.floppy.selected = False + self._select_and_spin(False) self.cached_track = track self.cached_side = side def _mfm_readinto(self, track_data, validity): + n = 0 + exc = None for i in range(5): - self.floppy.flux_readinto(self.flux_buffer) - print("timing bins", self._t2_5_max, self._t3_5_max) + try: + self.floppy.flux_readinto(self.flux_buffer) + except RuntimeError as error: + exc = error + continue n = floppyio.mfm_readinto( - track_data, + track_data[: 512 * self.sectors], self.flux_buffer, self._t2_5_max, self._t3_5_max, @@ -335,3 +392,89 @@ def _mfm_readinto(self, track_data, validity): ) if n == self.sectors: break + if n == 0 and exc is not None: + raise exc + + def _detect_diskformat_from_flux(self): + sector = self.track_cache[:512] + # The first two numbers are HD and DD rates. The next two are the bit + # rates for 300RPM media read in a 360RPM drive. + for t1_nom_ns in [1_000, 2_000, 8_33, 1_667]: + t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9) + + n = floppyio.mfm_readinto( + sector, + self.flux_buffer, + t2_5_max, + t3_5_max, + ) + + if n == 0: + continue + + if sector[510] != 0x55 or sector[511] != 0xAA: + print("did not find boot signature 55 AA") + print( + "First 16 bytes in sector:", + " ".join("%02x" % c for c in sector[:16]), + ) + print( + "Final 16 bytes in sector:", + " ".join("%02x" % c for c in sector[-16:]), + ) + continue + + n_sectors_track = sector[0x18] + n_heads = sector[0x1A] + if n_heads != 2: + print(f"unsupported head count {n_heads=}") + continue + n_sectors_total = struct.unpack(" a key was pressed + break + keys = new_keys + + print() + + +print("\033[H\033[2JFloppy Archiver") +print("Archive standard DOS floppies to SD card in IMG format") +print() + +try: + sdcard = sdcardio.SDCard(board.SPI(), board.SD_CS) + vfs = storage.VfsFat(sdcard) + storage.mount(vfs, "/sd") + print("Mounted SD card.") +except Exception as e: + print("Failed to mount SD card:") + print(e) + raise SystemExit # pylint: disable=raise-missing-from + +dev = None +blockdata = bytearray(512) +baddata = b"BADDATA0" * 64 +assert len(baddata) == len(blockdata) + +while True: + if dev is not None: + dev.floppy.keep_selected = False + vfsstat = vfs.statvfs(vfs) + avail = vfsstat[0] * vfsstat[4] / 1024 / 1024 + print(f"/sd: {avail:.1f}MiB available") + smart_input("Insert disk and press any key") + + try: + if dev is None: + dev = adafruit_floppy.FloppyBlockDevice(floppy, keep_selected=True) + else: + dev.floppy.keep_selected = True + dev.autodetect() + except OSError as e: + print(e) + continue + + dev.readblocks(0, blockdata) + label = blockdata[43:54].decode("ascii", "replace").strip() + fstype = blockdata[54:61].decode("ascii", "replace").strip() + print(f"\033[H\033[2JArchiving {label!r} ({fstype!r})") + + bad_blocks = good_blocks = 0 + total_blocks = dev.count() + pertrack = dev.sectors * dev.heads + with open_next_image() as f: + for i in range(total_blocks): + if i % pertrack == 0: + print(end=f"{i//pertrack:02d}") + try: + dev.readblocks(i, blockdata) + print(end=".") + f.write(blockdata) + good_blocks += 1 + except Exception as e: # pylint: disable=broad-exception-caught + bad_blocks += 1 + print(end="!") + f.write(baddata) + if i % pertrack == (pertrack // 2 - 1): + print(end="|") + if i % pertrack == (pertrack - 1): + print() + + print() + print(f"Archived {label!r} to {last_filename.split('/')[-1]}") + print(f"{good_blocks} good + {bad_blocks} bad blocks") + print(f"out of {total_blocks} ({total_blocks//2}KiB)") + print()