Skip to content

Commit

Permalink
Merge pull request #94 from masa-finance/feat--integration-api-updates
Browse files Browse the repository at this point in the history
fix: types and registrations to match protocol update
  • Loading branch information
grantdfoster authored Jan 21, 2025
2 parents 1f3ff30 + 3d2ccad commit ee0661d
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 225 deletions.
Binary file modified interfaces/__pycache__/types.cpython-312.pyc
Binary file not shown.
122 changes: 86 additions & 36 deletions interfaces/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ def to_dict(self):

@dataclass
class VerifiedTweet(JSONSerializable):
tweet_id: str
url: str
timestamp: str
full_text: str
TweetID: str
URL: str
Timestamp: str
FullText: str


@dataclass
Expand Down Expand Up @@ -47,14 +47,13 @@ class Profile(JSONSerializable):

@dataclass
class RegisteredAgentRequest(JSONSerializable):
hotkey: str
uid: int
subnet_id: int
version: str
isActive: bool
emissions: float
verification_tweet: Optional[VerifiedTweet]
profile: Optional[dict[str, Profile]]
HotKey: str
UID: int
SubnetID: int
Version: str
Emissions: float
VerificationTweet: Optional[VerifiedTweet]
Profile: Optional[dict[str, Profile]]


@dataclass
Expand All @@ -65,19 +64,8 @@ class RegisteredAgentResponse(JSONSerializable):
UserID: str
SubnetID: int
Version: str
IsActive: bool
CreatedAt: str
UpdatedAt: str
Emissions: float
VerificationTweetID: str
VerificationTweetURL: str
VerificationTweetTimestamp: str
VerificationTweetText: str
Nominations: Optional[int]
IsNominated: Optional[bool]
Marketcap: Optional[int]

# Profile attributes
Avatar: Optional[str]
Banner: Optional[str]
Biography: Optional[str]
Expand All @@ -91,12 +79,18 @@ class RegisteredAgentResponse(JSONSerializable):
LikesCount: int
ListedCount: int
Location: Optional[str]
Name: Optional[str]
PinnedTweetIDs: list[str]
Name: str
PinnedTweetIDs: Optional[list[str]]
TweetsCount: int
URL: Optional[str]
Username: Optional[str]
Username: str
Website: Optional[str]
VerificationTweetID: str
VerificationTweetURL: str
VerificationTweetTimestamp: str
VerificationTweetText: str
Emissions: float
Marketcap: int


@dataclass
Expand All @@ -107,14 +101,58 @@ class ConnectedNode(JSONSerializable):
fernet: Fernet


@dataclass
class Mention:
ID: str
Username: str
Name: str


@dataclass
class Photo:
ID: str
URL: str


@dataclass
class GIF:
ID: str
Preview: str
URL: str


@dataclass
class Video:
ID: str
Preview: str
URL: str


@dataclass
class BoundingBox:
Coordinates: Optional[List[List[float]]]
Type: str


@dataclass
class Place:
bounding_box: BoundingBox
country: str
country_code: str
full_name: str
id: str
name: str
place_type: str


@dataclass
class Tweet(BaseModel, JSONSerializable):
ConversationID: Optional[str]
GIFs: Optional[List[str]]
GIFs: Optional[List[GIF]]
Hashtags: Optional[List[str]]
HTML: Optional[str]
ID: Optional[str]
InReplyToStatus: Optional[str]
InReplyToStatus: Optional["Tweet"]
InReplyToStatusID: Optional[str]
IsQuoted: Optional[bool]
IsPin: Optional[bool]
Expand All @@ -123,28 +161,40 @@ class Tweet(BaseModel, JSONSerializable):
IsSelfThread: Optional[bool]
Likes: Optional[int]
Name: Optional[str]
Mentions: Optional[List[Dict[str, Any]]]
Mentions: Optional[List[Mention]]
PermanentURL: Optional[str]
Photos: Optional[List[str]]
Place: Optional[Dict[str, Any]]
QuotedStatus: Optional[str]
Photos: Optional[List[Photo]]
Place: Optional[Place]
QuotedStatus: Optional["Tweet"]
QuotedStatusID: Optional[str]
Replies: Optional[int]
Retweets: Optional[int]
RetweetedStatus: Optional[str]
RetweetedStatus: Optional["Tweet"]
RetweetedStatusID: Optional[str]
Text: Optional[str]
Thread: Optional[str]
Thread: Optional[List["Tweet"]]
TimeParsed: Optional[str]
Timestamp: Optional[int]
URLs: Optional[List[str]]
UserID: Optional[str]
Username: Optional[str]
Videos: Optional[List[str]]
Videos: Optional[List[Video]]
Views: Optional[int]
SensitiveContent: Optional[bool]


