From f73f028ea0eeb180d7953344cd514be7380d07fa Mon Sep 17 00:00:00 2001 From: nickbair Date: Mon, 7 Oct 2024 18:08:00 -0600 Subject: [PATCH] First pass at streaming APIs --- .../transaction/daily_financing.ex | 41 ++++++++++ lib/models/response/atom.ex | 1 + .../response/streaming/pricing/heartbeat.ex | 22 ++++++ .../response/streaming/pricing/pricing.ex | 47 +++++++++++ .../response/streaming/transactions/event.ex | 50 ++++++++++++ .../streaming/transactions/heartbeat.ex | 23 ++++++ lib/streaming/streaming.ex | 79 +++++++++++++++++++ lib/util/transform.ex | 18 ++++- mix.exs | 1 + mix.lock | 2 + 10 files changed, 283 insertions(+), 1 deletion(-) create mode 100644 lib/models/definitions/transaction/daily_financing.ex create mode 100644 lib/models/response/streaming/pricing/heartbeat.ex create mode 100644 lib/models/response/streaming/pricing/pricing.ex create mode 100644 lib/models/response/streaming/transactions/event.ex create mode 100644 lib/models/response/streaming/transactions/heartbeat.ex create mode 100644 lib/streaming/streaming.ex diff --git a/lib/models/definitions/transaction/daily_financing.ex b/lib/models/definitions/transaction/daily_financing.ex new file mode 100644 index 0000000..ac287ab --- /dev/null +++ b/lib/models/definitions/transaction/daily_financing.ex @@ -0,0 +1,41 @@ +defmodule ExOanda.DailyFinancing do + @moduledoc """ + Schema for Oanda daily financing. + """ + + use TypedEctoSchema + import Ecto.Changeset + alias ExOanda.Type.Atom + + @primary_key false + + typed_embedded_schema do + field(:id, :string) + field(:time, :utc_datetime_usec) + field(:user_id, :integer) + field(:account_id, :string) + field(:batch_id, :string) + field(:request_id, :string) + field(:type, Atom, default: :DAILY_FINANCING) + field(:financing, :integer) + field(:account_balance, :float) + field(:position_financings, {:array, :map}) + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, [ + :id, + :time, + :user_id, + :account_id, + :batch_id, + :request_id, + :type, + :financing, + :account_balance, + :position_financings + ]) + end +end diff --git a/lib/models/response/atom.ex b/lib/models/response/atom.ex index c66dff3..a5d38d5 100644 --- a/lib/models/response/atom.ex +++ b/lib/models/response/atom.ex @@ -10,6 +10,7 @@ defmodule ExOanda.Type.Atom do def type, do: :string def cast(value) when is_atom(value), do: {:ok, value} + def cast(value) when is_binary(value), do: {:ok, String.to_atom(value)} def cast(_), do: :error def load(value), do: {:ok, String.to_existing_atom(value)} diff --git a/lib/models/response/streaming/pricing/heartbeat.ex b/lib/models/response/streaming/pricing/heartbeat.ex new file mode 100644 index 0000000..1f34ee1 --- /dev/null +++ b/lib/models/response/streaming/pricing/heartbeat.ex @@ -0,0 +1,22 @@ +defmodule ExOanda.Response.PricingHeartbeat do + @moduledoc """ + Schema for Oanda pricing heartbeat response. + """ + + use TypedEctoSchema + import Ecto.Changeset + alias ExOanda.Type.Atom + + @primary_key false + + typed_embedded_schema do + field(:time, :utc_datetime_usec) + field(:type, Atom, default: :HEARTBEAT) + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, [:time, :type]) + end +end diff --git a/lib/models/response/streaming/pricing/pricing.ex b/lib/models/response/streaming/pricing/pricing.ex new file mode 100644 index 0000000..90b7f27 --- /dev/null +++ b/lib/models/response/streaming/pricing/pricing.ex @@ -0,0 +1,47 @@ +defmodule ExOanda.Response.Pricing do + @moduledoc """ + Schema for Oanda streaming pricing response. + """ + + use TypedEctoSchema + import Ecto.Changeset + + @primary_key false + + typed_embedded_schema do + field(:instrument, :string) + field(:status, Ecto.Enum, values: [:tradeable, :non_tradeable, :invalid]) + field(:time, :utc_datetime_usec) + field(:closeout_ask, :float) + field(:closeout_bid, :float) + + embeds_many :asks, Ask, primary_key: false do + field(:liquidity, :integer) + field(:price, :float) + end + + embeds_many :bids, Bid, primary_key: false do + field(:liquidity, :integer) + field(:price, :float) + end + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, [ + :instrument, + :status, + :time, + :closeout_ask, + :closeout_bid, + ]) + |> cast_embed(:asks, with: &price_changeset/2) + |> cast_embed(:bids, with: &price_changeset/2) + end + + defp price_changeset(struct, params) do + struct + |> cast(params, [:liquidity, :price]) + end +end diff --git a/lib/models/response/streaming/transactions/event.ex b/lib/models/response/streaming/transactions/event.ex new file mode 100644 index 0000000..b07d068 --- /dev/null +++ b/lib/models/response/streaming/transactions/event.ex @@ -0,0 +1,50 @@ +defmodule ExOanda.Response.TransactionEvent do + @moduledoc """ + Schema for Oanda streaming transaction response. + """ + + use Ecto.Schema + import Ecto.Changeset + import PolymorphicEmbed + alias ExOanda.{ + OrderFillTransaction, + TakeProfitOrderTransaction, + StopLossOrderTransaction, + TrailingStopLossOrderTransaction, + MarketOrderRejectTransaction, + MarketOrderTransaction, + OrderCancelTransaction, + TradeClientExtensionsModifyTransaction, + DailyFinancingTransaction, + Response.TransactionHeartbeat + } + + @primary_key false + + embedded_schema do + polymorphic_embeds_one :event, + # Note: this list is incomplete based the Oanda docs + types: [ + ORDERFILL: OrderFillTransaction, + TAKE_PROFIT_ORDER: TakeProfitOrderTransaction, + STOP_LOSS_ORDER: StopLossOrderTransaction, + TRAILING_STOP_LOSS_ORDER: TrailingStopLossOrderTransaction, + MARKET_ORDER_REJECT: MarketOrderRejectTransaction, + MARKET_ORDER: MarketOrderTransaction, + ORDER_CANCEL: OrderCancelTransaction, + TRADE_CLIENT_EXTENSIONS_MODIFY: TradeClientExtensionsModifyTransaction, + DAILY_FINANCING: DailyFinancingTransaction, + HEARTBEAT: TransactionHeartbeat + ], + type_field_name: :type, + on_type_not_found: :raise, + on_replace: :update + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, []) + |> cast_polymorphic_embed(:event) + end +end diff --git a/lib/models/response/streaming/transactions/heartbeat.ex b/lib/models/response/streaming/transactions/heartbeat.ex new file mode 100644 index 0000000..4037ada --- /dev/null +++ b/lib/models/response/streaming/transactions/heartbeat.ex @@ -0,0 +1,23 @@ +defmodule ExOanda.Response.TransactionHeartbeat do + @moduledoc """ + Schema for Oanda transaction heartbeat response. + """ + + use TypedEctoSchema + import Ecto.Changeset + alias ExOanda.Type.Atom + + @primary_key false + + typed_embedded_schema do + field(:last_transaction_id, :string) + field(:time, :utc_datetime_usec) + field(:type, Atom, default: :HEARTBEAT) + end + + @doc false + def changeset(struct, params) do + struct + |> cast(params, [:last_transaction_id, :time, :type]) + end +end diff --git a/lib/streaming/streaming.ex b/lib/streaming/streaming.ex new file mode 100644 index 0000000..2e59859 --- /dev/null +++ b/lib/streaming/streaming.ex @@ -0,0 +1,79 @@ +defmodule ExOanda.Streaming do + @moduledoc """ + Interface for Oanda streaming endpoints. + """ + + alias ExOanda.API + alias ExOanda.Connection, as: Conn + alias ExOanda.Transform, as: TF + + @price_stream_params NimbleOptions.new!( + instruments: [ + type: {:list, :string}, + required: true + ] + ) + + @doc """ + Stream transactions for an account. + + ## Examples + + iex> ExOanda.Streaming.transaction_stream(conn, "101-004-22222222-001", &IO.inspect/1) + :ok + """ + def transaction_stream(%Conn{} = conn, account_id, stream_to, params \\ []) do + stream(conn, account_id, :transactions, stream_to, params) + end + + @doc """ + Stream prices for an instrument(s). + + ## Examples + + iex> ExOanda.Streaming.price_stream(conn, "101-004-22222222-001", &IO.inspect/1, instruments: ["EUR_USD"]) + :ok + + ## Supported parameters + #{NimbleOptions.docs(@price_stream_params)} + """ + def price_stream(%Conn{} = conn, account_id, stream_to, params \\ []) do + case NimbleOptions.validate(params, @price_stream_params) do + {:ok, params} -> + stream(conn, account_id, :pricing, stream_to, format_instruments(params)) + {:error, errors} -> + {:error, errors} + end + end + + defp stream(%Conn{} = conn, account_id, stream_type, stream_to, params) do + Req.new( + auth: API.auth_bearer(conn), + url: "#{conn.stream_server}/accounts/#{account_id}/#{stream_type}/stream", + method: :get, + headers: API.base_headers(), + params: params, + into: fn {:data, data}, {req, resp} -> + data + |> String.split("\n", trim: true) + |> Enum.each(fn line -> + line + |> TF.transform_stream(stream_type) + |> stream_to.() + end) + + {:cont, {req, resp}} + end + ) + |> Req.request(conn.options) + end + + defp format_instruments(params) do + instruments = + params + |> Keyword.fetch!(:instruments) + |> Enum.join(",") + + %{instruments: instruments} + end +end diff --git a/lib/util/transform.ex b/lib/util/transform.ex index b9e4061..79e9020 100644 --- a/lib/util/transform.ex +++ b/lib/util/transform.ex @@ -6,7 +6,10 @@ defmodule ExOanda.Transform do alias ExOanda.{ CodeGenerator, HttpStatus, - Response + Response, + Response.TransactionEvent, + Response.PricingHeartbeat, + Response.Pricing } @spec transform(map(), atom()) :: Response.t() @@ -16,6 +19,13 @@ defmodule ExOanda.Transform do |> apply_changes() end + def transform_stream(val, stream_type) do + val + |> Jason.decode!() + |> find_stream_schema(stream_type) + |> then(fn {schema, data} -> preprocess_data(schema, data) end) + end + defp preprocess_body(model, response) do data = response @@ -77,4 +87,10 @@ defmodule ExOanda.Transform do changeset end + + # Transactions are varied, so polymorphic embed is used to handle different types of transactions. + defp find_stream_schema(val, :transactions), do: {TransactionEvent, %{"event" => val}} + + defp find_stream_schema(%{"type" => "HEARTBEAT"} = val, :pricing), do: {PricingHeartbeat, val} + defp find_stream_schema(val, :pricing), do: {Pricing, val} end diff --git a/mix.exs b/mix.exs index d7f5434..a35ca53 100644 --- a/mix.exs +++ b/mix.exs @@ -35,6 +35,7 @@ defmodule ExOanda.MixProject do {:ex_doc, "~> 0.31", only: :dev, runtime: false}, {:miss, "~> 0.1.5"}, {:nimble_options, "~> 1.1"}, + {:polymorphic_embed, "~> 5.0"}, {:recase, "~> 0.8.1"}, {:req, "~> 0.5.2"}, {:req_telemetry, "~> 0.1.1"}, diff --git a/mix.lock b/mix.lock index e82395f..910415a 100644 --- a/mix.lock +++ b/mix.lock @@ -1,4 +1,5 @@ %{ + "attrs": {:hex, :attrs, "0.6.0", "25d738b47829f964a786ef73897d2550b66f3e7d1d7c49a83bc8fd81c71bed93", [:mix], [], "hexpm", "9c30ac15255c2ba8399263db55ba32c2f4e5ec267b654ce23df99168b405c82e"}, "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "castore": {:hex, :castore, "1.0.8", "dedcf20ea746694647f883590b82d9e96014057aff1d44d03ec90f36a5c0dc6e", [:mix], [], "hexpm", "0b2b66d2ee742cb1d9cb8c8be3b43c3a70ee8651f37b75a8b982e036752983f1"}, @@ -29,6 +30,7 @@ "plug": {:hex, :plug, "1.16.1", "40c74619c12f82736d2214557dedec2e9762029b2438d6d175c5074c933edc9d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "a13ff6b9006b03d7e33874945b2755253841b238c34071ed85b0e86057f8cddc"}, "plug_cowboy": {:hex, :plug_cowboy, "2.7.1", "87677ffe3b765bc96a89be7960f81703223fe2e21efa42c125fcd0127dd9d6b2", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "02dbd5f9ab571b864ae39418db7811618506256f6d13b4a45037e5fe78dc5de3"}, "plug_crypto": {:hex, :plug_crypto, "2.1.0", "f44309c2b06d249c27c8d3f65cfe08158ade08418cf540fd4f72d4d6863abb7b", [:mix], [], "hexpm", "131216a4b030b8f8ce0f26038bc4421ae60e4bb95c5cf5395e1421437824c4fa"}, + "polymorphic_embed": {:hex, :polymorphic_embed, "5.0.0", "8edf0f8262e26d2bd884f972ab34e2ae970019ad8a4a2df8fc319117f5238d40", [:mix], [{:attrs, "~> 0.6", [hex: :attrs, repo: "hexpm", optional: false]}, {:ecto, "~> 3.12", [hex: :ecto, repo: "hexpm", optional: false]}, {:jason, "~> 1.4", [hex: :jason, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_html_helpers, "~> 1.0", [hex: :phoenix_html_helpers, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}], "hexpm", "462976e51f74858cd5cc886fd23bf74616805c11f3abe7df352dd242de1b50e6"}, "ranch": {:hex, :ranch, "1.8.0", "8c7a100a139fd57f17327b6413e4167ac559fbc04ca7448e9be9057311597a1d", [:make, :rebar3], [], "hexpm", "49fbcfd3682fab1f5d109351b61257676da1a2fdbe295904176d5e521a2ddfe5"}, "recase": {:hex, :recase, "0.8.1", "ab98cd35857a86fa5ca99036f575241d71d77d9c2ab0c39aacf1c9b61f6f7d1d", [:mix], [], "hexpm", "9fd8d63e7e43bd9ea385b12364e305778b2bbd92537e95c4b2e26fc507d5e4c2"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},