Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate labeling framework v2 #3995

Merged
merged 4 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Sanbase.BlockchainAddress.BlockchainAddressLabelChange.SqlQuery do
SELECT
toUnixTimestamp(dt),
address,
dictGetString('default.labels_dict', 'fqn', label_id) AS label_fqn,
dictGetString('default.labels', 'fqn', label_id) AS label_fqn,
sign
FROM address_label_changes
PREWHERE
Expand Down
91 changes: 57 additions & 34 deletions lib/sanbase/clickhouse/exchange_address.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do
end

sql = """
SELECT DISTINCT lower(JSONExtractString(metadata, 'owner')) AS exchange
FROM (
SELECT argMax(metadata, version) AS metadata, argMax(sign, version) AS sign
FROM blockchain_address_labels
PREWHERE
blockchain = {{blockchain}} AND
#{exchange_type_filter(exchange_type)}
GROUP BY blockchain, asset_id, label, address
HAVING sign = 1
)
ORDER BY exchange
SELECT DISTINCT dictGet('labels', 'value', label_id)
FROM current_label_addresses
WHERE
blockchain = {{blockchain}} AND
label_id IN ( SELECT label_id FROM label_metadata WHERE key = 'owner' ) AND
address IN (
SELECT DISTINCT address
FROM current_label_addresses
WHERE
blockchain = {{blockchain}} AND
#{exchange_type_filter(exchange_type)}
)
"""

params = %{
Expand All @@ -89,14 +90,23 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do

defp exchange_addresses_query(blockchain, limit) do
sql = """
SELECT DISTINCT(address), label, lower(JSONExtractString(metadata, 'owner')) AS owner
FROM(
SELECT address, label, argMax(metadata, version) AS metadata, argMax(sign, version) AS sign
FROM blockchain_address_labels
PREWHERE blockchain = {{blockchain}} AND #{exchange_type_filter(:both)}
GROUP BY blockchain, asset_id, label, address
HAVING sign = 1
)
SELECT address, label, owner FROM (
SELECT address, owner FROM (
SELECT address, label_id FROM current_label_addresses
WHERE
label_id IN ( SELECT label_id FROM label_metadata WHERE key = 'owner' ) AND
blockchain = {{blockchain}}
) INNER JOIN (
SELECT label_id, value AS owner
FROM label_metadata
WHERE key = 'owner'
) USING label_id
) INNER JOIN (
SELECT address, dictGet('labels', 'key', label_id) AS label
FROM current_label_addresses
WHERE #{exchange_type_filter(:both)}
AND blockchain = {{blockchain}}
) USING address
IvanIvanoff marked this conversation as resolved.
Show resolved Hide resolved
LIMIT {{limit}}
"""

Expand All @@ -110,17 +120,23 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do

defp exchange_addresses_for_exchange_query(blockchain, owner, limit) do
sql = """
SELECT DISTINCT(address), label, lower(JSONExtractString(metadata, 'owner')) AS owner
FROM(
SELECT address, label, argMax(metadata, version) AS metadata, argMax(sign, version) AS sign
FROM blockchain_address_labels
PREWHERE
blockchain = {{blockchain}} AND
lower(JSONExtractString(metadata, 'owner')) = {{owner}} AND
#{exchange_type_filter(:both)}
GROUP BY blockchain, asset_id, label, address
HAVING sign = 1
)
SELECT DISTINCT address FROM (
SELECT address, owner FROM (
SELECT address, label_id FROM current_label_addresses
WHERE
label_id IN ( SELECT label_id FROM label_metadata WHERE key = 'owner' AND value = {{owner}}) AND
blockchain = {{blockchain}}
) INNER JOIN (
SELECT label_id, value AS owner
FROM label_metadata
WHERE key = 'owner'
) USING label_id
) INNER JOIN (
SELECT address, dictGet('labels', 'key', label_id) AS label
FROM current_label_addresses
WHERE #{exchange_type_filter(:both)}
AND blockchain = {{blockchain}}
) USING address
LIMIT {{limit}}
"""

Expand All @@ -133,9 +149,16 @@ defmodule Sanbase.Clickhouse.ExchangeAddress do
Sanbase.Clickhouse.Query.new(sql, params)
end

defp exchange_type_filter(:both),
do: "label IN ('centralized_exchange', 'decentralized_exchange')"
defp exchange_type_filter(type) when type in [:cex, :dex, :both] do
case type do
:cex ->
"label_id = (SELECT label_id FROM label_metadata WHERE key = 'centralized_exchange' LIMIT 1)"

defp exchange_type_filter(:dex), do: "label = 'decentralized_exchange'"
defp exchange_type_filter(:cex), do: "label = 'centralized_exchange'"
:dex ->
"label_id = (SELECT label_id FROM label_metadata WHERE key = 'decentralized_exchange' LIMIT 1)"

:both ->
"label_id IN (SELECT label_id FROM label_metadata WHERE key IN ('centralized_exchange', 'decentralized_exchange'))"
end
end
end
65 changes: 7 additions & 58 deletions lib/sanbase/clickhouse/label/label.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,6 @@ defmodule Sanbase.Clickhouse.Label do

@create_label_topic "label_changes"

def list_all(:all = _blockchain) do
query_struct = all_labels_query()

Sanbase.ClickhouseRepo.query_transform(query_struct, fn [label] -> label end)
end

def list_all(blockchain) do
query_struct = all_blockchain_labels_query(blockchain)

Sanbase.ClickhouseRepo.query_transform(query_struct, fn [label] ->
label
end)
end

def addresses_by_labels(label_fqn_or_fqns, opts \\ [])

def addresses_by_labels(label_fqn_or_fqns, opts) do
Expand Down Expand Up @@ -165,27 +151,6 @@ defmodule Sanbase.Clickhouse.Label do
do: {:error, "Username is required for creating custom address labels"}

# Private functions

defp all_labels_query() do
sql = """
SELECT DISTINCT(label) FROM blockchain_address_labels
"""

params = %{}
Sanbase.Clickhouse.Query.new(sql, params)
end

defp all_blockchain_labels_query(blockchain) do
sql = """
SELECT DISTINCT(label)
FROM blockchain_address_labels
PREWHERE blockchain = {{blockchain}}
"""

params = %{blockchain: blockchain}
Sanbase.Clickhouse.Query.new(sql, params)
end

# For backwards compatibility, if the slug is nil treat it as ethereum blockchain
defp slug_to_blockchain(nil), do: "ethereum"

Expand All @@ -194,7 +159,7 @@ defmodule Sanbase.Clickhouse.Label do

def addresses_by_label_fqns_query(label_fqns, nil = _blockchain) do
sql = """
SELECT address, blockchain, dictGetString('default.labels_dict', 'fqn', label_id) AS label_fqn
SELECT address, blockchain, dictGetString('default.labels', 'fqn', label_id) AS label_fqn
FROM label_addresses
PREWHERE
#{label_id_by_label_fqn_filter(label_fqns, argument_name: "label_fqns")}
Expand All @@ -208,7 +173,7 @@ defmodule Sanbase.Clickhouse.Label do

def addresses_by_label_fqns_query(label_fqns, blockchain) do
sql = """
SELECT address, blockchain, dictGetString('default.labels_dict', 'fqn', label_id) AS label_fqn
SELECT address, blockchain, dictGetString('default.labels', 'fqn', label_id) AS label_fqn
FROM label_addresses
PREWHERE
#{label_id_by_label_fqn_filter(label_fqns, argument_name: "label_fqns")} AND
Expand All @@ -223,7 +188,7 @@ defmodule Sanbase.Clickhouse.Label do

def addresses_by_label_keys_query(label_keys, nil = _blockchain) do
sql = """
SELECT address, blockchain, dictGetString('default.labels_dict', 'fqn', label_id) AS label_fqn
SELECT address, blockchain, dictGetString('default.labels', 'fqn', label_id) AS label_fqn
FROM label_addresses
PREWHERE
#{label_id_by_label_key_filter(label_keys, argument_name: "label_keys")}
Expand All @@ -237,7 +202,7 @@ defmodule Sanbase.Clickhouse.Label do

def addresses_by_label_keys_query(label_keys, blockchain) do
sql = """
SELECT address, blockchain, dictGetString('default.labels_dict', 'fqn', label_id) AS label_fqn
SELECT address, blockchain, dictGetString('default.labels', 'fqn', label_id) AS label_fqn
FROM label_addresses
PREWHERE
#{label_id_by_label_key_filter(label_keys, argument_name: "label_keys")} AND
Expand Down Expand Up @@ -265,26 +230,10 @@ defmodule Sanbase.Clickhouse.Label do
end
end

defp addresses_labels_query(slug, "ethereum", addresses) do
defp addresses_labels_query(slug, blockchain, addresses) do
sql = create_addresses_labels_query()
params = %{addresses: addresses, slug: slug}

Sanbase.Clickhouse.Query.new(sql, params)
end

defp addresses_labels_query(_slug, blockchain, addresses) do
sql = """
SELECT address, label, metadata
FROM(
SELECT address, label, argMax(metadata, version) AS metadata, argMax(sign, version) AS sign
FROM blockchain_address_labels
PREWHERE blockchain = {{blockchain}} AND address IN ({{addresses}})
GROUP BY blockchain, asset_id, label, address
HAVING sign = 1
)
"""
params = %{addresses: addresses, slug: slug, blockchain: blockchain}

params = %{blockchain: blockchain, addresses: addresses}
Sanbase.Clickhouse.Query.new(sql, params)
end

Expand Down Expand Up @@ -384,7 +333,7 @@ defmodule Sanbase.Clickhouse.Label do
FROM
current_label_addresses
WHERE
address IN [{{addresses}}]
address IN [{{addresses}}] AND blockchain = {{blockchain}}
) AS a
LEFT JOIN
(
Expand Down
4 changes: 2 additions & 2 deletions lib/sanbase/clickhouse/nft/nft_trade.ex
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,15 @@ defmodule Sanbase.Clickhouse.NftTrade do

defp fetch_label_query(contract, blockchain, field) do
sql = """
SELECT dictGet('default.labels_dict', '#{field}', label_id)
SELECT dictGet('default.labels', '#{field}', label_id)
FROM
(
SELECT labels
FROM default.current_labels
WHERE (blockchain = {{blockchain}}) AND (address = lower({{contract}}))
)
ARRAY JOIN labels AS label_id
WHERE dictGet('default.labels_dict', 'key', label_id) = 'name'
WHERE dictGet('default.labels', 'key', label_id) = 'name'
"""

params = %{blockchain: blockchain, contract: contract}
Expand Down
8 changes: 4 additions & 4 deletions lib/sanbase/metric/sql_query_helper.ex
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ defmodule Sanbase.Metric.SqlQuery.Helper do
def label_id_by_label_fqn_filter(label_fqn, opts) when is_binary(label_fqn) do
arg_name = Keyword.fetch!(opts, :argument_name)

"label_id = dictGetUInt64('default.label_ids_dict', 'label_id', tuple({{#{arg_name}}}))"
"label_id = dictGetUInt64('default.labels_by_fqn', 'label_id', tuple({{#{arg_name}}}))"
end

def label_id_by_label_fqn_filter(label_fqns, opts) when is_list(label_fqns) do
arg_name = Keyword.fetch!(opts, :argument_name)

"label_id IN (
SELECT dictGetUInt64('default.label_ids_dict', 'label_id', tuple(fqn)) AS label_id
SELECT dictGetUInt64('default.labels_by_fqn', 'label_id', tuple(fqn)) AS label_id
FROM system.one
ARRAY JOIN [{{#{arg_name}}}] AS fqn
)"
Expand Down Expand Up @@ -305,7 +305,7 @@ defmodule Sanbase.Metric.SqlQuery.Helper do
label_fqn_key = "label_fqn_#{pos}"

str = "label_id IN (
SELECT dictGetUInt64('default.label_ids_dict', 'label_id', tuple(fqn)) AS label_id
SELECT dictGetUInt64('default.labels_by_fqn', 'label_id', tuple(fqn)) AS label_id
FROM system.one
ARRAY JOIN [{{#{label_fqn_key}}}] AS fqn
)"
Expand All @@ -318,7 +318,7 @@ defmodule Sanbase.Metric.SqlQuery.Helper do
label_fqn_key = "label_fqn_#{pos}"

str =
"label_id = dictGetUInt64('default.label_ids_dict', 'label_id', tuple({{#{label_fqn_key}}}))"
"label_id = dictGetUInt64('default.labels_by_fqn', 'label_id', tuple({{#{label_fqn_key}}}))"

{str, Map.put(params, label_fqn_key, value)}
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ defmodule SanbaseWeb.Graphql.Resolvers.BlockchainAddressResolver do
erc20: %{module: Transfers.Erc20Transfers, slug: nil}
}

def blockchain_address_labels(_root, args, _resolution) do
Map.get(args, :blockchain, :all)
|> Label.list_all()
end

def get_blockchain_address_labels(_root, _args, _resolution) do
BlockchainAddressLabelChange.labels_list()
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ defmodule SanbaseWeb.Graphql.Schema.BlockchainAddressQueries do
cache_resolve(&BlockchainAddressResolver.blockchain_address_label_changes/3)
end

field :blockchain_address_labels, list_of(:string) do
meta(access: :free)

arg(:blockchain, :string)

cache_resolve(&BlockchainAddressResolver.blockchain_address_labels/3)
end

field :get_blockchain_address_labels, list_of(:blockchain_address_label) do
meta(access: :free)

Expand Down
1 change: 0 additions & 1 deletion test/sanbase/billing/query_access_level_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ defmodule Sanbase.Billing.QueryAccessLevelTest do
:api_metric_distribution,
:assets_held_by_address,
:blockchain_address_label_changes,
:blockchain_address_labels,
:blockchain_address_transaction_volume_over_time,
:blockchain_address_user_pair,
:blockchain_address,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,6 @@ defmodule SanbaseWeb.Graphql.BlockchainAddressApiTest do
}
end

test "fetch all blockchain address labels with blockchainAddressLabelsApi", context do
rows = [["miner"], ["centralized_exchange"], ["decentralized_exchange"]]

Sanbase.Mock.prepare_mock2(&Sanbase.ClickhouseRepo.query/2, {:ok, %{rows: rows}})
|> Sanbase.Mock.run_with_mocks(fn ->
result =
blockchain_address_labels(context.conn)
|> get_in(["data", "blockchainAddressLabels"])

assert result == ["miner", "centralized_exchange", "decentralized_exchange"]
end)
end

test "fetch blockchain address labels for a given blockchain with blockchainAddressLabelsApi",
context do
rows = [["centralized_exchange"], ["decentralized_exchange"]]

Sanbase.Mock.prepare_mock2(&Sanbase.ClickhouseRepo.query/2, {:ok, %{rows: rows}})
|> Sanbase.Mock.run_with_mocks(fn ->
result =
blockchain_address_labels(context.conn, blockchain: "ethereum")
|> get_in(["data", "blockchainAddressLabels"])

assert result == ["centralized_exchange", "decentralized_exchange"]
end)
end

test "fetch blockchain address labels with getBlockchainAddressLabels API", context do
rows = [["santiment/miner:v1", "Miner"], ["santiment/owner->Coinbase:v1", "owner->Coinbase"]]

Expand Down Expand Up @@ -266,18 +239,6 @@ defmodule SanbaseWeb.Graphql.BlockchainAddressApiTest do
|> json_response(200)
end

defp blockchain_address_labels(conn, opts \\ []) do
query =
case Keyword.get(opts, :blockchain) do
nil -> "{ blockchainAddressLabels }"
blockchain -> ~s/{ blockchainAddressLabels(blockchain: "#{blockchain}") }/
end

conn
|> post("/graphql", query_skeleton(query))
|> json_response(200)
end

defp add_blockchain_address_labels_mutation(selector, labels) do
"""
mutation {
Expand Down