class RegistrationCallback(BaseModel):
registered: str
agent: Optional[str] = None
message: Optional[str] = None


@dataclass
class TweetVerificationResult:
verification_tweet: Optional[VerifiedTweet]
user_id: Optional[str]
screen_name: Optional[str]
avatar: Optional[str]
name: Optional[str]
is_verified: Optional[bool]
followers_count: Optional[int]
error: Optional[str]
2 changes: 1 addition & 1 deletion neurons/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.2.0"
__version__ = "0.3.0"
version_split = __version__.split(".")
version_numerical = (
(100 * int(version_split[0]))
Expand Down
1 change: 0 additions & 1 deletion neurons/miner.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ async def registration_callback(
"""Registration Callback"""
try:
logger.info(f"Message From Validator: {payload}")
logger.info(f"Registration Success!")
return {"status": "Callback received"}
except Exception as e:
logger.error(f"Error in registration callback: {str(e)}")
Expand Down
5 changes: 5 additions & 0 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ async def fetch_x_profile(self, username: str) -> Dict[str, Any]:
response = await request.execute(data={"username": username})
return response

async def fetch_x_tweet_by_id(self, id: str) -> Dict[str, Any]:
request = Request()
response = await request.execute(data={"tweet_id": id})
return response

async def sync_loop(self) -> None:
"""Background task to sync metagraph"""
while True:
Expand Down
39 changes: 13 additions & 26 deletions protocol/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

# Import the functions from their respective modules
from protocol.profile import get_x_profile
from protocol.tweet import get_x_tweet_by_id

# Load environment variables
load_dotenv()
Expand Down Expand Up @@ -40,7 +41,6 @@ class Request:
queues (Dict[str, PriorityQueue]): Dictionary of priority queues for different request types.
lock (threading.Lock): Thread lock for managing concurrent access.
active_requests (int): Counter for currently processing requests.
counter (itertools.count): Unique sequence counter for request ordering.
requests_per_second (float): Maximum number of requests allowed per second.
last_request_time (float): Timestamp of the last processed request.
rate_limit_lock (threading.Lock): Thread lock for rate limiting.
Expand Down Expand Up @@ -72,7 +72,6 @@ def __init__(self, max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUEST
self.max_concurrent_requests = max_concurrent_requests
self.lock = threading.Lock()
self.active_requests = 0
self.counter = itertools.count() # Unique sequence count

# Rate limiting attributes
self.requests_per_second = DEFAULT_API_REQUESTS_PER_SECOND
Expand All @@ -85,7 +84,7 @@ def __init__(self, max_concurrent_requests: int = DEFAULT_MAX_CONCURRENT_REQUEST
)

async def execute(self, data: Dict[str, Any]):
response = self._handle_request(data, True)
response = self._handle_request(data)
return response

def _wait_for_rate_limit(self):
Expand All @@ -109,52 +108,40 @@ def _wait_for_rate_limit(self):

self.last_request_time = time.time()

def _handle_request(self, data: Dict[str, Any], quick_return=False):
def _handle_request(self, data: Dict[str, Any]):
"""Process a single request with error handling, retry mechanism, and rate limiting.
Args:
data (Dict[str, Any]): Request payload data.
type (str): The type of request, either 'profile' or 'tweet'.
Note:
This method enforces rate limiting before making the actual API request.
"""

with self.lock:
self.active_requests += 1
logger.debug(f"Active requests increased to {self.active_requests}")

try:
self._wait_for_rate_limit() # Apply rate limiting before making request

response = get_x_profile(username=data["username"])

if quick_return:
return response

if response["data"] is not None:
logger.info(f"Processed request: {response}")
if dict(data).get("username"):
response = get_x_profile(username=data["username"])
elif dict(data).get("tweet_id"):
response = get_x_tweet_by_id(tweet_id=data["tweet_id"])
else:
raise ValueError("Invalid request data")

metadata = {
"uid": data["metadata"].UID,
"user_id": data["metadata"].UserID,
"subnet_id": data["metadata"].SubnetID,
"query": data["query"],
"count": len(response),
"created_at": int(time.time()),
}

self.saver.save_post(response, metadata)
return response, metadata
return response

except Exception as e:
logger.error(f"Error processing request: {e}")
self._retry_request(data)
finally:
with self.lock:
self.active_requests -= 1
logger.debug(
f"Active requests decreased to {
self.active_requests}"
)
logger.debug(f"Active requests decreased to {self.active_requests}")

def _retry_request(
self,
Expand Down
Loading

0 comments on commit ee0661d

Please sign in to comment.