kafka-event-processor.processor.system
new-system
(new-system configuration-overrides {:keys [kafka database processor-identifier configuration-prefix additional-dependencies processing-enabled kafka-consumer-group-configuration kafka-consumer-group processor-configuration processor rewind-check event-handler], :or {kafka :kafka, database :database, processor-identifier :main, configuration-prefix :service, additional-dependencies {}}})
Creates a new kafka event processor.
Does nothing if processing is not enabled.
- Processor identifier can be specified (defaults to :main).
- Configuration prefix can be specified (defaults to :service).
All system map keys can be overridden or they default where applicable:
- kafka: kafka (mandatory)
- database: database (mandatory)
- event-handler: {processor-identifier}-event-handler (mandatory)
- processing-enabled: {processor-identifier}-processing-enabled?
- kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
- kafka-consumer-group: kafka-{processor-identifier}-consumer-group
- processor-configuration: {processor-identifier}-processor-configuration
- processor: {processor-identifier}-processor
Optionally provide a system map key for rewind-check
Optional provide a map of system keys that are used as additional dependencies to the component
e.g.
(processors/new-system
configuration-overrides
{:processor-identifier :main
:kafka :kafka
:database :database
:event-handler :event-handler
:additional-dependencies {:atom :atom}})
Nothing is done with the event if an event-handler is not defined.
(deftype AtomEventHandler
[atom]
EventHandler
(extract-payload
[this event]
(-> event
(hal-json/json->resource)
(hal/get-property :payload)
(hal-json/json->resource)))
(processable?
[this processor event event-context]
true)
(on-event
[this processor {:keys [topic payload]} _]
(vent/react-to all {:channel topic :payload payload} processor))
(on-complete
[this processor {:keys [topic partition payload]} {:keys [event-processor]}]
(swap! atom conj {:processor event-processor
:topic topic
:partition partition
:event-id (event-resource->id payload)})))