Pushpin Notes

Intro

Install

1
2
3
4
5
6
7
8
9
10
11
12
docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 fanout/pushpin:1.28.0

docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.28.0

docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.31.0

docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ -v /Users/shankai/Desktop/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0

docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-config:/etc/pushpin/ -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0


docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.32.2

Port

  • 7999
  • 5560
  • 5561
  • 5562

Configuration

  • conf: /etc/pushpin/pushpin.conf

  • routes: /etc/pushpin/routes

    1
    2
    * 192.168.130.40:8099
    *,proto=ws 192.168.130.40:8098,over_http

Mock Backend [test]

  • subscribe
1
curl http://localhost:7999/stream
  • publish

cli:

1
pushpin-publish test "hello there"

restful:

1
2
3
curl -d '{ "items": [ { "channel": "test", "formats": {
"http-stream": { "content": "hello there\n" } } } ] }' \
http://localhost:5561/publish/

curl -d ‘{ “items”: [ { “channel”: “test”, “formats”: {
“http-stream”: { “content”: “hello there\n” } } } ] }’
http://localhost:5561/publish/

Backend [myChannel]

  • http ( * 192.168.130.40:8099)
1
2
3
4
5
6
7
8
9
10
11
12
13
var http = require('http');
http.createServer(function (req, res) {

var headers = req.headers;
var channel = headers['channel'] || 'myChannel';
res.writeHead(200, {
'Content-Type': 'text/plain',
'Grip-Hold': 'stream',
'Grip-Channel': channel
});
console.log(headers);
res.end('Stream opened, prepare yourself!\n');
}).listen(8099, '0.0.0.0');
  • over_http ( *,proto=ws 192.168.130.40:8098,over_http )
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// npm install --save grip
var grip = require('grip');
var http = require('http');

http.createServer(function (req, res) {
res.writeHead(200, {
'Sec-WebSocket-Extensions': 'grip',
'Content-Type': 'application/websocket-events'
});

var body = '';
req.on('data', function (chunk) {
body += chunk;
});

req.on('end', function() {
var inEvents = grip.decodeWebSocketEvents(body);
var outEvents = [];
if (inEvents[0].getType() == 'OPEN') {
outEvents.push(new grip.WebSocketEvent('OPEN'));
outEvents.push(new grip.WebSocketEvent('TEXT',
'c:' + grip.webSocketControlMessage(
'subscribe',
{'channel': 'mychannel'})));
}

res.end(grip.encodeWebSocketEvents(outEvents));
});
}).listen(8098, '0.0.0.0');
  • zhttp.py

Routes: * zhttpreq/tcp://127.0.0.1:10000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import zmq
import tnetstring

zmq_context = zmq.Context()
sock = zmq_context.socket(zmq.REP)
sock.connect('tcp://127.0.0.1:10000')

while True:
req = tnetstring.loads(sock.recv()[1:])
print(req['headers'])

resp = {
'id': req['id'],
'code': 200,
'reason': 'OK',
'headers': [
['Grip-Hold', 'stream'],
['Grip-Channel', 'test'],
['Content-Type', 'text/plain']
],
'body': 'welcome to the stream\n'
}

sock.send('T' + tnetstring.dumps(resp))

ZMQ

  • XPUB.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import zmq

zmq_context = zmq.Context.instance()
sock = zmq_context.socket(zmq.XPUB)

# unlimited subscriptions
sock.rcvhwm = 0

# resend subscriptions after disconnect
sock.immediate = 1

sock.connect('tcp://localhost:5562')

while True:
m = sock.recv()
mtype = m[0]
topic = m[1:]
if mtype == '\x01':
print('SUB %s' % topic)
elif mtype == '\x00':
print('UNSUB %s' % topic)
  • stats
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/python

import sys
import tnetstring
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect('ipc:///var/run/pushpin/pushpin-stats')
sock.setsockopt(zmq.SUBSCRIBE, '')

while True:
m_raw = sock.recv()
mtype, mdata = m_raw.split(' ', 1)
if mdata[0] != 'T':
print 'unsupported format'
continue
m = tnetstring.loads(mdata[1:])
print '%s %s' % (mtype, m)