MongoDB #

MongoDB adalah database NoSQL berbasis dokumen yang menyimpan data dalam format BSON (Binary JSON) — bukan baris dan kolom seperti database relasional, melainkan dokumen yang fleksibel dan bisa berisi struktur bersarang. Ini membuatnya ideal untuk data yang skemanya sering berubah, dokumen yang kompleks, dan aplikasi yang butuh skala horizontal. Python berinteraksi dengan MongoDB melalui library PyMongo — driver resmi yang menyediakan API lengkap mulai dari operasi CRUD, aggregation pipeline, hingga transaksi multi-dokumen. Memahami cara kerja query operator MongoDB dan bagaimana aggregation pipeline bekerja adalah kunci untuk memanfaatkan MongoDB secara optimal.

Instalasi #

pip install pymongo

Untuk koneksi ke MongoDB Atlas (cloud) atau yang membutuhkan TLS, tambahkan dependency opsional:

pip install "pymongo[srv]"   # untuk connection string mongodb+srv://

Membuat Koneksi #

MongoClient mengelola connection pool secara otomatis — satu instance cukup untuk seluruh aplikasi dan thread-safe.

from pymongo import MongoClient
import os

# ANTI-PATTERN: hardcode koneksi di kode
client = MongoClient("localhost", 27017)  # ✗ -- tidak fleksibel untuk deployment

# BENAR: baca dari environment variable
MONGO_URI = os.getenv("MONGO_URI", "mongodb://localhost:27017")
client = MongoClient(
    MONGO_URI,
    serverSelectionTimeoutMS=5000,   # timeout jika server tidak bisa dijangkau
    connectTimeoutMS=10000,
    maxPoolSize=50                   # batas koneksi dalam pool
)

# Koneksi ke MongoDB Atlas
# MONGO_URI = "mongodb+srv://user:[email protected]/myapp?retryWrites=true"
client_atlas = MongoClient(os.getenv("MONGO_URI"))

# Tes koneksi
try:
    client.admin.command("ping")
    print("Koneksi MongoDB berhasil.")
except Exception as e:
    print(f"Koneksi gagal: {e}")

# Akses database dan collection
db         = client["myapp"]          # atau client.myapp
pengguna   = db["pengguna"]           # atau db.pengguna
produk     = db["produk"]
orders     = db["orders"]
MongoClient bersifat thread-safe dan mengelola connection pool secara internal. Buat satu instance saja di level aplikasi dan gunakan bersama-sama di seluruh kode — jangan buat instance baru per request atau per fungsi.

Struktur Dokumen #

Berbeda dari tabel relasional, dokumen MongoDB bisa berisi tipe data yang beragam dan struktur bersarang. Memahami struktur ini penting sebelum menulis query.

# Contoh dokumen pengguna
dokumen_pengguna = {
    # _id otomatis dibuat sebagai ObjectId jika tidak disertakan
    "nama":      "Budi Santoso",
    "email":     "[email protected]",
    "usia":      28,
    "aktif":     True,
    "tag":       ["python", "backend"],       # array
    "alamat": {                               # embedded document
        "kota":     "Bandung",
        "provinsi": "Jawa Barat",
        "kode_pos": "40115"
    },
    "riwayat_login": [                        # array of embedded documents
        {"waktu": "2024-03-15T10:00:00", "ip": "192.168.1.1"},
        {"waktu": "2024-03-16T08:30:00", "ip": "192.168.1.2"},
    ]
}

# Contoh dokumen produk
dokumen_produk = {
    "nama":     "Laptop Gaming ASUS ROG",
    "harga":    15000000,
    "stok":     5,
    "kategori": "Elektronik",
    "spesifikasi": {
        "ram":       "16GB",
        "storage":   "512GB SSD",
        "processor": "Intel i7"
    },
    "tag": ["laptop", "gaming", "asus"]
}

Insert Dokumen #

from pymongo import MongoClient
from datetime import datetime, timezone
from bson import ObjectId

