Added LiveXapianSearchQueryTestCase

This commit is contained in:
David Sauve 2009-12-04 14:16:08 -05:00
parent 8ed2e9196e
commit da34c32849
3 changed files with 327 additions and 280 deletions

View File

@ -15,6 +15,7 @@ from haystack import indexes, sites, backends
from haystack.backends.xapian_backend import SearchBackend, SearchQuery, _marshal_value
from haystack.exceptions import HaystackError
from haystack.query import SearchQuerySet, SQ
from haystack.sites import SearchSite
from core.models import MockTag, MockModel, AnotherMockModel
@ -58,285 +59,337 @@ class XapianMockSearchIndex(indexes.SearchIndex):
return ['%d' % (i * obj.id) for i in xrange(1, 4)]
class XapianSearchSite(sites.SearchSite):
    """Bare ``sites.SearchSite`` subclass; adds no behaviour of its own."""
# class XapianSearchBackendTestCase(TestCase):
# def setUp(self):
# super(XapianSearchBackendTestCase, self).setUp()
#
# self.site = SearchSite()
# self.sb = SearchBackend(site=self.site)
# self.msi = XapianMockSearchIndex(XapianMockModel, backend=self.sb)
# self.site.register(XapianMockModel, XapianMockSearchIndex)
#
# self.sample_objs = []
#
# for i in xrange(1, 4):
# mock = XapianMockModel()
# mock.id = i
# mock.author = 'david%s' % i
# mock.pub_date = datetime.date(2009, 2, 25) - datetime.timedelta(days=i)
# mock.value = i * 5
# mock.flag = bool(i % 2)
# mock.slug = 'http://example.com/%d' % i
# self.sample_objs.append(mock)
#
# self.sample_objs[0].popularity = 834.0
# self.sample_objs[1].popularity = 35.5
# self.sample_objs[2].popularity = 972.0
#
# def tearDown(self):
# if os.path.exists(settings.HAYSTACK_XAPIAN_PATH):
# shutil.rmtree(settings.HAYSTACK_XAPIAN_PATH)
#
# super(XapianSearchBackendTestCase, self).tearDown()
#
# def xapian_search(self, query_string):
# database = xapian.Database(settings.HAYSTACK_XAPIAN_PATH)
# if query_string:
# qp = xapian.QueryParser()
# qp.set_database(database)
# query = qp.parse_query(query_string, xapian.QueryParser.FLAG_WILDCARD)
# else:
# query = xapian.Query(query_string) # Empty query matches all
# enquire = xapian.Enquire(database)
# enquire.set_query(query)
# matches = enquire.get_mset(0, database.get_doccount())
#
# document_list = []
#
# for match in matches:
# document = match.get_document()
# app_label, module_name, pk, model_data = pickle.loads(document.get_data())
# for key, value in model_data.iteritems():
# model_data[key] = _marshal_value(value)
# model_data['id'] = u'%s.%s.%d' % (app_label, module_name, pk)
# document_list.append(model_data)
#
# return document_list
#
# def silly_test(self):
#
# self.sb.update(self.msi, self.sample_objs)
#
# self.assertEqual(len(self.xapian_search('indexed')), 3)
# self.assertEqual(len(self.xapian_search('Indexed')), 3)
#
# def test_update(self):
# self.sb.update(self.msi, self.sample_objs)
#
# self.assertEqual(len(self.xapian_search('')), 3)
# self.assertEqual([dict(doc) for doc in self.xapian_search('')], [
# {'flag': u't', 'name': u'david1', 'text': u'indexed!\n1', 'sites': u"['1', '2', '3']", 'pub_date': u'20090224000000', 'value': u'000000000005', 'id': u'tests.xapianmockmodel.1', 'slug': u'http://example.com/1', 'popularity': '\xca\x84', 'django_id': u'1', 'django_ct': u'tests.xapianmockmodel'},
# {'flag': u'f', 'name': u'david2', 'text': u'indexed!\n2', 'sites': u"['2', '4', '6']", 'pub_date': u'20090223000000', 'value': u'000000000010', 'id': u'tests.xapianmockmodel.2', 'slug': u'http://example.com/2', 'popularity': '\xb4p', 'django_id': u'2', 'django_ct': u'tests.xapianmockmodel'},
# {'flag': u't', 'name': u'david3', 'text': u'indexed!\n3', 'sites': u"['3', '6', '9']", 'pub_date': u'20090222000000', 'value': u'000000000015', 'id': u'tests.xapianmockmodel.3', 'slug': u'http://example.com/3', 'popularity': '\xcb\x98', 'django_id': u'3', 'django_ct': u'tests.xapianmockmodel'}
# ])
#
# def test_duplicate_update(self):
# self.sb.update(self.msi, self.sample_objs)
# self.sb.update(self.msi, self.sample_objs) # Duplicates should be updated, not appended -- http://github.com/notanumber/xapian-haystack/issues/#issue/6
#
# self.assertEqual(len(self.xapian_search('')), 3)
#
# def test_remove(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.sb.remove(self.sample_objs[0])
# self.assertEqual(len(self.xapian_search('')), 2)
# self.assertEqual([dict(doc) for doc in self.xapian_search('')], [
# {'flag': u'f', 'name': u'david2', 'text': u'indexed!\n2', 'sites': u"['2', '4', '6']", 'pub_date': u'20090223000000', 'value': u'000000000010', 'id': u'tests.xapianmockmodel.2', 'slug': u'http://example.com/2', 'popularity': '\xb4p', 'django_id': u'2', 'django_ct': u'tests.xapianmockmodel'},
# {'flag': u't', 'name': u'david3', 'text': u'indexed!\n3', 'sites': u"['3', '6', '9']", 'pub_date': u'20090222000000', 'value': u'000000000015', 'id': u'tests.xapianmockmodel.3', 'slug': u'http://example.com/3', 'popularity': '\xcb\x98', 'django_id': u'3', 'django_ct': u'tests.xapianmockmodel'}
# ])
#
# def test_clear(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.sb.clear()
# self.assertEqual(len(self.xapian_search('')), 0)
#
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.sb.clear([AnotherMockModel])
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.sb.clear([XapianMockModel])
# self.assertEqual(len(self.xapian_search('')), 0)
#
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.sb.clear([AnotherMockModel, XapianMockModel])
# self.assertEqual(len(self.xapian_search('')), 0)
#
# def test_search(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query()), {'hits': 0, 'results': []})
# self.assertEqual(self.sb.search(xapian.Query(''))['hits'], 3)
# self.assertEqual([result.pk for result in self.sb.search(xapian.Query(''))['results']], [1, 2, 3])
# self.assertEqual(self.sb.search(xapian.Query('indexed'))['hits'], 3)
# self.assertEqual([result.pk for result in self.sb.search(xapian.Query(''))['results']], [1, 2, 3])
#
# def test_field_facets(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query(), facets=['name']), {'hits': 0, 'results': []})
# results = self.sb.search(xapian.Query('indexed'), facets=['name'])
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['fields']['name'], [('david1', 1), ('david2', 1), ('david3', 1)])
#
# results = self.sb.search(xapian.Query('indexed'), facets=['flag'])
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['fields']['flag'], [(False, 1), (True, 2)])
#
# results = self.sb.search(xapian.Query('indexed'), facets=['sites'])
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['fields']['sites'], [('1', 1), ('3', 2), ('2', 2), ('4', 1), ('6', 2), ('9', 1)])
#
# def test_date_facets(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query(), date_facets={'pub_date': {'start_date': datetime.datetime(2008, 10, 26), 'end_date': datetime.datetime(2009, 3, 26), 'gap_by': 'month'}}), {'hits': 0, 'results': []})
# results = self.sb.search(xapian.Query('indexed'), date_facets={'pub_date': {'start_date': datetime.datetime(2008, 10, 26), 'end_date': datetime.datetime(2009, 3, 26), 'gap_by': 'month'}})
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['dates']['pub_date'], [
# ('2009-02-26T00:00:00', 0),
# ('2009-01-26T00:00:00', 3),
# ('2008-12-26T00:00:00', 0),
# ('2008-11-26T00:00:00', 0),
# ('2008-10-26T00:00:00', 0),
# ])
#
# results = self.sb.search(xapian.Query('indexed'), date_facets={'pub_date': {'start_date': datetime.datetime(2009, 02, 01), 'end_date': datetime.datetime(2009, 3, 15), 'gap_by': 'day', 'gap_amount': 15}})
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['dates']['pub_date'], [
# ('2009-03-03T00:00:00', 0),
# ('2009-02-16T00:00:00', 3),
# ('2009-02-01T00:00:00', 0)
# ])
#
# # def test_query_facets(self):
# # self.sb.update(self.msi, self.sample_objs)
# # self.assertEqual(len(self.xapian_search('')), 3)
# #
# # self.assertEqual(self.sb.search(xapian.Query(), query_facets={'name': 'da*', {'hits': 0, 'results': []})
# # results = self.sb.search(xapian.Query('index'), query_facets={'name': 'da*'})
# # self.assertEqual(results['hits'], 3)
# # self.assertEqual(results['facets']['queries']['name'], ('da*', 3))
#
# def test_narrow_queries(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query(), narrow_queries=set([xapian.Query('XNAMEdavid1')])), {'hits': 0, 'results': []})
# results = self.sb.search(xapian.Query('indexed'), narrow_queries=set([xapian.Query('XNAMEdavid1')]))
# self.assertEqual(results['hits'], 1)
#
# def test_highlight(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query(), highlight=True), {'hits': 0, 'results': []})
# self.assertEqual(self.sb.search(xapian.Query('indexed'), highlight=True)['hits'], 3)
# self.assertEqual([result.highlighted['text'] for result in self.sb.search(xapian.Query('indexed'), highlight=True)['results']], ['<em>indexed</em>!\n1', '<em>indexed</em>!\n2', '<em>indexed</em>!\n3'])
#
# def test_spelling_suggestion(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query('indxe'))['hits'], 0)
# self.assertEqual(self.sb.search(xapian.Query('indxe'))['spelling_suggestion'], 'indexed')
#
# self.assertEqual(self.sb.search(xapian.Query('indxed'))['hits'], 0)
# self.assertEqual(self.sb.search(xapian.Query('indxed'))['spelling_suggestion'], 'indexed')
#
# self.assertEqual(self.sb.search(xapian.Query('foo'))['hits'], 0)
# self.assertEqual(self.sb.search(xapian.Query('foo'), spelling_query='indexy')['spelling_suggestion'], 'indexed')
#
# self.assertEqual(self.sb.search(xapian.Query('XNAMEdavid'))['hits'], 0)
# self.assertEqual(self.sb.search(xapian.Query('XNAMEdavid'))['spelling_suggestion'], 'david1')
#
# def test_more_like_this(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# results = self.sb.more_like_this(self.sample_objs[0])
# self.assertEqual(results['hits'], 2)
# self.assertEqual([result.pk for result in results['results']], [3, 2])
#
# results = self.sb.more_like_this(self.sample_objs[0], additional_query=xapian.Query('david3'))
# self.assertEqual(results['hits'], 1)
# self.assertEqual([result.pk for result in results['results']], [3])
#
# results = self.sb.more_like_this(self.sample_objs[0], limit_to_registered_models=True)
# self.assertEqual(results['hits'], 2)
# self.assertEqual([result.pk for result in results['results']], [3, 2])
#
# def test_order_by(self):
# self.sb.update(self.msi, self.sample_objs)
#
# results = self.sb.search(xapian.Query(''), sort_by=['pub_date'])
# self.assertEqual([result.pk for result in results['results']], [3, 2, 1])
#
# results = self.sb.search(xapian.Query(''), sort_by=['-pub_date'])
# self.assertEqual([result.pk for result in results['results']], [1, 2, 3])
#
# results = self.sb.search(xapian.Query(''), sort_by=['id'])
# self.assertEqual([result.pk for result in results['results']], [1, 2, 3])
#
# results = self.sb.search(xapian.Query(''), sort_by=['-id'])
# self.assertEqual([result.pk for result in results['results']], [3, 2, 1])
#
# results = self.sb.search(xapian.Query(''), sort_by=['value'])
# self.assertEqual([result.pk for result in results['results']], [1, 2, 3])
#
# results = self.sb.search(xapian.Query(''), sort_by=['-value'])
# self.assertEqual([result.pk for result in results['results']], [3, 2, 1])
#
# results = self.sb.search(xapian.Query(''), sort_by=['popularity'])
# self.assertEqual([result.pk for result in results['results']], [2, 1, 3])
#
# results = self.sb.search(xapian.Query(''), sort_by=['-popularity'])
# self.assertEqual([result.pk for result in results['results']], [3, 1, 2])
#
# results = self.sb.search(xapian.Query(''), sort_by=['flag', 'id'])
# self.assertEqual([result.pk for result in results['results']], [2, 1, 3])
#
# results = self.sb.search(xapian.Query(''), sort_by=['flag', '-id'])
# self.assertEqual([result.pk for result in results['results']], [2, 3, 1])
#
# def test__marshal_value(self):
# self.assertEqual(_marshal_value('abc'), u'abc')
# self.assertEqual(_marshal_value(1), '000000000001')
# self.assertEqual(_marshal_value(2653), '000000002653')
# self.assertEqual(_marshal_value(25.5), '\xb2`')
# self.assertEqual(_marshal_value([1, 2, 3]), u'[1, 2, 3]')
# self.assertEqual(_marshal_value((1, 2, 3)), u'(1, 2, 3)')
# self.assertEqual(_marshal_value({'a': 1, 'c': 3, 'b': 2}), u"{'a': 1, 'c': 3, 'b': 2}")
# self.assertEqual(_marshal_value(datetime.datetime(2009, 5, 9, 16, 14)), u'20090509161400')
# self.assertEqual(_marshal_value(datetime.datetime(2009, 5, 9, 0, 0)), u'20090509000000')
# self.assertEqual(_marshal_value(datetime.datetime(1899, 5, 18, 0, 0)), u'18990518000000')
# self.assertEqual(_marshal_value(datetime.datetime(2009, 5, 18, 1, 16, 30, 250)), u'20090518011630000250')
#
# def test_build_schema(self):
# (content_field_name, fields) = self.sb.build_schema(self.site.all_searchfields())
# self.assertEqual(content_field_name, 'text')
# self.assertEqual(len(fields), 7)
# self.assertEqual(fields, [
# {'column': 0, 'field_name': 'name', 'type': 'text', 'multi_valued': 'false'},
# {'column': 1, 'field_name': 'text', 'type': 'text', 'multi_valued': 'false'},
# {'column': 2, 'field_name': 'popularity', 'type': 'float', 'multi_valued': 'false'},
# {'column': 3, 'field_name': 'sites', 'type': 'text', 'multi_valued': 'true'},
# {'column': 4, 'field_name': 'value', 'type': 'long', 'multi_valued': 'false'},
# {'column': 5, 'field_name': 'flag', 'type': 'boolean', 'multi_valued': 'false'},
# {'column': 6, 'field_name': 'pub_date', 'type': 'date', 'multi_valued': 'false'},
# ])
class XapianSearchBackendTestCase(TestCase):
class LiveXapianMockSearchIndex(indexes.SearchIndex):
text = indexes.CharField(document=True, use_template=True)
name = indexes.CharField(model_attr='author')
pub_date = indexes.DateField(model_attr='pub_date')
class LiveXapianSearchQueryTestCase(TestCase):
fixtures = ['initial_data.json']
def setUp(self):
super(XapianSearchBackendTestCase, self).setUp()
self.site = XapianSearchSite()
self.sb = SearchBackend(site=self.site)
self.msi = XapianMockSearchIndex(XapianMockModel, backend=self.sb)
self.site.register(XapianMockModel, XapianMockSearchIndex)
self.sample_objs = []
for i in xrange(1, 4):
mock = XapianMockModel()
mock.id = i
mock.author = 'david%s' % i
mock.pub_date = datetime.date(2009, 2, 25) - datetime.timedelta(days=i)
mock.value = i * 5
mock.flag = bool(i % 2)
mock.slug = 'http://example.com/%d' % i
self.sample_objs.append(mock)
self.sample_objs[0].popularity = 834.0
self.sample_objs[1].popularity = 35.5
self.sample_objs[2].popularity = 972.0
def tearDown(self):
if os.path.exists(settings.HAYSTACK_XAPIAN_PATH):
shutil.rmtree(settings.HAYSTACK_XAPIAN_PATH)
super(LiveXapianSearchQueryTestCase, self).setUp()
super(XapianSearchBackendTestCase, self).tearDown()
def xapian_search(self, query_string):
database = xapian.Database(settings.HAYSTACK_XAPIAN_PATH)
if query_string:
qp = xapian.QueryParser()
qp.set_database(database)
query = qp.parse_query(query_string, xapian.QueryParser.FLAG_WILDCARD)
else:
query = xapian.Query(query_string) # Empty query matches all
enquire = xapian.Enquire(database)
enquire.set_query(query)
matches = enquire.get_mset(0, database.get_doccount())
document_list = []
for match in matches:
document = match.get_document()
app_label, module_name, pk, model_data = pickle.loads(document.get_data())
for key, value in model_data.iteritems():
model_data[key] = _marshal_value(value)
model_data['id'] = u'%s.%s.%d' % (app_label, module_name, pk)
document_list.append(model_data)
site = SearchSite()
backend = SearchBackend(site=site)
index = LiveXapianMockSearchIndex(MockModel, backend=backend)
site.register(MockModel, LiveXapianMockSearchIndex)
backend.update(index, MockModel.objects.all())
return document_list
def silly_test(self):
    """Index the sample objects and query 'indexed' in two casings.

    Both the lower-case and the capitalised query string are expected to
    return three documents.

    NOTE(review): the method name does not start with ``test``, so
    unittest's default loader will not collect it -- confirm whether that
    is intentional.
    """
    self.sb.update(self.msi, self.sample_objs)

    for query_string in ('indexed', 'Indexed'):
        matching_documents = self.xapian_search(query_string)
        self.assertEqual(len(matching_documents), 3)
def test_update(self):
    """Index the sample objects and compare the stored documents.

    After ``update`` the database must hold three documents whose
    marshalled field values equal the literals below.
    """
    expected_documents = [
        {
            'flag': u't',
            'name': u'david1',
            'text': u'indexed!\n1',
            'sites': u"['1', '2', '3']",
            'pub_date': u'20090224000000',
            'value': u'000000000005',
            'id': u'tests.xapianmockmodel.1',
            'slug': u'http://example.com/1',
            'popularity': '\xca\x84',
            'django_id': u'1',
            'django_ct': u'tests.xapianmockmodel',
        },
        {
            'flag': u'f',
            'name': u'david2',
            'text': u'indexed!\n2',
            'sites': u"['2', '4', '6']",
            'pub_date': u'20090223000000',
            'value': u'000000000010',
            'id': u'tests.xapianmockmodel.2',
            'slug': u'http://example.com/2',
            'popularity': '\xb4p',
            'django_id': u'2',
            'django_ct': u'tests.xapianmockmodel',
        },
        {
            'flag': u't',
            'name': u'david3',
            'text': u'indexed!\n3',
            'sites': u"['3', '6', '9']",
            'pub_date': u'20090222000000',
            'value': u'000000000015',
            'id': u'tests.xapianmockmodel.3',
            'slug': u'http://example.com/3',
            'popularity': '\xcb\x98',
            'django_id': u'3',
            'django_ct': u'tests.xapianmockmodel',
        },
    ]

    self.sb.update(self.msi, self.sample_objs)

    # An empty query string makes xapian_search return every document.
    self.assertEqual(len(self.xapian_search('')), 3)

    stored_documents = [dict(document) for document in self.xapian_search('')]
    self.assertEqual(stored_documents, expected_documents)
self.sq = SearchQuery(backend=backend)
def test_duplicate_update(self):
    """Indexing the same objects twice must not add extra documents.

    Duplicates should be updated, not appended -- see
    http://github.com/notanumber/xapian-haystack/issues/#issue/6
    """
    for _ in range(2):
        self.sb.update(self.msi, self.sample_objs)

    stored_documents = self.xapian_search('')
    self.assertEqual(len(stored_documents), 3)
def test_get_spelling(self):
    """Check the spelling suggestion returned for the term 'indxd'."""
    misspelt_term = 'indxd'
    expected_suggestion = u'indexed'

    self.sq.add_filter(SQ(content=misspelt_term))

    # First without an argument, then with the term passed explicitly.
    self.assertEqual(self.sq.get_spelling_suggestion(), expected_suggestion)
    self.assertEqual(self.sq.get_spelling_suggestion(misspelt_term), expected_suggestion)
def test_remove(self):
    """Remove the first sample object and check the two that remain."""
    remaining_documents = [
        {
            'flag': u'f',
            'name': u'david2',
            'text': u'indexed!\n2',
            'sites': u"['2', '4', '6']",
            'pub_date': u'20090223000000',
            'value': u'000000000010',
            'id': u'tests.xapianmockmodel.2',
            'slug': u'http://example.com/2',
            'popularity': '\xb4p',
            'django_id': u'2',
            'django_ct': u'tests.xapianmockmodel',
        },
        {
            'flag': u't',
            'name': u'david3',
            'text': u'indexed!\n3',
            'sites': u"['3', '6', '9']",
            'pub_date': u'20090222000000',
            'value': u'000000000015',
            'id': u'tests.xapianmockmodel.3',
            'slug': u'http://example.com/3',
            'popularity': '\xcb\x98',
            'django_id': u'3',
            'django_ct': u'tests.xapianmockmodel',
        },
    ]

    self.sb.update(self.msi, self.sample_objs)
    self.assertEqual(len(self.xapian_search('')), 3)

    first_object = self.sample_objs[0]
    self.sb.remove(first_object)
    self.assertEqual(len(self.xapian_search('')), 2)

    stored_documents = [dict(document) for document in self.xapian_search('')]
    self.assertEqual(stored_documents, remaining_documents)
def test_log_query(self):
backends.reset_search_queries()
self.assertEqual(len(backends.queries), 0)
def test_clear(self):
    """Exercise ``clear`` with no argument and with lists of models."""

    def stored_count():
        # An empty query string makes xapian_search return every document.
        return len(self.xapian_search(''))

    def reindex():
        self.sb.update(self.msi, self.sample_objs)
        self.assertEqual(stored_count(), 3)

    # No argument: everything is expected to go.
    reindex()
    self.sb.clear()
    self.assertEqual(stored_count(), 0)

    # Clearing only AnotherMockModel is expected to leave all three
    # documents; clearing XapianMockModel is expected to remove them.
    reindex()
    self.sb.clear([AnotherMockModel])
    self.assertEqual(stored_count(), 3)
    self.sb.clear([XapianMockModel])
    self.assertEqual(stored_count(), 0)

    # Both models in one call.
    reindex()
    self.sb.clear([AnotherMockModel, XapianMockModel])
    self.assertEqual(stored_count(), 0)
# Stow.
old_debug = settings.DEBUG
settings.DEBUG = False
def test_search(self):
self.sb.update(self.msi, self.sample_objs)
self.assertEqual(len(self.xapian_search('')), 3)
# Empty query
self.assertEqual(self.sb.search(xapian.Query()), {'hits': 0, 'results': []})
# Wildcard -- All
self.assertEqual(self.sb.search(xapian.Query(''))['hits'], 3)
self.assertEqual([result.pk for result in self.sb.search(xapian.Query(''))['results']], [1, 2, 3])
def test_field_facets(self):
self.sb.update(self.msi, self.sample_objs)
self.assertEqual(len(self.xapian_search('')), 3)
self.assertEqual(self.sb.search(xapian.Query(), facets=['name']), {'hits': 0, 'results': []})
results = self.sb.search(xapian.Query('indexed'), facets=['name'])
self.assertEqual(results['hits'], 3)
self.assertEqual(results['facets']['fields']['name'], [('david1', 1), ('david2', 1), ('david3', 1)])
len(self.sq.get_results())
self.assertEqual(len(backends.queries), 0)
results = self.sb.search(xapian.Query('indexed'), facets=['flag'])
self.assertEqual(results['hits'], 3)
self.assertEqual(results['facets']['fields']['flag'], [(False, 1), (True, 2)])
results = self.sb.search(xapian.Query('indexed'), facets=['sites'])
self.assertEqual(results['hits'], 3)
self.assertEqual(results['facets']['fields']['sites'], [('1', 1), ('3', 2), ('2', 2), ('4', 1), ('6', 2), ('9', 1)])
def test_date_facets(self):
self.sb.update(self.msi, self.sample_objs)
self.assertEqual(len(self.xapian_search('')), 3)
settings.DEBUG = True
# Redefine it to clear out the cached results.
self.sq = SearchQuery(backend=SearchBackend())
self.sq.add_filter(SQ(name='bar'))
len(self.sq.get_results())
self.assertEqual(len(backends.queries), 1)
self.assertEqual(backends.queries[0]['query_string'].get_description(), 'Xapian::Query(XNAMEbar)')
self.assertEqual(self.sb.search(xapian.Query(), date_facets={'pub_date': {'start_date': datetime.datetime(2008, 10, 26), 'end_date': datetime.datetime(2009, 3, 26), 'gap_by': 'month'}}), {'hits': 0, 'results': []})
results = self.sb.search(xapian.Query('indexed'), date_facets={'pub_date': {'start_date': datetime.datetime(2008, 10, 26), 'end_date': datetime.datetime(2009, 3, 26), 'gap_by': 'month'}})
self.assertEqual(results['hits'], 3)
self.assertEqual(results['facets']['dates']['pub_date'], [
('2009-02-26T00:00:00', 0),
('2009-01-26T00:00:00', 3),
('2008-12-26T00:00:00', 0),
('2008-11-26T00:00:00', 0),
('2008-10-26T00:00:00', 0),
])
# And again, for good measure.
self.sq = SearchQuery(backend=SearchBackend())
self.sq.add_filter(SQ(name='bar'))
self.sq.add_filter(SQ(text='moof'))
len(self.sq.get_results())
self.assertEqual(len(backends.queries), 2)
self.assertEqual(backends.queries[0]['query_string'].get_description(), u'Xapian::Query(XNAMEbar)')
self.assertEqual(backends.queries[1]['query_string'].get_description(), u'Xapian::Query((XNAMEbar AND XTEXTmoof))')
results = self.sb.search(xapian.Query('indexed'), date_facets={'pub_date': {'start_date': datetime.datetime(2009, 02, 01), 'end_date': datetime.datetime(2009, 3, 15), 'gap_by': 'day', 'gap_amount': 15}})
self.assertEqual(results['hits'], 3)
self.assertEqual(results['facets']['dates']['pub_date'], [
('2009-03-03T00:00:00', 0),
('2009-02-16T00:00:00', 3),
('2009-02-01T00:00:00', 0)
])
# def test_query_facets(self):
# self.sb.update(self.msi, self.sample_objs)
# self.assertEqual(len(self.xapian_search('')), 3)
#
# self.assertEqual(self.sb.search(xapian.Query(), query_facets={'name': 'da*', {'hits': 0, 'results': []})
# results = self.sb.search(xapian.Query('index'), query_facets={'name': 'da*'})
# self.assertEqual(results['hits'], 3)
# self.assertEqual(results['facets']['queries']['name'], ('da*', 3))
def test_narrow_queries(self):
    """Search with a ``narrow_queries`` set restricting to XNAMEdavid1."""

    def narrowing():
        # Build a fresh set for every search, as each call site did before.
        return set([xapian.Query('XNAMEdavid1')])

    self.sb.update(self.msi, self.sample_objs)
    self.assertEqual(len(self.xapian_search('')), 3)

    empty_outcome = self.sb.search(xapian.Query(), narrow_queries=narrowing())
    self.assertEqual(empty_outcome, {'hits': 0, 'results': []})

    narrowed = self.sb.search(xapian.Query('indexed'), narrow_queries=narrowing())
    self.assertEqual(narrowed['hits'], 1)
def test_highlight(self):
    """Search with ``highlight=True`` and check the highlighted text."""
    self.sb.update(self.msi, self.sample_objs)
    self.assertEqual(len(self.xapian_search('')), 3)

    empty_outcome = self.sb.search(xapian.Query(), highlight=True)
    self.assertEqual(empty_outcome, {'hits': 0, 'results': []})

    hit_count = self.sb.search(xapian.Query('indexed'), highlight=True)['hits']
    self.assertEqual(hit_count, 3)

    highlighted = self.sb.search(xapian.Query('indexed'), highlight=True)['results']
    highlighted_text = [entry.highlighted['text'] for entry in highlighted]
    self.assertEqual(
        highlighted_text,
        ['<em>indexed</em>!\n1', '<em>indexed</em>!\n2', '<em>indexed</em>!\n3'],
    )
def test_spelling_suggestion(self):
    """Check zero-hit searches and the spelling suggestions they yield.

    Each case is ``(term, extra keyword arguments for the second search,
    expected suggestion)``.  The first search of every case checks the hit
    count, the second one checks ``spelling_suggestion``.
    """
    self.sb.update(self.msi, self.sample_objs)
    self.assertEqual(len(self.xapian_search('')), 3)

    cases = (
        ('indxe', {}, 'indexed'),
        ('indxed', {}, 'indexed'),
        ('foo', {'spelling_query': 'indexy'}, 'indexed'),
        ('XNAMEdavid', {}, 'david1'),
    )

    for term, suggestion_kwargs, expected_suggestion in cases:
        self.assertEqual(self.sb.search(xapian.Query(term))['hits'], 0)

        outcome = self.sb.search(xapian.Query(term), **suggestion_kwargs)
        self.assertEqual(outcome['spelling_suggestion'], expected_suggestion)
def test_more_like_this(self):
    """Call ``more_like_this`` for the first sample object three ways."""

    def check(outcome, expected_hits, expected_pks):
        self.assertEqual(outcome['hits'], expected_hits)
        self.assertEqual([entry.pk for entry in outcome['results']], expected_pks)

    self.sb.update(self.msi, self.sample_objs)
    self.assertEqual(len(self.xapian_search('')), 3)

    reference = self.sample_objs[0]

    # Plain call.
    check(self.sb.more_like_this(reference), 2, [3, 2])

    # Restricted by an additional query.
    restricted = self.sb.more_like_this(reference, additional_query=xapian.Query('david3'))
    check(restricted, 1, [3])

    # Limited to registered models.
    limited = self.sb.more_like_this(reference, limit_to_registered_models=True)
    check(limited, 2, [3, 2])
def test_order_by(self):
    """Check result order for a series of ``sort_by`` specifications.

    Each entry pairs a ``sort_by`` list with the primary keys expected,
    in order, from a match-all search.
    """
    self.sb.update(self.msi, self.sample_objs)

    orderings = (
        (['pub_date'], [3, 2, 1]),
        (['-pub_date'], [1, 2, 3]),
        (['id'], [1, 2, 3]),
        (['-id'], [3, 2, 1]),
        (['value'], [1, 2, 3]),
        (['-value'], [3, 2, 1]),
        (['popularity'], [2, 1, 3]),
        (['-popularity'], [3, 1, 2]),
        (['flag', 'id'], [2, 1, 3]),
        (['flag', '-id'], [2, 3, 1]),
    )

    for sort_fields, expected_pks in orderings:
        outcome = self.sb.search(xapian.Query(''), sort_by=sort_fields)
        returned_pks = [entry.pk for entry in outcome['results']]
        self.assertEqual(returned_pks, expected_pks)
def test__marshal_value(self):
    """Check ``_marshal_value`` against a table of (input, expected) pairs.

    NOTE(review): the dict case compares against one fixed string, so it
    presumably depends on the dict's key order being stable -- confirm.
    """
    cases = (
        ('abc', u'abc'),
        (1, '000000000001'),
        (2653, '000000002653'),
        (25.5, '\xb2`'),
        ([1, 2, 3], u'[1, 2, 3]'),
        ((1, 2, 3), u'(1, 2, 3)'),
        ({'a': 1, 'c': 3, 'b': 2}, u"{'a': 1, 'c': 3, 'b': 2}"),
        (datetime.datetime(2009, 5, 9, 16, 14), u'20090509161400'),
        (datetime.datetime(2009, 5, 9, 0, 0), u'20090509000000'),
        (datetime.datetime(1899, 5, 18, 0, 0), u'18990518000000'),
        (datetime.datetime(2009, 5, 18, 1, 16, 30, 250), u'20090518011630000250'),
    )

    for raw_value, expected in cases:
        self.assertEqual(_marshal_value(raw_value), expected)
def test_build_schema(self):
    """Check the content field name and field list from ``build_schema``."""
    # (field_name, type, multi_valued); list position is the expected column.
    field_specs = (
        ('name', 'text', 'false'),
        ('text', 'text', 'false'),
        ('popularity', 'float', 'false'),
        ('sites', 'text', 'true'),
        ('value', 'long', 'false'),
        ('flag', 'boolean', 'false'),
        ('pub_date', 'date', 'false'),
    )
    expected_fields = [
        {'column': column, 'field_name': field_name, 'type': field_type, 'multi_valued': multi_valued}
        for column, (field_name, field_type, multi_valued) in enumerate(field_specs)
    ]

    schema = self.sb.build_schema(self.site.all_searchfields())
    content_field_name, fields = schema

    self.assertEqual(content_field_name, 'text')
    self.assertEqual(len(fields), 7)
    self.assertEqual(fields, expected_fields)
# Restore.
settings.DEBUG = old_debug

View File

@ -139,12 +139,6 @@ class XapianSearchQueryTestCase(TestCase):
self.sq.add_filter(SQ(pub_date__in=[datetime.datetime(2009, 7, 6, 1, 56, 21)]))
self.assertEqual(self.sq.build_query().get_description(), u'Xapian::Query((why AND XPUB_DATE20090706015621))')
# def test_build_query_wildcard_filter_types(self):
# self.sq.add_filter(SQ(content='why'))
# self.sq.add_filter(SQ(title__startswith='haystack'))
# self.assertEqual(self.sq.build_query().get_description(), 'Xapian::Query((why AND XTITLEhaystack))')
# Because wildcards are expanded using existing documents, a more thorough test for this is performed in SearchBackend tests
# def test_stem_single_word(self):
# self.sq.add_filter(SQ(content='testing'))
# self.assertEqual(self.sq.build_query().get_description(), 'Xapian.Query(Ztest)')

View File

@ -134,7 +134,7 @@ class SearchBackend(BaseSearchBackend):
document_id = DOCUMENT_ID_TERM_PREFIX + get_identifier(obj)
data = index.prepare(obj)
for field in self.schema:
if field['field_name'] in data.keys():
prefix = DOCUMENT_CUSTOM_TERM_PREFIX + field['field_name'].upper()
@ -636,9 +636,9 @@ class SearchBackend(BaseSearchBackend):
term_list = []
for term in query:
for match in re.findall('[^A-Z]+', term): # Ignore field identifiers
for match in re.findall('[^A-Z]+', term): # Ignore field identifiers
term_list.append(database.get_spelling_suggestion(match))
return ' '.join(term_list)
def _database(self, writable=False):
@ -653,7 +653,7 @@ class SearchBackend(BaseSearchBackend):
"""
if writable:
self.content_field_name, self.schema = self.build_schema(self.site.all_searchfields())
database = xapian.WritableDatabase(settings.HAYSTACK_XAPIAN_PATH, xapian.DB_CREATE_OR_OPEN)
database.set_metadata('schema', pickle.dumps(self.schema, pickle.HIGHEST_PROTOCOL))
database.set_metadata('content', pickle.dumps(self.content_field_name, pickle.HIGHEST_PROTOCOL))