// bunyan-postgres-stream — forked from fdesjardins/bunyan-postgres-stream
// index.js
const Writable = require('stream').Writable
const pg = require('pg')
class LogStream extends Writable {
  /**
   * Writable stream that persists bunyan log records to Postgres,
   * either through an existing knex instance or through a pg connection pool.
   *
   * @param {object} options - also forwarded to stream.Writable.
   * @param {object} options.connection - a knex instance, or a plain pg pool
   *   config object (host, user, database, ...).
   * @param {string} options.tableName - table that log rows are inserted into.
   * @throws {Error} when connection or tableName is missing.
   */
  constructor (options) {
    super(options)
    if (options.connection === undefined || options.tableName === undefined) {
      throw new Error('Invalid bunyan-postgres stream configuration')
    }
    // Duck-type detection of a knex connection. NOTE(review): this check is
    // fragile across knex versions — if you are getting a "_write() not
    // implemented" error, verify this condition matches your knex instance.
    if (options.connection.client && options.connection.name === 'knex') {
      this.knex = options.connection
      this._write = this._writeKnex
    } else if (typeof options.connection === 'object') {
      // Must be `else if`: a knex instance is also an object, and without it
      // this branch would clobber _write and build a pg.Pool from knex.
      this.connection = options.connection
      this._write = this._writePgPool
      this.pool = new pg.Pool(this.connection)
      // Close the pool when the stream finishes so the process can exit.
      this.on('finish', () => {
        if (this.pool) {
          return this.pool.end()
        }
      })
    }
    this.tableName = options.tableName
  }

  /**
   * Writable._write implementation for knex connections.
   * @param {Buffer|string} chunk - one serialized bunyan record.
   * @param {string} enc - encoding (unused; chunk is stringified).
   * @param {Function} cb - completion callback, receives any error.
   */
  _writeKnex (chunk, enc, cb) {
    let content
    try {
      content = JSON.parse(chunk.toString())
    } catch (err) {
      // Surface malformed records through the callback instead of throwing
      // synchronously inside _write (which would crash the stream).
      return cb(err)
    }
    this.knex
      .insert({
        name: content.name,
        level: content.level,
        hostname: content.hostname,
        msg: content.msg,
        pid: content.pid,
        time: content.time,
        content: JSON.stringify(content)
      })
      .into(this.tableName)
      .asCallback(cb)
  }

  /**
   * Run the parameterized insert for one log record on a pooled client.
   * Values are bound as query parameters, so no manual quote escaping is
   * needed (the previous `''` doubling corrupted the stored JSON).
   * NOTE(review): the table name cannot be parameterized; it comes from
   * configuration and must never be user-controlled (SQL injection risk).
   *
   * @param {object} client - a pg client checked out from the pool.
   * @param {object} content - the parsed bunyan record.
   * @returns {Promise} resolves with the pg query result.
   */
  writePgPool (client, content) {
    return client.query({
      text: `insert into ${
        this.tableName
      } (name, level, hostname, msg, pid, time, content) values ($1, $2, $3, $4, $5, $6, $7);`,
      values: [
        content.name,
        content.level,
        content.hostname,
        content.msg,
        content.pid,
        content.time,
        JSON.stringify(content)
      ]
    })
  }

  /**
   * Writable._write implementation for pg pool connections.
   * @param {Buffer|string} chunk - one serialized bunyan record.
   * @param {string} enc - encoding (unused; chunk is stringified).
   * @param {Function} cb - completion callback, receives (err) or (null, rows).
   */
  _writePgPool (chunk, enc, cb) {
    let content
    try {
      content = JSON.parse(chunk.toString())
    } catch (err) {
      return cb(err)
    }
    this.pool
      .connect()
      .then(client => {
        return this.writePgPool(client, content)
          .then(result => cb(null, result.rows))
          .catch(err => cb(err))
          // Always return the client to the pool — the original leaked the
          // client whenever the insert query rejected.
          .finally(() => client.release())
      })
      .catch(err => cb(err))
  }
}
/**
 * Factory: create a bunyan-compatible Postgres log stream.
 * @param {object} [options={}] - see LogStream constructor.
 * @returns {LogStream}
 */
module.exports = (options = {}) => new LogStream(options)