# Insert satu dokumen
def tambah_pengguna(db, nama: str, email: str, usia: int) -> str:
    doc = {
        "nama":       nama,
        "email":      email,
        "usia":       usia,
        "aktif":      True,
        "tag":        [],
        "dibuat_pada": datetime.now(timezone.utc)
    }
    hasil = db["pengguna"].insert_one(doc)
    return str(hasil.inserted_id)

id_baru = tambah_pengguna(db, "Budi Santoso", "[email protected]", 28)
print(f"ID baru: {id_baru}")

# Insert banyak dokumen sekaligus
def tambah_banyak_pengguna(db, daftar: list[dict]) -> list[str]:
    for doc in daftar:
        doc["dibuat_pada"] = datetime.now(timezone.utc)
        doc.setdefault("aktif", True)
        doc.setdefault("tag", [])
    
    hasil = db["pengguna"].insert_many(daftar)
    return [str(oid) for oid in hasil.inserted_ids]

data_baru = [
    {"nama": "Sari Dewi",     "email": "[email protected]",  "usia": 25},
    {"nama": "Andi Prasetyo", "email": "[email protected]",  "usia": 32},
    {"nama": "Rina Marlina",  "email": "[email protected]",  "usia": 29},
]
ids = tambah_banyak_pengguna(db, data_baru)
print(f"{len(ids)} dokumen ditambahkan.")

Read — Query Operator #

MongoDB menggunakan operator berbasis dict untuk memfilter dokumen. Ini sangat berbeda dari SQL WHERE clause.

Operator Perbandingan #

col = db["pengguna"]

# Sama dengan (implisit)
col.find_one({"email": "[email protected]"})

# Operator eksplisit
col.find({"usia": {"$gt":  25}})          # usia > 25
col.find({"usia": {"$gte": 25}})          # usia >= 25
col.find({"usia": {"$lt":  30}})          # usia < 30
col.find({"usia": {"$lte": 30}})          # usia <= 30
col.find({"usia": {"$ne":  28}})          # usia != 28
col.find({"usia": {"$in":  [25, 28, 32]}})  # usia IN (25, 28, 32)
col.find({"usia": {"$nin": [25, 28]}})    # usia NOT IN (25, 28)

# Range
col.find({"usia": {"$gte": 20, "$lte": 30}})  # 20 <= usia <= 30

# Null dan keberadaan field
col.find({"foto": None})                       # foto == null
col.find({"foto": {"$exists": False}})         # field foto tidak ada sama sekali
col.find({"foto": {"$exists": True}})          # field foto ada (meski nilainya null)

Operator Logika #

# $and -- semua kondisi harus terpenuhi (default jika beberapa key)
col.find({"aktif": True, "usia": {"$gte": 25}})  # implisit AND

# $or -- salah satu kondisi terpenuhi
col.find({"$or": [
    {"nama": {"$regex": "budi", "$options": "i"}},
    {"email": {"$regex": "budi", "$options": "i"}}
]})

# $nor -- tidak ada yang terpenuhi
col.find({"$nor": [{"aktif": False}, {"usia": {"$lt": 18}}]})

# $not
col.find({"usia": {"$not": {"$lt": 18}}})  # usia tidak kurang dari 18

Query pada Field Bersarang dan Array #

# Field bersarang (embedded document) -- gunakan dot notation
col.find({"alamat.kota": "Bandung"})
col.find({"alamat.kode_pos": {"$regex": "^40"}})

# Array -- contains
col.find({"tag": "python"})                    # tag mengandung "python"
col.find({"tag": {"$in": ["python", "go"]}})   # tag mengandung python ATAU go
col.find({"tag": {"$all": ["python", "backend"]}})  # tag mengandung keduanya

# Array size
col.find({"tag": {"$size": 0}})               # tag kosong

Projection — Pilih Field Tertentu #

# ANTI-PATTERN: ambil seluruh dokumen padahal hanya butuh beberapa field
semua = list(col.find({"aktif": True}))        # ✗ -- ambil semua field, boros bandwidth

