Skip to content
program of growth
Share
Explore


import
pandas
as
pd
from
af
.
pipeline
.
data_reader
.
exceptions
import
DataReaderException
from
af
.
pipeline
.
data_reader
.
models
import
Occurrence
from
af
.
pipeline
.
data_reader
.
models
.
brapi
.
core
import
BaseListResponse
,
Study

from
af
.
pipeline
.
data_reader
.
models
.
brapi
.
germplasm
import
Germplasm
from
af
.
pipeline
.
data_reader
.
models
.
brapi
.
phenotyping
import
ObservationUnitQueryParams
from
af
.
pipeline
.
data_reader
.
models
.
brapi
.
phenotyping
import
ObservationUnitQueryParams


from
af
.
pipeline
.
data_reader
.
phenotype_data
import
PhenotypeData
from
af
.
pipeline
.
pandasutil
import
df_keep_columns
from
pydantic
import
ValidationError
,
BaseModel
,
parse_obj_as


# BrAPI endpoint paths used by the reader below. They are passed as the
# ``endpoint`` argument of the reader's get calls; names in braces are
# placeholders that callers fill in with ``str.format``.

# list endpoints
GET_OBSERVATION_UNITS_URL = "/observationunits"
GET_OBSERVATIONS_URL = "/observations"

# single study, addressed by its study db id
GET_STUDIES_BY_ID_URL = "/studies/{studyDbId}"

# result of a previously submitted germplasm search
GET_GERMPLASM_BY_DB_ID = "/search/germplasm/{searchResultDbId}"


