Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2.4 #70

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open

2.4 #70

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 33 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,34 @@ matrix:
- python3.7-venv
- python3-setuptools

- python: "3.8"
env: PYTHON_BINARY="python3.8"
addons:
hosts:
# hostnames will be enlisted in /etc/hosts and resolved to 127.0.0.1
- syn-rabbitmq
- syn-mongodb
apt:
packages:
- python3.8
- python3.8-dev
- python3.8-venv
- python3-setuptools

- python: "3.9"
env: PYTHON_BINARY="python3.9"
addons:
hosts:
# hostnames will be enlisted in /etc/hosts and resolved to 127.0.0.1
- syn-rabbitmq
- syn-mongodb
apt:
packages:
- python3.9
- python3.9-dev
- python3.9-venv
- python3-setuptools

services:
- docker

Expand All @@ -28,8 +56,8 @@ before_install:

install:
# mongodb section
- docker pull mongo:3
- docker run -d --name syn-mongodb -p 27017:27017 mongo:3
- docker pull mongo:4
- docker run -d --name syn-mongodb -p 27017:27017 mongo:4

# rabbit mq section
- docker pull rabbitmq:3
Expand All @@ -46,6 +74,6 @@ script:

branches:
only:
- master
- 2.3
- 2.4
- "master"
- "2.4"
- "2.5"
1 change: 1 addition & 0 deletions context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from synergy.db.model.queue_context_entry import queue_context_entry
from synergy.db.model.daemon_process_entry import daemon_context_entry
from synergy.db.model.managed_process_entry import managed_context_entry
from synergy.db.model.freerun_process_entry import freerun_context_entry
from synergy.db.model.timetable_tree_entry import timetable_tree_entry
from flow.flow_constants import *

Expand Down
2 changes: 1 addition & 1 deletion db/dao/single_session_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def __init__(self, logger):
collection_name=COLLECTION_SINGLE_SESSION,
model_class=SingleSession)

def find_by_session_id(self, domain_name, session_id):
def find_by_session_id(self, domain_name:str, session_id:str) -> SingleSession:
    """ Return the first SingleSession matching the given domain_name and session_id.

        :param domain_name: domain to search within
        :param session_id: identifier of the session to retrieve
        NOTE(review): assumes run_query returns a non-empty indexable sequence —
        an empty result will raise on the [0] access; confirm run_query's
        behavior when no session matches. """
    query = {DOMAIN_NAME: domain_name, SESSION_ID: session_id}
    sessions = self.run_query(query)
    return sessions[0]
6 changes: 3 additions & 3 deletions db/dao/site_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from bson import ObjectId

from db.model.site_statistics import SiteStatistics, DOMAIN_NAME, TIMEPERIOD
from synergy.db.manager import ds_manager
from synergy.db.manager import get_data_source
from synergy.system.decorator import thread_safe


Expand All @@ -15,10 +15,10 @@ def __init__(self, logger):
super(SiteDao, self).__init__()
self.logger = logger
self.lock = RLock()
self.ds = ds_manager.ds_factory(logger)
self.ds = get_data_source(logger)

@thread_safe
def get_one(self, collection_name, domain_name, timeperiod):
def get_one(self, collection_name:str, domain_name:str, timeperiod:str):
collection = self.ds.connection(collection_name)
document = collection.find_one(filter={DOMAIN_NAME: domain_name, TIMEPERIOD: timeperiod})
if document is None:
Expand Down
1 change: 1 addition & 0 deletions db/model/alert.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ class presents model describing an Alert event, such as 20% increase or decrease
def key_fields(cls):
return cls.domain_name.name, cls.timeperiod.name


TIMEPERIOD = Alert.timeperiod.name
DOMAIN_NAME = Alert.domain_name.name
1 change: 0 additions & 1 deletion launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,6 @@ def run_shell(parser_args):
""" Run IPython in the virtualenv
http://ipython.org/ipython-doc/stable/interactive/reference.html """
from IPython import embed

embed()


Expand Down
114 changes: 56 additions & 58 deletions scripts/install_virtualenv.sh
Original file line number Diff line number Diff line change
@@ -1,59 +1,54 @@
#!/bin/bash

python3list=(
"six-1.12.0.tar.gz"
)

