Skip to content

Commit

Permalink
Added banc_edgelist to fetch the full edgelist view
Browse files Browse the repository at this point in the history
  • Loading branch information
alexanderbates committed Jan 9, 2025
1 parent cfad2a5 commit ef6be07
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 8 deletions.
3 changes: 3 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ export(banc_brain_side_view)
export(banc_cave_client)
export(banc_cave_query)
export(banc_cave_tables)
export(banc_cave_views)
export(banc_cell_ids)
export(banc_cell_info)
export(banc_cellid_from_segid)
export(banc_change_log)
export(banc_decapitate)
export(banc_edgelist)
export(banc_front_view)
export(banc_ggneuron)
export(banc_ids)
Expand Down Expand Up @@ -83,6 +85,7 @@ export(banctable_update_rows)
export(choose_banc)
export(dr_banc)
export(elastix_xform)
export(franken_meta)
export(geom_neuron)
export(ggneuron)
export(ggplot2_neuron_path)
Expand Down
20 changes: 18 additions & 2 deletions R/banc-table.R
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,15 @@ banctable_move_to_bigdata <- function(table = "banc_meta",
# response = requests.post(url, headers=headers, json=body)
# print(response.text)

#' @export
#' @rdname banctable_query
franken_meta <- function(sql = "SELECT * FROM franken_meta",
base = "cns_meta", ...){
df <- banctable_query(sql=sql, base=base, ...) %>%
dplyr::select(-dplyr::starts_with(c("FAFB_", "MANC_")))
df
}

#' @export
#' @rdname banctable_query
banctable_append_rows <- function (df,
Expand All @@ -390,11 +399,12 @@ banctable_append_rows <- function (df,
chunkids = rep(seq_len(nchunks), rep(chunksize, nchunks))[seq_len(nx)]
chunks = split(df, chunkids)
oks = pbapply::pbsapply(chunks, banctable_append_rows,
table = table, base = base, chunksize = Inf, ...)
table = table, base = base, chunksize = Inf, bigdata = bigdata,
...)
return(all(oks))
}
pyl = fafbseg:::df2appendpayload(df)
if(bigdata){
if(!bigdata){
res = base$batch_append_rows(table_name = table, rows_data = pyl)
}else{
res = base$big_data_insert_rows(table_name = table, rows_data = pyl)
Expand Down Expand Up @@ -612,6 +622,9 @@ banctable_updateids <- function(){

# Update
cat('updating banc meta seatable...\n')
bc.new <- bc.new %>%
dplyr::filter(!is.na(`_id`)) %>%
dplyr::distinct(`_id`, .keep_all = TRUE)
bc.new[is.na(bc.new)] <- ''
bc.new[bc.new=="0"] <- ''
banctable_update_rows(df = bc.new,
Expand Down Expand Up @@ -703,3 +716,6 @@ banctable_annotate <- function(root_ids,
}


#' @export
#' @rdname banctable_query

168 changes: 165 additions & 3 deletions R/cave-tables.R
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,40 @@ banc_cave_tables <- function(datastack_name = NULL,
}
}

#' @rdname banc_cave_tables
#' @export
banc_edgelist <- function(...){
el <- with_banc(cave_view_query("synapses_v1_backbone_proofread_counts", fetch_all_rows= TRUE, ...))
el <- el %>%
dplyr::arrange(dplyr::desc(n))
el
}

#' @rdname banc_cave_tables
#' @export
banc_cave_views <- function(datastack_name = NULL,
select = NULL){
if(is.null(datastack_name))
datastack_name=banc_datastack_name()
fac <- fafbseg::flywire_cave_client(datastack_name = datastack_name)
dsinfo <- fac$info$get_datastack_info()
tt <- unique(names(fac$materialize$get_views()))
if(!is.null(select)){
chosen_tables <- grep(select, tt)
if (length(chosen_tables) == 0)
stop(sprintf("I cannot find a '%s' view for datastack: ", select),
datastack_name, "\nPlease ask for help on #annotation_infrastructure https://flywire-forum.slack.com/archives/C01M4LP2Y2D")
if (length(chosen_tables) == 1)
return(tt[chosen_tables])
chosen <- tt[rev(chosen_tables)[1]]
warning(sprintf("Multiple candidate '%s' views. Choosing: ", select),
chosen)
return(chosen)
}else{
return(tt)
}
}

#' @rdname banc_cave_tables
#' @export
banc_nuclei <- function (rootids = NULL,
Expand Down Expand Up @@ -300,7 +334,135 @@ banc_cave_cell_types <- function(user_id = NULL){
# chunksize = 1000)





# hidden
cave_view_query <- function(table,
datastack_name = getOption("fafbseg.cave.datastack_name","flywire_fafb_production"),
version = NULL,
timestamp = NULL,
live = is.null(version),
timetravel = FALSE,
filter_in_dict = NULL, filter_out_dict = NULL, filter_regex_dict = NULL,
filter_equal_dict=NULL, filter_greater_dict=NULL, filter_less_dict=NULL,
filter_greater_equal_dict=NULL, filter_less_equal_dict=NULL, filter_spatial_dict=NULL,
select_columns = NULL, offset = 0L, limit = NULL, fetch_all_rows = FALSE,
...) {
if (isTRUE(live) && !is.null(version))
warning("live=TRUE so ignoring materialization version")
if (is.null(live) && !is.null(timestamp))
live = TRUE
if (isFALSE(live) && is.null(version)) {
warning("Defaulting to latest materialisation version since live=FALSE\n",
"Specify `version='latest' instead to avoid this warning")
version = flywire_version("latest", datastack_name = datastack_name)
}
if (!is.null(timestamp) && !is.null(version))
stop("You can only supply one of timestamp and materialization version")
check_package_available("arrow")
fac = flywire_cave_client(datastack_name = datastack_name)
offset = checkmate::asInt(offset, lower = 0L)
if (!is.null(limit))
limit = checkmate::asInt(limit, lower = 0L)
is_view = table %in% fafbseg:::cave_views(fac)
version = fafbseg:::flywire_version(version, datastack_name = datastack_name)
if (!is.null(version)) {
available = version %in% fafbseg:::flywire_version("available", datastack_name = datastack_name)
if (!available) {
if (is_view)
stop("Sorry! Views only work with unexpired materialisation versions.\n",
"See https://flywire-forum.slack.com/archives/C01M4LP2Y2D/p1697956174773839 for info.")
timestamp = fafbseg:::flywire_timestamp(version, datastack_name = datastack_name,
convert = F)
message("Materialisation version no longer available. Falling back to (slower) timestamp!")
if (isFALSE(live))
live = TRUE
version = NULL
}
}
now = fafbseg:::flywire_timestamp(timestamp = "now", convert = FALSE)
if(timetravel) {
timestamp2 = fafbseg:::flywire_timestamp(version, timestamp = timestamp,
datastack_name = datastack_name)
timestamp = now
version = NULL
live = 2L
}
filter_in_dict = fafbseg:::cavedict_rtopy(filter_in_dict, wrap_table = if (isTRUE(live == 2))
table
else NULL)
filter_out_dict = fafbseg:::cavedict_rtopy(filter_out_dict)
filter_equal_dict = fafbseg:::cavedict_rtopy(filter_equal_dict)
filter_greater_dict = fafbseg:::cavedict_rtopy(filter_greater_dict)
filter_less_dict = fafbseg:::cavedict_rtopy(filter_less_dict)
filter_greater_equal_dict = fafbseg:::cavedict_rtopy(filter_greater_equal_dict)
filter_less_equal_dict = fafbseg:::cavedict_rtopy(filter_less_equal_dict)
filter_spatial_dict = fafbseg:::cavedict_rtopy(filter_spatial_dict)
if (!is.null(filter_regex_dict)) {
was_char = is.character(filter_regex_dict)
if (was_char) {
filter_regex_dict = as.list(filter_regex_dict)
if (isTRUE(live == 2)) {
warning("When live==2 / timetravel=T filter_regex_dict should be a list of form: ",
"`list(<table_name>=c(<colname>='<regex>'))`",
"\n", "I'm going to try and format your input correctly.")
filter_regex_dict = list(filter_regex_dict)
names(filter_regex_dict) = table
}
}
}
if (!is.null(select_columns)) {
if (isTRUE(live == 2) && is.character(select_columns)) {
warning("When live==2 / timetravel=T select_columns should be a list of form: ",
"`list(<table_name>=c('col1', 'col2'))`", "\n",
"I'm going to try and format your input correctly.")
select_columns = list(select_columns)
names(select_columns) = table
}
}
annotdfs = list()
while (offset >= 0) {
pymsg <- reticulate::py_capture_output({
annotdf <- if (is_view) {
if (!is.null(timestamp))
stop("Sorry! You cannot specify a timestamp when querying a view.\n",
"You can specify older timepoints by using unexpired materialisation versions.\n",
"See https://flywire-forum.slack.com/archives/C01M4LP2Y2D/p1697956174773839 for info.")
reticulate::py_call(fac$materialize$query_view,
view_name = table, materialization_version = version,
filter_in_dict = filter_in_dict, filter_out_dict = filter_out_dict,
filter_equal_dict=filter_equal_dict,
#filter_greater_dict=filter_equal_dict,
#filter_less_dict=filter_less_dict,filter_greater_equal_dict=filter_greater_equal_dict,
#filter_less_equal_dict=filter_less_equal_dict, filter_spatial_dict=filter_spatial_dict,
filter_regex_dict = filter_regex_dict, select_columns = select_columns,
offset = offset, limit = limit, ...)
}else{
stop("Sorry, no valid view specified")
}
annotdf <- fafbseg:::pandas2df(annotdf)
})
annotdfs[[length(annotdfs) + 1]] = annotdf
limited_query = isTRUE(grepl("Limited query to", pymsg))
if (limited_query && is.null(limit) && !fetch_all_rows)
warning(paste(pymsg, "\nUse fetch_all_rows=T or set an explicit limit to avoid warning!"))
else if (!limited_query && nzchar(pymsg)) {
warning(pymsg)
}
offset <- if (fetch_all_rows && limited_query)
offset + nrow(annotdf)
else -1L
}
res <- if (length(annotdfs) == 1)
annotdfs[[1]]
else dplyr::bind_rows(annotdfs)
if (timetravel) {
if (!all(c("pt_supervoxel_id", "pt_root_id") %in% colnames(res)))
stop("Sorry I do not know how to time travel dataframes without `pt_supervoxel_id`, `pt_root_id` columns!",
if (is.null(select_columns))
""
else "\nPlease review your value of `select_columns`!")
res$pt_root_id = flywire_updateids(res$pt_root_id, svids = res$pt_supervoxel_id,
timestamp = timestamp2, cache = T, Verbose = F)
}
res
}

12 changes: 9 additions & 3 deletions man/banc_cave_tables.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions man/banctable_query.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit ef6be07

Please sign in to comment.