You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
importosimportsysimportjsonimportdatetimefromtypingimportList, OptionalimportdatetimeimportasanaclassAsanaReader():
"""Asana reader. Reads data from an Asana workspace. Args: asana_token (str): Asana token. """def__init__(self, asana_token: str=None) ->None:
"""Initialize Asana reader."""config=asana.Configuration()
config.access_token=asana_tokenself.client=asana.ApiClient(config)
defload_data(
self,
workspace_gid: Optional[str] =None,
project_gid: Optional[str] =None,
modified_since: Optional[str] = (datetime.datetime.now() -datetime.timedelta(days=7)).isoformat(),
) ->List[str]:
"""Load data from the workspace. Args: workspace_id (Optional[str], optional): Workspace ID. project_id (Optional[str], optional): Project ID. modified_since: Optional[str], optional): Only return tasks modified since this date. """# check for valid inputifworkspace_gidisNoneandproject_gidisNone:
raiseValueError("Either workspace_id or project_id must be provided")
ifworkspace_gidisnotNoneandproject_gidisnotNone:
raiseValueError("Only one of workspace_id or project_id should be provided")
# Case: Only workspace_id is providedifworkspace_gidisnotNone:
projects=self.get_projects(workspace_gid)
elifproject_gidisnotNone:
projects= [self.get_project(project_gid)]
else:
raiseValueError("Invalid input")
defget_projects(self, workspace_gid: str) ->List[str]:
opts= {
'workspace': workspace_gid,
'archived': False,
'opt_fields': "name,workspace"
}
projects_api_instance=asana.ProjectsApi(self.client)
api_response=projects_api_instance.get_projects(opts)
return [datafordatainapi_response]
defget_project(self, project_gid: str) ->List[str]:
opts= {
'archived': False,
'opt_fields': "name,workspace"
}
projects_api_instance=asana.ProjectsApi(self.client)
api_response=projects_api_instance.get_project(project_gid, opts)
returnapi_response# mainif__name__=="__main__":
fromdotenvimportload_dotenvload_dotenv()
reader=AsanaReader(asana_token=os.getenv("ASANA_PROD_API_KEY"))
data=reader.load_data(
project_gid=os.getenv("ASANA_PROJECT_GID")
)
print(data)
The warning:
Exception ignored in: <function ApiClient.__del__ at 0x105330900>Traceback (most recent call last): File "/Users/nickyoungblut/mambaforge/envs/weaviate-db-tools/lib/python3.11/site-packages/asana/api_client.py", line 95, in __del__ File "/Users/nickyoungblut/mambaforge/envs/weaviate-db-tools/lib/python3.11/multiprocessing/pool.py", line 648, in closeAttributeError: 'NoneType' object has no attribute 'debug'Exception ignored in: <function Pool.__del__ at 0x104d1d1c0>Traceback (most recent call last): File "/Users/nickyoungblut/mambaforge/envs/weaviate-db-tools/lib/python3.11/multiprocessing/pool.py", line 271, in __del__ File "/Users/nickyoungblut/mambaforge/envs/weaviate-db-tools/lib/python3.11/multiprocessing/queues.py", line 371, in putAttributeError: 'NoneType' object has no attribute 'dumps'
def __del__(self):
# Safeguard the closure of the pool
if hasattr(self, "pool") and self.pool:
try:
self.pool.close()
self.pool.join()
except Exception as e:
# Capture and log any exception that arises during cleanup
logging.error(f"Error in __del__: {e}")
My code:
The warning:
My conda env:
The text was updated successfully, but these errors were encountered: