Skip to content

Commit a54a2de

Browse files
committed
Merge pull request #317 from durple/master
Added toBeanstalkd block
2 parents 3fc5bfb + 304c9a2 commit a54a2de

File tree

3 files changed

+106
-3
lines changed

3 files changed

+106
-3
lines changed

st/library/library.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ var Blocks = map[string]func() blocks.BlockInterface{
1717
"towebsocket": NewToWebsocket,
1818
"tofile": NewToFile,
1919
"tolog": NewToLog,
20+
"tobeanstalkd": NewToBeanstalkd,
2021
"mask": NewMask,
2122
"filter": NewFilter,
2223
"sync": NewSync,

st/library/toBeanstalkd.go

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
package library
2+
3+
import (
4+
"encoding/json"
5+
"errors"
6+
"github.com/nutrun/lentil"
7+
"github.com/nytlabs/streamtools/st/blocks" // blocks
8+
"github.com/nytlabs/streamtools/st/util"
9+
)
10+
11+
// specify those channels we're going to use to communicate with streamtools
12+
type ToBeanstalkd struct {
13+
blocks.Block
14+
queryrule chan chan interface{}
15+
inrule chan interface{}
16+
in chan interface{}
17+
quit chan interface{}
18+
}
19+
20+
// we need to build a simple factory so that streamtools can make new blocks of this kind
21+
func NewToBeanstalkd() blocks.BlockInterface {
22+
return &ToBeanstalkd{}
23+
}
24+
25+
// Setup is called once before running the block. We build up the channels and specify what kind of block this is.
26+
func (b *ToBeanstalkd) Setup() {
27+
b.Kind = "ToBeanstalkd"
28+
b.in = b.InRoute("in")
29+
b.inrule = b.InRoute("rule")
30+
b.queryrule = b.QueryRoute("rule")
31+
b.quit = b.Quit()
32+
}
33+
34+
// Run is the block's main loop. Here we listen on the different channels we set up.
35+
func (b *ToBeanstalkd) Run() {
36+
var conn *lentil.Beanstalkd
37+
var tube = "default"
38+
var ttr = 0
39+
var host = ""
40+
var err error
41+
for {
42+
select {
43+
case msgI := <-b.inrule:
44+
// set hostname for beanstalkd server
45+
host, err = util.ParseString(msgI, "Host")
46+
if err != nil {
47+
b.Error(err.Error())
48+
continue
49+
}
50+
// set tube name
51+
tube, err = util.ParseString(msgI, "Tube")
52+
if err != nil {
53+
b.Error(errors.New("Could not parse tube name, setting to 'default'"))
54+
tube = "default"
55+
}
56+
// set time to reserve
57+
ttr, err = util.ParseInt(msgI, "TTR")
58+
if err != nil || ttr < 0 {
59+
b.Error(errors.New("Error parsing TTR. Setting TTR to 0"))
60+
ttr = 0
61+
}
62+
// create beanstalkd connection
63+
conn, err = lentil.Dial(host)
64+
if err != nil {
65+
// swallowing a panic from lentil here - streamtools must not die
66+
b.Error(errors.New("Could not initiate connection with beanstalkd server"))
67+
continue
68+
}
69+
// use the specified tube
70+
conn.Use(tube)
71+
case <-b.quit:
72+
// close connection to beanstalkd and quit
73+
if conn != nil {
74+
conn.Quit()
75+
}
76+
return
77+
case msg := <-b.in:
78+
// deal with inbound data
79+
msgStr, err := json.Marshal(msg)
80+
if err != nil {
81+
b.Error(err)
82+
continue
83+
}
84+
if conn != nil {
85+
_, err := conn.Put(0, 0, ttr, msgStr)
86+
if err != nil {
87+
b.Error(err.Error())
88+
}
89+
} else {
90+
b.Error(errors.New("Beanstalkd connection not initated or lost. Please check your beanstalkd server or block settings."))
91+
}
92+
case respChan := <-b.queryrule:
93+
// deal with a query request
94+
respChan <- map[string]interface{}{
95+
"Host": host,
96+
"Tube": tube,
97+
"TTR": ttr,
98+
}
99+
}
100+
}
101+
}

st/util/rule.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,15 +57,16 @@ func ParseInt(ruleI interface{}, key string) (int, error) {
5757
rule := ruleI.(map[string]interface{})
5858
var val int
5959
var ok bool
60-
60+
var floatval float64
6161
foundRule, ok := rule[key]
6262
if !ok {
6363
return val, errors.New("Key was not in rule")
6464
}
65-
val, ok = foundRule.(int)
65+
floatval, ok = foundRule.(float64)
6666
if !ok {
67-
return val, errors.New("Key was not an int!")
67+
return val, errors.New("Key was not a number")
6868
}
69+
val = int(floatval)
6970
return val, nil
7071
}
7172

0 commit comments

Comments
 (0)