Intro
Install
1 2 3 4 5 6 7 8 9 10 11 12
| docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 fanout/pushpin:1.28.0
docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.28.0
docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.31.0
docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ -v /Users/shankai/Desktop/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0
docker run -d --name pushpin -p 4430:4430 -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-config:/etc/pushpin/ -v /Users/shankai/archive/cbic/icos-fdns/icosEvent/pushpinx/pushpin-certs:/usr/lib/pushpin/runner/certs fanout/pushpin:1.31.0
docker run -d --name pushpin -p 7999:7999 -p 5560:5560 -p 5561:5561 -p 5562:5562 -p 5563:5563 -v /Users/shankai/Desktop/pushpin-config:/etc/pushpin/ fanout/pushpin:1.32.2
|
Port
Configuration
Mock Backend [test]
1
| curl http://localhost:7999/stream
|
cli:
1
| pushpin-publish test "hello there"
|
restful:
1 2 3
| curl -d '{ "items": [ { "channel": "test", "formats": { "http-stream": { "content": "hello there\n" } } } ] }' http://localhost:5561/publish/
|
curl -d '{ "items": [ { "channel": "test", "formats": {
"http-stream": { "content": "hello there\n" } } } ] }' \
http://localhost:5561/publish/
Backend [myChannel]
- http ( * 192.168.130.40:8099)
1 2 3 4 5 6 7 8 9 10 11 12 13
// Minimal HTTP origin that answers every request with GRIP instruction
// headers (Grip-Hold / Grip-Channel) and a short plain-text greeting.
var http = require('http');

// Handle one incoming request: choose the channel, send the GRIP headers,
// log what the client sent, and finish the response.
function onRequest(request, response) {
    var requestHeaders = request.headers;

    // Use the request's "channel" header when it is present and non-empty;
    // otherwise fall back to the default channel name.
    var gripChannel = requestHeaders.channel;
    if (!gripChannel) {
        gripChannel = 'myChannel';
    }

    var responseHeaders = {};
    responseHeaders['Content-Type'] = 'text/plain';
    responseHeaders['Grip-Hold'] = 'stream';
    responseHeaders['Grip-Channel'] = gripChannel;
    response.writeHead(200, responseHeaders);

    console.log(requestHeaders);
    response.end('Stream opened, prepare yourself!\n');
}

var server = http.createServer(onRequest);
server.listen(8099, '0.0.0.0');
|
- over_http ( *,proto=ws 192.168.130.40:8098,over_http )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
// WebSocket-over-HTTP origin: each request body carries encoded WebSocket
// events; the response body carries the events to send back.
var grip = require('grip');
var http = require('http');

http.createServer(function (req, res) {
    res.writeHead(200, {
        'Sec-WebSocket-Extensions': 'grip',
        'Content-Type': 'application/websocket-events'
    });

    // Accumulate the whole request body before decoding it.
    var body = '';
    req.on('data', function (chunk) {
        body += chunk;
    });

    req.on('end', function () {
        var inEvents = grip.decodeWebSocketEvents(body);
        var outEvents = [];

        // Guard against a request that carries no events at all: indexing
        // inEvents[0] would otherwise throw inside this callback and take
        // the whole process down.
        if (inEvents.length > 0 && inEvents[0].getType() === 'OPEN') {
            // Accept the connection, then send a control message
            // ("c:" prefix) that subscribes it to the channel.
            // NOTE(review): the channel is spelled 'mychannel' here but
            // 'myChannel' in the plain HTTP backend -- confirm which
            // spelling publishers use.
            outEvents.push(new grip.WebSocketEvent('OPEN'));
            outEvents.push(new grip.WebSocketEvent(
                'TEXT',
                'c:' + grip.webSocketControlMessage(
                    'subscribe',
                    {'channel': 'mychannel'}
                )
            ));
        }

        res.end(grip.encodeWebSocketEvents(outEvents));
    });
}).listen(8098, '0.0.0.0');
|
Routes: * zhttpreq/tcp://127.0.0.1:10000
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
# Minimal ZHTTP request handler: receives requests on a REP socket and
# answers each one with a streaming-hold response.
import zmq
import tnetstring

zmq_context = zmq.Context()
sock = zmq_context.socket(zmq.REP)
sock.connect('tcp://127.0.0.1:10000')

while True:
    # The first byte of every frame is a format marker; the tnetstring
    # payload follows it.
    req = tnetstring.loads(sock.recv()[1:])
    # NOTE(review): byte-string keys assume tnetstring.loads returns
    # byte strings (always true on Python 2, where str is bytes) --
    # confirm against the tnetstring build used on Python 3.
    print(req[b'headers'])

    resp = {
        b'id': req[b'id'],
        b'code': 200,
        b'reason': b'OK',
        b'headers': [
            [b'Grip-Hold', b'stream'],
            [b'Grip-Channel', b'test'],
            [b'Content-Type', b'text/plain'],
        ],
        b'body': b'welcome to the stream\n',
    }

    # ZeroMQ sockets carry bytes, so the 'T' marker has to be a byte
    # string too; concatenating text with the encoded payload fails on
    # Python 3.
    sock.send(b'T' + tnetstring.dumps(resp))
|
ZMQ
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
# Print subscribe / unsubscribe notices received on an XPUB socket
# connected to port 5562.
import zmq

zmq_context = zmq.Context.instance()
sock = zmq_context.socket(zmq.XPUB)
sock.rcvhwm = 0
sock.immediate = 1
sock.connect('tcp://localhost:5562')

while True:
    m = sock.recv()
    # Slice instead of indexing: on Python 3, m[0] is an int and would
    # never compare equal to a one-character string, so nothing was printed.
    mtype = m[:1]
    topic = m[1:].decode('utf-8', 'replace')
    if mtype == b'\x01':
        print('SUB %s' % topic)
    elif mtype == b'\x00':
        print('UNSUB %s' % topic)
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
|
# Subscribe to the stats socket and print every message received on it.
import sys
import tnetstring
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
sock.connect('ipc:///var/run/pushpin/pushpin-stats')
# An empty prefix subscribes to everything; the option value must be bytes.
sock.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    m_raw = sock.recv()
    # Frames look like b"<type> T<tnetstring payload>". partition() never
    # raises, so a frame without a space is reported as unsupported
    # instead of crashing the loop.
    mtype, _, mdata = m_raw.partition(b' ')
    # Slicing keeps the comparison bytes-to-bytes on Python 3 and also
    # tolerates an empty payload.
    if mdata[:1] != b'T':
        print('unsupported format')
        continue
    m = tnetstring.loads(mdata[1:])
    print('%s %s' % (mtype.decode('utf-8', 'replace'), m))
|