Fix write to and query from cluster

pull/6832/head
Jack Zampolin 2016-06-13 11:30:56 -07:00
parent 32472026a3
commit 216021371a
3 changed files with 16 additions and 13 deletions

View File

@ -107,17 +107,20 @@ func (pe *ponyExpress) listen() {
l := NewConcurrencyLimiter((pe.wconc + pe.qconc) * 2)
// Concume incoming packages
counter := 0
for p := range pe.packageChan {
serv := counter % len(pe.addresses)
l.Increment()
go func(p Package) {
defer l.Decrement()
switch p.T {
case Write:
pe.spinOffWritePackage(p)
pe.spinOffWritePackage(p, serv)
case Query:
pe.spinOffQueryPackage(p)
pe.spinOffQueryPackage(p, serv)
}
}(p)
counter++
}
}

View File

@ -9,22 +9,22 @@ import (
"time"
)
func (pe *ponyExpress) spinOffQueryPackage(p Package) {
func (pe *ponyExpress) spinOffQueryPackage(p Package, serv int) {
pe.Add(1)
pe.rc.Increment()
go func() {
// Send the query
pe.prepareQuerySend(p)
pe.prepareQuerySend(p, serv)
pe.Done()
pe.rc.Decrement()
}()
}
// Prepares to send the GET request
func (pe *ponyExpress) prepareQuerySend(p Package) {
func (pe *ponyExpress) prepareQuerySend(p Package, serv int) {
// TODO: Implement round robin queryies
queryURL := fmt.Sprintf("http://%v/query?db=%v&q=%v", pe.addresses[0], pe.database, url.QueryEscape(string(p.Body)))
queryURL := fmt.Sprintf("http://%v/query?db=%v&q=%v", pe.addresses[serv], pe.database, url.QueryEscape(string(p.Body)))
// Send the query
pe.makeGet(queryURL, p.StatementID, p.Tracer)

View File

@ -14,18 +14,18 @@ import (
// ###############################################
// Packages up Package from channel in goroutine
func (pe *ponyExpress) spinOffWritePackage(p Package) {
func (pe *ponyExpress) spinOffWritePackage(p Package, serv int) {
pe.Add(1)
pe.wc.Increment()
go func() {
pe.retry(p, time.Duration(time.Nanosecond))
pe.retry(p, time.Duration(time.Nanosecond), serv)
pe.Done()
pe.wc.Decrement()
}()
}
// Implements backoff and retry logic for 500 responses
func (pe *ponyExpress) retry(p Package, backoff time.Duration) {
func (pe *ponyExpress) retry(p Package, backoff time.Duration, serv int) {
// Set Backoff Interval to 500ms
backoffInterval := time.Duration(500 * time.Millisecond)
@ -34,7 +34,7 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration) {
bo := backoff + backoffInterval
// Make the write request
resp, elapsed, err := pe.prepareWrite(p.Body)
resp, elapsed, err := pe.prepareWrite(p.Body, serv)
// Find number of times request has been retried
numBackoffs := int(bo/backoffInterval) - 1
@ -66,17 +66,17 @@ func (pe *ponyExpress) retry(p Package, backoff time.Duration) {
fmt.Println(err)
// Backoff enforcement
time.Sleep(bo)
pe.retry(p, bo)
pe.retry(p, bo, serv)
}
}
// Prepares to send the POST request
func (pe *ponyExpress) prepareWrite(points []byte) (*http.Response, time.Duration, error) {
func (pe *ponyExpress) prepareWrite(points []byte, serv int) (*http.Response, time.Duration, error) {
// Construct address string
writeTemplate := "http://%v/write?db=%v&precision=%v"
address := fmt.Sprintf(writeTemplate, pe.addresses[0], pe.database, pe.precision)
address := fmt.Sprintf(writeTemplate, pe.addresses[serv], pe.database, pe.precision)
// Start timer
t := time.Now()