Contributing to the Task Library

Design your task

Determine the best python client, if there is one, for the service you are exposing

Most of the third-party services turned into task library tasks use that service's existing python client. For example, the Postgres tasks use psycopg, the Snowflake tasks use snowflake, the Docker tasks use docker, and the Twitter tasks use tweepy. You'll need to determine what the python client is for the tasks you are trying to implement. Depending on what functionality the python client exposes, this may also start to define what tasks you can easily write.
It's possible there are multiple python clients, or that there are NO python clients for the service you are trying to connect.

Choosing the right client

If there are multiple possible python clients, it's time to do a little research on which is best. Use your judgement regarding ease of use, how well supported it is (by the service itself or the open source community), how feature-complete it is, and whether it has any unnecessary dependencies.

Adding the client to Prefect


It's probably the case that the client you found isn't a dependency of Prefect yet, but if you are going to use it in a task library task, other users will need to install it to use your task.
To make sure new users install it as a dependency, we need to add it to setup.py. It is best practice to move any new dependencies that are not part of boring old vanilla Prefect core into the "extras" section in setup.py, so that they are not installed unless a user specifically requests them. Take a look at Prefect's existing "extras" section to see where to put them.
Specify at least a lower bound for your dependency in setup.py; a quick rule of thumb is to set the lower bound to the major and minor version you tested on, though testing backwards to at least the first major release version is appreciated.
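As a concrete illustration, an "extras" mapping of this style might look like the snippet below. The service name and version pins here are made up for the example; check Prefect's actual setup.py for the real entries.

```python
# Illustrative "extras" mapping in the style of Prefect's setup.py;
# the "mynewservice" entry and its version pin are hypothetical.
extras = {
    "github": ["PyGithub >= 1.51"],
    "mynewservice": ["mynewservice-client >= 2.3"],  # lower bound = version you tested on
}

# A combined extra so users can install everything at once.
extras["all_extras"] = sum(extras.values(), [])

# Passing extras_require=extras to setup() then lets users opt in with:
#   pip install "prefect[mynewservice]"
```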

What if there is no suitable client?

If there is no python client at all, or you don't want to use the one that exists, you may still be able to write a task that interacts with the service using other python libraries, like requests, depending on what the service offers (for example, a REST API). A few examples we have of tasks doing just that are:
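As a generic illustration of that approach, here is a minimal sketch of task-style logic hitting a hypothetical REST endpoint. The URL path and header names are assumptions for the example, and the stdlib urllib is used in place of requests to keep the sketch dependency-free:

```python
import json
import urllib.request


def build_request(base_url: str, token: str) -> urllib.request.Request:
    # Build an authenticated request; the /api/v1/status path and the
    # bearer-token header are hypothetical, not any real service's API.
    return urllib.request.Request(
        f"{base_url}/api/v1/status",
        headers={"Authorization": f"Bearer {token}"},
    )


def run(base_url: str, token: str) -> dict:
    # Fetch the endpoint and hand downstream tasks a parsed dict,
    # just as a client-library-based task would.
    with urllib.request.urlopen(build_request(base_url, token)) as resp:
        return json.loads(resp.read().decode())
```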

Determine task granularity

Think about how you would normally use the service you are converting into tasks. Are there separate small tasks that might happen independently or in different combinations? Is there one main unit of work you want to represent as one single task? Prefect generally encourages tasks to embody the smallest unit of work possible, but that isn't always the smartest route depending on the task, so take some time to think critically about this here.
Even if there are small tasks that expose individual pieces of the API, there may still be opportunity to create a "capstone" task or tasks that do several common granular operations together in a row. These also can be fun to design and develop, so don't leave them out of your consideration!
Overall, the goal should be the easiest possible reuse for other users. Don't stress about exhaustively contributing every possible way to access this API or every possible use case, but design anything you choose to tackle with the knowledge that other users will want to use, configure, and possibly contribute to it later.
Once you have a shortlist of tasks to tackle, pick one to get started with. You can always change your mind later!

Determine your task's run method signature

All tasks subclass from prefect.core.Task and must implement a run method, which does the actual work of the task.
For example, the run method signature for the CreateBranch task in the Github tasks looks like this:
def run(
    self,
    repo: str = None,
    base: str = None,
    branch_name: str = None,
    token: str = None,
) -> dict:
This run method accepts arguments that expose the configuration elements that are specific to this task; the repo string, base of the branch, the branch name, and the token needed to authenticate to the Github API.
Most of these simply mirror the configuration available on the python client you are wrapping or the REST API endpoint you are exposing, so at its most basic this step may just entail comparing the signature of the client method to the desired signature of your task's run method.
While we're at it, now is the time to decide whether there is anything special about this task's return value. If the task produces some piece of data that is likely useful to a downstream user, go ahead and return it. In our example above, the task returns the metadata about the branch that was created. Consider the downstream tasks and users when deciding what to return; for example, this task converts the JSON response from the API into a Python dictionary using json.loads before returning, so that the downstream task doesn't have to parse it. This is always a tradeoff between overloading your task with unrelated wrangling work and prioritizing ease of use for downstream users, so use your judgement, but in general we advise making the upstream data as easy to use in Python as possible before returning it.
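For instance, the tail end of such a run method might look like the sketch below; the response fragment is made up for illustration, not a real Github payload:

```python
import json


def run(raw_response: str) -> dict:
    # Convert the API's JSON response into a dict before returning,
    # so downstream tasks don't each have to parse it themselves.
    branch_info = json.loads(raw_response)
    return branch_info


# A made-up fragment of a "create branch" style response:
meta = run('{"ref": "refs/heads/issue-1", "object": {"sha": "abc123"}}')
```

A downstream task can then index straight into `meta` rather than carrying its own parsing code.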

Determine your task's init method signature

Tasks usually also expose configuration in the task subclass' init method. Generally these are the same arguments you decided on for the run method, so in the most basic case you can transfer them over to your task's init method. But why? And when are they not exactly the same as the run method arguments?
Exposing arguments to the run method is great for one-off configuration, but some common types of configuration are better evaluated when the task is first initialized. For example, consider the following two cases:
from prefect.tasks.github.repos import CreateBranch
task = CreateBranch()

with Flow('my flow') as f:
    task(repo='prefecthq/prefect', branch_name='issue-1-make-prefect')
    task(repo='prefecthq/prefect', branch_name='issue-2-make-it-better')
    task(repo='prefecthq/prefect', branch_name='issue-3-add-task-to-task-library')
Compare that to:
from prefect.tasks.github.repos import CreateBranch
task = CreateBranch(repo='prefecthq/prefect')

with Flow('my flow') as f:
    task(branch_name='issue-1-make-prefect')
    task(branch_name='issue-2-make-it-better')
    task(branch_name='issue-3-add-task-to-task-library')
In the first example the user had to provide the same repo over and over even though it was identical for every instance of that task; it is easier for them if they can pass the configuration common to all instances at initialization, and the individually parameterized pieces when they add each version of the task to the flow.
It's important to note that users initialize instances of task classes outside of flow runs, so nothing in your init method can be tied to the runtime environment of the task. For example, do not depend on a shared memory space for open connections or clients, threads, filepaths, etc. during your task's init.
You can selectively choose what configuration should be set at initialization - adding those kwargs only to the init method - and what configuration should be set when a task is added to a flow - adding those kwargs only to the run method. But if there isn't an obvious delineation, or there is a possibility of other use cases that configure at different times, it can be hard to make that call. In practice, many tasks in the task library expose the same kwargs in both the init method and the run method, so the user can choose when to configure them based on their situation. If you choose to do this, there is one extra step you have to add to support users who mix and match configuration across these kwargs: the prefect.utilities.tasks.defaults_from_attrs decorator.
For example, here's a skeleton version of our CreateBranch task:
from typing import Any

from prefect import Task
from prefect.utilities.tasks import defaults_from_attrs


class CreateBranch(Task):
    def __init__(
        self,
        repo: str = None,
        base: str = "master",
        branch_name: str = None,
        token_secret: str = None,
        **kwargs: Any
    ):
        self.repo = repo
        self.base = base
        self.branch_name = branch_name
        self.token_secret = token_secret
        super().__init__(**kwargs)

    @defaults_from_attrs("repo", "base", "branch_name")
    def run(
        self,
        repo: str = None,
        base: str = None,
        branch_name: str = None,
        token: str = None,
    ) -> dict:
        # task code here
        ...
Note that arguments like repo and branch_name default to None in both the task's init and run methods. If the user sets repo='prefecthq/prefect' at init, then, judging from the run signature, repo would be overwritten to None at runtime unless the task code explicitly falls back on self.repo, which is a lot of boilerplate for task authors to add. Instead, use the @defaults_from_attrs decorator on the run method, as shown above, which injects the named attributes from self into the run method at runtime if the user did not provide them.
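To see that fallback behavior in isolation, here is a toy reimplementation of the idea. This is not Prefect's actual decorator, just a sketch of what it does:

```python
import functools


def defaults_from_attrs(*attr_names):
    """Toy version of the decorator: fill in None run() kwargs from self."""
    def wrapper(run):
        @functools.wraps(run)
        def method(self, **kwargs):
            for name in attr_names:
                # Only fall back to the init-time attribute when the
                # user did not supply a value at runtime.
                if kwargs.get(name) is None:
                    kwargs[name] = getattr(self, name)
            return run(self, **kwargs)
        return method
    return wrapper


class Demo:
    def __init__(self, repo=None):
        self.repo = repo

    @defaults_from_attrs("repo")
    def run(self, repo=None):
        return repo
```

With this in place, `Demo(repo="prefecthq/prefect").run()` falls back to the init-time value, while an explicit `run(repo=...)` still wins.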
By providing all these configuration options, we make the task more flexible for the user, but we also make it possible for them to make a mistake. Since all arguments are optional at all times, the user may accidentally fail to pass a necessary argument - such as repo - either to init or to the task in the flow, and they will get a runtime error downstream when the task code actually tries to query a repo with the value None. To code defensively against this, it is conventional to raise a ValueError at runtime if those values remain None, giving the user a heads up that they have misconfigured their task at some level. You can see an example of that in our CreateBranch task.
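A sketch of that defensive check, trimmed down to a plain function for brevity:

```python
def run(repo: str = None, branch_name: str = None) -> dict:
    # Fail fast with a clear message instead of letting a None value
    # surface as a confusing failure deep inside the client call.
    if repo is None:
        raise ValueError("A repo must be provided at either init or runtime.")
    if branch_name is None:
        raise ValueError("A branch_name must be provided at either init or runtime.")
    return {"repo": repo, "branch_name": branch_name}
```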
In summary, it's likely you will copy the configuration arguments you decided on for the run method into your task's init method, and add the @defaults_from_attrs decorator to your run method to save on boilerplate. Stop and consider whether any of those arguments depend on the local runtime environment, since those are probably the only ones that don't belong in the init signature.

Consider your client again, including authentication

While you were exposing configuration to your task's init and run methods, something necessary to authenticate to the third-party service your task is integrating with - maybe a username and password, maybe a token or API key - probably came up.
Across the task library, and even in people's individual task code, there are a few different patterns we see. Tasks might:
1. hardcode their credentials into the task code - this is the least secure, but is often useful during initial testing before committing anything to git
2. assume the user has environment variables specific to the service, and use a client API that will look those up. For example, the boto3 client eventually defaults to looking up environment variables with specific conventional names: AWS_SECRET_ACCESS_KEY and AWS_ACCESS_KEY_ID
3. find the credentials in a new, bespoke section of the Prefect config file (e.g. through a call to prefect.context.get('your_key_name')), via an arbitrary environment variable name you set with os.getenv, or via some other generic Python way to find configuration on the host. These usually don't follow a specific standard, so they can be a bit confusing to new users
4. use Prefect Secrets to grab the credentials under a chosen generic name, either by assuming a user will use a SecretTask before calling your task library task or by embedding a call to prefect.client.secrets.get in the task library code. This is the most 'Prefect'y way to grab credentials: it lets Prefect raise errors early if the secrets are not found, and it works seamlessly with the secret store offered to users of Prefect Cloud whenever/if ever they wish to upgrade

Implementing authentication using style number 4, while letting the client library fall back on style number 2 if necessary, is the preferred way for Prefect task library tasks to handle authentication. Research how the particular python library you are using handles authentication, and whether it falls back natively to style number 2.
While you are doing this, you may decide to refactor out the actual client instantiation to a helper function that steps through the various types of fallbacks you want to expose, especially if you are going to reuse it for multiple tasks.
To walk through an example, here is how the boto client is instantiated for AWS tasks:
the tasks themselves call a helper function, optionally passing credentials that were received via method #4
the helper function actually generates the boto client
using the credentials passed if they exist
falling back to method #3 with a bespoke AWS_CREDENTIALS secret that Prefect thinks you might have (we made a special case for this since AWS is so common)
then finally falling back to method #2, where the boto client will do its own lookup if credentials passed to it are None
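The fallback chain above can be sketched as a helper like the one below. The function name and the returned key names are hypothetical, and the Prefect-specific secret lookup is shown only as a comment to keep the sketch dependency-free:

```python
import os


def get_credentials(credentials: dict = None) -> dict:
    """Hypothetical helper resolving AWS credentials via the fallbacks above."""
    # 1. Credentials explicitly passed in (e.g. fetched from a Prefect
    #    Secret by the calling task) always win.
    if credentials is not None:
        return credentials
    # 2. In real task code, a bespoke AWS_CREDENTIALS Prefect secret would
    #    be checked next; that lookup is omitted from this sketch.
    # 3. Finally, fall back to the conventional environment variables that
    #    the boto client would also look up on its own.
    return {
        "ACCESS_KEY": os.getenv("AWS_ACCESS_KEY_ID"),
        "SECRET_ACCESS_KEY": os.getenv("AWS_SECRET_ACCESS_KEY"),
    }
```

The tasks then build their boto client from whatever this helper returns, so each task stays free of credential-resolution logic.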
