Skip to content

Commit

Permalink
resolve #116
Browse files Browse the repository at this point in the history
  • Loading branch information
huahaiy committed Oct 7, 2024
1 parent e9b110d commit 39f4b9a
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 95 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# Change Log

## WIP

### Added
- [Datalog] `:offset` and `:limit` support, #126, #117
- [Datalog] `:order-by` support, #116
- [Datalog] `count-datoms` function to return the number of datoms of a pattern
- [Datalog] `cardinality` function to return the number of unique values of an
attribute

## 0.9.11 (2024-10-04)

### Fixed
Expand Down
7 changes: 5 additions & 2 deletions src/datalevin/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -411,16 +411,19 @@ Only usable for debug output.
[rstore (vec (replace {rdb :remote-db-placeholder} inputs))])))))

(defn q
"Executes a Datalog query. See [docs.datomic.com/on-prem/query.html](https://docs.datomic.com/on-prem/query.html).
"Executes a Datalog query, which supports [Datomic Query Format](https://docs.datomic.com/query/query-data-reference.html).
In addition, `:order-by` clause is supported, which can be followed by a single variable or a vector. The vector includes one or more variables, each optionally followed by a keyword `:asc` or `:desc`, specifying ascending or descending order, respectively. The default is `:asc`.
Usage:
```
(q '[:find ?value
:where [_ :likes ?value]
:order-by [?value :desc]
:timeout 5000]
db)
; => #{[\"fries\"] [\"candy\"] [\"pie\"] [\"pizza\"]}
; => #{[\"pizza\"] [\"pie\"] [\"fries\"] [\"candy\"]}
```"
[query & inputs]
(if-let [[store inputs'] (only-remote-db inputs)]
Expand Down
20 changes: 1 addition & 19 deletions src/datalevin/datom.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
(:require
[taoensso.nippy :as nippy]
[datalevin.constants :refer [tx0]]
[datalevin.util :refer [combine-hashes combine-cmp]])
[datalevin.util :refer [combine-hashes combine-cmp defcomp]])
(:import
[datalevin.utl BitOps]
[clojure.lang IFn$OOL]
[java.util Arrays]
[java.io DataInput DataOutput]))

Expand Down Expand Up @@ -175,23 +174,6 @@

;; Expands to a `Long/compare` call with both operands hinted as primitive
;; `long`, yielding the usual negative / zero / positive comparison result.
(defmacro long-compare [x y] `(Long/compare ^long ~x ^long ~y))

(defmacro defcomp
"Defines `sym` as a two-argument comparator object.

The generated value is a `reify` that implements `java.util.Comparator`,
`clojure.lang.IFn` and `IFn$OOL`; `invoke` and `invokePrim` both delegate
to `compare`, so the object can be used as a Comparator or called like a
function.

`arg1` and `arg2` name the two values being compared inside `body`; any
metadata on them (e.g. type hints) is kept for `body` but removed from the
method parameter lists."
[sym [arg1 arg2] & body]
;; a1/a2 are the same symbols with their metadata replaced by an empty map.
;; NOTE(review): presumably needed because hinted parameters would not match
;; the Object-typed interface method signatures -- confirm.
(let [a1 (with-meta arg1 {})
a2 (with-meta arg2 {})]
`(def ~sym
(reify
java.util.Comparator
(compare [_# ~a1 ~a2]
;; re-bind each parameter to the original (possibly hinted) symbol so
;; that `body` sees the caller's metadata
(let [~arg1 ~arg1 ~arg2 ~arg2]
~@body))
clojure.lang.IFn
(invoke [this# ~a1 ~a2]
(.compare this# ~a1 ~a2))
IFn$OOL
(invokePrim [this# ~a1 ~a2]
(.compare this# ~a1 ~a2))))))

(defcomp cmp-datoms-eavt [^Datom d1, ^Datom d2]
(combine-cmp
(long-compare (.-e d1) (.-e d2))
Expand Down
8 changes: 8 additions & 0 deletions src/datalevin/db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@
(.put ^ConcurrentHashMap caches (s/dir store)
(LRUCache. (:cache-limit (s/opts store)) target))))

(defn cache-get
  "Returns the value stored under key `k` in the LRU cache that `caches`
  holds for `(s/dir store)`."
  [store k]
  (let [^ConcurrentHashMap registry caches
        ^LRUCache cache             (.get registry (s/dir store))]
    (.get cache k)))

(defn cache-put
  "Stores value `v` under key `k` in the LRU cache that `caches` holds for
  `(s/dir store)`, returning whatever the cache's `.put` returns."
  [store k v]
  (let [^ConcurrentHashMap registry caches
        ^LRUCache cache             (.get registry (s/dir store))]
    (.put cache k v)))

(defmacro wrap-cache
[store pattern body]
`(let [cache# (.get ^ConcurrentHashMap caches (s/dir ~store))]
Expand Down
51 changes: 45 additions & 6 deletions src/datalevin/parser.clj
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,8 @@

;; q* prefix because of https://dev.clojure.org/jira/browse/CLJS-2237
(deftrecord Query
[qfind qorig-find qwith qreturn-map qin qwhere qorig-where qtimeout])
[qfind qorig-find qwith qreturn-map qin qwhere qorig-where qtimeout
qorder qlimit qoffset])

(defn query->map [query]
(loop [parsed {}, key nil, qs query]
Expand Down Expand Up @@ -740,6 +741,16 @@
(raise ":find and :with should not use same variables: " (mapv :symbol shared)
{:error :parser/query, :vars shared, :form form})))

(when-some [order-spec (:qorder q)]
(let [find-vars (set (map :symbol (collect-vars (:qfind q))))
order-vars (set (filter symbol? order-spec))]
(when (not= (count order-vars) (/ (count order-spec) 2))
(raise "Repeated :order-by variables"
{:error :parser/query, :form order-spec}))
(when-not (set/subset? order-vars find-vars)
(raise "There are :order-by variable that is not in :find spec"
{:error :parser/query, :form order-spec}))))

(when-some [return-map (:qreturn-map q)]
(when (instance? FindScalar (:qfind q))
(raise (:type return-map) " does not work with single-scalar :find"
Expand Down Expand Up @@ -794,10 +805,37 @@
(defn parse-timeout [t]
(cond
(sequential? t) (recur (first t))
(number? t) t
(nil? t) nil
:else (raise "Unsupported timeout format"
{:error :parser/query :form t})))
(number? t) t
(nil? t) nil
:else (raise "Unsupported timeout format"
{:error :parser/query :form t})))

(defn- parse-order-vec
  "Normalizes an `:order-by` vector into a flat vector of alternating
  variable / direction entries, e.g. `[?a ?b :desc]` becomes
  `[?a :asc ?b :desc]`.

  A variable that is not followed by `:asc` or `:desc` gets `:asc`. A
  direction keyword that does not directly follow a variable, or any element
  that is neither a symbol nor one of the two keywords, raises
  \"Incorrect order-by format\"."
  [ob]
  (let [step
        ;; state is [normalized-so-far, last-item-was-a-variable?]
        (fn [[acc pending?] item]
          (cond
            ;; a new variable: close the previous one with the default first
            (symbol? item)
            [(if pending? (conj acc :asc item) (conj acc item)) true]

            ;; a direction is only legal right after a variable
            (#{:asc :desc} item)
            (if pending?
              [(conj acc item) false]
              (raise "Incorrect order-by format" {:error :parser/query :form ob}))

            :else
            (raise "Incorrect order-by format" {:error :parser/query :form ob})))

        [acc pending?] (reduce step [[] false] ob)]
    ;; a trailing variable still needs its default direction
    (cond-> acc pending? (conj :asc))))

(defn parse-order
  "Parses the value of an `:order-by` clause into a flat vector of
  alternating variable / direction entries (see `parse-order-vec`), or nil
  when there is no clause.

  `order-by` is the collection of forms that followed `:order-by`. Accepted
  shapes:

    nil                  => nil
    [?a]                 => [?a :asc]
    [[?a ?b :desc]]      => [?a :asc ?b :desc]
    [?a :desc ?b]        => [?a :desc ?b :asc]   ; flat, un-nested clause
    ?a                   => [?a :asc]            ; bare symbol

  Previously only the first element of the clause was inspected, so a flat
  clause such as `:order-by ?a :desc` silently lost everything after the
  first variable and sorted ascending. A flat clause is now parsed in full.

  Raises \"Unsupported order-by format\" for anything else, including a
  vector followed by stray elements; the elements themselves are validated
  by `parse-order-vec`."
  [order-by]
  (cond
    (nil? order-by)    nil
    (symbol? order-by) [order-by :asc]

    (sequential? order-by)
    (let [[ob & more] order-by]
      (cond
        ;; empty clause, same result as before
        (nil? ob)                      nil
        ;; the documented form: exactly one vector
        (and (vector? ob) (nil? more)) (parse-order-vec ob)
        ;; one variable, or a flat run of variables and directions
        (symbol? ob)                   (parse-order-vec (vec order-by))
        :else
        (raise "Unsupported order-by format"
               {:error :parser/query :form order-by})))

    :else
    (raise "Unsupported order-by format"
           {:error :parser/query :form order-by})))

(defn parse-query [q]
(let [qm (cond
Expand All @@ -820,6 +858,7 @@
(or (:in qm) (default-in qwhere)))
:qwhere qwhere
:qorig-where where
:qtimeout (parse-timeout (:timeout qm))})]
:qtimeout (parse-timeout (:timeout qm))
:qorder (parse-order (:order-by qm))})]
(validate-query res q qm)
res))
129 changes: 85 additions & 44 deletions src/datalevin/query.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
[datalevin.constants :as c]
[datalevin.bits :as b])
(:import
[java.util Arrays List Collection]
[java.util Arrays List Collection Comparator]
[java.util.concurrent ConcurrentHashMap]
[clojure.lang ILookup LazilyPersistentVector]
[datalevin.utl LikeFSM LRUCache]
Expand Down Expand Up @@ -586,7 +586,7 @@
[context sym]
(when-some [rel (rel-with-attr context sym)]
(when-some [tuple (first (:tuples rel))]
(let [tg (if (u/array? tuple) r/typed-aget get)]
(let [tg (r/tuple-get tuple)]
(tg tuple ((:attrs rel) sym))))))

(defn- rel-contains-attrs? [rel attrs] (some #(contains? (:attrs rel) %) attrs))
Expand Down Expand Up @@ -659,11 +659,11 @@
(aset tuples-args i (get attrs arg)))
(aset static-args i arg))))
(fn call-fn [tuple]
(dotimes [i len]
(when-some [tuple-idx (aget tuples-args i)]
(let [tg (if (u/array? tuple) r/typed-aget get)
v (tg tuple tuple-idx)]
(aset static-args i v))))
(let [tg (r/tuple-get tuple)]
(dotimes [i len]
(when-some [tuple-idx (aget tuples-args i)]
(let [v (tg tuple tuple-idx)]
(aset static-args i v)))))
(call static-args))))

(defn -call-fn
Expand Down Expand Up @@ -2179,18 +2179,20 @@

(defn tuples->return-map
[return-map tuples]
(let [symbols (:symbols return-map)
idxs (range 0 (count symbols))]
(persistent!
(reduce
(fn [coll tuple]
(conj! coll
(let [get-i (if (r/tuple-array? tuple) r/typed-aget get)]
(if (seq tuples)
(let [symbols (:symbols return-map)
idxs (range 0 (count symbols))
get-i (r/tuple-get (first tuples))]
(persistent!
(reduce
(fn [coll tuple]
(conj! coll
(persistent!
(reduce
(fn [m i] (assoc! m (nth symbols i) (get-i tuple i)))
(transient {}) idxs)))))
(transient #{}) tuples))))
(transient {}) idxs))))
(transient #{}) tuples)))
#{}))

(defprotocol IPostProcess
(-post-process [find return-map tuples]))
Expand Down Expand Up @@ -2307,37 +2309,76 @@
(.put ^LRUCache *query-cache* q res)
res)))

(defn- order-comp
  "Builds a two-argument comparison function over tuples for one sort key.

  `tg` is called as `(tg tuple idx)` to read the key from a tuple. When `di`
  is `:asc` the keys are compared in natural `compare` order; for any other
  `di` the operands are swapped, giving descending order."
  [tg idx di]
  (let [key-of #(tg % idx)]
    (if-not (identical? di :asc)
      (fn [a b] (compare (key-of b) (key-of a)))
      (fn [a b] (compare (key-of a) (key-of b))))))

(defn- order-comps
  "Builds a `java.util.Comparator` over result tuples from a parsed
  `:order-by` spec.

  `tg` reads a value from a tuple as `(tg tuple idx)`. `find-vars` is the
  ordered collection of `:find` variables; each order variable is mapped to
  its position in it with `u/index-of`. `order` is the flat vector of
  alternating variable / direction entries.

  Keys are compared in the order they appear in `order`; the first key whose
  comparison is non-zero decides the result, and 0 is returned when every key
  compares equal (or `order` is empty). Comparison stops at the deciding key
  instead of evaluating every remaining key."
  [tg find-vars order]
  (let [comps (mapv (fn [[v di]]
                      (order-comp tg (u/index-of #(= v %) find-vars) di))
                    (partition-all 2 order))]
    (reify Comparator
      (compare [_ t1 t2]
        ;; walk the keys from highest to lowest priority, short-circuiting
        (loop [cs (seq comps)]
          (if cs
            (let [r ((first cs) t1 t2)]
              (if (zero? r)
                (recur (next cs))
                r))
            0))))))

(defn- order-result
  "Sorts `result` according to the parsed `:order-by` spec `order`.

  The tuple accessor is chosen from the first element of `result` via
  `r/tuple-get`. An empty `result` is returned unchanged."
  [find-vars result order]
  (if-let [tuples (seq result)]
    (sort (order-comps (r/tuple-get (first tuples)) find-vars order)
          result)
    result))

(defn- q*
"Runs an already-parsed query against `inputs` and returns its result.

Binds `timeout/*deadline*` from the query's `:qtimeout`, resolves the inputs
into a `Context`, executes the query, shapes the result set (`:with`
truncation, aggregation, pull, post-processing), and finally sorts the
result when the query carries a `:qorder` spec."
[parsed-q inputs]
(binding [timeout/*deadline* (timeout/to-deadline (:qtimeout parsed-q))]
(let [find (:qfind parsed-q)
find-elements (dp/find-elements find)
result-arity (count find-elements)
with (:qwith parsed-q)
find-vars (dp/find-vars find)
all-vars (concatv find-vars (map :symbol with))

;; from here on `parsed-q` and `inputs` are the values returned by
;; plugin-inputs; the bindings above were read from the original parsed-q
[parsed-q inputs] (plugin-inputs parsed-q inputs)

context
(-> (Context. parsed-q [] {} {} [] nil nil nil
(volatile! {}) true nil)
(resolve-ins inputs)
(resolve-redudants)
(-q true)
(collect all-vars))
result
(cond->> (:result-set context)
;; drop the trailing :with columns, keeping only the :find columns
with (mapv #(subvec % 0 result-arity))

(some dp/aggregate? find-elements)
(aggregate find-elements context)

(some dp/pull? find-elements) (pull find-elements context)

true (-post-process find (:qreturn-map parsed-q)))]
(result-explain context result)
;; NOTE(review): ordering runs on the post-processed result, and
;; order-result reads sort keys by position in find-vars from each element.
;; Whether that holds for every find spec / return-map shape produced by
;; -post-process is not visible here -- confirm.
(if-let [order (:qorder parsed-q)]
(order-result find-vars result order)
result))))

(defn q
[q & inputs]
(let [parsed-q (parsed-q q)]
(binding [timeout/*deadline* (timeout/to-deadline (:qtimeout parsed-q))]
(let [find (:qfind parsed-q)
find-elements (dp/find-elements find)
result-arity (count find-elements)
with (:qwith parsed-q)
all-vars (concatv (dp/find-vars find) (map :symbol with))

[parsed-q inputs] (plugin-inputs parsed-q inputs)

context
(-> (Context. parsed-q [] {} {} [] nil nil nil
(volatile! {}) true nil)
(resolve-ins inputs)
(resolve-redudants)
(-q true)
(collect all-vars))
result
(cond->> (:result-set context)
with (mapv #(subvec % 0 result-arity))

(some dp/aggregate? find-elements)
(aggregate find-elements context)

(some dp/pull? find-elements) (pull find-elements context)

true (-post-process find (:qreturn-map parsed-q)))]
(result-explain context result)
result))))
;; search cache
(q* parsed-q inputs)))

(defn- plan-only
[q & inputs]
Expand Down
Loading

0 comments on commit 39f4b9a

Please sign in to comment.