Spaces:
Runtime error
Runtime error
File size: 2,512 Bytes
c6dce0f ea13ed5 a2fd3b4 c6dce0f a2fd3b4 798abe7 a2fd3b4 c6dce0f a2fd3b4 c6dce0f ea13ed5 f1d29d4 a2fd3b4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
import os
import string
import random
# import tweepy
import time
import json
import requests
from requests_oauthlib import OAuth1
import pandas as pd
from datasets import load_dataset
# Twitter API credentials, looked up in the process environment.
# Each name is None when the corresponding variable is not set.
BEARER_TOKEN = os.getenv("BEARER_TOKEN")
CONSUMER_KEY = os.getenv("CONSUMER_KEY")
CONSUMER_SECRET = os.getenv("CONSUMER_SECRET")
ACCESS_TOKEN = os.getenv("ACCESS_TOKEN")
ACCESS_TOKEN_SECRET = os.getenv("ACCESS_TOKEN_SECRET")
# client = tweepy.Client(
# bearer_token=BEARER_TOKEN,
# consumer_key=CONSUMER_KEY,
# consumer_secret=CONSUMER_SECRET,
# access_token=ACCESS_TOKEN,
# access_token_secret=ACCESS_TOKEN_SECRET,
# )
# Real tweets
# Runs at import time: load the "train" split of the "dawood/elon-tweets"
# dataset (cached under "dataset_cache") and convert it to a pandas DataFrame.
real_dataset = load_dataset("dawood/elon-tweets", split="train", cache_dir="dataset_cache")
real_df = real_dataset.to_pandas()
# Keep only the first 500 entries of the "Tweets" column, as a plain list.
real_tweets = real_df["Tweets"][:500].values.tolist()
def load_real_tweet():
    """Return one entry of ``real_tweets`` chosen uniformly at random.

    Raises:
        IndexError: if ``real_tweets`` is empty.
    """
    # random.choice always yields a valid element. The previous
    # random.randint(0, len(real_tweets)) includes its upper bound, so it
    # occasionally indexed one past the end of the list.
    return random.choice(real_tweets)
# Fake tweets
# Runs at import time: read "elon_generated_tweets.csv" (relative path) into a
# DataFrame.
fake_df = pd.read_csv("elon_generated_tweets.csv")
# DataFrame.values.tolist() yields one list per CSV row, so every entry of
# fake_tweets is a list of that row's cells rather than a bare string.
# NOTE(review): create_poll sends the chosen entry as the tweet text; confirm
# whether a single column should be selected here instead.
fake_tweets = fake_df.values.tolist()
def load_fake_tweet():
    """Return one entry of ``fake_tweets`` chosen uniformly at random.

    Raises:
        IndexError: if ``fake_tweets`` is empty.
    """
    # random.choice always yields a valid element. The previous
    # random.randint(0, len(fake_tweets)) includes its upper bound, so it
    # occasionally indexed one past the end of the list.
    return random.choice(fake_tweets)
# response = client.create_tweet(
# text=load_real_tweet()
# )
def _bearer_oauth(request):
    """Attach the app-only bearer token to an outgoing ``requests`` request."""
    request.headers["Authorization"] = "Bearer {}".format(BEARER_TOKEN)
    return request


def connect_to_endpoint(url, params=None, data=None, type='GET'):
    """Send an authenticated HTTP request and return the decoded JSON body.

    Args:
        url: Endpoint URL to call.
        params: Optional query-string parameters.
        data: Optional payload, sent as the JSON request body.
        type: HTTP method, one of 'GET', 'POST' or 'DELETE'. GET requests are
            authenticated with the bearer token; POST and DELETE are signed
            with the OAuth1 user credentials.

    Returns:
        The response body parsed as JSON.

    Raises:
        ValueError: if ``type`` is not a supported method.
        Exception: with ``(status_code, text)`` when the response status is
            neither 200 nor 201.
    """
    # Reject unknown methods up front; previously this fell through with
    # resp = None and failed later with an unrelated AttributeError.
    if type not in ('GET', 'POST', 'DELETE'):
        raise ValueError("Unsupported request type: {!r}".format(type))

    if type == 'GET':
        # The original passed an undefined name (bearer_oauth) here.
        resp = requests.get(url, json=data, params=params, auth=_bearer_oauth)
    else:
        user_auth = OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
        send = requests.post if type == 'POST' else requests.delete
        resp = send(url, json=data, params=params, auth=user_auth)

    if resp.status_code not in (200, 201):
        raise Exception(resp.status_code, resp.text)
    return resp.json()
def create_poll():
    """Post a Twitter poll asking whether a randomly chosen tweet is real.

    The tweet text comes from either ``load_real_tweet`` or
    ``load_fake_tweet``, each picked with equal probability. It is posted to
    the ``/2/tweets`` endpoint together with a two-option poll ("Elon" /
    "Not") whose duration is 24 hours.

    Returns:
        Whatever ``connect_to_endpoint`` returns for the POST request.
    """
    url = 'https://api.twitter.com/2/tweets'

    # random.randint(0, 2) includes 2, so the previous `> 0` test chose a real
    # tweet two times out of three; choose between the two sources evenly.
    pick_tweet = random.choice((load_real_tweet, load_fake_tweet))
    tweet = pick_tweet()

    data = {
        'text': tweet,
        'poll': {
            'options': [
                "Elon",
                "Not",
            ],
            'duration_minutes': 24 * 60,  # one day
        },
    }
    return connect_to_endpoint(url, data=data, type='POST')
if __name__ == "__main__":
    import logging

    # Post one poll per day, forever.
    while True:
        try:
            create_poll()
        except Exception:
            # One failed post (for example a rejected or rate-limited request)
            # should not end the bot: record the traceback and try again on
            # the next cycle.
            logging.exception("create_poll failed; retrying after the next sleep")
        time.sleep(24 * 60 * 60)
|