Skip to content

Commit

Permalink
feat: temp commit
Browse files Browse the repository at this point in the history
  • Loading branch information
cdummett committed Nov 27, 2023
1 parent 211aa21 commit 6ea4094
Show file tree
Hide file tree
Showing 16 changed files with 944 additions and 315 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
VEGA_SIM_VEGA_TAG=07b6292ceaf5c69182066084f20e26d95c326456
VEGA_SIM_VEGA_TAG=10155-next-closeout
VEGA_SIM_CONSOLE_TAG=develop
VEGA_DEFAULT_KEY_NAME='Key 1'
VEGA_SIM_NETWORKS_INTERNAL_TAG=main
Expand Down
85 changes: 84 additions & 1 deletion vega_sim/api/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,53 @@ class Position:
position_status: vega_protos.vega.PositionStatus


@dataclass(frozen=True)
class PositionResolution:
market_id: str
distressed: int
closed: int
mark_price: float


@dataclass(frozen=True)
class SettleDistressed:
market_id: str
party_id: str
margin: float
price: float


def _position_resolution_from_proto(
position_resolution: vega_protos.events.v1.events.PositionResolution,
decimal_spec: DecimalSpec,
) -> PositionResolution:
return PositionResolution(
market_id=position_resolution.market_id,
distressed=position_resolution.distressed,
closed=position_resolution.closed,
mark_price=num_from_padded_int(
position_resolution.mark_price, decimals=decimal_spec.price_decimals
),
)


def _settle_distressed_from_proto(
settle_distressed: vega_protos.events.v1.events.SettleDistressed,
decimal_spec: DecimalSpec,
):
return SettleDistressed(
market_id=settle_distressed.market_id,
party_id=settle_distressed.party_id,
margin=num_from_padded_int(
int(settle_distressed.margin),
decimals=decimal_spec.asset_decimals,
),
price=num_from_padded_int(
int(settle_distressed.price), decimals=decimal_spec.price_decimals
),
)


def _network_parameter_from_proto(network_parameter: vega_protos.vega.NetworkParameter):
return NetworkParameter(key=network_parameter.key, value=network_parameter.value)

Expand Down Expand Up @@ -838,7 +885,7 @@ def _position_from_proto(
average_entry_price=num_from_padded_int(
position.average_entry_price, decimal_spec.price_decimals
),
updated_at=datetime.datetime.fromtimestamp(int(position.updated_at / 1e9)),
updated_at=datetime.datetime.fromtimestamp(position.updated_at / 1e9),
loss_socialisation_amount=num_from_padded_int(
position.loss_socialisation_amount,
decimal_spec.asset_decimals,
Expand Down Expand Up @@ -2310,6 +2357,42 @@ def market_data_subscription_handler(
)


def position_resolution_subscription_handler(
stream: Iterable[vega_protos.api.v1.core.ObserveEventBusResponse],
mkt_pos_dp: Optional[Dict[str, int]] = None,
mkt_price_dp: Optional[Dict[str, int]] = None,
mkt_to_asset: Optional[Dict[str, str]] = None,
asset_dp: Optional[Dict[str, int]] = None,
) -> Transfer:
return _stream_handler(
stream_item=stream,
extraction_fn=lambda evt: evt.position_resolution,
conversion_fn=_position_resolution_from_proto,
mkt_pos_dp=mkt_pos_dp,
mkt_price_dp=mkt_price_dp,
mkt_to_asset=mkt_to_asset,
asset_dp=asset_dp,
)


def settle_distressed_subscription_handler(
stream: Iterable[vega_protos.api.v1.core.ObserveEventBusResponse],
mkt_pos_dp: Optional[Dict[str, int]] = None,
mkt_price_dp: Optional[Dict[str, int]] = None,
mkt_to_asset: Optional[Dict[str, str]] = None,
asset_dp: Optional[Dict[str, int]] = None,
) -> Transfer:
return _stream_handler(
stream_item=stream,
extraction_fn=lambda evt: evt.settle_distressed,
conversion_fn=_settle_distressed_from_proto,
mkt_pos_dp=mkt_pos_dp,
mkt_price_dp=mkt_price_dp,
mkt_to_asset=mkt_to_asset,
asset_dp=asset_dp,
)


def network_parameter_handler(
stream: Iterable[vega_protos.api.v1.core.ObserveEventBusResponse],
) -> Transfer:
Expand Down
11 changes: 6 additions & 5 deletions vega_sim/api/market.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,8 @@ class PriceMonitoringParameters(Config):
"auction_extension": 3600,
},
]
}
},
"none": {"triggers": []},
}

def load(self, opt: Optional[str] = None):
Expand Down Expand Up @@ -393,10 +394,10 @@ def build(self):
class LiquidationStrategy(Config):
OPTS = {
"default": {
"disposal_time_step": 1,
"disposal_fraction": 1,
"full_disposal_size": 10000000,
"max_fraction_consumed": 0.5,
"disposal_time_step": 30,
"disposal_fraction": 0.1,
"full_disposal_size": 1,
"max_fraction_consumed": 0.1,
}
}

