Skip to main content

Working with storages

The Actor class provides methods to work either with the default storages of the Actor, or with any other storage, named or unnamed.

Types of storages

There are three types of storages available to Actors.

First are datasets, which are append-only tables for storing the results of your Actors. You can open a dataset through the Actor.open_dataset() method, and work with it through the resulting Dataset class instance.

Next there are key-value stores, which function as a read/write storage for storing file-like objects, typically the Actor state or binary results. You can open a key-value store through the Actor.open_key_value_store() method, and work with it through the resulting KeyValueStore class instance.

Finally, there are request queues. These are queues into which you can put the URLs you want to scrape, and from which the Actor can dequeue them and process them. You can open a request queue through the Actor.open_request_queue() method, and work with it through the resulting RequestQueue class instance.

Each Actor run has its default dataset, default key-value store and default request queue.

Local storage emulation

To be able to develop Actors locally, the storages that the Apify platform provides are emulated on the local filesystem.

The storage contents are loaded from and saved to the storage folder in the Actor's main folder. Each storage type is stored in its own subfolder, so for example datasets are stored in the storage/datasets folder.

Each storage is then stored in its own folder, named after the storage, or called default if it's the default storage. For example, a request queue with the name my-queue would be stored in storage/request_queues/my-queue.

Each dataset item, key-value store record, or request in a request queue is then stored in its own file in the storage folder. Dataset items and request queue requests are always JSON files, and key-value store records can be any file type, based on its content type. For example, the Actor input is typically stored in storage/key_value_stores/default/INPUT.json.

Local storage persistence

By default, the storage contents are persisted across multiple Actor runs. To clean up the Actor storages before the running the Actor, use the --purge flag of the apify run command of the Apify CLI.

apify run --purge

Convenience methods for working with default storages

There are several methods for directly working with the default key-value store or default dataset of the Actor.

Actor.get_value('my-record') reads a record from the default key-value store of the Actor.

Actor.set_value('my-record', 'my-value') saves a new value to the record in the default key-value store.

Actor.get_input() reads the Actor input from the default key-value store of the Actor.

Actor.push_data([{'result': 'Hello, world!'}, ...]) saves results to the default dataset of the Actor.

Opening named and unnamed storages

The Actor.open_dataset(), Actor.open_key_value_store() and Actor.open_request_queue() methods can be used to open any storage for reading and writing. You can either use them without arguments to open the default storages, or you can pass a storage ID or name to open another storage.

src/main.py
from apify import Actor

async def main():
async with Actor:
# Work with the default dataset of the Actor
dataset = await Actor.open_dataset()
await dataset.push_data({'result': 'Hello, world!'})

# Work with the key-value store with ID 'mIJVZsRQrDQf4rUAf'
key_value_store = await Actor.open_key_value_store(id='mIJVZsRQrDQf4rUAf')
await key_value_store.set_value('record', 'Hello, world!')

# Work with the request queue with the name 'my-queue'
request_queue = await Actor.open_request_queue(name='my-queue')
await request_queue.add_request({ 'uniqueKey': 'v0Nr', 'url': 'https://example.com' })

Deleting storages

To delete a storage, you can use the Dataset.drop(), KeyValueStore.drop() or RequestQueue.drop() method.

src/main.py
from apify import Actor

async def main():
async with Actor:
# Open a key-value store with the name 'my-cool-store'
key_value_store = await Actor.open_key_value_store(name='my-cool-store')
await key_value_store.set_value('record', 'Hello, world!')
...

# Now we don't want it anymore
await key_value_store.drop()

Working with datasets

Reading & writing items

To write data into a dataset, you can use the Dataset.push_data() method.

To read data from a dataset, you can use the Dataset.get_data() method.

To get an iterator of the data, you can use the Dataset.iterate_items() method.

# Open a dataset and write some data in it
dataset = await Actor.open_dataset(name='my-cool-dataset')
await dataset.push_data([{'itemNo': i} for i in range(1000)])

# Read back the first half of the data
first_half = await dataset.get_data(limit=500)
print(first_half['items'])

# Iterate over the second half
second_half = []
async for item in dataset.iterate_items(offset=500):
second_half.append(item)
print(second_half)

Exporting items

