Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Minor-improvements #19

Merged
merged 7 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ jobs:

- name: Build Docker image
run: |
docker build -t ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.event.number }} .
docker build -t ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.sha }} .

- name: Login to DockerHub
uses: docker/login-action@v3
Expand All @@ -53,12 +53,12 @@ jobs:
password: ${{ secrets.DOCKER_HUB_TOKEN }}

- name: Push Docker image
run: docker push ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.event.number }}
run: docker push ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.sha }}

- name: Update image metadata
uses: docker/metadata-action@v5
with:
images: ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.event.number }}
images: ${{ secrets.DOCKER_HUB_USERNAME }}/stockdog:alpha-${{ github.sha }}
tags: |
type=sha
labels: |
Expand Down
12 changes: 11 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,19 @@ services:
- db-data:/var/lib/postgresql/data
networks:
- stockdog

grafana:
image: grafana/grafana:latest
ports:
- '3300:3000'
volumes:
- grafana-storage:/var/lib/grafana
networks:
- stockdog
depends_on:
- db
volumes:
db-data:
grafana-storage:

networks:
stockdog:
13 changes: 10 additions & 3 deletions src/asset-management/entities/asset-exchange.entity.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import { Entity, PrimaryGeneratedColumn, ManyToOne, OneToMany } from 'typeorm';
import {
Entity,
PrimaryGeneratedColumn,
ManyToOne,
OneToMany,
Unique,
} from 'typeorm';
import { Asset } from './asset.entity';
import { Exchange } from './exchange.entity';
import { TradingData } from './trading-data.entity';
import { DeliveryData } from './delivery-data.entity';

