Elasticsearch #

Elasticsearch adalah mesin pencari dan analitik terdistribusi berbasis Apache Lucene yang dirancang untuk pencarian teks penuh, logging, monitoring, dan analitik data real-time dalam skala besar. Berbeda dari database tradisional yang dioptimalkan untuk menyimpan dan mengambil data secara tepat, Elasticsearch dioptimalkan untuk mencari, menganalisis, dan memvisualisasikan data dalam hitungan milidetik — bahkan pada dataset berukuran terabyte. Python berinteraksi dengan Elasticsearch melalui library elasticsearch-py resmi, dan sejak versi 8.x API-nya berubah signifikan: parameter body= dihapus, autentikasi menjadi wajib, dan TLS diaktifkan secara default.

Instalasi #

pip install elasticsearch
Pastikan versi library sesuai dengan versi Elasticsearch server kamu. Elasticsearch 8.x tidak kompatibel dengan library versi 7.x dan sebaliknya. Gunakan pip install "elasticsearch>=8,<9" untuk memastikan versi yang tepat.

Konsep Dasar Elasticsearch #

Sebelum menulis kode, penting memahami terminologi Elasticsearch:

Elasticsearch          ↔  Database Relasional
─────────────────────────────────────────────
Index                  ↔  Tabel
Document               ↔  Baris (row)
Field                  ↔  Kolom
Mapping                ↔  Schema / DDL
Shard                  ↔  Partisi horizontal

Konsep khusus Elasticsearch:
  Inverted Index  -- struktur data untuk full-text search (kata → dokumen)
  Analyzer        -- pipeline: tokenizer + filter untuk memproses teks
  Relevance Score -- skor relevansi dokumen terhadap query (_score)
  Aggregation     -- analitik: bucket (grouping) + metric (sum, avg, dll.)

Membuat Koneksi #

Elasticsearch 8.x mengaktifkan TLS dan autentikasi secara default. Cara koneksi berbeda tergantung apakah kamu menggunakan Elasticsearch lokal, self-managed cluster, atau Elastic Cloud.

from elasticsearch import Elasticsearch
import os

# ANTI-PATTERN: koneksi lama (format ES 7.x, tidak aman)
es = Elasticsearch([{"host": "localhost", "port": 9200}])  # ✗ -- deprecated

# BENAR: koneksi ES 8.x lokal dengan API key atau basic auth
es = Elasticsearch(
    hosts=os.getenv("ES_HOST", "https://localhost:9200"),
    api_key=os.getenv("ES_API_KEY"),          # direkomendasikan
    verify_certs=False,                        # hanya untuk dev/self-signed cert
    ssl_show_warn=False
)

# Atau dengan username/password
es = Elasticsearch(
    hosts=os.getenv("ES_HOST", "https://localhost:9200"),
    basic_auth=(
        os.getenv("ES_USER", "elastic"),
        os.getenv("ES_PASSWORD")
    ),
    verify_certs=False
)

# Koneksi ke Elastic Cloud (production)
es = Elasticsearch(
    cloud_id=os.getenv("ES_CLOUD_ID"),
    api_key=os.getenv("ES_API_KEY")
)

# Tes koneksi
info = es.info()
print(f"Elasticsearch {info['version']['number']}{info['cluster_name']}")
API Key adalah cara autentikasi yang direkomendasikan untuk produksi. Buat API key di Kibana → Security → API Keys, atau via API es.security.create_api_key(). API key bisa dibatasi izinnya (read-only, index tertentu), lebih aman daripada username/password.

Membuat Index dan Mapping #

Mapping mendefinisikan tipe setiap field dalam index — mirip schema di database relasional. Elasticsearch bisa auto-detect tipe (dynamic mapping), tapi mapping eksplisit lebih aman untuk produksi.

INDEX_PRODUK = "produk"

