Skip to content

Commit

Permalink
Update for floppsy rev b & add DOS floppy archiver
Browse files Browse the repository at this point in the history
  • Loading branch information
jepler committed Nov 26, 2024
1 parent 7cc730a commit bafe35e
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 28 deletions.
199 changes: 171 additions & 28 deletions adafruit_floppy.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* Author(s): Jeff Epler
"""

import struct
import floppyio
from digitalio import DigitalInOut, Pull
from micropython import const
Expand Down Expand Up @@ -55,7 +56,7 @@ class Floppy: # pylint: disable=too-many-instance-attributes

_track: typing.Optional[int]

def __init__(
def __init__( # pylint: disable=too-many-locals
self,
*,
densitypin: microcontroller.Pin,
Expand All @@ -72,6 +73,7 @@ def __init__(
wrdatapin: typing.Optional[microcontroller.Pin] = None,
wrgatepin: typing.Optional[microcontroller.Pin] = None,
floppydirectionpin: typing.Optional[microcontroller.Pin] = None,
floppyenablepin: typing.Optional[microcontroller.Pin] = None,
) -> None:
self._density = DigitalInOut(densitypin)
self._density.pull = Pull.UP
Expand Down Expand Up @@ -102,6 +104,10 @@ def __init__(
if self._floppydirection:
self._floppydirection.switch_to_output(True)

self._floppyenable = _optionaldigitalinout(floppyenablepin)
if self._floppyenable:
self._floppyenable.switch_to_output(False)

self._track = None

def _do_step(self, direction, count):
Expand Down Expand Up @@ -156,10 +162,12 @@ def track(self, track: int) -> None:
raise ValueError("Invalid seek to negative track number")

delta = track - self.track
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
if delta:
if delta < 0:
self._do_step(_STEP_OUT, -delta)
elif delta > 0:
self._do_step(_STEP_IN, delta)
_sleep_ms(_STEP_DELAY_MS)

self._track = track
self._check_inpos()
Expand Down Expand Up @@ -222,7 +230,8 @@ def flux_readinto(self, buf: "circuitpython_typing.WritableBuffer") -> int:
class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
"""Wrap an MFMFloppy object into a block device suitable for `storage.VfsFat`
The default heads/sectors/tracks setting are for 3.5", 1.44MB floppies.
The default is to autodetect the data rate and the geometry of an inserted
floppy using the floppy's "BIOS paramter block"
In the current implementation, the floppy is read-only.
Expand All @@ -243,30 +252,75 @@ class FloppyBlockDevice: # pylint: disable=too-many-instance-attributes
def __init__( # pylint: disable=too-many-arguments
self,
floppy,
heads=2,
sectors=18,
tracks=80,
flux_buffer=None,
t1_nom_ns: float = 1000,
*,
max_sectors=18,
autodetect: bool = True,
heads: int | None = None,
sectors: int | None = None,
tracks: int | None = None,
flux_buffer: circuitpython_typing.WritableBuffer | None = None,
t1_nom_ns: float | None = None,
keep_selected: bool = False,
):
self.floppy = floppy
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self.flux_buffer = flux_buffer or bytearray(sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(sectors * 512))
self.track0side0_validity = bytearray(sectors)
self.track_cache = memoryview(bytearray(sectors * 512))
self.track_validity = bytearray(sectors)
self.flux_buffer = flux_buffer or bytearray(max_sectors * 12 * 512)
self.track0side0_cache = memoryview(bytearray(max_sectors * 512))
self.track_cache = memoryview(bytearray(max_sectors * 512))
self._keep_selected = keep_selected
self.cached_track = -1
self.cached_side = -1

self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
if autodetect:
self.autodetect()
else:
self.setformat(heads, sectors, tracks, t1_nom_ns)

if keep_selected:
self.floppy.selected = True
self.floppy.spin = True

@property
def keep_selected(self) -> bool:
"""Whether to keep the drive selected & spinning between operations
This can make operations faster by avoiding spin up time"""
return self._keep_selected

@keep_selected.setter
def keep_selected(self, value: bool):
self.floppy.selected = value
self.floppy.spin = value

