Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Solution to assignment-4 (assignment-2.1 on hold) #58

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 191 additions & 0 deletions solutions/assignment-2/build.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import os
import json
import logging
import argparse
import threading
import subprocess
from time import sleep
from graph import Graph, GraphError
from typing import Any, List, Dict, Optional


class BuildAutomationError(Exception):

def __init__(self, message: str):
self.message: str = 'Error: ' + message
super(BuildAutomationError, self).__init__(self.message)


class File(object):
"""docstring for File"""

def __init__(self, filename: str, is_BUILD_FILE: bool=False):
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Confusing use of capital letters here ... why not just use "is_build_file"?

self.filename: str = filename
self.last_modified: float = os.path.getmtime(filename)
self.is_BUILD_FILE: bool = is_BUILD_FILE

def is_modified(self) -> bool:
if os.path.getmtime(self.filename) != self.last_modified:
self.last_modified = os.path.getmtime(self.filename)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call os.path.getmtime only once, since it is unnecessary duplication computation.

return True
return False


class Rule(object):
"""rule stores all the possible build rules provided in build configuration files"""

def __init__(self, properties: Dict[str, Any], command_path: str):
self.name: str = properties['name']
self.command: str = 'cd {} && {}'.format(command_path, properties['command'])
kaustubh-karkare marked this conversation as resolved.
Show resolved Hide resolved
self.filenames: List[str] = list()
self.dependent_rulenames: List[str] = list()
if 'deps' in properties:
self.dependent_rulenames = properties['deps']
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.dependent_rulenames = properties.get('deps', [])

if 'files' in properties:
for filename in properties['files']:
self.filenames.append(os.path.join(command_path, filename))

def execute(self) -> str:
process = subprocess.Popen(self.command.split(), stdout=subprocess.PIPE, shell=True)
(out, err) = process.communicate()
return out.decode()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I don't think that the stdout of the process is a reliable indicator of success. I'd recommend using return value.



class Build(object):
"""Build scans a given directory and stores all the possible build rules """

BUILD_FILENAME = "build.json"

def __init__(self, arguments: List[str]):
self.logger = logging.getLogger()
self.build_rules: Dict[str, Rule] = dict()
self.build_files: Dict[str, File] = dict()
self.build_arguments: Dict[str, Any] = self._parse(arguments)
self.build_arguments['root'] = os.path.realpath(self.build_arguments['root'])
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of doing this in a separate phase, you can try:

parser.add_argument('--root', type=lambda path: os.path.realpath(path), ...

self._scan(path=self.build_arguments['root'], relative_path='')
self.set_ignored_files()
self.watch_thread: Optional[Any] = None
self.stop_watch = True

def _parse(self, arguments: List[str]) -> Dict[str, Any]:
parser = argparse.ArgumentParser(description='Process build rules')
parser.add_argument('rules', nargs='*', type=str, help='A list of build rules')
parser.add_argument('--root', type=str, default=r'.\\', help='Root directory where build is to be triggered')
parser.add_argument('--watch', action='store_true', help='Status to build again on file modification')
parser.add_argument('--poll', type=float, default=1, help='Time interval for checking the file modification status')
parser.add_argument('--ignore', nargs='*', type=str, help='List of files not to be watched')
return vars(parser.parse_args(arguments))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the benefit of using vars here?


def _scan(self, path: str, relative_path: str) -> None:
for filename in os.listdir(path):
file_path: str = os.path.join(path, filename)
if os.path.isdir(file_path):
self._scan(path=file_path, relative_path=os.path.join(relative_path, filename))
continue
if filename == self.BUILD_FILENAME:
with open(file_path, 'r') as json_file:
try:
json_dict: Any = json.loads(json_file.read())
kaustubh-karkare marked this conversation as resolved.
Show resolved Hide resolved
except ValueError:
raise BuildAutomationError(f'Decoding json build configuration file "{file_path}" failed')
for properties in json_dict:
properties['name'] = os.path.join(relative_path, properties['name'])
name = properties['name']
if name in self.build_rules:
raise BuildAutomationError(f'{name} has multifile entities in different build configuration files which conflicts')
kaustubh-karkare marked this conversation as resolved.
Show resolved Hide resolved
self.build_rules[name] = Rule(properties=properties, command_path=path)
if self.build_rules[name].filenames:
for filename in self.build_rules[name].filenames:
if filename not in self.build_files:
self.build_files[filename] = File(filename=filename)
self.build_files[file_path] = File(filename=file_path, is_BUILD_FILE=True)

def get_dependent_rulenames_for_executing_rule(self, rulename: str) -> List[str]:
"""Generating one dependency list for all the rules"""
dependency_list: Dict[str, List[str]] = dict()
for rule in self.build_rules:
dependency_list[rule] = self.build_rules[rule].dependent_rulenames
try:
graph = Graph(dependency_list)
all_dependencies: List[str] = graph.draft_first_search(rulename)
all_dependencies.reverse()
except GraphError as e:
raise BuildAutomationError(e.message)
return all_dependencies

def get_dependent_rulenames_for_executing_changes_in_file(self, filename: str) -> List[str]:
"""Generating one supporter list for all the rules and files"""
supporter_list: Dict[str, List[str]] = {key: list() for key in self.build_rules}
for file_name in self.build_files:
if not self.build_files[file_name].is_BUILD_FILE:
supporter_list[file_name] = list()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, so the keys of supporter_list include both rule names and file names? Mixing things like that is probably not a good idea, and it looks like this only matters in the first step of your traversal (filename -> rule name), so you can do that manually.


for rulename in self.build_rules:
for dependent_rulename in self.build_rules[rulename].dependent_rulenames:
supporter_list[dependent_rulename].append(rulename)
for file_name in self.build_rules[rulename].filenames:
supporter_list[file_name].append(rulename)
try:
graph = Graph(supporter_list)
all_supporters: List[str] = graph.draft_first_search(filename)
except GraphError as e:
raise BuildAutomationError(e.message)
all_supporters.pop(0) # The first element being the filename is not executable and needs to be removed
return all_supporters

def execute(self) -> str:
output = ''
for rulename in self.build_arguments['rules']:
if rulename not in self.build_rules:
raise BuildAutomationError(f'Invalid rule "{rulename}" provided')
dependent_rulenames = self.get_dependent_rulenames_for_executing_rule(rulename)
for dependent_rulename in dependent_rulenames:
output += self.build_rules[dependent_rulename].execute()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See Part2 of updated problem statement.

return output

def set_ignored_files(self) -> None:
self.ignored_files: List[str] = list()
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, use sets when ordering does not matter.

if self.build_arguments['ignore']:
for file in self.build_arguments['ignore']:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/file/filepath/ is a better variable name

self.ignored_files.append(os.path.join(self.build_arguments['root'], os.path.normpath(file)))

def check_and_maybe_execute(self) -> str:
output = ''
for filename in self.build_files:
if filename not in self.ignored_files:
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return/continue/break early to reduce indenting.

if filename in self.ignored_files:
  continue

if self.build_files[filename].is_modified():
self.logger.info(f"Modification in file {filename} detected")
if self.build_files[filename].is_BUILD_FILE:
self.build_rules.clear()
self.build_files.clear()
self._scan(path=self.build_arguments['root'], relative_path='')
output += self.execute()
else:
dependent_rulenames = self.get_dependent_rulenames_for_executing_changes_in_file(filename)
for rulename in dependent_rulenames:
output += self.build_rules[rulename].execute()
return output

def start_watching(self) -> None:
if not self.build_arguments['watch']:
raise BuildAutomationError('Failed to start watch. Watch is not set to True for this build process.')
if self.stop_watch:
self.watch_thread = threading.Thread(target=self._thread_for_watching)
self.stop_watch = False
self.watch_thread.start()
else:
raise BuildAutomationError('A watch function is already running for this build process')

def _thread_for_watching(self) -> None:
while not self.stop_watch:
sleep(self.build_arguments['poll'])
output = self.check_and_maybe_execute()
if output != '':
self.logger.info(f"Automatic build result: {output}")

def stop_watching(self) -> None:
if not self.stop_watch:
self.stop_watch = True
self.watch_thread.join()
else:
raise BuildAutomationError('No Watch funtion running to stop')
Loading