Merge pull request #12 from bobrik/bulk-witout-index

Bulk witout index
This commit is contained in:
Jérôme Renard 2014-07-03 20:40:53 +02:00
commit 15921ffa9c
2 changed files with 25 additions and 27 deletions

19
goes.go
View File

@ -100,9 +100,9 @@ func (c *Connection) IndexStatus(indexList []string) (Response, error) {
return r.Run() return r.Run()
} }
// Bulk adds multiple documents in bulk mode to the index for a given type // Bulk adds multiple documents in bulk mode
func (c *Connection) BulkSend(index string, documents []Document) (Response, error) { func (c *Connection) BulkSend(documents []Document) (Response, error) {
// We do not generate a traditionnal JSON here (often a one liner) // We do not generate a traditional JSON here (often a one liner)
// Elasticsearch expects one line of JSON per line (EOL = \n) // Elasticsearch expects one line of JSON per line (EOL = \n)
// plus an extra \n at the very end of the document // plus an extra \n at the very end of the document
// //
@ -159,11 +159,10 @@ func (c *Connection) BulkSend(index string, documents []Document) (Response, err
bulkData[len(bulkData)-1] = []byte(nil) bulkData[len(bulkData)-1] = []byte(nil)
r := Request{ r := Request{
Conn: c, Conn: c,
IndexList: []string{index}, method: "POST",
method: "POST", api: "_bulk",
api: "_bulk", bulkData: bytes.Join(bulkData, []byte("\n")),
bulkData: bytes.Join(bulkData, []byte("\n")),
} }
return r.Run() return r.Run()
@ -357,7 +356,7 @@ func (r *Request) Url() string {
func (a Aggregation) Buckets() []Bucket { func (a Aggregation) Buckets() []Bucket {
result := []Bucket{} result := []Bucket{}
if buckets, ok := a["buckets"]; ok { if buckets, ok := a["buckets"]; ok {
for _, bucket := range buckets.([]interface {}) { for _, bucket := range buckets.([]interface{}) {
result = append(result, bucket.(map[string]interface{})) result = append(result, bucket.(map[string]interface{}))
} }
} }
@ -376,7 +375,7 @@ func (b Bucket) DocCount() uint64 {
} }
// Aggregation returns aggregation by name from bucket // Aggregation returns aggregation by name from bucket
func (b Bucket) Aggregation(name string) Aggregation{ func (b Bucket) Aggregation(name string) Aggregation {
if agg, ok := b[name]; ok { if agg, ok := b[name]; ok {
return agg.(map[string]interface{}) return agg.(map[string]interface{})
} else { } else {

View File

@ -195,7 +195,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) {
tweets := []Document{ tweets := []Document{
Document{ Document{
Id: "123", Id: "123",
Index: nil, Index: indexName,
Type: docType, Type: docType,
BulkCommand: BULK_COMMAND_INDEX, BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{ Fields: map[string]interface{}{
@ -221,7 +221,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) {
_, err := conn.CreateIndex(indexName, nil) _, err := conn.CreateIndex(indexName, nil)
c.Assert(err, IsNil) c.Assert(err, IsNil)
response, err := conn.BulkSend(indexName, tweets) response, err := conn.BulkSend(tweets)
i := Item{ i := Item{
Id: "123", Id: "123",
Type: docType, Type: docType,
@ -277,7 +277,7 @@ func (s *GoesTestSuite) TestBulkSend(c *C) {
}, },
} }
response, err = conn.BulkSend(indexName, docToDelete) response, err = conn.BulkSend(docToDelete)
i = Item{ i = Item{
Id: "123", Id: "123",
Type: docType, Type: docType,
@ -475,7 +475,7 @@ func (s *GoesTestSuite) TestGet(c *C) {
Type: docType, Type: docType,
Id: docId, Id: docId,
Version: 1, Version: 1,
Found: true, Found: true,
Source: source, Source: source,
} }
@ -491,7 +491,7 @@ func (s *GoesTestSuite) TestGet(c *C) {
Type: docType, Type: docType,
Id: docId, Id: docId,
Version: 1, Version: 1,
Found: true, Found: true,
Fields: map[string]interface{}{ Fields: map[string]interface{}{
"f1": []interface{}{"foo"}, "f1": []interface{}{"foo"},
}, },
@ -681,7 +681,7 @@ func (s *GoesTestSuite) TestScroll(c *C) {
_, err := conn.CreateIndex(indexName, mapping) _, err := conn.CreateIndex(indexName, mapping)
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = conn.BulkSend(indexName, tweets) _, err = conn.BulkSend(tweets)
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = conn.RefreshIndex(indexName) _, err = conn.RefreshIndex(indexName)
@ -728,7 +728,6 @@ func (s *GoesTestSuite) TestScroll(c *C) {
c.Assert(len(searchResults.Hits.Hits), Equals, 0) c.Assert(len(searchResults.Hits.Hits), Equals, 0)
} }
func (s *GoesTestSuite) TestAggregations(c *C) { func (s *GoesTestSuite) TestAggregations(c *C) {
indexName := "testaggs" indexName := "testaggs"
docType := "tweet" docType := "tweet"
@ -740,9 +739,9 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
Type: docType, Type: docType,
BulkCommand: BULK_COMMAND_INDEX, BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{ Fields: map[string]interface{}{
"user" : "foo", "user": "foo",
"message" : "some foo message", "message": "some foo message",
"age" : 25, "age": 25,
}, },
}, },
@ -752,9 +751,9 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
Type: docType, Type: docType,
BulkCommand: BULK_COMMAND_INDEX, BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{ Fields: map[string]interface{}{
"user" : "bar", "user": "bar",
"message" : "some bar message", "message": "some bar message",
"age" : 30, "age": 30,
}, },
}, },
@ -764,8 +763,8 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
Type: docType, Type: docType,
BulkCommand: BULK_COMMAND_INDEX, BulkCommand: BULK_COMMAND_INDEX,
Fields: map[string]interface{}{ Fields: map[string]interface{}{
"user" : "foo", "user": "foo",
"message" : "another foo message", "message": "another foo message",
}, },
}, },
} }
@ -784,7 +783,7 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
_, err := conn.CreateIndex(indexName, mapping) _, err := conn.CreateIndex(indexName, mapping)
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = conn.BulkSend(indexName, tweets) _, err = conn.BulkSend(tweets)
c.Assert(err, IsNil) c.Assert(err, IsNil)
_, err = conn.RefreshIndex(indexName) _, err = conn.RefreshIndex(indexName)
@ -836,5 +835,5 @@ func (s *GoesTestSuite) TestAggregations(c *C) {
c.Assert(ok, Equals, true) c.Assert(ok, Equals, true)
c.Assert(age["count"], Equals, 2.0) c.Assert(age["count"], Equals, 2.0)
c.Assert(age["sum"], Equals, 25.0 + 30.0) c.Assert(age["sum"], Equals, 25.0+30.0)
} }