Skip to content

Advanced Agent Frameworks: AutoGPT, BabyAGI, CrewAI

class AutonomousAgent:
def __init__(self, objective):
self.objective = objective
self.memory = []
self.completed_tasks = []
def run(self, max_iterations=10):
for i in range(max_iterations):
# Create task list
tasks = self.create_tasks()
# Execute tasks
for task in tasks:
result = self.execute_task(task)
self.memory.append({'task': task, 'result': result})
# Check if objective achieved
if self.is_objective_complete():
break
def create_tasks(self):
prompt = f"""
Objective: {self.objective}
Completed: {self.completed_tasks}
What are the next 3 tasks?
"""
response = llm.complete(prompt)
return parse_tasks(response)
from crewai import Agent, Task, Crew
# Define specialized agents
researcher = Agent(
role='Researcher',
goal='Find relevant information',
backstory='Expert at finding and analyzing information',
tools=[search_tool, scrape_tool]
)
writer = Agent(
role='Writer',
goal='Create compelling content',
backstory='Professional content writer',
tools=[grammar_check]
)
# Define tasks
research_task = Task(
description='Research topic: {topic}',
agent=researcher
)
write_task = Task(
description='Write article based on research',
agent=writer
)
# Create crew
crew = Crew(
agents=[researcher, writer],
tasks=[research_task, write_task],
verbose=True
)
# Execute
result = crew.kickoff(inputs={'topic': 'AI agents'})
class AgentMessage:
def __init__(self, sender, receiver, content, message_type):
self.sender = sender
self.receiver = receiver
self.content = content
self.type = message_type
class AgentCoordinator:
def __init__(self):
self.agents = {}
self.message_queue = []
def register_agent(self, agent):
self.agents[agent.name] = agent
def send_message(self, message):
self.message_queue.append(message)
def process_messages(self):
while self.message_queue:
msg = self.message_queue.pop(0)
receiver = self.agents[msg.receiver]
receiver.receive_message(msg)

Found an issue? Open an issue!