You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

1391 lines
32 KiB

  1. // Copyright 2013 Belogik. All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. package goes
  5. import (
  6. . "gopkg.in/check.v1"
  7. "net/http"
  8. "net/url"
  9. "os"
  10. "strings"
  11. "testing"
  12. "time"
  13. )
  14. var (
  15. ES_HOST = "localhost"
  16. ES_PORT = "9200"
  17. )
  18. // Hook up gocheck into the gotest runner.
  19. func Test(t *testing.T) { TestingT(t) }
  20. type GoesTestSuite struct{}
  21. var _ = Suite(&GoesTestSuite{})
  22. func (s *GoesTestSuite) SetUpTest(c *C) {
  23. h := os.Getenv("TEST_ELASTICSEARCH_HOST")
  24. if h != "" {
  25. ES_HOST = h
  26. }
  27. p := os.Getenv("TEST_ELASTICSEARCH_PORT")
  28. if p != "" {
  29. ES_PORT = p
  30. }
  31. }
  32. func (s *GoesTestSuite) TestNewConnection(c *C) {
  33. conn := NewConnection(ES_HOST, ES_PORT)
  34. c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT, http.DefaultClient})
  35. }
  36. func (s *GoesTestSuite) TestWithClient(c *C) {
  37. tr := &http.Transport{
  38. DisableCompression: true,
  39. ResponseHeaderTimeout: 1 * time.Second,
  40. }
  41. cl := &http.Client{
  42. Transport: tr,
  43. }
  44. conn := NewConnection(ES_HOST, ES_PORT).WithClient(cl)
  45. c.Assert(conn, DeepEquals, &Connection{ES_HOST, ES_PORT, cl})
  46. c.Assert(conn.Client.Transport.(*http.Transport).DisableCompression, Equals, true)
  47. c.Assert(conn.Client.Transport.(*http.Transport).ResponseHeaderTimeout, Equals, 1*time.Second)
  48. }
  49. func (s *GoesTestSuite) TestUrl(c *C) {
  50. conn := NewConnection(ES_HOST, ES_PORT)
  51. r := Request{
  52. Conn: conn,
  53. Query: "q",
  54. IndexList: []string{"i"},
  55. TypeList: []string{},
  56. method: "GET",
  57. api: "_search",
  58. }
  59. c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/i/_search")
  60. r.IndexList = []string{"a", "b"}
  61. c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/_search")
  62. r.TypeList = []string{"c", "d"}
  63. c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search")
  64. r.ExtraArgs = make(url.Values, 1)
  65. r.ExtraArgs.Set("version", "1")
  66. c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/_search?version=1")
  67. r.id = "1234"
  68. r.api = ""
  69. c.Assert(r.Url(), Equals, "http://"+ES_HOST+":"+ES_PORT+"/a,b/c,d/1234/?version=1")
  70. }
  71. func (s *GoesTestSuite) TestEsDown(c *C) {
  72. conn := NewConnection("a.b.c.d", "1234")
  73. var query = map[string]interface{}{"query": "foo"}
  74. r := Request{
  75. Conn: conn,
  76. Query: query,
  77. IndexList: []string{"i"},
  78. method: "GET",
  79. api: "_search",
  80. }
  81. _, err := r.Run()
  82. c.Assert(err, ErrorMatches, "Get http://a.b.c.d:1234/i/_search:(.*)lookup a.b.c.d: no such host")
  83. }
  84. func (s *GoesTestSuite) TestRunMissingIndex(c *C) {
  85. conn := NewConnection(ES_HOST, ES_PORT)
  86. var query = map[string]interface{}{"query": "foo"}
  87. r := Request{
  88. Conn: conn,
  89. Query: query,
  90. IndexList: []string{"i"},
  91. method: "GET",
  92. api: "_search",
  93. }
  94. _, err := r.Run()
  95. c.Assert(err.Error(), Equals, "[404] IndexMissingException[[i] missing]")
  96. }
  97. func (s *GoesTestSuite) TestCreateIndex(c *C) {
  98. indexName := "testcreateindexgoes"
  99. conn := NewConnection(ES_HOST, ES_PORT)
  100. defer conn.DeleteIndex(indexName)
  101. mapping := map[string]interface{}{
  102. "settings": map[string]interface{}{
  103. "index.number_of_shards": 1,
  104. "index.number_of_replicas": 0,
  105. },
  106. "mappings": map[string]interface{}{
  107. "_default_": map[string]interface{}{
  108. "_source": map[string]interface{}{
  109. "enabled": false,
  110. },
  111. "_all": map[string]interface{}{
  112. "enabled": false,
  113. },
  114. },
  115. },
  116. }
  117. resp, err := conn.CreateIndex(indexName, mapping)
  118. c.Assert(err, IsNil)
  119. c.Assert(resp.Acknowledged, Equals, true)
  120. }
  121. func (s *GoesTestSuite) TestDeleteIndexInexistantIndex(c *C) {
  122. conn := NewConnection(ES_HOST, ES_PORT)
  123. resp, err := conn.DeleteIndex("foobar")
  124. c.Assert(err.Error(), Equals, "[404] IndexMissingException[[foobar] missing]")
  125. c.Assert(resp, DeepEquals, Response{})
  126. }
  127. func (s *GoesTestSuite) TestDeleteIndexExistingIndex(c *C) {
  128. conn := NewConnection(ES_HOST, ES_PORT)
  129. indexName := "testdeleteindexexistingindex"
  130. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  131. c.Assert(err, IsNil)
  132. resp, err := conn.DeleteIndex(indexName)
  133. c.Assert(err, IsNil)
  134. expectedResponse := Response{}
  135. expectedResponse.Acknowledged = true
  136. resp.Raw = nil
  137. c.Assert(resp, DeepEquals, expectedResponse)
  138. }
  139. func (s *GoesTestSuite) TestRefreshIndex(c *C) {
  140. conn := NewConnection(ES_HOST, ES_PORT)
  141. indexName := "testrefreshindex"
  142. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  143. c.Assert(err, IsNil)
  144. _, err = conn.RefreshIndex(indexName)
  145. c.Assert(err, IsNil)
  146. _, err = conn.DeleteIndex(indexName)
  147. c.Assert(err, IsNil)
  148. }
  149. func (s *GoesTestSuite) TestOptimize(c *C) {
  150. conn := NewConnection(ES_HOST, ES_PORT)
  151. indexName := "testoptimize"
  152. conn.DeleteIndex(indexName)
  153. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  154. c.Assert(err, IsNil)
  155. // we must wait for a bit otherwise ES crashes
  156. time.Sleep(1 * time.Second)
  157. response, err := conn.Optimize([]string{indexName}, url.Values{"max_num_segments": []string{"1"}})
  158. c.Assert(err, IsNil)
  159. c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0)
  160. _, err = conn.DeleteIndex(indexName)
  161. c.Assert(err, IsNil)
  162. }
  163. func (s *GoesTestSuite) TestBulkSend(c *C) {
  164. indexName := "testbulkadd"
  165. docType := "tweet"
  166. tweets := []Document{
  167. {
  168. Id: "123",
  169. Index: indexName,
  170. Type: docType,
  171. BulkCommand: BULK_COMMAND_INDEX,
  172. Fields: map[string]interface{}{
  173. "user": "foo",
  174. "message": "some foo message",
  175. },
  176. },
  177. {
  178. Id: nil,
  179. Index: indexName,
  180. Type: docType,
  181. BulkCommand: BULK_COMMAND_INDEX,
  182. Fields: map[string]interface{}{
  183. "user": "bar",
  184. "message": "some bar message",
  185. },
  186. },
  187. }
  188. conn := NewConnection(ES_HOST, ES_PORT)
  189. conn.DeleteIndex(indexName)
  190. _, err := conn.CreateIndex(indexName, nil)
  191. c.Assert(err, IsNil)
  192. response, err := conn.BulkSend(tweets)
  193. i := Item{
  194. Id: "123",
  195. Type: docType,
  196. Version: 1,
  197. Index: indexName,
  198. Status: 201, //201 for indexing ( https://issues.apache.org/jira/browse/CONNECTORS-634 )
  199. }
  200. c.Assert(response.Items[0][BULK_COMMAND_INDEX], Equals, i)
  201. c.Assert(err, IsNil)
  202. _, err = conn.RefreshIndex(indexName)
  203. c.Assert(err, IsNil)
  204. var query = map[string]interface{}{
  205. "query": map[string]interface{}{
  206. "match_all": map[string]interface{}{},
  207. },
  208. }
  209. searchResults, err := conn.Search(query, []string{indexName}, []string{}, url.Values{})
  210. c.Assert(err, IsNil)
  211. var expectedTotal uint64 = 2
  212. c.Assert(searchResults.Hits.Total, Equals, expectedTotal)
  213. extraDocId := ""
  214. checked := 0
  215. for _, hit := range searchResults.Hits.Hits {
  216. if hit.Source["user"] == "foo" {
  217. c.Assert(hit.Id, Equals, "123")
  218. checked++
  219. }
  220. if hit.Source["user"] == "bar" {
  221. c.Assert(len(hit.Id) > 0, Equals, true)
  222. extraDocId = hit.Id
  223. checked++
  224. }
  225. }
  226. c.Assert(checked, Equals, 2)
  227. docToDelete := []Document{
  228. {
  229. Id: "123",
  230. Index: indexName,
  231. Type: docType,
  232. BulkCommand: BULK_COMMAND_DELETE,
  233. },
  234. {
  235. Id: extraDocId,
  236. Index: indexName,
  237. Type: docType,
  238. BulkCommand: BULK_COMMAND_DELETE,
  239. },
  240. }
  241. response, err = conn.BulkSend(docToDelete)
  242. i = Item{
  243. Id: "123",
  244. Type: docType,
  245. Version: 2,
  246. Index: indexName,
  247. Status: 200, //200 for updates
  248. }
  249. c.Assert(response.Items[0][BULK_COMMAND_DELETE], Equals, i)
  250. c.Assert(err, IsNil)
  251. _, err = conn.RefreshIndex(indexName)
  252. c.Assert(err, IsNil)
  253. searchResults, err = conn.Search(query, []string{indexName}, []string{}, url.Values{})
  254. c.Assert(err, IsNil)
  255. expectedTotal = 0
  256. c.Assert(searchResults.Hits.Total, Equals, expectedTotal)
  257. _, err = conn.DeleteIndex(indexName)
  258. c.Assert(err, IsNil)
  259. }
  260. func (s *GoesTestSuite) TestStats(c *C) {
  261. conn := NewConnection(ES_HOST, ES_PORT)
  262. indexName := "teststats"
  263. conn.DeleteIndex(indexName)
  264. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  265. c.Assert(err, IsNil)
  266. // we must wait for a bit otherwise ES crashes
  267. time.Sleep(1 * time.Second)
  268. response, err := conn.Stats([]string{indexName}, url.Values{})
  269. c.Assert(err, IsNil)
  270. c.Assert(response.All.Indices[indexName].Primaries["docs"].Count, Equals, 0)
  271. _, err = conn.DeleteIndex(indexName)
  272. c.Assert(err, IsNil)
  273. }
  274. func (s *GoesTestSuite) TestIndexWithFieldsInStruct(c *C) {
  275. indexName := "testindexwithfieldsinstruct"
  276. docType := "tweet"
  277. docId := "1234"
  278. conn := NewConnection(ES_HOST, ES_PORT)
  279. // just in case
  280. conn.DeleteIndex(indexName)
  281. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  282. c.Assert(err, IsNil)
  283. defer conn.DeleteIndex(indexName)
  284. d := Document{
  285. Index: indexName,
  286. Type: docType,
  287. Id: docId,
  288. Fields: struct {
  289. user string
  290. message string
  291. }{
  292. "foo",
  293. "bar",
  294. },
  295. }
  296. extraArgs := make(url.Values, 1)
  297. extraArgs.Set("ttl", "86400000")
  298. response, err := conn.Index(d, extraArgs)
  299. c.Assert(err, IsNil)
  300. expectedResponse := Response{
  301. Index: indexName,
  302. Id: docId,
  303. Type: docType,
  304. Version: 1,
  305. }
  306. response.Raw = nil
  307. c.Assert(response, DeepEquals, expectedResponse)
  308. }
  309. func (s *GoesTestSuite) TestIndexWithFieldsNotInMapOrStruct(c *C) {
  310. indexName := "testindexwithfieldsnotinmaporstruct"
  311. docType := "tweet"
  312. docId := "1234"
  313. conn := NewConnection(ES_HOST, ES_PORT)
  314. // just in case
  315. conn.DeleteIndex(indexName)
  316. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  317. c.Assert(err, IsNil)
  318. defer conn.DeleteIndex(indexName)
  319. d := Document{
  320. Index: indexName,
  321. Type: docType,
  322. Id: docId,
  323. Fields: "test",
  324. }
  325. extraArgs := make(url.Values, 1)
  326. extraArgs.Set("ttl", "86400000")
  327. _, err = conn.Index(d, extraArgs)
  328. c.Assert(err, Not(IsNil))
  329. }
  330. func (s *GoesTestSuite) TestIndexIdDefined(c *C) {
  331. indexName := "testindexiddefined"
  332. docType := "tweet"
  333. docId := "1234"
  334. conn := NewConnection(ES_HOST, ES_PORT)
  335. // just in case
  336. conn.DeleteIndex(indexName)
  337. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  338. c.Assert(err, IsNil)
  339. defer conn.DeleteIndex(indexName)
  340. d := Document{
  341. Index: indexName,
  342. Type: docType,
  343. Id: docId,
  344. Fields: map[string]interface{}{
  345. "user": "foo",
  346. "message": "bar",
  347. },
  348. }
  349. extraArgs := make(url.Values, 1)
  350. extraArgs.Set("ttl", "86400000")
  351. response, err := conn.Index(d, extraArgs)
  352. c.Assert(err, IsNil)
  353. expectedResponse := Response{
  354. Index: indexName,
  355. Id: docId,
  356. Type: docType,
  357. Version: 1,
  358. }
  359. response.Raw = nil
  360. c.Assert(response, DeepEquals, expectedResponse)
  361. }
  362. func (s *GoesTestSuite) TestIndexIdNotDefined(c *C) {
  363. indexName := "testindexidnotdefined"
  364. docType := "tweet"
  365. conn := NewConnection(ES_HOST, ES_PORT)
  366. // just in case
  367. conn.DeleteIndex(indexName)
  368. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  369. c.Assert(err, IsNil)
  370. defer conn.DeleteIndex(indexName)
  371. d := Document{
  372. Index: indexName,
  373. Type: docType,
  374. Fields: map[string]interface{}{
  375. "user": "foo",
  376. "message": "bar",
  377. },
  378. }
  379. response, err := conn.Index(d, url.Values{})
  380. c.Assert(err, IsNil)
  381. c.Assert(response.Index, Equals, indexName)
  382. c.Assert(response.Type, Equals, docType)
  383. c.Assert(response.Version, Equals, 1)
  384. c.Assert(response.Id != "", Equals, true)
  385. }
  386. func (s *GoesTestSuite) TestDelete(c *C) {
  387. indexName := "testdelete"
  388. docType := "tweet"
  389. docId := "1234"
  390. conn := NewConnection(ES_HOST, ES_PORT)
  391. // just in case
  392. conn.DeleteIndex(indexName)
  393. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  394. c.Assert(err, IsNil)
  395. defer conn.DeleteIndex(indexName)
  396. d := Document{
  397. Index: indexName,
  398. Type: docType,
  399. Id: docId,
  400. Fields: map[string]interface{}{
  401. "user": "foo",
  402. },
  403. }
  404. _, err = conn.Index(d, url.Values{})
  405. c.Assert(err, IsNil)
  406. response, err := conn.Delete(d, url.Values{})
  407. c.Assert(err, IsNil)
  408. expectedResponse := Response{
  409. Found: true,
  410. Index: indexName,
  411. Type: docType,
  412. Id: docId,
  413. // XXX : even after a DELETE the version number seems to be incremented
  414. Version: 2,
  415. }
  416. response.Raw = nil
  417. c.Assert(response, DeepEquals, expectedResponse)
  418. response, err = conn.Delete(d, url.Values{})
  419. c.Assert(err, IsNil)
  420. expectedResponse = Response{
  421. Found: false,
  422. Index: indexName,
  423. Type: docType,
  424. Id: docId,
  425. // XXX : even after a DELETE the version number seems to be incremented
  426. Version: 3,
  427. }
  428. response.Raw = nil
  429. c.Assert(response, DeepEquals, expectedResponse)
  430. }
  431. func (s *GoesTestSuite) TestDeleteByQuery(c *C) {
  432. indexName := "testdeletebyquery"
  433. docType := "tweet"
  434. docId := "1234"
  435. conn := NewConnection(ES_HOST, ES_PORT)
  436. // just in case
  437. conn.DeleteIndex(indexName)
  438. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  439. c.Assert(err, IsNil)
  440. defer conn.DeleteIndex(indexName)
  441. d := Document{
  442. Index: indexName,
  443. Type: docType,
  444. Id: docId,
  445. Fields: map[string]interface{}{
  446. "user": "foo",
  447. },
  448. }
  449. _, err = conn.Index(d, url.Values{})
  450. c.Assert(err, IsNil)
  451. _, err = conn.RefreshIndex(indexName)
  452. c.Assert(err, IsNil)
  453. query := map[string]interface{}{
  454. "query": map[string]interface{}{
  455. "bool": map[string]interface{}{
  456. "must": []map[string]interface{}{
  457. {
  458. "match_all": map[string]interface{}{},
  459. },
  460. },
  461. },
  462. },
  463. }
  464. //should be 1 doc before delete by query
  465. response, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  466. c.Assert(err, IsNil)
  467. c.Assert(response.Hits.Total, Equals, uint64(1))
  468. response, err = conn.Query(query, []string{indexName}, []string{docType}, "DELETE", url.Values{})
  469. c.Assert(err, IsNil)
  470. expectedResponse := Response{
  471. Found: false,
  472. Index: "",
  473. Type: "",
  474. Id: "",
  475. Version: 0,
  476. }
  477. response.Raw = nil
  478. c.Assert(response, DeepEquals, expectedResponse)
  479. //should be 0 docs after delete by query
  480. response, err = conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  481. c.Assert(err, IsNil)
  482. c.Assert(response.Hits.Total, Equals, uint64(0))
  483. }
  484. func (s *GoesTestSuite) TestGet(c *C) {
  485. indexName := "testget"
  486. docType := "tweet"
  487. docId := "111"
  488. source := map[string]interface{}{
  489. "f1": "foo",
  490. "f2": "foo",
  491. }
  492. conn := NewConnection(ES_HOST, ES_PORT)
  493. conn.DeleteIndex(indexName)
  494. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  495. c.Assert(err, IsNil)
  496. defer conn.DeleteIndex(indexName)
  497. d := Document{
  498. Index: indexName,
  499. Type: docType,
  500. Id: docId,
  501. Fields: source,
  502. }
  503. _, err = conn.Index(d, url.Values{})
  504. c.Assert(err, IsNil)
  505. response, err := conn.Get(indexName, docType, docId, url.Values{})
  506. c.Assert(err, IsNil)
  507. expectedResponse := Response{
  508. Index: indexName,
  509. Type: docType,
  510. Id: docId,
  511. Version: 1,
  512. Found: true,
  513. Source: source,
  514. }
  515. response.Raw = nil
  516. c.Assert(response, DeepEquals, expectedResponse)
  517. fields := make(url.Values, 1)
  518. fields.Set("fields", "f1")
  519. response, err = conn.Get(indexName, docType, docId, fields)
  520. c.Assert(err, IsNil)
  521. expectedResponse = Response{
  522. Index: indexName,
  523. Type: docType,
  524. Id: docId,
  525. Version: 1,
  526. Found: true,
  527. Fields: map[string]interface{}{
  528. "f1": []interface{}{"foo"},
  529. },
  530. }
  531. response.Raw = nil
  532. c.Assert(response, DeepEquals, expectedResponse)
  533. }
  534. func (s *GoesTestSuite) TestSearch(c *C) {
  535. indexName := "testsearch"
  536. docType := "tweet"
  537. docId := "1234"
  538. source := map[string]interface{}{
  539. "user": "foo",
  540. "message": "bar",
  541. }
  542. conn := NewConnection(ES_HOST, ES_PORT)
  543. conn.DeleteIndex(indexName)
  544. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  545. c.Assert(err, IsNil)
  546. defer conn.DeleteIndex(indexName)
  547. d := Document{
  548. Index: indexName,
  549. Type: docType,
  550. Id: docId,
  551. Fields: source,
  552. }
  553. _, err = conn.Index(d, url.Values{})
  554. c.Assert(err, IsNil)
  555. _, err = conn.RefreshIndex(indexName)
  556. c.Assert(err, IsNil)
  557. // I can feel my eyes bleeding
  558. query := map[string]interface{}{
  559. "query": map[string]interface{}{
  560. "bool": map[string]interface{}{
  561. "must": []map[string]interface{}{
  562. {
  563. "match_all": map[string]interface{}{},
  564. },
  565. },
  566. },
  567. },
  568. }
  569. response, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  570. expectedHits := Hits{
  571. Total: 1,
  572. MaxScore: 1.0,
  573. Hits: []Hit{
  574. {
  575. Index: indexName,
  576. Type: docType,
  577. Id: docId,
  578. Score: 1.0,
  579. Source: source,
  580. },
  581. },
  582. }
  583. c.Assert(response.Hits, DeepEquals, expectedHits)
  584. }
  585. func (s *GoesTestSuite) TestCount(c *C) {
  586. indexName := "testcount"
  587. docType := "tweet"
  588. docId := "1234"
  589. source := map[string]interface{}{
  590. "user": "foo",
  591. "message": "bar",
  592. }
  593. conn := NewConnection(ES_HOST, ES_PORT)
  594. conn.DeleteIndex(indexName)
  595. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  596. c.Assert(err, IsNil)
  597. defer conn.DeleteIndex(indexName)
  598. d := Document{
  599. Index: indexName,
  600. Type: docType,
  601. Id: docId,
  602. Fields: source,
  603. }
  604. _, err = conn.Index(d, url.Values{})
  605. c.Assert(err, IsNil)
  606. _, err = conn.RefreshIndex(indexName)
  607. c.Assert(err, IsNil)
  608. // I can feel my eyes bleeding
  609. query := map[string]interface{}{
  610. "query": map[string]interface{}{
  611. "bool": map[string]interface{}{
  612. "must": []map[string]interface{}{
  613. {
  614. "match_all": map[string]interface{}{},
  615. },
  616. },
  617. },
  618. },
  619. }
  620. response, err := conn.Count(query, []string{indexName}, []string{docType}, url.Values{})
  621. c.Assert(response.Count, Equals, 1)
  622. }
  623. func (s *GoesTestSuite) TestIndexStatus(c *C) {
  624. indexName := "testindexstatus"
  625. conn := NewConnection(ES_HOST, ES_PORT)
  626. conn.DeleteIndex(indexName)
  627. mapping := map[string]interface{}{
  628. "settings": map[string]interface{}{
  629. "index.number_of_shards": 1,
  630. "index.number_of_replicas": 1,
  631. },
  632. }
  633. _, err := conn.CreateIndex(indexName, mapping)
  634. c.Assert(err, IsNil)
  635. defer conn.DeleteIndex(indexName)
  636. // gives ES some time to do its job
  637. time.Sleep(1 * time.Second)
  638. response, err := conn.IndexStatus([]string{"testindexstatus"})
  639. c.Assert(err, IsNil)
  640. expectedShards := Shard{Total: 2, Successful: 1, Failed: 0}
  641. c.Assert(response.Shards, Equals, expectedShards)
  642. primarySizeInBytes := response.Indices[indexName].Index["primary_size_in_bytes"].(float64)
  643. sizeInBytes := response.Indices[indexName].Index["size_in_bytes"].(float64)
  644. c.Assert(primarySizeInBytes > 0, Equals, true)
  645. c.Assert(sizeInBytes > 0, Equals, true)
  646. expectedIndices := map[string]IndexStatus{
  647. indexName: {
  648. Index: map[string]interface{}{
  649. "primary_size_in_bytes": primarySizeInBytes,
  650. "size_in_bytes": sizeInBytes,
  651. },
  652. Translog: map[string]uint64{
  653. "operations": 0,
  654. },
  655. Docs: map[string]uint64{
  656. "num_docs": 0,
  657. "max_doc": 0,
  658. "deleted_docs": 0,
  659. },
  660. Merges: map[string]interface{}{
  661. "current": float64(0),
  662. "current_docs": float64(0),
  663. "current_size_in_bytes": float64(0),
  664. "total": float64(0),
  665. "total_time_in_millis": float64(0),
  666. "total_docs": float64(0),
  667. "total_size_in_bytes": float64(0),
  668. },
  669. Refresh: map[string]interface{}{
  670. "total": float64(1),
  671. "total_time_in_millis": float64(0),
  672. },
  673. Flush: map[string]interface{}{
  674. "total": float64(0),
  675. "total_time_in_millis": float64(0),
  676. },
  677. },
  678. }
  679. c.Assert(response.Indices, DeepEquals, expectedIndices)
  680. }
  681. func (s *GoesTestSuite) TestScroll(c *C) {
  682. indexName := "testscroll"
  683. docType := "tweet"
  684. tweets := []Document{
  685. {
  686. Id: nil,
  687. Index: indexName,
  688. Type: docType,
  689. BulkCommand: BULK_COMMAND_INDEX,
  690. Fields: map[string]interface{}{
  691. "user": "foo",
  692. "message": "some foo message",
  693. },
  694. },
  695. {
  696. Id: nil,
  697. Index: indexName,
  698. Type: docType,
  699. BulkCommand: BULK_COMMAND_INDEX,
  700. Fields: map[string]interface{}{
  701. "user": "bar",
  702. "message": "some bar message",
  703. },
  704. },
  705. {
  706. Id: nil,
  707. Index: indexName,
  708. Type: docType,
  709. BulkCommand: BULK_COMMAND_INDEX,
  710. Fields: map[string]interface{}{
  711. "user": "foo",
  712. "message": "another foo message",
  713. },
  714. },
  715. }
  716. conn := NewConnection(ES_HOST, ES_PORT)
  717. mapping := map[string]interface{}{
  718. "settings": map[string]interface{}{
  719. "index.number_of_shards": 1,
  720. "index.number_of_replicas": 0,
  721. },
  722. }
  723. defer conn.DeleteIndex(indexName)
  724. _, err := conn.CreateIndex(indexName, mapping)
  725. c.Assert(err, IsNil)
  726. _, err = conn.BulkSend(tweets)
  727. c.Assert(err, IsNil)
  728. _, err = conn.RefreshIndex(indexName)
  729. c.Assert(err, IsNil)
  730. query := map[string]interface{}{
  731. "query": map[string]interface{}{
  732. "filtered": map[string]interface{}{
  733. "filter": map[string]interface{}{
  734. "term": map[string]interface{}{
  735. "user": "foo",
  736. },
  737. },
  738. },
  739. },
  740. }
  741. scan, err := conn.Scan(query, []string{indexName}, []string{docType}, "1m", 1)
  742. c.Assert(err, IsNil)
  743. c.Assert(len(scan.ScrollId) > 0, Equals, true)
  744. searchResults, err := conn.Scroll(scan.ScrollId, "1m")
  745. c.Assert(err, IsNil)
  746. // some data in first chunk
  747. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  748. c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
  749. c.Assert(len(searchResults.Hits.Hits), Equals, 1)
  750. searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
  751. c.Assert(err, IsNil)
  752. // more data in second chunk
  753. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  754. c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
  755. c.Assert(len(searchResults.Hits.Hits), Equals, 1)
  756. searchResults, err = conn.Scroll(searchResults.ScrollId, "1m")
  757. c.Assert(err, IsNil)
  758. // nothing in third chunk
  759. c.Assert(searchResults.Hits.Total, Equals, uint64(2))
  760. c.Assert(len(searchResults.ScrollId) > 0, Equals, true)
  761. c.Assert(len(searchResults.Hits.Hits), Equals, 0)
  762. }
  763. func (s *GoesTestSuite) TestAggregations(c *C) {
  764. indexName := "testaggs"
  765. docType := "tweet"
  766. tweets := []Document{
  767. {
  768. Id: nil,
  769. Index: indexName,
  770. Type: docType,
  771. BulkCommand: BULK_COMMAND_INDEX,
  772. Fields: map[string]interface{}{
  773. "user": "foo",
  774. "message": "some foo message",
  775. "age": 25,
  776. },
  777. },
  778. {
  779. Id: nil,
  780. Index: indexName,
  781. Type: docType,
  782. BulkCommand: BULK_COMMAND_INDEX,
  783. Fields: map[string]interface{}{
  784. "user": "bar",
  785. "message": "some bar message",
  786. "age": 30,
  787. },
  788. },
  789. {
  790. Id: nil,
  791. Index: indexName,
  792. Type: docType,
  793. BulkCommand: BULK_COMMAND_INDEX,
  794. Fields: map[string]interface{}{
  795. "user": "foo",
  796. "message": "another foo message",
  797. },
  798. },
  799. }
  800. conn := NewConnection(ES_HOST, ES_PORT)
  801. mapping := map[string]interface{}{
  802. "settings": map[string]interface{}{
  803. "index.number_of_shards": 1,
  804. "index.number_of_replicas": 0,
  805. },
  806. }
  807. defer conn.DeleteIndex(indexName)
  808. _, err := conn.CreateIndex(indexName, mapping)
  809. c.Assert(err, IsNil)
  810. _, err = conn.BulkSend(tweets)
  811. c.Assert(err, IsNil)
  812. _, err = conn.RefreshIndex(indexName)
  813. c.Assert(err, IsNil)
  814. query := map[string]interface{}{
  815. "aggs": map[string]interface{}{
  816. "user": map[string]interface{}{
  817. "terms": map[string]interface{}{
  818. "field": "user",
  819. "order": map[string]interface{}{
  820. "_term": "asc",
  821. },
  822. },
  823. "aggs": map[string]interface{}{
  824. "age": map[string]interface{}{
  825. "stats": map[string]interface{}{
  826. "field": "age",
  827. },
  828. },
  829. },
  830. },
  831. "age": map[string]interface{}{
  832. "stats": map[string]interface{}{
  833. "field": "age",
  834. },
  835. },
  836. },
  837. }
  838. resp, err := conn.Search(query, []string{indexName}, []string{docType}, url.Values{})
  839. user, ok := resp.Aggregations["user"]
  840. c.Assert(ok, Equals, true)
  841. c.Assert(len(user.Buckets()), Equals, 2)
  842. c.Assert(user.Buckets()[0].Key(), Equals, "bar")
  843. c.Assert(user.Buckets()[1].Key(), Equals, "foo")
  844. barAge := user.Buckets()[0].Aggregation("age")
  845. c.Assert(barAge["count"], Equals, 1.0)
  846. c.Assert(barAge["sum"], Equals, 30.0)
  847. fooAge := user.Buckets()[1].Aggregation("age")
  848. c.Assert(fooAge["count"], Equals, 1.0)
  849. c.Assert(fooAge["sum"], Equals, 25.0)
  850. age, ok := resp.Aggregations["age"]
  851. c.Assert(ok, Equals, true)
  852. c.Assert(age["count"], Equals, 2.0)
  853. c.Assert(age["sum"], Equals, 25.0+30.0)
  854. }
  855. func (s *GoesTestSuite) TestPutMapping(c *C) {
  856. indexName := "testputmapping"
  857. docType := "tweet"
  858. conn := NewConnection(ES_HOST, ES_PORT)
  859. // just in case
  860. conn.DeleteIndex(indexName)
  861. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  862. c.Assert(err, IsNil)
  863. defer conn.DeleteIndex(indexName)
  864. d := Document{
  865. Index: indexName,
  866. Type: docType,
  867. Fields: map[string]interface{}{
  868. "user": "foo",
  869. "message": "bar",
  870. },
  871. }
  872. response, err := conn.Index(d, url.Values{})
  873. c.Assert(err, IsNil)
  874. mapping := map[string]interface{}{
  875. "tweet": map[string]interface{}{
  876. "properties": map[string]interface{}{
  877. "count": map[string]interface{}{
  878. "type": "integer",
  879. "index": "not_analyzed",
  880. "store": true,
  881. },
  882. },
  883. },
  884. }
  885. response, err = conn.PutMapping("tweet", mapping, []string{indexName})
  886. c.Assert(err, IsNil)
  887. c.Assert(response.Acknowledged, Equals, true)
  888. c.Assert(response.TimedOut, Equals, false)
  889. }
  890. func (s *GoesTestSuite) TestIndicesExist(c *C) {
  891. indices := []string{"testindicesexist"}
  892. conn := NewConnection(ES_HOST, ES_PORT)
  893. // just in case
  894. conn.DeleteIndex(indices[0])
  895. exists, err := conn.IndicesExist(indices)
  896. c.Assert(exists, Equals, false)
  897. _, err = conn.CreateIndex(indices[0], map[string]interface{}{})
  898. c.Assert(err, IsNil)
  899. defer conn.DeleteIndex(indices[0])
  900. time.Sleep(200 * time.Millisecond)
  901. exists, err = conn.IndicesExist(indices)
  902. c.Assert(exists, Equals, true)
  903. indices = append(indices, "nonexistent")
  904. exists, err = conn.IndicesExist(indices)
  905. c.Assert(exists, Equals, false)
  906. }
  907. func (s *GoesTestSuite) TestUpdate(c *C) {
  908. indexName := "testupdate"
  909. docType := "tweet"
  910. docId := "1234"
  911. conn := NewConnection(ES_HOST, ES_PORT)
  912. // just in case
  913. conn.DeleteIndex(indexName)
  914. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  915. c.Assert(err, IsNil)
  916. defer conn.DeleteIndex(indexName)
  917. d := Document{
  918. Index: indexName,
  919. Type: docType,
  920. Id: docId,
  921. Fields: map[string]interface{}{
  922. "user": "foo",
  923. "message": "bar",
  924. "counter": 1,
  925. },
  926. }
  927. extraArgs := make(url.Values, 1)
  928. response, err := conn.Index(d, extraArgs)
  929. c.Assert(err, IsNil)
  930. time.Sleep(200 * time.Millisecond)
  931. expectedResponse := Response{
  932. Index: indexName,
  933. Id: docId,
  934. Type: docType,
  935. Version: 1,
  936. }
  937. response.Raw = nil
  938. c.Assert(response, DeepEquals, expectedResponse)
  939. // Now that we have an ordinary document indexed, try updating it
  940. query := map[string]interface{}{
  941. "script": "ctx._source.counter += count",
  942. "params": map[string]interface{}{
  943. "count": 5,
  944. },
  945. "upsert": map[string]interface{}{
  946. "message": "candybar",
  947. "user": "admin",
  948. "counter": 1,
  949. },
  950. }
  951. response, err = conn.Update(d, query, extraArgs)
  952. if err != nil && strings.Contains(err.(*SearchError).Msg, "dynamic scripting disabled") {
  953. c.Skip("Scripting is disabled on server, skipping this test")
  954. return
  955. }
  956. time.Sleep(200 * time.Millisecond)
  957. c.Assert(err, Equals, nil)
  958. response, err = conn.Get(indexName, docType, docId, url.Values{})
  959. c.Assert(err, Equals, nil)
  960. c.Assert(response.Source["counter"], Equals, float64(6))
  961. c.Assert(response.Source["user"], Equals, "foo")
  962. c.Assert(response.Source["message"], Equals, "bar")
  963. // Test another document, non-existent
  964. docId = "555"
  965. d.Id = docId
  966. response, err = conn.Update(d, query, extraArgs)
  967. c.Assert(err, Equals, nil)
  968. time.Sleep(200 * time.Millisecond)
  969. response, err = conn.Get(indexName, docType, docId, url.Values{})
  970. c.Assert(err, Equals, nil)
  971. c.Assert(response.Source["user"], Equals, "admin")
  972. c.Assert(response.Source["message"], Equals, "candybar")
  973. }
  974. func (s *GoesTestSuite) TestGetMapping(c *C) {
  975. indexName := "testmapping"
  976. docType := "tweet"
  977. conn := NewConnection(ES_HOST, ES_PORT)
  978. // just in case
  979. conn.DeleteIndex(indexName)
  980. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  981. c.Assert(err, IsNil)
  982. defer conn.DeleteIndex(indexName)
  983. time.Sleep(300 * time.Millisecond)
  984. response, err := conn.GetMapping([]string{docType}, []string{indexName})
  985. c.Assert(err, Equals, nil)
  986. c.Assert(len(response.Raw), Equals, 0)
  987. d := Document{
  988. Index: indexName,
  989. Type: docType,
  990. Fields: map[string]interface{}{
  991. "user": "foo",
  992. "message": "bar",
  993. },
  994. }
  995. response, err = conn.Index(d, url.Values{})
  996. c.Assert(err, IsNil)
  997. time.Sleep(200 * time.Millisecond)
  998. response, err = conn.GetMapping([]string{docType}, []string{indexName})
  999. c.Assert(err, Equals, nil)
  1000. c.Assert(len(response.Raw), Not(Equals), 0)
  1001. }
  1002. func (s *GoesTestSuite) TestDeleteMapping(c *C) {
  1003. indexName := "testdeletemapping"
  1004. docType := "tweet"
  1005. conn := NewConnection(ES_HOST, ES_PORT)
  1006. // just in case
  1007. conn.DeleteIndex(indexName)
  1008. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1009. c.Assert(err, IsNil)
  1010. defer conn.DeleteIndex(indexName)
  1011. d := Document{
  1012. Index: indexName,
  1013. Type: docType,
  1014. Fields: map[string]interface{}{
  1015. "user": "foo",
  1016. "message": "bar",
  1017. },
  1018. }
  1019. response, err := conn.Index(d, url.Values{})
  1020. c.Assert(err, IsNil)
  1021. mapping := map[string]interface{}{
  1022. "tweet": map[string]interface{}{
  1023. "properties": map[string]interface{}{
  1024. "count": map[string]interface{}{
  1025. "type": "integer",
  1026. "index": "not_analyzed",
  1027. "store": true,
  1028. },
  1029. },
  1030. },
  1031. }
  1032. response, err = conn.PutMapping("tweet", mapping, []string{indexName})
  1033. c.Assert(err, IsNil)
  1034. time.Sleep(200 * time.Millisecond)
  1035. response, err = conn.DeleteMapping("tweet", []string{indexName})
  1036. c.Assert(err, IsNil)
  1037. c.Assert(response.Acknowledged, Equals, true)
  1038. c.Assert(response.TimedOut, Equals, false)
  1039. }
  1040. func (s *GoesTestSuite) TestAddAlias(c *C) {
  1041. aliasName := "testAlias"
  1042. indexName := "testalias_1"
  1043. docType := "testDoc"
  1044. docId := "1234"
  1045. source := map[string]interface{}{
  1046. "user": "foo",
  1047. "message": "bar",
  1048. }
  1049. conn := NewConnection(ES_HOST, ES_PORT)
  1050. defer conn.DeleteIndex(indexName)
  1051. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1052. c.Assert(err, IsNil)
  1053. defer conn.DeleteIndex(indexName)
  1054. d := Document{
  1055. Index: indexName,
  1056. Type: docType,
  1057. Id: docId,
  1058. Fields: source,
  1059. }
  1060. // Index data
  1061. _, err = conn.Index(d, url.Values{})
  1062. c.Assert(err, IsNil)
  1063. // Add alias
  1064. _, err = conn.AddAlias(aliasName, []string{indexName})
  1065. c.Assert(err, IsNil)
  1066. // Get document via alias
  1067. response, err := conn.Get(aliasName, docType, docId, url.Values{})
  1068. c.Assert(err, IsNil)
  1069. expectedResponse := Response{
  1070. Index: indexName,
  1071. Type: docType,
  1072. Id: docId,
  1073. Version: 1,
  1074. Found: true,
  1075. Source: source,
  1076. }
  1077. response.Raw = nil
  1078. c.Assert(response, DeepEquals, expectedResponse)
  1079. }
  1080. func (s *GoesTestSuite) TestRemoveAlias(c *C) {
  1081. aliasName := "testAlias"
  1082. indexName := "testalias_1"
  1083. docType := "testDoc"
  1084. docId := "1234"
  1085. source := map[string]interface{}{
  1086. "user": "foo",
  1087. "message": "bar",
  1088. }
  1089. conn := NewConnection(ES_HOST, ES_PORT)
  1090. defer conn.DeleteIndex(indexName)
  1091. _, err := conn.CreateIndex(indexName, map[string]interface{}{})
  1092. c.Assert(err, IsNil)
  1093. defer conn.DeleteIndex(indexName)
  1094. d := Document{
  1095. Index: indexName,
  1096. Type: docType,
  1097. Id: docId,
  1098. Fields: source,
  1099. }
  1100. // Index data
  1101. _, err = conn.Index(d, url.Values{})
  1102. c.Assert(err, IsNil)
  1103. // Add alias
  1104. _, err = conn.AddAlias(aliasName, []string{indexName})
  1105. c.Assert(err, IsNil)
  1106. // Remove alias
  1107. _, err = conn.RemoveAlias(aliasName, []string{indexName})
  1108. c.Assert(err, IsNil)
  1109. // Get document via alias
  1110. _, err = conn.Get(aliasName, docType, docId, url.Values{})
  1111. c.Assert(err.Error(), Equals, "[404] IndexMissingException[["+aliasName+"] missing]")
  1112. }
  1113. func (s *GoesTestSuite) TestAliasExists(c *C) {
  1114. index := "testaliasexist_1"
  1115. alias := "testaliasexists"
  1116. conn := NewConnection(ES_HOST, ES_PORT)
  1117. // just in case
  1118. conn.DeleteIndex(index)
  1119. exists, err := conn.AliasExists(alias)
  1120. c.Assert(exists, Equals, false)
  1121. _, err = conn.CreateIndex(index, map[string]interface{}{})
  1122. c.Assert(err, IsNil)
  1123. defer conn.DeleteIndex(index)
  1124. time.Sleep(200 * time.Millisecond)
  1125. _, err = conn.AddAlias(alias, []string{index})
  1126. c.Assert(err, IsNil)
  1127. time.Sleep(200 * time.Millisecond)
  1128. defer conn.RemoveAlias(alias, []string{index})
  1129. exists, err = conn.AliasExists(alias)
  1130. c.Assert(exists, Equals, true)
  1131. }