Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Append ts segments to m3u8 file #47

Merged
merged 61 commits into from
Aug 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
d9ca9dd
Initial version for mseed and multiple architectures on single branch
Apr 27, 2021
ad81cf4
fixed ffmpeg command
Apr 27, 2021
166eaa6
made files safe names
Apr 28, 2021
52eb93b
Hardcod URL for now, made env vars to int
Apr 28, 2021
4f566d4
removed from project
Apr 28, 2021
a33a2a0
Fixed debug message
Apr 28, 2021
3539b7d
make data directory if it doesn't exist
Apr 28, 2021
d2278a9
initial combo version
Apr 28, 2021
275f064
initial version
Apr 28, 2021
512b8ee
run actual commands, add logspout
Apr 28, 2021
c899243
fix typos, use shutil.move instead of os.rename for docker
Apr 28, 2021
aadea5a
Fixed m3u8 path
Apr 28, 2021
826aadc
fixed to encode in realtime, wait for file
Apr 28, 2021
582f9da
change to ts files
Apr 28, 2021
6a7d10d
use a bunch of ts files
Apr 28, 2021
cbc0af8
force new dummy.ts on start
Apr 28, 2021
ef68ffe
Fix Dockerfile case sensitive name
Apr 28, 2021
8789a31
Added logging
Apr 29, 2021
b76cae1
make streaming privleged to use nice
Apr 29, 2021
f52b737
fix audio sampling rate at 64000
Apr 29, 2021
c8a61f6
check before deleting/moving files, fixed audio rate, speed up start
Apr 29, 2021
8f99119
change to mono
Apr 29, 2021
cbb713f
updates
Apr 29, 2021
b408800
Split docker-compose.yml into rpi and x86 version
Apr 30, 2021
d002883
Fix file permsissions
Apr 30, 2021
cc5740a
chagne segment to minutes, debug statements
Apr 30, 2021
b343ccc
make node run for 24 hours
Apr 30, 2021
0651503
Fixed rpi baseline build
Apr 30, 2021
0b3907b
added support for virtual streaming mode
Apr 30, 2021
7763540
moved to node folder
Apr 30, 2021
e559609
yml updated for local testing
joyliao07 May 4, 2021
eb8b00e
made different compose files for build and pull
mcshicks May 8, 2021
f001ba2
Added info about logical soundcard names
mcshicks May 8, 2021
87febac
checking .env adding hls loopback
mcshicks May 8, 2021
3625961
add image tag for rpi
mcshicks May 15, 2021
28212b5
Removed empty build section
mcshicks May 15, 2021
f214441
Fix typo in readme sentence.
scottveirs May 19, 2021
8502af8
Fix minor typos in README paragraph 1
scottveirs May 19, 2021
24c84bc
Split stream into architecture specific files
mcshicks May 19, 2021
4d24aff
Change stream to architecture specific files
mcshicks May 19, 2021
c5d6ddf
added table of supported configurations
mcshicks May 19, 2021
45e8f00
Merge branch 'mseed' of github.com:orcasound/orcanode into mseed
mcshicks May 19, 2021
79f90b2
Handling gaps in mseed data & Internet outages
joyliao07 May 26, 2021
cf82246
Merge branch 'mseed' of https://github.com/orcasound/orcanode into mseed
joyliao07 May 26, 2021
590ed6b
Fetch data using ooipy
karan2704 Jun 16, 2022
b27493c
reslove conflicts
karan2704 Jun 16, 2022
cf995c8
remove conflict markers
karan2704 Jun 17, 2022
96c0938
add requirements to DockerFile
karan2704 Jun 18, 2022
4a4a5b6
convert to .ts
karan2704 Jun 20, 2022
a4ef165
generate .m3u8 manifest file
karan2704 Jun 22, 2022
57804e7
Handling gaps in mseed data (#40)
karan2704 Jun 28, 2022
7733be6
resolve shutil error
karan2704 Jun 29, 2022
e45267d
up to date with master
karan2704 Jun 29, 2022
51bb67a
Bug fix (#43)
karan2704 Jun 29, 2022
ce581e0
add prefix to .ts filenames
karan2704 Jul 1, 2022
c2f5f58
fetch for lags
karan2704 Jul 2, 2022
a8b2e4a
resolve merge conflict
karan2704 Jul 6, 2022
ff48c60
test hls_flags append
karan2704 Jul 13, 2022
5288269
writeManifest() fn for m3u8
karan2704 Jul 16, 2022
d4884b3
place #EXT-X-ENDLIST in the end
karan2704 Jul 21, 2022
2637897
fetch and upload
karan2704 Jul 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 28 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Orcasound's orcastream

This software contains audio tools and scripts for capturing, reformatting, transcoding and uploading audio for Orcasound. The directory structure reflects that we have developed a **base** set of tools and a couple of specific projects, orcanode and orcamseed (in the node and mseed directories). Orcasound hydrophone nodes stream by running the **node** code on Intel (amd64) or [Raspberry Pi](https://www.raspberrypi.org/) (arm32v7) platforms using a soundcard. While any soundcard should work, the most common one in use is the [Pisound](https://blokas.io/pisound/) board on either a Raspberry Pi 3B+ or 4. The other project (in the **mseed** directory) is for converting mseed format data to be streamed via Orcanode through the Orcasound human & machine detection pipeline. This is mainly used for streaming audio data from the [OOI](https://oceanobservatories.org/ "OOI") (NSF-funded Ocean Observatory Initiative) hydrophones off the coast of Oregon. See the README in each of those directories for more info.
This software contains audio tools and scripts for capturing, reformatting, transcoding and uploading audio for Orcasound. There is a base set of tools and a couple of specific projects, orcanode and orcamseed. Orcanode is streaming using Intel (amd64) or Raspberry Pi (arm32v7) platforms using a soundcard. While any soundcard should work, the most common one in use is the pisound board on either a Raspberry Pi 3B+ or 4. The other project orcamseed is for converting mseed format data to be streamed on Orcanode. This is mainly used for the [OOI](https://oceanobservatories.org/ "OOI") network. See the README in each of those directories for more info.

## Background & motivation

Expand All @@ -22,25 +22,7 @@ An ARM or X86 device with a sound card (or other audio input devices) connected

### Installing

Create a base docker image for your architecture by running the script in /base/rpi or /base/amd64 as appropriate. You will need to create a .env file as appropriate for your projects. Here is an example of an .env file (tested/working as of June, 2021)...

```
AWS_METADATA_SERVICE_TIMEOUT=5
AWS_METADATA_SERVICE_NUM_ATTEMPTS=0
REGION=us-west-2
BUCKET_TYPE=dev
NODE_TYPE=hls-only
NODE_NAME=rpi_YOURNODENAME_test
NODE_LOOPBACK=true
SAMPLE_RATE=48000
AUDIO_HW_ID=pisound
CHANNELS=1
FLAC_DURATION=30
SEGMENT_DURATION=10
LC_ALL=C.UTF-8
```

... except that the following fields are excised and will need to be added if you are integrating with the audio and logging systems of Orcasound:
Create a base docker image for your architecture by running the script in /base/rpi or /base/amd64 as appropriate. You will need to create a .env file as appropriate for your projects. Common to to all projects are the need for AWS keys

```
AWSACCESSKEYID=YourAWSaccessKey
Expand All @@ -56,15 +38,39 @@ Here are explanations of some of the .env fields:

* NODE_NAME should indicate your device and it's location, ideally in the form `device_location` (e.g. we call our Raspberry Pi staging device in Seattle `rpi_seattle`.
* NODE_TYPE determines what audio data formats will be generated and transferred to their respective AWS buckets.
* AUDIO_HW_ID is the card, device providing the audio data. Note: you can find your sound device by using the command "arecord -l". It's preferred to use the logical name i.e. pisound, USB, etc, instead of the "0,0" or "1,0" format which can change on reboots.
* AUDIO_HW_ID is the card, device providing the audio data. Note: you can find your sound device by using the command "arecord -l". For Raspberry Pi hardware with pisound just use AUDIO_HW_ID=pisound
* CHANNELS indicates the number of audio channels to expect (1 or 2).
* FLAC_DURATION is the amount of seconds you want in each archived lossless file.
* SEGMENT_DURATION is the amount of seconds you want in each streamed lossy segment.


## Supported combinations


| NODE ARCHITECTURE | node | mseed |
|-------------------|------|-------|
| arm32v7 | Y | N |
| amd64 | Y | Y |



| NODE ARCHITECTURE | hls-only | research | dev-virt |
|-------------------|----------|----------|----------|
| arm32v7 | Y | Y | N |
| amd64 | Y | N | Y |



| NODE Hardware | hls-only | research |
|-------------------|----------|----------|
| RPI4 | Y | Y |
| RPI3 B- | Y | N |



## Running local tests

At the root of the repository directory (where you also put your .env file) first copy the compose file you want to `docker-compose.yml`. For example, if you have a Raspberry Pi and you want to use the prebuilt image, then copy `docker-compose.rpi-pull.yml` to `docker-compose.yml`. Then run `docker-compose up -d`. Watch what happens using `htop`. If you want to verify files are being written to /tmp or /mnt directories, get the name of your streaming service using `docker-compose ps` (in this case `orcanode_streaming_1`) and then do `docker exec -it orcanode_streaming_1 /bin/bash` to get a bash shell within the running container.
In the repository directory (where you also put your .env file) first copy the compose file you want to docker-compose.yml. For example if you are raspberry pi and you want to use the prebuilt image then copy docker-compose.rpi-pull.yml to docker-compose. Then run `docker-compose up -d`. Watch what happens using `htop`. If you want to verify files are being written to /tmp or /mnt directories, get the name of your streaming service using `docker-compose ps` (in this case `orcanode_streaming_1`) and then do `docker exec -it orcanode_streaming_1 /bin/bash` to get a bash shell within the running container.

### Running an end-to-end test

Expand Down
6 changes: 5 additions & 1 deletion base/upload_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import os
import sys

# AWS access key and secret
ACCESS_ID = os.environ["AWSACCESSKEYID"]
ACCESS_KEY = os.environ["AWSSECRETACCESSKEY"]

NODE = os.environ["NODE_NAME"]
BASEPATH = os.path.join("/tmp", NODE)
PATH = os.path.join(BASEPATH, "hls")
Expand Down Expand Up @@ -63,7 +67,7 @@
def s3_copy_file(path, filename):
log.debug('uploading file '+filename+' from '+path+' to bucket '+BUCKET)
try:
resource = boto3.resource('s3', REGION) # Doesn't seem like we have to specify region
resource = boto3.resource('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key= ACCESS_KEY)
# transfer = S3Transfer(client)
uploadfile = os.path.join(path, filename)
log.debug('upload file: ' + uploadfile)
Expand Down
1 change: 1 addition & 0 deletions mseed/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ MAINTAINER Orcasound <[email protected]>

RUN pip3 install numpy
RUN pip3 install obspy
RUN pip3 install ooipy

############################### Copy files #####################################

Expand Down
10 changes: 10 additions & 0 deletions mseed/createpaths.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@

date=$(date '+%Y-%m-%d')

mkdir -p /tmp/$NODE_NAME
mkdir -p /tmp/$NODE_NAME/hls
mkdir -p /tmp/$NODE_NAME/hls/$date

echo "starting upload"

python3 upload_s3.py
31 changes: 10 additions & 21 deletions mseed/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,37 +1,26 @@
version: "3"
services:
pull:
fetch:
image: orcastream/orcamseed
build: ./
# command: tail -F README.md
command: python3 mseedpull.py
command: python3 ooipypull.py
restart: always
env_file: .env
volumes:
- data:/root/data
stream:
- data:/tmp
depends_on:
- upload

upload:
image: orcastream/orcamseed
build: ./
# command: tail -F README.md
command: ./streamfiles.sh
command: tail -F README.md
restart: always
env_file: .env
volumes:
- data:/root/data
- data:/tmp
privileged: true
logspout:
image: gliderlabs/logspout
command: ${SYSLOG_URL}
restart: always
hostname: ${NODE_NAME}
env_file: .env
environment:
- SYSLOG_HOSTNAME=${NODE_NAME}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
ports:
- "8000:8000"



volumes:
data:
75 changes: 59 additions & 16 deletions mseed/mseedpull.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,25 @@ def handle_data(self, data):
if (datestr != datenowstr):
dates.append(datenowstr)
for datestr in dates:
r = None
url = BASE_URL + '{}'.format(datestr)
log.debug("fetching: "+url)
r = requests.get(url)
if r == 'Response [404]':
# Day folder does not exist yet or website down
try:
r = requests.get(url)
except (OSError):
print('OS error. Please check Internet connection.')
time.sleep(10)
except:
log.debug("Unexpected error to get url.")
time.sleep(10)
# If url is none, skip the following operations:
if r is None:
continue
elif r == 'Response [404]':
print("website not responding or file not posted")
parser = MyHTMLParser()
parser.feed(str(r.content))
else:
parser = MyHTMLParser()
parser.feed(str(r.content))
return filelist


Expand All @@ -111,6 +122,22 @@ def fetchAndConvert(files):
if (filedelay < maxdelay and filedelay > mindelay):
toconvert += 1
log.debug(f'files to convert: {toconvert}')

# If no file to convert, capture the next available file time:
if(toconvert == 0):
nextAvailable = None
for file in files:
filetime = file['datetime']
filedelay = now - filetime
if (filedelay < maxdelay):
nextAvailable = filetime
break
if(nextAvailable):
time_to_wait_for_next = filetime - (now - mindelay)
print('The next available filetime: ' + str(filetime) + '; Time delta: ' + str(time_to_wait_for_next))
else:
print('The next available file is cuurently unavailable.')

for file in files:
filetime = file['datetime']
url = file['url']
Expand Down Expand Up @@ -159,15 +186,15 @@ def queueFiles(files):
if (delay + duration < age): # in the past
log.debug('deleting old entry: ' + tsfilename)
if os.path.exists(tsfilename):
os.remove(tsfilename)
os.remove(tsfilename)
filesdone.remove(filepath)
del files[idx]
deleted += 1
if ((delay + duration >= age) and (age > delay)):
# should be playing next
# should be playing next
log.debug('playing : ' + tsfilename)
if os.path.exists(tsfilename):
shutil.move(tsfilename, '/root/data/dummy.ts')
shutil.move(tsfilename, '/root/data/dummy.ts')
played += 1
filesdone.remove(filepath)
del files[idx]
Expand All @@ -183,14 +210,30 @@ def main_loop():
# TODO this converts correctly but after queue files it
# get overwritten by fetchandconver
# you need to change it fetchandconvert appends the exisitng list
# and all timedate stamps are only converted once at most.
log.debug("checking")
files = getFileUrls()
log.debug(f'number of URLS: {len(files)}')
convertedfiles.extend(fetchAndConvert(files))
log.debug(f'number of converted files: {len(files)}')
played, deleted, convertedfiles = queueFiles(convertedfiles)
log.debug(f'played: {played}, deleted: {deleted}')
# and all timedate stamps are only converted once at most.
for i in range(0, 5):
print("Call getFileUrls(). Attempt #" + str(i+1) + " out of 5.")
files = getFileUrls()
if len(files) > 0:
log.debug(f'number of URLS: {len(files)}')
for i in range(0, 3):
print("Call fetchAndConvert(files). Attempt "+str(i+1) + " out of 3")
try:
convertedfiles.extend(fetchAndConvert(files))
except (OSError):
print('OS error. Please check Internet connection.')
time.sleep(10)
except:
logging.exception("Unexpected error to convert files.")
time.sleep(10)
log.debug(f'number of converted files: {len(files)}')
played, deleted, convertedfiles = queueFiles(convertedfiles)
log.debug(f'played: {played}, deleted: {deleted}')
break
else:
print("Unable to get file urls.")
time.sleep(10)
print("The next call will start in 2.5 minutes.")
time.sleep(150.0 - ((time.time() - starttime) % 150.0))


Expand Down
117 changes: 117 additions & 0 deletions mseed/ooipypull.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import ooipy
import os
import datetime
import shutil
import logging
import logging.handlers
import sys

LOGLEVEL = logging.DEBUG
PREFIX = os.environ["TIME_PREFIX"]
DELAY = os.environ["DELAY_SEGMENT"]
NODE = os.environ["NODE_NAME"]
BASEPATH = os.path.join('/tmp', NODE)
PATH = os.path.join(BASEPATH, 'hls')

log = logging.getLogger(__name__)

log.setLevel(LOGLEVEL)

handler = logging.StreamHandler(sys.stdout)

formatter = logging.Formatter('%(module)s.%(funcName)s: %(message)s')
handler.setFormatter(formatter)

log.addHandler(handler)

def fetchData(start_time, segment_length, end_time, node):
os.makedirs(BASEPATH, exist_ok=True)
os.makedirs(PATH, exist_ok=True)
while start_time < end_time:
segment_end = min(start_time + segment_length, end_time)

#paths and filenames
datestr = start_time.strftime("%Y-%m-%dT%H-%M-%S-%f")[:-3]
sub_directory = start_time.strftime("%Y-%m-%d")
file_path = os.path.join(PATH, sub_directory)
wav_name = "{date}.wav".format(date=datestr)
ts_name = "{prefix}{date}.ts".format(prefix=PREFIX, date=datestr)

#create directory and edit latest.txt
if not os.path.exists(file_path):
os.mkdir(file_path)
manifest_file = os.path.join('/root', 'live.m3u8')
if os.path.exists(manifest_file):
os.remove(manifest_file)
if not os.path.exists(os.path.join(BASEPATH, 'latest.txt')):
with open('latest.txt', 'x') as f:
f.write(sub_directory)
shutil.move('latest.txt', BASEPATH )
else:
with open(f'/{BASEPATH}/latest.txt', 'w') as f:
f.write(sub_directory)


#fetch if file doesn't already exist
if(os.path.exists(os.path.join(file_path, ts_name))):
print("EXISTS")
continue
hydrophone_data = ooipy.request.hydrophone_request.get_acoustic_data(
start_time, segment_end, node, verbose=True, data_gap_mode=2
)
if hydrophone_data is None:
print(f"Could not get data from {start_time} to {segment_end}")
start_time = segment_end
continue
print(f"data: {hydrophone_data}")
hydrophone_data.wav_write(wav_name)

writeManifest(wav_name, ts_name)

#move files to tmp for upload
shutil.move(os.path.join('/root', ts_name), os.path.join(file_path, ts_name))
shutil.copy('/root/live.m3u8', os.path.join(file_path, 'live.m3u8'))
os.remove(wav_name)
start_time = segment_end


def writeManifest(wav_name, ts_name):
root_path = os.path.join('/root', 'live.m3u8')
if not os.path.exists(root_path):
os.system("ffmpeg -i {wavfile} -f segment -segment_list 'live.m3u8' -strftime 1 -segment_time 300 -segment_format mpegts -ac 1 -acodec aac {tsfile}".format(wavfile=wav_name, tsfile=ts_name))
else:
os.system('ffmpeg -i {wavfile} -f mpegts -ar 64000 -acodec aac -ac 1 {tsfile}'.format(wavfile=wav_name, tsfile=ts_name))
#remove EXT-X-ENDLIST and write new segment
with open("live.m3u8", "r+") as f:
lines = f.readlines()
f.seek(0)
f.truncate()
f.writelines(lines[:-1])
f.write("#EXTINF:300.000000, \n")
f.write(f'{ts_name} \n')
f.write(f'#EXT-X-ENDLIST \n')

#os.system("ffmpeg -i {wavfile} -hls_playlist_type event -strftime 1 -hls_segment_type mpegts -ac 1 -acodec aac -hls_segment_filename {tsfile} -hls_time 1800 -hls_flags omit_endlist+append_list live.m3u8".format(wavfile=wav_name, tsfile=ts_name))



def _main():

segment_length = datetime.timedelta(minutes = 5)
fixed_delay = datetime.timedelta(hours=8)

while True:
end_time = datetime.datetime.utcnow()
start_time = end_time - datetime.timedelta(hours=8)

#near live fetch
fetchData(start_time, segment_length, end_time, 'PC01A')

#delayed fetch
fetchData(end_time-datetime.timedelta(hours=24), segment_length, end_time, 'PC01A')

start_time, end_time = end_time, datetime.datetime.utcnow()



_main()
Loading