kafka-event-processor.processor.system

new-system

(new-system configuration-overrides {:keys [kafka database processor-identifier configuration-prefix additional-dependencies processing-enabled kafka-consumer-group-configuration kafka-consumer-group processor-configuration processor rewind-check event-handler], :or {kafka :kafka, database :database, processor-identifier :main, configuration-prefix :service, additional-dependencies {}}})

Creates a new kafka event processor.

Does nothing if processing is not enabled.

  • Processor identifier can be specified (defaults to :main).
  • Configuration prefix can be specified (defaults to :service).

All system map keys can be overridden; where applicable, they default as follows:

  • kafka: kafka (mandatory)
  • database: database (mandatory)
  • event-handler: {processor-identifier}-event-handler (mandatory)
  • processing-enabled: {processor-identifier}-processing-enabled?
  • kafka-consumer-group-configuration: kafka-{processor-identifier}-consumer-group-configuration
  • kafka-consumer-group: kafka-{processor-identifier}-consumer-group
  • processor-configuration: {processor-identifier}-processor-configuration
  • processor: {processor-identifier}-processor
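
The naming scheme in the list above can be sketched as a small function. This is only an illustration of how the default keys appear to be derived from the processor identifier. default-system-keys is a hypothetical helper, not part of this library, and it assumes the system map keys are keywords.

```clojure
;; Hypothetical helper, not part of kafka-event-processor: derives the
;; default system map keys listed above from a processor identifier.
;; Assumes all system map keys are keywords.
(defn default-system-keys
  [processor-identifier]
  (let [id (name processor-identifier)
        k  (fn [& parts] (keyword (apply str parts)))]
    {:kafka                              :kafka
     :database                           :database
     :event-handler                      (k id "-event-handler")
     :processing-enabled                 (k id "-processing-enabled?")
     :kafka-consumer-group-configuration (k "kafka-" id "-consumer-group-configuration")
     :kafka-consumer-group               (k "kafka-" id "-consumer-group")
     :processor-configuration            (k id "-processor-configuration")
     :processor                          (k id "-processor")}))

;; Default keys for the :main processor.
(default-system-keys :main)
```

Under these assumptions, a second processor created with :processor-identifier :audit would look up :audit-event-handler, :kafka-audit-consumer-group and so on, unless those keys are overridden.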

Optionally, provide a system map key for rewind-check.

Optionally, provide a map of system keys to use as additional dependencies of the component.

e.g.

(processors/new-system
  configuration-overrides
  {:processor-identifier    :main
   :kafka                   :kafka
   :database                :database
   :event-handler           :event-handler
   :additional-dependencies {:atom :atom}})

Nothing is done with the event if an event-handler is not defined.

(deftype AtomEventHandler [atom]
  EventHandler
  (extract-payload [this event]
    (-> event
        (hal-json/json->resource)
        (hal/get-property :payload)
        (hal-json/json->resource)))
  (processable? [this processor event event-context]
    true)
  (on-event [this processor {:keys [topic payload]} _]
    (vent/react-to all {:channel topic :payload payload} processor))
  (on-complete [this processor {:keys [topic partition payload]} {:keys [event-processor]}]
    (swap! atom conj
           {:processor event-processor
            :topic     topic
            :partition partition
            :event-id  (event-resource->id payload)})))