diff --git a/.gitignore b/.gitignore index 5473e20..00b0e31 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,109 @@ -.idea -__pycache__ -*.pyc -*.egg-info +### Python template +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover + +# Translations +*.mo +*.pot + +# Django stuff: +*.log + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio + +*.iml + +## Directory-based project format: +.idea/ +# if you remove the above rule, at least ignore the following: + +# User-specific stuff: +# .idea/workspace.xml +# .idea/tasks.xml +# .idea/dictionaries + +# Sensitive or high-churn files: +# .idea/dataSources.ids +# .idea/dataSources.xml +# .idea/sqlDataSources.xml +# .idea/dynamic.xml +# .idea/uiDesigner.xml + +# Gradle: +# .idea/gradle.xml +# .idea/libraries + +# Mongo Explorer plugin: +# .idea/mongoSettings.xml + +## File-based project format: +*.ipr +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties + +.gitignore diff --git a/mimic/__init__.py b/mimic/__init__.py index ad57571..c9e6a8e 100644 --- a/mimic/__init__.py 
+++ b/mimic/__init__.py @@ -1,7 +1,15 @@ # coding=utf-8 +from __future__ import print_function +from collections import namedtuple +from itertools import chain +from random import random, randrange from sys import version_info +from mimic.steganography import Steganography + +Hgs = namedtuple('Hgs', ('ascii', 'fwd', 'rev')) + if version_info >= (3,): unichr = chr unicode = lambda s, e: s @@ -11,6 +19,9 @@ # Surrounding field for printing clarity field = u'\u2591' +# source file +FILE = None + # List of all homoglyphs - named tuples with 'ascii' char, 'fwd' alternatives string for forward mimic mode, and 'rev' # string of potentially non-universally-printable chars that should still be able to check or reverse back to ASCII all_hgs = [] @@ -35,9 +46,6 @@ def fill_homoglyphs(): If a character is deemed unprintable on some systems, don't delete it - move it from the fwd string to rev. """ - from collections import namedtuple - Hgs = namedtuple('Hgs', ('ascii', 'fwd', 'rev')) - all_hgs.extend(Hgs(*t) for t in ( (' ', u'\u00A0\u2000\u2001\u2002\u2003\u2004\u2005\u2006\u2007\u2008\u2009\u200A\u202F\u205F', u'\u3000'), ('!', u'\uFF01\u01C3\u2D51\uFE15\uFE57', u'\u119D'), @@ -158,14 +166,6 @@ def get_writer(): return getwriter(stdout.encoding or 'utf-8')(stdout) -def read_line(): - from sys import stdin - - if version_info >= (3,): - return input() - return raw_input().decode(stdin.encoding or 'utf-8') - - def listing(): """ Show a list of all known homoglyphs @@ -265,11 +265,13 @@ def search(): break -def pipe(replace): +def pipe(read_line, replace, stego): """ Pipe from input to output End with ctrl+C or EOF + :param read_line: A function which returns the next line of input :param replace: A function to replace each char + :param stego: A Steganography instance to manage the steganography """ out = get_writer() @@ -281,24 +283,37 @@ def pipe(replace): line = read_line() except EOFError: return + if line == '': + return for c in line: - out.write(replace(c)) - 
out.write('\n') + if isinstance(c, int): + c = chr(c) + replacement = replace(c, stego) + out.write(replacement) + if not line.endswith("\n"): + out.write('\n') -def pipe_mimic(hardness): + +def pipe_mimic(read_line, hardness, stego): """ Pipe from input to output, replacing chars with homoglyphs + :param read_line: function to provide the next line of text to mimic :param hardness: Percent probability to replace a char + :param stego: Steganography instance to encode data in the mimicking """ - from itertools import chain - from random import random, randrange - def replace(c): + def replace(c, s): + if isinstance(c, int): + c = chr(c) if random() > hardness / 100. or c not in hg_index: return c hms = hg_index[c] + # If there is a stego object, use that to choose the next character + if s: + return s.stego_encode(hms) + # hms contains the current character. We've already decided, above, that this character should be replaced, so # we need to try and avoid that. Loop through starting at a random index. 
fwd = hms.ascii + hms.fwd @@ -308,20 +323,22 @@ def replace(c): return fwd[index] return c - pipe(replace) + pipe(read_line, replace, stego) -def replace_reverse(c): +def replace_reverse(c, stego): """ Undo the damage to c """ hgs = hg_index.get(c) if hgs: + if stego: + stego.stego_decode(c, hgs) return hgs.ascii return c -def replace_check(c): +def replace_check(c, stego): """ Replace non-ASCII chars with their code point """ @@ -353,6 +370,8 @@ def parse(): help="show a char's homoglyphs") parser.add_option('-l', '--list', action='store_true', help='show all homoglyphs') + parser.add_option('-s', '--source', dest='source_file', + help='mimic or demimic a source file instead of stdin') (options, args) = parser.parse_args() @@ -377,12 +396,12 @@ def check_opts(opt, compat=None, req=None): 'req': tuple(req) }) - check_opts('forward', {'chance', 'source_steg_file'}) - check_opts('reverse', {'dest_steg_file'}) - check_opts('check', {'dest_steg_file'}) - check_opts('source_steg_file', {'forward', 'chance'}, {'forward'}) - check_opts('dest_steg_file', req={'reverse', 'check'}) - check_opts('chance', {'forward', 'source_steg_file'}, {'forward'}) + check_opts('forward', {'chance', 'source_steg_file', 'source_file'}) + check_opts('reverse', {'dest_steg_file', 'source_file'}) + check_opts('check', {'dest_steg_file', 'source_file'}) + check_opts('source_steg_file', {'forward', 'chance', 'source_file'}, {'forward'}) + check_opts('dest_steg_file', {'reverse', 'check', 'source_file'}, req={'reverse', 'check'}) + check_opts('chance', {'forward', 'source_steg_file', 'source_file'}, {'forward'}) check_opts('explain_char') check_opts('list') @@ -394,15 +413,51 @@ def check_opts(opt, compat=None, req=None): return options, args +def read_line_stdin(): + """ + read_line implementation drawing from stdin (default usage) + :return: Next line of input as a string + """ + from sys import stdin + + if version_info >= (3,): + return input() + "\n" + return raw_input().decode(stdin.encoding 
or 'utf-8') + "\n" + + +def create_read_line_file(file_name): + """ + read_line implementation drawing from a file + + :param file_name: The name of the file to read + :return: A function which returns the next line of the file + """ + global FILE + FILE = open(file_name, "rb") + def read_line_file(): + return FILE.readline().decode("utf-8") + return read_line_file + + def main(): try: (options, args) = parse() + + reader = read_line_stdin + if options.source_file: + reader = create_read_line_file(options.source_file) + if options.forward: - pipe_mimic(options.chance) + stego = Steganography(source_file=options.source_steg_file) + pipe_mimic(reader, options.chance, stego) + stego.close() elif options.reverse: - pipe(replace_reverse) + stego = Steganography(dest_file=options.dest_steg_file) + pipe(reader, replace_reverse, stego) + stego.close() elif options.check: - pipe(replace_check) + stego = Steganography() + pipe(reader, replace_check, stego) elif options.explain_char: explain(unicode(options.explain_char, 'utf-8')) elif options.list: diff --git a/mimic/steganography.py b/mimic/steganography.py new file mode 100644 index 0000000..0cb6263 --- /dev/null +++ b/mimic/steganography.py @@ -0,0 +1,191 @@ +from math import log +from random import choice +from sys import stderr + + +class Steganography: + # Size of buffers + BUFFER_SIZE = 512 + + def __init__(self, source_file=None, dest_file=None): + """ + Class to hold the state of the steganography processing + + :param source_file: Filename of the source file to encode in the mimicking + :param dest_file: Filename of the file to write the unstego'd data + """ + self.data = [] + self.source_file = source_file + self.dest_file = dest_file + self.data = [] + self.done_encoding = False + + self.source = None + self.dest = None + if source_file: + self.source = open(source_file, 'rb') + if dest_file: + self.dest = open(dest_file, 'wb') + + self.enabled = (source_file or dest_file) + + def get_bits(self, n): + """ + Gets a specified number of bits 
from the source file stream. + + :param n: The number of bits to retrieve + :return: An array of those bits. [] once no bits remain + """ + if len(self.data) < n: + self.data.extend(to_bits(self.source.read(512))) + + bits = self.data[0:min(n, len(self.data))] + self.data = self.data[min(n, len(self.data)):] + return bits + + def add_bits(self, bits): + """ + Add bits recovered from the reverse steganography process to the data buffer + :param bits: The list of recovered bits (as integers) to append + :return: None + """ + self.data.extend(bits) + + # Periodically write out the data buffer to the file. + # Only do this when the data lines up with byte boundaries + if len(self.data) > Steganography.BUFFER_SIZE and len(self.data) % 8 == 0: + self.dest.write(from_bits(self.data)) + self.data = [] + + def stego_encode(self, homoglyph): + """ + Choose the next mimic character based on the data to encode + + :param homoglyph: The current homoglyph that can be replaced + :return: The mimicked character + """ + + # Short circuit when there is no more data to encode + if not self.enabled or self.done_encoding: + if homoglyph.fwd: + return choice(homoglyph.fwd) + return homoglyph.ascii + + # Get the number of options for this character. + # + # If there are fewer than 2, no data can be encoded so skip mimicking + # this character and return the ascii + options = len(homoglyph.fwd) + if options < 2: + return homoglyph.ascii + + # Determine how many bits of data this mimicking can encode + bits_available = int(log(options, 2)) + to_encode = self.get_bits(bits_available) + + if to_encode: + # There is data to encode, convert it to a character index + bin_choice = "".join([str(x) for x in to_encode]) + + # When the end of encoding data is reached and there are fewer + # bits than can be encoded by this character, the end of the string + # needs to be padded with 0s. This prevents those 0s from being + # inserted at the beginning of this character's decoding. 
+ # + # The extra 0s are truncated because they will not line up with the + # 8-bit byte boundary. + if bits_available != len(to_encode): + bin_choice += "0" * (bits_available - len(to_encode)) + + int_choice = int(bin_choice, 2) + return homoglyph.fwd[int_choice] + else: + if 2 ** bits_available < options: + self.done_encoding = True + return choice(homoglyph.fwd[2 ** bits_available:]) + else: + return homoglyph.ascii + + def stego_decode(self, char, homoglyph): + """ + Reverses the steganography encoding of stego_encode + + :param char: the char to recover + :param homoglyph: the matching homoglyph + :return: None + """ + + if self.enabled and not self.done_encoding: + # Not all characters were replaced with homoglyphs + if char in homoglyph.fwd: + + # Determine if there was a character choice outside the allowed range. + # This indicates that there is no more data to recover and the rest of + # the substitutions are cosmetic only. + bits_available = self.encodable_bits(homoglyph.fwd) + index = homoglyph.fwd.index(char) + if index >= 2 ** bits_available: + self.done_encoding = True + else: + bits_raw = bin(index)[2:] + # Pad the leading zeros to ensure the right bits are recorded. + bits = [0] * (bits_available - len(bits_raw)) + [int(d) for d in bits_raw] + self.add_bits(bits) + + @staticmethod + def encodable_bits(choices): + """ + Calculates the number of bits of data that can be encoded using a given set of choices. + + The number of choices to represent n bits is 2^n. + :param choices: The choices for encoding a bit sequence + :return: The number of bits that can be encoded using these choices + """ + if len(choices) < 2: + return 0 + return int(log(len(choices), 2)) + + def close(self): + """ + Finish up processing the stego files + """ + if self.source: + if len(self.get_bits(1)) == 1: + stderr.write("\nWARNING: Not all data encoded. 
Try a larger file or higher -m value\n") + self.source.close() + if self.dest: + self.dest.write(from_bits(self.data)) + self.dest.flush() + self.dest.close() + + +def to_bits(s): + """ + Convert a string to a list of bits + + :param s: The string to convert + :return: A list of bits (as integers) representing the string + """ + result = [] + for c in s: + bits = bin(ord(c))[2:] if isinstance(c, str) else bin(c)[2:] + bits = '00000000'[len(bits):] + bits + result.extend([int(b) for b in bits]) + return result + + +def from_bits(bits): + """ + Converts a series of bits back into a string + + :param bits: The bits to convert + :return: The string represented by the bits + """ + chars = [] + # extra bits are ignored. + for b in range(int(len(bits) / 8)): + byte = bits[b*8:(b+1)*8] + chars.append(chr(int(''.join([str(bit) for bit in byte]), 2))) + + assembled = ''.join(chars) + return assembled.encode('latin-1') diff --git a/mimic/tests/__init__.py b/mimic/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mimic/tests/test_steganography.py b/mimic/tests/test_steganography.py new file mode 100644 index 0000000..ad95f8e --- /dev/null +++ b/mimic/tests/test_steganography.py @@ -0,0 +1,139 @@ +import os +import tempfile +import unittest + +from mimic import Steganography, Hgs + + +class TestStego(unittest.TestCase): + def binstr_to_list(self, string): + return [int(d) for d in string.replace(" ", "")] + + def test_get_bits(self): + date_file = tempfile.mkstemp() + with open(date_file[1], 'w') as temp: + temp.write("hello") + stego = Steganography(source_file=date_file[1]) + + try: + self.assertEquals(self.binstr_to_list("01101000"), stego.get_bits(8)) + self.assertEquals(self.binstr_to_list("01100101 01101100 01101100 01101111"), stego.data) + self.assertEquals(self.binstr_to_list("01100101 01101100 01101100 0110"), stego.get_bits(28)) + self.assertEquals(self.binstr_to_list("1111"), stego.data) + self.assertEquals(self.binstr_to_list("1111"), 
stego.get_bits(4)) + self.assertEquals([], stego.data) + self.assertEquals([], stego.get_bits(4)) + stego.data = self.binstr_to_list("1111") + self.assertEquals(self.binstr_to_list("1111"), stego.get_bits(8)) + self.assertEquals([], stego.data) + stego.close() + finally: + os.close(date_file[0]) + os.remove(date_file[1]) + + def test_encodable_bits(self): + self.assertEquals(0, Steganography.encodable_bits([])) + self.assertEquals(0, Steganography.encodable_bits([0])) + self.assertEquals(1, Steganography.encodable_bits([0, 1])) + self.assertEquals(1, Steganography.encodable_bits([0, 1, 2])) + self.assertEquals(2, Steganography.encodable_bits([0, 1, 2, 3])) + self.assertEquals(2, Steganography.encodable_bits([0, 1, 2, 3, 4])) + + def test_encode(self): + date_file = tempfile.mkstemp() + with open(date_file[1], 'w') as temp: + temp.write("j") # 01101010 + + stego = Steganography(source_file=date_file[1]) + self.assertTrue(stego.enabled) + try: + + # Not enough choices, should pass through + self.assertEquals('a', stego.stego_encode(Hgs('a', '1', ''))) + + # Can encode 1 bit, should choose 1st (0) + self.assertEquals('1', stego.stego_encode(Hgs('a', '12', ''))) + self.assertEquals(self.binstr_to_list("1101010"), stego.data) + + # Can encode 1 bit, should choose 2nd (1) + self.assertEquals('2', stego.stego_encode(Hgs('a', '12', ''))) + self.assertEquals(self.binstr_to_list("101010"), stego.data) + + # Can encode 2 bits, should choose 3rd (10) + self.assertEquals('3', stego.stego_encode(Hgs('a', '1234', ''))) + self.assertEquals(self.binstr_to_list("1010"), stego.data) + + # Can encode 2 bits, should choose 3rd (10) + self.assertEquals('3', stego.stego_encode(Hgs('a', '1234', ''))) + self.assertEquals(self.binstr_to_list("10"), stego.data) + + # Can encode 3 bits, should choose 5th (100) + self.assertEquals('5', stego.stego_encode(Hgs('a', '12345678', ''))) + self.assertEquals(self.binstr_to_list(""), stego.data) + + # Trying to encode end marker but not enough choices 
+ self.assertEquals('a', stego.stego_encode(Hgs('a', '1234', ''))) + self.assertFalse(stego.done_encoding) + + # Enough choices now + self.assertEquals('5', stego.stego_encode(Hgs('a', '12345', ''))) + self.assertTrue(stego.done_encoding) + + # Choosing randomly for all future replacements + self.assertEquals('1', stego.stego_encode(Hgs('a', '1', ''))) + + # Should still work if there are no fwd choices + self.assertEquals('a', stego.stego_encode(Hgs('a', '', ''))) + + finally: + stego.close() + os.close(date_file[0]) + os.remove(date_file[1]) + + def test_decode(self): + data_file = tempfile.mkstemp() + + old_size = Steganography.BUFFER_SIZE + stego = Steganography(dest_file=data_file[1]) + try: + self.assertTrue(stego.enabled) + stego.stego_decode('a', Hgs('a', '1234', '')) + self.assertEquals([], stego.data) + self.assertFalse(stego.done_encoding) + + stego.stego_decode('1', Hgs('a', '12', '')) + self.assertEquals(self.binstr_to_list("0"), stego.data) + stego.stego_decode('2', Hgs('a', '12', '')) + self.assertEquals(self.binstr_to_list("01"), stego.data) + stego.stego_decode('1', Hgs('a', '1234', '')) + self.assertEquals(self.binstr_to_list("0100"), stego.data) + stego.stego_decode('3', Hgs('a', '1234', '')) + self.assertEquals(self.binstr_to_list("010010"), stego.data) + stego.stego_decode('4', Hgs('a', '1234', '')) + self.assertEquals(self.binstr_to_list("01001011"), stego.data) + + # Outside range of encodable bits so must be end + stego.stego_decode('5', Hgs('a', '12345', '')) + self.assertEquals(self.binstr_to_list("01001011"), stego.data) + self.assertTrue(stego.done_encoding) + + Steganography.BUFFER_SIZE = 7 + # trigger a write + stego.add_bits([]) + stego.dest.flush() + with open(data_file[1], 'r') as written: + self.assertEquals('K', written.read()) + + finally: + Steganography.BUFFER_SIZE = old_size + stego.close() + os.close(data_file[0]) + os.remove(data_file[1]) + + def test_disabled(self): + stego = Steganography() + 
self.assertFalse(stego.enabled) + + +if __name__ == '__main__': + unittest.main() diff --git a/mimic/test.py b/mimic/tests/test_unicode.py similarity index 76% rename from mimic/test.py rename to mimic/tests/test_unicode.py index 2153f99..92701b4 100644 --- a/mimic/test.py +++ b/mimic/tests/test_unicode.py @@ -1,8 +1,15 @@ -from unittest import TestCase -from . import all_hgs, hg_index +import unittest +from sys import version_info +from mimic import all_hgs, hg_index -class TestDataset(TestCase): +if version_info >= (3,): + unichr = chr + unicode = lambda s, e: s + xrange = range + + +class TestDataset(unittest.TestCase): def test_ascii_range(self): self.assertEqual(len(all_hgs), ord('~') - ord(' ') + 1) @@ -26,4 +33,4 @@ def test_unique(self): for c in all_chars: self.assertFalse(c in charset) charset.add(c) - self.assertEqual(charset, set(hg_index.iterkeys())) \ No newline at end of file + self.assertEqual(charset, set(hg_index.iterkeys())) diff --git a/setup.py b/setup.py index d4f8457..eafd965 100644 --- a/setup.py +++ b/setup.py @@ -9,4 +9,6 @@ 'mimic=mimic:main', ], }, + test_suite='nose.collector', + tests_require=['nose'] )