-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathtrigger_interface.py
More file actions
250 lines (187 loc) · 8.42 KB
/
trigger_interface.py
File metadata and controls
250 lines (187 loc) · 8.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
"""
Example: Manual Trigger/Poll/Fetch Interface

Demonstrates how to use the new trigger interface for manual control
over the scrape lifecycle: trigger -> status -> fetch.

Use cases:
- Start multiple scrapes concurrently
- Custom polling logic
- Save job IDs for later retrieval
- Optimize cost and timing

Run: python examples/11_trigger_interface.py
"""
import asyncio
import time
from brightdata import BrightDataClient
# ============================================================================
# Example 1: Basic Trigger/Poll/Fetch Pattern
# ============================================================================
async def example_basic_trigger():
    """Trigger one Amazon product scrape and step through it manually.

    Walks the job through four explicit steps -- trigger, status check,
    wait, fetch -- printing progress after each one, then shows the
    ``job.to_result()`` shortcut and prints the ``success`` and ``cost``
    fields of the object it returns.
    """
    print("=" * 60)
    print("Example 1: Basic Trigger/Poll/Fetch")
    print("=" * 60)

    async with BrightDataClient() as client:
        amazon = client.scrape.amazon

        # Step 1: Trigger the scrape (returns immediately)
        print("\n🚀 Triggering Amazon product scrape...")
        job = await amazon.products_trigger(url="https://www.amazon.com/dp/B0CRMZHDG8")
        print(f"✅ Job triggered: {job.snapshot_id}")

        # Step 2: Check status manually
        print("\n🔍 Checking job status...")
        status = await job.status()
        print(f"Status: {status}")

        # Step 3: Wait for completion (with custom timeout)
        print("\n⏳ Waiting for completion...")
        await job.wait(timeout=180, verbose=True)

        # Step 4: Fetch results
        print("\n📥 Fetching results...")
        data = await job.fetch()
        # A non-list payload is counted as a single record.
        print(f"✅ Got {len(data) if isinstance(data, list) else 1} records")

        # Or use convenience method (wait + fetch + wrap in ScrapeResult)
        # The message names the method that is actually called below.
        print("\n💡 Alternative: Use to_result()...")
        result = await job.to_result()
        print(f"Success: {result.success}")
        # Fall back to 0 when cost is missing: formatting None with ".4f"
        # would raise TypeError.
        print(f"Cost: ${result.cost or 0:.4f}")
# ============================================================================
# Example 2: Concurrent Scraping (Trigger Multiple, Fetch Later)
# ============================================================================
async def example_concurrent_scraping():
    """Trigger several Amazon product scrapes concurrently, then collect all results.

    Both the trigger requests and the result waits are issued through
    ``asyncio.gather`` so they overlap instead of running one after the
    other. ``gather`` returns values in the order of its inputs, so
    ``jobs[i]`` and ``results[i]`` correspond to ``urls[i]``.

    Prints per-job progress followed by a summary: number of successful
    results, total cost and average elapsed time.
    """
    print("\n\n" + "=" * 60)
    print("Example 2: Concurrent Scraping")
    print("=" * 60)

    async with BrightDataClient() as client:
        amazon = client.scrape.amazon

        # URLs to scrape
        urls = [
            "https://www.amazon.com/dp/B0CRMZHDG8",
            "https://www.amazon.com/dp/B09B9C8K3T",
            "https://www.amazon.com/dp/B0CX23V2ZK",
        ]

        # Step 1: Trigger all scrapes at once rather than awaiting each
        # trigger request before sending the next.
        print("\n🚀 Triggering multiple scrapes...")
        jobs = await asyncio.gather(
            *(amazon.products_trigger(url=url) for url in urls)
        )
        for i, job in enumerate(jobs, 1):
            print(f" [{i}/{len(urls)}] Triggered: {job.snapshot_id[:12]}...")
        print(f"\n✅ All {len(jobs)} jobs triggered!")

        # Step 2: Wait for all to complete, again overlapping the waits.
        print("\n⏳ Waiting for all jobs to complete...")
        for i, job in enumerate(jobs, 1):
            print(f" [{i}/{len(jobs)}] Waiting for job {job.snapshot_id[:12]}...")
        results = await asyncio.gather(
            *(job.to_result(timeout=180) for job in jobs)
        )

        # Step 3: Process all results
        print("\n📊 Results summary:")
        # "or 0" treats a missing cost / elapsed time as zero.
        total_cost = sum(r.cost or 0 for r in results)
        successful = sum(1 for r in results if r.success)
        total_ms = sum(r.elapsed_ms() or 0 for r in results)
        # Avoid ZeroDivisionError if the URL list is ever emptied.
        avg_ms = total_ms / len(results) if results else 0
        print(f" - Successful: {successful}/{len(results)}")
        print(f" - Total cost: ${total_cost:.4f}")
        print(f" - Avg time: {avg_ms:.0f}ms")
