如何在查询中指定发布日期和时间来抓取 Twitter 帖子
以下代码通过指定用户名(screen_name)抓取特定用户在 Twitter 上的帖子。
我想通过指定帖子的发布日期和时间进行抓取,例如“2020 年 4 月 1 日之前的帖子”或“2021 年 1 月之后的帖子”。(我希望在抓取阶段就按“发布日期/时间”进行查询,而不是抓取之后再按发布日期/时间过滤。)
如何修改代码?
我自己尝试通过反复试验来做到这一点,但无法做到。
# Import of various libraries
import pandas as pd
from requests_oauthlib import OAuth1Session
import json
import datetime,time,sys
from abc import ABCMeta,abstractmethod
# Twitter API credentials. The values below are placeholders and must be
# replaced before running; all four are passed, in this order, to
# OAuth1Session when a TweetsGetter is constructed.
CK = 'XXXXXXXXXXX' # Consumer Key (API Key)
CS = 'XXXXXXXXXXX' # Consumer Secret (API Secret Key)
AT = 'XXXXXXXXXXX' # Access Token
AS = 'XXXXXXXXXXX' # Access Token Secret
class TweetsGetter(metaclass=ABCMeta):
    """Abstract base class that pages through a Twitter REST endpoint.

    Subclasses supply the endpoint URL/parameters, the way tweets are pulled
    out of a decoded response, and the way the rate-limit status for that
    endpoint is read.  ``collect`` then walks the result set page by page,
    moving ``max_id`` below the last tweet seen after every page.
    """

    # Snowflake layout used by ``tweetIdAt``: an id is
    # (milliseconds since SNOWFLAKE_EPOCH_MS) << SNOWFLAKE_TIMESTAMP_SHIFT
    # plus worker/sequence bits.  See the Twitter IDs documentation.
    SNOWFLAKE_EPOCH_MS = 1288834974657
    SNOWFLAKE_TIMESTAMP_SHIFT = 22

    def __init__(self):
        # Credentials come from the module-level constants.
        self.session = OAuth1Session(CK, CS, AT, AS)

    @abstractmethod
    def specifyUrlAndParams(self):
        '''
        Return the destination URL and the request parameters as (url, params).
        '''

    @abstractmethod
    def pickupTweet(self, res_text):
        '''
        Extract the tweets from the decoded response and return them as a list.
        '''

    @abstractmethod
    def getLimitContext(self, res_text):
        '''
        Return (remaining, reset) read from the decoded rate-limit status.
        '''

    @staticmethod
    def tweetIdAt(when):
        '''
        Return the smallest snowflake tweet id that can belong to ``when``.

        ``when`` is either a ``datetime.datetime`` (a naive value is
        interpreted as local time, an aware value keeps its own zone) or a
        number of seconds since the Unix epoch.

        Raises ValueError when ``when`` is not after the snowflake epoch,
        because older tweets do not carry a timestamp in their id.
        '''
        if isinstance(when, datetime.datetime):
            seconds = when.timestamp()
        else:
            seconds = when
        # round() guards against float representation error on exact ms.
        millis = int(round(seconds * 1000))
        if millis <= TweetsGetter.SNOWFLAKE_EPOCH_MS:
            raise ValueError(
                'time must be after the snowflake epoch '
                '(2010-11-04T01:42:54.657Z)'
            )
        offset = millis - TweetsGetter.SNOWFLAKE_EPOCH_MS
        return offset << TweetsGetter.SNOWFLAKE_TIMESTAMP_SHIFT

    def _getWithRetry(self, url, params=None):
        '''
        GET ``url`` and return the response, retrying on 503.

        A 503 is retried after a pause; once more than 10 consecutive 503s
        have been counted, or on any other non-200 status, an Exception
        carrying the status code is raised.
        '''
        unavailableCnt = 0
        while True:
            res = self.session.get(url, params=params)
            if res.status_code != 503:
                break
            # 503 : Service Unavailable
            if unavailableCnt > 10:
                raise Exception('Twitter API error %d' % res.status_code)
            unavailableCnt += 1
            print('Service Unavailable 503')
            self.waitUntilReset(time.mktime(datetime.datetime.now().timetuple()) + 30)

        if res.status_code != 200:
            raise Exception('Twitter API error %d' % res.status_code)
        return res

    def collect(self, total=-1, onlyText=False, includeRetweet=False,
                since=None, until=None):
        '''
        Generate tweets, newest first, until the endpoint returns no more.

        total          -- stop after this many tweets (<= 0 means no limit).
        onlyText       -- yield only tweet['text'] instead of the whole tweet.
        includeRetweet -- when False, tweets having 'retweeted_status' are
                          skipped and do not count towards ``total``.
        since          -- optional lower time bound (datetime or epoch
                          seconds); sent as ``since_id`` so only tweets
                          created at or after it are requested.
        until          -- optional upper time bound (datetime or epoch
                          seconds); sent as the initial ``max_id`` so only
                          tweets created before it are requested.

        Raises ValueError for a bound that ``tweetIdAt`` rejects and
        Exception when the API answers with an unexpected status code.
        '''
        # Translate the time bounds first so bad input fails before any
        # request is made.  "- 1" makes since inclusive and until exclusive:
        # since_id is an exclusive lower bound, max_id an inclusive upper one.
        sinceId = None if since is None else self.tweetIdAt(since) - 1
        untilId = None if until is None else self.tweetIdAt(until) - 1

        #----------------
        # Check the frequency limit.
        #----------------
        self.checkLimit()

        #----------------
        # URL, Parameters
        #----------------
        url, params = self.specifyUrlAndParams()
        # include_rts is a parameter for statuses/user_timeline,
        # not valid for search/tweets
        params['include_rts'] = str(includeRetweet).lower()
        if sinceId is not None:
            params['since_id'] = sinceId
        if untilId is not None:
            params['max_id'] = untilId

        #----------------
        # Getting Tweets
        #----------------
        cnt = 0
        while True:
            res = self._getWithRetry(url, params)
            tweets = self.pickupTweet(json.loads(res.text))

            if len(tweets) == 0:
                # `len(tweets) != params['count']` cannot be used as the end
                # condition because count only seems to be a maximum.
                # => "== 0"
                # https://dev.twitter.com/discussions/7513
                break

            for tweet in tweets:
                if ('retweeted_status' in tweet) and (includeRetweet is False):
                    continue

                if onlyText is True:
                    yield tweet['text']
                else:
                    yield tweet

                cnt += 1
                if cnt % 100 == 0:
                    print('%d件 ' % cnt)

                if total > 0 and cnt >= total:
                    return

            # Next page: everything strictly older than the last tweet seen.
            params['max_id'] = tweet['id'] - 1

            # Header confirmation (limited number of times)
            # Check X-Rate-Limit-Remaining as it may not be included in some cases.
            if ('X-Rate-Limit-Remaining' in res.headers
                    and 'X-Rate-Limit-Reset' in res.headers):
                if int(res.headers['X-Rate-Limit-Remaining']) == 0:
                    self.waitUntilReset(int(res.headers['X-Rate-Limit-Reset']))
                    self.checkLimit()
            else:
                print('not found - X-Rate-Limit-Remaining or X-Rate-Limit-Reset')
                self.checkLimit()

    def checkLimit(self):
        '''
        Query the rate-limit status and wait until access is available.
        '''
        url = "https://api.twitter.com/1.1/application/rate_limit_status.json"
        while True:
            res = self._getWithRetry(url)
            remaining, reset = self.getLimitContext(json.loads(res.text))
            if remaining != 0:
                break
            self.waitUntilReset(reset)

    def waitUntilReset(self, reset):
        '''
        Sleep until the epoch time ``reset``, plus a 10 second margin.
        '''
        seconds = reset - time.mktime(datetime.datetime.now().timetuple())
        seconds = max(seconds, 0)
        print('\n =====================')
        print(' == waiting %d sec ==' % seconds)
        print(' =====================')
        sys.stdout.flush()
        time.sleep(seconds + 10)  # Just in case + 10 sec.

    @staticmethod
    def byUser(screen_name):
        '''
        Return a getter for the timeline of the user ``screen_name``.
        '''
        return TweetsGetterByUser(screen_name)
