## Import Packages

In [1]:
# Import packages
import requests
import pandas as pd
import pyarrow.parquet as pq
import io
import boto3
from urllib.parse import urlparse
from sagemaker_studio import Project

# Project handle; its `s3.root` attribute is used further down as the base of the
# S3 paths this notebook reads from and writes to.
proj = Project()

In [2]:
# Location of the weather CSV: <project S3 root> + folder + file name.
# NOTE(review): `file_name`, `s3_folder` and `data_path` are rebound later by the
# cell that prepares the output path for joined_data.csv, so once that cell has run
# these names no longer point at the weather file.
file_name = 'weather_data.csv'
s3_folder = '/datalake/'
data_path = proj.s3.root + s3_folder + file_name  # presumably an s3:// URI -- confirm what proj.s3.root returns

# Module-level S3 client; read_csv_from_s3 below picks it up through the global name `s3`.
s3 = boto3.client('s3')


# Function to read data
def read_csv_from_s3(s3_uri):
    """Load a CSV object stored in S3 into a pandas DataFrame.

    Args:
        s3_uri: Location of the object in ``<scheme>://<bucket>/<key>`` form,
            for example ``s3://my-bucket/datalake/weather_data.csv``.

    Returns:
        The DataFrame produced by ``pd.read_csv`` (default options) from the
        object's body.

    Raises:
        ValueError: If ``s3_uri`` does not contain both a bucket and a key.
        Any error raised by the S3 client or by ``pd.read_csv`` propagates
        unchanged.
    """
    # Split "<scheme>://<bucket>/<key>" into its bucket and key parts.
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')

    # Fail early with a readable message instead of sending an empty bucket/key
    # to S3 (e.g. when a bare file name is passed instead of a full URI).
    if not bucket or not key:
        raise ValueError(
            f"Expected an S3 URI of the form 's3://<bucket>/<key>', got {s3_uri!r}"
        )

    # Uses the module-level `s3` client.
    obj = s3.get_object(Bucket=bucket, Key=key)

    return pd.read_csv(obj['Body'])

In [3]:
# Pull the weather observations out of the data lake
w_df = read_csv_from_s3(s3_uri=data_path)

In [9]:
# Parse the join key as timezone-aware (UTC) timestamps; the taxi data's
# `rounded_hour` is built with utc=True as well, so both sides share one dtype.
w_df = w_df.assign(rounded_hour=pd.to_datetime(w_df['rounded_hour'], utc=True))


## Taxi Data Collection

- [Data URL](https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-01.parquet)
- [Website](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)


#### Data Dictionary for relevant variables ####

