Introduction
Asynchronous connections, typically for websocket requests, can be run in Ascend.io using the asyncio library in Python. In this article, we will outline the code required to run asynchronous connections in Ascend.io and explain how it works.
Setting up Context
Before we dive into the code, we need to set up the context. The context contains the credentials required to access the data source. To set up the context, we create a function called context that takes in a credentials string as a parameter. The function doesn't need to do anything, but it needs to exist for Ascend.io to recognize it as the context function.
def context(credentials: str): pass
Listing Objects
Next, we need to list the objects in the data source. We create a function called list_objects that takes in the context and metadata as parameters. In this function, we yield a dictionary that contains the name of the object, its fingerprint, and whether it is a prefix.
def list_objects(context: dict, metadata: dict): yield { 'name': 'partition', 'fingerprint': 'partition', 'is_prefix': False }
Running Asynchronous Connections
To run asynchronous connections, we create a separate function called get_message that uses the asyncio library. In this function, we use the asyncio.sleep function to simulate a delay of 10 seconds before returning the message 'It worked'.
async def get_message(): await asyncio.sleep(10) return 'It worked'
We then call the get_message function from the read_bytes function and use the json.dumps function to convert the returned message to a JSON object.
def read_bytes(context: dict, metadata: dict): message = asyncio.run(get_message()) yield json.dumps({"col0": message})
Full Example
import json
import asyncio
def context(credentials: str):
pass
def list_objects(context: dict, metadata: dict):
yield {
'name': 'partition',
'fingerprint': 'partition',
'is_prefix': False
}
async def get_message():
await asyncio.sleep(10)
return 'It worked'
def read_bytes(context: dict, metadata: dict):
message = asyncio.run(get_message())
yield json.dumps({"col0": message})
Conclusion
In conclusion, running asynchronous connections in Ascend.io is straightforward using the asyncio library in Python. By creating a separate function for asynchronous connections and calling it synchronously from the read_bytes function, we can easily incorporate asynchronous connections into our data processing pipeline.
Comments
0 comments
Please sign in to leave a comment.