Skip to content
/ eip Public

Aggregator Integration Pattern for node.js

Notifications You must be signed in to change notification settings

Workable/eip

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

160 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

eip

Enterprise Integration Patterns for javascript.

This repo is based on jkanschik/node-eip implements various eip patterns for javascript and mainly enhances the aggregator functionality.

Installation

npm install --save eip

Usage

Create a Route

const eip = require('eip');
const route = new eip.Route('Route-0', {route: {retryLimit: 3, retryDelay: 1000}, isErrorRoute:false}, [])

Add a processor to the Route

route.process(event => event * 2).info()

Inject events to Route

route.inject(1);
// output: [2017-03-10 17:04:12.636] [INFO] [aggregator-eip] - [Route-0(info-1)] 2
route.inject(2);
// output:  [2017-03-10 17:04:13.082] [INFO] [aggregator-eip] - [Route-0(info-1)] 4

Builtin Processors

aggregate({ timeout = [1000], maxTimes = 3 }) // or
aggregate({ timer= new Timer(), store = new Store(), strategy = new AggregationStrategy() })
// Timer, Store and AggregationStrategy are abstract classes exported by eip that can be extended.
// see eip-mongo and eip-rabbit for implementations of Timer on top of rabbitmq and Store on top of mongo

dispatch(route1, route2)
filter(event => event > 5); // or
filter(event => doAsyncStuff(event).then(result => result > 5)) // filter can accept conditions tha return a promise
process(event => [event, event]) // custom processor that transforms the event
process(event => doAsyncStuff(event)) // transform event returning a promise
throttle(10, 1000) // 10 events per second (1000 ms)
throttleResource({
  eventsPerPeriod:1,
  periodInMs:1000
})

  //or
throttleResource({
  timer: new Timer(),
  resource: x => x,
  pubSub: new PubSub(),
  queue: new Queue()
})
// Timer, PubSub and Queue are abstract classes exported by eip that can be extended.
// see eip-redis and eip-rabbit for implementations of Timer on top of rabbitmq and pubSub and queue on top of redis

//logging
trace()
debug()
info()
warn()
error()
fatal()

// all logger scan transform the logging msg using a callback
// eg:
event => 'message to be logged:' + event;

Aggregator example

const aggregator = new eip.Route().aggregate({ timeout: [1000], maxTimes: 3 }).info();

aggregator.inject({
  headers: {
    id: 'the id to aggregate data',
    param: 'one'
  },
  body:'one'});

aggregator.inject({
  headers: {
    id: 'the id to aggregate data',
    param: 'two'
  },
  body:'two'});

aggregator.inject({
  headers: {
    id: 'the id to aggregate data',
    anotherParam: 'other'
  },
  body:'three'});


// it will be aggregated and log to output:
// [2017-03-10 16:32:07.605] [INFO] [aggregator-eip] - [Route-1(info-1)] {"body":["one","two","three"],"headers":{"status":"COMPLETED","id":"the id to aggregate data","param":"two","anotherParam":"other","aggregationNum":1,"timeoutNum":0,"previousStatus":"INITIAL"}}
// [2017-03-10 16:32:08.606] [DEBUG] [aggregator-eip] - [Route-1(aggregate-0)] [timeout-1] [1] Already completed


// we could have formatted the output using:
aggregator.info(aggregated => JSON.stringify(aggregated, null, 2));
/*
[2017-03-10 16:38:05.484] [INFO] [aggregator-eip] - [Route-1(info-2)] {
  "body": [
    "one",
    "two",
    "three"
  ],
  "headers": {
    "status": "COMPLETED",
    "id": "the id to aggregate data",
    "param": "two",
    "anotherParam": "other",
    "aggregationNum": 1,
    "timeoutNum": 0,
    "previousStatus": "INITIAL"
  }
}
[2017-03-10 16:38:06.152] [DEBUG] [aggregator-eip] - [Route-1(aggregate-0)] [timeout-1] [the id to aggregate data] Already completed
*/

License

MIT

About

Aggregator Integration Pattern for node.js

Resources

Stars

Watchers

Forks

Packages

No packages published

Contributors 4

  •  
  •  
  •  
  •