Skip to content

Commit

Permalink
Adding Celery task to monitor stopped containers and delete the erroneous ones.
Browse files Browse the repository at this point in the history
  • Loading branch information
Aitor Gómez Goiri committed Nov 23, 2015
1 parent 722cb9e commit 34d431e
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 32 deletions.
10 changes: 9 additions & 1 deletion src/ptinstancemanager/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,15 @@ def unassign(self):
self.status = Instance.UNASSIGNED
db.session.commit()

def mark_starting(self):
    """Set this instance's status to ``Instance.STARTING`` and commit."""
    session = db.session
    self.status = Instance.STARTING
    session.commit()

def mark_error(self):
    """Set this instance's status to ``Instance.ERROR`` and commit."""
    session = db.session
    self.status = Instance.ERROR
    session.commit()

def stop(self):
def delete(self):
self.deleted_at = datetime.now() # set deletion time
self.status = Instance.DELETED
db.session.commit()
Expand Down Expand Up @@ -99,6 +103,10 @@ def create(docker_id=None, pt_port=None, vnc_port=None):
def get(instance_id):
return db.session.query(Instance).filter_by(id = instance_id).first()

@staticmethod
def get_by_docker_id(docker_id):
    """Return the first ``Instance`` whose ``docker_id`` equals *docker_id*.

    The result is whatever ``Query.first()`` yields for the filtered query.
    """
    matching = db.session.query(Instance).filter_by(docker_id=docker_id)
    return matching.first()

@staticmethod
def get_all():
    """Return the result of querying every ``Instance`` in the session."""
    every_instance = db.session.query(Instance)
    return every_instance.all()
Expand Down
101 changes: 70 additions & 31 deletions src/ptinstancemanager/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ def create_containers(num_containers):
if available_port is None:
raise Exception('The server cannot create new instances. Please, wait and retry it.')

res = create_container.apply_async((available_port.number,), link=wait_for_ready_container.s(10))
res = create_container.apply_async((available_port.number,), link=wait_for_ready_container.s())


@celery.task()
def create_container(pt_port):
"""Runs a new packettracer container in the specified port and
create associated instance."""
logging.info('Creating new container.')

# Create container with Docker
Expand All @@ -40,7 +42,7 @@ def create_container(pt_port):

#@celery.task()
def start_container(pt_port, vnc_port):
"""Create container with Docker."""
"""Creates and starts new packettracer container with Docker."""
docker = Client(app.config['DOCKER_URL'], version='auto')
port_bindings = { app.config['DOCKER_PT_PORT']: pt_port,
app.config['DOCKER_VNC_PORT']: vnc_port }
Expand All @@ -54,7 +56,7 @@ def start_container(pt_port, vnc_port):
ports=list(port_bindings.keys()),
volumes=[vol_bindings[k]['bind'] for k in vol_bindings],
host_config=host_config)

if container.get('Warnings'):
raise Exception('Error during container creation: %s' % container.get('Warnings'))

Expand All @@ -64,56 +66,93 @@ def start_container(pt_port, vnc_port):
return container.get('Id')


@celery.task()
def stop_container(instance_id):
instance = Instance.get(instance_id)
docker = Client(app.config['DOCKER_URL'], version='auto')
try:
docker.stop(instance.docker_id)
instance.stop()
Port.get(instance.pt_port).release() # The port can be now reused by a new PT instance
except APIError as ae:
# if it was already stopped or removed
if cli.containers(filters={'status': 'exited'}): # Not empty array
instance.stop()
Port.get(instance.pt_port).release() # The port can be now reused by a new PT instance


@celery.task()
def assign_container():
"""Unpauses available container and marks associated instance as assigned."""
logging.info('Assigning container.')
docker = Client(app.config['DOCKER_URL'], version='auto')
for instance in Instance.get_unassigned():
try:
docker.unpause(instance.docker_id)
instance.assign()
try:
docker.unpause(instance.docker_id)
instance.assign()
return instance.id
except APIError as ae:
# e.g., if it was already unpaused or it has been stopped
instance.mark_error()
logging.error('Docker API exception. %s.' % ae)
logging.error('Error assigning instance %s.' % instance.id)
logging.error('Docker API exception. %s.' % ae)
# e.g., if it was already unpaused or it has been stopped
instance.mark_error()
logging.error('Docker API exception. %s.' % ae)