@Entity()
@Unique(['asset', 'exchange'])
export class AssetExchange {
@PrimaryGeneratedColumn()
id: number;
Expand All @@ -15,9 +22,9 @@ export class AssetExchange {
@ManyToOne(() => Exchange, (exchange) => exchange.assetExchanges)
exchange: Exchange;

@OneToMany(() => TradingData, (tradingData) => tradingData)
@OneToMany(() => TradingData, (tradingData) => tradingData.assetExchange)
tradingData: TradingData[];

@OneToMany(() => DeliveryData, (deliveryData) => deliveryData)
@OneToMany(() => DeliveryData, (deliveryData) => deliveryData.assetExchange)
deliveryData: DeliveryData[];
}
11 changes: 9 additions & 2 deletions src/asset-management/entities/delivery-data.entity.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { Entity, Column, ManyToOne, PrimaryGeneratedColumn } from 'typeorm';
import {
Entity,
Column,
ManyToOne,
PrimaryGeneratedColumn,
Unique,
} from 'typeorm';
import { AssetExchange } from './asset-exchange.entity';

@Entity()
@Unique(['date', 'assetExchange'])
export class DeliveryData {
@PrimaryGeneratedColumn()
id: number;
Expand All @@ -17,7 +24,7 @@ export class DeliveryData {

@ManyToOne(
() => AssetExchange,
(assetExchange) => assetExchange.tradingData,
(assetExchange) => assetExchange.deliveryData,
{
onDelete: 'CASCADE',
},
Expand Down
9 changes: 8 additions & 1 deletion src/asset-management/entities/trading-data.entity.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import { Entity, Column, ManyToOne, PrimaryGeneratedColumn } from 'typeorm';
import {
Entity,
Column,
ManyToOne,
PrimaryGeneratedColumn,
Unique,
} from 'typeorm';
import { AssetExchange } from './asset-exchange.entity';

@Entity()
@Unique(['date', 'assetExchange'])
export class TradingData {
@PrimaryGeneratedColumn()
id: number;
Expand Down
7 changes: 5 additions & 2 deletions src/asset-management/services/delivery-data.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DeliveryDataDTO } from '../dto';
import { DeliveryData } from '../entities';
import { DeliveryDataRepository } from '../repositories';
import { validateAndThrowError } from '../utils/validate-dto-error';
import { InsertResult } from 'typeorm';

@Injectable()
export class DeliveryDataService {
Expand All @@ -18,8 +19,10 @@ export class DeliveryDataService {
return deliveryData;
}

async saveDeliveryData(deliveryData: DeliveryDataDTO): Promise<DeliveryData> {
async saveDeliveryData(deliveryData: DeliveryDataDTO): Promise<InsertResult> {
await validateAndThrowError(deliveryData, 'deliveryDataDTO');
return this.deliveryDataRepository.create(deliveryData as DeliveryData);
return this.deliveryDataRepository.upsert(deliveryData as DeliveryData, {
conflictPaths: { date: true, assetExchange: true },
});
}
}
7 changes: 5 additions & 2 deletions src/asset-management/services/trading-data.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { TradingDataDTO } from '../dto';
import { TradingData } from '../entities';
import { TradingDataRepository } from '../repositories';
import { validateAndThrowError } from '../utils/validate-dto-error';
import { InsertResult } from 'typeorm';

@Injectable()
export class TradingDataService {
Expand All @@ -16,8 +17,10 @@ export class TradingDataService {
return tradingData;
}

async saveTradingData(tradingData: TradingDataDTO): Promise<TradingData> {
async saveTradingData(tradingData: TradingDataDTO): Promise<InsertResult> {
await validateAndThrowError(tradingData, 'TradingDataDTO');
return this.tradingDataRepository.create(tradingData as TradingData);
return this.tradingDataRepository.upsert(tradingData as TradingData, {
conflictPaths: { date: true, assetExchange: true },
});
}
}
19 changes: 13 additions & 6 deletions src/data-sync/bse.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { AssetDto, DeliveryDataDTO, TradingDataDTO } from './dto';
import parseCSV from './utils/csv-parser';
import { Stream } from 'stream';
import { Exchange } from 'src/asset-management/types/enums';
import { AxiosHeaders } from 'axios';
enum STOCK_DATA_CSV_HEADERS {
SYMBOL = 'Security Id',
NAME_OF_COMPANY = 'Issuer Name',
Expand Down Expand Up @@ -41,6 +42,8 @@ enum DELIVERY_DATA_CSV_HEADERS {
export class BseService {
constructor(private readonly AM: AssetManagement) {}

private logger = new Logger(BseService.name);

async handleAssetData(csvData: Stream) {
const bseExchange = await this.AM.exchangeService.findOrCreateExchange(
Exchange.BSE,
Expand Down Expand Up @@ -68,14 +71,14 @@ export class BseService {
);
const parser = await parseCSV(streamData, CSV_SEPARATOR.PIPE);
for await (const record of parser) {
const assetExchangeCode = record[TRADE_DATA_CSV_HEADERS.SYMBOL];
const assetExchangeCode = record[DELIVERY_DATA_CSV_HEADERS.SYMBOL];
const assetExchange = await this.AM.assetExchangeService.findBySymbol(
assetExchangeCode,
bseExchange,
['asset'],
);
if (!assetExchange) {
Logger.warn(
this.logger.warn(
`Asset Details not found for stock: ${
record[TRADE_DATA_CSV_HEADERS.NAME]
}`,
Expand Down Expand Up @@ -114,7 +117,7 @@ export class BseService {
['asset'],
);
if (!assetExchange) {
Logger.warn(
this.logger.warn(
`Asset Details not found for stock: ${
record[TRADE_DATA_CSV_HEADERS.NAME]
} AssetExchangeCode: ${assetExchangeCode}`,
Expand Down Expand Up @@ -161,14 +164,18 @@ export class BseService {
const year = date.getFullYear();
const month = ('0' + (date.getMonth() + 1)).slice(-2); // Months are 0-based in JavaScript
const day = ('0' + date.getDate()).slice(-2);
const headers = new AxiosHeaders({
'User-Agent':
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11',
Referer: 'https://www.bseindia.com/markets/marketinfo/BhavCopy.aspx',
encoding: null,
});

return {
userAgent:
'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.95 Safari/537.11',
referer: 'https://www.bseindia.com/markets/marketinfo/BhavCopy.aspx',
assetUrl: `https://api.bseindia.com/BseIndiaAPI/api/LitsOfScripCSVDownload/w?segment=Equity&status=Active&industry=&Group=&Scripcode=`,
deliveryURL: `https://www.bseindia.com/BSEDATA/gross/${year}/SCBSEALL${day}${month}.zip`,
tradingURL: `https://www.bseindia.com/download/BhavCopy/Equity/BhavCopy_BSE_CM_0_0_0_${year}${month}${day}_F_0000.CSV`,
headers,
};
}
}
9 changes: 8 additions & 1 deletion src/data-sync/data-sync.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ import { DataSyncService } from './data-sync.service';
import { BseService } from './bse.service';
import { AssetManagement } from './asset-management.service';
import { NseService } from './nse.service';
import { HttpClient } from './utils/httpClient';

@Module({
imports: [AssetManagementModule],
providers: [AssetManagement, DataSyncService, BseService, NseService],
providers: [
HttpClient,
AssetManagement,
DataSyncService,
BseService,
NseService,
],
})
export class DataSyncModule {}
121 changes: 47 additions & 74 deletions src/data-sync/data-sync.service.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,56 @@
import { Injectable, Logger } from '@nestjs/common';
import { Timeout } from '@nestjs/schedule';
import axios from 'axios';
import { AxiosHeaders } from 'axios';
import { NseService } from './nse.service';
import { BseService } from './bse.service';
import * as unzipper from 'unzipper';
import { PassThrough, Readable } from 'stream';
import { PassThrough } from 'stream';
import { Cron, CronExpression } from '@nestjs/schedule';
import { getCurrentDate, getUtcTradeDays } from './utils/trade-days';
import { HttpClient } from './utils/httpClient';

@Injectable()
export class DataSyncService {
constructor(
private httpClient: HttpClient,
private nseService: NseService,
private bseService: BseService,
) {}
private readonly logger = new Logger(DataSyncService.name);

async handleAssetDataBSE(url: string, userAgent: string, referer: string) {
const response = await axios.get(url, {
headers: {
'User-Agent': userAgent,
Referer: referer,
encoding: null,
},
responseType: 'stream',
});
await this.bseService.handleAssetData(response.data);
@Cron(CronExpression.EVERY_DAY_AT_6PM, { timeZone: 'Asia/Kolkata' })
async execute() {
const currentDate = getCurrentDate();
this.logger.log(
`Data Sync Started for ${currentDate.toISOString().slice(0, 10)}`,
);

const workdays = getUtcTradeDays(currentDate, currentDate);
const workdaysStr = workdays.map((date) => date.toISOString().slice(0, 10));
for (const dateStr of workdaysStr) {
await this.handleBSEDataSync(new Date(dateStr));
await this.handleNSEDataSync(new Date(dateStr));
}
}

@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async handleDeliveryDataBSE(url: string, userAgent: string, referer: string) {
async handleBSEDataSync(date: Date) {
const start = performance.now();

this.logger.log('BSE Data Sync Started');
const response = await axios.get(url, {
headers: {
'User-Agent': userAgent,
Referer: referer,
encoding: null,
},
responseType: 'stream',
});

const { assetUrl, tradingURL, deliveryURL, headers } =
this.bseService.generateFileUrls(date);
const assetResponse = await this.httpClient.get(assetUrl, headers);
await this.bseService.handleAssetData(assetResponse.data);
const tradeResponse = await this.httpClient.get(tradingURL, headers);
await this.bseService.handleTradingData(tradeResponse.data);
await this.handleDeliveryDataBSE(deliveryURL, headers);
const end = performance.now();
const timeTaken = end - start;
this.logger.log(`BSE Data Sync Finished. Time taken: ${timeTaken} ms`);
}

async handleDeliveryDataBSE(url: string, headers: AxiosHeaders) {
const response = await this.httpClient.get(url, headers);
response.data.pipe(unzipper.Parse()).on('entry', async (entry) => {
const fileName = entry.path;
if (fileName.includes('SCBSEALL')) {
Expand All @@ -54,59 +67,19 @@ export class DataSyncService {
});
}

async handleTradingDataBSE(url: string, userAgent: string, referer: string) {
const response = await axios.get(url, {
headers: {
'User-Agent': userAgent,
Referer: referer,
encoding: null,
},
responseType: 'stream',
});
await this.bseService.handleTradingData(response.data);
}

@Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT)
async handleBSEDataSync() {
const { assetUrl, tradingURL, deliveryURL, userAgent, referer } =
this.bseService.generateFileUrls(new Date(2024, 3, 19));
await this.handleAssetDataBSE(assetUrl, userAgent, referer);
await this.handleDeliveryDataBSE(deliveryURL, userAgent, referer);
await this.handleTradingDataBSE(tradingURL, userAgent, referer);
}

@Timeout(5000)
async handleNSEDataSync() {
async handleNSEDataSync(date: Date) {
const start = performance.now();
this.logger.log('NSE Data Sync Started');
const { assetUrl, tradingURL, userAgent, referer } =
this.nseService.generateFileUrls(new Date(2024, 3, 19));

const assetResponse = await axios.get(assetUrl, {
headers: {
Accept: '*/*"',
Connection: 'keep-alive',
'User-Agent': userAgent,
Referer: referer,
},
});
const { assetUrl, tradingURL, headers } =
this.nseService.generateFileUrls(date);

const assetResponseStream = Readable.from(
JSON.stringify(assetResponse.data),
);

await this.nseService.handleAssetData(assetResponseStream);
const tradeResponse = await axios.get(tradingURL, {
headers: {
Accept: '*/*"',
Connection: 'keep-alive',
'User-Agent': userAgent,
Referer: referer,
},
});
const tradeResponseStream = Readable.from(
JSON.stringify(tradeResponse.data),
);
const assetResponse = await this.httpClient.get(assetUrl, headers);

await this.nseService.handleTradingData(tradeResponseStream);
await this.nseService.handleAssetData(assetResponse.data);
const tradeResponse = await this.httpClient.get(tradingURL, headers);
await this.nseService.handleTradingData(tradeResponse.data);
const end = performance.now();
const timeTaken = end - start;
this.logger.log(`NSE Data Sync Finished. Time taken: ${timeTaken} ms`);
}
}
Loading