# BENAR: gunakan projection untuk membatasi field yang dikembalikan
# 1 = sertakan, 0 = kecualikan (tidak bisa campur kecuali untuk _id)
semua = list(col.find(
    {"aktif": True},
    {"nama": 1, "email": 1, "usia": 1, "_id": 0}  # ✓ -- hanya nama, email, usia
))

# Kecualikan field besar
semua = list(col.find(
    {},
    {"riwayat_login": 0, "bio": 0}   # ambil semua kecuali dua field ini
))

Sorting dan Pagination #

import pymongo

# Sorting
col.find().sort("nama", pymongo.ASCENDING)    # A-Z
col.find().sort("nama", pymongo.DESCENDING)   # Z-A
col.find().sort([                             # multi-kolom
    ("usia", pymongo.DESCENDING),
    ("nama", pymongo.ASCENDING)
])

# Pagination
halaman     = 2
per_halaman = 10
offset      = (halaman - 1) * per_halaman

hasil = list(
    col.find({"aktif": True})
    .sort("dibuat_pada", pymongo.DESCENDING)
    .skip(offset)
    .limit(per_halaman)
)

# Hitung total untuk info pagination
total = col.count_documents({"aktif": True})
print(f"Halaman {halaman}, menampilkan {len(hasil)} dari {total} pengguna")

Update Dokumen #

MongoDB menyediakan berbagai operator update yang bekerja pada field tertentu tanpa menimpa dokumen secara keseluruhan.

col = db["pengguna"]

# ANTI-PATTERN: replace seluruh dokumen
col.update_one(
    {"email": "[email protected]"},
    {"nama": "Budi Wijaya"}         # ✗ -- menimpa seluruh dokumen, field lain hilang!
)

# BENAR: gunakan $set untuk update field tertentu
col.update_one(
    {"email": "[email protected]"},
    {"$set": {"nama": "Budi Wijaya", "usia": 29}}   # ✓ -- hanya field ini yang berubah
)

# Operator update yang umum
col.update_one(
    {"email": "[email protected]"},
    {
        "$set":       {"nama": "Budi Wijaya"},        # set nilai field
        "$inc":       {"usia": 1},                    # increment (+1)
        "$push":      {"tag": "devops"},              # tambah ke array
        "$addToSet":  {"tag": "backend"},             # tambah ke array jika belum ada
        "$pull":      {"tag": "junior"},              # hapus dari array
        "$unset":     {"foto": ""},                   # hapus field
        "$currentDate": {"diubah_pada": True}         # set ke waktu sekarang
    }
)

# upsert -- update jika ada, insert jika belum
col.update_one(
    {"email": "[email protected]"},
    {"$set": {"nama": "Budi", "aktif": True}},
    upsert=True
)

# Update banyak dokumen sekaligus
hasil = col.update_many(
    {"aktif": False, "usia": {"$lt": 18}},
    {"$set": {"label": "nonaktif-minor"}}
)
print(f"{hasil.modified_count} dokumen diperbarui.")

# findOneAndUpdate -- ambil dokumen lama sebelum/sesudah update
from pymongo import ReturnDocument

dokumen_baru = col.find_one_and_update(
    {"email": "[email protected]"},
    {"$set": {"aktif": False}},
    return_document=ReturnDocument.AFTER   # BEFORE untuk dokumen sebelum update
)

Delete Dokumen #

col = db["pengguna"]

# Hapus satu dokumen (yang pertama ditemukan)
hasil = col.delete_one({"email": "[email protected]"})
print(f"Dihapus: {hasil.deleted_count} dokumen")

# Hapus banyak dokumen
hasil = col.delete_many({"aktif": False})
print(f"Dihapus: {hasil.deleted_count} dokumen tidak aktif")