def _select_and_spin(self, value: bool):
if self.keep_selected:
return
self.floppy.selected = value
self.floppy.spin = value

def on_disk_change(self):
"""This function (or autodetect or setformat) must be called after a disk is changed
Flushes the cached floppy data"""

self._track_read(self.track0side0_cache, self.track0side0_validity, 0, 0)

self.cached_track = -1
self.cached_side = -1

def setformat(self, heads, sectors, tracks, t1_nom_ns):
"""Set the floppy format details
This also calls on_disk_change to flush cached floppy data."""
self.heads = heads
self.sectors = sectors
self.tracks = tracks
self._t1_nom_ns = t1_nom_ns
self._t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self._t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
self.track0side0_validity = bytearray(sectors)
self.track_validity = bytearray(sectors)
self.on_disk_change()

def deinit(self):
"""Deinitialize this object"""
self.floppy.deinit()
Expand Down Expand Up @@ -311,22 +365,25 @@ def _get_track_data(self, track, side):
return self.track_cache, self.track_validity

def _track_read(self, track_data, validity, track, side):
self.floppy.selected = True
self.floppy.spin = True
self._select_and_spin(True)
self.floppy.track = track
self.floppy.side = side
self._mfm_readinto(track_data, validity)
self.floppy.spin = False
self.floppy.selected = False
self._select_and_spin(False)
self.cached_track = track
self.cached_side = side

def _mfm_readinto(self, track_data, validity):
n = 0
exc = None
for i in range(5):
self.floppy.flux_readinto(self.flux_buffer)
print("timing bins", self._t2_5_max, self._t3_5_max)
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
n = floppyio.mfm_readinto(
track_data,
track_data[: 512 * self.sectors],
self.flux_buffer,
self._t2_5_max,
self._t3_5_max,
Expand All @@ -335,3 +392,89 @@ def _mfm_readinto(self, track_data, validity):
)
if n == self.sectors:
break
if n == 0 and exc is not None:
raise exc

def _detect_diskformat_from_flux(self):
sector = self.track_cache[:512]
# The first two numbers are HD and DD rates. The next two are the bit
# rates for 300RPM media read in a 360RPM drive.
for t1_nom_ns in [1_000, 2_000, 8_33, 1_667]:
t2_5_max = round(2.5 * t1_nom_ns * floppyio.samplerate * 1e-9)
t3_5_max = round(3.5 * t1_nom_ns * floppyio.samplerate * 1e-9)

n = floppyio.mfm_readinto(
sector,
self.flux_buffer,
t2_5_max,
t3_5_max,
)

if n == 0:
continue

if sector[510] != 0x55 or sector[511] != 0xAA:
print("did not find boot signature 55 AA")
print(
"First 16 bytes in sector:",
" ".join("%02x" % c for c in sector[:16]),
)
print(
"Final 16 bytes in sector:",
" ".join("%02x" % c for c in sector[-16:]),
)
continue

n_sectors_track = sector[0x18]
n_heads = sector[0x1A]
if n_heads != 2:
print(f"unsupported head count {n_heads=}")
continue
n_sectors_total = struct.unpack("<H", sector[0x13:0x15])[0]
n_tracks = n_sectors_total // (n_heads * n_sectors_track)
f_tracks = n_sectors_total % (n_heads * n_sectors_track)
if f_tracks != 0:
# pylint: disable=line-too-long
print(
f"Dubious geometry! {n_sectors_total=} {n_sectors_track=} {n_heads=} is {n_tracks=}+{f_tracks=}"
)
n_tracks += 1

return {
"heads": n_heads,
"sectors": n_sectors_track,
"tracks": n_tracks,
"t1_nom_ns": t1_nom_ns,
}

def autodetect(self):
"""Detect an inserted DOS floppy
The floppy must have a standard MFM data rate & DOS 2.0 compatible Bios
Parameter Block (BPB). Almost all FAT formatted floppies for DOS & Windows
should autodetect in this way.
This also flushes the cached data.
"""
self._select_and_spin(True)
self.floppy.track = 1
self.floppy.track = 0
self.floppy.side = 0
exc = None
try:
for _ in range(5): # try repeatedly to read track 0 side 0 sector 0
try:
self.floppy.flux_readinto(self.flux_buffer)
except RuntimeError as error:
exc = error
continue
diskformat = self._detect_diskformat_from_flux()
if diskformat is not None:
break
finally:
self._select_and_spin(False)