You can also export the dataset items into a key-value store, as either a CSV or a JSON record, using the Dataset.export_to_csv() or Dataset.export_to_json() method.

# Open a dataset and write some data in it
dataset = await Actor.open_dataset(name='my-cool-dataset')
await dataset.push_data([{'itemNo': i} for i in range(1000)])

# Export the data as CSV and JSON
await dataset.export_to_csv('data.csv', to_key_value_store_name='my-cool-key-value-store')
await dataset.export_to_json('data.json', to_key_value_store_name='my-cool-key-value-store')

# Print the exported records
store = await Actor.open_key_value_store(name='my-cool-key-value-store')
print(await store.get_value('data.csv'))
print(await store.get_value('data.json'))

Working with key-value stores

Reading and writing records

To read records from a key-value store, you can use the KeyValueStore.get_value() method.

To write records into a key-value store, you can use the KeyValueStore.set_value() method. You can set the content type of a record with the content_type argument. To delete a record, set its value to None.

# Open a key-value store and write some data in it
store = await Actor.open_key_value_store(name='my-cool-key-value-store')
await store.set_value('automatic_text', 'abcd')
await store.set_value('automatic_json', {'ab': 'cd'})
await store.set_value('explicit_csv', 'a,b\nc,d', content_type='text/csv')

# Try that the values are read correctly
print(await store.get_value('automatic_text'))
print(await store.get_value('automatic_json'))
print(await store.get_value('explicit_csv'))

# Delete the `automatic_text` value
await store.set_value('automatic_text', None)

Iterating keys

To get an iterator of the key-value store record keys, you can use the KeyValueStore.iterate_keys() method.

# Print the info for each record
print('Records in store:')
async for (key, info) in store.iterate_keys():
print(f'{key=}, {info=}')

Public URLs of records

To get a publicly accessible URL of a key-value store record, you can use the KeyValueStore.get_public_url() method.

print(f'"my_record" record URL: {await store.get_public_url('my_record')}')

Working with request queues

Adding requests to a queue

To add a request into the queue, you can use the RequestQueue.add_request() method.

You can use the forefront boolean argument to specify whether the request should go to the beginning of the queue, or to the end.

You can use the uniqueKey of the request to uniquely identify a request. If you try to add more requests with the same unique key, only the first one will be added.

Reading requests

To fetch the next request from the queue for processing, you can use the RequestQueue.fetch_next_request() method.

To get info about a specific request from the queue, you can use the RequestQueue.get_request() method.

Handling requests

To mark a request as handled, you can use the RequestQueue.mark_request_as_handled() method.

To mark a request as not handled, so that it gets retried, you can use the RequestQueue.reclaim_request() method.

To check if all the requests in the queue are handled, you can use the RequestQueue.is_finished() method.

Full example

src/main.py
import asyncio
import random
from apify import Actor


async def main():
async with Actor:
# Open the queue
queue = await Actor.open_request_queue()

# Add some requests to the queue
for i in range(1, 10):
await queue.add_request({ 'uniqueKey': f'{i}', 'url': f'http://example.com/{i}' })

# Add a request to the start of the queue, for priority processing
await queue.add_request({ 'uniqueKey': '0', 'url': 'http://example.com/0' }, forefront=True)

# If you try to add an existing request again, it will not do anything
operation_info = await queue.add_request({ 'uniqueKey': '5', 'url': 'http://different-example.com/5' })
print(operation_info)
print(await queue.get_request(operation_info['requestId']))

# Finally, process the queue until all requests are handled
while not await queue.is_finished():
# Fetch the next unhandled request in the queue
request = await queue.fetch_next_request()
# This can happen due to the eventual consistency of the underlying request queue storage,
# best solution is just to sleep a bit
if request is None:
await asyncio.sleep(1)
continue

Actor.log.info(f'Processing request {request["uniqueKey"]}...')
Actor.log.info(f'Scraping URL {request["url"]}...')

# Do some fake work, which fails 30% of the time
await asyncio.sleep(1)
if random.random() > 0.3:
# If processing the request was successful, mark it as handled
Actor.log.info('Request successful.')
await queue.mark_request_as_handled(request)
else:
# If processing the request was unsuccessful, reclaim it so it can be processed again
Actor.log.warning('Request failed, will retry!')
await queue.reclaim_request(request)