From 0e6e5b0d2875daf02871b2162a93f0f19ea24068 Mon Sep 17 00:00:00 2001
From: William Benton
Date: Wed, 15 Nov 2017 11:47:19 +0100
Subject: [PATCH] Use Student's T to totally overfit stock returns

---
 requirements.txt |  1 +
 src/app.py       | 15 ++++++++-------
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 3b6db7b..907b44a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -2,3 +2,4 @@ Flask==0.12
 Flask-SocketIO==2.8.2
 gevent==1.2.1
 gevent-websocket==0.9.5
+scipy==0.17.1
diff --git a/src/app.py b/src/app.py
index bb9528c..e1bf95b 100644
--- a/src/app.py
+++ b/src/app.py
@@ -45,8 +45,9 @@ def simstep(pf, params, prng):
        symbol.
     """
     def daily_return(sym):
-        mean, stddev = params[sym]
-        change = (prng.normalvariate(mean, stddev) + 100) / 100.0
+        from scipy.stats import t
+        df, loc, scale = params[sym]
+        change = (t.ppf(prng.uniform(0, 1), df=df, loc=loc, scale=scale) + 100) / 100.0
         return change
     return {s: daily_return(s) * v for s, v in pf.items()}

@@ -93,6 +94,7 @@ def processing_loop(spark_master, input_queue, output_queue, wikieod_file):
     import pyspark
     from pyspark import sql as pysql
     from pyspark.sql import functions as pyfuncs
+    from scipy.stats import t

     spark = pysql.SparkSession.builder.master(spark_master).getOrCreate()
     sc = spark.sparkContext
@@ -104,11 +106,10 @@ def processing_loop(spark_master, input_queue, output_queue, wikieod_file):
         'change', (pyfuncs.col('close') / pyfuncs.lag('close', 1).over(
             pysql.Window.partitionBy('ticker').orderBy(
                 df['date'])) - 1.0) * 100)
-
-    mv = ddf.groupBy('ticker').agg(pyfuncs.avg('change').alias('mean'),
-                                   pyfuncs.sqrt(pyfuncs.variance('change')).alias('stddev'))
-
-    dist_map = mv.rdd.map(lambda r: (r[0], (r[1], r[2]))).collectAsMap()
+
+    changes = ddf.groupBy("ticker").agg(pyfuncs.collect_list("change").alias("changes"))
+
+    dist_map = changes.rdd.map(lambda r: (r[0], t.fit(r[1]))).collectAsMap()

     priceDF = ddf.orderBy('date', ascending=False).groupBy('ticker').agg(
         pyfuncs.first('close').alias('price'),
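
For anyone who wants to try the sampling path this patch introduces outside
the app: a minimal standalone sketch of what the new daily_return does. It
assumes a fitted (df, loc, scale) triple of the kind t.fit produces; the
'XYZ' ticker and the parameter values below are invented for illustration.
In the app the triple comes out of dist_map (see the second sketch below).

    # Sketch: draw one simulated daily return the way the patched
    # daily_return does.  The (df, loc, scale) values are invented;
    # in the app they come from t.fit on real per-ticker changes.
    import random
    from scipy.stats import t

    params = {'XYZ': (3.2, 0.05, 1.1)}   # hypothetical fitted (df, loc, scale)
    prng = random.Random(42)             # seeded so runs are repeatable

    def daily_return(sym):
        df, loc, scale = params[sym]
        # inverse-CDF sampling: a uniform draw pushed through t.ppf is
        # a draw from the fitted Student's t
        return (t.ppf(prng.uniform(0, 1), df=df, loc=loc, scale=scale) + 100) / 100.0

    print(daily_return('XYZ'))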
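And a sketch of the fitting side with the Spark plumbing removed: given the
list of per-day percentage changes for one ticker (synthetic here, drawn
from a known Student's t so we can see the fit roughly recover the
parameters), t.fit estimates the (df, loc, scale) triple that dist_map
stores per symbol. The synthetic list stands in for one ticker's
collect_list('change') output.

    # Sketch: what dist_map stores per ticker, minus the Spark plumbing.
    from scipy.stats import t

    # synthetic daily percentage changes from a known Student's t; the
    # true parameters here are arbitrary stand-ins for real market data
    changes = t.rvs(df=4.0, loc=0.03, scale=1.2, size=2000, random_state=7)

    # maximum-likelihood fit, exactly what the patch runs per ticker
    df, loc, scale = t.fit(changes)
    print(df, loc, scale)   # should land near (4.0, 0.03, 1.2)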