class PhenotypeDataBrapi(PhenotypeData):
    """Reads phenotype data from a brapi ebs data source.

    List endpoints are read page by page, ``brapi_list_page_size`` records at
    a time; a page shorter than that is treated as the last one. Tabular
    results are returned as DataFrames whose columns use the local field
    names defined in the mappings below.
    """

    # BrAPI observation unit field (flattened, dotted path) -> local plot column.
    plots_api_fields_to_local_fields = {
        "observationUnitDbId": "plot_id",
        "germplasmDbId": "entry_id",
        "studyDbId": "occurr_id",
        "trialDbId": "expt_id",
        "locationDbId": "loc_id",
        "observationUnitPosition.positionCoordinateX": "pa_x",
        "observationUnitPosition.positionCoordinateY": "pa_y",
        "replicate": "rep_factor",
        "block": "blk",
    }

    # BrAPI observation field -> local plot measurement column.
    plot_measurements_api_fields_to_local_fields = {
        "observationUnitDbId": "plot_id",
        "observationVariableDbId": "trait_id",
        "value": "trait_value",
    }

    # Page size requested from list endpoints and used to detect the last page.
    brapi_list_page_size = 1000

    def get_plots(self, occurrence_id: str = None) -> pd.DataFrame:
        """Return the plots of an occurrence as a DataFrame.

        Observation units at the "plot" level are fetched for the study whose
        db id is ``occurrence_id``, one page at a time.

        Args:
            occurrence_id: sent to the API as the ``studyDbId`` filter.

        Returns:
            A DataFrame with the local plot columns plus ``plot_qc``. All
            values are cast to ``str``. When the first page has no records an
            empty DataFrame with those columns is returned.

        Raises:
            DataReaderException: if an API call is not successful.
        """
        observation_units_filters = ObservationUnitQueryParams(
            studyDbId=occurrence_id,
            observationLevel="plot",
            pageSize=self.brapi_list_page_size,
        )

        plots_pages = []
        page_num = 0

        while True:
            observation_units_filters.page = page_num

            api_response = self.get(
                endpoint=GET_OBSERVATION_UNITS_URL,
                params=observation_units_filters.dict(),
            )
            if not api_response.is_success:
                raise DataReaderException(api_response.error)

            plots_data = BaseListResponse(**api_response.body).result.data

            # An empty page means there is nothing (more) to read. This also
            # covers a total count that is an exact multiple of the page size,
            # where the page after the last full one comes back empty and
            # cannot be normalized/pivoted.
            if not plots_data:
                break

            plots_pages.append(self._plots_page_to_dataframe(plots_data))

            # a short page is the last page
            if len(plots_data) < self.brapi_list_page_size:
                break

            # to get next page
            page_num += 1

        if not plots_pages:
            columns = list(self.plots_api_fields_to_local_fields.values())
            columns.append("plot_qc")
            return pd.DataFrame(columns=columns)

        # DataFrame.append was removed in pandas 2.0; concat keeps each page's
        # own row index exactly like the chained appends did.
        plots = pd.concat(plots_pages)

        # rename dataframe column with local field names
        plots = plots.rename(columns=self.plots_api_fields_to_local_fields)

        return plots.astype(str)

    def _plots_page_to_dataframe(self, plots_data) -> pd.DataFrame:
        """Flatten one non-empty page of observation unit records into plot rows.

        The returned frame still uses the API field names; only the columns
        listed in ``plots_api_fields_to_local_fields`` and ``plot_qc`` remain.
        """
        # paths to normalize json data to flat columns
        columns_path = [
            "observationUnitDbId",
            "locationDbId",
            "studyDbId",
            "trialDbId",
            "germplasmDbId",
            ["observationUnitPosition", "positionCoordinateX"],
            ["observationUnitPosition", "positionCoordinateY"],
        ]

        # list record path to normalize
        list_record_path = ["observationUnitPosition", "observationLevelRelationships"]

        # this dataframe has one row per observation level of every unit
        plots_unpivoted = pd.json_normalize(
            plots_data,
            record_path=list_record_path,
            meta=columns_path,
        )

        # one column per levelName holding its levelCode, indexed by unit id
        # (presumably this is where the "replicate" and "block" columns of the
        # field mapping come from -- confirm against the API payload)
        levels_pivoted = plots_unpivoted.pivot(
            index="observationUnitDbId",
            columns="levelName",
            values="levelCode",
        )

        # collapse the per-level rows back to one row per observation unit
        plots_without_levels = (
            plots_unpivoted.drop(columns=["levelOrder", "levelCode", "levelName"])
            .drop_duplicates()
            .reset_index()
        )

        plots_page = plots_without_levels.join(levels_pivoted, on="observationUnitDbId")

        # keep only local field columns
        plots_page = df_keep_columns(plots_page, self.plots_api_fields_to_local_fields.keys())

        # since plot_qc not defined in brapi spec, set default value "G"
        plots_page["plot_qc"] = "G"

        return plots_page

    def get_plot_measurements(self, occurrence_id: str = None, trait_id: str = None) -> pd.DataFrame:
        """Return the plot level observations of an occurrence as a DataFrame.

        Args:
            occurrence_id: sent to the API as the ``studyDbId`` filter.
            trait_id: sent to the API as the ``observationVariableDbId`` filter.

        Returns:
            A DataFrame with the local plot measurement columns plus
            ``trait_qc``. All values are cast to ``str``.

        Raises:
            DataReaderException: if an API call is not successful.
        """
        # NOTE(review): the observations endpoint is queried with the
        # observation *unit* params model; confirm it carries
        # observationVariableDbId, otherwise the trait filter may be dropped.
        observations_filters = ObservationUnitQueryParams(
            studyDbId=occurrence_id,
            observationVariableDbId=trait_id,
            observationLevel="plot",
            # must match the size the paging loop below compares against
            pageSize=self.brapi_list_page_size,
        )

        measurement_pages = []
        page_num = 0

        while True:
            observations_filters.page = page_num

            api_response = self.get(
                endpoint=GET_OBSERVATIONS_URL,
                params=observations_filters.dict(),
            )
            if not api_response.is_success:
                raise DataReaderException(api_response.error)

            plot_measurements_data = BaseListResponse(**api_response.body).result.data

            # The first page is always kept (even when empty) so there is a
            # frame to post-process; empty follow-up pages add nothing.
            if page_num == 0 or len(plot_measurements_data) > 0:
                measurement_pages.append(pd.DataFrame(plot_measurements_data))

            # a short page is the last page
            if len(plot_measurements_data) < self.brapi_list_page_size:
                break

            page_num += 1

        # DataFrame.append was removed in pandas 2.0; concat keeps each page's
        # own row index exactly like the chained appends did.
        plot_measurements = pd.concat(measurement_pages)

        # keep only local field columns
        plot_measurements = df_keep_columns(
            plot_measurements,
            self.plot_measurements_api_fields_to_local_fields.keys(),
        )

        # rename columns to local field names
        plot_measurements = plot_measurements.rename(
            columns=self.plot_measurements_api_fields_to_local_fields,
        )

        # trait_qc not part of brapi spec, so set to default value
        plot_measurements["trait_qc"] = "G"

        return plot_measurements.astype(str)

    def get_occurrence(self, occurrence_id: int = None):
        """Return the occurrence built from the study with the given db id.

        Args:
            occurrence_id: substituted into the study url as ``studyDbId``.

        Returns:
            An ``Occurrence`` filled from the study, trial and location fields
            of the API result.

        Raises:
            DataReaderException: if the API call is not successful, the
                response has no result, or the result fails ``Study``
                validation.
        """
        studies_url = GET_STUDIES_BY_ID_URL.format(studyDbId=occurrence_id)
        api_response = self.get(endpoint=studies_url)

        if not api_response.is_success:
            raise DataReaderException(api_response.error)

        result = api_response.body["result"]

        if result is None:
            raise DataReaderException("Occurrence is not found")

        # load it to model to make sure required fields are found
        try:
            _study = Study(**result)
        except ValidationError as e:
            # keep the validation error as the cause for easier debugging
            raise DataReaderException(str(e)) from e

        return Occurrence(
            occurrence_id=_study.studyDbId,
            occurrence_name=_study.studyName,
            experiment_id=_study.trialDbId,
            experiment_name=_study.trialName,
            location_id=_study.locationDbId,
            location=_study.locationName,
        )

    def get_experiment(self, experiment_id: int = None):
        """Not implemented for this data source."""
        raise NotImplementedError

    def get_trait(self, trait_id: int = None):
        """Not implemented for this data source."""
        raise NotImplementedError

    def search_germplasm(self, germplasm_search_ids: list[str]):
        """Search germplasm by db ids and return them as ``Germplasm`` models.

        The ids are posted to the germplasm search endpoint. A 200 response
        is parsed directly; a 202 response carries a ``searchResultDbId``
        whose results are then fetched with a second request.

        Args:
            germplasm_search_ids: sent as ``germplasmDbIds`` in the search body.

        Returns:
            A list of ``Germplasm`` parsed from the ``result.data`` of the
            response that carries the germplasm records.

        Raises:
            DataReaderException: if a request is not successful, a response
                has no body, or the search responds with a status other than
                200 or 202.
        """
        search_query = {"germplasmDbIds": germplasm_search_ids}

        search_germplasm_response = self.post(endpoint="/search/germplasm/", json=search_query)

        if not search_germplasm_response.is_success:
            raise DataReaderException(search_germplasm_response.error)

        if search_germplasm_response.body is None:
            raise DataReaderException("Germplasms are not found")

        # 200: the germplasm list is returned directly
        if search_germplasm_response.http_status == 200:
            return parse_obj_as(list[Germplasm], search_germplasm_response.body["result"]["data"])

        # 202: only a search result id is returned, results are fetched separately
        if search_germplasm_response.http_status == 202:
            search_germplasm_dbid = search_germplasm_response.body["result"]["searchResultDbId"]
            germplasm_url = GET_GERMPLASM_BY_DB_ID.format(searchResultDbId=search_germplasm_dbid)

            get_germplasm = self.get(endpoint=germplasm_url)

            # validate the follow-up response before reading its body
            if not get_germplasm.is_success:
                raise DataReaderException(get_germplasm.error)

            if get_germplasm.body is None:
                raise DataReaderException("Germplasms are not found")

            return parse_obj_as(list[Germplasm], get_germplasm.body["result"]["data"])

        raise DataReaderException(
            f"Unexpected status code {search_germplasm_response.http_status} from germplasm search"
        )





