Skip to content

Commit

Permalink
Merge pull request #12 from adafruit/floppsy-revb-fat-archiver
Browse files Browse the repository at this point in the history
Update for floppsy rev b & add DOS floppy archiver
  • Loading branch information
jepler authored Nov 27, 2024
2 parents 7cc730a + 4636370 commit bde51e9
Show file tree
Hide file tree
Showing 3 changed files with 334 additions and 29 deletions.
201 changes: 172 additions & 29 deletions adafruit_floppy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Author(s): Jeff Epler
"""

import struct
import floppyio
from digitalio import DigitalInOut, Pull
from micropython import const
Expand Down Expand Up @@ -55,7 +56,7 @@ class Floppy: # pylint: disable=too-many-instance-attributes

_track: typing.Optional[int]

def __init__(
def __init__( # pylint: disable=too-many-locals
self,
*,
densitypin: microcontroller.Pin,
Expand All @@ -72,6 +73,7 @@ def __init__(
wrdatapin: typing.Optional[microcontroller.Pin] = None,
wrgatepin: typing.Optional[microcontroller.Pin] = None,
floppydirectionpin: typing.Optional[microcontroller.Pin] = None,
floppyenablepin: typing.Optional[microcontroller.Pin] = None,
) -> None:
self._density = DigitalInOut(densitypin)
self._density.pull = Pull.UP
Expand Down Expand Up @@ -102,6 +104,10 @@ def __init__(
if self._floppydirection:
self._floppydirection.switch_to_output(True)

self._floppyenable = _optionaldigitalinout(floppyenablepin)
if self._floppyenable:
self._floppyenable.switch_to_output(False)

self._track = None

def _do_step(self, direction, count):
Expand Down Expand Up @@ -156,10 +162,12 @@ def track(self, track: int) -> None:
raise ValueError("Invalid seek to negative track number")

delta = track - self.track
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
if delta:
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
_sleep_ms(_STEP_DELAY_MS)

self._track = track
self._check_inpos()
Expand Down Expand Up @@ -205,7 +213,7 @@ def side(self) -> int:
def side(self, head: int) -> None:
self._side.value = head == 0

def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int:
def flux_readinto(self, buf: "circuitpython_typing.WriteableBuffer") -> int:
"""Read flux transition information into the buffer.
The function returns when the buffer has filled, or when the index input
Expand All @@ -222,7 +230,8 @@ def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int:
class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
"""Wrap an MFMFloppy object into a block device suitable for `storage.VfsFat`
The default heads/sectors/tracks setting are for 3.5", 1.44MB floppies.
The default is to autodetect the data rate and the geometry of an inserted
floppy using the floppy's "BIOS paramter block"
In the current implementation, the floppy is read-only.
Expand All @@ -243,30 +252,75 @@ class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
floppy,
heads=2,
sectors=18,
tracks=80,
flux_buffer=None,
t1_nom_ns: float = 1000,
*,
max_sectors=18,
autodetect: bool = True,
heads: int | None = None,
sectors: int | None = None,
tracks: int | None = None,
flux_buffer: circuitpython_typing.WriteableBuffer | None = None,
t1_nom_ns: float | None = None,
keep_selected: bool = False,
):
self.floppy = floppy
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self.flux_buffer = flux_buffer or bytearray(sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(sectors * 512))
self.track0side0_validity = bytearray(sectors)
self.track_cache = memoryview(bytearray(sectors * 512))
self.track_validity = bytearray(sectors)
self.flux_buffer = flux_buffer or bytearray(max_sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(max_sectors * 512))
self.track_cache = memoryview(bytearray(max_sectors * 512))
self._keep_selected = keep_selected
self.cached_track = -1
self.cached_side = -1

self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
if autodetect:
self.autodetect()
else:
self.setformat(heads, sectors, tracks, t1_nom_ns)

if keep_selected:
self.floppy.selected = True
self.floppy.spin = True

@property
def keep_selected(self) -> bool:
"""Whether to keep the drive selected & spinning between operations
This can make operations faster by avoiding spin up time"""
return self._keep_selected

@keep_selected.setter
def keep_selected(self, value: bool):
self.floppy.selected = value
self.floppy.spin = value

def _select_and_spin(self, value: bool):
if self.keep_selected:
return
self.floppy.selected = value
self.floppy.spin = value

def on_disk_change(self):
"""This function (or autodetect or setformat) must be called after a disk is changed
Flushes the cached floppy data"""

self._track_read(self.track0side0_cache, self.track0side0_validity, 0, 0)

self.cached_track = -1
self.cached_side = -1

def setformat(self, heads, sectors, tracks, t1_nom_ns):
"""Set the floppy format details
This also calls on_disk_change to flush cached floppy data."""
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self._t1_nom_ns = t1_nom_ns
self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self.track0side0_validity = bytearray(sectors)
self.track_validity = bytearray(sectors)
self.on_disk_change()

def deinit(self):
"""Deinitialize this object"""
self.floppy.deinit()
Expand Down Expand Up @@ -311,22 +365,25 @@ def _get_track_data(self, track, side):
return self.track_cache, self.track_validity

def _track_read(self, track_data, validity, track, side):
self.floppy.selected = True
self.floppy.spin = True
self._select_and_spin(True)
self.floppy.track = track
self.floppy.side = side
self._mfm_readinto(track_data, validity)
self.floppy.spin = False
self.floppy.selected = False
self._select_and_spin(False)
self.cached_track = track
self.cached_side = side

def _mfm_readinto(self, track_data, validity):
n = 0
exc = None
for i in range(5):
self.floppy.flux_readinto(self.flux_buffer)
print("timing bins", self._t2_5_max, self._t3_5_max)
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
n = floppyio.mfm_readinto(
track_data,
track_data[: 512 * self.sectors],
self.flux_buffer,
self._t2_5_max,
self._t3_5_max,
Expand All @@ -335,3 +392,89 @@ def _mfm_readinto(self, track_data, validity):
)
if n == self.sectors:
break
if n == 0 and exc is not None:
raise exc

def _detect_diskformat_from_flux(self):
sector = self.track_cache[:512]
# The first two numbers are HD and DD rates. The next two are the bit
# rates for 300RPM media read in a 360RPM drive.
for t1_nom_ns in [1_000, 2_000, 8_33, 1_667]:
t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)

n = floppyio.mfm_readinto(
sector,
self.flux_buffer,
t2_5_max,
t3_5_max,
)

if n == 0:
continue

if sector[510] != 0x55 or sector[511] != 0xAA:
print("did not find boot signature 55 AA")
print(
"First 16 bytes in sector:",
" ".join("%02x" % c for c in sector[:16]),
)
print(
"Final 16 bytes in sector:",
" ".join("%02x" % c for c in sector[-16:]),
)
continue

n_sectors_track = sector[0x18]
n_heads = sector[0x1A]
if n_heads != 2:
print(f"unsupported head count {n_heads=}")
continue
n_sectors_total = struct.unpack("<H", sector[0x13:0x15])[0]
n_tracks = n_sectors_total // (n_heads * n_sectors_track)
f_tracks = n_sectors_total % (n_heads * n_sectors_track)
if f_tracks != 0:
# pylint: disable=line-too-long
print(
f"Dubious geometry! {n_sectors_total=} {n_sectors_track=} {n_heads=} is {n_tracks=}+{f_tracks=}"
)
n_tracks += 1

return {
"heads": n_heads,
"sectors": n_sectors_track,
"tracks": n_tracks,
"t1_nom_ns": t1_nom_ns,
}

def autodetect(self):
"""Detect an inserted DOS floppy
The floppy must have a standard MFM data rate & DOS 2.0 compatible Bios
Parameter Block (BPB). Almost all FAT formatted floppies for DOS & Windows
should autodetect in this way.
This also flushes the cached data.
"""
self._select_and_spin(True)
self.floppy.track = 1
self.floppy.track = 0
self.floppy.side = 0
exc = None
try:
for _ in range(5): # try repeatedly to read track 0 side 0 sector 0
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
diskformat = self._detect_diskformat_from_flux()
if diskformat is not None:
break
finally:
self._select_and_spin(False)

if diskformat is not None:
self.setformat(**diskformat)
else:
raise OSError("Failed to detect floppy format") from exc
2 changes: 2 additions & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
API Documentation
-----------------

.. If you created a package, create one automodule per module in the package.
Expand Down
Loading

0 comments on commit bde51e9

Please sign in to comment.