Skip to content

Commit

Permalink
some preparations for the next flare-on write-up
Browse files Browse the repository at this point in the history
  • Loading branch information
huettenhain committed Nov 12, 2024
1 parent 050ab04 commit eea6a1a
Show file tree
Hide file tree
Showing 12 changed files with 7,909 additions and 8,072 deletions.
50 changes: 50 additions & 0 deletions samples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from typing import Optional, Dict

import hashlib
import os
import urllib.request

from refinery.units.crypto.cipher.aes import aes


class SampleStore:
cache: Dict[str, bytes]

def __init__(self):
self.cache = {}

def download(self, sha256hash: str, key: Optional[str] = None):
def tobytearray(r):
if isinstance(r, bytearray):
return r
return bytearray(r)
key = key or 'REFINERYTESTDATA'
key = key.encode('latin1')
sha256hash = sha256hash.lower()
req = urllib.request.Request(
F'https://github.com/binref/refinery-test-data/blob/master/{sha256hash}.enc?raw=true')
try:
with urllib.request.urlopen(req) as response:
encoded_sample = tobytearray(response.read())
except Exception:
api = os.environ['MALSHARE_API']
req = urllib.request.Request(
F'https://malshare.com/api.php?api_key={api}&action=getfile&hash={sha256hash}')
with urllib.request.urlopen(req) as response:
result = tobytearray(response.read())
else:
result = encoded_sample | aes(mode='CBC', key=key) | bytearray
if not result or hashlib.sha256(result).hexdigest().lower() != sha256hash:
raise ValueError('sample did not decode correctly')
self.cache[sha256hash] = result
return result

def get(self, sha256hash: str, key: Optional[str] = None):
for cached, value in self.cache.items():
if cached.casefold() == sha256hash.casefold():
return value
else:
return self.download(sha256hash, key)

def __getitem__(self, sha256hash: str):
return self.get(sha256hash)
15 changes: 15 additions & 0 deletions strip-tutorials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Strips the Jupyter Notebooks in the Tutorial section of run count information.
"""
import json
import pathlib

for path in pathlib.Path.cwd().glob('./tutorials/*.ipynb'):
with path.open('r') as fd:
notebook = json.load(fd)
for cell in notebook['cells']:
cell.pop('execution_count', None)
with path.open('w') as fd:
json.dump(notebook, fd, indent=1)
51 changes: 2 additions & 49 deletions test/__init__.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,13 @@
from typing import Optional, Dict

import hashlib
import logging
import os
import random
import refinery
import string
import unittest
import urllib.request

from refinery.units.crypto.cipher.aes import aes

__all__ = ['refinery', 'TestBase', 'NameUnknownException']

from samples import SampleStore

class SampleStore:
cache: Dict[str, bytes]

def __init__(self):
self.cache = {}

def download(self, sha256hash: str, key: Optional[str] = None):
def tobytearray(r):
if isinstance(r, bytearray):
return r
return bytearray(r)
key = key or 'REFINERYTESTDATA'
key = key.encode('latin1')
sha256hash = sha256hash.lower()
req = urllib.request.Request(
F'https://github.com/binref/refinery-test-data/blob/master/{sha256hash}.enc?raw=true')
try:
with urllib.request.urlopen(req) as response:
encoded_sample = tobytearray(response.read())
except Exception:
api = os.environ['MALSHARE_API']
req = urllib.request.Request(
F'https://malshare.com/api.php?api_key={api}&action=getfile&hash={sha256hash}')
with urllib.request.urlopen(req) as response:
result = tobytearray(response.read())
else:
result = encoded_sample | aes(mode='CBC', key=key) | bytearray
if not result or hashlib.sha256(result).hexdigest().lower() != sha256hash:
raise ValueError('sample did not decode correctly')
self.cache[sha256hash] = result
return result

def get(self, sha256hash: str, key: Optional[str] = None):
for cached, value in self.cache.items():
if cached.casefold() == sha256hash.casefold():
return value
else:
return self.download(sha256hash, key)

def __getitem__(self, sha256hash: str):
return self.get(sha256hash)
__all__ = ['refinery', 'TestBase', 'NameUnknownException']


class NameUnknownException(Exception):
Expand Down
33 changes: 29 additions & 4 deletions tutorials/boilerplate.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,19 @@
import hashlib
import logging
import re
import requests
import shlex
import getpass


os.environ['REFINERY_TERM_SIZE'] = '120'
os.environ['REFINERY_COLORLESS'] = '1'
if True:
os.environ['REFINERY_TERM_SIZE'] = '120'
os.environ['REFINERY_COLORLESS'] = '1'

from refinery.lib.meta import SizeInt
from refinery.lib.loader import load_pipeline
from refinery.units import Executable, Unit
from test import SampleStore

from samples import SampleStore

logging.disable(logging.CRITICAL)
Executable.Entry = '__DEMO__'
Expand Down Expand Up @@ -203,6 +205,18 @@ def show(line: str):
return Image(filename=line.strip())


@register_cell_magic
def cat(line: str, cell=None):
cat, _, out = line.partition('>')
cat, _, eof = cat.partition('<<')
out = out.strip()
eof = eof.strip()
cell = cell or ''
cell, _, _ = cell.partition(eof)
cell = cell.strip()
store.cache[out] = cell.encode('utf8')


def store_sample(hash: str, name: Optional[str] = None, key: Optional[str] = None):
store.download(hash, key=key)
if name is None:
Expand All @@ -212,3 +226,14 @@ def store_sample(hash: str, name: Optional[str] = None, key: Optional[str] = Non

def store_clear():
store.cache.clear()


@register_line_magic
def flare(line: str):
name, _, pattern = line.strip().partition(' ')
url = F'https://www.awarenetwork.org/home/outlaw/ctfs/flareon/{name}'
store_clear()
store.cache[name] = requests.get(url).content
emit(F'{name} | xt7z {pattern} [| dump {{path}} ]')
rm(name)
ls()
Loading

0 comments on commit eea6a1a

Please sign in to comment.