# Krypto1 / scrape_utils.py
# KatGaw's picture
# adding new reddit group
# 05a3e2c
# raw
# history blame
# 14.5 kB
# Import libraries
#Import packages
import pandas as pd
import numpy as np
import time
import datetime
from pycoingecko import CoinGeckoAPI
#from utils import slice
# Get API for CoinGecko
#cg = CoinGeckoAPI()
from dotenv import load_dotenv
from bs4 import BeautifulSoup
import requests
from pytrends.request import TrendReq
pytrends = TrendReq(hl='en-US')
from pytrends import dailydata
import yfinance as yf
import json
import prettytable
import os
from requests import Request, Session
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import json
# Load environment variables before reading secrets (presumably from a local
# .env file, via python-dotenv -- confirm against deployment setup).
load_dotenv()
# CoinMarketCap API key used by scrape_historical_series. Indexing os.environ
# raises KeyError at import time when the variable is not set.
COINMARKET_API_KEY=os.environ["COINMARKET_API_KEY"]
# Historical crypto data
_CMC_QUOTES_BASE_URL = 'https://pro-api.coinmarketcap.com/v2/cryptocurrency/quotes/'
# Upper bound (seconds) for one HTTP request so a stalled connection cannot hang the scraper.
_CMC_TIMEOUT_SECONDS = 30


def _fetch_cmc_quotes(session, endpoint, parameters):
    """GET one CoinMarketCap quotes endpoint and return the decoded JSON body."""
    try:
        response = session.get(
            _CMC_QUOTES_BASE_URL + endpoint,
            params=parameters,
            timeout=_CMC_TIMEOUT_SECONDS,
        )
    except (ConnectionError, Timeout, TooManyRedirects) as e:
        # Keep the original console message, but propagate the failure instead
        # of continuing with an unbound (or stale) payload.
        print(e)
        raise
    return json.loads(response.text)


def _usd_quote_field(quotes, field):
    """Collect one USD field from every entry of a historical ``quotes`` list."""
    return [entry['quote']['USD'][field] for entry in quotes]


def scrape_historical_series(coin_name,symbol,date_start,date_end):
    """Scrape daily historical quotes and the latest quote for one coin.

    Four CoinMarketCap requests are made: historical and latest USD quotes for
    ``symbol`` and, for reference, the same two for ETH.

    Args:
        coin_name: Identifier stored in the ``id`` column. Either a string or a
            one-element sequence holding it (NOTE(review): callers appear to
            pass a one-element list -- confirm).
        symbol(str): Ticker symbol sent to the API, e.g. ``'BTC'``.
        date_start(list): Values for Year_start, Month_start, Day_start.
        date_end(list): Values for Year_end, Month_end, Day_end.

    Returns:
        Tuple ``(ts_coins_cut, df_today_row)``:
        ts_coins_cut is a DataFrame with the evolution of prices, market
        capitalization, total volume and ETH price over time (columns
        ``date, prices, market_caps, total_vol, price_eth, id``);
        df_today_row is a one-row DataFrame with columns
        ``id, date, prices, market_caps, total_vol, price_eth`` built from the
        latest quotes and stamped with today's date.

    Raises:
        requests.exceptions.ConnectionError, Timeout, TooManyRedirects:
            When a request fails (the error is printed first).
        KeyError, IndexError: When a response lacks the expected structure,
            or the ETH history has fewer entries than the coin's history.

    Side effects:
        Writes the historical frame to ``ts_coins_cut.csv`` in the working
        directory.
    """
    #DATE definitions
    date_time = datetime.datetime(int(date_start[0]),int(date_start[1]),int(date_start[2]))
    date_time_now = datetime.datetime(int(date_end[0]),int(date_end[1]),int(date_end[2]))
    # time.mktime interprets the naive datetimes in the machine's local timezone.
    unix_past = int(time.mktime(date_time.timetuple()))
    unix_now = int(time.mktime(date_time_now.timetuple()))

    # One session (closed on exit) serves all four requests.
    with Session() as session:
        session.headers.update({
            'Accepts': 'application/json',
            'X-CMC_PRO_API_KEY': COINMARKET_API_KEY,
        })
        historical_json = _fetch_cmc_quotes(session, 'historical', {
            'time_start': unix_past,
            'time_end': unix_now,
            'symbol': symbol,
            'convert':'USD',
            'interval': 'daily',
        })
        historical_json_eth = _fetch_cmc_quotes(session, 'historical', {
            'time_start': unix_past,
            'time_end': unix_now,
            'symbol': 'ETH',
            'convert':'USD',
            'interval': 'daily',
        })
        latest_json = _fetch_cmc_quotes(session, 'latest', {
            'symbol': symbol,
            'convert':'USD',
        })
        latest_json_eth = _fetch_cmc_quotes(session, 'latest', {
            'symbol': 'ETH',
            'convert':'USD',
        })

    # HISTORICAL FRAME
    quotes = historical_json['data'][symbol][0]['quotes']
    # ETH prices are paired with the coin's quotes by position, one per entry.
    price_eth_history = [
        historical_json_eth['data']['ETH'][0]['quotes'][i]['quote']['USD']['price']
        for i, _ in enumerate(quotes)
    ]
    ts_coins_cut=pd.DataFrame({
        'date': _usd_quote_field(quotes, 'timestamp'),
        'prices': _usd_quote_field(quotes, 'price'),
        'market_caps': _usd_quote_field(quotes, 'market_cap'),
        'total_vol': _usd_quote_field(quotes, 'volume_24h'),
        'price_eth': price_eth_history,
    })
    ts_coins_cut['id']=np.repeat(coin_name,len(ts_coins_cut))
    ts_coins_cut['date']=pd.to_datetime(ts_coins_cut['date'])

    # CURRENT VALUES
    latest_usd = latest_json['data'][str(symbol)][0]['quote']['USD']
    market_cap = latest_usd['market_cap']
    total_volumes = latest_usd['volume_24h']
    price = latest_usd['price']
    price_eth = latest_json_eth['data']['ETH'][0]['quote']['USD']['price']

    # CREATE CURRENT ROW
    today = datetime.date.today()
    # A plain string is used whole; indexing it would keep only its first character.
    coin_id = coin_name if isinstance(coin_name, str) else coin_name[0]
    # today is a date, so the time part of the stamp is always 00:00:00.
    df_today_row=pd.DataFrame({0:['id','date','prices','market_caps','total_vol','price_eth'],1:[coin_id,today.strftime('%Y-%m-%d %H:%M:%S'),price,market_cap,total_volumes,price_eth]}).T
    # Promote the first row (the labels) to column names, then drop it.
    df_today_row.columns=df_today_row.iloc[0,:]
    df_today_row=df_today_row.drop(0)

    ts_coins_cut.to_csv('ts_coins_cut.csv')
    return ts_coins_cut, df_today_row
