@@ -53,24 +53,31 @@ def create_metrics(self):
5353
5454class PanDDAParameters (pydantic .BaseModel ):
5555 dcid : int = pydantic .Field (gt = 0 )
56- comparator_threshold : int = pydantic .Field (default = 300 )
56+ comparator_threshold : int = pydantic .Field (default = 350 )
5757 automatic : Optional [bool ] = False
5858 comment : Optional [str ] = None
5959 scaling_id : list [int ]
60- timeout : float = pydantic .Field (default = 120 , alias = "timeout-minutes" )
61- backoff_delay : float = pydantic .Field (default = 45 , alias = "backoff-delay" )
62- backoff_max_try : int = pydantic .Field (default = 30 , alias = "backoff-max-try" )
63- backoff_multiplier : float = pydantic .Field (default = 1.1 , alias = "backoff-multiplier" )
60+ timeout : float = pydantic .Field (default = 180 , alias = "timeout-minutes" )
61+ backoff_delay : float = pydantic .Field (default = 20 , alias = "backoff-delay" )
62+ backoff_max_try : int = pydantic .Field (default = 10 , alias = "backoff-max-try" )
63+ backoff_multiplier : float = pydantic .Field (default = 2 , alias = "backoff-multiplier" )
6464 pipedream : Optional [bool ] = True
65+ reprocessing : Optional [bool ] = False
6566
6667
67- class PanDDA_PostParameters (pydantic .BaseModel ):
68+ class XChemCollate_Parameters (pydantic .BaseModel ):
6869 dcid : int = pydantic .Field (gt = 0 )
70+ program_id : int = pydantic .Field (gt = 0 )
6971 automatic : Optional [bool ] = False
7072 comment : Optional [str ] = None
7173 scaling_id : list [int ]
7274 processing_directory : str
7375 timeout : float = pydantic .Field (default = 60 , alias = "timeout-minutes" )
76+ backoff_delay : float = pydantic .Field (default = 20 , alias = "backoff-delay" )
77+ backoff_max_try : int = pydantic .Field (default = 10 , alias = "backoff-max-try" )
78+ backoff_multiplier : float = pydantic .Field (default = 2 , alias = "backoff-multiplier" )
79+ pipedream : Optional [bool ] = False
80+ reprocessing : Optional [bool ] = False
7481
7582
7683class DLSTriggerXChem (CommonService ):
@@ -230,6 +237,7 @@ def trigger_pandda_xchem(
230237 scaling_id = parameters .scaling_id [0 ]
231238 comparator_threshold = parameters .comparator_threshold
232239 pipedream = parameters .pipedream
240+ reprocessing = parameters .reprocessing
233241
234242 protein_info = get_protein_for_dcid (parameters .dcid , session )
235243 # protein_id = getattr(protein_info, "proteinId")
@@ -405,7 +413,7 @@ def trigger_pandda_xchem(
405413 )
406414 return {"success" : True }
407415
408- # If other dimple/PanDDA2 job is running, quit, dimple set to trigger even if it fails
416+ # If other dimple/PanDDA2 job is running, quit, dimple will trigger PanDDA2 even if it fails
409417 min_start_time = datetime .now () - timedelta (hours = 6 )
410418
411419 query = (
@@ -432,27 +440,6 @@ def trigger_pandda_xchem(
432440 )
433441 return {"success" : True }
434442
435- # Stop-gap, interval > max checkpoint time
436- min_start_time = datetime .now () - timedelta (minutes = 30 )
437-
438- query = (
439- (
440- session .query (AutoProcProgram , ProcessingJob .dataCollectionId ).join (
441- ProcessingJob ,
442- ProcessingJob .processingJobId == AutoProcProgram .processingJobId ,
443- )
444- )
445- .filter (ProcessingJob .dataCollectionId == dcid )
446- .filter (AutoProcProgram .processingPrograms .in_ (["PanDDA2" ]))
447- .filter (AutoProcProgram .recordTimeStamp > min_start_time )
448- )
449-
450- if triggered_processing_job := query .first ():
451- self .log .info (
452- f"Exiting PanDDA2/Pipedream trigger: another PanDDA2 job was recently launched for dcid { dcid } "
453- )
454- return {"success" : True }
455-
456443 # Now check if other upstream pipeline is running and if so, checkpoint (it might fail)
457444 min_start_time = datetime .now () - timedelta (hours = 6 )
458445 query = (
@@ -550,15 +537,15 @@ def trigger_pandda_xchem(
550537
551538 df = pd .read_sql (query .statement , query .session .bind )
552539
553- # use datasets processed in user spacegroup if possible
540+ # prioritise datasets processed in user-defined spacegroup
554541 if "user_sg" in locals ():
555542 df_filteredbysg = df [df ["spaceGroup" ] == user_sg ]
556543
557544 if not df_filteredbysg .empty :
558545 df = df_filteredbysg
559546 n_success_upstream = len (df )
560547 self .log .info (
561- f"There are { n_success_upstream } successful upstream jobs (excl fast-dp) in the user defined spacegroup { user_sg } \
 548+ f"There are { n_success_upstream } successful upstream jobs (excl fast-dp) in the user-defined spacegroup { user_sg } \
562549 selecting the best one based on I/sigI*completeness * #unique reflections, from the most recent processing batch"
563550 )
564551
@@ -611,7 +598,7 @@ def trigger_pandda_xchem(
611598 )
612599 return {"success" : True }
613600
614- # mark a new batch whenever the gap is >= 12 hours
 601+ # mark as a new batch whenever the gap between jobs is >= 12 hours; consider only the most recent batch
615602 df2 = df2 .sort_values ("processingStartTime" ).reset_index (drop = True )
616603 df2 ["time_diff" ] = df2 ["processingStartTime" ].diff ()
617604 df2 ["batch" ] = (df2 ["time_diff" ] >= pd .Timedelta (hours = 12 )).cumsum () + 1
@@ -712,13 +699,15 @@ def trigger_pandda_xchem(
712699 compound_dir = dataset_dir / "compound"
713700
714701 self .log .info (f"Creating directory { dataset_dir } " )
715- try :
716- compound_dir .mkdir (parents = True , exist_ok = False )
717- except FileExistsError :
718- self .log .info (
719- f"Exiting PanDDA2/Pipedream trigger: { dataset_dir } already exists"
720- )
721- return {"success" : True }
702+
703+ if not reprocessing :
704+ try :
705+ compound_dir .mkdir (parents = True , exist_ok = False )
706+ except FileExistsError :
707+ self .log .info (
708+ f"Exiting PanDDA2/Pipedream trigger: { dataset_dir } already exists"
709+ )
710+ return {"success" : True }
722711
723712 # Copy the dimple files of the selected dataset
724713 shutil .copy (pdb , str (dataset_dir / "dimple.pdb" ))
@@ -756,6 +745,8 @@ def trigger_pandda_xchem(
756745 "database_path" : str (db_master ),
757746 "upstream_mtz" : pathlib .Path (upstream_mtz ).parts [- 1 ],
758747 "smiles" : str (CompoundSMILES ),
748+ "pipedream" : pipedream ,
749+ "reprocessing" : reprocessing ,
759750 }
760751
761752 dataset_list = sorted ([p .parts [- 1 ] for p in model_dir .iterdir () if p .is_dir ()])
@@ -799,45 +790,50 @@ def trigger_pandda_xchem(
799790 return {"success" : True }
800791
801792 @pydantic .validate_call (config = {"arbitrary_types_allowed" : True })
802- def trigger_pandda_xchem_post (
793+ def trigger_xchem_collate (
803794 self ,
804795 rw : workflows .recipe .RecipeWrapper ,
805796 * ,
806797 message : Dict ,
807- parameters : PanDDA_PostParameters ,
798+ parameters : XChemCollate_Parameters ,
808799 session : sqlalchemy .orm .session .Session ,
809800 transaction : int ,
810801 ** kwargs ,
811802 ):
812- """Trigger a PanDDA post-run job for an XChem fragment screening experiment.
803+ """Trigger an XChem Collate job for an XChem fragment screening experiment.
813804 Recipe parameters are described below with appropriate ispyb placeholder "{}"
814805 values:
815- - target: set this to "pandda_xchem_post "
806+ - target: set this to "xchem_collate "
816807 - dcid: the dataCollectionId for the given data collection i.e. "{ispyb_dcid}"
817808 - comment: a comment to be stored in the ProcessingJob.comment field
818809 - timeout-minutes: (optional) the max time (in minutes) allowed to wait for
819810 processing PanDDA jobs
820811 - automatic: boolean value passed to ProcessingJob.automatic field
821812 Example recipe parameters:
822- { "target": "pandda_xchem_post ",
813+ { "target": "xchem_collate ",
823814 "dcid": 123456,
824815 "scaling_id": [123456],
825816 "processing_directory": '/dls/labxchem/data/lb42888/lb42888-1/processing',
826817 "automatic": true,
827- "comment": "PanDDA2 post-run",
828818 }
829819 """
830820
831821 dcid = parameters .dcid
822+ program_id = parameters .program_id
832823 scaling_id = parameters .scaling_id [0 ]
833824 processing_directory = pathlib .Path (parameters .processing_directory )
825+ # reprocessing = parameters.reprocessing
826+ pipedream = parameters .pipedream
834827
835828 _ , ispyb_info = dlstbx .ispybtbx .ispyb_filter ({}, {"ispyb_dcid" : dcid }, session )
836829 visit = ispyb_info .get ("ispyb_visit" , "" )
837830 visit_proposal = visit .split ("-" )[0 ]
838831 visit_number = visit .split ("-" )[1 ]
839832
840- # From proposal and visit get all dcids
833+ protein_info = get_protein_for_dcid (parameters .dcid , session )
834+ acronym = getattr (protein_info , "acronym" )
835+
836+ # get all dcids for the visit
841837 query = (
842838 session .query (Proposal , BLSession , DataCollection )
843839 .join (BLSession , BLSession .proposalId == Proposal .proposalId )
@@ -850,6 +846,27 @@ def trigger_pandda_xchem_post(
850846 df = pd .read_sql (query .statement , query .session .bind )
851847 dcids = df ["dataCollectionId" ].tolist ()
852848
849+ # trigger on the final PanDDA/Pipedream program_id for the visit
850+ query = (
851+ (
852+ session .query (AutoProcProgram , ProcessingJob .dataCollectionId ).join (
853+ ProcessingJob ,
854+ ProcessingJob .processingJobId == AutoProcProgram .processingJobId ,
855+ )
856+ )
857+ .filter (ProcessingJob .dataCollectionId .in_ (dcids ))
858+ .filter (AutoProcProgram .processingPrograms .in_ (["PanDDA2" , "Pipedream" ]))
 859+ .filter (AutoProcProgram .autoProcProgramId > program_id )
860+ )
861+
862+ if triggered_processing_job := query .first ():
863+ self .log .info (
864+ f"Aborting xchem_collate trigger for dcid { dcid } as processing job has been started for dcid { triggered_processing_job .dataCollectionId } "
865+ )
866+ return {"success" : True }
867+
868+ # has processing finished for the current visit? checkpoint if not
869+ min_start_time = datetime .now () - timedelta (hours = 8 )
853870 query = (
854871 (
855872 session .query (AutoProcProgram , ProcessingJob .dataCollectionId ).join (
@@ -858,7 +875,77 @@ def trigger_pandda_xchem_post(
858875 )
859876 )
860877 .filter (ProcessingJob .dataCollectionId .in_ (dcids ))
861- .filter (AutoProcProgram .processingPrograms == "PanDDA2-post" )
878+ .filter (
879+ AutoProcProgram .processingPrograms .in_ (
880+ ["xia2 dials" , "PanDDA2" , "Pipedream" ]
881+ )
882+ )
883+ .filter (
884+ or_ (
885+ AutoProcProgram .processingStatus == None , # noqa E711
886+ AutoProcProgram .processingStartTime == None , # noqa E711
887+ )
888+ )
 889+ .filter (AutoProcProgram .recordTimeStamp > min_start_time )
890+ )
891+
 892+ # Calculate the message delay for the exponential backoff. If the max program id has
 893+ # finished but some jobs remain, checkpoint with the calculated message delay.
894+ status = {
895+ "ntry" : 0 ,
896+ }
897+ if isinstance (message , dict ):
898+ status .update (message .get ("trigger-status" , {}))
899+ message_delay = int (
900+ parameters .backoff_delay * parameters .backoff_multiplier ** status ["ntry" ]
901+ )
902+ status ["ntry" ] += 1
903+ self .log .debug (f"dcid={ dcid } \n message_delay={ message_delay } \n { status } " )
904+
905+ # If there are any running (or yet to start) jobs, then checkpoint with delay
906+ waiting_processing_jobs = query .all ()
907+ if n_waiting_processing_jobs := len (waiting_processing_jobs ):
908+ self .log .info (
909+ f"Waiting on { n_waiting_processing_jobs } processing jobs for { dcid = } for XChemCollate"
910+ )
911+ waiting_appids = [
912+ row .AutoProcProgram .autoProcProgramId for row in waiting_processing_jobs
913+ ]
914+ if status ["ntry" ] >= parameters .backoff_max_try :
915+ # Give up waiting for this program to finish and trigger
916+ # collate with remaining results that are available
917+ self .log .info (
918+ f"Max-try exceeded, giving up waiting for related processings for appids { waiting_appids } \n "
919+ )
920+ else :
921+ # Send results to myself for next round of processing
922+ self .log .debug (f"Waiting for appids={ waiting_appids } " )
923+ rw .checkpoint (
924+ {
925+ "trigger-status" : status ,
926+ },
927+ delay = message_delay ,
928+ transaction = transaction ,
929+ )
930+
931+ return {"success" : True }
932+
933+ self .log .debug (
934+ f"PanDDA2/Pipedream processing has finished for { visit_proposal } -{ visit_number } "
935+ )
936+
 937+ # Stop-gap: exit if another XChemCollate job was launched within the last 180 minutes
938+ min_start_time = datetime .now () - timedelta (minutes = 180 )
939+
940+ query = (
941+ session .query (AutoProcProgram , ProcessingJob .dataCollectionId )
942+ .join (
943+ ProcessingJob ,
944+ ProcessingJob .processingJobId == AutoProcProgram .processingJobId ,
945+ )
946+ .filter (ProcessingJob .dataCollectionId .in_ (dcids ))
947+ .filter (AutoProcProgram .processingPrograms .in_ (["XChemCollate" ]))
948+ .filter (AutoProcProgram .recordTimeStamp > min_start_time )
862949 .filter (
863950 or_ (
864951 AutoProcProgram .processingStatus == None , # noqa E711
@@ -869,18 +956,19 @@ def trigger_pandda_xchem_post(
869956
870957 if triggered_processing_job := query .first ():
871958 self .log .info (
872- f"Aborting PanDDA2_postrun trigger as another postrun job has started for dcid { triggered_processing_job . dataCollectionId } "
959+ f"Exiting XChemCollate trigger: another XChemCollate job has been launched for { acronym } "
873960 )
874961 return {"success" : True }
875962
876- self .log .debug ("PanDDA2 postrun trigger: Starting" )
963+ self .log .debug ("XChemCollate trigger: Starting" )
877964
878965 recipe_parameters = {
879- "dcid" : dcid , #
966+ "dcid" : max ( dcids ),
880967 "processing_directory" : str (processing_directory ),
881968 "scaling_id" : scaling_id ,
969+ "pipedream" : pipedream ,
882970 }
883-
884- self .upsert_proc (rw , dcid , "PanDDA2-post " , recipe_parameters )
971+ # Upsert on max dcid
972+ self .upsert_proc (rw , max ( dcids ) , "XChem-Collate " , recipe_parameters )
885973
886974 return {"success" : True }
0 commit comments