Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pre commit hook #5

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*/**/__pycache__
*/**/*.pyc
99 changes: 42 additions & 57 deletions python_scripts/sync_announcement.py
Original file line number Diff line number Diff line change
@@ -1,61 +1,46 @@
def log_error(logger, message):
logger.error(f"Sync Announcement: {message}")


try:
# list of media_player entities
media_players = data.get('media_players')

# the message to play
message = data.get('message')

# use caching mechanisms
# optional - true/false, default: true
cache = data.get('cache') or True

# specify the language for the tts model
# optional - default: en-US
language = data.get('language') or 'en-US'

# specify a volume to set to all media_players before playing the message
# optional - min: 0, max: 1, step: 0.01, default: None
volume = data.get('volume') or None

# specify if the volume should be reset to the previous state after the
# tts message
# optional - true/false, default: false
volume_reset = data.get('volume_reset') or False

# store the volume level of all media_players before running the script if
# specified
if volume_reset:
media_player_states_before_run = {}
for entity_id in media_players:
media_player_states_before_run[entity_id] = (
hass.states.get(entity_id).attributes['volume_level']
)
class SyncAnnouncement:
def __init__(self, hass, data):
self.hass = hass
self.data = data

def run(self):
# extracting data
media_players = self.data.get('media_players')
message = self.data.get('message')
language = self.data.get('language', 'en-US')
volume = self.data.get('volume', None)
volume_reset = self.data.get('volume_reset', False)
cache = self.data.get('cache', True)

states_before_run = {}

# store the volume level of all media_players
# before running the script if specified
if volume_reset:
for entity_id in media_players:
states_before_run[entity_id] = (
self.hass.states.get(entity_id).attributes['volume_level']
)

# set volume to all media_players if specified
if volume is not None:
self.hass.services.call('media_player', 'volume_set', {
'entity_id': media_players,
'volume_level': volume,
})

# set volume to all media_players if specified
if volume is not None:
hass.services.call('media_player', 'volume_set', {
# send message through tts.cloud_say
self.hass.services.call('tts', 'cloud_say', {
'entity_id': media_players,
'volume_level': volume,
'message': message,
'cache': cache,
'language': language,
})

# send message through tts.cloud_say
hass.services.call('tts', 'cloud_say', {
'entity_id': media_players,
'message': message,
'cache': cache,
'language': language,
})

# set respective previous volume to all media_players if specified
if volume_reset:
for entity_id, prev_volume in media_player_states_before_run.items():
hass.services.call('media_player', 'volume_set', {
'entity_id': entity_id,
'volume_level': prev_volume,
})
except Exception as e:
log_error("**An unhandled error has occurred.**\n\n{}".format(e))
# set respective previous volume to all media_players if specified
if volume_reset:
for entity_id, previous_volume in states_before_run.items():
self.hass.services.call('media_player', 'volume_set', {
'entity_id': entity_id,
'volume_level': previous_volume,
})
49 changes: 49 additions & 0 deletions python_scripts/sync_announcement_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import unittest
from unittest.mock import MagicMock
from sync_announcement import SyncAnnouncement


class TestSyncAnnouncement(unittest.TestCase):
def test_sync_announcement(self):
# Mock the hass object
mock_hass = MagicMock()

# Mock the data dictionary
data = {
'media_players': ['media_player_1', 'media_player_2'],
'message': 'Test message',
'cache': True,
'language': 'en-US',
'volume': 0.5,
'volume_reset': True
}

# Mock the hass.states.get method
mock_hass.states.get.return_value.attributes = {'volume_level': 0.3}

# Create an instance of SyncAnnouncement
sync_announcement = SyncAnnouncement(mock_hass, data)

# Call the run method
sync_announcement.run()

# Assert that the hass.services.call method was called
# with the expected arguments
mock_hass.services.call.assert_any_call('media_player', 'volume_set', {
'entity_id': data['media_players'],
'volume_level': data['volume'],
})
mock_hass.services.call.assert_any_call('tts', 'cloud_say', {
'entity_id': data['media_players'],
'message': data['message'],
'cache': data['cache'],
'language': data['language'],
})
mock_hass.services.call.assert_any_call('media_player', 'volume_set', {
'entity_id': data['media_players'][0],
'volume_level': 0.3,
})


if __name__ == '__main__':
unittest.main()