Skip to content

Commit

Permalink
First pass at streaming APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasbair committed Oct 8, 2024
1 parent 648c05d commit f73f028
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 1 deletion.
41 changes: 41 additions & 0 deletions lib/models/definitions/transaction/daily_financing.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
defmodule ExOanda.DailyFinancing do
@moduledoc """
Schema for Oanda daily financing.
"""

use TypedEctoSchema
import Ecto.Changeset
alias ExOanda.Type.Atom

@primary_key false

typed_embedded_schema do
field(:id, :string)
field(:time, :utc_datetime_usec)
field(:user_id, :integer)
field(:account_id, :string)
field(:batch_id, :string)
field(:request_id, :string)
field(:type, Atom, default: :DAILY_FINANCING)
field(:financing, :integer)
field(:account_balance, :float)
field(:position_financings, {:array, :map})
end

@doc false
def changeset(struct, params) do
struct
|> cast(params, [
:id,
:time,
:user_id,
:account_id,
:batch_id,
:request_id,
:type,
:financing,
:account_balance,
:position_financings
])
end
end
1 change: 1 addition & 0 deletions lib/models/response/atom.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule ExOanda.Type.Atom do
def type, do: :string

def cast(value) when is_atom(value), do: {:ok, value}
def cast(value) when is_binary(value), do: {:ok, String.to_atom(value)}
def cast(_), do: :error

def load(value), do: {:ok, String.to_existing_atom(value)}
Expand Down
22 changes: 22 additions & 0 deletions lib/models/response/streaming/pricing/heartbeat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
defmodule ExOanda.Response.PricingHeartbeat do
@moduledoc """
Schema for Oanda pricing heartbeat response.
"""

use TypedEctoSchema
import Ecto.Changeset
alias ExOanda.Type.Atom

@primary_key false

typed_embedded_schema do
field(:time, :utc_datetime_usec)
field(:type, Atom, default: :HEARTBEAT)
end

@doc false
def changeset(struct, params) do
struct
|> cast(params, [:time, :type])
end
end
47 changes: 47 additions & 0 deletions lib/models/response/streaming/pricing/pricing.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
defmodule ExOanda.Response.Pricing do
@moduledoc """
Schema for Oanda streaming pricing response.
"""

use TypedEctoSchema
import Ecto.Changeset

@primary_key false

typed_embedded_schema do
field(:instrument, :string)
field(:status, Ecto.Enum, values: [:tradeable, :non_tradeable, :invalid])
field(:time, :utc_datetime_usec)
field(:closeout_ask, :float)
field(:closeout_bid, :float)

embeds_many :asks, Ask, primary_key: false do
field(:liquidity, :integer)
field(:price, :float)
end

embeds_many :bids, Bid, primary_key: false do
field(:liquidity, :integer)
field(:price, :float)
end
end

@doc false
def changeset(struct, params) do
struct
|> cast(params, [
:instrument,
:status,
:time,
:closeout_ask,
:closeout_bid,
])
|> cast_embed(:asks, with: &price_changeset/2)
|> cast_embed(:bids, with: &price_changeset/2)
end

defp price_changeset(struct, params) do
struct
|> cast(params, [:liquidity, :price])
end
end
50 changes: 50 additions & 0 deletions lib/models/response/streaming/transactions/event.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule ExOanda.Response.TransactionEvent do
@moduledoc """
Schema for Oanda streaming transaction response.
"""

use Ecto.Schema
import Ecto.Changeset
import PolymorphicEmbed
alias ExOanda.{
OrderFillTransaction,
TakeProfitOrderTransaction,
StopLossOrderTransaction,
TrailingStopLossOrderTransaction,
MarketOrderRejectTransaction,
MarketOrderTransaction,
OrderCancelTransaction,
TradeClientExtensionsModifyTransaction,
DailyFinancingTransaction,
Response.TransactionHeartbeat
}

@primary_key false

embedded_schema do
polymorphic_embeds_one :event,
# Note: this list is incomplete based the Oanda docs
types: [
ORDERFILL: OrderFillTransaction,
TAKE_PROFIT_ORDER: TakeProfitOrderTransaction,
STOP_LOSS_ORDER: StopLossOrderTransaction,
TRAILING_STOP_LOSS_ORDER: TrailingStopLossOrderTransaction,
MARKET_ORDER_REJECT: MarketOrderRejectTransaction,
MARKET_ORDER: MarketOrderTransaction,
ORDER_CANCEL: OrderCancelTransaction,
TRADE_CLIENT_EXTENSIONS_MODIFY: TradeClientExtensionsModifyTransaction,
DAILY_FINANCING: DailyFinancingTransaction,
HEARTBEAT: TransactionHeartbeat
],
type_field_name: :type,
on_type_not_found: :raise,
on_replace: :update
end

@doc false
def changeset(struct, params) do
struct
|> cast(params, [])
|> cast_polymorphic_embed(:event)
end
end
23 changes: 23 additions & 0 deletions lib/models/response/streaming/transactions/heartbeat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule ExOanda.Response.TransactionHeartbeat do
@moduledoc """
Schema for Oanda transaction heartbeat response.
"""

use TypedEctoSchema
import Ecto.Changeset
alias ExOanda.Type.Atom

@primary_key false

typed_embedded_schema do
field(:last_transaction_id, :string)
field(:time, :utc_datetime_usec)
field(:type, Atom, default: :HEARTBEAT)
end

@doc false
def changeset(struct, params) do
struct
|> cast(params, [:last_transaction_id, :time, :type])
end
end
79 changes: 79 additions & 0 deletions lib/streaming/streaming.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
defmodule ExOanda.Streaming do
@moduledoc """
Interface for Oanda streaming endpoints.
"""

alias ExOanda.API
alias ExOanda.Connection, as: Conn
alias ExOanda.Transform, as: TF

@price_stream_params NimbleOptions.new!(
instruments: [
type: {:list, :string},
required: true
]
)

@doc """
Stream transactions for an account.
## Examples
iex> ExOanda.Streaming.transaction_stream(conn, "101-004-22222222-001", &IO.inspect/1)
:ok
"""
def transaction_stream(%Conn{} = conn, account_id, stream_to, params \\ []) do
stream(conn, account_id, :transactions, stream_to, params)
end

@doc """
Stream prices for an instrument(s).
## Examples
iex> ExOanda.Streaming.price_stream(conn, "101-004-22222222-001", &IO.inspect/1, instruments: ["EUR_USD"])
:ok
## Supported parameters
#{NimbleOptions.docs(@price_stream_params)}
"""
def price_stream(%Conn{} = conn, account_id, stream_to, params \\ []) do
case NimbleOptions.validate(params, @price_stream_params) do
{:ok, params} ->
stream(conn, account_id, :pricing, stream_to, format_instruments(params))
{:error, errors} ->
{:error, errors}
end
end

defp stream(%Conn{} = conn, account_id, stream_type, stream_to, params) do
Req.new(
auth: API.auth_bearer(conn),
url: "#{conn.stream_server}/accounts/#{account_id}/#{stream_type}/stream",
method: :get,
headers: API.base_headers(),
params: params,
into: fn {:data, data}, {req, resp} ->
data
|> String.split("\n", trim: true)
|> Enum.each(fn line ->
line
|> TF.transform_stream(stream_type)
|> stream_to.()
end)

{:cont, {req, resp}}
end
)
|> Req.request(conn.options)
end

defp format_instruments(params) do
instruments =
params
|> Keyword.fetch!(:instruments)
|> Enum.join(",")

%{instruments: instruments}
end
end
18 changes: 17 additions & 1 deletion lib/util/transform.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ defmodule ExOanda.Transform do
alias ExOanda.{
CodeGenerator,
HttpStatus,
Response
Response,
Response.TransactionEvent,
Response.PricingHeartbeat,
Response.Pricing
}

@spec transform(map(), atom()) :: Response.t()
Expand All @@ -16,6 +19,13 @@ defmodule ExOanda.Transform do
|> apply_changes()
end

def transform_stream(val, stream_type) do
val
|> Jason.decode!()
|> find_stream_schema(stream_type)
|> then(fn {schema, data} -> preprocess_data(schema, data) end)
end

defp preprocess_body(model, response) do
data =
response
Expand Down Expand Up @@ -77,4 +87,10 @@ defmodule ExOanda.Transform do

changeset
end

# Transactions are varied, so polymorphic embed is used to handle different types of transactions.
defp find_stream_schema(val, :transactions), do: {TransactionEvent, %{"event" => val}}

defp find_stream_schema(%{"type" => "HEARTBEAT"} = val, :pricing), do: {PricingHeartbeat, val}
defp find_stream_schema(val, :pricing), do: {Pricing, val}
end
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ defmodule ExOanda.MixProject do
{:ex_doc, "~> 0.31", only: :dev, runtime: false},
{:miss, "~> 0.1.5"},
{:nimble_options, "~> 1.1"},
{:polymorphic_embed, "~> 5.0"},
{:recase, "~> 0.8.1"},
{:req, "~> 0.5.2"},
{:req_telemetry, "~> 0.1.1"},
Expand Down
2 changes: 2 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
%{
"attrs": {:hex, :attrs, "0.6.0", "25d738b47829f964a786ef73897d2550b66f3e7d1d7c49a83bc8fd81c71bed93", [:mix], [], "hexpm", "9c30ac15255c2ba8399263db55ba32c2f4e5ec267b654ce23df99168b405c82e"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"},
"castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"},
Expand Down Expand Up @@ -29,6 +30,7 @@
"plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"},
"plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"},
"plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"},
"polymorphic_embed": {:hex, :polymorphic_embed, "5.0.0", "8edf0f8262e26d2bd884f972ab34e2ae970019ad8a4a2df8fc319117f5238d40", [:mix], [{:attrs, "~> 0.6", [hex: :attrs, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_html_helpers, "~> 1.0", [hex: :phoenix_html_helpers, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}], "hexpm", "462976e51f74858cd5cc886fd23bf74616805c11f3abe7df352dd242de1b50e6"},
"ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"},
"recase": {:hex, :recase, "0.8.1", "ab98cd35857a86fa5ca99036f575241d71d77d9c2ab0c39aacf1c9b61f6f7d1d", [:mix], [], "hexpm", "9fd8d63e7e43bd9ea385b12364e305778b2bbd92537e95c4b2e26fc507d5e4c2"},
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
Expand Down

0 comments on commit f73f028

Please sign in to comment.