# ============================================================================
# Example 3: Custom Polling Logic
# ============================================================================
async def example_custom_polling():
    """Poll a triggered job by hand with a growing delay between checks.

    Triggers one Amazon product scrape, then checks ``job.status()`` up to
    ``max_attempts`` times. The delay starts at 2 seconds, is multiplied by
    1.5 after each unsuccessful check and is capped at 20 seconds. The loop
    stops early when the status is ``"ready"`` (results are fetched) or
    ``"error"``; if neither happens, a timeout message is printed.
    """
    print("\n\n" + "=" * 60)
    print("Example 3: Custom Polling Logic")
    print("=" * 60)

    async with BrightDataClient() as client:
        amazon = client.scrape.amazon

        # Trigger the scrape
        print("\n🚀 Triggering scrape...")
        job = await amazon.products_trigger(url="https://www.amazon.com/dp/B0CRMZHDG8")
        print(f"✅ Job ID: {job.snapshot_id}")

        # Custom polling with exponential backoff
        print("\n⏳ Custom polling with exponential backoff...")
        poll_interval = 2  # Start with 2 seconds
        max_interval = 20  # Max 20 seconds
        max_attempts = 30

        for attempt in range(1, max_attempts + 1):
            status = await job.status()
            # NOTE(review): assumes job.triggered_at is a datetime whose
            # .timestamp() is comparable with time.time() -- confirm it is
            # timezone-aware or in local time.
            elapsed = time.time() - job.triggered_at.timestamp()
            print(f" [{elapsed:.1f}s] Attempt {attempt}: {status}")

            if status == "ready":
                print("✅ Job completed!")
                data = await job.fetch()
                print(f"📥 Got {len(data) if isinstance(data, list) else 1} records")
                break
            if status == "error":
                print("❌ Job failed")
                break

            # Wait with exponential backoff. Skip the sleep after the last
            # attempt: no further check follows it, so sleeping would only
            # delay the timeout message.
            if attempt < max_attempts:
                await asyncio.sleep(poll_interval)
                poll_interval = min(poll_interval * 1.5, max_interval)
        else:
            # Reached only when the loop ran out of attempts without a break.
            print("⏰ Timeout reached")
# ============================================================================
# Example 4: Save Job ID for Later Retrieval
# ============================================================================
async def example_save_and_resume():
    """Trigger a job, keep only its snapshot ID, and pick it up again later.

    Phase 1 triggers an Amazon product scrape and records the job's
    ``snapshot_id``. After a 5-second pause, phase 2 uses that ID alone
    (via ``products_status`` / ``products_fetch`` on the scraper) to check
    the job and, if its status is ``"ready"``, fetch the records.
    """
    header_rule = "=" * 60
    print("\n\n" + header_rule)
    print("Example 4: Save Job ID & Resume Later")
    print(header_rule)

    async with BrightDataClient() as client:
        scraper = client.scrape.amazon

        # --- Phase 1: trigger the job and remember its ID ---
        print("\n📝 Phase 1: Trigger and save job ID...")
        triggered = await scraper.products_trigger(
            url="https://www.amazon.com/dp/B0CRMZHDG8"
        )
        saved_id = triggered.snapshot_id
        print(f"✅ Job triggered: {saved_id}")
        print(f"💾 Saved snapshot_id for later: {saved_id}")

        # Stand-in for unrelated work happening between the two phases.
        print("\n💤 Simulating other work (5 seconds)...")
        await asyncio.sleep(5)

        # --- Phase 2: resume using nothing but the saved ID ---
        print("\n🔄 Phase 2: Resume with saved snapshot_id...")
        print(f"📂 Loading snapshot_id: {saved_id}")

        # The status lookup goes through the scraper, not the job object.
        state = await scraper.products_status(saved_id)
        print(f"Status: {state}")

        # Anything other than "ready" means there is nothing to fetch yet.
        if state != "ready":
            print("⏳ Job not ready yet, would need to wait longer...")
            return

        records = await scraper.products_fetch(saved_id)
        # A non-list payload is counted as a single record.
        record_count = len(records) if isinstance(records, list) else 1
        print(f"✅ Fetched {record_count} records")
# ============================================================================
# Example 5: Sync Usage (for non-async code)
# ============================================================================
def example_sync_usage():
    """Use the trigger interface from synchronous (non-async) code.

    ``products_trigger``, ``job.status`` and ``job.to_result`` are awaited
    everywhere else they are used, so calling them without ``await`` would
    return un-run coroutine objects and ``job.snapshot_id`` would fail.
    This function therefore stays a plain ``def`` and runs the whole
    trigger -> status -> result flow inside one ``asyncio.run`` call.

    Must be called from code that is not already inside a running event
    loop; ``asyncio.run`` raises ``RuntimeError`` in that case.
    """
    print("\n\n" + "=" * 60)
    print("Example 5: Sync Usage")
    print("=" * 60)

    # NOTE(review): if the SDK ships a dedicated synchronous client, it
    # would be the more direct choice here -- confirm against the SDK docs.
    async def _run():
        # One coroutine for the whole flow so the client is opened, used
        # and closed on a single event loop.
        async with BrightDataClient() as client:
            amazon = client.scrape.amazon

            # Trigger (sync)
            print("\n🚀 Triggering scrape (sync)...")
            job = await amazon.products_trigger(
                url="https://www.amazon.com/dp/B0CRMZHDG8"
            )
            print(f"✅ Job ID: {job.snapshot_id}")

            # Check status (sync)
            print("\n🔍 Checking status (sync)...")
            status = await job.status()
            print(f"Status: {status}")

            # Wait and fetch (sync)
            print("\n⏳ Waiting for completion (sync)...")
            return await job.to_result(timeout=180)

    result = asyncio.run(_run())
    print(f"Success: {result.success}")
    # Fall back to 0 when cost is missing: formatting None with ".4f"
    # would raise TypeError.
    print(f"Cost: ${result.cost or 0:.4f}")
# ============================================================================
# Run All Examples
# ============================================================================
if __name__ == "__main__":
    print("\n🚀 Trigger Interface Examples\n")

    # Async examples, in order; each one is run to completion by its own
    # asyncio.run call before the next starts.
    for async_example in (
        example_basic_trigger,
        example_concurrent_scraping,
        example_custom_polling,
        example_save_and_resume,
    ):
        asyncio.run(async_example())

    # The sync example is a plain function and is called directly.
    example_sync_usage()

    closing_rule = "=" * 60
    print("\n" + closing_rule)
    print("✅ All examples completed!")
    print(closing_rule)