def buat_index_produk(es: Elasticsearch) -> None:
    # Hapus index jika sudah ada (untuk dev/testing)
    if es.indices.exists(index=INDEX_PRODUK):
        es.indices.delete(index=INDEX_PRODUK)

    es.indices.create(
        index=INDEX_PRODUK,
        mappings={
            "properties": {
                "nama": {
                    "type": "text",                  # dianalisis untuk full-text search
                    "analyzer": "indonesian",         # analyzer bahasa Indonesia
                    "fields": {
                        "keyword": {"type": "keyword"}  # untuk exact match dan sorting
                    }
                },
                "deskripsi": {
                    "type": "text",
                    "analyzer": "indonesian"
                },
                "harga": {
                    "type": "scaled_float",
                    "scaling_factor": 100            # simpan sebagai integer * 100
                },
                "stok":      {"type": "integer"},
                "kategori":  {"type": "keyword"},    # exact match, tidak dianalisis
                "tag":       {"type": "keyword"},    # array of keywords
                "aktif":     {"type": "boolean"},
                "rating":    {"type": "float"},
                "dibuat_pada": {
                    "type":   "date",
                    "format": "strict_date_optional_time"
                },
                "spesifikasi": {"type": "object"},   # embedded object, field bebas
                "lokasi": {
                    "type": "geo_point"              # koordinat GPS (opsional)
                }
            }
        },
        settings={
            "number_of_shards":   1,     # shard untuk dev (produksi: sesuaikan ukuran data)
            "number_of_replicas": 0,     # replica untuk dev (produksi: minimal 1)
            "analysis": {
                "analyzer": {
                    "indonesian": {
                        "type":      "custom",
                        "tokenizer": "standard",
                        "filter":    ["lowercase", "asciifolding"]
                    }
                }
            }
        }
    )
    print(f"Index '{INDEX_PRODUK}' berhasil dibuat.")

buat_index_produk(es)

Indexing Dokumen #

from datetime import datetime, timezone

# Index satu dokumen -- ES auto-generate ID
def tambah_produk(es: Elasticsearch, produk: dict) -> str:
    produk["dibuat_pada"] = datetime.now(timezone.utc).isoformat()
    
    hasil = es.index(index=INDEX_PRODUK, document=produk)
    return hasil["_id"]

# Index dengan ID eksplisit
def tambah_produk_dengan_id(es: Elasticsearch, produk_id: str, produk: dict) -> str:
    produk["dibuat_pada"] = datetime.now(timezone.utc).isoformat()
    
    hasil = es.index(index=INDEX_PRODUK, id=produk_id, document=produk)
    return hasil["_id"]

# Ambil dokumen berdasarkan ID
def ambil_produk(es: Elasticsearch, produk_id: str) -> dict | None:
    try:
        hasil = es.get(index=INDEX_PRODUK, id=produk_id)
        return {"id": hasil["_id"], **hasil["_source"]}
    except Exception:
        return None

# Contoh penggunaan
id1 = tambah_produk(es, {
    "nama":      "Laptop Gaming ASUS ROG Strix G15",
    "deskripsi": "Laptop gaming performa tinggi dengan prosesor AMD Ryzen 9",
    "harga":     18500000,
    "stok":      10,
    "kategori":  "Elektronik",
    "tag":       ["laptop", "gaming", "asus", "rog"],
    "aktif":     True,
    "rating":    4.7
})
print(f"Dokumen ditambahkan, ID: {id1}")

# Refresh agar dokumen langsung bisa di-search
es.indices.refresh(index=INDEX_PRODUK)

Bulk Indexing #

Untuk mengindeks banyak dokumen sekaligus, gunakan helpers.bulk() yang jauh lebih efisien daripada loop index() satu per satu.

from elasticsearch import helpers

def bulk_index_produk(es: Elasticsearch, daftar_produk: list[dict]) -> tuple[int, list]:
    def generate_actions():
        for produk in daftar_produk:
            yield {
                "_index":  INDEX_PRODUK,
                "_id":     produk.get("id"),       # None = auto-generate
                "_source": {
                    **produk,
                    "dibuat_pada": datetime.now(timezone.utc).isoformat()
                }
            }

    sukses, gagal = helpers.bulk(
        es,
        generate_actions(),
        chunk_size=500,          # kirim per 500 dokumen
        request_timeout=30
    )
    return sukses, gagal

data_produk = [
    {"nama": "iPhone 15 Pro", "harga": 20000000, "kategori": "Elektronik", "stok": 5, "rating": 4.9, "aktif": True},
    {"nama": "Samsung Galaxy S24", "harga": 15000000, "kategori": "Elektronik", "stok": 8, "rating": 4.6, "aktif": True},
    {"nama": "Sepatu Lari Nike Air Max", "harga": 1800000, "kategori": "Olahraga", "stok": 20, "rating": 4.5, "aktif": True},
    {"nama": "Kopi Arabika Gayo 500gr", "harga": 85000, "kategori": "Makanan", "stok": 100, "rating": 4.8, "aktif": True},
]

