🧠 General idea
A workflow is a sequence of steps (WorkflowStep) with well‑defined inputs/outputs.
Workflows are defined in apps/workflows/collections/*.py.
Steps use WorkflowStepDataReference:
- Workflow.input("field"): take a value from workflow input
- step.output("field"): take a value from step output
- .map(...): transform a list
- .merge(...): merge data by key
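
To make the four reference helpers concrete, here is a minimal, self-contained sketch of how such references could resolve at run time. It is not the project's WorkflowStepDataReference: the Ref class, its resolve method, the workflow_input and step_output helpers, and the sample data are all hypothetical stand-ins that only mimic the behaviours listed above.

```python
from dataclasses import dataclass
from typing import Any, Callable


# Hypothetical stand-in for WorkflowStepDataReference (not the real class).
@dataclass(frozen=True)
class Ref:
    resolver: Callable[[dict], Any]  # maps a run context to a concrete value

    def resolve(self, ctx: dict) -> Any:
        return self.resolver(ctx)

    def map(self, fn: Callable[[Any], Any]) -> "Ref":
        # Transform every element of the referenced list.
        return Ref(lambda ctx: [fn(item) for item in self.resolve(ctx)])

    def merge(self, other: "Ref", key: str) -> "Ref":
        # Join two lists of dicts on a shared key; fields from `other` win on conflict.
        def _merge(ctx: dict) -> list:
            merged = {row[key]: dict(row) for row in self.resolve(ctx)}
            for row in other.resolve(ctx):
                merged.setdefault(row[key], {}).update(row)
            return list(merged.values())

        return Ref(_merge)


def workflow_input(field: str) -> Ref:
    # Analogue of Workflow.input("field").
    return Ref(lambda ctx: ctx["input"][field])


def step_output(step: str, field: str) -> Ref:
    # Analogue of step.output("field").
    return Ref(lambda ctx: ctx["steps"][step][field])


# Made-up run context: workflow input plus the outputs of two earlier steps.
ctx = {
    "input": {"campaign_ids": [1, 2]},
    "steps": {
        "load": {"rows": [{"id": 1, "sent": 10}, {"id": 2, "sent": 5}]},
        "stats": {"rows": [{"id": 1, "clicks": 3}]},
    },
}

doubled = workflow_input("campaign_ids").map(lambda x: x * 2).resolve(ctx)
joined = (
    step_output("load", "rows")
    .merge(step_output("stats", "rows"), key="id")
    .resolve(ctx)
)
```

The point of the design is that a reference is lazy: it describes where a value will come from, and nothing is read until the step actually runs with a concrete context.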
📚 Workflow list
- active_subscribers
- source_param_subscribers
- channel_reports
- campaign_reports_delivery
- duplicate_pools
- update_campaign
- process_subscribers
- firebase_reports
- firebase_subscribers_deliveries
- reactivating_unsubscribed_users
- campaign_planner
- campaign_runner
- unsubscription_spike_monitoring
- subscription_drop_monitoring
- SlackNotificationWorkflow
- start_new_workflow (internal)
➕ How to add a new workflow
- Create a collection
  apps/workflows/collections/<new_workflow>.py
  Define BaseWorkflow, name, input_model, steps via add_step(...).
- Define functions
  Add classes to apps/workflows/functions/
  Each function must have name, service, specs.
- Register the workflow
  Add it to WorkflowsStock._data in apps/workflows/collections/stock.py.
- Allow start via API
  Add it to the workflows list in apps/api/v1/workflows/serializers.py (meta‑class WorkflowSerializerFieldsMeta).
- LOCAL functions
  If service = LOCAL, add the class to LocalTasksConsumer._get_function_class() (apps/workflows/consumer.py).
- Celery / scheduling (if needed)
  - create apps/workflows/tasks/collections/<task>.py
  - add a task in tasks/*.py
  - add schedule to CELERY_BEAT_SCHEDULE
- Tests
  Add tests in tests/apps/workflow/ and/or tests/api/workflow/.
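
The checklist above can be sketched end to end with toy classes. The real BaseWorkflow, function classes, WorkflowsStock, and LocalTasksConsumer live in the project and their signatures are not shown here, so every class, attribute, enum member, and the example_workflow name below is a hypothetical stand-in chosen to mirror the checklist, not the project's API.

```python
from dataclasses import dataclass, field
from enum import Enum


class Service(Enum):
    LOCAL = "local"    # executed in-process by the consumer
    REMOTE = "remote"  # hypothetical: executed by some other service


@dataclass
class ToyFunction:
    # Mirrors "each function must have name, service, specs".
    name: str
    service: Service
    specs: dict


@dataclass
class ToyWorkflow:
    name: str
    input_model: type
    steps: list = field(default_factory=list)

    def add_step(self, function: ToyFunction, **inputs) -> "ToyWorkflow":
        self.steps.append((function, inputs))
        return self  # returning self allows chained add_step calls


@dataclass
class ExampleInput:
    campaign_id: int


fetch = ToyFunction("fetch_campaign", Service.LOCAL, {"retries": 3})
notify = ToyFunction("notify", Service.REMOTE, {"retries": 1})

example = (
    ToyWorkflow(name="example_workflow", input_model=ExampleInput)
    .add_step(fetch, campaign_id="input.campaign_id")
    .add_step(notify, text="fetch_campaign.summary")
)

# Analogue of registering the workflow in WorkflowsStock._data.
STOCK = {example.name: example}

# Analogue of the API allow-list kept next to the serializer.
API_WORKFLOWS = sorted(STOCK)

# Analogue of the LOCAL lookup in the consumer: only LOCAL functions are listed.
LOCAL_FUNCTIONS = {
    fn.name: fn
    for wf in STOCK.values()
    for fn, _ in wf.steps
    if fn.service is Service.LOCAL
}
```

In this sketch, forgetting any one of the three registrations (stock, API list, LOCAL lookup) leaves the workflow defined but unreachable, which is why the checklist treats them as separate steps.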
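
For the scheduling step, a Celery beat entry maps a name to a task path and a schedule. The fragment below is a sketch only: the entry name, the dotted task path, the kwargs, and the hourly interval are placeholders and are not taken from this project.

```python
from datetime import timedelta

# Placeholder entry; replace the task path with the real dotted path of your task.
CELERY_BEAT_SCHEDULE = {
    "run-example-workflow": {
        "task": "apps.workflows.tasks.example.run_example_workflow",  # hypothetical path
        "schedule": timedelta(hours=1),  # Celery also accepts seconds or crontab(...)
        "kwargs": {"workflow": "example_workflow"},  # hypothetical task arguments
    },
}
```

A timedelta keeps the example free of Celery imports; use celery.schedules.crontab when the task must run at fixed wall-clock times.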