import
json
from
unittest
import
TestCase
from
unittest
.
mock
import
Mock
, patch
import
pandas
as
pd
from
af
.
pipeline
.
data_reader
.
exceptions
import
DataReaderException
from
af
.
pipeline
.
data_reader
.
models
import
Occurrence
from
af
.
pipeline
.
data_reader
.
models
.
brapi
.
germplasm
import
Germplasm

from
af
.
pipeline
.
data_reader
.
phenotype_data_brapi
import
PhenotypeDataBrapi
from
pandas
.
_testing
import assert_frame_equal


from
conftest
import get_json_resource, get_test_plot_measurements, get_test_plots


def get_brapi_observation_units_response():
    """Return the mock BrAPI observation units response stored next to this file."""
    resource_name = "brapi_observationunits_mock_response.json"
    return get_json_resource(__file__, resource_name)


def get_brapi_observations_response():
    """Return the mock BrAPI observations response stored next to this file."""
    resource_name = "brapi_observations_mock_response.json"
    return get_json_resource(__file__, resource_name)


def get_brapi_studies_response():
    """Return the mock BrAPI studies response stored next to this file."""
    resource_name = "brapi_studies_mock_response.json"
    return get_json_resource(__file__, resource_name)


# def get_search_result_dbid
def get_brapi_search_result_dbid_mock_response():
    """Return the mock BrAPI search response that carries a search result db id."""
    resource_name = "brapi_search_result_dbid_mock_response.json"
    return get_json_resource(__file__, resource_name)


