You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

1510 lines
34 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package goes
  5. import (
  6. . "gopkg.in/check.v1"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "strings"
  11. "testing"
  12. "time"
  13. )
  14. var (
  15. ESHost = "localhost"
  16. ESPort = "9200"
  17. )
  18. // Hook up gocheck into the gotest runner.
  19. func Test(t *testing.T) { TestingT(t) }
  20. type GoesTestSuite struct{}
  21. var _ = Suite(&GoesTestSuite{})
  22. func (s *GoesTestSuite) SetUpTest(c *C) {
  23. h := os.Getenv("TEST_ELASTICSEARCH_HOST")
  24. if h != "" {
  25. ESHost = h
  26. }
  27. p := os.Getenv("TEST_ELASTICSEARCH_PORT")
  28. if p != "" {
  29. ESPort = p
  30. }
  31. }
  32. func (s *GoesTestSuite) TestNewClient(c *C) {
  33. conn := NewClient(ESHost, ESPort)
  34. c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, http.DefaultClient, "", "", ""})
  35. }
  36. func (s *GoesTestSuite) TestWithHTTPClient(c *C) {
  37. tr := &http.Transport{
  38. DisableCompression: true,
  39. ResponseHeaderTimeout: 1 * time.Second,
  40. }
  41. cl := &http.Client{
  42. Transport: tr,
  43. }
  44. conn := NewClient(ESHost, ESPort).WithHTTPClient(cl)
  45. c.Assert(conn, DeepEquals, &Client{ESHost, ESPort, cl, "", "", ""})
  46. c.Assert(conn.Client.Transport.(*http.Transport).DisableCompression, Equals, true)
  47. c.Assert(conn.Client.Transport.(*http.Transport).ResponseHeaderTimeout, Equals, 1*time.Second)
  48. }
  49. func (s *GoesTestSuite) TestUrl(c *C) {
  50. r := Request{
  51. Query: "q",
  52. IndexList: []string{"i"},
  53. TypeList: []string{},
  54. Method: "GET",
  55. API: "_search",
  56. }
  57. c.Assert(r.URL().String(), Equals, "/i/_search")
  58. r.IndexList = []string{"a", "b"}
  59. c.Assert(r.URL().String(), Equals, "/a,b/_search")
  60. r.TypeList = []string{"c", "d"}
  61. c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search")
  62. r.ExtraArgs = make(url.Values, 1)
  63. r.ExtraArgs.Set("version", "1")
  64. c.Assert(r.URL().String(), Equals, "/a,b/c,d/_search?version=1")
  65. r.ID = "1234"
  66. r.API = ""
  67. c.Assert(r.URL().String(), Equals, "/a,b/c,d/1234/?version=1")
  68. }
  69. func (s *GoesTestSuite) TestEsDown(c *C) {
  70. conn := NewClient("a.b.c.d", "1234")
  71. var query = map[string]interface{}{"query": "foo"}
  72. r := Request{
  73. Query: query,
  74. IndexList: []string{"i"},
  75. Method: "GET",
  76. API: "_search",
  77. }
  78. _, err := conn.Do(&r)
  79. c.Assert(err, ErrorMatches, ".* no such host")
  80. }
  81. func (s *GoesTestSuite) TestRunMissingIndex(c *C) {
  82. conn := NewClient(ESHost, ESPort)
  83. var query = map[string]interface{}{"query": "foo"}
  84. r := Request{
  85. Query: query,
  86. IndexList: []string{"i"},
  87. Method: "GET",
  88. API: "_search",
  89. }
  90. _, err := conn.Do(&r)
  91. c.Assert(err.Error(), Matches, "\\[40.\\] .*i.*")
  92. }
  93. func (s *GoesTestSuite) TestCreateIndex(c *C) {
  94. indexName := "testcreateindexgoes"
  95. conn := NewClient(ESHost, ESPort)
  96. defer conn.DeleteIndex(indexName)
  97. mapping := map[string]interface{}{
  98. "settings": map[string]interface{}{
  99. "index.number_of_shards": 1,
  100. "index.number_of_replicas": 0,
  101. },
  102. "mappings": map[string]interface{}{
  103. "_default_": map[string]interface{}{
  104. "_source": map[string]interface{}{
  105. "enabled": false,
  106. },
  107. "_all": map[string]interface{}{
  108. "enabled": false,
  109. },
  110. },
  111. },
  112. }
  113. resp, err := conn.CreateIndex(indexName, mapping)
  114. c.Assert(err, IsNil)
  115. c.Assert(resp.Acknowledged, Equals, true)
  116. }
  117. func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) {
  118. conn := NewClient(ESHost, ESPort)
  119. resp, err := conn.DeleteIndex("foobar")
  120. c.Assert(err.Error(), Matches, "\\[404\\] .*foobar.*")
  121. resp.Raw = nil // Don't make us have to duplicate this.
  122. c.Assert(resp.Status, Equals, uint64(404))
  123. c.Assert(resp.Error, Matches, ".*foobar.*")
  124. }
  125. func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) {
  126. conn := NewClient(ESHost, ESPort)
  127. indexName := "testdeleteindexexistingindex"
  128. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  129. defer conn.DeleteIndex(indexName)
  130. c.Assert(err, IsNil)
  131. resp, err := conn.DeleteIndex(indexName)
  132. c.Assert(err, IsNil)
  133. expectedResponse := &Response{
  134. Acknowledged: true,
  135. Status: 200,
  136. }
  137. resp.Raw = nil
  138. c.Assert(resp, DeepEquals, expectedResponse)
  139. }
  140. func (s *GoesTestSuite) TestUpdateIndexSettings(c *C) {
  141. conn := NewClient(ESHost, ESPort)
  142. indexName := "testupdateindex"
  143. // Just in case
  144. conn.DeleteIndex(indexName)
  145. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  146. c.Assert(err, IsNil)
  147. defer conn.DeleteIndex(indexName)
  148. _, err = conn.UpdateIndexSettings(indexName, map[string]interface{}{
  149. "index": map[string]interface{}{
  150. "number_of_replicas": 0,
  151. },
  152. })
  153. c.Assert(err, IsNil)
  154. _, err = conn.DeleteIndex(indexName)
  155. c.Assert(err, IsNil)
  156. }
  157. func (s *GoesTestSuite) TestRefreshIndex(c *C) {
  158. conn := NewClient(ESHost, ESPort)
  159. indexName := "testrefreshindex"
  160. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  161. c.Assert(err, IsNil)
  162. defer conn.DeleteIndex(indexName)
  163. _, err = conn.RefreshIndex(indexName)
  164. c.Assert(err, IsNil)
  165. _, err = conn.DeleteIndex(indexName)
  166. c.Assert(err, IsNil)
  167. }
  168. func (s *GoesTestSuite) TestOptimize(c *C) {
  169. conn := NewClient(ESHost, ESPort)
  170. indexName := "testoptimize"
  171. conn.DeleteIndex(indexName)
  172. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  173. c.Assert(err, IsNil)
  174. defer conn.DeleteIndex(indexName)
  175. // we must wait for a bit otherwise ES crashes
  176. time.Sleep(1 * time.Second)
  177. response, err := conn.Optimize([]string{indexName}, url.Values{"max_num_segments": []string{"1"}})
  178. c.Assert(err, IsNil)
  179. c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0)
  180. _, err = conn.DeleteIndex(indexName)
  181. c.Assert(err, IsNil)
  182. }
  183. func (s *GoesTestSuite) TestBulkSend(c *C) {
  184. indexName := "testbulkadd"
  185. docType := "tweet"
  186. tweets := []Document{
  187. {
  188. ID: "123",
  189. Index: indexName,
  190. Type: docType,
  191. BulkCommand: BulkCommandIndex,
  192. Fields: map[string]interface{}{
  193. "user": "foo",
  194. "message": "some foo message",
  195. },
  196. },
  197. {
  198. ID: nil,
  199. Index: indexName,
  200. Type: docType,
  201. BulkCommand: BulkCommandIndex,
  202. Fields: map[string]interface{}{
  203. "user": "bar",
  204. "message": "some bar message",
  205. },
  206. },
  207. }
  208. conn := NewClient(ESHost, ESPort)
  209. conn.DeleteIndex(indexName)
  210. _, err := conn.CreateIndex(indexName, nil)
  211. c.Assert(err, IsNil)
  212. defer conn.DeleteIndex(indexName)
  213. response, err := conn.BulkSend(tweets)
  214. i := Item{
  215. ID: "123",
  216. Type: docType,
  217. Version: 1,
  218. Index: indexName,
  219. Status: 201, //201 for indexing ( https://issues.apache.org/jira/browse/CONNECTORS-634 )
  220. }
  221. c.Assert(response.Items[0][BulkCommandIndex], Equals, i)
  222. c.Assert(err, IsNil)
  223. _, err = conn.RefreshIndex(indexName)
  224. c.Assert(err, IsNil)
  225. var query = map[string]interface{}{
  226. "query": map[string]interface{}{
  227. "match_all": map[string]interface{}{},
  228. },
  229. }
  230. searchResults, err := conn.Search(query, []string{indexName}, []string{}, url.Values{})
  231. c.Assert(err, IsNil)
  232. var expectedTotal uint64 = 2
  233. c.Assert(searchResults.Hits.Total, Equals, expectedTotal)
  234. extraDocID := ""
  235. checked := 0
  236. for _, hit := range searchResults.Hits.Hits {
  237. if hit.Source["user"] == "foo" {
  238. c.Assert(hit.ID, Equals, "123")
  239. checked++
  240. }
  241. if hit.Source["user"] == "bar" {
  242. c.Assert(len(hit.ID) > 0, Equals, true)
  243. extraDocID = hit.ID
  244. checked++
  245. }
  246. }
  247. c.Assert(checked, Equals, 2)
  248. docToDelete := []Document{
  249. {
  250. ID: "123",
  251. Index: indexName,
  252. Type: docType,
  253. BulkCommand: BulkCommandDelete,
  254. },
  255. {
  256. ID: extraDocID,
  257. Index: indexName,
  258. Type: docType,
  259. BulkCommand: BulkCommandDelete,
  260. },
  261. }
  262. response, err = conn.BulkSend(docToDelete)
  263. i = Item{
  264. ID: "123",
  265. Type: docType,
  266. Version: 2,
  267. Index: indexName,
  268. Status: 200, //200 for updates
  269. }
  270. c.Assert(response.Items[0][BulkCommandDelete], Equals, i)
  271. c.Assert(err, IsNil)
  272. _, err = conn.RefreshIndex(indexName)
  273. c.Assert(err, IsNil)
  274. searchResults, err = conn.Search(query, []string{indexName}, []string{}, url.Values{})
  275. c.Assert(err, IsNil)
  276. expectedTotal = 0
  277. c.Assert(searchResults.Hits.Total, Equals, expectedTotal)
  278. _, err = conn.DeleteIndex(indexName)
  279. c.Assert(err, IsNil)
  280. }
  281. func (s *GoesTestSuite) TestStats(c *C) {
  282. conn := NewClient(ESHost, ESPort)
  283. indexName := "teststats"
  284. conn.DeleteIndex(indexName)
  285. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  286. c.Assert(err, IsNil)
  287. defer conn.DeleteIndex(indexName)
  288. // we must wait for a bit otherwise ES crashes
  289. time.Sleep(1 * time.Second)
  290. response, err := conn.Stats([]string{indexName}, url.Values{})
  291. c.Assert(err, IsNil)
  292. c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0)
  293. _, err = conn.DeleteIndex(indexName)
  294. c.Assert(err, IsNil)
  295. }
  296. func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) {
  297. indexName := "testindexwithfieldsinstruct"
  298. docType := "tweet"
  299. docID := "1234"
  300. conn := NewClient(ESHost, ESPort)
  301. // just in case
  302. conn.DeleteIndex(indexName)
  303. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  304. c.Assert(err, IsNil)
  305. defer conn.DeleteIndex(indexName)
  306. d := Document{
  307. Index: indexName,
  308. Type: docType,
  309. ID: docID,
  310. Fields: struct {
  311. user string
  312. message string
  313. }{
  314. "foo",
  315. "bar",
  316. },
  317. }
  318. response, err := conn.Index(d, nil)
  319. c.Assert(err, IsNil)
  320. expectedResponse := &Response{
  321. Status: 201,
  322. Index: indexName,
  323. ID: docID,
  324. Type: docType,
  325. Version: 1,
  326. }
  327. response.Raw = nil
  328. response.Shards = Shard{}
  329. c.Assert(response, DeepEquals, expectedResponse)
  330. }
  331. func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) {
  332. indexName := "testindexwithfieldsnotinmaporstruct"
  333. docType := "tweet"
  334. docID := "1234"
  335. conn := NewClient(ESHost, ESPort)
  336. // just in case
  337. conn.DeleteIndex(indexName)
  338. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  339. c.Assert(err, IsNil)
  340. defer conn.DeleteIndex(indexName)
  341. d := Document{
  342. Index: indexName,
  343. Type: docType,
  344. ID: docID,
  345. Fields: "test",
  346. }
  347. _, err = conn.Index(d, nil)
  348. c.Assert(err, Not(IsNil))
  349. }
  350. func (s *GoesTestSuite) TestIndexIdDefined(c *C) {
  351. indexName := "testindexiddefined"
  352. docType := "tweet"
  353. docID := "1234"
  354. conn := NewClient(ESHost, ESPort)
  355. // just in case
  356. conn.DeleteIndex(indexName)
  357. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  358. c.Assert(err, IsNil)
  359. defer conn.DeleteIndex(indexName)
  360. d := Document{
  361. Index: indexName,
  362. Type: docType,
  363. ID: docID,
  364. Fields: map[string]interface{}{
  365. "user": "foo",
  366. "message": "bar",
  367. },
  368. }
  369. response, err := conn.Index(d, nil)
  370. c.Assert(err, IsNil)
  371. expectedResponse := &Response{
  372. Status: 201,
  373. Index: indexName,
  374. ID: docID,
  375. Type: docType,
  376. Version: 1,
  377. }
  378. response.Raw = nil
  379. response.Shards = Shard{}
  380. c.Assert(response, DeepEquals, expectedResponse)
  381. }
  382. func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) {
  383. indexName := "testindexidnotdefined"
  384. docType := "tweet"
  385. conn := NewClient(ESHost, ESPort)
  386. // just in case
  387. conn.DeleteIndex(indexName)
  388. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  389. c.Assert(err, IsNil)
  390. defer conn.DeleteIndex(indexName)
  391. d := Document{
  392. Index: indexName,
  393. Type: docType,
  394. Fields: map[string]interface{}{
  395. "user": "foo",
  396. "message": "bar",
  397. },
  398. }
  399. response, err := conn.Index(d, url.Values{})
  400. c.Assert(err, IsNil)
  401. c.Assert(response.Index, Equals, indexName)
  402. c.Assert(response.Type, Equals, docType)
  403. c.Assert(response.Version, Equals, 1)
  404. c.Assert(response.ID != "", Equals, true)
  405. }
  406. func (s *GoesTestSuite) TestDelete(c *C) {
  407. indexName := "testdelete"
  408. docType := "tweet"
  409. docID := "1234"
  410. conn := NewClient(ESHost, ESPort)
  411. // just in case
  412. conn.DeleteIndex(indexName)
  413. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  414. c.Assert(err, IsNil)
  415. defer conn.DeleteIndex(indexName)
  416. d := Document{
  417. Index: indexName,
  418. Type: docType,
  419. ID: docID,
  420. Fields: map[string]interface{}{
  421. "user": "foo",
  422. },
  423. }
  424. _, err = conn.Index(d, url.Values{})
  425. c.Assert(err, IsNil)
  426. response, err := conn.Delete(d, url.Values{})
  427. c.Assert(err, IsNil)
  428. expectedResponse := &Response{
  429. Status: 200,
  430. Found: true,
  431. Index: indexName,
  432. Type: docType,
  433. ID: docID,
  434. // XXX : even after a DELETE the version number seems to be incremented
  435. Version: 2,
  436. }
  437. response.Raw = nil
  438. response.Shards = Shard{}
  439. c.Assert(response, DeepEquals, expectedResponse)
  440. response, err = conn.Delete(d, url.Values{})
  441. c.Assert(err, IsNil)
  442. expectedResponse = &Response{
  443. Status: 404,
  444. Found: false,
  445. Index: indexName,
  446. Type: docType,
  447. ID: docID,
  448. // XXX : even after a DELETE the version number seems to be incremented
  449. Version: 3,
  450. }
  451. response.Raw = nil
  452. response.Shards = Shard{}
  453. c.Assert(response, DeepEquals, expectedResponse)
  454. }
  455. func (s *GoesTestSuite) TestDeleteByQuery(c *C) {
  456. indexName := "testdeletebyquery"
  457. docType := "tweet"
  458. docID := "1234"
  459. conn := NewClient(ESHost, ESPort)
  460. version, _ := conn.Version()
  461. // just in case
  462. conn.DeleteIndex(indexName)
  463. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  464. c.Assert(err, IsNil)
  465. defer conn.DeleteIndex(indexName)
  466. d := Document{
  467. Index: indexName,
  468. Type: docType,
  469. ID: docID,
  470. Fields: map[string]interface{}{
  471. "user": "foo",
  472. },
  473. }
  474. _, err = conn.Index(d, url.Values{})
  475. c.Assert(err, IsNil)
  476. _, err = conn.RefreshIndex(indexName)
  477. c.Assert(err, IsNil)
  478. query := map[string]interface{}{
  479. "query": map[string]interface{}{
  480. "bool": map[string]interface{}{
  481. "must": []map[string]interface{}{
  482. {
  483. "match_all": map[string]interface{}{},
  484. },
  485. },
  486. },
  487. },
  488. }
  489. //should be 1 doc before delete by query
  490. response, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  491. c.Assert(err, IsNil)
  492. c.Assert(response.Hits.Total, Equals, uint64(1))
  493. response, err = conn.DeleteByQuery(query, []string{indexName}, []string{docType}, url.Values{})
  494. // There's no delete by query in ES 2.x
  495. if version > "2" && version < "5" {
  496. c.Assert(err, ErrorMatches, ".* does not support delete by query")
  497. return
  498. }
  499. c.Assert(err, IsNil)
  500. expectedResponse := &Response{
  501. Status: 200,
  502. Found: false,
  503. Index: "",
  504. Type: "",
  505. ID: "",
  506. Version: 0,
  507. }
  508. response.Raw = nil
  509. response.Shards = Shard{}
  510. response.Took = 0
  511. c.Assert(response, DeepEquals, expectedResponse)
  512. _, err = conn.RefreshIndex(indexName)
  513. c.Assert(err, IsNil)
  514. //should be 0 docs after delete by query
  515. response, err = conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  516. c.Assert(err, IsNil)
  517. c.Assert(response.Hits.Total, Equals, uint64(0))
  518. }
  519. func (s *GoesTestSuite) TestGet(c *C) {
  520. indexName := "testget"
  521. docType := "tweet"
  522. docID := "111"
  523. source := map[string]interface{}{
  524. "f1": "foo",
  525. "f2": "foo",
  526. }
  527. conn := NewClient(ESHost, ESPort)
  528. version, _ := conn.Version()
  529. conn.DeleteIndex(indexName)
  530. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  531. c.Assert(err, IsNil)
  532. defer conn.DeleteIndex(indexName)
  533. d := Document{
  534. Index: indexName,
  535. Type: docType,
  536. ID: docID,
  537. Fields: source,
  538. }
  539. _, err = conn.Index(d, url.Values{})
  540. c.Assert(err, IsNil)
  541. response, err := conn.Get(indexName, docType, docID, url.Values{})
  542. c.Assert(err, IsNil)
  543. expectedResponse := &Response{
  544. Status: 200,
  545. Index: indexName,
  546. Type: docType,
  547. ID: docID,
  548. Version: 1,
  549. Found: true,
  550. Source: source,
  551. }
  552. response.Raw = nil
  553. c.Assert(response, DeepEquals, expectedResponse)
  554. expectedResponse = &Response{
  555. Status: 200,
  556. Index: indexName,
  557. Type: docType,
  558. ID: docID,
  559. Version: 1,
  560. Found: true,
  561. Fields: map[string]interface{}{
  562. "f1": []interface{}{"foo"},
  563. },
  564. }
  565. fields := make(url.Values, 1)
  566. // The fields param is no longer supported in ES 5.x
  567. if version < "5" {
  568. fields.Set("fields", "f1")
  569. } else {
  570. expectedResponse.Source = map[string]interface{}{"f1": "foo"}
  571. expectedResponse.Fields = nil
  572. fields.Set("_source", "f1")
  573. }
  574. response, err = conn.Get(indexName, docType, docID, fields)
  575. c.Assert(err, IsNil)
  576. response.Raw = nil
  577. c.Assert(response, DeepEquals, expectedResponse)
  578. }
  579. func (s *GoesTestSuite) TestSearch(c *C) {
  580. indexName := "testsearch"
  581. docType := "tweet"
  582. docID := "1234"
  583. source := map[string]interface{}{
  584. "user": "foo",
  585. "message": "bar",
  586. }
  587. conn := NewClient(ESHost, ESPort)
  588. conn.DeleteIndex(indexName)
  589. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  590. c.Assert(err, IsNil)
  591. defer conn.DeleteIndex(indexName)
  592. d := Document{
  593. Index: indexName,
  594. Type: docType,
  595. ID: docID,
  596. Fields: source,
  597. }
  598. _, err = conn.Index(d, url.Values{})
  599. c.Assert(err, IsNil)
  600. _, err = conn.RefreshIndex(indexName)
  601. c.Assert(err, IsNil)
  602. // I can feel my eyes bleeding
  603. query := map[string]interface{}{
  604. "query": map[string]interface{}{
  605. "bool": map[string]interface{}{
  606. "must": []map[string]interface{}{
  607. {
  608. "match_all": map[string]interface{}{},
  609. },
  610. },
  611. },
  612. },
  613. }
  614. response, _ := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  615. expectedHits := Hits{
  616. Total: 1,
  617. MaxScore: 1.0,
  618. Hits: []Hit{
  619. {
  620. Index: indexName,
  621. Type: docType,
  622. ID: docID,
  623. Score: 1.0,
  624. Source: source,
  625. },
  626. },
  627. }
  628. c.Assert(response.Hits, DeepEquals, expectedHits)
  629. }
  630. func (s *GoesTestSuite) TestCount(c *C) {
  631. indexName := "testcount"
  632. docType := "tweet"
  633. docID := "1234"
  634. source := map[string]interface{}{
  635. "user": "foo",
  636. "message": "bar",
  637. }
  638. conn := NewClient(ESHost, ESPort)
  639. conn.DeleteIndex(indexName)
  640. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  641. c.Assert(err, IsNil)
  642. defer conn.DeleteIndex(indexName)
  643. d := Document{
  644. Index: indexName,
  645. Type: docType,
  646. ID: docID,
  647. Fields: source,
  648. }
  649. _, err = conn.Index(d, url.Values{})
  650. c.Assert(err, IsNil)
  651. _, err = conn.RefreshIndex(indexName)
  652. c.Assert(err, IsNil)
  653. // I can feel my eyes bleeding
  654. query := map[string]interface{}{
  655. "query": map[string]interface{}{
  656. "bool": map[string]interface{}{
  657. "must": []map[string]interface{}{
  658. {
  659. "match_all": map[string]interface{}{},
  660. },
  661. },
  662. },
  663. },
  664. }
  665. response, _ := conn.Count(query, []string{indexName}, []string{docType}, url.Values{})
  666. c.Assert(response.Count, Equals, 1)
  667. }
  668. func (s *GoesTestSuite) TestIndexStatus(c *C) {
  669. indexName := "testindexstatus"
  670. conn := NewClient(ESHost, ESPort)
  671. // _status endpoint was removed in ES 2.0
  672. if version, _ := conn.Version(); version > "2" {
  673. return
  674. }
  675. conn.DeleteIndex(indexName)
  676. mapping := map[string]interface{}{
  677. "settings": map[string]interface{}{
  678. "index.number_of_shards": 1,
  679. "index.number_of_replicas": 1,
  680. },
  681. }
  682. _, err := conn.CreateIndex(indexName, mapping)
  683. c.Assert(err, IsNil)
  684. defer conn.DeleteIndex(indexName)
  685. // gives ES some time to do its job
  686. time.Sleep(1 * time.Second)
  687. _, err = conn.RefreshIndex(indexName)
  688. c.Assert(err, IsNil)
  689. response, err := conn.IndexStatus([]string{"testindexstatus"})
  690. c.Assert(err, IsNil)
  691. expectedShards := Shard{Total: 2, Successful: 1, Failed: 0}
  692. c.Assert(response.Shards, Equals, expectedShards)
  693. primarySizeInBytes := response.Indices[indexName].Index["primary_size_in_bytes"].(float64)
  694. sizeInBytes := response.Indices[indexName].Index["size_in_bytes"].(float64)
  695. refreshTotal := response.Indices[indexName].Refresh["total"].(float64)
  696. c.Assert(primarySizeInBytes > 0, Equals, true)
  697. c.Assert(sizeInBytes > 0, Equals, true)
  698. c.Assert(refreshTotal > 0, Equals, true)
  699. expectedIndices := map[string]IndexStatus{
  700. indexName: {
  701. Index: map[string]interface{}{
  702. "primary_size_in_bytes": primarySizeInBytes,
  703. "size_in_bytes": sizeInBytes,
  704. },
  705. Translog: map[string]uint64{
  706. "operations": 0,
  707. },
  708. Docs: map[string]uint64{
  709. "num_docs": 0,
  710. "max_doc": 0,
  711. "deleted_docs": 0,
  712. },
  713. Merges: map[string]interface{}{
  714. "current": float64(0),
  715. "current_docs": float64(0),
  716. "current_size_in_bytes": float64(0),
  717. "total": float64(0),
  718. "total_time_in_millis": float64(0),
  719. "total_docs": float64(0),
  720. "total_size_in_bytes": float64(0),
  721. },
  722. Refresh: map[string]interface{}{
  723. "total": refreshTotal,
  724. "total_time_in_millis": float64(0),
  725. },
  726. Flush: map[string]interface{}{
  727. "total": float64(0),
  728. "total_time_in_millis": float64(0),
  729. },
  730. },
  731. }
  732. c.Assert(response.Indices, DeepEquals, expectedIndices)
  733. }
  734. func (s *GoesTestSuite) TestScroll(c *C) {
  735. indexName := "testscroll"
  736. docType := "tweet"
  737. tweets := []Document{
  738. {
  739. ID: nil,
  740. Index: indexName,
  741. Type: docType,
  742. BulkCommand: BulkCommandIndex,
  743. Fields: map[string]interface{}{
  744. "user": "foo",
  745. "message": "some foo message",
  746. },
  747. },
  748. {
  749. ID: nil,
  750. Index: indexName,
  751. Type: docType,
  752. BulkCommand: BulkCommandIndex,
  753. Fields: map[string]interface{}{
  754. "user": "bar",
  755. "message": "some bar message",
  756. },
  757. },
  758. {
  759. ID: nil,
  760. Index: indexName,
  761. Type: docType,
  762. BulkCommand: BulkCommandIndex,
  763. Fields: map[string]interface{}{
  764. "user": "foo",
  765. "message": "another foo message",
  766. },
  767. },
  768. }
  769. conn := NewClient(ESHost, ESPort)
  770. mapping := map[string]interface{}{
  771. "settings": map[string]interface{}{
  772. "index.number_of_shards": 1,
  773. "index.number_of_replicas": 0,
  774. },
  775. }
  776. defer conn.DeleteIndex(indexName)
  777. _, err := conn.CreateIndex(indexName, mapping)
  778. c.Assert(err, IsNil)
  779. _, err = conn.BulkSend(tweets)
  780. c.Assert(err, IsNil)
  781. _, err = conn.RefreshIndex(indexName)
  782. c.Assert(err, IsNil)
  783. var query map[string]interface{}
  784. version, _ := conn.Version()
  785. if version > "5" {
  786. query = map[string]interface{}{
  787. "query": map[string]interface{}{
  788. "bool": map[string]interface{}{
  789. "filter": map[string]interface{}{
  790. "term": map[string]interface{}{
  791. "user": "foo",
  792. },
  793. },
  794. },
  795. },
  796. }
  797. } else {
  798. query = map[string]interface{}{
  799. "query": map[string]interface{}{
  800. "filtered": map[string]interface{}{
  801. "filter": map[string]interface{}{
  802. "term": map[string]interface{}{
  803. "user": "foo",
  804. },
  805. },
  806. },
  807. },
  808. }
  809. }
  810. searchResults, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1)
  811. c.Assert(err, IsNil)
  812. c.Assert(len(searchResults.ScrollID) > 0, Equals, true)
  813. // Versions < 5.x don't include results in the initial response
  814. if version < "5" {
  815. searchResults, err = conn.Scroll(searchResults.ScrollID, "1m")
  816. c.Assert(err, IsNil)
  817. }
  818. // some data in first chunk
  819. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  820. c.Assert(len(searchResults.ScrollID) > 0, Equals, true)
  821. c.Assert(len(searchResults.Hits.Hits), Equals, 1)
  822. searchResults, err = conn.Scroll(searchResults.ScrollID, "1m")
  823. c.Assert(err, IsNil)
  824. // more data in second chunk
  825. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  826. c.Assert(len(searchResults.ScrollID) > 0, Equals, true)
  827. c.Assert(len(searchResults.Hits.Hits), Equals, 1)
  828. searchResults, err = conn.Scroll(searchResults.ScrollID, "1m")
  829. c.Assert(err, IsNil)
  830. // nothing in third chunk
  831. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  832. c.Assert(len(searchResults.ScrollID) > 0, Equals, true)
  833. c.Assert(len(searchResults.Hits.Hits), Equals, 0)
  834. }
  835. func (s *GoesTestSuite) TestAggregations(c *C) {
  836. indexName := "testaggs"
  837. docType := "tweet"
  838. tweets := []Document{
  839. {
  840. ID: nil,
  841. Index: indexName,
  842. Type: docType,
  843. BulkCommand: BulkCommandIndex,
  844. Fields: map[string]interface{}{
  845. "user": "foo",
  846. "message": "some foo message",
  847. "age": 25,
  848. },
  849. },
  850. {
  851. ID: nil,
  852. Index: indexName,
  853. Type: docType,
  854. BulkCommand: BulkCommandIndex,
  855. Fields: map[string]interface{}{
  856. "user": "bar",
  857. "message": "some bar message",
  858. "age": 30,
  859. },
  860. },
  861. {
  862. ID: nil,
  863. Index: indexName,
  864. Type: docType,
  865. BulkCommand: BulkCommandIndex,
  866. Fields: map[string]interface{}{
  867. "user": "foo",
  868. "message": "another foo message",
  869. },
  870. },
  871. }
  872. conn := NewClient(ESHost, ESPort)
  873. mapping := map[string]interface{}{
  874. "settings": map[string]interface{}{
  875. "index.number_of_shards": 1,
  876. "index.number_of_replicas": 0,
  877. },
  878. "mappings": map[string]interface{}{
  879. docType: map[string]interface{}{
  880. "properties": map[string]interface{}{
  881. "user": map[string]interface{}{
  882. "type": "string",
  883. "index": "not_analyzed",
  884. },
  885. },
  886. },
  887. },
  888. }
  889. defer conn.DeleteIndex(indexName)
  890. _, err := conn.CreateIndex(indexName, mapping)
  891. c.Assert(err, IsNil)
  892. _, err = conn.BulkSend(tweets)
  893. c.Assert(err, IsNil)
  894. _, err = conn.RefreshIndex(indexName)
  895. c.Assert(err, IsNil)
  896. query := map[string]interface{}{
  897. "aggs": map[string]interface{}{
  898. "user": map[string]interface{}{
  899. "terms": map[string]interface{}{
  900. "field": "user",
  901. "order": map[string]interface{}{
  902. "_term": "asc",
  903. },
  904. },
  905. "aggs": map[string]interface{}{
  906. "age": map[string]interface{}{
  907. "stats": map[string]interface{}{
  908. "field": "age",
  909. },
  910. },
  911. },
  912. },
  913. "age": map[string]interface{}{
  914. "stats": map[string]interface{}{
  915. "field": "age",
  916. },
  917. },
  918. },
  919. }
  920. resp, _ := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  921. user, ok := resp.Aggregations["user"]
  922. c.Assert(ok, Equals, true)
  923. c.Assert(len(user.Buckets()), Equals, 2)
  924. c.Assert(user.Buckets()[0].Key(), Equals, "bar")
  925. c.Assert(user.Buckets()[1].Key(), Equals, "foo")
  926. barAge := user.Buckets()[0].Aggregation("age")
  927. c.Assert(barAge["count"], Equals, 1.0)
  928. c.Assert(barAge["sum"], Equals, 30.0)
  929. fooAge := user.Buckets()[1].Aggregation("age")
  930. c.Assert(fooAge["count"], Equals, 1.0)
  931. c.Assert(fooAge["sum"], Equals, 25.0)
  932. age, ok := resp.Aggregations["age"]
  933. c.Assert(ok, Equals, true)
  934. c.Assert(age["count"], Equals, 2.0)
  935. c.Assert(age["sum"], Equals, 25.0+30.0)
  936. }
  937. func (s *GoesTestSuite) TestPutMapping(c *C) {
  938. indexName := "testputmapping"
  939. docType := "tweet"
  940. conn := NewClient(ESHost, ESPort)
  941. // just in case
  942. conn.DeleteIndex(indexName)
  943. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  944. c.Assert(err, IsNil)
  945. defer conn.DeleteIndex(indexName)
  946. d := Document{
  947. Index: indexName,
  948. Type: docType,
  949. Fields: map[string]interface{}{
  950. "user": "foo",
  951. "message": "bar",
  952. },
  953. }
  954. response, err := conn.Index(d, url.Values{})
  955. c.Assert(err, IsNil)
  956. mapping := map[string]interface{}{
  957. "tweet": map[string]interface{}{
  958. "properties": map[string]interface{}{
  959. "count": map[string]interface{}{
  960. "type": "integer",
  961. "index": "not_analyzed",
  962. "store": true,
  963. },
  964. },
  965. },
  966. }
  967. response, err = conn.PutMapping("tweet", mapping, []string{indexName})
  968. c.Assert(err, IsNil)
  969. c.Assert(response.Acknowledged, Equals, true)
  970. c.Assert(response.TimedOut, Equals, false)
  971. }
  972. func (s *GoesTestSuite) TestIndicesExist(c *C) {
  973. indices := []string{"testindicesexist"}
  974. conn := NewClient(ESHost, ESPort)
  975. // just in case
  976. conn.DeleteIndex(indices[0])
  977. exists, err := conn.IndicesExist(indices)
  978. c.Assert(exists, Equals, false)
  979. _, err = conn.CreateIndex(indices[0], map[string]interface{}{})
  980. c.Assert(err, IsNil)
  981. defer conn.DeleteIndex(indices[0])
  982. time.Sleep(200 * time.Millisecond)
  983. exists, _ = conn.IndicesExist(indices)
  984. c.Assert(exists, Equals, true)
  985. indices = append(indices, "nonexistent")
  986. exists, _ = conn.IndicesExist(indices)
  987. c.Assert(exists, Equals, false)
  988. }
  989. func (s *GoesTestSuite) TestUpdate(c *C) {
  990. indexName := "testupdate"
  991. docType := "tweet"
  992. docID := "1234"
  993. conn := NewClient(ESHost, ESPort)
  994. // just in case
  995. conn.DeleteIndex(indexName)
  996. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  997. c.Assert(err, IsNil)
  998. defer conn.DeleteIndex(indexName)
  999. d := Document{
  1000. Index: indexName,
  1001. Type: docType,
  1002. ID: docID,
  1003. Fields: map[string]interface{}{
  1004. "user": "foo",
  1005. "message": "bar",
  1006. "counter": 1,
  1007. },
  1008. }
  1009. extraArgs := make(url.Values, 1)
  1010. response, err := conn.Index(d, extraArgs)
  1011. c.Assert(err, IsNil)
  1012. time.Sleep(200 * time.Millisecond)
  1013. expectedResponse := &Response{
  1014. Status: 201,
  1015. Index: indexName,
  1016. ID: docID,
  1017. Type: docType,
  1018. Version: 1,
  1019. }
  1020. response.Raw = nil
  1021. response.Shards.Successful = 0
  1022. response.Shards.Total = 0
  1023. c.Assert(response, DeepEquals, expectedResponse)
  1024. // Now that we have an ordinary document indexed, try updating it
  1025. var query map[string]interface{}
  1026. if version, _ := conn.Version(); version > "5" {
  1027. query = map[string]interface{}{
  1028. "script": map[string]interface{}{
  1029. "inline": "ctx._source.counter += params.count",
  1030. "lang": "painless",
  1031. "params": map[string]interface{}{
  1032. "count": 5,
  1033. },
  1034. },
  1035. "upsert": map[string]interface{}{
  1036. "message": "candybar",
  1037. "user": "admin",
  1038. "counter": 1,
  1039. },
  1040. }
  1041. } else {
  1042. query = map[string]interface{}{
  1043. "script": "ctx._source.counter += count",
  1044. "lang": "groovy",
  1045. "params": map[string]interface{}{
  1046. "count": 5,
  1047. },
  1048. "upsert": map[string]interface{}{
  1049. "message": "candybar",
  1050. "user": "admin",
  1051. "counter": 1,
  1052. },
  1053. }
  1054. }
  1055. response, err = conn.Update(d, query, extraArgs)
  1056. if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting") {
  1057. c.Skip("Scripting is disabled on server, skipping this test")
  1058. return
  1059. }
  1060. time.Sleep(200 * time.Millisecond)
  1061. c.Assert(err, Equals, nil)
  1062. response, err = conn.Get(indexName, docType, docID, url.Values{})
  1063. c.Assert(err, Equals, nil)
  1064. c.Assert(response.Source["counter"], Equals, float64(6))
  1065. c.Assert(response.Source["user"], Equals, "foo")
  1066. c.Assert(response.Source["message"], Equals, "bar")
  1067. // Test another document, non-existent
  1068. docID = "555"
  1069. d.ID = docID
  1070. response, err = conn.Update(d, query, extraArgs)
  1071. c.Assert(err, Equals, nil)
  1072. time.Sleep(200 * time.Millisecond)
  1073. response, err = conn.Get(indexName, docType, docID, url.Values{})
  1074. c.Assert(err, Equals, nil)
  1075. c.Assert(response.Source["user"], Equals, "admin")
  1076. c.Assert(response.Source["message"], Equals, "candybar")
  1077. }
  1078. func (s *GoesTestSuite) TestGetMapping(c *C) {
  1079. indexName := "testmapping"
  1080. docType := "tweet"
  1081. conn := NewClient(ESHost, ESPort)
  1082. // just in case
  1083. conn.DeleteIndex(indexName)
  1084. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1085. c.Assert(err, IsNil)
  1086. defer conn.DeleteIndex(indexName)
  1087. time.Sleep(300 * time.Millisecond)
  1088. response, err := conn.GetMapping([]string{docType}, []string{indexName})
  1089. c.Assert(err, Equals, nil)
  1090. c.Assert(len(response.Raw), Equals, 0)
  1091. d := Document{
  1092. Index: indexName,
  1093. Type: docType,
  1094. Fields: map[string]interface{}{
  1095. "user": "foo",
  1096. "message": "bar",
  1097. },
  1098. }
  1099. response, err = conn.Index(d, url.Values{})
  1100. c.Assert(err, IsNil)
  1101. time.Sleep(200 * time.Millisecond)
  1102. response, err = conn.GetMapping([]string{docType}, []string{indexName})
  1103. c.Assert(err, Equals, nil)
  1104. c.Assert(len(response.Raw), Not(Equals), 0)
  1105. }
  1106. func (s *GoesTestSuite) TestDeleteMapping(c *C) {
  1107. indexName := "testdeletemapping"
  1108. docType := "tweet"
  1109. conn := NewClient(ESHost, ESPort)
  1110. // just in case
  1111. conn.DeleteIndex(indexName)
  1112. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1113. c.Assert(err, IsNil)
  1114. defer conn.DeleteIndex(indexName)
  1115. d := Document{
  1116. Index: indexName,
  1117. Type: docType,
  1118. Fields: map[string]interface{}{
  1119. "user": "foo",
  1120. "message": "bar",
  1121. },
  1122. }
  1123. response, err := conn.Index(d, url.Values{})
  1124. c.Assert(err, IsNil)
  1125. mapping := map[string]interface{}{
  1126. "tweet": map[string]interface{}{
  1127. "properties": map[string]interface{}{
  1128. "count": map[string]interface{}{
  1129. "type": "integer",
  1130. "index": "not_analyzed",
  1131. "store": true,
  1132. },
  1133. },
  1134. },
  1135. }
  1136. response, err = conn.PutMapping("tweet", mapping, []string{indexName})
  1137. c.Assert(err, IsNil)
  1138. time.Sleep(200 * time.Millisecond)
  1139. response, err = conn.DeleteMapping("tweet", []string{indexName})
  1140. if version, _ := conn.Version(); version > "2" {
  1141. c.Assert(err, ErrorMatches, ".*not supported.*")
  1142. return
  1143. }
  1144. c.Assert(err, IsNil)
  1145. c.Assert(response.Acknowledged, Equals, true)
  1146. c.Assert(response.TimedOut, Equals, false)
  1147. }
  1148. func (s *GoesTestSuite) TestAddAlias(c *C) {
  1149. aliasName := "testAlias"
  1150. indexName := "testalias_1"
  1151. docType := "testDoc"
  1152. docID := "1234"
  1153. source := map[string]interface{}{
  1154. "user": "foo",
  1155. "message": "bar",
  1156. }
  1157. conn := NewClient(ESHost, ESPort)
  1158. defer conn.DeleteIndex(indexName)
  1159. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1160. c.Assert(err, IsNil)
  1161. defer conn.DeleteIndex(indexName)
  1162. d := Document{
  1163. Index: indexName,
  1164. Type: docType,
  1165. ID: docID,
  1166. Fields: source,
  1167. }
  1168. // Index data
  1169. _, err = conn.Index(d, url.Values{})
  1170. c.Assert(err, IsNil)
  1171. // Add alias
  1172. _, err = conn.AddAlias(aliasName, []string{indexName})
  1173. c.Assert(err, IsNil)
  1174. // Get document via alias
  1175. response, err := conn.Get(aliasName, docType, docID, url.Values{})
  1176. c.Assert(err, IsNil)
  1177. expectedResponse := &Response{
  1178. Status: 200,
  1179. Index: indexName,
  1180. Type: docType,
  1181. ID: docID,
  1182. Version: 1,
  1183. Found: true,
  1184. Source: source,
  1185. }
  1186. response.Raw = nil
  1187. c.Assert(response, DeepEquals, expectedResponse)
  1188. }
  1189. func (s *GoesTestSuite) TestRemoveAlias(c *C) {
  1190. aliasName := "testAlias"
  1191. indexName := "testalias_1"
  1192. docType := "testDoc"
  1193. docID := "1234"
  1194. source := map[string]interface{}{
  1195. "user": "foo",
  1196. "message": "bar",
  1197. }
  1198. conn := NewClient(ESHost, ESPort)
  1199. defer conn.DeleteIndex(indexName)
  1200. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1201. c.Assert(err, IsNil)
  1202. defer conn.DeleteIndex(indexName)
  1203. d := Document{
  1204. Index: indexName,
  1205. Type: docType,
  1206. ID: docID,
  1207. Fields: source,
  1208. }
  1209. // Index data
  1210. _, err = conn.Index(d, url.Values{})
  1211. c.Assert(err, IsNil)
  1212. // Add alias
  1213. _, err = conn.AddAlias(aliasName, []string{indexName})
  1214. c.Assert(err, IsNil)
  1215. // Remove alias
  1216. _, err = conn.RemoveAlias(aliasName, []string{indexName})
  1217. c.Assert(err, IsNil)
  1218. // Get document via alias
  1219. _, err = conn.Get(aliasName, docType, docID, url.Values{})
  1220. c.Assert(err.Error(), Matches, "\\[404\\] .*"+aliasName+".*")
  1221. }
  1222. func (s *GoesTestSuite) TestAliasExists(c *C) {
  1223. index := "testaliasexist_1"
  1224. alias := "testaliasexists"
  1225. conn := NewClient(ESHost, ESPort)
  1226. // just in case
  1227. conn.DeleteIndex(index)
  1228. exists, err := conn.AliasExists(alias)
  1229. c.Assert(exists, Equals, false)
  1230. _, err = conn.CreateIndex(index, map[string]interface{}{})
  1231. c.Assert(err, IsNil)
  1232. defer conn.DeleteIndex(index)
  1233. time.Sleep(200 * time.Millisecond)
  1234. _, err = conn.AddAlias(alias, []string{index})
  1235. c.Assert(err, IsNil)
  1236. time.Sleep(200 * time.Millisecond)
  1237. defer conn.RemoveAlias(alias, []string{index})
  1238. exists, _ = conn.AliasExists(alias)
  1239. c.Assert(exists, Equals, true)
  1240. }