You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
async def update(self):
while True:
for k,v in self.extend_data.items():
value = self.service.set_payload(k)
self.values[k] = value
await asyncio.sleep(5)
This is my code logic, but it doesn't do what I want:
class TimeEvgrp(SimpleEventgroup):
def init(self, service: Prot, _id, extend_data):
super().init(service, id=_id, interval=1)
self.service: Prot
self.extend_data = extend_data
self.update_task = asyncio.create_task(self.update())
# self.set_task = asyncio.create_task(self.set_value())
class Prot(SimpleService):
service_id = 0x1080
version_major = 1
version_minor = 1
服务发现协议的单例管理类,不然发布多个服务会导致端口冲突
class ServiceDiscoveryManager:
_instance = None
def setup_log(fmt="", **kwargs):
try:
import coloredlogs # type: ignore[import]
coloredlogs.install(fmt="%(asctime)s,%(msecs)03d " + fmt, **kwargs)
except ModuleNotFoundError:
logging.basicConfig(format="%(asctime)s " + fmt, **kwargs)
logging.info("install coloredlogs for colored logs :-)")
def eth_service_release(**kwargs):
asyncio.run(run(**kwargs))
class AsyncServiceThread(QThread):
finished = pyqtSignal()
prot_signal = pyqtSignal(object) # 定义一个信号,用于发送 prot
stop_requested = False # 控制循环的标志
The text was updated successfully, but these errors were encountered: