import memcache
# Set up the cache client (python-memcached, local memcached instance)
mc = memcache.Client(['127.0.0.1:11211'], debug=0)
# Try to read the cached data
cache_data = mc.get('api_response_data')
if cache_data is None:
    # Cache miss: fetch fresh data from the API
    api_data = call_api()
    # Store the result in the cache with a 300-second TTL
    mc.set('api_response_data', api_data, 300)
# Read the (now populated) data from the cache
data = mc.get('api_response_data')
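The call_api() helper used throughout this section is not defined in the original. A minimal sketch of what it might look like, assuming a hypothetical JSON endpoint and the requests library:

import requests

def call_api():
    # Hypothetical endpoint; substitute the real API URL and any auth
    response = requests.get('https://api.example.com/data', timeout=10)
    response.raise_for_status()
    return response.json()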
import json
import gzip
from google.cloud import storage
# Fetch the API response data
api_data = call_api()
# Serialize it to JSON
json_data = json.dumps(api_data)
# Compress the payload with gzip
compressed_data = gzip.compress(json_data.encode('utf-8'))
# Upload the compressed object to GCS
client = storage.Client()
bucket = client.bucket('my-bucket')
blob = bucket.blob('data/response_data.json.gz')
blob.upload_from_string(compressed_data, content_type='application/gzip')
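Reading the object back simply reverses the steps; a short sketch, reusing the bucket and object names assumed above:

# Download the compressed object and restore the original structure
blob = client.bucket('my-bucket').blob('data/response_data.json.gz')
restored = json.loads(gzip.decompress(blob.download_as_bytes()).decode('utf-8'))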
from google.cloud import pubsub_v1
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
import json
project_id = 'my-project-id'
topic_name = 'my-topic'
subscription_name = 'my-subscription'
table_id = 'my-dataset.my-table'
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()
bigquery_client = bigquery.Client()
# Build the Pub/Sub topic and subscription paths
topic_path = publisher.topic_path(project_id, topic_name)
subscription_path = subscriber.subscription_path(project_id, subscription_name)
# If the target table does not exist in BigQuery yet, create it
try:
    bigquery_client.get_table(table_id)
except NotFound:
    schema = [
        bigquery.SchemaField('field1', 'STRING'),
        # ...remaining fields elided in the original
    ]
    table = bigquery.Table(f'{project_id}.{table_id}', schema=schema)
    bigquery_client.create_table(table)
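With the topic, subscription, and table in place, messages can flow end to end: publish each API response to the topic, then let a subscriber callback stream the rows into BigQuery. A minimal sketch under the assumptions above (one JSON row per message, error handling elided):

from concurrent.futures import TimeoutError

# Publish an API response as a JSON-encoded message
api_data = call_api()
future = publisher.publish(topic_path, json.dumps(api_data).encode('utf-8'))
future.result()  # block until the publish is acknowledged

# Subscriber callback: insert each message into BigQuery as one row
def callback(message):
    row = json.loads(message.data.decode('utf-8'))
    errors = bigquery_client.insert_rows_json(table_id, [row])
    if not errors:
        message.ack()

streaming_pull = subscriber.subscribe(subscription_path, callback=callback)
with subscriber:
    try:
        streaming_pull.result(timeout=30)  # listen for 30 seconds
    except TimeoutError:
        streaming_pull.cancel()
        streaming_pull.result()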