Expand Down
28 changes: 28 additions & 0 deletions vega_sim/builders/mybuilder.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import json
import vega_sim.builders as build

from google.protobuf.json_format import MessageToJson

import vega_sim.proto.vega as vega_protos


if __name__ == "__main__":
asset_id = "8ba0b10971f0c4747746cd01ff05a53ae75ca91eba1d4d050b527910c983e27e"
market_id = "d3c94abd0738d3eb6d319654aa86a13268ef00070ef35367c5fd50fae0c00f5d"

asset_decimals = {asset_id: 6}

command = MessageToJson(
vega_protos.wallet.v1.wallet.SubmitTransactionRequest(
liquidity_provision_amendment=build.commands.commands.liquidity_provision_amendment(
asset_decimals=asset_decimals,
asset_id=asset_id,
market_id=market_id,
commitment_amount=10000,
fee=0.001,
)
),
indent="",
).replace("\n", "")

print(command)
43 changes: 42 additions & 1 deletion vega_sim/local_data_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

def _queue_forwarder(
data_client: vac.VegaCoreClient,
counter: dict,
stream_registry: List[
Tuple[
Any,
Expand Down Expand Up @@ -57,6 +58,7 @@ def _queue_forwarder(
for event in o.events:
if (kill_thread_sig is not None) and kill_thread_sig.is_set():
return
counter[event.type] += 1
output = retry(5, 1.0, lambda: handlers[event.type](event))
if isinstance(output, (list, GeneratorType)):
for elem in output:
Expand Down Expand Up @@ -99,6 +101,11 @@ def __init__(
self._asset_decimals = asset_decimals
self._market_to_asset = market_to_asset

# Initialise a counter to count the number of events
self._event_counter: Dict[str:int] = dict.fromkeys(
vega_protos.events.v1.events.BusEventType.values(), 0
)

self.time_update_lock = threading.RLock()
self.orders_lock = threading.RLock()
self.transfers_lock = threading.RLock()
Expand All @@ -109,6 +116,8 @@ def __init__(
self.trades_lock = threading.RLock()
self.ledger_entries_lock = threading.RLock()
self.network_parameter_lock = threading.RLock()
self.position_resolution_lock = threading.RLock()
self.settle_distressed_lock = threading.RLock()
self._time_update_from_feed = 0
self._live_order_state_from_feed = {}
self._dead_order_state_from_feed = {}
Expand All @@ -122,6 +131,8 @@ def __init__(
self._trades_from_feed: List[data.Trade] = []
self._ledger_entries_from_feed: List[data.LedgerEntry] = []
self._network_parameter_from_feed: Dict[str, data.NetworkParameter] = {}
self._position_resolution_from_feed: Dict[str, data.PositionResolution] = {}
self._settle_distressed_from_feed: List[data.SettleDistressed]

self._observation_thread = None
self._aggregated_observation_feed: Queue[Any] = Queue()
Expand Down Expand Up @@ -186,6 +197,20 @@ def __init__(
evt.network_parameter,
),
),
(
(events_protos.BUS_EVENT_TYPE_POSITION_RESOLUTION,),
lambda evt: data.position_resolution_subscription_handler(
evt,
self._market_pos_decimals,
self._market_price_decimals,
self._market_to_asset,
self._asset_decimals,
),
),
(
(events_protos.BUS_EVENT_TYPE_SETTLE_DISTRESSED,),
lambda evt: evt.settle_distressed,
),
]
self._high_load_stream_registry = [
(
Expand All @@ -211,6 +236,11 @@ def stop(self) -> None:
self._observation_thread.join()
self._forwarding_thread.join()

def bus_event_count_from_feed(
self, bus_event_type: vega_protos.events.v1.events.BusEventType
) -> int:
return self._event_counter[bus_event_type]

def time_update_from_feed(
self,
) -> int:
Expand Down Expand Up @@ -294,6 +324,12 @@ def network_parameter_from_feed(
self.initialise_network_parameters()
return self._network_parameter_from_feed[key]

def position_resolution_from_feed(
self,
market_id: str,
) -> data.PositionResolution:
return self._position_resolution_from_feed.get(market_id, None)

def start_live_feeds(
self,
market_ids: Optional[Union[str, List[str]]] = None,
Expand Down Expand Up @@ -333,6 +369,7 @@ def start_live_feeds(
target=_queue_forwarder,
args=(
self._event_bus_client,
self._event_counter,
self.stream_registry
+ (self._high_load_stream_registry if start_high_load_feeds else []),
self._aggregated_observation_feed,
Expand Down Expand Up @@ -553,11 +590,15 @@ def _monitor_stream(self) -> None:
with self.network_parameter_lock:
self._network_parameter_from_feed[update.key] = update

elif isinstance(update, data.PositionResolution):
with self.position_resolution_lock:
self._position_resolution_from_feed[update.market_id,] = update

elif update is None:
logger.debug("Failed to process event into update.")

else:
logger.info(f"Unhandled update {update}")
logger.debug(f"Unhandled update {update}")

def get_ledger_entries_from_stream(
self,
Expand Down
216 changes: 108 additions & 108 deletions vega_sim/proto/vega/events/v1/events_pb2.py

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions vega_sim/proto/vega/events/v1/events_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,7 @@ class RewardPayoutEvent(_message.Message):
"reward_type",
"market",
"locked_until_epoch",
"quantum_amount",
)
PARTY_FIELD_NUMBER: _ClassVar[int]
EPOCH_SEQ_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -1010,6 +1011,7 @@ class RewardPayoutEvent(_message.Message):
REWARD_TYPE_FIELD_NUMBER: _ClassVar[int]
MARKET_FIELD_NUMBER: _ClassVar[int]
LOCKED_UNTIL_EPOCH_FIELD_NUMBER: _ClassVar[int]
QUANTUM_AMOUNT_FIELD_NUMBER: _ClassVar[int]
party: str
epoch_seq: str
asset: str
Expand All @@ -1019,6 +1021,7 @@ class RewardPayoutEvent(_message.Message):
reward_type: str
market: str
locked_until_epoch: str
quantum_amount: str
def __init__(
self,
party: _Optional[str] = ...,
Expand All @@ -1030,6 +1033,7 @@ class RewardPayoutEvent(_message.Message):
reward_type: _Optional[str] = ...,
market: _Optional[str] = ...,
locked_until_epoch: _Optional[str] = ...,
quantum_amount: _Optional[str] = ...,
) -> None: ...

class ValidatorScoreEvent(_message.Message):
Expand Down
232 changes: 116 additions & 116 deletions vega_sim/proto/vega/vega_pb2.py

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions vega_sim/proto/vega/vega_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1527,6 +1527,7 @@ class MarketData(_message.Message):
"market_growth",
"product_data",
"liquidity_provider_sla",
"next_network_closeout",
)
MARK_PRICE_FIELD_NUMBER: _ClassVar[int]
BEST_BID_PRICE_FIELD_NUMBER: _ClassVar[int]
Expand Down Expand Up @@ -1560,6 +1561,7 @@ class MarketData(_message.Message):
MARKET_GROWTH_FIELD_NUMBER: _ClassVar[int]
PRODUCT_DATA_FIELD_NUMBER: _ClassVar[int]
LIQUIDITY_PROVIDER_SLA_FIELD_NUMBER: _ClassVar[int]
NEXT_NETWORK_CLOSEOUT_FIELD_NUMBER: _ClassVar[int]
mark_price: str
best_bid_price: str
best_bid_volume: int
Expand Down Expand Up @@ -1598,6 +1600,7 @@ class MarketData(_message.Message):
liquidity_provider_sla: _containers.RepeatedCompositeFieldContainer[
LiquidityProviderSLA
]
next_network_closeout: int
def __init__(
self,
mark_price: _Optional[str] = ...,
Expand Down Expand Up @@ -1640,6 +1643,7 @@ class MarketData(_message.Message):
liquidity_provider_sla: _Optional[
_Iterable[_Union[LiquidityProviderSLA, _Mapping]]
] = ...,
next_network_closeout: _Optional[int] = ...,
) -> None: ...

class LiquidityProviderFeeShare(_message.Message):
Expand Down Expand Up @@ -2250,6 +2254,7 @@ class Reward(_message.Message):
"market_id",
"reward_type",
"locked_until_epoch",
"quantum_amount",
)
ASSET_ID_FIELD_NUMBER: _ClassVar[int]
PARTY_ID_FIELD_NUMBER: _ClassVar[int]
Expand All @@ -2260,6 +2265,7 @@ class Reward(_message.Message):
MARKET_ID_FIELD_NUMBER: _ClassVar[int]
REWARD_TYPE_FIELD_NUMBER: _ClassVar[int]
LOCKED_UNTIL_EPOCH_FIELD_NUMBER: _ClassVar[int]
QUANTUM_AMOUNT_FIELD_NUMBER: _ClassVar[int]
asset_id: str
party_id: str
epoch: int
Expand All @@ -2269,6 +2275,7 @@ class Reward(_message.Message):
market_id: str
reward_type: str
locked_until_epoch: int
quantum_amount: str
def __init__(
self,
asset_id: _Optional[str] = ...,
Expand All @@ -2280,6 +2287,7 @@ class Reward(_message.Message):
market_id: _Optional[str] = ...,
reward_type: _Optional[str] = ...,
locked_until_epoch: _Optional[int] = ...,
quantum_amount: _Optional[str] = ...,
) -> None: ...

class RewardSummary(_message.Message):
Expand Down
Loading

0 comments on commit 6ea4094

Please sign in to comment.