Want to simplify the creation and management of AI workflows? CrewAI flows offer structured patterns for orchestrating AI agent interactions. They enable developers to effectively combine coding tasks and Crews, offering a powerful framework for developing AI automation. With Agentic Flows in CrewAI, you can design structured, event-driven workflows that streamline task coordination, manage state, and control execution within your AI applications.
What are Crews?
Crews from crewAI enable the orchestration of AI agents for task automation. It facilitates seamless collaboration between agents to solve complex problems. So why “flows” you ask? CrewAI flows are structured patterns for orchestrating AI agent interactions. They define how agents collaborate and communicate to achieve tasks. These Flows consist of a sequence of tasks where the output of one task can trigger the next, and the system provides flexible mechanisms for state management and conditional execution control.
What are Flows?
Flows are event-driven, meaning that they react to specific triggers or conditions. You can design workflows that dynamically adjust based on the execution results from different tasks, enabling seamless execution of complex AI processes.
Workflow Control
In CrewAI Flows, developers can structure the flow of tasks and control how information passes between them. Tasks can be chained together, creating a logical flow of operations. This structure enables developers to define a sequence of operations where certain tasks are executed conditionally based on the output of previous tasks.
State Management
We can use Structured State Management, which uses a predefined schema, typically through Pydantic’s BaseModel, to ensure that the data passed between tasks follows a specific structure. It provides the benefits of type safety, validation, and easier management of complex data states.
Input Flexibility
Flows can accept inputs to initialize or update their state at any point in the execution. Depending on the workflow’s requirements, these inputs can be provided at the beginning, during, or after execution.
Event-Driven Architecture
CrewAI Flows allow workflows to adjust based on the results from different tasks dynamically. Tasks can listen for outputs from previous steps, enabling a reactive system where new tasks can be triggered based on the outputs of earlier ones. The decorators @listen() and @router() enable this level of flexibility, allowing developers to link tasks conditionally and dynamically based on their results. Remember that the @start() decorator is used to mark the starting point of a Flow.
Decorators and Conditional Logic | Description |
@listen() | Creates listener methods triggered by specific events or task outputs. |
@router() | Enables conditional routing within the flow, allowing different execution paths based on the outputs of prior steps. Useful for managing success or failure outcomes. |
or_ | Triggers a listener when any of the specified methods emit an output. Ideal for proceeding after any task completion. |
and_ | Ensures a listener is triggered only when all specified methods emit outputs. Useful for waiting until multiple conditions or tasks are completed before proceeding. |
Task Routing
Flows also enable the use of routing to control how execution proceeds based on specific conditions. The @router() decorator facilitates this by allowing methods to choose their execution path based on the results of prior tasks. For example, a method might check the output of a previous task and decide whether to proceed along one route or another, depending on whether certain conditions are met.
Also read: Building Collaborative AI Agents With CrewAI
Flows in Action
Let’s build an agentic system using flows from CrewAI that recommends movies based on genre. This will be a simple agentic system that will help us understand what is working behind it.
Installations
!pip install crewai -U
!pip install crewai-tools
Warning control
import warnings
warnings.filterwarnings('ignore')
Load environment variables
Go to Serper and get your Serper api-key for Google search. We’ll be using the 4o-mini model from OpenAI.
import os
os.environ["OPENAI_API_KEY"] = ‘’
os.environ['OPENAI_MODEL_NAME'] = 'gpt-4o-mini-2024-07-18'
os.environ["SERPER_API_KEY"]=''
Import necessary modules
from crewai import Agent, Task, Crew
from crewai.flow.flow import listen, start, and_, or_, router
from crewai_tools import SerperDevTool
from crewai import Flow
from pydantic import BaseModel
Define agent
To keep things simple, we’ll define a single agent that all the tasks will use. This agent will have a google search tool that all the tasks can use.
movie_agent = Agent(
role="Recommend popular movie specific to the genre",
goal="Provide a list of movies based on user preferences",
backstory="You are a cinephile, "
"you recommend good movies to your friends, "
"the movies should be of the same genre",
tools=[SerperDevTool()],
verbose=True
)
Define tasks
action_task = Task(
name="ActionTask",
description="Recommends a popular action movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
comedy_task = Task(
name="ComedyTask",
description="Recommends a popular comedy movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
drama_task = Task(
name="DramaTask",
description="Recommends a popular drama movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
sci_fi_task = Task(
name="SciFiTask",
description="Recommends a sci-fi movie",
expected_output="A list of 10 popular movies",
agent=movie_agent
)
Define crews for each genre
action_crew = Crew(
agents=[movie_agent],
tasks=[action_task],
verbose=True,
)
comedy_crew = Crew(
agents=[movie_agent],
tasks=[comedy_task],
verbose=True
)
drama_crew = Crew(
agents=[movie_agent],
tasks=[drama_task],
verbose=True
)
sci_fi_crew = Crew(
agents=[movie_agent],
tasks=[sci_fi_task],
verbose=True
)
Define genres and GenreState
GENRES = ["action", "comedy", "drama", "sci-fi"]
class GenreState(BaseModel):
genre: str = ""
Define MovieRecommendationFlow
We define a class inheriting from the Flow class and we can optionally use state functionality to add or modify state attributes, here we have already defined GenreState with genre attribute of type string. (Notice that our pydantic model “GenreState” is passed in square brackets)
class MovieRecommendationFlow(Flow[GenreState]):
@start()
def input_genre(self):
genre = input("Enter a genre: ")
print(f"Genre input received: {genre}")
self.state.genre = genre
return genre
@router(input_genre)
def route_to_crew(self):
genre = self.state.genre
if genre not in GENRES:
raise ValueError(f"Invalid genre: {genre}")
if genre == "action":
return "action"
elif genre == "comedy":
return "comedy"
elif genre == "drama":
return "drama"
elif genre == "sci-fi":
return "sci-fi"
@listen("action")
def action_movies(self, genre):
recommendations = action_crew.kickoff()
return recommendations
@listen("comedy")
def comedy_movies(self, genre):
recommendations = comedy_crew.kickoff()
return recommendations
@listen("drama")
def drama_movies(self, genre):
recommendations = drama_crew.kickoff()
return recommendations
@listen("sci-fi")
def sci_fi_movies(self, genre):
recommendations = sci_fi_crew.kickoff()
return recommendations
@listen(or_("action_movies", "comedy_movies", "drama_movies", "sci_fi_movies"))
def finalize_recommendation(self, recommendations):
print("Final movie recommendations:")
return recommendations
We should specify the flow of execution by @listen decorator by passing the previous method and the output of the previous method can be passed as an argument in the next method. (Note: The flow should start with the @start decorator)
The router is being used here to decide which method should execute based on the output of the method inside the @router decorator.
The or_ specifies that the method should execute as soon as any of the methods in the or_( ) method returns an output.
(Note: use and_( ) if you want the method to wait until all outputs are received).
Plot the flow
flow = MovieRecommendationFlow()
flow.plot()
Display the flow diagram
from IPython.core.display import display, HTML
with open('/content/crewai_flow.html', 'r') as f:
html_content = f.read()
display(HTML(html_content))
This diagram should give you a better understanding of what’s happening. Based on the “input genre,” only one of the four tasks is executed, and when one of them returns an output, the “Finalize Recommendation” gets triggered.
Kickoff the flow
recommendations = await flow.kickoff_async()
I entered sci-fi as input when asked.
After the entire execution of the Flow, this is the output I got for the sci-fi movie recommendations I wanted.
Also read: CrewAI Multi-Agent System for Writing Article from YouTube Videos
Conclusion
In this article, we’ve explored how CrewAI’s event-driven workflows simplify the creation and management of AI task orchestration. By leveraging structured flows, developers can design efficient, dynamic AI systems that enable seamless coordination between tasks and agents. With powerful tools like the @listen(), @router(), and state management mechanisms, CrewAI allows for flexible and adaptive workflows. The hands-on example of a movie recommendation system highlights how Agentic Flows in CrewAI can be effectively used to tailor AI applications, with tasks reacting to user inputs and dynamically adjusting to conditions.
Also, if you are looking for an AI Agent course online then, explore: Agentic AI Pioneer Program.
Frequently Asked Question
Ans. Use the kickoff() method with an inputs parameter: flow.kickoff(inputs={“counter”: 10}). Here “counter” can be a variable used inside task or agent definitions.
Ans. @start() marks methods as flow starting points that run in parallel. @listen() marks methods that execute when specified tasks complete.
Ans. Either use flow.plot() or run crewai flow plot command to generate an interactive HTML visualization.
Ans. Yes, crewAI Flows support human-in-the-loop feedback.