# 2. Macro variables, CLI
def scrape_cli(past,today):
    """Scrape data on the composite leading indicator (CLI) for the USA.

    Args:
        past(date): Date for which you want to start scraping; only its year
            and month are sent to the API.
        today(date): Date for which you want to end scraping; only its year
            and month are sent to the API.

    Returns:
        Series of CLI values for the USA, indexed by date, resampled with
        frequency 'M' and averaged.
    """
    countries=['USA'] #,'OECDE','OECD','NMEC']
    series_types=['CLI'] #,'BCLI']
    past_date=past.strftime('%Y-%m')
    today_date=today.strftime('%Y-%m')
    for series_type in series_types:
        print(series_type)
        for country_code in countries:
            if series_type!='CLI':
                # Only the CLI query is implemented.
                continue
            mainpage=requests.get(f'https://stats.oecd.org/restsdmx/sdmx.ashx/GetData/MEI_CLI/CSCICP03.{country_code}.M/all?startTime={past_date}&endTime={today_date}')
            soup=BeautifulSoup(mainpage.content,'xml') #'html.parser')
            obs_values=soup.find_all("ObsValue")
            obs_keys=soup.find_all("ObsKey")
            # Serialize each observation key once; the fields are cut out of
            # the markup by fixed-width string slicing.
            key_texts=[str(obs_keys[i]) for i in range(len(obs_values))]
            ref_areas=[text.split('"REF_AREA" value="')[1][:3] for text in key_texts]
            dates=[pd.to_datetime(text.split('"TIME_PERIOD" value="')[1][:7]) for text in key_texts]
            measures=[text.split('"MEASURE" value="')[1][:7][:-2] for text in key_texts]
            values=[float(str(obs).split('value="')[1][0:-4]) for obs in obs_values]
            df_cli=pd.DataFrame({'date':dates,'country':ref_areas,'measure':measures,series_type:values})
            df_cli.index=pd.to_datetime(df_cli['date'])
            df_cli=df_cli.loc[df_cli['country']=='USA']['CLI'].astype('float').resample('M').mean()
    return df_cli
def scrape_cpi_employment(start_year="2024", end_year="2024"):
    """Scrape monthly CPI and employment data from the BLS public API.

    Args:
        start_year: First year to request (string or int). Defaults to "2024".
        end_year: Last year to request (string or int). Defaults to "2024".

    Returns:
        DataFrame indexed by ``date`` (first day of each month) with columns
        ``CPI`` (series CUUR0000SA0) and ``Employment`` (series LNS12000000).
        Values are kept exactly as returned by the API (not cast to float).
        An empty DataFrame is returned when the response has no results.
    """
    series_labels={'CUUR0000SA0':'CPI','LNS12000000':'Employment'}
    headers = {'Content-type': 'application/json'}
    data = json.dumps({"seriesid": list(series_labels),"startyear":str(start_year), "endyear":str(end_year)})
    p = requests.post('https://api.bls.gov/publicAPI/v2/timeseries/data/', data=data, headers=headers)
    json_data = json.loads(p.text)
    if not json_data['Results']:
        return pd.DataFrame()

    dates=[]
    values=[]
    labels=[]
    for series in json_data['Results']['series']:
        label=series_labels.get(series['seriesID'])
        if label is None:
            # Skip any series that was not requested so the three lists stay aligned.
            continue
        for item in series['data']:
            period=item['period']
            # Keep monthly observations only (periods M01..M12).
            if 'M01' <= period <= 'M12':
                dates.append(pd.to_datetime(f"{item['year']}-{period[-2:]}"))
                values.append(item['value'])
                labels.append(label)

    df_cpi=pd.DataFrame({'date':dates,'value':values})
    df_cpi['series_id']=labels
    df_cpi.set_index('date',inplace=True)
    # Place the two series side by side, aligned on date.
    df_cpi=pd.concat([df_cpi.loc[df_cpi['series_id']=='CPI'],df_cpi.loc[df_cpi['series_id']=='Employment']],axis=1)
    df_cpi=df_cpi.drop(columns='series_id')
    df_cpi.columns=['CPI','Employment']
    return df_cpi
def scrape_google_trends(currency, currency_short):
    """Scrape daily Google Trends data for one currency.

    The query covers the months spanning from 200 days ago to today. The full
    name is tried first and the short name second; when both attempts fail the
    series is filled with zeros, one per day of the 200-day window.

    Args:
        currency: Search term tried first (converted with ``str``).
        currency_short: Search term tried when the first attempt raises.

    Returns:
        DataFrame indexed by ``date_new`` with columns ``id`` (the currency)
        and ``google_trend``. In the zero-filled case the frame also carries
        the ``index`` column produced by ``reset_index``.
    """
    names_values=[currency]
    names_short=[currency_short]
    today = datetime.date.today()
    past = today-datetime.timedelta(days=200)
    date_all1=pd.date_range(past,today)
    google_data=pd.DataFrame()
    used_placeholder=False
    for run_name, short_name in zip(names_values, names_short):
        # Pause before each query (presumably to stay under Google's rate limit).
        time.sleep(5)
        used_placeholder=False
        try:
            data=dailydata.get_daily_data(str(run_name),past.year,past.month,today.year,today.month,verbose=False)
            # Fifth column of the pytrends frame (presumably the scaled daily
            # series -- confirm against the pytrends dailydata documentation).
            data1=data.iloc[:,4]
        except Exception:
            try:
                time.sleep(5)
                # Year and month must be ints here too; passing the strftime
                # strings made this fallback raise TypeError every time.
                data=dailydata.get_daily_data(word=short_name,start_year=past.year,start_mon=past.month,stop_year=today.year,stop_mon=today.month,verbose=False)
                data1=data.iloc[:,4]
            except Exception:
                # Nothing could be scraped: fall back to an all-zero series.
                data1=np.repeat(0,len(date_all1))
                used_placeholder=True
        data1=pd.DataFrame({'google_trend':pd.Series(data1)})
        data1.insert(0,'id',np.repeat(run_name,len(data1)))
        google_data=pd.concat([google_data,data1],axis=0)
    #change index from date to date_new to match old_dataset
    google_data.reset_index(inplace=True)
    if used_placeholder:
        # The zero series has no dates of its own; attach the 200-day window.
        google_data['date_new']=date_all1
        google_data.set_index('date_new',inplace=True)
    else:
        # Scraped data: the first column after reset_index holds the dates.
        google_data.columns=np.append('date_new',google_data.columns[1:])
        google_data.set_index('date_new',inplace=True)
    return google_data
def scrape_stocks(past,today):
    """Scrape Yahoo Finance series for the financial variables.

    One series is downloaded per ticker and the series are joined on their
    dates, so each row holds values observed on the same day. Joining by row
    position instead would pair values from different days whenever the
    tickers' rows do not cover identical dates.

    Args:
        past: Start of the download window, passed to ``yf.download`` as ``start``.
        today: End of the download window, passed to ``yf.download`` as ``end``.

    Returns:
        DataFrame indexed by ``date`` with columns ``GSPC``, ``GC=F``,
        ``EURUSD`` and ``TNX``, restricted to dates for which every series
        has a value.
    """
    #the codes for variables we are going to scrape
    codes=['^GSPC','GC=F','EURUSD%3DX','^TNX']
    codes_names=['GSPC','GC=F','EURUSD','TNX']
    series_by_code=[]
    for code, code_name in zip(codes, codes_names):
        df_code=pd.DataFrame(yf.download(code,start=past,end=today,progress=False))
        df_code=df_code.reset_index()
        # After reset_index column 0 holds the dates and column 1 is the
        # first column of the downloaded frame.
        df_code_ts=df_code.iloc[:,1].rename(code_name)
        df_code_ts.index=pd.DatetimeIndex(df_code.iloc[:,0],name='date')
        series_by_code.append(df_code_ts)
    # Outer-join on date, then keep only the dates present in every series.
    df_finance=pd.concat(series_by_code,axis=1).sort_index().dropna()
    df_finance.index.name='date'
    return df_finance