| Column | Definition | 
| --- | ---|
| tpep_pickup_datetime | The date and time when the meter was engaged |
| tpep_dropoff_datetime | The date and time when the meter was disengaged |
| passenger_count | The number of passengers in the vehicle |
| trip_distance | The elapsed trip distance in miles reported by the taximeter |
| fare_amount | The time-and-distance fare calculated by the meter. For additional information on how fares are calculated, see the [TLC taxi fare page](https://www.nyc.gov/site/tlc/passengers/taxi-fare.page) |
| total_amount | The total amount charged to passengers. Does not include cash tips |

**Date Range: 2016-01-01 : 2016-01-31**

In [4]:
# create function to read parquet from URL
def read_parquet_from_url(url, timeout=60):
    """Download a Parquet file over HTTP(S) and load it into a pandas DataFrame.

    Args:
        url: Address of the Parquet file.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``. Without
            it the request can wait indefinitely on an unresponsive server.
            The limit covers connecting and each wait for data, not the total
            download time, so large files are still fine.

    Returns:
        The file's contents as a DataFrame when the server answers with
        HTTP 200; otherwise ``None`` (after printing the status code).

    Raises:
        Connection and timeout errors from ``requests`` and parse errors from
        ``pyarrow`` propagate unchanged.
    """
    response = requests.get(url, timeout=timeout)

    # Anything other than 200 is reported and signalled to the caller with None.
    if response.status_code != 200:
        print(f"Failed to download the file. Status code: {response.status_code}")
        return None

    # The full payload is buffered in memory and handed to pyarrow as a
    # file-like object.
    parquet_file = io.BytesIO(response.content)
    table = pq.read_table(parquet_file)

    return table.to_pandas()


In [5]:
# Get taxi data
url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2016-01.parquet"
t_df = read_parquet_from_url(url)

# read_parquet_from_url returns None when the download fails. Stop here with a
# clear message instead of letting the following cells fail on `None[...]`.
if t_df is None:
    raise RuntimeError(f"Could not download the taxi data from {url}")

print('NYC Taxi Data Sample \n\n')
print(t_df.head())

NYC Taxi Data Sample 


   VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0         1  2016-01-01 00:12:22   2016-01-01 00:29:14                1   
1         1  2016-01-01 00:41:31   2016-01-01 00:55:10                2   
2         1  2016-01-01 00:53:37   2016-01-01 00:59:57                1   
3         1  2016-01-01 00:13:28   2016-01-01 00:18:07                1   
4         1  2016-01-01 00:33:04   2016-01-01 00:47:14                1   

   trip_distance  RatecodeID store_and_fwd_flag  PULocationID  DOLocationID  \
0            3.2           1                  N            48           262   
1            1.0           1                  N           162            48   
2            0.9           1                  N           246            90   
3            0.8           1                  N           170           162   
4            1.8           1                  N           161           140   

   payment_type  fare_amount  extra  mta_tax  tip_

In [6]:
# Parse the meter on/off times as timezone-aware (UTC) timestamps
pickup_utc = pd.to_datetime(t_df['tpep_pickup_datetime'], utc=True)
dropoff_utc = pd.to_datetime(t_df['tpep_dropoff_datetime'], utc=True)

# Keep only the relevant columns and append the two derived ones:
#   duration_minutes - time between pickup and dropoff, in minutes
#   rounded_hour     - pickup time rounded to the nearest hour (join key for the weather data)
t_df = t_df[['passenger_count', 'trip_distance', 'total_amount']].assign(
    duration_minutes=(dropoff_utc - pickup_utc).dt.total_seconds() / 60,
    rounded_hour=pickup_utc.dt.round('h'),
)


print('NYC Taxi Data Sample \n\n')
print(t_df.head())

NYC Taxi Data Sample 


   passenger_count  trip_distance  total_amount  duration_minutes  \
0                1            3.2         18.36         16.866667   
1                2            1.0         10.80         13.650000   
2                1            0.9          7.30          6.333333   
3                1            0.8          6.30          4.650000   
4                1            1.8         12.30         14.166667   

               rounded_hour  
0 2016-01-01 00:00:00+00:00  
1 2016-01-01 01:00:00+00:00  
2 2016-01-01 01:00:00+00:00  
3 2016-01-01 00:00:00+00:00  
4 2016-01-01 01:00:00+00:00  


In [10]:
# Build the modelling table: pair each weather row with the taxi trips whose
# rounded pickup hour matches it (inner join on the shared `rounded_hour` key).
result_df = w_df.merge(t_df, how='inner', on='rounded_hour')
result_df.head()

Unnamed: 0,timestamp,temp_f,rain_inches,windspeed,rounded_hour,passenger_count,trip_distance,total_amount,duration_minutes
0,2016-01-01 00:00:00,28.272346,3.538517,15.399464,2016-01-01 00:00:00+00:00,1,3.2,18.36,16.866667
1,2016-01-01 00:00:00,28.272346,3.538517,15.399464,2016-01-01 00:00:00+00:00,1,0.8,6.3,4.65
2,2016-01-01 00:00:00,28.272346,3.538517,15.399464,2016-01-01 00:00:00+00:00,5,3.46,21.3,30.3
3,2016-01-01 00:00:00,28.272346,3.538517,15.399464,2016-01-01 00:00:00+00:00,1,0.87,8.3,12.95
4,2016-01-01 00:00:00,28.272346,3.538517,15.399464,2016-01-01 00:00:00+00:00,2,2.6,12.3,12.15


In [11]:
# Destination of the joined data set: <project S3 root> + folder + file name.
# NOTE(review): this rebinds `file_name`, `s3_folder` and `data_path`, which were
# first set for the weather file near the top of the notebook; re-running
# `read_csv_from_s3(data_path)` after this cell would target joined_data.csv
# instead of weather_data.csv.
file_name = 'joined_data.csv'
s3_folder = '/datalake/'
data_path = proj.s3.root + s3_folder + file_name


# Module-level client used by put_dataframe_to_s3_as_csv through the global name
# `s3_client`. A separate client named `s3` is created earlier for reads.
s3_client = boto3.client('s3')


# Function to write the results to S3
def put_dataframe_to_s3_as_csv(s3_uri, dataframe):
    """Serialise a DataFrame to CSV and upload it as a single S3 object.

    Args:
        s3_uri: Destination in ``<scheme>://<bucket>/<key>`` form, for example
            ``s3://my-bucket/datalake/joined_data.csv``.
        dataframe: The pandas DataFrame to write. The index is not written.

    Returns:
        None.

    Raises:
        ValueError: If ``s3_uri`` does not contain both a bucket and a key.
        Any error raised by the S3 client propagates unchanged.
    """
    # Split "<scheme>://<bucket>/<key>" into its bucket and key parts.
    parsed = urlparse(s3_uri)
    bucket = parsed.netloc
    key = parsed.path.lstrip('/')

    # Check the destination before serialising, so a malformed URI is reported
    # immediately rather than after building the whole CSV.
    if not bucket or not key:
        raise ValueError(
            f"Expected an S3 URI of the form 's3://<bucket>/<key>', got {s3_uri!r}"
        )

    # The whole CSV is built in memory as one string before being uploaded.
    csv_buffer = io.StringIO()
    dataframe.to_csv(csv_buffer, index=False)

    # Uses the module-level `s3_client`.
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=csv_buffer.getvalue(),
        ContentType='text/csv'
    )

In [12]:
# Persist the joined data set; a failed upload is reported rather than raised
try:
    put_dataframe_to_s3_as_csv(s3_uri=data_path, dataframe=result_df)
    print("DataFrame successfully uploaded as CSV to S3")
except Exception as upload_error:
    failure_reason = str(upload_error)
    print(f"Error uploading DataFrame to S3: {failure_reason}")

DataFrame successfully uploaded as CSV to S3


In [13]:
# List the column names of the joined frame
result_df.columns

Index(['timestamp', 'temp_f', 'rain_inches', 'windspeed', 'rounded_hour',
       'passenger_count', 'trip_distance', 'total_amount', 'duration_minutes'],
      dtype='object')