if diskformat is not None:
self.setformat(**diskformat)
else:
raise OSError("Failed to detect floppy format") from exc
104 changes: 104 additions & 0 deletions examples/dos_archiver.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import os

Check failure on line 1 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

reformatted

Check failure on line 1 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

reformatted
import sdcardio
# SPDX-FileCopyrightText: Copyright (c) 2024 Jeff Epler for Adafruit Industries
#
# SPDX-License-Identifier: Unlicense

"""DOS floppy archiver for Adafruit Floppsy
Insert an SD card & hook up your floppy drive.
Open the REPL / serial connection
Insert a floppy and press Enter to archive it
Do this for as many floppies as you like."""

import board
import storage

Check failure on line 15 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import board" should be placed at the top of the module

Check failure on line 15 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import board" should be placed at the top of the module
import adafruit_floppy

Check failure on line 16 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import storage" should be placed at the top of the module

Check failure on line 16 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import storage" should be placed at the top of the module

Check failure on line 17 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import adafruit_floppy" should be placed at the top of the module

Check failure on line 17 in examples/dos_archiver.py

View workflow job for this annotation

GitHub Actions / test

Import "import adafruit_floppy" should be placed at the top of the module
floppy = adafruit_floppy.Floppy(
densitypin=board.DENSITY,
indexpin=board.INDEX,
selectpin=board.SELECT,
motorpin=board.MOTOR,
directionpin=board.DIRECTION,
steppin=board.STEP,
track0pin=board.TRACK0,
protectpin=board.WRPROT,
rddatapin=board.RDDATA,
sidepin=board.SIDE,
readypin=board.READY,
wrdatapin=board.WRDATA,
wrgatepin=board.WRGATE,
floppydirectionpin=board.FLOPPY_DIRECTION,
floppyenablepin=board.FLOPPY_ENABLE,
)

_image_counter = 0


def open_next_image(extension="img"):
"""Return an opened numbered file on the sdcard, such as "img01234.jpg"."""
global _image_counter # pylint: disable=global-statement
try:
os.stat("/sd")
except OSError as exc: # no SD card!
raise RuntimeError("No SD card mounted") from exc
while True:
filename = "/sd/dsk%04d.%s" % (_image_counter, extension)
_image_counter += 1
try:
os.stat(filename)
except OSError:
break
print("Writing to", filename)
return open(filename, "wb")


sdcard = sdcardio.SDCard(board.SPI(), board.SD_CS)
vfs = storage.VfsFat(sdcard)
storage.mount(vfs, "/sd")

dev = None
blockdata = bytearray(512)
baddata = b"BADDATA0" * 64

while True:
if dev is not None:
dev.floppy.keep_selected = False
input("Insert disk and press ENTER")

try:
if dev is None:
dev = adafruit_floppy.FloppyBlockDevice(floppy, keep_selected=True)
else:
dev.floppy.keep_selected = True
dev.autodetect()
except OSError as e:
print(e)
continue

bad_blocks = good_blocks = 0
total_blocks = dev.count()
pertrack = dev.sectors * dev.heads
with open_next_image() as f:
for i in range(total_blocks):
if i % pertrack == 0:
print(end=f"{i//pertrack:02d}")
try:
dev.readblocks(i, blockdata)
print(end=".")
f.write(blockdata)
good_blocks += 1
except Exception as e: # pylint: disable=broad-exception-caught
bad_blocks += 1
print(end="!")
f.write(baddata)
if i % pertrack == (pertrack // 2 - 1):
print(end="|")
if i % pertrack == (pertrack - 1):
print()

print(
f"{good_blocks} good + {bad_blocks} bad blocks",
f"out of {total_blocks} ({total_blocks//2}KiB)",
)

0 comments on commit bafe35e

Please sign in to comment.