@celery.task()
def unassign_container(instance_id):
"""Marks instance as unassigned and pauses the associated container."""
instance = Instance.get(instance_id)
docker = Client(app.config['DOCKER_URL'], version='auto')
try:
docker.pause(instance.docker_id)
except APIError as ae:
# e.g., if it was already paused
instance.mark_error()
logging.error('Error unassigning instance %s.' % instance_id)
logging.error('Docker API exception. %s.' % ae)
# e.g., if it was already paused
instance.mark_error()
monitor_containers.delay()
finally:
instance.unassign()


@celery.task(max_retries=5)
def wait_for_ready_container(instance_id, timeout):
def wait_for_ready_container(instance_id, timeout=30):
"""Waits for an instance to be ready (e.g., answer).
Otherwise, marks it as erroneous ."""
logging.info('Waiting for container to be ready.')
instance = Instance.get(instance_id)
is_running = ptchecker.is_running(app.config['PT_CHECKER'], 'localhost', instance.pt_port, float(timeout))
if not is_running:
# TODO mark as an error after all the retries
raise wait_for_ready_container.retry(exc=Exception('The container has not answered yet.'))
unassign_container.delay(instance_id) # else
if is_running:
unassign_container.delay(instance_id) # else
else:
instance.mark_error()
monitor_containers.delay()
# raise wait_for_ready_container.retry(exc=Exception('The container has not answered yet.'))


@celery.task()
def monitor_containers():
    """Restart cleanly-exited containers and purge erroneous instances.

    The task works in two passes:

    1. Every container Docker lists with exit code 0 (and matching the
       packettracer filter) that has an associated ``Instance`` is marked as
       starting, started again and handed to ``wait_for_ready_container``.
    2. Every instance returned by ``Instance.get_errors()`` that was not
       restarted in pass 1 is deleted, its port is released and the removal
       of its container is scheduled through ``remove_container``.

    Docker ``APIError`` exceptions are logged and end the run early; any
    other exception propagates so that the task failure is visible.

    Returns:
        list: ids of the instances whose containers were restarted (may be
        partial if the Docker API failed midway).
    """
    restarted_instances = []
    docker = Client(app.config['DOCKER_URL'], version='auto')
    try:
        # Restart stopped containers (which exited successfully).
        # NOTE(review): this filters on a *label* named 'ancestor'; Docker
        # also has a dedicated 'ancestor' (image) filter, and listing
        # non-running containers may require all=True -- confirm which one
        # is intended before changing the query.
        exited_containers = docker.containers(
            filters={'exited': 0, 'label': 'ancestor=packettracer'})
        for container in exited_containers:
            container_id = container.get('Id')
            instance = Instance.get_by_docker_id(container_id)
            if instance:
                restarted_instances.append(instance.id)
                instance.mark_starting()
                docker.start(container=container_id)
                wait_for_ready_container.delay(instance.id)

        for erroneous_instance in Instance.get_errors():
            # restarted_instances holds instance ids, so compare by id.
            if erroneous_instance.id not in restarted_instances:
                erroneous_instance.delete()
                # The port can now be reused by a new instance.
                Port.get(erroneous_instance.pt_port).release()
                # Very conservative approach:
                # we remove it even if it might still be usable.
                remove_container.delay(erroneous_instance.docker_id)
                # TODO replace erroneous instance by a new one
    except APIError as ae:
        logging.error('Error on container monitoring.')
        logging.error('Docker API exception. %s.' % ae)
    # Returning here (instead of inside a ``finally``) keeps the result for
    # the handled APIError case without swallowing unexpected exceptions.
    return restarted_instances



@celery.task()
def remove_container(docker_id):
    """Force-remove the Docker container identified by *docker_id*.

    A Docker ``APIError`` raised by the removal is logged and not re-raised.
    """
    client = Client(app.config['DOCKER_URL'], version='auto')
    try:
        # TODO first check its status and then act? (e.g., to unpause it before)
        client.remove_container(docker_id, force=True)
    except APIError as api_error:
        logging.error('Error on container removal: %s.' % docker_id)
        logging.error('Docker API exception. %s.' % api_error)

1 comment on commit 34d431e

@gomezgoiri
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Issue #7.

Please sign in to comment.