class TweetsGetterByUser(TweetsGetter):
    '''
    Getter bound to the timeline of a single user, identified by screen name.
    '''

    def __init__(self, screen_name):
        super(TweetsGetterByUser, self).__init__()
        self.screen_name = screen_name

    def specifyUrlAndParams(self):
        '''
        Return the user_timeline endpoint and its base request parameters.
        '''
        request_params = {'screen_name': self.screen_name, 'count': 200}
        return 'https://api.twitter.com/1.1/statuses/user_timeline.json', request_params

    def pickupTweet(self, res_text):
        '''
        Return the entries of the decoded response as a new list.
        '''
        return [entry for entry in res_text]

    def getLimitContext(self, res_text):
        '''
        Return (remaining, reset) for /statuses/user_timeline as integers.
        '''
        timeline_limit = res_text['resources']['statuses']['/statuses/user_timeline']
        remaining, reset = timeline_limit['remaining'], timeline_limit['reset']
        return int(remaining), int(reset)
if __name__ == '__main__':
    getter = TweetsGetter.byUser('@elonmusk')

    # One list per tweet attribute; entry i of every list describes tweet i.
    list_text = []
    list_id = []
    list_user_screenname = []
    list_created_at = []
    list_favorite_count = []
    list_retweet_count = []

    # (destination list, extractor) pairs, applied to each tweet in this order.
    columns = (
        (list_text, lambda t: t['text']),
        (list_id, lambda t: t['id']),
        (list_user_screenname, lambda t: t['user']['screen_name']),
        (list_created_at, lambda t: t['created_at']),
        (list_favorite_count, lambda t: t['favorite_count']),
        (list_retweet_count, lambda t: t['retweet_count']),
    )

    for tweet in getter.collect(total = 3000):
        for column, extract in columns:
            column.append(extract(tweet))
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。