-
Notifications
You must be signed in to change notification settings - Fork 1
/
write-data.py
81 lines (58 loc) · 3.06 KB
/
write-data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
# Databricks notebook source
from pyspark.sql.functions import year, month,dayofmonth,dayofweek
from pyspark.sql.types import IntegerType
# COMMAND ----------
import pandas as ps
import random
from datetime import datetime, timedelta
# Generate one year (2023) of synthetic public-transport trip records,
# including injected anomalies: unusual bus departure hours, short
# turnarounds, and extreme-weather delays/rerouting.
start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 12, 31)
# +1 makes the range inclusive of end_date — the original off-by-one
# silently dropped Dec 31.
date_generated = [start_date + timedelta(days=x) for x in range((end_date - start_date).days + 1)]
transport_types = ["Bus", "Train", "Tram", "Metro"]
routes = ["Route_" + str(i) for i in range(1, 11)]
stations = ["Station_" + str(i) for i in range(1, 21)]
# Randomly select 5 days as extreme weather days.
extreme_weather_days = random.sample(date_generated, 5)
data = []
for date in date_generated:
    for _ in range(32):  # 32 synthetic trips per day
        transport = random.choice(transport_types)
        route = random.choice(routes)
        # Normal operating hours: departures between 05:00 and 22:59.
        departure_hour = random.randint(5, 22)
        departure_minute = random.randint(0, 59)
        # Anomaly: ~5% of bus departures at an unusual 03:00 hour.
        if transport == "Bus" and random.random() < 0.05:
            departure_hour = 3
        departure_time = f"{departure_hour:02}:{departure_minute:02}"
        # Normal trip duration, in minutes.
        duration = random.randint(10, 120)
        # Anomaly: ~5% short turnarounds (1-5 minutes).
        if random.random() < 0.05:
            duration = random.randint(1, 5)
        # Baseline delay, in minutes.
        delay = random.randint(0, 15)
        # Extreme-weather days: add 10-60 minutes of delay, and a 10%
        # chance the trip is rerouted.
        if date in extreme_weather_days:
            delay += random.randint(10, 60)
            if random.random() < 0.10:
                route = random.choice(routes)
        total_minutes = departure_minute + duration + delay
        # % 24 wraps past midnight so the arrival is always a valid clock
        # time — the original could emit strings like "25:07".
        arrival_hour = (departure_hour + total_minutes // 60) % 24
        arrival_minute = total_minutes % 60
        arrival_time = f"{arrival_hour:02}:{arrival_minute:02}"
        passengers = random.randint(1, 100)
        departure_station = random.choice(stations)
        arrival_station = random.choice(stations)
        data.append([date, transport, route, departure_time, arrival_time, passengers, departure_station, arrival_station, delay])
# Assemble the flat record list into a pandas DataFrame (aliased `ps` in
# this file's imports).
df = ps.DataFrame(data, columns=["Date", "TransportType", "Route", "DepartureTime", "ArrivalTime", "Passengers", "DepartureStation", "ArrivalStation", "Delay"])
# Convert the pandas frame to Spark and write it to Azure Blob Storage as
# CSV, partitioned by Year/Month. `spark` is the Databricks-provided
# SparkSession global (not defined in this file).
spark_df = spark.createDataFrame(df)
# NOTE(review): builder.getOrCreate() returns the already-active session in
# a Databricks notebook, so `session` is presumably the same object as
# `spark` — confirm; the extra variable looks redundant.
session = spark.builder.getOrCreate()
# SECURITY: the storage account key is hard-coded in source. Move it to a
# secret store (e.g. dbutils.secrets.get / Azure Key Vault) and rotate the
# exposed key.
session.conf.set(
"fs.azure.account.key.yassineessadistorageg2.blob.core.windows.net", "Par6PN6r2BUU9Z4Kzd4ITeN/l4SniXsOR6/Rrtup6LxocPLhWpzv5IxyynGRfT6rOixSc0QH2GUr+AStkS4mXQ==")
# Derive integer Year/Month columns from Date to serve as partition keys.
spark_df = spark_df.withColumn("Year", year(spark_df["Date"]).cast(IntegerType()))
spark_df = spark_df.withColumn("Month", month(spark_df["Date"]).cast(IntegerType()))
# coalesce(1) forces a single CSV output file per Year/Month partition
# directory; mode("append") adds to any data already under the raw path.
spark_df.coalesce(1).write.partitionBy("Year","Month").format("csv").option('header', True).mode("append").save("wasbs://[email protected]/public_transport_data/raw")