sukses, gagal = bulk_index_produk(es, data_produk)
print(f"Berhasil: {sukses}, Gagal: {len(gagal)}")
es.indices.refresh(index=INDEX_PRODUK)

Ini adalah keunggulan utama Elasticsearch dibanding database biasa — kemampuan mencari teks secara natural, toleran terhadap variasi kata.

Match Query #

# match -- full-text search pada satu field
def cari_produk_sederhana(es: Elasticsearch, kata_kunci: str) -> list[dict]:
    hasil = es.search(
        index=INDEX_PRODUK,
        query={
            "match": {
                "nama": {
                    "query":    kata_kunci,
                    "operator": "and"     # semua kata harus ada (default: "or")
                }
            }
        }
    )
    return [
        {"id": h["_id"], "score": h["_score"], **h["_source"]}
        for h in hasil["hits"]["hits"]
    ]

# multi_match -- cari di beberapa field sekaligus
def cari_multi_field(es: Elasticsearch, kata_kunci: str) -> list[dict]:
    hasil = es.search(
        index=INDEX_PRODUK,
        query={
            "multi_match": {
                "query":  kata_kunci,
                "fields": ["nama^3", "deskripsi", "tag"],  # ^3 = boost field nama 3x
                "type":   "best_fields"
            }
        }
    )
    return [
        {"id": h["_id"], "score": round(h["_score"], 2), **h["_source"]}
        for h in hasil["hits"]["hits"]
    ]

# match_phrase -- cari frasa persis
def cari_frasa(es: Elasticsearch, frasa: str) -> list[dict]:
    hasil = es.search(
        index=INDEX_PRODUK,
        query={"match_phrase": {"nama": frasa}}
    )
    return [{"id": h["_id"], **h["_source"]} for h in hasil["hits"]["hits"]]

Bool Query — Kombinasi Kondisi #

bool query adalah cara paling fleksibel untuk menggabungkan berbagai kondisi pencarian.

def cari_produk_lanjutan(
    es:          Elasticsearch,
    kata_kunci:  str  = None,
    kategori:    str  = None,
    harga_min:   float = None,
    harga_max:   float = None,
    rating_min:  float = None,
    hanya_aktif: bool  = True,
) -> list[dict]:

    must    = []   # HARUS terpenuhi, mempengaruhi skor
    filter_ = []   # HARUS terpenuhi, TIDAK mempengaruhi skor (lebih efisien)
    should  = []   # SEBAIKNYA terpenuhi (meningkatkan skor jika terpenuhi)

    if kata_kunci:
        must.append({
            "multi_match": {
                "query":  kata_kunci,
                "fields": ["nama^3", "deskripsi"],
                "type":   "best_fields"
            }
        })

    if kategori:
        filter_.append({"term": {"kategori": kategori}})

    if hanya_aktif:
        filter_.append({"term": {"aktif": True}})

    range_harga = {}
    if harga_min is not None:
        range_harga["gte"] = harga_min
    if harga_max is not None:
        range_harga["lte"] = harga_max
    if range_harga:
        filter_.append({"range": {"harga": range_harga}})

    if rating_min is not None:
        filter_.append({"range": {"rating": {"gte": rating_min}}})

    query = {"bool": {}}
    if must:
        query["bool"]["must"] = must
    if filter_:
        query["bool"]["filter"] = filter_
    if should:
        query["bool"]["should"] = should
    if not must and not filter_:
        query = {"match_all": {}}

    hasil = es.search(
        index=INDEX_PRODUK,
        query=query,
        sort=[
            {"_score":  {"order": "desc"}},
            {"rating":  {"order": "desc"}},
            {"harga":   {"order": "asc"}}
        ],
        size=20
    )

    return [
        {
            "id":    h["_id"],
            "score": round(h["_score"] or 0, 2),
            **h["_source"]
        }
        for h in hasil["hits"]["hits"]
    ]

# Contoh penggunaan
hasil = cari_produk_lanjutan(
    es,
    kata_kunci="laptop gaming",
    kategori="Elektronik",
    harga_max=20000000,
    rating_min=4.5
)
for p in hasil:
    print(f"[{p['score']}] {p['nama']} — Rp{p['harga']:,.0f}")

Pagination dan Highlight #