# findOneAndDelete -- hapus dan kembalikan dokumen yang dihapus
dokumen_terhapus = col.find_one_and_delete({"email": "[email protected]"})
if dokumen_terhapus:
    print(f"Dokumen dihapus: {dokumen_terhapus['nama']}")

Aggregation Pipeline #

Aggregation pipeline adalah fitur paling powerful di MongoDB — memungkinkan transformasi data melalui serangkaian tahap (stage) yang dieksekusi secara berurutan.

col = db["orders"]

# Pipeline dasar: filter → group → sort
pipeline = [
    # Stage 1: $match -- filter dokumen (seperti WHERE di SQL)
    {"$match": {"status": "selesai"}},

    # Stage 2: $group -- kelompokkan dan agregasi
    {"$group": {
        "_id":               "$pengguna_id",
        "jumlah_order":      {"$sum": 1},
        "total_belanja":     {"$sum": "$total"},
        "rata_belanja":      {"$avg": "$total"},
        "order_pertama":     {"$min": "$dibuat_pada"},
        "order_terakhir":    {"$max": "$dibuat_pada"},
    }},

    # Stage 3: $sort -- urutkan hasil
    {"$sort": {"total_belanja": -1}},

    # Stage 4: $limit -- batasi hasil
    {"$limit": 10}
]

hasil = list(col.aggregate(pipeline))
for r in hasil:
    print(f"Pengguna {r['_id']}: {r['jumlah_order']} order, total Rp{r['total_belanja']:,.0f}")

Pipeline Lanjutan #

col_pengguna = db["pengguna"]

# $lookup -- JOIN ke collection lain
pipeline_lookup = [
    {"$match": {"aktif": True}},

    # JOIN orders ke pengguna
    {"$lookup": {
        "from":         "orders",         # collection tujuan
        "localField":   "_id",            # field di collection ini
        "foreignField": "pengguna_id",    # field di collection tujuan
        "as":           "orders"          # nama field hasil JOIN
    }},

    # $addFields -- tambah field kalkulasi
    {"$addFields": {
        "jumlah_order": {"$size": "$orders"},
        "total_belanja": {"$sum": "$orders.total"}
    }},

    # $project -- pilih field yang dikembalikan
    {"$project": {
        "nama":          1,
        "email":         1,
        "jumlah_order":  1,
        "total_belanja": 1,
        "_id":           0
    }},

    {"$sort": {"total_belanja": -1}},
    {"$limit": 20}
]

hasil = list(col_pengguna.aggregate(pipeline_lookup))

# $unwind -- pecah array menjadi dokumen terpisah
pipeline_unwind = [
    {"$unwind": "$tag"},                    # pecah array tag
    {"$group": {
        "_id":   "$tag",
        "count": {"$sum": 1}
    }},
    {"$sort": {"count": -1}},
    {"$limit": 10}
]

top_tag = list(db["pengguna"].aggregate(pipeline_unwind))
for t in top_tag:
    print(f"Tag '{t['_id']}': {t['count']} pengguna")

Indexing #

Index sangat penting untuk performa query pada collection yang besar. Tanpa index, MongoDB melakukan full collection scan untuk setiap query.

import pymongo

col = db["pengguna"]

# Single field index
col.create_index("email", unique=True)               # index unik untuk email
col.create_index("aktif")                            # index biasa
col.create_index([("dibuat_pada", pymongo.DESCENDING)])  # descending

# Compound index -- urutan field penting!
col.create_index([
    ("aktif", pymongo.ASCENDING),
    ("dibuat_pada", pymongo.DESCENDING)
])

# Text index -- untuk full-text search
col.create_index([("nama", pymongo.TEXT), ("bio", pymongo.TEXT)])

# Query teks dengan text index
col.find({"$text": {"$search": "budi santoso"}})
col.find({"$text": {"$search": "\"budi santoso\""}})   # exact phrase

# TTL index -- dokumen otomatis dihapus setelah N detik
db["sesi"].create_index(
    "dibuat_pada",
    expireAfterSeconds=3600    # hapus dokumen 1 jam setelah dibuat_pada
)