commonlist=(
"pip-19.1.1.tar.gz"
"ordereddict-1.1.tar.gz"
packagelist=(
"six"
"pip"
"ordereddict"

# ipython
"more-itertools-7.2.0.tar.gz"
"path.py-12.0.1.tar.gz"
"ipython_genutils-0.1.0.tar.gz"
"ptyprocess-0.6.0.tar.gz"
"decorator-4.4.0.tar.gz"
"pathlib2-2.3.4.tar.gz"
"pickleshare-0.7.5.tar.gz"
"simplegeneric-0.8.1.zip"
"traitlets-4.3.2.tar.gz"
"pexpect-4.7.0.tar.gz"
"backports.shutil_get_terminal_size-1.0.0.tar.gz"
"wcwidth-0.1.7.tar.gz"
"prompt_toolkit-1.0.7.tar.gz"
"Pygments-2.4.2.tar.gz"
"ipython-7.6.1.tar.gz"
"more-itertools"
"path.py"
"ipython_genutils"
"ptyprocess"
"decorator"
"pathlib2"
"pickleshare"
"simplegeneric"
"traitlets"
"pexpect"
"wcwidth"
"prompt_toolkit"
"Pygments"
"ipython"

# pylint section start
"backports.functools_lru_cache-1.4.tar.gz"
"configparser-3.7.4.tar.gz"
"mccabe-0.6.1.tar.gz"
"isort-4.3.21.tar.gz"
"lazy-object-proxy-1.4.1.tar.gz"
"wrapt-1.11.2.tar.gz"
"astroid-2.2.5.tar.gz"
"pylint-2.3.1.tar.gz"

"coverage-4.5.3.tar.gz"
"unittest-xml-reporting-2.5.1.tar.gz"
"setproctitle-1.1.10.tar.gz"
"psutil-5.6.3.tar.gz"
"configparser"
"mccabe"
"isort"
"lazy-object-proxy"
"wrapt"
"astroid"
"pylint"

"coverage"
"unittest-xml-reporting"
"setproctitle"
"psutil"

# Amqp
"vine-1.3.0.tar.gz"
"amqp-2.5.0.tar.gz"
"vine"
"amqp"

"pymongo-3.8.0.tar.gz"
"MarkupSafe-1.1.1.tar.gz"
"Jinja2-2.10.1.tar.gz"
"Werkzeug-1.0.0.tar.gz"
"synergy_odm-0.11.tar.gz"
"synergy_flow-0.16.tar.gz --no-deps"
"pymongo"
"MarkupSafe"
"Jinja2"
"Werkzeug"
"synergy_odm"
"synergy_flow --no-deps"

# fabric
"fabric-2.5.0.tar.gz"
"invoke-1.3.0.tar.gz"
"invoke"
"fabric"
)

if [[ -z "$1" ]]; then
Expand All @@ -71,35 +66,38 @@ if [[ -z "$3" ]]; then
exit 1
fi

if [[ $3 == 2* ]]; then
echo "Python version $3 is no longer supported"
exit 1
elif [[ $3 == 3* ]]; then
pip_bin="pip3 install --prefix=$2"
packagelist=("${python3list[@]}" "${commonlist[@]}")
else
echo "Python version $3 is not yet supported"
if [[ $3 != 3* ]]; then
echo "Python version $3 is not supported"
exit 1
fi

pip_bin="pip3 install --prefix=$2"
echo "DEBUG: pip_bin=${pip_bin}"

# ccache speeds up recompilation by caching previous compilations
which ccache > /dev/null 2>&1
if [[ $? == 0 ]]; then
if command -v ccache > /dev/null 2>&1; then
export CC="ccache gcc"
export CXX="ccache g++"
fi

# Ignore some CLANG errors on OSX else install will fail
if [[ `uname` == "Darwin" ]]; then
if [[ $(uname) == "Darwin" ]]; then
export ARCHFLAGS="-arch i386 -arch x86_64"
export CFLAGS=-Qunused-arguments
export CPPFLAGS=-Qunused-arguments
fi

. $2/bin/activate
. "${2}/bin/activate"

for package in "${packagelist[@]}"; do # The quotes are necessary here
${pip_bin} $1/vendors/${package}

NO_DEP_FLAG=''
if [[ ${package} =~ .*"--no-deps".* ]]; then
package=$(echo ${package} | awk '{print $1}')
NO_DEP_FLAG='--no-deps'
fi

package=$(ls ${1}/vendors/${package}*)
echo "DEBUG: ${package}"
${pip_bin} ${package} ${NO_DEP_FLAG}
done
2 changes: 1 addition & 1 deletion settings_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# mongodb_host_list=['mongodb://syn-mongodb:27017'],

perf_ticker_interval=30, # seconds between performance tracker ticks
synergy_start_timeperiod='2020050100', # precision is process dependent
synergy_start_timeperiod='2021060100', # precision is process dependent
# synergy_start_timeperiod=datetime.utcnow().strftime('%Y%m%d%H'), # precision is process dependent
debug=True
)
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from distutils.core import setup

setup(name='synergy_scheduler',
version='2.3',
version='2.4',
description='Synergy Scheduler',
author='Bohdan Mushkevych',
author_email='[email protected]',
Expand Down
9 changes: 6 additions & 3 deletions synergy/db/dao/base_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
from bson import ObjectId

from odm.document import BaseDocument
from synergy.db.manager import ds_manager
from typing import Type

from synergy.db.manager import get_data_source
from synergy.system.decorator import thread_safe


Expand All @@ -29,16 +31,17 @@ def build_db_query(fields_names, field_values):
class BaseDao(object):
""" Thread-safe base Data Access Object """

def __init__(self, logger, collection_name:str, model_class, primary_key=None):
def __init__(self, logger, collection_name:str, model_class:Type[BaseDocument], primary_key=None):
super(BaseDao, self).__init__()
self.logger = logger
self.collection_name = collection_name
self.model_klass = model_class
self.primary_key = primary_key
if not primary_key:
self.primary_key = self.model_klass.key_fields()

self.lock = RLock()
self.ds = ds_manager.ds_factory(logger)
self.ds = get_data_source(logger)

@thread_safe
def get_one(self, key):
Expand Down
6 changes: 3 additions & 3 deletions synergy/db/dao/job_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from bson import ObjectId

from synergy.conf import context
from synergy.db.manager import ds_manager
from synergy.db.manager import get_data_source
from synergy.db.model import job
from synergy.db.model.job import Job
from synergy.scheduler.scheduler_constants import COLLECTION_JOB_HOURLY, COLLECTION_JOB_DAILY, \
Expand All @@ -32,10 +32,10 @@ def __init__(self, logger):
super(JobDao, self).__init__()
self.logger = logger
self.lock = RLock()
self.ds = ds_manager.ds_factory(logger)
self.ds = get_data_source(logger)

@thread_safe
def _get_job_collection_name(self, process_name):
def _get_job_collection_name(self, process_name:str):
"""jobs are stored in 4 collections: hourly, daily, monthly and yearly;
method looks for the proper job_collection base on process TIME_QUALIFIER"""
qualifier = context.process_context[process_name].time_qualifier
Expand Down
4 changes: 2 additions & 2 deletions synergy/db/dao/unit_of_work_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

from synergy.conf import context
from synergy.db.error import DuplicateKeyError
from synergy.db.manager import ds_manager
from synergy.db.manager import get_data_source
from synergy.db.model import unit_of_work
from synergy.db.model.unit_of_work import UnitOfWork
from synergy.scheduler.scheduler_constants import COLLECTION_UNIT_OF_WORK
Expand Down Expand Up @@ -41,7 +41,7 @@ def __init__(self, logger):
super(UnitOfWorkDao, self).__init__()
self.logger = logger
self.lock = RLock()
self.ds = ds_manager.ds_factory(logger)
self.ds = get_data_source(logger)

@thread_safe
def get_one(self, key):
Expand Down
44 changes: 44 additions & 0 deletions synergy/db/manager/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
__author__ = 'Bohdan Mushkevych'

import atexit

from synergy.conf import settings
from synergy.db.manager.ds_manager import MongoDbManager, HBaseManager


class DataSource:
    """ Process-wide registry of Data Source connections: one connection per logger instance.

        The registry maps ``id(logger)`` to a data source manager (Mongo or HBase),
        so each logger (and hence each process component) shares a single DB connection. """

    # maps id(logger) -> data source manager instance
    instances = dict()

    @classmethod
    def instance(cls, logger):
        """ Return the Data Source bound to *logger*, creating it on first use.

            :param logger: the component logger; its identity keys the connection cache
            :raises ValueError: if settings['ds_type'] names an unsupported data source """
        id_logger = id(logger)  # allow one DB Connection per process (hence - per logger)
        if id_logger not in cls.instances:
            ds_type = settings.settings['ds_type']
            if ds_type == "mongo_db":
                ds = MongoDbManager(logger)
            elif ds_type == "hbase":
                ds = HBaseManager(logger)
            else:
                raise ValueError(f'Unsupported Data Source type: {ds_type}')

            # per-connection shutdown notification, in addition to the class-level _cleanup hook
            atexit.register(ds.interpreter_terminating)
            cls.instances[id_logger] = ds

        return cls.instances[id_logger]

    @classmethod
    def _cleanup(cls):
        """ method is registered with the atexit hook, and ensures all MongoDb connections are closed """
        # only the managers are needed, not their cache keys
        for ds in cls.instances.values():
            try:
                # NOTE(review): calling __del__ directly relies on the manager exposing it as
                # an explicit close method — confirm against ds_manager
                ds.__del__()
            except Exception as e:
                print(f'{e}')


# ensure the DB Connections are closed on the exit
atexit.register(DataSource._cleanup)


def get_data_source(logger):
    """ Module-level convenience accessor: delegates to the DataSource registry
        and returns the per-logger Data Source connection. """
    ds = DataSource.instance(logger)
    return ds
Loading