def cari_dengan_paginasi(
    es:         Elasticsearch,
    kata_kunci: str,
    halaman:    int = 1,
    per_halaman: int = 10
) -> dict:
    offset = (halaman - 1) * per_halaman

    hasil = es.search(
        index=INDEX_PRODUK,
        query={
            "multi_match": {
                "query":  kata_kunci,
                "fields": ["nama^2", "deskripsi"]
            }
        },
        highlight={
            "fields": {
                "nama":      {"number_of_fragments": 0},   # tampilkan seluruh field
                "deskripsi": {"fragment_size": 150, "number_of_fragments": 2}
            },
            "pre_tags":  ["<mark>"],    # tag HTML pembuka highlight
            "post_tags": ["</mark>"]    # tag HTML penutup highlight
        },
        from_=offset,
        size=per_halaman,
        track_total_hits=True          # hitung total dokumen yang cocok
    )

    total = hasil["hits"]["total"]["value"]
    hits  = hasil["hits"]["hits"]

    return {
        "total":        total,
        "halaman":      halaman,
        "per_halaman":  per_halaman,
        "total_halaman": -(-total // per_halaman),   # ceiling division
        "hasil": [
            {
                "id":        h["_id"],
                "score":     round(h["_score"], 2),
                **h["_source"],
                "highlight": h.get("highlight", {})
            }
            for h in hits
        ]
    }

response = cari_dengan_paginasi(es, "laptop gaming", halaman=1)
print(f"Total: {response['total']} hasil")
for item in response["hasil"]:
    print(f"- {item['nama']}")
    if "nama" in item["highlight"]:
        print(f"  → {item['highlight']['nama'][0]}")

Aggregasi #

Aggregasi Elasticsearch memungkinkan analitik real-time — menghitung distribusi, statistik, dan trend dari data yang sudah terindeks.

def analitik_produk(es: Elasticsearch) -> dict:
    hasil = es.search(
        index=INDEX_PRODUK,
        query={"term": {"aktif": True}},
        size=0,    # 0 = hanya kembalikan aggregasi, bukan dokumen
        aggs={
            # Bucket aggregation: kelompokkan berdasarkan kategori
            "per_kategori": {
                "terms": {
                    "field": "kategori",
                    "size":  20
                },
                "aggs": {
                    # Nested metric aggregation dalam setiap bucket
                    "rata_harga":  {"avg":   {"field": "harga"}},
                    "harga_min":   {"min":   {"field": "harga"}},
                    "harga_max":   {"max":   {"field": "harga"}},
                    "rata_rating": {"avg":   {"field": "rating"}},
                    "total_stok":  {"sum":   {"field": "stok"}},
                }
            },

            # Statistik keseluruhan
            "statistik_harga": {
                "extended_stats": {"field": "harga"}
            },

            # Histogram harga -- distribusi dalam range tertentu
            "distribusi_harga": {
                "histogram": {
                    "field":    "harga",
                    "interval": 5000000    # per 5 juta
                }
            },

            # Range aggregation -- bucket berdasarkan range custom
            "segmen_harga": {
                "range": {
                    "field": "harga",
                    "ranges": [
                        {"key": "Budget",    "to":   1000000},
                        {"key": "Menengah",  "from": 1000000, "to": 5000000},
                        {"key": "Premium",   "from": 5000000}
                    ]
                }
            }
        }
    )

    aggs = hasil["aggregations"]

    # Tampilkan per kategori
    print("=== Per Kategori ===")
    for bucket in aggs["per_kategori"]["buckets"]:
        print(f"{bucket['key']}: {bucket['doc_count']} produk, "
              f"rata-rata Rp{bucket['rata_harga']['value']:,.0f}, "
              f"rating {bucket['rata_rating']['value']:.1f}")

    # Distribusi harga
    print("\n=== Segmen Harga ===")
    for bucket in aggs["segmen_harga"]["buckets"]:
        print(f"{bucket['key']}: {bucket['doc_count']} produk")

    return aggs

Update dan Delete Dokumen #

# Update sebagian field (partial update)
def update_produk(es: Elasticsearch, produk_id: str, perubahan: dict) -> bool:
    try:
        es.update(
            index=INDEX_PRODUK,
            id=produk_id,
            doc=perubahan    # hanya field yang disertakan yang diubah
        )
        return True
    except Exception as e:
        print(f"Update gagal: {e}")
        return False

# Update menggunakan script (untuk operasi atomik)
def increment_stok(es: Elasticsearch, produk_id: str, jumlah: int) -> bool:
    try:
        es.update(
            index=INDEX_PRODUK,
            id=produk_id,
            script={
                "source": "ctx._source.stok += params.jumlah",
                "params": {"jumlah": jumlah}
            }
        )
        return True
    except Exception:
        return False

# Update by query -- update banyak dokumen sekaligus
def nonaktifkan_kategori(es: Elasticsearch, kategori: str) -> int:
    hasil = es.update_by_query(
        index=INDEX_PRODUK,
        query={"term": {"kategori": kategori}},
        script={"source": "ctx._source.aktif = false"}
    )
    return hasil["updated"]

# Delete dokumen
def hapus_produk(es: Elasticsearch, produk_id: str) -> bool:
    try:
        es.delete(index=INDEX_PRODUK, id=produk_id)
        return True
    except Exception:
        return False

# Delete by query
def hapus_produk_tidak_aktif(es: Elasticsearch) -> int:
    hasil = es.delete_by_query(
        index=INDEX_PRODUK,
        query={"term": {"aktif": False}}
    )
    return hasil["deleted"]

Penanganan Error #

from elasticsearch import (
    Elasticsearch,
    NotFoundError,
    ConflictError,
    ConnectionError,
    TransportError
)

def ambil_produk_aman(es: Elasticsearch, produk_id: str) -> dict | None:
    try:
        hasil = es.get(index=INDEX_PRODUK, id=produk_id)
        return {"id": hasil["_id"], **hasil["_source"]}
    except NotFoundError:
        return None   # dokumen tidak ditemukan
    except ConnectionError:
        print("Tidak bisa terhubung ke Elasticsearch.")
        return None
    except TransportError as e:
        print(f"Transport error [{e.status_code}]: {e.error}")
        return None

Kapan Menggunakan Elasticsearch #

Gunakan Elasticsearch untuk:
  ✓ Full-text search dengan ranking relevansi (e-commerce, blog, dokumentasi)
  ✓ Log aggregation dan analitik real-time (ELK Stack)
  ✓ Auto-complete dan search-as-you-type
  ✓ Faceted search (filter by kategori, harga, rating)
  ✓ Analitik data besar dengan aggregasi kompleks

Jangan gunakan sebagai database utama karena:
  ✗ Tidak mendukung transaksi ACID
  ✗ Operasi JOIN antar index tidak didukung secara native
  ✗ Update dokumen lebih lambat dari database relasional
  ✗ Storage lebih besar karena inverted index

Pola umum: database utama (PostgreSQL/MySQL) + Elasticsearch sebagai search layer

Ringkasan #

  • Elasticsearch 8.x wajib autentikasi — gunakan API key (direkomendasikan) atau basic auth; jangan gunakan format koneksi ES 7.x yang sudah deprecated.
  • Mapping eksplisit lebih aman — definisikan tipe setiap field secara eksplisit daripada mengandalkan dynamic mapping; hindari field yang tidak perlu di-index untuk menghemat storage.
  • text vs keyword — gunakan text untuk full-text search (nama produk, deskripsi); gunakan keyword untuk exact match, sorting, dan aggregasi (kategori, status, tag).
  • bool query adalah fondasi — gabungkan must (pengaruhi skor), filter (tidak pengaruhi skor, lebih cepat), dan should (opsional, meningkatkan skor) untuk query yang fleksibel.
  • filter lebih cepat dari must — kondisi yang tidak perlu mempengaruhi skor (kategori, status, range harga) sebaiknya diletakkan di filter, bukan must.
  • bulk() untuk indexing massal — jauh lebih efisien daripada loop index() satu per satu; gunakan generator untuk menghemat memori saat data sangat besar.
  • size=0 untuk aggregasi murni — set size=0 saat hanya butuh hasil aggregasi tanpa dokumen; menghemat bandwidth dan memori.
  • highlight — gunakan untuk menampilkan potongan teks yang cocok dengan query, sangat membantu UX di fitur pencarian.
  • track_total_hits=True — aktifkan jika butuh jumlah total dokumen yang cocok untuk pagination yang akurat.
  • Elasticsearch sebagai search layer — gunakan bersama database utama, bukan sebagai pengganti; simpan data di PostgreSQL/MySQL, indeks ke Elasticsearch untuk pencarian.

← Sebelumnya: MongoDB   Berikutnya: Kafka →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact