Skip to content

Commit

Permalink
Merge pull request #1 from duedil-ltd/feature/tests
Browse files Browse the repository at this point in the history
Added initial passing unit tests for lzo indexer methods
  • Loading branch information
tarnfeld committed Dec 28, 2013
2 parents 4c7428f + c286cac commit c577614
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 14 deletions.
9 changes: 9 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
notifications:
email: false
language: python
python:
- "2.7"
install:
- "sudo apt-get install lzop"
cache: apt
script: python setup.py test
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
python-lzo-indexer
==================

Python library for indexing block offsets within LZO compressed files
![](https://travis-ci.org/duedil-ltd/python-lzo-indexer.png)

Python library for indexing block offsets within LZO compressed files.
27 changes: 14 additions & 13 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
#!/usr/bin/env python

from distutils.core import setup
from setuptools import setup


def read(filename):
return open(filename).read()

setup(name="lzo-indexer",
version="0.0.1",
description="Library for indexing LZO compressed files",
long_description=read("README.md"),
author="Tom Arnfeld",
author_email="[email protected]",
maintainer="Tom Arnfeld",
maintainer_email="[email protected]",
url="https://github.com/duedil-ltd/python-lzo-indexer",
download_url="https://github.com/duedil-ltd/python-lzo-indexer/archive/release-0.0.1.zip",
license=read("LICENSE"),
packages=['lzo_indexer'],
)
version="0.0.1",
description="Library for indexing LZO compressed files",
long_description=read("README.md"),
author="Tom Arnfeld",
author_email="[email protected]",
maintainer="Tom Arnfeld",
maintainer_email="[email protected]",
url="https://github.com/duedil-ltd/python-lzo-indexer",
download_url="https://github.com/duedil-ltd/python-lzo-indexer/archive/release-0.0.1.zip",
license=read("LICENSE"),
packages=["lzo_indexer"],
test_suite="tests.test_indexer",
)
Empty file added tests/__init__.py
Empty file.
72 changes: 72 additions & 0 deletions tests/test_indexer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unittest
import lzo_indexer
from StringIO import StringIO
from subprocess import Popen, PIPE, STDOUT


class TestLZOIndexer(unittest.TestCase):

def test_get_blocks_single_block(self):
stream = self._lzo_stream(1)
index = lzo_indexer.get_lzo_blocks(stream)

block = index.next()
self.assertEqual(block, 38)
with self.assertRaises(StopIteration):
index.next()

def test_get_blocks_multiple_blocks(self):
stream = self._lzo_stream(10**6)
index = lzo_indexer.get_lzo_blocks(stream)

expected_offsets = [38, 1233, 2428, 3623]
self.assertEqual(list(index), expected_offsets)

def test_index_lzo_string_single_block(self):
string = self._lzo_stream(1).getvalue()
index = lzo_indexer.index_lzo_string(string)

self.assertEqual(index, "\x00\x00\x00\x00\x00\x00\x00\x26")

def test_index_lzo_string_multiple_blocks(self):
string = self._lzo_stream(10**6).getvalue()
index = lzo_indexer.index_lzo_string(string)

self.assertEqual(index, "\x00\x00\x00\x00\x00\x00\x00&\x00\x00\x00" \
"\x00\x00\x00\x04\xd1\x00\x00\x00\x00\x00" \
"\x00\t|\x00\x00\x00\x00\x00\x00\x0e'")

def test_index_lzo_file_single_block(self):
stream = self._lzo_stream(1)
index = StringIO()

lzo_indexer.index_lzo_file(stream, index)

self.assertEqual(index.getvalue(), "\x00\x00\x00\x00\x00\x00\x00\x26")

def test_index_lzo_file_multiple_blocks(self):
stream = self._lzo_stream(10**6)
index = StringIO()

lzo_indexer.index_lzo_file(stream, index)

self.assertEqual(index.getvalue(), "\x00\x00\x00\x00\x00\x00\x00&\x00" \
"\x00\x00\x00\x00\x00\x04\xd1\x00" \
"\x00\x00\x00\x00\x00\t|\x00\x00" \
"\x00\x00\x00\x00\x0e'")

def _lzo_stream(self, length=4096):
"""Compress a string of null bytes, the length being defined by the
argument to this function.
"""

compressor = Popen(["lzop", "-c"], stdin=PIPE, stdout=PIPE, stderr=PIPE)
stdout, stderr = compressor.communicate(input="\x00" * length)

if stderr:
raise Exception("Failed to compress with error %r" % (stderr))

stream = StringIO(stdout)
stream.seek(0)

return stream

0 comments on commit c577614

Please sign in to comment.