Elasticsearch #
Elasticsearch adalah mesin pencari dan analitik terdistribusi berbasis Apache Lucene yang dirancang untuk pencarian teks penuh, logging, monitoring, dan analitik data real-time dalam skala besar. Berbeda dari database tradisional yang dioptimalkan untuk menyimpan dan mengambil data secara tepat, Elasticsearch dioptimalkan untuk mencari, menganalisis, dan memvisualisasikan data dalam hitungan milidetik — bahkan pada dataset berukuran terabyte. Python berinteraksi dengan Elasticsearch melalui library elasticsearch-py resmi, dan sejak versi 8.x API-nya berubah signifikan: parameter body= dihapus, autentikasi menjadi wajib, dan TLS diaktifkan secara default.
Instalasi #
pip install elasticsearch
Pastikan versi library sesuai dengan versi Elasticsearch server kamu. Elasticsearch 8.x tidak kompatibel dengan library versi 7.x dan sebaliknya. Gunakan pip install "elasticsearch>=8,<9" untuk memastikan versi yang tepat.Konsep Dasar Elasticsearch #
Sebelum menulis kode, penting memahami terminologi Elasticsearch:
Elasticsearch ↔ Database Relasional
─────────────────────────────────────────────
Index ↔ Tabel
Document ↔ Baris (row)
Field ↔ Kolom
Mapping ↔ Schema / DDL
Shard ↔ Partisi horizontal
Konsep khusus Elasticsearch:
Inverted Index -- struktur data untuk full-text search (kata → dokumen)
Analyzer -- pipeline: tokenizer + filter untuk memproses teks
Relevance Score -- skor relevansi dokumen terhadap query (_score)
Aggregation -- analitik: bucket (grouping) + metric (sum, avg, dll.)
Membuat Koneksi #
Elasticsearch 8.x mengaktifkan TLS dan autentikasi secara default. Cara koneksi berbeda tergantung apakah kamu menggunakan Elasticsearch lokal, self-managed cluster, atau Elastic Cloud.
from elasticsearch import Elasticsearch
import os
# ANTI-PATTERN: koneksi lama (format ES 7.x, tidak aman)
es = Elasticsearch([{"host": "localhost", "port": 9200}]) # ✗ -- deprecated
# BENAR: koneksi ES 8.x lokal dengan API key atau basic auth
es = Elasticsearch(
hosts=os.getenv("ES_HOST", "https://localhost:9200"),
api_key=os.getenv("ES_API_KEY"), # direkomendasikan
verify_certs=False, # hanya untuk dev/self-signed cert
ssl_show_warn=False
)
# Atau dengan username/password
es = Elasticsearch(
hosts=os.getenv("ES_HOST", "https://localhost:9200"),
basic_auth=(
os.getenv("ES_USER", "elastic"),
os.getenv("ES_PASSWORD")
),
verify_certs=False
)
# Koneksi ke Elastic Cloud (production)
es = Elasticsearch(
cloud_id=os.getenv("ES_CLOUD_ID"),
api_key=os.getenv("ES_API_KEY")
)
# Tes koneksi
info = es.info()
print(f"Elasticsearch {info['version']['number']} — {info['cluster_name']}")
API Key adalah cara autentikasi yang direkomendasikan untuk produksi. Buat API key di Kibana → Security → API Keys, atau via API es.security.create_api_key(). API key bisa dibatasi izinnya (read-only, index tertentu), lebih aman daripada username/password.Membuat Index dan Mapping #
Mapping mendefinisikan tipe setiap field dalam index — mirip schema di database relasional. Elasticsearch bisa auto-detect tipe (dynamic mapping), tapi mapping eksplisit lebih aman untuk produksi.
INDEX_PRODUK = "produk"
def buat_index_produk(es: Elasticsearch) -> None:
# Hapus index jika sudah ada (untuk dev/testing)
if es.indices.exists(index=INDEX_PRODUK):
es.indices.delete(index=INDEX_PRODUK)
es.indices.create(
index=INDEX_PRODUK,
mappings={
"properties": {
"nama": {
"type": "text", # dianalisis untuk full-text search
"analyzer": "indonesian", # analyzer bahasa Indonesia
"fields": {
"keyword": {"type": "keyword"} # untuk exact match dan sorting
}
},
"deskripsi": {
"type": "text",
"analyzer": "indonesian"
},
"harga": {
"type": "scaled_float",
"scaling_factor": 100 # simpan sebagai integer * 100
},
"stok": {"type": "integer"},
"kategori": {"type": "keyword"}, # exact match, tidak dianalisis
"tag": {"type": "keyword"}, # array of keywords
"aktif": {"type": "boolean"},
"rating": {"type": "float"},
"dibuat_pada": {
"type": "date",
"format": "strict_date_optional_time"
},
"spesifikasi": {"type": "object"}, # embedded object, field bebas
"lokasi": {
"type": "geo_point" # koordinat GPS (opsional)
}
}
},
settings={
"number_of_shards": 1, # shard untuk dev (produksi: sesuaikan ukuran data)
"number_of_replicas": 0, # replica untuk dev (produksi: minimal 1)
"analysis": {
"analyzer": {
"indonesian": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase", "asciifolding"]
}
}
}
}
)
print(f"Index '{INDEX_PRODUK}' berhasil dibuat.")
buat_index_produk(es)
Indexing Dokumen #
from datetime import datetime, timezone
# Index satu dokumen -- ES auto-generate ID
def tambah_produk(es: Elasticsearch, produk: dict) -> str:
produk["dibuat_pada"] = datetime.now(timezone.utc).isoformat()
hasil = es.index(index=INDEX_PRODUK, document=produk)
return hasil["_id"]
# Index dengan ID eksplisit
def tambah_produk_dengan_id(es: Elasticsearch, produk_id: str, produk: dict) -> str:
produk["dibuat_pada"] = datetime.now(timezone.utc).isoformat()
hasil = es.index(index=INDEX_PRODUK, id=produk_id, document=produk)
return hasil["_id"]
# Ambil dokumen berdasarkan ID
def ambil_produk(es: Elasticsearch, produk_id: str) -> dict | None:
try:
hasil = es.get(index=INDEX_PRODUK, id=produk_id)
return {"id": hasil["_id"], **hasil["_source"]}
except Exception:
return None
# Contoh penggunaan
id1 = tambah_produk(es, {
"nama": "Laptop Gaming ASUS ROG Strix G15",
"deskripsi": "Laptop gaming performa tinggi dengan prosesor AMD Ryzen 9",
"harga": 18500000,
"stok": 10,
"kategori": "Elektronik",
"tag": ["laptop", "gaming", "asus", "rog"],
"aktif": True,
"rating": 4.7
})
print(f"Dokumen ditambahkan, ID: {id1}")
# Refresh agar dokumen langsung bisa di-search
es.indices.refresh(index=INDEX_PRODUK)
Bulk Indexing #
Untuk mengindeks banyak dokumen sekaligus, gunakan helpers.bulk() yang jauh lebih efisien daripada loop index() satu per satu.
from elasticsearch import helpers
def bulk_index_produk(es: Elasticsearch, daftar_produk: list[dict]) -> tuple[int, list]:
def generate_actions():
for produk in daftar_produk:
yield {
"_index": INDEX_PRODUK,
"_id": produk.get("id"), # None = auto-generate
"_source": {
**produk,
"dibuat_pada": datetime.now(timezone.utc).isoformat()
}
}
sukses, gagal = helpers.bulk(
es,
generate_actions(),
chunk_size=500, # kirim per 500 dokumen
request_timeout=30
)
return sukses, gagal
data_produk = [
{"nama": "iPhone 15 Pro", "harga": 20000000, "kategori": "Elektronik", "stok": 5, "rating": 4.9, "aktif": True},
{"nama": "Samsung Galaxy S24", "harga": 15000000, "kategori": "Elektronik", "stok": 8, "rating": 4.6, "aktif": True},
{"nama": "Sepatu Lari Nike Air Max", "harga": 1800000, "kategori": "Olahraga", "stok": 20, "rating": 4.5, "aktif": True},
{"nama": "Kopi Arabika Gayo 500gr", "harga": 85000, "kategori": "Makanan", "stok": 100, "rating": 4.8, "aktif": True},
]
sukses, gagal = bulk_index_produk(es, data_produk)
print(f"Berhasil: {sukses}, Gagal: {len(gagal)}")
es.indices.refresh(index=INDEX_PRODUK)
Full-Text Search #
Ini adalah keunggulan utama Elasticsearch dibanding database biasa — kemampuan mencari teks secara natural, toleran terhadap variasi kata.
Match Query #
# match -- full-text search pada satu field
def cari_produk_sederhana(es: Elasticsearch, kata_kunci: str) -> list[dict]:
hasil = es.search(
index=INDEX_PRODUK,
query={
"match": {
"nama": {
"query": kata_kunci,
"operator": "and" # semua kata harus ada (default: "or")
}
}
}
)
return [
{"id": h["_id"], "score": h["_score"], **h["_source"]}
for h in hasil["hits"]["hits"]
]
# multi_match -- cari di beberapa field sekaligus
def cari_multi_field(es: Elasticsearch, kata_kunci: str) -> list[dict]:
hasil = es.search(
index=INDEX_PRODUK,
query={
"multi_match": {
"query": kata_kunci,
"fields": ["nama^3", "deskripsi", "tag"], # ^3 = boost field nama 3x
"type": "best_fields"
}
}
)
return [
{"id": h["_id"], "score": round(h["_score"], 2), **h["_source"]}
for h in hasil["hits"]["hits"]
]
# match_phrase -- cari frasa persis
def cari_frasa(es: Elasticsearch, frasa: str) -> list[dict]:
hasil = es.search(
index=INDEX_PRODUK,
query={"match_phrase": {"nama": frasa}}
)
return [{"id": h["_id"], **h["_source"]} for h in hasil["hits"]["hits"]]
Bool Query — Kombinasi Kondisi #
bool query adalah cara paling fleksibel untuk menggabungkan berbagai kondisi pencarian.
def cari_produk_lanjutan(
es: Elasticsearch,
kata_kunci: str = None,
kategori: str = None,
harga_min: float = None,
harga_max: float = None,
rating_min: float = None,
hanya_aktif: bool = True,
) -> list[dict]:
must = [] # HARUS terpenuhi, mempengaruhi skor
filter_ = [] # HARUS terpenuhi, TIDAK mempengaruhi skor (lebih efisien)
should = [] # SEBAIKNYA terpenuhi (meningkatkan skor jika terpenuhi)
if kata_kunci:
must.append({
"multi_match": {
"query": kata_kunci,
"fields": ["nama^3", "deskripsi"],
"type": "best_fields"
}
})
if kategori:
filter_.append({"term": {"kategori": kategori}})
if hanya_aktif:
filter_.append({"term": {"aktif": True}})
range_harga = {}
if harga_min is not None:
range_harga["gte"] = harga_min
if harga_max is not None:
range_harga["lte"] = harga_max
if range_harga:
filter_.append({"range": {"harga": range_harga}})
if rating_min is not None:
filter_.append({"range": {"rating": {"gte": rating_min}}})
query = {"bool": {}}
if must:
query["bool"]["must"] = must
if filter_:
query["bool"]["filter"] = filter_
if should:
query["bool"]["should"] = should
if not must and not filter_:
query = {"match_all": {}}
hasil = es.search(
index=INDEX_PRODUK,
query=query,
sort=[
{"_score": {"order": "desc"}},
{"rating": {"order": "desc"}},
{"harga": {"order": "asc"}}
],
size=20
)
return [
{
"id": h["_id"],
"score": round(h["_score"] or 0, 2),
**h["_source"]
}
for h in hasil["hits"]["hits"]
]
# Contoh penggunaan
hasil = cari_produk_lanjutan(
es,
kata_kunci="laptop gaming",
kategori="Elektronik",
harga_max=20000000,
rating_min=4.5
)
for p in hasil:
print(f"[{p['score']}] {p['nama']} — Rp{p['harga']:,.0f}")
Pagination dan Highlight #
def cari_dengan_paginasi(
es: Elasticsearch,
kata_kunci: str,
halaman: int = 1,
per_halaman: int = 10
) -> dict:
offset = (halaman - 1) * per_halaman
hasil = es.search(
index=INDEX_PRODUK,
query={
"multi_match": {
"query": kata_kunci,
"fields": ["nama^2", "deskripsi"]
}
},
highlight={
"fields": {
"nama": {"number_of_fragments": 0}, # tampilkan seluruh field
"deskripsi": {"fragment_size": 150, "number_of_fragments": 2}
},
"pre_tags": ["<mark>"], # tag HTML pembuka highlight
"post_tags": ["</mark>"] # tag HTML penutup highlight
},
from_=offset,
size=per_halaman,
track_total_hits=True # hitung total dokumen yang cocok
)
total = hasil["hits"]["total"]["value"]
hits = hasil["hits"]["hits"]
return {
"total": total,
"halaman": halaman,
"per_halaman": per_halaman,
"total_halaman": -(-total // per_halaman), # ceiling division
"hasil": [
{
"id": h["_id"],
"score": round(h["_score"], 2),
**h["_source"],
"highlight": h.get("highlight", {})
}
for h in hits
]
}
response = cari_dengan_paginasi(es, "laptop gaming", halaman=1)
print(f"Total: {response['total']} hasil")
for item in response["hasil"]:
print(f"- {item['nama']}")
if "nama" in item["highlight"]:
print(f" → {item['highlight']['nama'][0]}")
Aggregasi #
Aggregasi Elasticsearch memungkinkan analitik real-time — menghitung distribusi, statistik, dan trend dari data yang sudah terindeks.
def analitik_produk(es: Elasticsearch) -> dict:
hasil = es.search(
index=INDEX_PRODUK,
query={"term": {"aktif": True}},
size=0, # 0 = hanya kembalikan aggregasi, bukan dokumen
aggs={
# Bucket aggregation: kelompokkan berdasarkan kategori
"per_kategori": {
"terms": {
"field": "kategori",
"size": 20
},
"aggs": {
# Nested metric aggregation dalam setiap bucket
"rata_harga": {"avg": {"field": "harga"}},
"harga_min": {"min": {"field": "harga"}},
"harga_max": {"max": {"field": "harga"}},
"rata_rating": {"avg": {"field": "rating"}},
"total_stok": {"sum": {"field": "stok"}},
}
},
# Statistik keseluruhan
"statistik_harga": {
"extended_stats": {"field": "harga"}
},
# Histogram harga -- distribusi dalam range tertentu
"distribusi_harga": {
"histogram": {
"field": "harga",
"interval": 5000000 # per 5 juta
}
},
# Range aggregation -- bucket berdasarkan range custom
"segmen_harga": {
"range": {
"field": "harga",
"ranges": [
{"key": "Budget", "to": 1000000},
{"key": "Menengah", "from": 1000000, "to": 5000000},
{"key": "Premium", "from": 5000000}
]
}
}
}
)
aggs = hasil["aggregations"]
# Tampilkan per kategori
print("=== Per Kategori ===")
for bucket in aggs["per_kategori"]["buckets"]:
print(f"{bucket['key']}: {bucket['doc_count']} produk, "
f"rata-rata Rp{bucket['rata_harga']['value']:,.0f}, "
f"rating {bucket['rata_rating']['value']:.1f}")
# Distribusi harga
print("\n=== Segmen Harga ===")
for bucket in aggs["segmen_harga"]["buckets"]:
print(f"{bucket['key']}: {bucket['doc_count']} produk")
return aggs
Update dan Delete Dokumen #
# Update sebagian field (partial update)
def update_produk(es: Elasticsearch, produk_id: str, perubahan: dict) -> bool:
try:
es.update(
index=INDEX_PRODUK,
id=produk_id,
doc=perubahan # hanya field yang disertakan yang diubah
)
return True
except Exception as e:
print(f"Update gagal: {e}")
return False
# Update menggunakan script (untuk operasi atomik)
def increment_stok(es: Elasticsearch, produk_id: str, jumlah: int) -> bool:
try:
es.update(
index=INDEX_PRODUK,
id=produk_id,
script={
"source": "ctx._source.stok += params.jumlah",
"params": {"jumlah": jumlah}
}
)
return True
except Exception:
return False
# Update by query -- update banyak dokumen sekaligus
def nonaktifkan_kategori(es: Elasticsearch, kategori: str) -> int:
hasil = es.update_by_query(
index=INDEX_PRODUK,
query={"term": {"kategori": kategori}},
script={"source": "ctx._source.aktif = false"}
)
return hasil["updated"]
# Delete dokumen
def hapus_produk(es: Elasticsearch, produk_id: str) -> bool:
try:
es.delete(index=INDEX_PRODUK, id=produk_id)
return True
except Exception:
return False
# Delete by query
def hapus_produk_tidak_aktif(es: Elasticsearch) -> int:
hasil = es.delete_by_query(
index=INDEX_PRODUK,
query={"term": {"aktif": False}}
)
return hasil["deleted"]
Penanganan Error #
from elasticsearch import (
Elasticsearch,
NotFoundError,
ConflictError,
ConnectionError,
TransportError
)
def ambil_produk_aman(es: Elasticsearch, produk_id: str) -> dict | None:
try:
hasil = es.get(index=INDEX_PRODUK, id=produk_id)
return {"id": hasil["_id"], **hasil["_source"]}
except NotFoundError:
return None # dokumen tidak ditemukan
except ConnectionError:
print("Tidak bisa terhubung ke Elasticsearch.")
return None
except TransportError as e:
print(f"Transport error [{e.status_code}]: {e.error}")
return None
Kapan Menggunakan Elasticsearch #
Gunakan Elasticsearch untuk:
✓ Full-text search dengan ranking relevansi (e-commerce, blog, dokumentasi)
✓ Log aggregation dan analitik real-time (ELK Stack)
✓ Auto-complete dan search-as-you-type
✓ Faceted search (filter by kategori, harga, rating)
✓ Analitik data besar dengan aggregasi kompleks
Jangan gunakan sebagai database utama karena:
✗ Tidak mendukung transaksi ACID
✗ Operasi JOIN antar index tidak didukung secara native
✗ Update dokumen lebih lambat dari database relasional
✗ Storage lebih besar karena inverted index
Pola umum: database utama (PostgreSQL/MySQL) + Elasticsearch sebagai search layer
Ringkasan #
- Elasticsearch 8.x wajib autentikasi — gunakan API key (direkomendasikan) atau basic auth; jangan gunakan format koneksi ES 7.x yang sudah deprecated.
- Mapping eksplisit lebih aman — definisikan tipe setiap field secara eksplisit daripada mengandalkan dynamic mapping; hindari field yang tidak perlu di-index untuk menghemat storage.
textvskeyword— gunakantextuntuk full-text search (nama produk, deskripsi); gunakankeyworduntuk exact match, sorting, dan aggregasi (kategori, status, tag).boolquery adalah fondasi — gabungkanmust(pengaruhi skor),filter(tidak pengaruhi skor, lebih cepat), danshould(opsional, meningkatkan skor) untuk query yang fleksibel.filterlebih cepat darimust— kondisi yang tidak perlu mempengaruhi skor (kategori, status, range harga) sebaiknya diletakkan difilter, bukanmust.bulk()untuk indexing massal — jauh lebih efisien daripada loopindex()satu per satu; gunakan generator untuk menghemat memori saat data sangat besar.size=0untuk aggregasi murni — setsize=0saat hanya butuh hasil aggregasi tanpa dokumen; menghemat bandwidth dan memori.highlight— gunakan untuk menampilkan potongan teks yang cocok dengan query, sangat membantu UX di fitur pencarian.track_total_hits=True— aktifkan jika butuh jumlah total dokumen yang cocok untuk pagination yang akurat.- Elasticsearch sebagai search layer — gunakan bersama database utama, bukan sebagai pengganti; simpan data di PostgreSQL/MySQL, indeks ke Elasticsearch untuk pencarian.