# Lihat semua index di collection
print(list(col.list_indexes()))

# Hapus index
col.drop_index("email_1")

Jangan buat index berlebihan. Setiap index mempercepat read tapi memperlambat write karena MongoDB harus memperbarui semua index saat insert/update/delete. Buat index hanya untuk field yang sering digunakan di filter query atau sort. Gunakan explain() untuk menganalisis apakah query menggunakan index dengan benar.

# Analisis eksekusi query
col.find({"aktif": True, "usia": {"$gte": 25}}).explain("executionStats")

Penanganan Error #

from pymongo.errors import (
    DuplicateKeyError,
    ConnectionFailure,
    OperationFailure,
    ServerSelectionTimeoutError
)

def tambah_pengguna_aman(db, nama: str, email: str) -> str | None:
    try:
        hasil = db["pengguna"].insert_one({
            "nama":  nama,
            "email": email,
            "aktif": True
        })
        return str(hasil.inserted_id)
    
    except DuplicateKeyError:
        print(f"Email '{email}' sudah terdaftar.")
        return None
    
    except OperationFailure as e:
        print(f"Operasi gagal: {e.details}")
        return None

def koneksi_aman(uri: str):
    try:
        client = MongoClient(uri, serverSelectionTimeoutMS=3000)
        client.admin.command("ping")
        return client
    except ServerSelectionTimeoutError:
        print("Tidak bisa terhubung ke MongoDB server.")
        return None
    except ConnectionFailure as e:
        print(f"Koneksi gagal: {e}")
        return None

Kapan Memilih MongoDB vs Database Relasional #

Pilih MongoDB jika:
  ✓ Skema data sering berubah atau tidak konsisten antar dokumen
  ✓ Data bersifat hierarkis dan sering dibaca sebagai satu unit (embedded)
  ✓ Butuh skala horizontal (sharding) dengan mudah
  ✓ Volume data sangat besar dengan kebutuhan write throughput tinggi
  ✓ Contoh: katalog produk e-commerce, konten CMS, log aplikasi, user profiles

Pilih database relasional (PostgreSQL, MySQL) jika:
  ✓ Data sangat relasional dan konsistensi ACID kritis (transaksi keuangan)
  ✓ Skema data stabil dan sudah terdefinisi dengan baik
  ✓ Butuh query JOIN yang kompleks antar banyak entitas
  ✓ Tim sudah familiar dengan SQL dan ekosistemnya

Ringkasan #

  • Satu MongoClient untuk seluruh aplikasi — client thread-safe dan mengelola connection pool; jangan buat instance baru per request.
  • find_one() vs find()find_one() mengembalikan dict atau None; find() mengembalikan Cursor yang harus diiterasi atau dikonversi ke list().
  • Operator $set wajib saat update — tanpa $set, seluruh dokumen akan ditimpa dan semua field lain akan hilang.
  • Projection untuk efisiensi — selalu batasi field yang dikembalikan dengan projection jika tidak butuh seluruh dokumen, terutama untuk field besar seperti array atau embedded document.
  • Dot notation untuk field bersarang — gunakan "alamat.kota" untuk query atau update field di dalam embedded document.
  • $addToSet vs $push — gunakan $addToSet untuk menambah ke array tanpa duplikat; $push selalu menambahkan meski sudah ada.
  • upsert=True — gunakan untuk operasi insert-atau-update yang idempoten tanpa harus cek keberadaan dulu.
  • Index untuk field yang sering di-filter dan di-sort — buat compound index dengan urutan field sesuai urutan query yang paling sering dipakai.
  • Aggregation pipeline — gunakan untuk transformasi data kompleks; stage $match di awal pipeline sangat penting untuk memanfaatkan index dan mengurangi dokumen yang diproses.
  • TTL index — manfaatkan untuk data yang memiliki masa kadaluarsa seperti session, token, atau log sementara.

← Sebelumnya: NoSQL   Berikutnya: Elasticsearch →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact