Skip to content
This repository has been archived by the owner on Sep 23, 2023. It is now read-only.

Fatal1ty/tinkoff-api

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

tinkoff-api

Python Tinkoff API client for asyncio and humans.

build Latest Version Python Version License Code style: black

Table of contents

Covered APIs

Features

  • Clients for both REST and Streaming protocols in Tinkoff Investments
  • Data classes for all interactions with the API
  • Automatic reconnection and keep-alive connections
  • A separate internal rate limiter for every resource in the REST protocol
  • Friendly exceptions for API errors

Installation

Use pip to install:

$ pip install tinkoff-api

Usage examples

REST API client:

import asyncio
from datetime import datetime
from tinkoff.investments import (
    TinkoffInvestmentsRESTClient,
    Environment,
    CandleResolution,
)
from tinkoff.investments.client.exceptions import TinkoffInvestmentsError


async def show_apple_year_candles():
    """Print one line per daily 2019 candle for FIGI ``BBG000B9XRY4``.

    Opens a REST client against ``Environment.SANDBOX``, requests candles
    with ``dt_from=2019-01-01``, ``dt_to=2019-12-31`` and
    ``CandleResolution.DAY``, then prints each candle's ``time`` and ``h``
    attributes (``h`` is presumably the high price -- confirm against the
    candle data class).

    A ``TinkoffInvestmentsError`` raised anywhere in that flow is printed
    rather than propagated.
    """
    # The requested range is constant, so build it before touching the API.
    range_start = datetime(2019, 1, 1)
    range_end = datetime(2019, 12, 31)

    try:
        async with TinkoffInvestmentsRESTClient(
            token="TOKEN", environment=Environment.SANDBOX
        ) as api:
            daily_candles = await api.market.candles.get(
                figi="BBG000B9XRY4",
                dt_from=range_start,
                dt_to=range_end,
                interval=CandleResolution.DAY,
            )
            for candle in daily_candles:
                print(f"{candle.time}: {candle.h}")
    except TinkoffInvestmentsError as api_error:
        print(api_error)


async def jackpot():
    """Register a sandbox account, give it an AAPL position and print the portfolio.

    Steps, all performed through a REST client opened against
    ``Environment.SANDBOX``:

    1. Search instruments for ``"AAPL"`` and take the first result.
    2. Register a sandbox account.
    3. Set that account's position balance for the instrument's FIGI to 100.
    4. Fetch the portfolio positions and print each one's ``name`` and ``lots``.

    If the search returns no instruments, a message is printed and the
    function returns without creating an account.  A
    ``TinkoffInvestmentsError`` raised anywhere in the flow is printed
    rather than propagated.
    """
    try:
        async with TinkoffInvestmentsRESTClient(
            token="TOKEN", environment=Environment.SANDBOX
        ) as client:
            instruments = await client.market.instruments.search("AAPL")
            if not instruments:
                # Indexing an empty result would raise IndexError, which the
                # TinkoffInvestmentsError handler below does not catch.
                print("No instruments found for AAPL")
                return
            apple = instruments[0]

            account = await client.sandbox.accounts.register()
            await client.sandbox.accounts.positions.set_balance(
                figi=apple.figi,
                balance=100,
                broker_account_id=account.brokerAccountId,
            )

            print("We created the following portfolio:")
            # NOTE(review): positions are fetched without a broker account id,
            # while the balance above was set on the newly registered
            # account -- confirm both refer to the same account.
            positions = await client.portfolio.get_positions()
            for position in positions:
                print(f"{position.name}: {position.lots} lots")
    except TinkoffInvestmentsError as e:
        print(e)


# Only jackpot() is executed by this example; show_apple_year_candles() is
# defined above but is not called here.
asyncio.run(jackpot())

Streaming Client:

import asyncio
from datetime import datetime
from tinkoff.investments import (
    CandleEvent,
    CandleResolution,
    TinkoffInvestmentsStreamingClient,
)

# The streaming client is created at module level so that its
# ``events.candles`` decorator can be applied to the handler below.
client = TinkoffInvestmentsStreamingClient(token="TOKEN")


# The same handler is decorated once per FIGI, both at CandleResolution.MIN_1.
@client.events.candles("BBG009S39JX6", CandleResolution.MIN_1)
@client.events.candles("BBG000B9XRY4", CandleResolution.MIN_1)
async def on_candle(candle: CandleEvent, server_time: datetime):
    """Print the received candle event together with the server time."""
    print(candle, server_time)


# asyncio.run drives client.run() until that coroutine returns.
asyncio.run(client.run())

Dynamic subscriptions at runtime:

import asyncio
from datetime import datetime
from tinkoff.investments import (
    CandleEvent,
    CandleResolution,
    TinkoffInvestmentsStreamingClient,
)

client = TinkoffInvestmentsStreamingClient(token="TOKEN")


# The handler starts out attached to hourly candles for a single FIGI and
# then adds or removes a 1-minute subscription for that FIGI from inside
# the callback itself.
@client.events.candles("BBG000B9XRY4", CandleResolution.HOUR)
async def on_candle(candle: CandleEvent, server_time: datetime):
    """Subscribe to or unsubscribe from 1-minute candles based on ``candle.h``.

    * ``candle.h > 1000``: subscribe this same callback to
      ``CandleResolution.MIN_1`` candles for ``candle.figi``.
    * ``candle.h < 1000``: unsubscribe from ``CandleResolution.MIN_1``
      candles for ``candle.figi``.
    * ``candle.h == 1000``: neither branch runs, so nothing changes.

    ``server_time`` is accepted but not used.

    NOTE(review): because the 1-minute subscription reuses this callback,
    it presumably also receives the 1-minute events and re-evaluates the
    same threshold -- confirm against the client implementation.
    """
    if candle.h > 1000:
        await client.events.candles.subscribe(
            callback=on_candle,
            figi=candle.figi,
            interval=CandleResolution.MIN_1,
        )
    elif candle.h < 1000:
        await client.events.candles.unsubscribe(
            candle.figi, CandleResolution.MIN_1
        )


# asyncio.run drives client.run() until that coroutine returns.
asyncio.run(client.run())

Complete simple bot:

import asyncio
from datetime import datetime

from tinkoff.investments import (
    CandleEvent,
    CandleResolution,
    OperationType,
    TinkoffInvestmentsRESTClient,
    TinkoffInvestmentsStreamingClient,
)

# Both clients receive the token as their first positional argument.
streaming = TinkoffInvestmentsStreamingClient("TOKEN")
# NOTE(review): unlike the other REST examples in this README, this client
# is not used through ``async with`` -- confirm whether it needs explicit
# cleanup when the bot stops.
rest = TinkoffInvestmentsRESTClient("TOKEN")


@streaming.events.candles("BBG000B9XRY4", CandleResolution.MIN_1)
async def buy_apple(candle: CandleEvent, server_time: datetime):
    """Place a one-lot market BUY order whenever ``candle.c`` exceeds 350.

    The handler is attached to 1-minute candles for FIGI ``BBG000B9XRY4``
    and submits the order for that same FIGI through the module-level
    ``rest`` client.  ``server_time`` is accepted but not used.

    NOTE(review): ``candle.c`` is presumably the close price, and
    ``broker_account_id=123`` looks like a placeholder -- confirm both
    before reusing this snippet.
    """
    if candle.c > 350:
        await rest.orders.create_market_order(
            figi="BBG000B9XRY4",
            lots=1,
            operation=OperationType.BUY,
            broker_account_id=123,
        )


# asyncio.run drives streaming.run() until that coroutine returns.
asyncio.run(streaming.run())

Historical data:

import asyncio
from datetime import datetime

from tinkoff.investments import (
    CandleResolution,
    Environment,
    TinkoffInvestmentsRESTClient,
)
from tinkoff.investments.utils.historical_data import HistoricalData


async def get_minute_candles():
    """Print 1-minute AAPL candles for a one-year period.

    Opens a REST client against ``Environment.SANDBOX``, wraps it in
    ``HistoricalData`` and asynchronously iterates ``iter_candles`` for
    FIGI ``BBG000B9XRY4`` with ``dt_from=2019-01-01``, ``dt_to=2020-01-01``
    and ``CandleResolution.MIN_1``, printing every candle it yields.
    """
    # The requested range is constant, so build it before opening the client.
    period_start = datetime(2019, 1, 1)
    period_end = datetime(2020, 1, 1)

    async with TinkoffInvestmentsRESTClient(
        token="TOKEN", environment=Environment.SANDBOX
    ) as rest_client:
        candle_stream = HistoricalData(rest_client).iter_candles(
            figi="BBG000B9XRY4",
            dt_from=period_start,
            dt_to=period_end,
            interval=CandleResolution.MIN_1,
        )
        async for candle in candle_stream:
            print(candle)


# Entry point of the example: run get_minute_candles() to completion.
asyncio.run(get_minute_candles())

TODO

  • allow passing str constants as well as the specific enum objects
  • add ability to unsubscribe by pattern
  • rename some fields
  • convert some fields to snake case
  • generate documentation