Skip to content

Commit

Permalink
Merge branch 'master' into make-workspace-panels-dockable
Browse files Browse the repository at this point in the history
  • Loading branch information
sixsage committed Sep 5, 2024
2 parents 0d6bb8b + 44a7ed3 commit 0cf011c
Show file tree
Hide file tree
Showing 32 changed files with 376 additions and 57 deletions.
3 changes: 2 additions & 1 deletion core/amber/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ python-lsp-server[websockets]
bidict==0.22.0
cached_property
psutil
transformers
transformers
tzlocal
4 changes: 2 additions & 2 deletions core/amber/src/main/python/core/runnables/data_processor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import os
import sys
import traceback
from datetime import datetime
from threading import Event

from loguru import logger
Expand All @@ -11,6 +10,7 @@
from core.models.table import all_output_to_tuple
from core.util import Stoppable
from core.util.console_message.replace_print import replace_print
from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.runnable.runnable import Runnable
from proto.edu.uci.ics.amber.engine.architecture.worker import (
ConsoleMessage,
Expand Down Expand Up @@ -112,7 +112,7 @@ def _report_exception(self, exc_info: ExceptionInfo):
self._context.console_message_manager.put_message(
ConsoleMessage(
worker_id=self._context.worker_id,
timestamp=datetime.now(),
timestamp=current_time_in_local_timezone(),
msg_type=ConsoleMessageType.ERROR,
source=f"{module_name}:{func_name}:{line_number}",
title=title,
Expand Down
4 changes: 2 additions & 2 deletions core/amber/src/main/python/core/runnables/main_loop.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import datetime
import threading
import time
import typing
Expand All @@ -23,6 +22,7 @@
from core.runnables.data_processor import DataProcessor
from core.util import StoppableQueueBlockingRunnable, get_one_of, set_one_of
from core.util.customized_queue.queue_base import QueueElement
from core.util.console_message.timestamp import current_time_in_local_timezone
from proto.edu.uci.ics.amber.engine.architecture.worker import (
ControlCommandV2,
ConsoleMessageType,
Expand Down Expand Up @@ -335,7 +335,7 @@ def _check_and_report_debug_event(self) -> None:
PythonConsoleMessageV2(
ConsoleMessage(
worker_id=self.context.worker_id,
timestamp=datetime.datetime.now(),
timestamp=current_time_in_local_timezone(),
msg_type=ConsoleMessageType.DEBUGGER,
source="(Pdb)",
title=debug_event,
Expand Down
68 changes: 68 additions & 0 deletions core/amber/src/main/python/core/runnables/test_console_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import pytest
from core.models.internal_queue import InternalQueue
from core.util.buffer.timed_buffer import TimedBuffer
from core.util import set_one_of
from proto.edu.uci.ics.amber.engine.common import (
ActorVirtualIdentity,
ControlInvocationV2,
ControlPayloadV2,
PythonControlMessage,
)
from proto.edu.uci.ics.amber.engine.architecture.worker import (
PythonConsoleMessageV2,
ConsoleMessage,
ConsoleMessageType,
ControlCommandV2,
)
from core.util.console_message.timestamp import current_time_in_local_timezone


class TestConsoleMessage:
@pytest.fixture
def internal_queue(self):
return InternalQueue()

@pytest.fixture
def timed_buffer(self):
return TimedBuffer()

@pytest.fixture
def console_message(self):
return PythonConsoleMessageV2(
ConsoleMessage(
worker_id="0",
timestamp=current_time_in_local_timezone(),
# this will produce error if betterproto is set to 2.0.0b7
# timestamp=datetime.datetime.now(),
msg_type=ConsoleMessageType.PRINT,
source="pytest",
title="Test Message",
message="Test Message",
)
)

@pytest.fixture
def mock_controller(self):
return ActorVirtualIdentity("CONTROLLER")

@pytest.mark.timeout(2)
def test_console_message_serialization(self, mock_controller, console_message):
"""
Test the serialization of the console message
:param mock_controller: the mock actor id
:param console_message: the test message
"""
# below statements wrap the console message as the python control message
command = set_one_of(ControlCommandV2, console_message)
payload = set_one_of(ControlPayloadV2, ControlInvocationV2(1, command=command))
python_control_message = PythonControlMessage(
tag=mock_controller, payload=payload
)
# serialize the python control message to bytes
python_control_message_bytes = bytes(python_control_message)
# deserialize the control message from bytes
parsed_python_control_message = PythonControlMessage().parse(
python_control_message_bytes
)
# deserialized one should equal to the original one
assert python_control_message == parsed_python_control_message
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import builtins
import datetime
import inspect
from contextlib import redirect_stdout
from io import StringIO

from typing import ContextManager

from core.util.console_message.timestamp import current_time_in_local_timezone
from core.util.buffer.buffer_base import IBuffer
from proto.edu.uci.ics.amber.engine.architecture.worker import (
ConsoleMessage,
Expand Down Expand Up @@ -53,7 +52,7 @@ def wrapped_print(*args, **kwargs):
complete_str = tmp_buf.getvalue()
console_message = ConsoleMessage(
worker_id=self.worker_id,
timestamp=datetime.datetime.now(),
timestamp=current_time_in_local_timezone(),
msg_type=ConsoleMessageType.PRINT,
source=(
f"{inspect.currentframe().f_back.f_globals['__name__']}"
Expand Down
12 changes: 12 additions & 0 deletions core/amber/src/main/python/core/util/console_message/timestamp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import datetime
import tzlocal


def current_time_in_local_timezone():
# Get the system's local timezone
local_timezone = tzlocal.get_localzone()

# Get the current time in the local timezone
local_time = datetime.datetime.now(local_timezone)

return local_time
2 changes: 1 addition & 1 deletion core/amber/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ flow-control {
}

network-buffering {
default-batch-size = 400
default-data-transfer-batch-size = 400
enable-adaptive-buffering = true
adaptive-buffering-timeout-ms = 500
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,17 @@ abstract class RegionPlanGenerator(
var physicalPlan: PhysicalPlan,
opResultStorage: OpResultStorage
) {

private val executionClusterInfo = new ExecutionClusterInfo()

def generate(): (RegionPlan, PhysicalPlan)

def allocateResource(
regionDAG: DirectedAcyclicGraph[Region, RegionLink]
): Unit = {
val resourceAllocator = new DefaultResourceAllocator(physicalPlan, executionClusterInfo)
val dataTransferBatchSize = workflowContext.workflowSettings.dataTransferBatchSize

val resourceAllocator =
new DefaultResourceAllocator(physicalPlan, executionClusterInfo, dataTransferBatchSize)
// generate the resource configs
new TopologicalOrderIterator(regionDAG).asScala
.foreach(region => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import edu.uci.ics.amber.engine.architecture.sendsemantics.partitionings.{
RangeBasedShufflePartitioning,
RoundRobinPartitioning
}
import edu.uci.ics.amber.engine.common.AmberConfig.defaultBatchSize
import edu.uci.ics.amber.engine.common.virtualidentity.{ActorVirtualIdentity, ChannelIdentity}
import edu.uci.ics.texera.workflow.common.workflow.{
BroadcastPartition,
Expand All @@ -23,12 +22,13 @@ case object LinkConfig {
def toPartitioning(
fromWorkerIds: List[ActorVirtualIdentity],
toWorkerIds: List[ActorVirtualIdentity],
partitionInfo: PartitionInfo
partitionInfo: PartitionInfo,
dataTransferBatchSize: Int
): Partitioning = {
partitionInfo match {
case HashPartition(hashAttributeNames) =>
HashBasedShufflePartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.flatMap(from =>
toWorkerIds.map(to => ChannelIdentity(from, to, isControl = false))
),
Expand All @@ -37,7 +37,7 @@ case object LinkConfig {

case RangePartition(rangeAttributeNames, rangeMin, rangeMax) =>
RangeBasedShufflePartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.flatMap(fromId =>
toWorkerIds.map(toId => ChannelIdentity(fromId, toId, isControl = false))
),
Expand All @@ -49,15 +49,15 @@ case object LinkConfig {
case SinglePartition() =>
assert(toWorkerIds.size == 1)
OneToOnePartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.map(fromWorkerId =>
ChannelIdentity(fromWorkerId, toWorkerIds.head, isControl = false)
)
)

case OneToOnePartition() =>
OneToOnePartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.zip(toWorkerIds).map {
case (fromWorkerId, toWorkerId) =>
ChannelIdentity(fromWorkerId, toWorkerId, isControl = false)
Expand All @@ -66,7 +66,7 @@ case object LinkConfig {

case BroadcastPartition() =>
BroadcastPartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.zip(toWorkerIds).map {
case (fromWorkerId, toWorkerId) =>
ChannelIdentity(fromWorkerId, toWorkerId, isControl = false)
Expand All @@ -75,7 +75,7 @@ case object LinkConfig {

case UnknownPartition() =>
RoundRobinPartitioning(
defaultBatchSize,
dataTransferBatchSize,
fromWorkerIds.flatMap(from =>
toWorkerIds.map(to => ChannelIdentity(from, to, isControl = false))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ trait ResourceAllocator {
}
class DefaultResourceAllocator(
physicalPlan: PhysicalPlan,
executionClusterInfo: ExecutionClusterInfo
executionClusterInfo: ExecutionClusterInfo,
dataTransferBatchSize: Int
) extends ResourceAllocator {

// a map of a physical link to the partition info of the upstream/downstream of this link
Expand Down Expand Up @@ -67,7 +68,8 @@ class DefaultResourceAllocator(
toPartitioning(
operatorConfigs(physicalLink.fromOpId).workerConfigs.map(_.workerId),
operatorConfigs(physicalLink.toOpId).workerConfigs.map(_.workerId),
linkPartitionInfos(physicalLink)
linkPartitionInfos(physicalLink),
this.dataTransferBatchSize
)
)
}.toMap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ trait Partitioner extends Serializable {
class NetworkOutputBuffer(
val to: ActorVirtualIdentity,
val dataOutputPort: NetworkOutputGateway,
val batchSize: Int = AmberConfig.defaultBatchSize
val batchSize: Int = AmberConfig.defaultDataTransferBatchSize
) {

var buffer = new ArrayBuffer[Tuple]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ object AmberConfig {
getConfSource.getInt("flow-control.credit-poll-interval-in-ms")

// Network buffering configuration
def defaultBatchSize: Int = getConfSource.getInt("network-buffering.default-batch-size")
def defaultDataTransferBatchSize: Int =
getConfSource.getInt("network-buffering.default-data-transfer-batch-size")
val enableAdaptiveNetworkBuffering: Boolean =
getConfSource.getBoolean("network-buffering.enable-adaptive-buffering")
val adaptiveBufferingTimeoutMs: Int =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package edu.uci.ics.texera.web.model.websocket.request

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import edu.uci.ics.texera.workflow.common.operators.LogicalOp
import edu.uci.ics.texera.workflow.common.workflow.LogicalLink
import edu.uci.ics.texera.workflow.common.workflow.{LogicalLink, WorkflowSettings}

case class ReplayExecutionInfo(
@JsonDeserialize(contentAs = classOf[java.lang.Long])
Expand All @@ -14,7 +14,8 @@ case class WorkflowExecuteRequest(
executionName: String,
engineVersion: String,
logicalPlan: LogicalPlanPojo,
replayFromExecution: Option[ReplayExecutionInfo] // contains execution Id, interaction Id.
replayFromExecution: Option[ReplayExecutionInfo], // contains execution Id, interaction Id.
workflowSettings: WorkflowSettings
) extends TexeraWebSocketRequest

case class LogicalPlanPojo(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,34 @@
package edu.uci.ics.texera.web.resource.aiassistant

import edu.uci.ics.amber.engine.common.AmberConfig
import java.net.{HttpURLConnection, URL}
import com.typesafe.config.Config

object AiAssistantManager {
private val aiAssistantConfig = AmberConfig.aiAssistantConfig.getOrElse(
throw new Exception("ai-assistant-server configuration is missing in application.conf")
)
val assistantType: String = aiAssistantConfig.getString("assistant")
// The accountKey is the OpenAI authentication key used to authenticate API requests and obtain responses from the OpenAI service.
// Optionally retrieve the configuration
private val aiAssistantConfigOpt: Option[Config] = AmberConfig.aiAssistantConfig
private val noAssistant: String = "NoAiAssistant"
// Public variables, accessible from outside the object
var accountKey: String = _
var sharedUrl: String = _

// Initialize accountKey and sharedUrl if the configuration is present
aiAssistantConfigOpt.foreach { aiAssistantConfig =>
accountKey = aiAssistantConfig.getString("ai-service-key")
sharedUrl = aiAssistantConfig.getString("ai-service-url")
}

val accountKey: String = aiAssistantConfig.getString("ai-service-key")
val sharedUrl: String = aiAssistantConfig.getString("ai-service-url")
val validAIAssistant: String = aiAssistantConfigOpt match {
case Some(aiAssistantConfig) =>
val assistantType: String = aiAssistantConfig.getString("assistant")
assistantType match {
case "none" => noAssistant
case "openai" => initOpenAI()
case _ => noAssistant
}
case None =>
noAssistant
}

private def initOpenAI(): String = {
var connection: HttpURLConnection = null
Expand All @@ -26,26 +44,15 @@ object AiAssistantManager {
if (responseCode == 200) {
"OpenAI"
} else {
"NoAiAssistant"
noAssistant
}
} catch {
case e: Exception =>
"NoAiAssistant"
noAssistant
} finally {
if (connection != null) {
connection.disconnect()
}
}
}

val validAIAssistant: String = assistantType match {
case "none" =>
"NoAiAssistant"

case "openai" =>
initOpenAI()

case _ =>
"NoAiAssistant"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class WorkflowExecutionService(
with LazyLogging {

logger.info("Creating a new execution.")
workflowContext.workflowSettings = request.workflowSettings

val wsInput = new WebsocketInput(errorHandler)

Expand Down
Loading

0 comments on commit 0cf011c

Please sign in to comment.