-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathindex.js
More file actions
81 lines (69 loc) · 3.19 KB
/
index.js
File metadata and controls
81 lines (69 loc) · 3.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
var Main = Packages.org.apache.camel.main.Main;
var RouteBuilder = Packages.org.apache.camel.builder.RouteBuilder;
var BasicDataSource = Packages.org.apache.commons.dbcp2.BasicDataSource;
var IOException = Packages.java.io.IOException;
var Properties = Packages.java.util.Properties;
var LoggerFactory = Packages.org.slf4j.LoggerFactory;
var Files = Packages.java.nio.file.Files;
var Paths = Packages.java.nio.file.Paths;
var Integer = Packages.java.lang.Integer;
var LOG = LoggerFactory.getLogger('main');
var twitterRouteBuilder = Java.extend(RouteBuilder);
var route = new twitterRouteBuilder() {
configure: function() {
var dsl = Java.super(route);
// Create route to store tweets in database
dsl.from('seda:tweet2db?multipleConsumers=true&concurrentConsumers='+dsl.getContext().getRegistry().lookupByName('db.pool'))
.setHeader('id', dsl.simple('${body.id}'))
.setHeader('status', dsl.simple('${body.text}'))
.setHeader('created', dsl.simple('${body.createdAt.time}'))
.setHeader('screenname', dsl.simple('${body.user.screenName}'))
.setBody(dsl.constant('INSERT INTO tweets (id, status, created, screenname) VALUES (:?id, :?status, :?created, :?screenname)'))
.to('jdbc:lykely?useHeadersAsParameters=true');
// Create route to log every 10th tweet
dsl.from('seda:tweet2log?multipleConsumers=true&concurrentConsumers=5')
.sample(10).log('${body.text}');
// Build the twitter streaming URL
var baseUrl = 'twitter://streaming/filter?type=event&lang=en&consumerKey='+dsl.getContext().getRegistry().lookupByName('api.key')+'&';
baseUrl += 'consumerSecret='+dsl.getContext().getRegistry().lookupByName('api.secret')+'&';
baseUrl += 'accessToken='+dsl.getContext().getRegistry().lookupByName('access.token')+'&';
baseUrl += 'accessTokenSecret='+dsl.getContext().getRegistry().lookupByName('access.secret')+'&';
baseUrl += 'keywords=#grammys';
// Create route to pull tweets from event based stream
dsl.from(baseUrl)
.filter(dsl.simple('${body.isRetweet()} == false')) // Filter out retweets
.multicast().to('seda:tweet2db', 'seda:tweet2log'); // Multicast tweets to DB and logs
}
};
/*
 * Bootstrap: read twitter.properties, build a pooled PostgreSQL datasource,
 * bind the datasource and every property into the Camel registry, attach the
 * routes and run Camel. Any failure along the way is logged through LOG.
 */
var props = new Properties();
try {
    // Load the twitter.properties config (path is relative to the working directory)
    var path = Paths.get('twitter.properties');
    var fis = Files.newInputStream(path);
    try {
        props.load(fis);
    } finally {
        // Always release the file handle, even when load() throws
        fis.close();
    }

    // Build a connection pooling datasource
    var ds = new BasicDataSource();
    ds.setDriverClassName("org.postgresql.Driver");
    ds.setUrl(props.getProperty('db.url'));
    ds.setUsername(props.getProperty('db.user'));
    ds.setPassword(props.getProperty('db.pass'));
    try {
        ds.setMaxTotal(Integer.parseInt(props.getProperty('db.pool')));
    } catch (nfe) {
        // Missing or non-numeric db.pool: report it and fall back to 20 connections
        LOG.error('Unable to parse integer: '+nfe.message+' - '+props.getProperty('db.pool'));
        ds.setMaxTotal(20);
    }

    // Initialize the Camel Context and bind the configuration
    var main = new Main();
    // 'lykely' is the datasource name referenced by the jdbc: endpoint in the routes
    main.bind('lykely', ds);
    // Expose every property under its own key so the routes can look it up by name
    props.entrySet().forEach(function (e) {
        main.bind(e.getKey(), e.getValue());
    });

    // Attach the routes to the CamelContext
    main.addRouteBuilder(route);

    // Start the Camel Context
    main.run();
} catch (err) {
    LOG.error(err.message);
}