def brapi_germplasm_response():
    """Return the mock BrAPI germplasm list response stored next to this file."""
    resource_name = "brapi_germplasm_mock_response.json"
    return get_json_resource(__file__, resource_name)


def get_test_occurrence_brapi() -> Occurrence:
    """Build the Occurrence that the occurrence tests compare results against."""
    return Occurrence(
        occurrence_id=7,
        occurrence_name="test_occurrence",
        experiment_id=4,
        experiment_name="test_experiment",
        location_id=6,
        location="test_location",
    )


class TestPhenotypeDataBrapi(TestCase):
    """Tests for PhenotypeDataBrapi with the HTTP layer (requests) mocked out."""

    # ids sent in the germplasm search body; they match the mock germplasm response
    GERMPLASM_SEARCH_IDS = [
        "bd76c553-3862-11eb-95eb-0242ac140004",
        "zg76c553-3862-11eb-95eb-0242ac140004",
    ]

    # pagination metadata for the first of two pages holding three records in total
    TWO_PAGE_PAGINATION = """{
        "pageSize": 2,
        "totalPages": 2,
        "currentPage": 0,
        "totalCount": 3
    }"""

    def _two_pages(self, get_response):
        """Build a full first page and a one-record second page from a mock response.

        The second page keeps the first record of the mock response with its
        ``observationUnitDbId`` replaced by 2911.
        """
        first_page = get_response()
        first_page["metadata"]["pagination"] = json.loads(self.TWO_PAGE_PAGINATION)

        second_page = get_response()
        pagination = json.loads(self.TWO_PAGE_PAGINATION)
        pagination["currentPage"] = 1
        second_page["metadata"]["pagination"] = pagination
        second_page["result"]["data"].pop()
        second_page["result"]["data"][0]["observationUnitDbId"] = 2911

        return first_page, second_page

    @staticmethod
    def _with_extra_first_row(expected: pd.DataFrame) -> pd.DataFrame:
        """Return ``expected`` followed by a copy of its first row with plot_id 2911."""
        extra_row = expected.iloc[0].copy()
        extra_row["plot_id"] = 2911
        # DataFrame.append was removed in pandas 2.0; this is what appending
        # a Series did internally.
        return pd.concat([expected, extra_row.to_frame().T.infer_objects()])

    def _assert_expected_germplasm(self, germplasm_result):
        """Check the two germplasm records of the mock germplasm response."""
        assert germplasm_result[0].germplasmName == "TANGKAI ROTAN"
        assert germplasm_result[0].germplasmPUI == "9"
        assert germplasm_result[0].germplasmDbId == "bd76c553-3862-11eb-95eb-0242ac140004"
        assert germplasm_result[0].pedigree == "TR"

        assert germplasm_result[1].germplasmName == "TANGKAI BOTAN"
        assert germplasm_result[1].germplasmPUI == "19"
        assert germplasm_result[1].germplasmDbId == "zg76c553-3862-11eb-95eb-0242ac140004"
        assert germplasm_result[1].pedigree == "TR"

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_plots(self, mock_get):
        """A single page of observation units is returned as the expected plots."""
        mock_get.return_value.status_code = 200
        mock_get.return_value.json = Mock(side_effect=[get_brapi_observation_units_response()])

        plots_test_df = get_test_plots()

        plots_result_df = PhenotypeDataBrapi(api_base_url="http://test").get_plots("testid")

        # assert dataframe is returned
        assert isinstance(plots_result_df, pd.DataFrame)

        # arrange columns
        plots_result_df = plots_result_df[plots_test_df.columns]

        assert_frame_equal(plots_result_df, plots_test_df.astype(str))

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_plots_with_pages(self, mock_get):
        """Plots spread over two pages are read and combined."""
        mock_get.return_value.status_code = 200

        first_page, second_page = self._two_pages(get_brapi_observation_units_response)
        mock_get.return_value.json = Mock(side_effect=[first_page, second_page])

        # expected result
        plots_expected = self._with_extra_first_row(get_test_plots())

        # patch.object restores the page size afterwards, so the small page
        # size does not leak into the other tests
        with patch.object(PhenotypeDataBrapi, "brapi_list_page_size", 2):
            plots_result = PhenotypeDataBrapi(api_base_url="http://test").get_plots("testid")

        # assert dataframe is returned
        assert isinstance(plots_result, pd.DataFrame)

        # arrange columns
        plots_result = plots_result[plots_expected.columns]

        assert_frame_equal(plots_result, plots_expected.astype(str))

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_plots_empty_result(self, mock_get):
        """An empty first page gives an empty DataFrame with the plot columns."""
        mock_get.return_value.status_code = 200

        brapi_response = get_brapi_observation_units_response()
        brapi_response["result"]["data"] = []

        mock_get.return_value.json = Mock(side_effect=[brapi_response])

        plots_test_df = get_test_plots()

        plots_result_df = PhenotypeDataBrapi(api_base_url="http://test").get_plots("testid")

        # assert dataframe is returned
        assert isinstance(plots_result_df, pd.DataFrame)

        assert len(plots_result_df) == 0

        assert set(plots_result_df.columns) == set(plots_test_df.columns)

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_plot_measurements(self, mock_get):
        """A single page of observations is returned as the expected measurements."""
        mock_get.return_value.status_code = 200
        mock_get.return_value.json = Mock(side_effect=[get_brapi_observations_response()])

        plot_measurements_test_df = get_test_plot_measurements()

        plot_measurements_result_df = PhenotypeDataBrapi(api_base_url="http://test").get_plot_measurements("testid")

        # assert dataframe is returned
        assert isinstance(plot_measurements_result_df, pd.DataFrame)

        # arrange columns
        plot_measurements_result_df = plot_measurements_result_df[plot_measurements_test_df.columns]

        assert_frame_equal(plot_measurements_result_df, plot_measurements_test_df.astype(str))

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_plots_measurements_with_pages(self, mock_get):
        """Measurements spread over two pages are read and combined."""
        mock_get.return_value.status_code = 200

        first_page, second_page = self._two_pages(get_brapi_observations_response)
        mock_get.return_value.json = Mock(side_effect=[first_page, second_page])

        # expected result
        plot_measurements_expected = self._with_extra_first_row(get_test_plot_measurements())

        # patch.object restores the page size afterwards, so the small page
        # size does not leak into the other tests
        with patch.object(PhenotypeDataBrapi, "brapi_list_page_size", 2):
            plot_measurements_result = PhenotypeDataBrapi(api_base_url="http://test").get_plot_measurements(
                "testid"
            )

        # assert dataframe is returned
        assert isinstance(plot_measurements_result, pd.DataFrame)

        # arrange columns
        plot_measurements_result = plot_measurements_result[plot_measurements_expected.columns]

        assert_frame_equal(plot_measurements_result, plot_measurements_expected.astype(str))

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_occurrence(self, mock_get):
        """The mock studies response is mapped to the expected Occurrence fields."""
        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = get_brapi_studies_response()
        test_occurrence = get_test_occurrence_brapi()

        occurrence_result = PhenotypeDataBrapi(api_base_url="http://test").get_occurrence(
            occurrence_id=test_occurrence.occurrence_id
        )

        result_fields = occurrence_result.dict()
        for field, value in test_occurrence:
            assert value == result_fields[field]

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    def test_get_occurrence_none_result(self, mock_get):
        """A response whose result is None raises DataReaderException."""
        mock_get.return_value.status_code = 200

        brapi_response = get_brapi_studies_response()
        brapi_response["result"] = None

        mock_get.return_value.json.return_value = brapi_response

        with self.assertRaises(DataReaderException):
            PhenotypeDataBrapi(api_base_url="http://test").get_occurrence(occurrence_id="test")

    @patch("af.pipeline.data_reader.data_reader.requests.post")
    def test_search_germplasm_case_1(self, mock_post):
        """Status 200: the germplasm list comes back in the search response itself."""
        mock_post.return_value.status_code = 200
        mock_post.return_value.json.return_value = brapi_germplasm_response()

        # pass the id list itself, not the dict's bound ``values`` method
        germplasm_result = PhenotypeDataBrapi(api_base_url="http://test").search_germplasm(
            germplasm_search_ids=self.GERMPLASM_SEARCH_IDS
        )

        self._assert_expected_germplasm(germplasm_result)

    @patch("af.pipeline.data_reader.data_reader.requests.get")
    @patch("af.pipeline.data_reader.data_reader.requests.post")
    def test_search_germplasm_case_2(self, mock_post, mock_get):
        """Status 202: the search returns a result id and the list is fetched with a GET."""
        mock_post.return_value.status_code = 202
        mock_post.return_value.json.return_value = get_brapi_search_result_dbid_mock_response()

        mock_get.return_value.status_code = 200
        mock_get.return_value.json.return_value = brapi_germplasm_response()

        # pass the id list itself, not the dict's bound ``values`` method
        germplasm_result = PhenotypeDataBrapi(api_base_url="http://test").search_germplasm(
            germplasm_search_ids=self.GERMPLASM_SEARCH_IDS
        )

        self._assert_expected_germplasm(germplasm_result)

