如何解决如何从1个S3文件读取和写入多个json对象到dynamodb python 3.8
我能够从S3存储桶到dynamodb读写单个json记录。但是,当我尝试从其中包含多个json对象的文件读取和写入时,它给了我错误。请在下面找到代码和错误-请您帮助解决相同的问题- Lambda代码(读取S3文件并写入dynamodb)
import json
import boto3
s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')
def lambda_handler(event,context):
# Todo implement
bucket = event['Records'][0]['s3']['bucket']['name']
json_file_name = event['Records'][0]['s3']['object']['key']
print(bucket)
print(json_file_name)
json_object = s3_client.get_object(Bucket=bucket,Key=json_file_name)
jsonFileReader = json_object['Body'].read()
print(jsonFileReader)
jsonFile = json.loads(jsonFileReader)
print(jsonFile)
print(type(jsonFile))
jsonDict = {"test":item for item in jsonFile}
print(type(jsonDict))
print(jsonDict)
table = dynamodb.Table('Twitter-data-stream')
print(type(table))
table.put_item(Item=jsonDict['test'])
return 'Hello from Lambda!'
cloudwatch中的错误-
[ERROR] JSONDecodeError: Extra data: line 1 column 230 (char 229)
Traceback (most recent call last):
File "/var/task/lambda_function.py",line 20,in lambda_handler
jsonFilerec = json.loads(jsonFileReader)
File "/var/lang/lib/python3.8/json/__init__.py",line 357,in loads
return _default_decoder.decode(s)
File "/var/lang/lib/python3.8/json/decoder.py",line 340,in decode
raise JSONDecodeError("Extra data",s,end)
请在下面找到S3文件样本记录
b'[{"id": "1305857561179152385","tweet": "If you like vintage coke machines and guys who look like Fred Flintstone you\'ll love the short we\'ve riffed: Coke R,"ts": "Tue Sep 15 13:14:38 +0000 2020"}][{"id": "1305858267067883521","tweet": "Chinese unicorn Genki Forest plots own beverage hits #China #Chinese #Brands #GoingGlobal\\u2026 ","ts": "Tue Sep 15 13:17:27 +0000 2020"}][{"id": "1305858731293507585","tweet": "RT @CinemaCheezy: If you like vintage coke machines and guys who look like Fred Flintstone you\'ll love the short we\'ve riffed: Coke Refresh\\u2026","ts": "Tue Sep 15 13:19:17 +0000 2020"}]'
添加用于生成Json文件的Producer / Input代码
import boto3
import json
from datetime import datetime
import calendar
import random
import time
import sys
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import preprocessor as p
#Variables that contains the user credentials to access Twitter API
consumer_key = '************'
consumer_secret ='******************'
access_token = '********************'
access_token_secret = '***************'
# Create tracklist with the words that will be searched for
tracklist = ['#coke']
awsRegionName='us-east-1'
awsAccessKey='************'
awsSecretKey='**********'
class TweetStreamListener(StreamListener):
# on success
def on_data(self,data):
# decode json
tweet = json.loads(data)
print(type(tweet))
#print(tweet)
if "text" in tweet.keys():
payload = {'id': str(tweet['id']),'tweet': str(tweet['text'].encode('utf8','replace')),#'tweet': str(tweet['text']),'ts': str(tweet['created_at']),},try:
print(tweet)
#print(payload)
put_response = kinesis_client.put_record(
StreamName=stream_name,Data=json.dumps(payload),PartitionKey=str(['screen_name']))
#PartitionKey=str(tweet['user']['screen_name']))
except (AttributeError,Exception) as e:
print(e)
pass
return True
# on failure
def on_error(self,status):
print("On_error status:",status)
stream_name = 'twitter-data-stream' # fill the name of Kinesis data stream you created
#stream_name = 'demo-datastream'
if __name__ == '__main__':
# create kinesis client connection
kinesis_client = boto3.client('kinesis',region_name=awsRegionName,aws_access_key_id=awsAccessKey,aws_secret_access_key=awsSecretKey)
# create instance of the tweepy tweet stream listener
listener = TweetStreamListener()
# set twitter keys/tokens
auth = OAuthHandler(consumer_key,consumer_secret)
auth.set_access_token(access_token,access_token_secret)
# create instance of the tweepy stream
stream = Stream(auth,listener)
# search twitter for tags or keywords from cli parameters
#query = sys.argv[1:] # list of CLI arguments
#query_fname = ' '.join(query) # string
stream.filter(track=tracklist)
#tweets = api.search(tracklist,count=10,lang='en',exclude='retweets',tweet_mode = 'extended')
关于, Priti
解决方法
也许您的json不正确:[tweet_data] [...] [...] [...]不是有效的json对象。 您应该对输入数据进行处理,以使它具有以下内容:[{tweet_data},{...},{...},{...},{...}]
版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。