# TODO: write a test case for status 200 (where the list of germplasm is returned directly)



{
"metadata"
: {
"pagination"
: {
"totalCount"
: 1,
"totalPages"
: 1
},
"status"
: [],
"datafiles"
: []
},
"result"
: {
"data"
: [
{
"commonCropName"
: "rice",
"germplasmPUI"
:"9",
"germplasmDbId"
: "bd76c553-3862-11eb-95eb-0242ac140004",
"defaultDisplayName"
: "TANGKAI ROTAN",
"accessionNumber"
: "IRGC 31",
"germplasmName"
: "TANGKAI ROTAN",
"pedigree"
: "TR",
"synonyms"
: [{
"synonym"
: "IRGC 31",
"type"
: "ACCNO"}],
"countryOfOriginCode"
: "IRRI-GRC",
"typeOfGermplasmStorageCode"
: [],
"taxonIds"
: [],
"donors"
: [],
"acquisitionDate"
: "1961-03-27",
"breedingMethodDbId"
: "70",
"additionalInfo"
: {
"TAXNO_AP_text"
: "2832",
"MLS_DATE_AP_text"
: "29-JUN-2004",
"COLL_AA_text"
: "IRGC 31 ;O. SATIVA;;;;;MYS",
"SampStat_AP_text"
: "T",
"STATUS_ACC_AP_text"
: "AV",
"ORI_COUN_AP_text"
: "MALAYSIA",
"SPP_CODE_AP_text"
: "S",
"IPSTAT_AP_text"
: "FAO (14/09/1994)",
"VGISO_AA_text"
: "1" }},
{
"commonCropName"
: "rice",
"germplasmPUI"
:"19",
"germplasmDbId"
: "zg76c553-3862-11eb-95eb-0242ac140004",
"defaultDisplayName"
: "TANGKAI BOTAN",
"accessionNumber"
: "IRGC 31",
"germplasmName"
: "TANGKAI BOTAN",
"pedigree"
: "TR",
"synonyms"
: [ {
"synonym"
: "IRGC 31",
"type"
: "ACCNO"}],
"countryOfOriginCode"
: "IRRI-GRC",
"typeOfGermplasmStorageCode"
: [],
"taxonIds"
: [],
"donors"
: [],
"acquisitionDate"
: "1961-03-27",
"breedingMethodDbId"
: "70",
"additionalInfo"
: {
"TAXNO_AP_text"
: "2832",
"MLS_DATE_AP_text"
: "29-JUN-2004",
"COLL_AA_text"
: "IRGC 31 ;O. SATIVA;;;;;MYS",
"SampStat_AP_text"
: "T",
"STATUS_ACC_AP_text"
: "AV",
"ORI_COUN_AP_text"
: "MALAYSIA",
"SPP_CODE_AP_text"
: "S",
"IPSTAT_AP_text"
: "FAO (14/09/1994)",
"VGISO_AA_text"
: "1" }
}
]
}
}

















Want to print your doc?
This is not the way.
Try clicking the ⋯ next to your doc name or using a keyboard shortcut (Ctrl+P) instead.